diff --git a/docs/ogcapi.md b/docs/ogcapi.md index ade93ae..be13f8e 100644 --- a/docs/ogcapi.md +++ b/docs/ogcapi.md @@ -206,7 +206,10 @@ OGC API - Processes exposes server-side processing tasks. Each process defines t | Zonal statistics | `zonal-statistics` | Compute zonal stats from GeoJSON features and a raster source | | ERA5-Land | `era5-land-download` | Download ERA5-Land hourly climate data (temperature, precipitation, etc.) | | CHIRPS3 | `chirps3-download` | Download CHIRPS3 daily precipitation data | -| CHIRPS3 -> DHIS2 pipeline | `chirps3-dhis2-pipeline` | Fetch features, download CHIRPS3, aggregate by feature, generate DHIS2 dataValueSet (optional auto-import) | +| Feature fetch | `feature-fetch` | Normalize features from inline GeoJSON or DHIS2 selectors | +| Data aggregate | `data-aggregate` | Aggregate downloaded raster data over workflow features | +| DHIS2 dataValue build | `dhis2-datavalue-build` | Build DHIS2 `dataValueSet` and table output from aggregated rows | +| CHIRPS3 -> DHIS2 workflow | `chirps3-dhis2-workflow` | Process-first assembly: feature-fetch -> chirps3-download -> data-aggregate -> dhis2-datavalue-build | ### Endpoints @@ -231,7 +234,7 @@ OGC API - Processes exposes server-side processing tasks. Each process defines t | `bbox` | array[number] | yes | Bounding box `[west, south, east, north]` | | `dry_run` | boolean | no | If true (default), return data without pushing to DHIS2 | -Note: `chirps3-dhis2-pipeline` has its own contract (`start_date`, `end_date`, feature-source selectors, and output options). +Note: `chirps3-dhis2-workflow` uses the `start_date` / `end_date` contract. ### ERA5-Land (`era5-land-download`) @@ -265,9 +268,10 @@ Downloads CHIRPS3 daily precipitation data. 
Additional inputs: -| Input | Type | Required | Default | Description | -| ------- | ------ | -------- | --------- | -------------------------------------- | -| `stage` | string | no | `"final"` | Product stage: `"final"` or `"prelim"` | +| Input | Type | Required | Default | Description | +| -------- | ------ | -------- | --------- | -------------------------------------------------------- | +| `stage` | string | no | `"final"` | Product stage: `"final"` or `"prelim"` | +| `flavor` | string | no | `"rnl"` | Product flavor: `"rnl"` or `"sat"` (`prelim` -> `sat`) | Example request: @@ -280,6 +284,7 @@ curl -X POST http://localhost:8000/ogcapi/processes/chirps3-download/execution \ "end": "2024-03", "bbox": [32.0, -2.0, 35.0, 1.0], "stage": "final", + "flavor": "rnl", "dry_run": true } }' @@ -368,19 +373,19 @@ All processes return a JSON object with: } ``` -### CHIRPS3 to DHIS2 pipeline (`chirps3-dhis2-pipeline`) +### CHIRPS3 to DHIS2 workflow (`chirps3-dhis2-workflow`) -Runs four steps in one execution: +Runs process-first orchestration in one execution: -1. Get features (from DHIS2 or provided GeoJSON) -2. Fetch CHIRPS3 data for union bbox -3. Process dataset (spatial + temporal aggregation) -4. Generate DHIS2 `dataValueSet` payload (optional auto-import) +1. `feature-fetch` +2. `chirps3-download` +3. `data-aggregate` +4. 
`dhis2-datavalue-build` Example request using DHIS2 org units as source features: ```bash -curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-pipeline/execution \ +curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-workflow/execution \ -H "Content-Type: application/json" \ -d '{ "inputs": { @@ -392,6 +397,7 @@ curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-pipeline/execu "temporal_reducer": "sum", "spatial_reducer": "mean", "stage": "final", + "flavor": "rnl", "dry_run": true, "auto_import": false } @@ -402,7 +408,8 @@ The response includes: - `files`: downloaded CHIRPS3 monthly files - `dataValueSet`: DHIS2-compatible payload (`dataValues` array) -- `importResponse`: populated only when `auto_import=true` and `dry_run=false` +- `dataValueTable`: table-friendly rows/columns +- `workflowTrace`: per-step status and duration Notes: @@ -410,6 +417,7 @@ Notes: - `org_unit_level` alone runs across the full level by default. - `category_option_combo` and `attribute_option_combo` are optional. If omitted, they are not sent in `dataValues`, allowing DHIS2 defaults where supported. - `temporal_resolution` supports `daily`, `weekly`, and `monthly`. +- `flavor` supports `rnl` and `sat`. If `stage` is `prelim`, `flavor` must be `sat`. - DHIS2 timeout/retry behavior is configured globally via adapter env vars (`DHIS2_HTTP_TIMEOUT_SECONDS`, `DHIS2_HTTP_RETRIES`). 
## Async execution and job management diff --git a/pygeoapi-config.yml b/pygeoapi-config.yml index ede57d7..a73156c 100644 --- a/pygeoapi-config.yml +++ b/pygeoapi-config.yml @@ -370,7 +370,22 @@ resources: processor: name: eo_api.routers.ogcapi.plugins.processes.chirps3.CHIRPS3Processor - chirps3-dhis2-pipeline: + chirps3-dhis2-workflow: type: process processor: - name: eo_api.routers.ogcapi.plugins.processes.chirps3_dhis2_pipeline.CHIRPS3DHIS2PipelineProcessor + name: eo_api.routers.ogcapi.plugins.processes.chirps3_workflow.Chirps3WorkflowProcessor + + feature-fetch: + type: process + processor: + name: eo_api.routers.ogcapi.plugins.processes.feature_fetch.FeatureFetchProcessor + + data-aggregate: + type: process + processor: + name: eo_api.routers.ogcapi.plugins.processes.data_aggregate.DataAggregateProcessor + + dhis2-datavalue-build: + type: process + processor: + name: eo_api.routers.ogcapi.plugins.processes.datavalue_build.DataValueBuildProcessor diff --git a/src/eo_api/routers/ogcapi/plugins/processes/chirps3.py b/src/eo_api/routers/ogcapi/plugins/processes/chirps3.py index f31897f..68fc238 100644 --- a/src/eo_api/routers/ogcapi/plugins/processes/chirps3.py +++ b/src/eo_api/routers/ogcapi/plugins/processes/chirps3.py @@ -10,11 +10,13 @@ from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError from eo_api.routers.ogcapi.plugins.processes.schemas import CHIRPS3Input, ProcessOutput +from eo_api.utils.cache import bbox_token, monthly_periods, read_manifest, write_manifest DOWNLOAD_DIR = os.getenv("DOWNLOAD_DIR", "/tmp/data") LOGGER = logging.getLogger(__name__) + PROCESS_METADATA = { "version": "0.1.0", "id": "chirps3", @@ -51,6 +53,13 @@ "minOccurs": 0, "maxOccurs": 1, }, + "flavor": { + "title": "Product flavor", + "description": "CHIRPS3 flavor: 'rnl' or 'sat' (prelim requires 'sat')", + "schema": {"type": "string", "enum": ["rnl", "sat"], "default": "rnl"}, + "minOccurs": 0, + "maxOccurs": 1, + }, "dry_run": { "title": "Dry run", 
"description": "If true, return data without pushing to DHIS2", @@ -81,32 +90,73 @@ def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[ raise ProcessorExecuteError(str(e)) from e LOGGER.info( - "CHIRPS3 download: start=%s end=%s bbox=%s stage=%s", + "CHIRPS3 download: start=%s end=%s bbox=%s stage=%s flavor=%s", inputs.start, inputs.end, inputs.bbox, inputs.stage, + inputs.flavor, ) - download_dir = Path(DOWNLOAD_DIR) + root_dir = Path(DOWNLOAD_DIR) / "chirps3_cache" + scope_key = f"{inputs.stage}_{inputs.flavor}_{bbox_token(inputs.bbox)}" + download_dir = root_dir / scope_key download_dir.mkdir(parents=True, exist_ok=True) + prefix = f"chirps3_{scope_key}" + manifest_path = download_dir / "manifest.json" + requested_months = monthly_periods(inputs.start, inputs.end) + expected_files = [download_dir / f"{prefix}_{month}.nc" for month in requested_months] + existing_before = {str(path) for path in expected_files if path.exists()} + + files = [ + str(path) + for path in chirps3_daily.download( + start=inputs.start, + end=inputs.end, + bbox=(inputs.bbox[0], inputs.bbox[1], inputs.bbox[2], inputs.bbox[3]), + dirname=str(download_dir), + prefix=prefix, + stage=inputs.stage, + flavor=inputs.flavor, + ) + ] + downloaded_now = [path for path in files if path not in existing_before] + cache_hit = len(downloaded_now) == 0 - files = chirps3_daily.download( - start=inputs.start, - end=inputs.end, - bbox=(inputs.bbox[0], inputs.bbox[1], inputs.bbox[2], inputs.bbox[3]), - dirname=str(download_dir), - prefix="chirps3", - stage=inputs.stage, + manifest = read_manifest(manifest_path) or {} + manifest.update( + { + "dataset": "chirps3", + "scope_key": scope_key, + "bbox": [float(v) for v in inputs.bbox], + "stage": inputs.stage, + "flavor": inputs.flavor, + "last_start": inputs.start, + "last_end": inputs.end, + } + ) + write_manifest(manifest_path, manifest) + + LOGGER.info( + "CHIRPS3 cache %s: reused=%s downloaded=%s dir=%s", + "hit" if cache_hit else 
"delta", + len(files) - len(downloaded_now), + len(downloaded_now), + download_dir, ) output = ProcessOutput( status="completed", - files=[str(f) for f in files], + files=files, summary={ "file_count": len(files), "stage": inputs.stage, + "flavor": inputs.flavor, "start": inputs.start, "end": inputs.end, + "cache_hit": cache_hit, + "cache_key": scope_key, + "cache_downloaded_delta_count": len(downloaded_now), + "cache_reused_count": len(files) - len(downloaded_now), }, message="Data downloaded" + (" (dry run)" if inputs.dry_run else ""), ) diff --git a/src/eo_api/routers/ogcapi/plugins/processes/chirps3_dhis2_pipeline.py b/src/eo_api/routers/ogcapi/plugins/processes/chirps3_dhis2_pipeline.py deleted file mode 100644 index 0351b3d..0000000 --- a/src/eo_api/routers/ogcapi/plugins/processes/chirps3_dhis2_pipeline.py +++ /dev/null @@ -1,529 +0,0 @@ -"""CHIRPS3 to DHIS2 data value pipeline process.""" - -from __future__ import annotations - -import logging -import os -from pathlib import Path -from typing import Any, cast - -import httpx -import pandas as pd -import xarray as xr -from dhis2_client.client import DHIS2Client -from dhis2eo.data.chc.chirps3 import daily as chirps3_daily -from dhis2eo.integrations.pandas import format_value_for_dhis2 -from pydantic import ValidationError -from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError -from rioxarray.exceptions import NoDataInBounds -from shapely.geometry import shape - -from eo_api.integrations.dhis2_adapter import ( - create_client, - get_org_unit_geojson, - get_org_unit_subtree_geojson, - get_org_units_geojson, -) -from eo_api.routers.ogcapi.plugins.processes.schemas import CHIRPS3DHIS2PipelineInput - -DOWNLOAD_DIR = os.getenv("DOWNLOAD_DIR", "/tmp/data") - -LOGGER = logging.getLogger(__name__) - -PROCESS_METADATA = { - "version": "0.1.0", - "id": "chirps3-dhis2-pipeline", - "title": "CHIRPS3 to DHIS2 Data Value Pipeline", - "description": "Fetch features, download CHIRPS3, aggregate values, and 
build a DHIS2 dataValueSet.", - "jobControlOptions": ["sync-execute"], - "keywords": ["climate", "CHIRPS3", "DHIS2", "pipeline", "dataValueSet"], - "inputs": { - "start_date": { - "title": "Start date", - "description": "Inclusive date", - "schema": {"type": "string"}, - "minOccurs": 1, - "maxOccurs": 1, - }, - "end_date": { - "title": "End date", - "description": "Inclusive date", - "schema": {"type": "string"}, - "minOccurs": 1, - "maxOccurs": 1, - }, - "features_geojson": { - "title": "Features GeoJSON", - "description": "Optional FeatureCollection", - "schema": {"type": "object"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "org_unit_level": { - "title": "Org unit level", - "description": "DHIS2 org unit level", - "schema": {"type": "integer", "minimum": 1}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "parent_org_unit": { - "title": "Parent org unit", - "description": "Optional subtree scope", - "schema": {"type": "string"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "org_unit_ids": { - "title": "Org unit IDs", - "description": "Optional explicit org unit UIDs", - "schema": {"type": "array", "items": {"type": "string"}}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "org_unit_id_property": { - "title": "Org unit ID property", - "description": "Property name for org unit UID", - "schema": {"type": "string", "default": "id"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "data_element": { - "title": "Data element UID", - "description": "DHIS2 data element UID", - "schema": {"type": "string"}, - "minOccurs": 1, - "maxOccurs": 1, - }, - "category_option_combo": { - "title": "Category option combo UID", - "description": "Optional category option combo UID", - "schema": {"type": "string"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "attribute_option_combo": { - "title": "Attribute option combo UID", - "description": "Optional attribute option combo UID", - "schema": {"type": "string"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "data_set": { - "title": "Dataset UID", - 
"description": "Optional dataset UID", - "schema": {"type": "string"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "stage": { - "title": "CHIRPS3 stage", - "description": "final or prelim", - "schema": {"type": "string", "enum": ["final", "prelim"], "default": "final"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "spatial_reducer": { - "title": "Spatial reducer", - "description": "mean or sum", - "schema": {"type": "string", "enum": ["mean", "sum"], "default": "mean"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "temporal_resolution": { - "title": "Temporal resolution", - "description": "daily, weekly, or monthly", - "schema": {"type": "string", "enum": ["daily", "weekly", "monthly"], "default": "monthly"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "temporal_reducer": { - "title": "Temporal reducer", - "description": "sum or mean", - "schema": {"type": "string", "enum": ["sum", "mean"], "default": "sum"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "value_rounding": { - "title": "Value rounding", - "description": "Decimal places", - "schema": {"type": "integer", "minimum": 0, "maximum": 10, "default": 3}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "auto_import": { - "title": "Auto import", - "description": "Import payload into DHIS2", - "schema": {"type": "boolean", "default": False}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "import_strategy": { - "title": "Import strategy", - "description": "DHIS2 importStrategy value", - "schema": {"type": "string", "default": "CREATE_AND_UPDATE"}, - "minOccurs": 0, - "maxOccurs": 1, - }, - "dry_run": { - "title": "Dry run", - "description": "Skip DHIS2 import", - "schema": {"type": "boolean", "default": True}, - "minOccurs": 0, - "maxOccurs": 1, - }, - }, - "outputs": { - "result": { - "title": "Pipeline result", - "schema": {"type": "object", "contentMediaType": "application/json"}, - } - }, -} - - -def _dhis2_client_from_env() -> DHIS2Client: - """Build DHIS2 client from environment settings.""" - try: - return create_client() 
- except ValueError as err: - raise ProcessorExecuteError(str(err)) from None - - -def _ensure_feature_collection(maybe_fc: dict[str, Any]) -> list[dict[str, Any]]: - """Extract feature list from GeoJSON FeatureCollection.""" - if maybe_fc.get("type") != "FeatureCollection": - raise ProcessorExecuteError("features_geojson must be a GeoJSON FeatureCollection") - features = maybe_fc.get("features", []) - if not isinstance(features, list): - raise ProcessorExecuteError("features_geojson.features must be an array") - return features - - -def _feature_org_unit_id(feature: dict[str, Any], id_property: str) -> str | None: - """Resolve orgUnit id from feature.id or a property key.""" - if feature.get("id"): - return str(feature["id"]) - props = feature.get("properties") or {} - if id_property in props and props[id_property] is not None: - return str(props[id_property]) - return None - - -def _fetch_features_from_dhis2(client: DHIS2Client, inputs: CHIRPS3DHIS2PipelineInput) -> list[dict[str, Any]]: - """Fetch features from DHIS2 using client selectors.""" - if inputs.org_unit_ids: - features: list[dict[str, Any]] = [] - for uid in inputs.org_unit_ids: - geo = get_org_unit_geojson(client, uid) - if geo.get("type") == "Feature": - features.append(geo) - elif geo.get("type") == "FeatureCollection": - features.extend(geo.get("features", [])) - return features - - if inputs.parent_org_unit: - if inputs.org_unit_level: - fc = get_org_units_geojson(client, level=inputs.org_unit_level, parent=inputs.parent_org_unit) - return _ensure_feature_collection(fc) - fc = get_org_unit_subtree_geojson(client, inputs.parent_org_unit) - return _ensure_feature_collection(fc) - - if inputs.org_unit_level: - LOGGER.warning( - "[chirps3-dhis2-pipeline] unscoped org_unit_level=%s fetch requested; " - "this may be slow on large DHIS2 instances", - inputs.org_unit_level, - ) - fc = get_org_units_geojson(client, level=inputs.org_unit_level) - return _ensure_feature_collection(fc) - - raise 
ProcessorExecuteError("Provide one of: features_geojson, org_unit_ids, parent_org_unit, org_unit_level") - - -def _features_union_bbox(features: list[dict[str, Any]]) -> tuple[float, float, float, float]: - """Compute union bbox from GeoJSON feature geometries.""" - bounds: list[tuple[float, float, float, float]] = [] - for feature in features: - geometry = feature.get("geometry") - if not geometry: - continue - geom = shape(geometry) - if geom.is_empty: - continue - bounds.append(geom.bounds) - if not bounds: - raise ProcessorExecuteError("No valid geometries found in input features") - - minx = min(b[0] for b in bounds) - miny = min(b[1] for b in bounds) - maxx = max(b[2] for b in bounds) - maxy = max(b[3] for b in bounds) - return (minx, miny, maxx, maxy) - - -def _resolve_value_var(dataset: xr.Dataset) -> str: - """Resolve precipitation variable name from xarray dataset.""" - if "precip" in dataset.data_vars: - return "precip" - keys = list(dataset.data_vars.keys()) - if not keys: - raise ProcessorExecuteError("Downloaded CHIRPS3 dataset has no data variables") - return str(keys[0]) - - -def _clip_spatial_series( - data_array: xr.DataArray, - geometry: dict[str, Any], - spatial_reducer: str, -) -> pd.Series: - """Clip a data array by geometry and reduce over spatial dimensions.""" - try: - clipped = data_array.rio.clip([geometry], crs="EPSG:4326", drop=True) - except NoDataInBounds: - return pd.Series(dtype=float) - spatial_dims = [dim for dim in clipped.dims if dim != "time"] - if not spatial_dims: - raise ProcessorExecuteError("Unable to resolve spatial dimensions in CHIRPS3 dataset") - reduced = ( - clipped.mean(dim=spatial_dims, skipna=True) - if spatial_reducer == "mean" - else clipped.sum(dim=spatial_dims, skipna=True) - ) - series = reduced.to_series().dropna() - if series.empty: - return cast(pd.Series, series) - series.index = pd.to_datetime(series.index) - return cast(pd.Series, series) - - -def _format_period_code(timestamp: pd.Timestamp, 
temporal_resolution: str) -> str: - """Format timestamp into DHIS2 period code.""" - if temporal_resolution == "daily": - return str(timestamp.strftime("%Y%m%d")) - if temporal_resolution == "monthly": - return str(timestamp.strftime("%Y%m")) - iso = timestamp.isocalendar() - iso_year, iso_week, _ = iso - return f"{int(iso_year):04d}W{int(iso_week):02d}" - - -def _as_timestamp(value: Any) -> pd.Timestamp: - """Convert a value into a valid pandas Timestamp.""" - ts = pd.Timestamp(value) - if bool(pd.isna(ts)): - raise ProcessorExecuteError("Encountered invalid timestamp while formatting periods") - return ts - - -def _apply_temporal_aggregation( - series: pd.Series, - temporal_resolution: str, - temporal_reducer: str, - start_date: pd.Timestamp, - end_date: pd.Timestamp, -) -> list[tuple[str, float]]: - """Apply optional temporal aggregation over the time index.""" - windowed = cast(Any, series[(series.index >= start_date) & (series.index <= end_date)]) - if windowed.empty: - return [] - - if temporal_resolution == "daily": - return [ - (_format_period_code(_as_timestamp(ts), temporal_resolution), float(val)) for ts, val in windowed.items() - ] - - if temporal_resolution == "monthly": - aggregated = ( - windowed.resample("MS").mean().dropna() - if temporal_reducer == "mean" - else windowed.resample("MS").sum().dropna() - ) - return [ - (_format_period_code(_as_timestamp(ts), temporal_resolution), float(val)) for ts, val in aggregated.items() - ] - - # Weekly aggregation uses ISO year/week to generate DHIS2 weekly periods (YYYYWww). 
- iso = windowed.index.isocalendar() - grouped = windowed.groupby([iso["year"], iso["week"]]) - weekly = grouped.mean() if temporal_reducer == "mean" else grouped.sum() - return [(f"{int(year):04d}W{int(week):02d}", float(value)) for (year, week), value in weekly.items()] - - -def _chirps_cache_key( - *, - stage: str, - start_date: Any, - end_date: Any, - bbox: tuple[float, float, float, float], -) -> str: - """Build a deterministic cache key to avoid bbox/date collisions in downloads.""" - bbox_part = "_".join(f"{coord:.4f}".replace("-", "m").replace(".", "p") for coord in bbox) - return f"{stage}_{start_date}_{end_date}_{bbox_part}" - - -class CHIRPS3DHIS2PipelineProcessor(BaseProcessor): - """One-go CHIRPS3 processing pipeline for DHIS2 data values.""" - - def __init__(self, processor_def: dict[str, Any]) -> None: - super().__init__(processor_def, PROCESS_METADATA) - - def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[str, Any]]: - try: - inputs = CHIRPS3DHIS2PipelineInput.model_validate(data) - except ValidationError as e: - raise ProcessorExecuteError(str(e)) from e - - LOGGER.info( - "[chirps3-dhis2-pipeline] start start_date=%s end_date=%s auto_import=%s dry_run=%s", - inputs.start_date, - inputs.end_date, - inputs.auto_import, - inputs.dry_run, - ) - client = _dhis2_client_from_env() - try: - LOGGER.info("[chirps3-dhis2-pipeline] step=1 fetch_features") - if inputs.features_geojson: - features = _ensure_feature_collection(inputs.features_geojson) - else: - features = _fetch_features_from_dhis2(client, inputs) - - valid_features: list[tuple[str, dict[str, Any]]] = [] - for feature in features: - geometry = feature.get("geometry") - if not geometry: - continue - org_unit_id = _feature_org_unit_id(feature, inputs.org_unit_id_property) - if not org_unit_id: - continue - valid_features.append((org_unit_id, geometry)) - - if not valid_features: - raise ProcessorExecuteError("No valid features with geometry and org unit identifiers 
were found") - - LOGGER.info("[chirps3-dhis2-pipeline] features=%s", len(valid_features)) - union_bbox = _features_union_bbox([{"geometry": g} for _, g in valid_features]) - if inputs.bbox: - effective_bbox: tuple[float, float, float, float] = ( - float(inputs.bbox[0]), - float(inputs.bbox[1]), - float(inputs.bbox[2]), - float(inputs.bbox[3]), - ) - else: - effective_bbox = union_bbox - LOGGER.info("[chirps3-dhis2-pipeline] step=2 download_chirps3 bbox=%s", effective_bbox) - cache_key = _chirps_cache_key( - stage=inputs.stage, - start_date=inputs.start_date, - end_date=inputs.end_date, - bbox=effective_bbox, - ) - download_dir = Path(DOWNLOAD_DIR) / "chirps3_dhis2_pipeline" / cache_key - download_dir.mkdir(parents=True, exist_ok=True) - - files = chirps3_daily.download( - start=str(inputs.start_date), - end=str(inputs.end_date), - bbox=effective_bbox, - dirname=str(download_dir), - prefix=f"chirps3_pipeline_{cache_key}", - stage=inputs.stage, - ) - if not files: - raise ProcessorExecuteError("No CHIRPS3 files were downloaded for the requested date range") - - LOGGER.info("[chirps3-dhis2-pipeline] downloaded_files=%s", len(files)) - LOGGER.info("[chirps3-dhis2-pipeline] step=3 aggregate") - dataset = xr.open_mfdataset([str(path) for path in files], combine="by_coords") - data_var = _resolve_value_var(dataset) - data_array = dataset[data_var] - start_dt = pd.Timestamp(inputs.start_date) - end_dt = pd.Timestamp(inputs.end_date) - - rows: list[dict[str, Any]] = [] - for org_unit_id, geometry in valid_features: - series = _clip_spatial_series(data_array, geometry, inputs.spatial_reducer) - if series.empty: - continue - period_values = _apply_temporal_aggregation( - series, - inputs.temporal_resolution, - inputs.temporal_reducer, - start_dt, - end_dt, - ) - for period, value in period_values: - if pd.isna(value): - continue - rows.append( - { - "orgUnit": org_unit_id, - "period": period, - "value": round(value, inputs.value_rounding), - } - ) - - if not rows: - raise 
ProcessorExecuteError("No non-empty aggregated values were produced for the selected features") - - LOGGER.info("[chirps3-dhis2-pipeline] aggregated_rows=%s", len(rows)) - LOGGER.info("[chirps3-dhis2-pipeline] step=4 build_datavalueset") - data_values: list[dict[str, Any]] = [] - for row in rows: - data_value = { - "dataElement": inputs.data_element, - "orgUnit": row["orgUnit"], - "period": row["period"], - "value": format_value_for_dhis2(row["value"]), - } - if inputs.category_option_combo: - data_value["categoryOptionCombo"] = inputs.category_option_combo - if inputs.attribute_option_combo: - data_value["attributeOptionCombo"] = inputs.attribute_option_combo - data_values.append(data_value) - payload: dict[str, Any] = {"dataValues": data_values} - if inputs.data_set: - payload["dataSet"] = inputs.data_set - - import_response: dict[str, Any] | None = None - should_import = inputs.auto_import and not inputs.dry_run - if should_import: - LOGGER.info("[chirps3-dhis2-pipeline] step=5 import_to_dhis2") - import_response = client.post( - "/api/dataValueSets", - params={"importStrategy": inputs.import_strategy}, - json=payload, - ) - LOGGER.info("[chirps3-dhis2-pipeline] import_completed") - - LOGGER.info("[chirps3-dhis2-pipeline] completed") - return "application/json", { - "status": "completed", - "files": [str(path) for path in files], - "summary": { - "feature_count": len(valid_features), - "data_value_count": len(payload["dataValues"]), - "start_date": str(inputs.start_date), - "end_date": str(inputs.end_date), - "temporal_resolution": inputs.temporal_resolution, - "spatial_reducer": inputs.spatial_reducer, - "temporal_reducer": inputs.temporal_reducer, - "imported": bool(import_response), - }, - "message": "Pipeline completed" + (" (dry run)" if inputs.dry_run else ""), - "dataValueSet": payload, - "importResponse": import_response, - } - except httpx.ReadTimeout: - raise ProcessorExecuteError( - "DHIS2 request timed out. 
Narrow feature scope with parent_org_unit/org_unit_ids " - "or increase DHIS2_HTTP_TIMEOUT_SECONDS." - ) from None - except Exception as e: - raise ProcessorExecuteError(str(e)) from None - finally: - client.close() - - def __repr__(self) -> str: - return "" diff --git a/src/eo_api/routers/ogcapi/plugins/processes/chirps3_workflow.py b/src/eo_api/routers/ogcapi/plugins/processes/chirps3_workflow.py new file mode 100644 index 0000000..e8366fc --- /dev/null +++ b/src/eo_api/routers/ogcapi/plugins/processes/chirps3_workflow.py @@ -0,0 +1,218 @@ +"""CHIRPS3 to DHIS2 workflow process.""" + +from __future__ import annotations + +import logging +from typing import Any + +from pygeoapi.process.base import BaseProcessor + +from eo_api.routers.ogcapi.plugins.processes.chirps3 import CHIRPS3Processor +from eo_api.routers.ogcapi.plugins.processes.data_aggregate import DataAggregateProcessor +from eo_api.routers.ogcapi.plugins.processes.datavalue_build import DataValueBuildProcessor +from eo_api.routers.ogcapi.plugins.processes.feature_fetch import FeatureFetchProcessor +from eo_api.routers.ogcapi.plugins.processes.schemas import ClimateDhis2WorkflowInput +from eo_api.routers.ogcapi.plugins.processes.workflow_runtime import run_process_with_trace + +LOGGER = logging.getLogger(__name__) + +PROCESS_METADATA = { + "version": "0.1.0", + "id": "chirps3-dhis2-workflow", + "title": "CHIRPS3 to DHIS2 Workflow", + "description": "Orchestrate feature fetch, CHIRPS download, aggregation, and DHIS2 data value generation.", + "jobControlOptions": ["sync-execute"], + "keywords": ["climate", "CHIRPS3", "DHIS2", "workflow", "dataValueSet"], + "inputs": { + "start_date": {"title": "Start date", "schema": {"type": "string"}, "minOccurs": 1, "maxOccurs": 1}, + "end_date": {"title": "End date", "schema": {"type": "string"}, "minOccurs": 1, "maxOccurs": 1}, + "features_geojson": { + "title": "Features GeoJSON", + "schema": {"type": "object"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + 
"org_unit_level": { + "title": "Org unit level", + "schema": {"type": "integer", "minimum": 1}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "parent_org_unit": {"title": "Parent org unit", "schema": {"type": "string"}, "minOccurs": 0, "maxOccurs": 1}, + "org_unit_ids": { + "title": "Org unit IDs", + "schema": {"type": "array", "items": {"type": "string"}}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "org_unit_id_property": { + "title": "Org unit ID property", + "schema": {"type": "string", "default": "id"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "data_element": {"title": "Data element UID", "schema": {"type": "string"}, "minOccurs": 1, "maxOccurs": 1}, + "category_option_combo": { + "title": "Category option combo UID", + "schema": {"type": "string"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "attribute_option_combo": { + "title": "Attribute option combo UID", + "schema": {"type": "string"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "data_set": {"title": "Dataset UID", "schema": {"type": "string"}, "minOccurs": 0, "maxOccurs": 1}, + "stage": { + "title": "CHIRPS3 stage", + "schema": {"type": "string", "enum": ["final", "prelim"], "default": "final"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "flavor": { + "title": "CHIRPS3 flavor", + "schema": {"type": "string", "enum": ["rnl", "sat"], "default": "rnl"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "spatial_reducer": { + "title": "Spatial reducer", + "schema": {"type": "string", "enum": ["mean", "sum"], "default": "mean"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "temporal_resolution": { + "title": "Temporal resolution", + "schema": {"type": "string", "enum": ["daily", "weekly", "monthly"], "default": "monthly"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "temporal_reducer": { + "title": "Temporal reducer", + "schema": {"type": "string", "enum": ["sum", "mean"], "default": "sum"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "value_rounding": { + "title": "Value rounding", + "schema": {"type": "integer", 
"minimum": 0, "maximum": 10, "default": 3}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "dry_run": {"title": "Dry run", "schema": {"type": "boolean", "default": True}, "minOccurs": 0, "maxOccurs": 1}, + }, + "outputs": { + "result": { + "title": "Workflow result", + "schema": {"type": "object", "contentMediaType": "application/json"}, + } + }, +} + + +class Chirps3WorkflowProcessor(BaseProcessor): + """Workflow orchestration process with decomposed internal services.""" + + def __init__(self, processor_def: dict[str, Any]) -> None: + super().__init__(processor_def, PROCESS_METADATA) + + def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[str, Any]]: + inputs = ClimateDhis2WorkflowInput.model_validate(data) + workflow_trace: list[dict[str, Any]] = [] + LOGGER.info( + "[chirps3-dhis2-workflow] start start_date=%s end_date=%s stage=%s flavor=%s dry_run=%s", + inputs.start_date, + inputs.end_date, + inputs.stage, + inputs.flavor, + inputs.dry_run, + ) + + feature_result = run_process_with_trace( + workflow_trace, + step_name="feature_fetch", + processor_cls=FeatureFetchProcessor, + process_name="feature-fetch", + data={ + "features_geojson": inputs.features_geojson, + "org_unit_level": inputs.org_unit_level, + "parent_org_unit": inputs.parent_org_unit, + "org_unit_ids": inputs.org_unit_ids, + "org_unit_id_property": inputs.org_unit_id_property, + "bbox": inputs.bbox, + }, + ) + download_result = run_process_with_trace( + workflow_trace, + step_name="chirps3_download", + processor_cls=CHIRPS3Processor, + process_name="chirps3-download", + data={ + "start": inputs.start_date.strftime("%Y-%m"), + "end": inputs.end_date.strftime("%Y-%m"), + "bbox": list(feature_result["effective_bbox"]), + "stage": inputs.stage, + "flavor": inputs.flavor, + "dry_run": True, + }, + ) + files = [str(path) for path in download_result.get("files", [])] + + aggregate_result = run_process_with_trace( + workflow_trace, + step_name="aggregate", + 
processor_cls=DataAggregateProcessor, + process_name="data-aggregate", + data={ + "start_date": str(inputs.start_date), + "end_date": str(inputs.end_date), + "files": files, + "valid_features": feature_result["valid_features"], + "spatial_reducer": inputs.spatial_reducer, + "temporal_resolution": inputs.temporal_resolution, + "temporal_reducer": inputs.temporal_reducer, + "value_rounding": inputs.value_rounding, + }, + ) + rows = aggregate_result["rows"] + + dv_result = run_process_with_trace( + workflow_trace, + step_name="build_datavalues", + processor_cls=DataValueBuildProcessor, + process_name="dhis2-datavalue-build", + data={ + "rows": rows, + "data_element": inputs.data_element, + "category_option_combo": inputs.category_option_combo, + "attribute_option_combo": inputs.attribute_option_combo, + "data_set": inputs.data_set, + }, + ) + + return "application/json", { + "status": "completed", + "files": files, + "summary": { + "feature_count": len(feature_result["valid_features"]), + "data_value_count": len(dv_result["dataValueSet"]["dataValues"]), + "start_date": str(inputs.start_date), + "end_date": str(inputs.end_date), + "stage": inputs.stage, + "flavor": inputs.flavor, + "temporal_resolution": inputs.temporal_resolution, + "spatial_reducer": inputs.spatial_reducer, + "temporal_reducer": inputs.temporal_reducer, + "imported": False, + }, + "message": "Workflow completed (stopped at data value generation)", + "dataValueSet": dv_result["dataValueSet"], + "dataValueTable": dv_result["table"], + "workflowTrace": workflow_trace, + "importResponse": None, + } + + def __repr__(self) -> str: + return "" diff --git a/src/eo_api/routers/ogcapi/plugins/processes/data_aggregate.py b/src/eo_api/routers/ogcapi/plugins/processes/data_aggregate.py new file mode 100644 index 0000000..f74ac75 --- /dev/null +++ b/src/eo_api/routers/ogcapi/plugins/processes/data_aggregate.py @@ -0,0 +1,289 @@ +"""Aggregate CHIRPS3 files over workflow features.""" + +from __future__ import 
annotations + +import hashlib +import os +from typing import Any, cast + +import pandas as pd +import xarray as xr +from pydantic import ValidationError +from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError +from rioxarray.exceptions import NoDataInBounds + +from eo_api.routers.ogcapi.plugins.processes.schemas import DataAggregateInput + +DOWNLOAD_DIR = os.getenv("DOWNLOAD_DIR", "/tmp/data") + +PROCESS_METADATA = { + "version": "0.1.0", + "id": "data-aggregate", + "title": "Data aggregate", + "description": "Aggregate downloaded raster files over normalized features.", + "jobControlOptions": ["sync-execute"], + "keywords": ["aggregation", "zonal", "timeseries"], + "inputs": { + "start_date": {"schema": {"type": "string"}, "minOccurs": 1, "maxOccurs": 1}, + "end_date": {"schema": {"type": "string"}, "minOccurs": 1, "maxOccurs": 1}, + "files": {"schema": {"type": "array", "items": {"type": "string"}}, "minOccurs": 1, "maxOccurs": 1}, + "valid_features": {"schema": {"type": "array", "items": {"type": "object"}}, "minOccurs": 1, "maxOccurs": 1}, + "stage": {"schema": {"type": "string", "enum": ["final", "prelim"], "default": "final"}}, + "flavor": {"schema": {"type": "string", "enum": ["rnl", "sat"], "default": "rnl"}}, + "spatial_reducer": {"schema": {"type": "string", "enum": ["mean", "sum"], "default": "mean"}}, + "temporal_resolution": { + "schema": {"type": "string", "enum": ["daily", "weekly", "monthly"], "default": "monthly"} + }, + "temporal_reducer": {"schema": {"type": "string", "enum": ["sum", "mean"], "default": "sum"}}, + "value_rounding": {"schema": {"type": "integer", "minimum": 0, "maximum": 10, "default": 3}}, + }, + "outputs": { + "result": { + "title": "Aggregated rows", + "schema": {"type": "object", "contentMediaType": "application/json"}, + } + }, +} + + +def _resolve_value_var(dataset: xr.Dataset) -> str: + if "precip" in dataset.data_vars: + return "precip" + keys = list(dataset.data_vars.keys()) + if not keys: + raise 
ProcessorExecuteError("Downloaded CHIRPS3 dataset has no data variables") + return str(keys[0]) + + +def _clip_spatial_series(data_array: xr.DataArray, geometry: dict[str, Any], spatial_reducer: str) -> pd.Series: + try: + clipped = data_array.rio.clip([geometry], crs="EPSG:4326", drop=True) + except NoDataInBounds: + return pd.Series(dtype=float) + spatial_dims = [dim for dim in clipped.dims if dim != "time"] + if not spatial_dims: + raise ProcessorExecuteError("Unable to resolve spatial dimensions in CHIRPS3 dataset") + reduced = ( + clipped.mean(dim=spatial_dims, skipna=True) + if spatial_reducer == "mean" + else clipped.sum(dim=spatial_dims, skipna=True) + ) + series: pd.Series[Any] = reduced.to_series().dropna() + if series.empty: + return series + series.index = pd.DatetimeIndex(pd.to_datetime(series.index)) + return series + + +def _format_period_code(timestamp: pd.Timestamp, temporal_resolution: str) -> str: + if temporal_resolution == "daily": + return str(timestamp.strftime("%Y%m%d")) + if temporal_resolution == "monthly": + return str(timestamp.strftime("%Y%m")) + iso = timestamp.isocalendar() + iso_year, iso_week, _ = iso + return f"{int(iso_year):04d}W{int(iso_week):02d}" + + +def _as_timestamp(value: Any) -> pd.Timestamp: + ts = pd.Timestamp(value) + if bool(pd.isna(ts)): + raise ProcessorExecuteError("Encountered invalid timestamp while formatting periods") + return ts + + +def _apply_temporal_aggregation( + series: pd.Series, + temporal_resolution: str, + temporal_reducer: str, + start_date: pd.Timestamp, + end_date: pd.Timestamp, +) -> list[tuple[str, float]]: + windowed = series[(series.index >= start_date) & (series.index <= end_date)] + if windowed.empty: + return [] + if temporal_resolution == "daily": + return [ + (_format_period_code(_as_timestamp(ts), temporal_resolution), float(val)) for ts, val in windowed.items() + ] + if temporal_resolution == "monthly": + aggregated = ( + windowed.resample("MS").mean().dropna() + if temporal_reducer == 
"mean" + else windowed.resample("MS").sum().dropna() + ) + return [ + (_format_period_code(_as_timestamp(ts), temporal_resolution), float(val)) for ts, val in aggregated.items() + ] + index = pd.DatetimeIndex(pd.to_datetime(windowed.index)) + weekly_df = pd.DataFrame({"value": windowed.to_numpy()}, index=index) + iso = index.isocalendar() + weekly_df["iso_year"] = iso.year.to_numpy() + weekly_df["iso_week"] = iso.week.to_numpy() + grouped = weekly_df.groupby(["iso_year", "iso_week"])["value"] + weekly = grouped.mean() if temporal_reducer == "mean" else grouped.sum() + pairs: list[tuple[str, float]] = [] + for key, value in weekly.items(): + iso_year, iso_week = cast(tuple[Any, Any], key) + pairs.append((f"{int(iso_year):04d}W{int(iso_week):02d}", float(value))) + return pairs + + +def _scope_token_from_files(files: list[str], fallback_stage: str, fallback_flavor: str) -> str: + if not files: + return f"{fallback_stage}_{fallback_flavor}_unknown" + # Expected name: chirps3__YYYY-MM.nc + name = os.path.basename(files[0]) + if name.endswith(".nc") and len(name) > 11: + stem = name[:-3] + token = stem.rsplit("_", 1)[0] + if token: + return token + return f"{fallback_stage}_{fallback_flavor}_unknown" + + +def _cache_key( + *, + scope_token: str, + spatial_reducer: str, + temporal_resolution: str, + temporal_reducer: str, + value_rounding: int, +) -> str: + raw = "|".join([scope_token, spatial_reducer, temporal_resolution, temporal_reducer, str(value_rounding)]) + return hashlib.sha1(raw.encode("utf-8")).hexdigest()[:16] + + +def _target_periods(start_date: pd.Timestamp, end_date: pd.Timestamp, temporal_resolution: str) -> list[str]: + if temporal_resolution == "daily": + return [str(ts.strftime("%Y%m%d")) for ts in pd.date_range(start_date, end_date, freq="D")] + if temporal_resolution == "monthly": + return [str(ts.strftime("%Y%m")) for ts in pd.date_range(start_date, end_date, freq="MS")] + days = pd.date_range(start_date, end_date, freq="D") + keys: list[str] = [] + 
seen: set[str] = set() + for ts in days: + iso = ts.isocalendar() + key = f"{int(iso.year):04d}W{int(iso.week):02d}" + if key not in seen: + seen.add(key) + keys.append(key) + return keys + + +def _load_cached_rows(cache_file: str) -> list[dict[str, Any]]: + if not os.path.exists(cache_file): + return [] + df = pd.read_csv(cache_file, dtype={"orgUnit": str, "period": str}) + if df.empty: + return [] + df["value"] = pd.to_numeric(df["value"], errors="coerce") + df = df.dropna(subset=["value"]) + records = df.to_dict(orient="records") + return [ + {"orgUnit": str(record["orgUnit"]), "period": str(record["period"]), "value": float(record["value"])} + for record in records + ] + + +def _write_cached_rows(cache_file: str, rows: list[dict[str, Any]]) -> None: + if not rows: + return + os.makedirs(os.path.dirname(cache_file), exist_ok=True) + df = pd.DataFrame(rows) + deduped = df.drop_duplicates(subset=["orgUnit", "period"], keep="last").sort_values(by=["orgUnit", "period"]) + deduped.to_csv(cache_file, index=False) + + +class DataAggregateProcessor(BaseProcessor): + """Process wrapper for workflow aggregation step.""" + + def __init__(self, processor_def: dict[str, Any]) -> None: + super().__init__(processor_def, PROCESS_METADATA) + + def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[str, Any]]: + try: + inputs = DataAggregateInput.model_validate(data) + except ValidationError as err: + raise ProcessorExecuteError(str(err)) from err + + dataset = xr.open_mfdataset(inputs.files, combine="by_coords") + try: + data_var = _resolve_value_var(dataset) + data_array = dataset[data_var] + start_dt = pd.Timestamp(inputs.start_date) + end_dt = pd.Timestamp(inputs.end_date) + target_periods = _target_periods(start_dt, end_dt, inputs.temporal_resolution) + target_period_set = set(target_periods) + target_org_units = {str(item["orgUnit"]) for item in inputs.valid_features} + + scope_token = _scope_token_from_files(inputs.files, inputs.stage, 
inputs.flavor) + cache_key = _cache_key( + scope_token=scope_token, + spatial_reducer=inputs.spatial_reducer, + temporal_resolution=inputs.temporal_resolution, + temporal_reducer=inputs.temporal_reducer, + value_rounding=inputs.value_rounding, + ) + cache_file = os.path.join(DOWNLOAD_DIR, "aggregation_cache", f"{cache_key}.csv") + cached_rows = _load_cached_rows(cache_file) + cached_by_key = {(str(r["orgUnit"]), str(r["period"])): r for r in cached_rows} + + rows: list[dict[str, Any]] = [] + computed_rows: list[dict[str, Any]] = [] + for item in inputs.valid_features: + org_unit_id = str(item["orgUnit"]) + geometry = item["geometry"] + missing_periods = [period for period in target_periods if (org_unit_id, period) not in cached_by_key] + if not missing_periods: + continue + series = _clip_spatial_series(data_array, geometry, inputs.spatial_reducer) + if series.empty: + continue + period_values = _apply_temporal_aggregation( + series, + inputs.temporal_resolution, + inputs.temporal_reducer, + start_dt, + end_dt, + ) + for period, value in period_values: + if pd.isna(value): + continue + if period in target_period_set and period in missing_periods: + computed_rows.append( + { + "orgUnit": org_unit_id, + "period": period, + "value": round(value, inputs.value_rounding), + } + ) + finally: + dataset.close() + + merged_rows_map = dict(cached_by_key) + for row in computed_rows: + merged_rows_map[(str(row["orgUnit"]), str(row["period"]))] = row + + rows = [ + row + for (org_unit, period), row in merged_rows_map.items() + if org_unit in target_org_units and period in target_period_set + ] + if not rows: + raise ProcessorExecuteError("No non-empty aggregated values were produced for the selected features") + + _write_cached_rows(cache_file, list(merged_rows_map.values())) + + return "application/json", { + "rows": sorted(rows, key=lambda item: (str(item["orgUnit"]), str(item["period"]))), + "cache": { + "key": cache_key, + "file": cache_file, + "cached_rows_reused": 
len(rows) - len(computed_rows), + "computed_rows_delta": len(computed_rows), + }, + } + + def __repr__(self) -> str: + return "" diff --git a/src/eo_api/routers/ogcapi/plugins/processes/datavalue_build.py b/src/eo_api/routers/ogcapi/plugins/processes/datavalue_build.py new file mode 100644 index 0000000..b22226e --- /dev/null +++ b/src/eo_api/routers/ogcapi/plugins/processes/datavalue_build.py @@ -0,0 +1,108 @@ +"""Build DHIS2 dataValueSet payloads from aggregated rows.""" + +from __future__ import annotations + +from typing import Any + +from dhis2eo.integrations.pandas import format_value_for_dhis2 +from pydantic import ValidationError +from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError + +from eo_api.routers.ogcapi.plugins.processes.schemas import DataValueBuildInput + +PROCESS_METADATA = { + "version": "0.1.0", + "id": "dhis2-datavalue-build", + "title": "Build DHIS2 dataValueSet", + "description": "Build a DHIS2-compatible dataValueSet and table output from aggregated rows.", + "jobControlOptions": ["sync-execute"], + "keywords": ["dhis2", "datavalueset", "format", "table"], + "inputs": { + "rows": { + "title": "Aggregated rows", + "description": "List of aggregated rows with orgUnit, period, value.", + "schema": {"type": "array", "items": {"type": "object"}}, + "minOccurs": 1, + "maxOccurs": 1, + }, + "data_element": { + "title": "Data element UID", + "description": "DHIS2 data element UID.", + "schema": {"type": "string"}, + "minOccurs": 1, + "maxOccurs": 1, + }, + "category_option_combo": { + "title": "Category option combo UID", + "description": "Optional category option combo UID.", + "schema": {"type": "string"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "attribute_option_combo": { + "title": "Attribute option combo UID", + "description": "Optional attribute option combo UID.", + "schema": {"type": "string"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + "data_set": { + "title": "Dataset UID", + "description": "Optional dataset UID.", + 
"schema": {"type": "string"}, + "minOccurs": 0, + "maxOccurs": 1, + }, + }, + "outputs": { + "result": { + "title": "Built dataValueSet and table", + "schema": {"type": "object", "contentMediaType": "application/json"}, + } + }, +} + + +class DataValueBuildProcessor(BaseProcessor): + """Processor that builds dataValueSet/table output from aggregated rows.""" + + def __init__(self, processor_def: dict[str, Any]) -> None: + super().__init__(processor_def, PROCESS_METADATA) + + def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[str, Any]]: + try: + inputs = DataValueBuildInput.model_validate(data) + except ValidationError as err: + raise ProcessorExecuteError(str(err)) from err + + data_values: list[dict[str, Any]] = [] + for row in inputs.rows: + data_value = { + "dataElement": inputs.data_element, + "orgUnit": row["orgUnit"], + "period": row["period"], + "value": format_value_for_dhis2(row["value"]), + } + if inputs.category_option_combo: + data_value["categoryOptionCombo"] = inputs.category_option_combo + if inputs.attribute_option_combo: + data_value["attributeOptionCombo"] = inputs.attribute_option_combo + data_values.append(data_value) + + payload: dict[str, Any] = {"dataValues": data_values} + if inputs.data_set: + payload["dataSet"] = inputs.data_set + + columns = ["orgUnit", "period", "value", "dataElement", "categoryOptionCombo", "attributeOptionCombo"] + table_rows = [{column: value.get(column) for column in columns} for value in data_values] + + return "application/json", { + "dataValueSet": payload, + "table": { + "columns": columns, + "rows": table_rows, + }, + } + + def __repr__(self) -> str: + return "" diff --git a/src/eo_api/routers/ogcapi/plugins/processes/feature_fetch.py b/src/eo_api/routers/ogcapi/plugins/processes/feature_fetch.py new file mode 100644 index 0000000..8df2d5b --- /dev/null +++ b/src/eo_api/routers/ogcapi/plugins/processes/feature_fetch.py @@ -0,0 +1,158 @@ +"""Fetch DHIS2/GeoJSON features for CHIRPS3 
workflow.""" + +from __future__ import annotations + +from typing import Any + +from pydantic import ValidationError +from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError +from shapely.geometry import shape + +from eo_api.integrations.dhis2_adapter import ( + create_client, + get_org_unit_geojson, + get_org_unit_subtree_geojson, + get_org_units_geojson, +) +from eo_api.routers.ogcapi.plugins.processes.schemas import FeatureFetchInput + +PROCESS_METADATA = { + "version": "0.1.0", + "id": "feature-fetch", + "title": "Feature fetch", + "description": "Fetch and normalize org-unit features from inline GeoJSON or DHIS2 selectors.", + "jobControlOptions": ["sync-execute"], + "keywords": ["dhis2", "features", "geojson", "orgunits"], + "inputs": { + "features_geojson": {"schema": {"type": "object"}, "minOccurs": 0, "maxOccurs": 1}, + "org_unit_level": {"schema": {"type": "integer", "minimum": 1}, "minOccurs": 0, "maxOccurs": 1}, + "parent_org_unit": {"schema": {"type": "string"}, "minOccurs": 0, "maxOccurs": 1}, + "org_unit_ids": {"schema": {"type": "array", "items": {"type": "string"}}, "minOccurs": 0, "maxOccurs": 1}, + "org_unit_id_property": {"schema": {"type": "string", "default": "id"}, "minOccurs": 0, "maxOccurs": 1}, + "bbox": { + "schema": {"type": "array", "items": {"type": "number"}, "minItems": 4, "maxItems": 4}, + "minOccurs": 0, + "maxOccurs": 1, + }, + }, + "outputs": { + "result": { + "title": "Normalized features and effective bbox", + "schema": {"type": "object", "contentMediaType": "application/json"}, + } + }, +} + + +def _ensure_feature_collection(maybe_fc: dict[str, Any]) -> list[dict[str, Any]]: + if maybe_fc.get("type") != "FeatureCollection": + raise ProcessorExecuteError("features_geojson must be a GeoJSON FeatureCollection") + features = maybe_fc.get("features", []) + if not isinstance(features, list): + raise ProcessorExecuteError("features_geojson.features must be an array") + return features + + +def 
_feature_org_unit_id(feature: dict[str, Any], id_property: str) -> str | None: + if feature.get("id"): + return str(feature["id"]) + props = feature.get("properties") or {} + if id_property in props and props[id_property] is not None: + return str(props[id_property]) + return None + + +def _fetch_features_from_dhis2(inputs: FeatureFetchInput) -> list[dict[str, Any]]: + client = create_client() + try: + if inputs.org_unit_ids: + features: list[dict[str, Any]] = [] + for uid in inputs.org_unit_ids: + geo = get_org_unit_geojson(client, uid) + if geo.get("type") == "Feature": + features.append(geo) + elif geo.get("type") == "FeatureCollection": + features.extend(geo.get("features", [])) + return features + + if inputs.parent_org_unit: + if inputs.org_unit_level: + fc = get_org_units_geojson(client, level=inputs.org_unit_level, parent=inputs.parent_org_unit) + return _ensure_feature_collection(fc) + fc = get_org_unit_subtree_geojson(client, inputs.parent_org_unit) + return _ensure_feature_collection(fc) + + if inputs.org_unit_level: + fc = get_org_units_geojson(client, level=inputs.org_unit_level) + return _ensure_feature_collection(fc) + + raise ProcessorExecuteError("Provide one of: features_geojson, org_unit_ids, parent_org_unit, org_unit_level") + finally: + client.close() + + +def _features_union_bbox(features: list[dict[str, Any]]) -> tuple[float, float, float, float]: + bounds: list[tuple[float, float, float, float]] = [] + for feature in features: + geometry = feature.get("geometry") + if not geometry: + continue + geom = shape(geometry) + if geom.is_empty: + continue + bounds.append(geom.bounds) + if not bounds: + raise ProcessorExecuteError("No valid geometries found in input features") + minx = min(b[0] for b in bounds) + miny = min(b[1] for b in bounds) + maxx = max(b[2] for b in bounds) + maxy = max(b[3] for b in bounds) + return (minx, miny, maxx, maxy) + + +class FeatureFetchProcessor(BaseProcessor): + """Process wrapper for workflow feature-fetch 
step.""" + + def __init__(self, processor_def: dict[str, Any]) -> None: + super().__init__(processor_def, PROCESS_METADATA) + + def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[str, Any]]: + try: + inputs = FeatureFetchInput.model_validate(data) + except ValidationError as err: + raise ProcessorExecuteError(str(err)) from err + + if inputs.features_geojson: + features = _ensure_feature_collection(inputs.features_geojson) + else: + features = _fetch_features_from_dhis2(inputs) + + valid_features: list[dict[str, Any]] = [] + for feature in features: + geometry = feature.get("geometry") + if not geometry: + continue + org_unit_id = _feature_org_unit_id(feature, inputs.org_unit_id_property) + if not org_unit_id: + continue + valid_features.append({"orgUnit": org_unit_id, "geometry": geometry}) + + if not valid_features: + raise ProcessorExecuteError("No valid features with geometry and org unit identifiers were found") + + union_bbox = _features_union_bbox([{"geometry": f["geometry"]} for f in valid_features]) + effective_bbox: tuple[float, float, float, float] + if inputs.bbox: + effective_bbox = ( + float(inputs.bbox[0]), + float(inputs.bbox[1]), + float(inputs.bbox[2]), + float(inputs.bbox[3]), + ) + else: + effective_bbox = union_bbox + + return "application/json", {"valid_features": valid_features, "effective_bbox": effective_bbox} + + def __repr__(self) -> str: + return "" diff --git a/src/eo_api/routers/ogcapi/plugins/processes/schemas.py b/src/eo_api/routers/ogcapi/plugins/processes/schemas.py index 39e7a2f..e204da9 100644 --- a/src/eo_api/routers/ogcapi/plugins/processes/schemas.py +++ b/src/eo_api/routers/ogcapi/plugins/processes/schemas.py @@ -29,6 +29,13 @@ class CHIRPS3Input(ClimateProcessInput): """CHIRPS3 specific inputs.""" stage: str = Field(default="final", pattern=r"^(final|prelim)$", description="Product stage") + flavor: str = Field(default="rnl", pattern=r"^(rnl|sat)$", description="Product flavor") + + 
@model_validator(mode="after") + def validate_stage_flavor(self) -> "CHIRPS3Input": + if self.stage == "prelim" and self.flavor != "sat": + raise ValueError("For stage='prelim', flavor must be 'sat'") + return self class ZonalStatisticsInput(BaseModel): @@ -61,8 +68,8 @@ def validate_stats(self) -> "ZonalStatisticsInput": return self -class CHIRPS3DHIS2PipelineInput(BaseModel): - """Inputs for CHIRPS3 -> DHIS2 data value pipeline.""" +class ClimateDhis2WorkflowInput(BaseModel): + """Inputs for climate -> DHIS2 workflow.""" start_date: date = Field(..., description="Inclusive start date (YYYY-MM-DD)") end_date: date = Field(..., description="Inclusive end date (YYYY-MM-DD)") @@ -87,6 +94,7 @@ class CHIRPS3DHIS2PipelineInput(BaseModel): attribute_option_combo: str | None = Field(default=None) data_set: str | None = Field(default=None) stage: str = Field(default="final", pattern=r"^(final|prelim)$") + flavor: str = Field(default="rnl", pattern=r"^(rnl|sat)$") spatial_reducer: str = Field(default="mean", pattern=r"^(mean|sum)$") temporal_resolution: str = Field( default="monthly", @@ -103,10 +111,56 @@ class CHIRPS3DHIS2PipelineInput(BaseModel): import_strategy: str = Field(default="CREATE_AND_UPDATE") @model_validator(mode="after") - def validate_date_window(self) -> "CHIRPS3DHIS2PipelineInput": + def validate_date_window(self) -> "ClimateDhis2WorkflowInput": """Ensure date window is valid.""" if self.end_date < self.start_date: raise ValueError("end_date must be greater than or equal to start_date") + if self.stage == "prelim" and self.flavor != "sat": + raise ValueError("For stage='prelim', flavor must be 'sat'") + return self + + +class FeatureFetchInput(BaseModel): + """Inputs for feature fetching step (DHIS2 or inline GeoJSON).""" + + bbox: list[float] | None = Field(default=None, min_length=4, max_length=4) + features_geojson: dict[str, Any] | None = Field(default=None) + org_unit_level: int | None = Field(default=None, ge=1) + parent_org_unit: str | None = 
Field(default=None) + org_unit_ids: list[str] | None = Field(default=None) + org_unit_id_property: str = Field(default="id") + + +class DataValueBuildInput(BaseModel): + """Inputs for dataValueSet builder step.""" + + rows: list[dict[str, Any]] = Field(default_factory=list) + data_element: str = Field(..., description="DHIS2 data element UID") + category_option_combo: str | None = Field(default=None) + attribute_option_combo: str | None = Field(default=None) + data_set: str | None = Field(default=None) + + +class DataAggregateInput(BaseModel): + """Inputs for data aggregation step.""" + + start_date: date = Field(..., description="Inclusive start date (YYYY-MM-DD)") + end_date: date = Field(..., description="Inclusive end date (YYYY-MM-DD)") + files: list[str] = Field(default_factory=list, description="Downloaded CHIRPS3 file paths") + valid_features: list[dict[str, Any]] = Field(default_factory=list, description="Feature rows with orgUnit/geometry") + stage: str = Field(default="final", pattern=r"^(final|prelim)$") + flavor: str = Field(default="rnl", pattern=r"^(rnl|sat)$") + spatial_reducer: str = Field(default="mean", pattern=r"^(mean|sum)$") + temporal_resolution: str = Field(default="monthly", pattern=r"^(daily|weekly|monthly)$") + temporal_reducer: str = Field(default="sum", pattern=r"^(sum|mean)$") + value_rounding: int = Field(default=3, ge=0, le=10) + + @model_validator(mode="after") + def validate_date_window(self) -> "DataAggregateInput": + if self.end_date < self.start_date: + raise ValueError("end_date must be greater than or equal to start_date") + if self.stage == "prelim" and self.flavor != "sat": + raise ValueError("For stage='prelim', flavor must be 'sat'") return self diff --git a/src/eo_api/routers/ogcapi/plugins/processes/workflow_runtime.py b/src/eo_api/routers/ogcapi/plugins/processes/workflow_runtime.py new file mode 100644 index 0000000..1276893 --- /dev/null +++ b/src/eo_api/routers/ogcapi/plugins/processes/workflow_runtime.py @@ -0,0 
# File: src/eo_api/routers/ogcapi/plugins/processes/workflow_runtime.py
"""Minimal workflow runtime helpers for step entry/exit logging."""

from __future__ import annotations

import logging
import time
from collections.abc import Callable
from typing import Any, TypeVar

from pygeoapi.process.base import ProcessorExecuteError

LOGGER = logging.getLogger(__name__)
T = TypeVar("T")


def _elapsed_ms(started: float) -> float:
    """Return the milliseconds elapsed since *started* (a ``time.perf_counter()`` reading)."""
    return round((time.perf_counter() - started) * 1000.0, 2)


def run_step(name: str, fn: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a workflow step with consistent logging and error wrapping.

    Args:
        name: Step label used in log lines and in the wrapped error message.
        fn: Callable implementing the step.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        ProcessorExecuteError: Re-raised untouched when ``fn`` raises it; any
            other exception is wrapped in one, with the original chained as
            the cause.
    """
    LOGGER.info("[chirps3-dhis2-workflow] step=%s start", name)
    try:
        result = fn(*args, **kwargs)
    except ProcessorExecuteError:
        # Already the error type callers expect; do not double-wrap.
        raise
    except Exception as exc:
        raise ProcessorExecuteError(f"Step '{name}' failed: {exc}") from exc
    LOGGER.info("[chirps3-dhis2-workflow] step=%s done", name)
    return result


def run_process_with_trace(
    trace: list[dict[str, Any]],
    *,
    step_name: str,
    processor_cls: type[Any],
    process_name: str,
    data: dict[str, Any],
) -> dict[str, Any]:
    """Execute an OGC processor class as a step and capture workflow trace.

    Exactly one entry is appended to ``trace`` per call: ``"completed"`` when
    the processor returned a JSON object, ``"failed"`` (with the error text)
    otherwise. Construction of the processor and validation of its output
    happen inside the traced section, so a step is never recorded as
    completed and then reported as an error.

    Args:
        trace: Mutable list that receives the trace entry for this step.
        step_name: Label stored in the trace entry and used in log lines.
        processor_cls: Processor class; instantiated with ``{"name": process_name}``.
        process_name: Process identifier handed to the processor constructor.
        data: Input payload passed to ``processor.execute``.

    Returns:
        The processor's output dictionary.

    Raises:
        ProcessorExecuteError: If the step fails, returns a mimetype other
            than ``application/json``, or returns a non-dict output.
    """
    start = time.perf_counter()
    try:
        processor = processor_cls({"name": process_name})
        mimetype, output = run_step(step_name, processor.execute, data, None)
        # Validate before declaring success so the trace reflects the real outcome.
        if mimetype != "application/json":
            raise ProcessorExecuteError(f"Step '{step_name}' returned unsupported mimetype: {mimetype}")
        if not isinstance(output, dict):
            raise ProcessorExecuteError(f"Step '{step_name}' returned non-object output")
    except Exception as exc:
        trace.append(
            {
                "step": step_name,
                "status": "failed",
                "durationMs": _elapsed_ms(start),
                "error": str(exc),
            }
        )
        raise

    trace.append(
        {
            "step": step_name,
            "status": "completed",
            "durationMs": _elapsed_ms(start),
        }
    )
    return output


# The patch continues with the new file src/eo_api/utils/__init__.py.
# File: src/eo_api/utils/__init__.py
#     """Utility helpers for EO API."""
#
# File: src/eo_api/utils/cache.py
"""Generic cache helpers for process-level file caching."""

from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path
from typing import Any, Sequence, cast


def bbox_token(bbox: Sequence[float], precision: int = 4) -> str:
    """Create a stable cache token from bbox coordinates.

    Each coordinate is formatted with ``precision`` decimals, ``-`` becomes
    ``m`` and ``.`` becomes ``p``, and the parts are joined with ``_``.

    Args:
        bbox: Coordinates to encode, in the order they should appear.
        precision: Number of decimals kept per coordinate.

    Returns:
        A token such as ``"32p0000_m2p0000_35p0000_1p0000"``.
    """
    fmt = "{:." + str(precision) + "f}"
    parts: list[str] = []
    for coord in bbox:
        text = fmt.format(coord)
        # A tiny negative value formats as "-0.0000"; drop the sign so it
        # shares a token with 0.0 instead of splitting the cache.
        if float(text) == 0:
            text = text.lstrip("-")
        parts.append(text.replace("-", "m").replace(".", "p"))
    return "_".join(parts)


def monthly_periods(start: str, end: str) -> list[str]:
    """Return inclusive YYYY-MM periods between start and end (YYYY-MM).

    Args:
        start: First month, formatted ``YYYY-MM``.
        end: Last month, formatted ``YYYY-MM``.

    Returns:
        Every month from ``start`` to ``end`` in order; empty when ``end``
        precedes ``start``.

    Raises:
        ValueError: If either argument does not match ``YYYY-MM``.
    """
    start_dt = datetime.strptime(start, "%Y-%m")
    end_dt = datetime.strptime(end, "%Y-%m")
    # Work on a zero-based month index so year roll-over is plain arithmetic.
    first = start_dt.year * 12 + (start_dt.month - 1)
    last = end_dt.year * 12 + (end_dt.month - 1)
    return [f"{index // 12:04d}-{index % 12 + 1:02d}" for index in range(first, last + 1)]


def read_manifest(path: Path) -> dict[str, Any] | None:
    """Read JSON manifest from disk; return None on parse/read errors.

    Args:
        path: Location of the manifest file.

    Returns:
        The decoded JSON object, or ``None`` when the file is missing,
        unreadable, not valid UTF-8, not valid JSON, or not a JSON object.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers a missing or unreadable file.
        return None
    if isinstance(data, dict):
        return cast(dict[str, Any], data)
    return None


def write_manifest(path: Path, payload: dict[str, Any]) -> None:
    """Write JSON manifest to disk, creating parent directories as needed.

    Args:
        path: Destination file; overwritten if it already exists.
        payload: JSON-serialisable mapping to store.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")