Skip to content

Commit af05037

Browse files
committed
🚸 Add DIVA reporter health status to validation UI
1 parent b0177e6 commit af05037

11 files changed

Lines changed: 677 additions & 4 deletions

File tree

moderate_api/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ def url_report(self, validator: str | None = None) -> str:
100100
return f"{base_url}?{urlencode({'validator': validator})}"
101101
return base_url
102102

103+
def url_report_status(self) -> str:
104+
"""URL for fetching reporter health status."""
105+
return self.build_reporter_url("report", "status")
106+
103107

104108
class Settings(BaseSettings):
105109
class Config:

moderate_api/diva.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,35 @@ def from_entries(
101101
)
102102

103103

104+
class ReporterState(str, Enum):
105+
"""State of the DIVA Quality Reporter service."""
106+
107+
STARTING = "starting"
108+
HEALTHY = "healthy"
109+
CATCHING_UP = "catching_up"
110+
STALE = "stale"
111+
ERROR = "error"
112+
113+
114+
class ReporterStatus(BaseModel):
115+
"""Health status of the DIVA Quality Reporter."""
116+
117+
state: ReporterState
118+
kafka_connected: bool
119+
consumer_group: str | None = None
120+
last_poll_at: float | None = None
121+
last_message_at: float | None = None
122+
last_flush_at: float | None = None
123+
seconds_since_last_flush: float | None = None
124+
messages_processed_total: int = 0
125+
current_batch_size: int = 0
126+
validation_pending_messages: int | None = None
127+
validation_pending_by_partition: dict[str, int] = Field(default_factory=dict)
128+
reconnect_count: int = 0
129+
last_error: str | None = None
130+
is_mock: bool = False
131+
132+
104133
class DivaClient:
105134
"""Client for DIVA Kafka REST Gateway and Quality Reporter APIs.
106135
@@ -377,3 +406,34 @@ async def get_validation_results(
377406
processed_rows=processed_rows,
378407
last_requested_at=start_time,
379408
)
409+
410+
async def get_reporter_status(self) -> ReporterStatus:
411+
"""Fetch health status from DIVA Quality Reporter.
412+
413+
Returns:
414+
ReporterStatus with current health state. Returns ERROR state
415+
on any failure instead of raising.
416+
"""
417+
url = self.settings.url_report_status()
418+
419+
_logger.debug("Fetching reporter status from DIVA: url=%s", url)
420+
421+
try:
422+
async with httpx.AsyncClient(
423+
timeout=self.settings.request_timeout
424+
) as client:
425+
response = await client.get(url, auth=self._auth)
426+
response.raise_for_status()
427+
data = response.json()
428+
return ReporterStatus(**data)
429+
except (
430+
httpx.HTTPStatusError,
431+
httpx.RequestError,
432+
Exception,
433+
) as e:
434+
_logger.error("Failed to fetch reporter status: %s", str(e))
435+
return ReporterStatus(
436+
state=ReporterState.ERROR,
437+
kafka_connected=False,
438+
last_error=str(e),
439+
)

moderate_api/diva_mock.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from moderate_api.config import DivaSettings
1414
from moderate_api.diva import (
1515
DivaClient,
16+
ReporterState,
17+
ReporterStatus,
1618
ValidationEntry,
1719
ValidationResult,
1820
ValidationStatus,
@@ -239,6 +241,19 @@ def _generate_mock_entries(
239241

240242
return entries
241243

244+
async def get_reporter_status(self) -> ReporterStatus:
245+
"""Return a healthy mock reporter status.
246+
247+
Returns:
248+
ReporterStatus indicating healthy mock state
249+
"""
250+
return ReporterStatus(
251+
state=ReporterState.HEALTHY,
252+
kafka_connected=True,
253+
consumer_group="mock-consumer-group",
254+
is_mock=True,
255+
)
256+
242257
@classmethod
243258
def reset_mock_state(cls) -> None:
244259
"""Reset all mock validation states.

