34 changes: 21 additions & 13 deletions docs/ogcapi.md
@@ -206,7 +206,10 @@ OGC API - Processes exposes server-side processing tasks. Each process defines t
| Zonal statistics | `zonal-statistics` | Compute zonal stats from GeoJSON features and a raster source |
| ERA5-Land | `era5-land-download` | Download ERA5-Land hourly climate data (temperature, precipitation, etc.) |
| CHIRPS3 | `chirps3-download` | Download CHIRPS3 daily precipitation data |
| CHIRPS3 -> DHIS2 pipeline | `chirps3-dhis2-pipeline` | Fetch features, download CHIRPS3, aggregate by feature, generate DHIS2 dataValueSet (optional auto-import) |
| Feature fetch | `feature-fetch` | Normalize features from inline GeoJSON or DHIS2 selectors |
| Data aggregate | `data-aggregate` | Aggregate downloaded raster data over workflow features |
| DHIS2 dataValue build | `dhis2-datavalue-build` | Build DHIS2 `dataValueSet` and table output from aggregated rows |
| CHIRPS3 -> DHIS2 workflow | `chirps3-dhis2-workflow` | Process-first assembly: feature-fetch -> chirps3-download -> data-aggregate -> dhis2-datavalue-build |

### Endpoints

@@ -231,7 +234,7 @@ OGC API - Processes exposes server-side processing tasks. Each process defines t
| `bbox` | array[number] | yes | Bounding box `[west, south, east, north]` |
| `dry_run` | boolean | no | If true (default), return data without pushing to DHIS2 |

Note: `chirps3-dhis2-pipeline` has its own contract (`start_date`, `end_date`, feature-source selectors, and output options).
Note: `chirps3-dhis2-workflow` uses the `start_date` / `end_date` contract.

### ERA5-Land (`era5-land-download`)

@@ -265,9 +268,10 @@ Downloads CHIRPS3 daily precipitation data.

Additional inputs:

| Input | Type | Required | Default | Description |
| ------- | ------ | -------- | --------- | -------------------------------------- |
| `stage` | string | no | `"final"` | Product stage: `"final"` or `"prelim"` |
| Input | Type | Required | Default | Description |
| -------- | ------ | -------- | --------- | -------------------------------------------------------- |
| `stage` | string | no | `"final"` | Product stage: `"final"` or `"prelim"` |
| `flavor` | string | no | `"rnl"` | Product flavor: `"rnl"` or `"sat"` (`prelim` -> `sat`) |

Example request:

@@ -280,6 +284,7 @@ curl -X POST http://localhost:8000/ogcapi/processes/chirps3-download/execution \
"end": "2024-03",
"bbox": [32.0, -2.0, 35.0, 1.0],
"stage": "final",
"flavor": "rnl",
"dry_run": true
}
}'
@@ -368,19 +373,19 @@ All processes return a JSON object with:
}
```
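
A request body like the curl example above can be assembled and validated before POSTing. A minimal sketch — the helper name and its bbox check are illustrative, only the input names come from the tables above:

```python
import json


def chirps3_execution_body(start: str, end: str, bbox: list[float],
                           stage: str = "final", flavor: str = "rnl",
                           dry_run: bool = True) -> str:
    """Build the JSON body for POST .../processes/chirps3-download/execution.

    Input names follow the documented contract; the bbox check simply
    enforces the [west, south, east, north] ordering.
    """
    west, south, east, north = bbox
    if not (west < east and south < north):
        raise ValueError("bbox must be [west, south, east, north]")
    return json.dumps({"inputs": {
        "start": start, "end": end, "bbox": bbox,
        "stage": stage, "flavor": flavor, "dry_run": dry_run,
    }})
```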

### CHIRPS3 to DHIS2 pipeline (`chirps3-dhis2-pipeline`)
### CHIRPS3 to DHIS2 workflow (`chirps3-dhis2-workflow`)

Runs four steps in one execution:
Runs process-first orchestration in one execution:

1. Get features (from DHIS2 or provided GeoJSON)
2. Fetch CHIRPS3 data for union bbox
3. Process dataset (spatial + temporal aggregation)
4. Generate DHIS2 `dataValueSet` payload (optional auto-import)
1. `feature-fetch`
2. `chirps3-download`
3. `data-aggregate`
4. `dhis2-datavalue-build`
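
The chaining above can be sketched as a fold over step outputs — the `execute` callable and the context-merging convention are assumptions; only the step ids come from the list:

```python
STEPS = ["feature-fetch", "chirps3-download", "data-aggregate", "dhis2-datavalue-build"]


def run_workflow(execute, inputs: dict) -> dict:
    """Run each documented step in order, merging every step's outputs
    into the shared context passed to the next step."""
    ctx = dict(inputs)
    for step in STEPS:
        ctx.update(execute(step, ctx))
    return ctx
```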

Example request using DHIS2 org units as source features:

```bash
curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-pipeline/execution \
curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-workflow/execution \
-H "Content-Type: application/json" \
-d '{
"inputs": {
@@ -392,6 +397,7 @@ curl -X POST http://localhost:8000/ogcapi/processes/chirps3-dhis2-pipeline/execu
"temporal_reducer": "sum",
"spatial_reducer": "mean",
"stage": "final",
"flavor": "rnl",
"dry_run": true,
"auto_import": false
}
@@ -402,14 +408,16 @@ The response includes:

- `files`: downloaded CHIRPS3 monthly files
- `dataValueSet`: DHIS2-compatible payload (`dataValues` array)
- `importResponse`: populated only when `auto_import=true` and `dry_run=false`
- `dataValueTable`: table-friendly rows/columns
- `workflowTrace`: per-step status and duration
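
The `workflowTrace` entries can be rendered into a quick per-step report. The exact key names inside each entry (`id`, `status`, `duration`) are assumptions here, since the docs above do not pin them down:

```python
def summarize_trace(response: dict) -> list[str]:
    """Format workflowTrace entries as 'step: status (duration s)' lines.

    Assumes each entry carries id/status/duration keys (hypothetical).
    """
    return [
        f"{step.get('id', '?')}: {step.get('status', '?')} ({step.get('duration', 0.0):.1f}s)"
        for step in response.get("workflowTrace", [])
    ]
```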

Notes:

- `parent_org_unit` is optional. For large DHIS2 instances, prefer `parent_org_unit` + `org_unit_level` (or explicit `org_unit_ids`) to avoid fetching very large feature sets.
- `org_unit_level` alone runs across the full level by default.
- `category_option_combo` and `attribute_option_combo` are optional. If omitted, they are not sent in `dataValues`, allowing DHIS2 defaults where supported.
- `temporal_resolution` supports `daily`, `weekly`, and `monthly`.
- `flavor` supports `rnl` and `sat`. If `stage` is `prelim`, `flavor` must be `sat`.
- DHIS2 timeout/retry behavior is configured globally via adapter env vars (`DHIS2_HTTP_TIMEOUT_SECONDS`, `DHIS2_HTTP_RETRIES`).
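
The stage/flavor rule in the notes can be stated as a small validator. This sketch is illustrative only — the server enforces its own version, and whether it rejects or silently switches an invalid combination is not specified above:

```python
def resolve_flavor(stage: str = "final", flavor: str = "rnl") -> str:
    """Validate the documented stage/flavor combination.

    Defaults mirror the input table ('final', 'rnl'); the prelim->sat
    constraint mirrors the note above.
    """
    if stage not in ("final", "prelim"):
        raise ValueError(f"unknown stage: {stage!r}")
    if flavor not in ("rnl", "sat"):
        raise ValueError(f"unknown flavor: {flavor!r}")
    if stage == "prelim" and flavor != "sat":
        raise ValueError("stage='prelim' requires flavor='sat'")
    return flavor
```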

## Async execution and job management
19 changes: 17 additions & 2 deletions pygeoapi-config.yml
@@ -370,7 +370,22 @@ resources:
processor:
name: eo_api.routers.ogcapi.plugins.processes.chirps3.CHIRPS3Processor

chirps3-dhis2-pipeline:
chirps3-dhis2-workflow:
type: process
processor:
name: eo_api.routers.ogcapi.plugins.processes.chirps3_dhis2_pipeline.CHIRPS3DHIS2PipelineProcessor
name: eo_api.routers.ogcapi.plugins.processes.chirps3_workflow.Chirps3WorkflowProcessor

feature-fetch:
type: process
processor:
name: eo_api.routers.ogcapi.plugins.processes.feature_fetch.FeatureFetchProcessor

data-aggregate:
type: process
processor:
name: eo_api.routers.ogcapi.plugins.processes.data_aggregate.DataAggregateProcessor

dhis2-datavalue-build:
type: process
processor:
name: eo_api.routers.ogcapi.plugins.processes.datavalue_build.DataValueBuildProcessor
70 changes: 60 additions & 10 deletions src/eo_api/routers/ogcapi/plugins/processes/chirps3.py
@@ -10,11 +10,13 @@
from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError

from eo_api.routers.ogcapi.plugins.processes.schemas import CHIRPS3Input, ProcessOutput
from eo_api.utils.cache import bbox_token, monthly_periods, read_manifest, write_manifest

DOWNLOAD_DIR = os.getenv("DOWNLOAD_DIR", "/tmp/data")

LOGGER = logging.getLogger(__name__)


PROCESS_METADATA = {
"version": "0.1.0",
"id": "chirps3",
@@ -51,6 +53,13 @@
"minOccurs": 0,
"maxOccurs": 1,
},
"flavor": {
"title": "Product flavor",
"description": "CHIRPS3 flavor: 'rnl' or 'sat' (prelim requires 'sat')",
"schema": {"type": "string", "enum": ["rnl", "sat"], "default": "rnl"},
"minOccurs": 0,
"maxOccurs": 1,
},
"dry_run": {
"title": "Dry run",
"description": "If true, return data without pushing to DHIS2",
@@ -81,32 +90,73 @@ def execute(self, data: dict[str, Any], outputs: Any = None) -> tuple[str, dict[
raise ProcessorExecuteError(str(e)) from e

LOGGER.info(
"CHIRPS3 download: start=%s end=%s bbox=%s stage=%s",
"CHIRPS3 download: start=%s end=%s bbox=%s stage=%s flavor=%s",
inputs.start,
inputs.end,
inputs.bbox,
inputs.stage,
inputs.flavor,
)

download_dir = Path(DOWNLOAD_DIR)
root_dir = Path(DOWNLOAD_DIR) / "chirps3_cache"
scope_key = f"{inputs.stage}_{inputs.flavor}_{bbox_token(inputs.bbox)}"
download_dir = root_dir / scope_key
download_dir.mkdir(parents=True, exist_ok=True)
prefix = f"chirps3_{scope_key}"
manifest_path = download_dir / "manifest.json"
requested_months = monthly_periods(inputs.start, inputs.end)
expected_files = [download_dir / f"{prefix}_{month}.nc" for month in requested_months]
existing_before = {str(path) for path in expected_files if path.exists()}

files = [
str(path)
for path in chirps3_daily.download(
start=inputs.start,
end=inputs.end,
bbox=(inputs.bbox[0], inputs.bbox[1], inputs.bbox[2], inputs.bbox[3]),
dirname=str(download_dir),
prefix=prefix,
stage=inputs.stage,
flavor=inputs.flavor,
)
]
downloaded_now = [path for path in files if path not in existing_before]
cache_hit = len(downloaded_now) == 0

files = chirps3_daily.download(
start=inputs.start,
end=inputs.end,
bbox=(inputs.bbox[0], inputs.bbox[1], inputs.bbox[2], inputs.bbox[3]),
dirname=str(download_dir),
prefix="chirps3",
stage=inputs.stage,
manifest = read_manifest(manifest_path) or {}
manifest.update(
{
"dataset": "chirps3",
"scope_key": scope_key,
"bbox": [float(v) for v in inputs.bbox],
"stage": inputs.stage,
"flavor": inputs.flavor,
"last_start": inputs.start,
"last_end": inputs.end,
}
)
write_manifest(manifest_path, manifest)

LOGGER.info(
"CHIRPS3 cache %s: reused=%s downloaded=%s dir=%s",
"hit" if cache_hit else "delta",
len(files) - len(downloaded_now),
len(downloaded_now),
download_dir,
)
output = ProcessOutput(
status="completed",
files=[str(f) for f in files],
files=files,
summary={
"file_count": len(files),
"stage": inputs.stage,
"flavor": inputs.flavor,
"start": inputs.start,
"end": inputs.end,
"cache_hit": cache_hit,
"cache_key": scope_key,
"cache_downloaded_delta_count": len(downloaded_now),
"cache_reused_count": len(files) - len(downloaded_now),
},
message="Data downloaded" + (" (dry run)" if inputs.dry_run else ""),
)