-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
157 lines (128 loc) · 6.14 KB
/
app.py
File metadata and controls
157 lines (128 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sys
sys.dont_write_bytecode = True
from api import cachet, components, componentsgroup
from services import geturls, checkstatus
import json
import asyncio
from datetime import datetime
# Root URL of the Cachet instance; passed as the first argument to every API helper call below.
baseurl = "https://status.sillydev.co.uk"
# API token passed alongside baseurl to every API helper call; empty by default
# (presumably must be filled in before the script can authenticate - confirm).
apitoken = "" # Get from Cachet dashboard -> Settings -> Manage API Keys
# Monitored-URL table keyed by URL. check_config_updates() rebinds it with the
# table fetched from the API, and monitor_status() reads it on every cycle.
urls = {} # Do NOT touch this line
def log(level: str, message: str):
    """Print one log line to stdout as ``[YYYY-MM-DD HH:MM:SS] [level] message``.

    The timestamp is the current local time at the moment of the call.
    """
    # datetime supports strftime directives directly in the format spec.
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] [{level}] {message}")
async def start_script():
    """Log a startup banner with information fetched from the Cachet API.

    Issues five API calls concurrently (ping, version, system status,
    component list, component-group list), logs a summary of their
    responses and finally logs how long startup took. Any exception raised
    by the calls or by decoding their responses propagates to the caller.
    """
    began = datetime.now()
    log("INFO", "Starting Cachet Auto Status...")

    # All five requests are independent, so they are awaited together.
    log("INFO", "Fetching API information...")
    responses = await asyncio.gather(
        cachet.test_ping_api(baseurl, apitoken),
        cachet.get_version_info(baseurl, apitoken),
        cachet.get_system_status(baseurl, apitoken),
        components.list_components(baseurl, apitoken),
        componentsgroup.list_components_groups(baseurl, apitoken),
    )
    ping_resp, version_resp, status_resp, components_resp, groups_resp = responses

    log("INFO", "Made by Gamer3514 for Silly Developers")
    log("INFO", "Web: https://sillydev.co.uk | Discord: https://discord.gg/mUpVm596As")

    # Each response body is decoded as JSON with the payload under "data".
    api_version = json.loads(version_resp.content)["data"]["version"]
    log("INFO", f"API Version: {api_version}")

    status_message = json.loads(status_resp.content)["data"]["message"]
    log("INFO", f"System Status: {status_message} (HTTP {ping_resp.status_code})")

    component_total = len(json.loads(components_resp.content)["data"])
    group_total = len(json.loads(groups_resp.content)["data"])
    log("INFO", f"Components: {component_total} | Groups: {group_total}")

    elapsed = datetime.now() - began
    mins, secs = divmod(elapsed.total_seconds(), 60)
    log("SUCCESS", f"Startup complete in {int(mins)}m {secs:.2f}s")
async def check_config_updates():
    """Poll the API for the URL configuration every 60 seconds, forever.

    Each pass fetches the URL table with ``geturls.get_urls`` and compares
    it with the module-level ``urls``. When they differ, the added and
    removed keys are logged and ``urls`` is rebound to the fetched table.
    Any exception raised during a pass is logged and the loop carries on.
    """
    global urls
    while True:
        log("INFO", "Checking for configuration updates...")
        try:
            latest = await geturls.get_urls(baseurl, apitoken)
            log("INFO", f"Fetched {len(latest)} URLs from API")
            if latest == urls:
                log("INFO", f"No configuration changes ({len(urls)} URLs)")
            else:
                appeared = set(latest.keys()) - set(urls.keys())
                vanished = set(urls.keys()) - set(latest.keys())
                if appeared:
                    log("INFO", f"New URLs added: {', '.join(appeared)}")
                if vanished:
                    log("WARNING", f"URLs removed: {', '.join(vanished)}")
                log("SUCCESS", f"Configuration updated: {len(urls)} → {len(latest)} URLs")
                # Rebind (not mutate) so readers holding the old table are unaffected.
                urls = latest
        except Exception as exc:
            log("ERROR", f"Failed to check config: {exc!s}")
        await asyncio.sleep(60)