moderate_api/entities/asset/router.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626
from moderate_api.authz.user import OptionalUserDep, User, UserDep
2727
from moderate_api.config import SettingsDep
2828
from moderate_api.db import AsyncSessionDep
29-
from moderate_api.diva import ValidationResult, ValidationStatus
29+
from moderate_api.diva import (
30+
ReporterStatus,
31+
ValidationResult,
32+
ValidationStatus,
33+
)
3034
from moderate_api.diva_deps import DivaClientDep
3135
from moderate_api.entities.asset import trust_routes
3236
from moderate_api.entities.asset.models import (
@@ -996,6 +1000,24 @@ async def get_supported_extensions(
9961000
return settings.diva.supported_extensions
9971001

9981002

1003+
@router.get(
1004+
"/validation/reporter-status",
1005+
response_model=ReporterStatus,
1006+
tags=[_TAG],
1007+
summary="Get Quality Reporter health status",
1008+
)
1009+
async def get_reporter_status(
1010+
user: UserDep,
1011+
diva: DivaClientDep,
1012+
) -> ReporterStatus:
1013+
"""Fetch the current health status of the DIVA Quality Reporter.
1014+
1015+
Returns telemetry including state machine status, Kafka connectivity,
1016+
processing lag, and pending messages.
1017+
"""
1018+
return await diva.get_reporter_status()
1019+
1020+
9991021
@router.post(
10001022
"/{id}/object/{object_id}/validate",
10011023
response_model=StartValidationResponse,

moderate_ui/public/locales/en/common.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,28 @@
129129
"includeMine": "Toggle this to view private datasets you have uploaded, in addition to public ones.",
130130
"fileFormat": "Filter results to show only specific file types (e.g., CSV, JSON)."
131131
}
132+
},
133+
"reporter": {
134+
"healthy": "Validation ready",
135+
"healthyTooltip": "Data Quality service is available and validation can run now.",
136+
"starting": "Validation starting",
137+
"startingTooltip": "Data Quality service is starting up. Validation requests may take a moment.",
138+
"catchingUp": "Validation delayed",
139+
"catchingUpTooltip": "Data Quality service is catching up on queued work. New validation requests may take longer.",
140+
"stale": "Validation delayed",
141+
"staleTitle": "Validation delayed",
142+
"staleTooltip": "Data Quality service has not processed recent work. Validation results may be delayed.",
143+
"staleMessage": "The Data Quality service has not processed recent work. Validation results may be delayed.",
144+
"error": "Validation unavailable",
145+
"errorTitle": "Validation unavailable",
146+
"errorTooltip": "Data Quality service is currently unavailable. Starting a new validation may fail.",
147+
"errorMessage": "The Data Quality service is currently unavailable. Starting a new validation may fail.",
148+
"kafkaConnected": "Kafka Connected",
149+
"kafkaDisconnected": "Kafka Disconnected"
150+
},
151+
"validation": {
152+
"serviceStatusLabel": "Service status",
153+
"serviceUnavailableTitle": "Validation temporarily unavailable",
154+
"serviceUnavailableMessage": "Validation is temporarily unavailable because the Data Quality service is offline."
132155
}
133156
}

moderate_ui/src/api/validation.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,36 @@ export interface AssetObjectRowCountResponse {
6161
estimated: boolean;
6262
}
6363

64+
/**
65+
* State of the DIVA Quality Reporter service.
66+
*/
67+
export type ReporterState =
68+
| "starting"
69+
| "healthy"
70+
| "catching_up"
71+
| "stale"
72+
| "error";
73+
74+
/**
75+
* Health status of the DIVA Quality Reporter.
76+
*/
77+
export interface ReporterStatus {
78+
state: ReporterState;
79+
kafka_connected: boolean;
80+
consumer_group?: string;
81+
last_poll_at?: number;
82+
last_message_at?: number;
83+
last_flush_at?: number;
84+
seconds_since_last_flush?: number;
85+
messages_processed_total: number;
86+
current_batch_size: number;
87+
validation_pending_messages?: number;
88+
validation_pending_by_partition: Record<string, number>;
89+
reconnect_count: number;
90+
last_error?: string;
91+
is_mock?: boolean;
92+
}
93+
6494
/**
6595
* Add computed stats to validation entries.
6696
*/
@@ -157,3 +187,12 @@ export async function getAssetObjectRowCount({
157187
const response = await axios.get(url);
158188
return response.data;
159189
}
190+
191+
/**
192+
* Get the current health status of the DIVA Quality Reporter.
193+
*/
194+
export async function getReporterStatus(): Promise<ReporterStatus> {
195+
const url = buildApiUrl("asset", "validation", "reporter-status");
196+
const response = await axios.get(url);
197+
return response.data;
198+
}

0 commit comments

Comments
 (0)