# Human-readable names for the numeric Cachet component statuses; used only
# when logging the status that was pushed to Cachet.
CACHET_STATUS_NAMES = {
    1: "Operational",
    2: "Performance Issues",
    3: "Partial Outage",
    4: "Major Outage",
}
async def monitor_status():
    """Check every configured URL and push the result to Cachet, forever.

    Each cycle:
      1. takes a snapshot of the module-level ``urls`` table; if it is empty,
         waits 10 seconds and starts over;
      2. runs ``checkstatus.check_status`` for all URLs concurrently;
      3. logs each result and maps it to a Cachet status
         (1 when the check reports up, 4 otherwise);
      4. sends one ``components.update_component_status`` call per component;
      5. sleeps 120 seconds.

    A failed Cachet update is logged and does not stop the remaining updates.
    Exceptions raised by the checks themselves are not caught here.
    """
    while True:
        log("INFO", "Starting status check cycle...")

        # Snapshot the table once per cycle. check_config_updates() may rebind
        # the global `urls` while this coroutine is suspended in the gather()
        # below; re-reading `urls` afterwards could pair results with the
        # wrong components and push wrong statuses to Cachet.
        targets = list(urls.items())
        if not targets:
            log("WARNING", "No URLs configured yet, waiting for config update...")
            await asyncio.sleep(10)  # Check more frequently when no URLs
            continue

        log("INFO", f"Checking {len(targets)} URLs...")

        # Check all URLs concurrently; gather() returns results in input order.
        results = await asyncio.gather(
            *(checkstatus.check_status(url, config['expected_status']) for url, config in targets)
        )

        up_count = 0
        down_count = 0
        api_updates = []  # (component_id, component_name, new_status)
        for (_url, config), status_data in zip(targets, results):
            component_id = config['component_id']
            component_name = config['component_name']
            status_code = status_data.get('status_code')

            if status_data['is_up']:
                up_count += 1
                response_time = status_data.get('response_time')
                # .get() may yield None, which cannot be formatted with ".3f".
                timing = f"{response_time:.3f}s" if response_time is not None else "n/a"
                log("SUCCESS", f"[{component_id}] {component_name}: UP (HTTP {status_code}, {timing})")
                new_status = 1  # Operational
            else:
                down_count += 1
                error = status_data.get('error')
                error_msg = f", Error: {error}" if error else ""
                status_msg = f"HTTP {status_code}" if status_code else "No response"
                log("ERROR", f"[{component_id}] {component_name}: DOWN ({status_msg}{error_msg})")
                new_status = 4  # Major Outage

            api_updates.append((component_id, component_name, new_status))

        log("INFO", f"Status check complete: {up_count} UP, {down_count} DOWN")

        # Push the collected statuses to Cachet one at a time.
        if api_updates:
            log("INFO", f"Sending {len(api_updates)} Cachet status updates...")
            for component_id, component_name, new_status in api_updates:
                try:
                    await components.update_component_status(
                        baseurl, apitoken,
                        component_id,
                        new_status
                    )
                except Exception as e:
                    log("ERROR", f"[{component_id}] Failed to update Cachet: {str(e)}")
                else:
                    new_status_name = CACHET_STATUS_NAMES.get(new_status, 'Unknown')
                    log("INFO", f"[{component_id}] {component_name}: Cachet → {new_status_name}")

        await asyncio.sleep(120)
async def main():
    """Run the startup report, then run both polling loops concurrently."""
    await start_script()
    log("INFO", "Starting monitoring loops (config: 60s, status: 120s)")

    # Both coroutines loop with `while True`, so neither returns on its own.
    background_loops = (check_config_updates(), monitor_status())
    await asyncio.gather(*background_loops)
if __name__ == "__main__":
    # Start the event loop only when executed as a script, not when imported.
    asyncio.run(main())