35 commits
17d90a7
adding download task and creating seperate download pool
HansVRP Nov 26, 2025
d22ac20
include initial unit testing
HansVRP Nov 28, 2025
7acc043
updated unit tests
HansVRP Dec 10, 2025
246ac2f
including two simple unit tests and unifying pool usage
HansVRP Dec 11, 2025
d67fdd6
changes to job manager
HansVRP Dec 11, 2025
2973bee
adding easy callback to check number of pending tasks on thread worke…
HansVRP Dec 11, 2025
0e7c4f5
process updates through job update loop
HansVRP Dec 11, 2025
8ccb442
remove folder creation logic from thread to resprect optional downloa…
HansVRP Dec 11, 2025
855a393
fix stop_job_thread
HansVRP Dec 11, 2025
e2b6ab8
working on fix for indefinete loop
HansVRP Dec 11, 2025
dc75ca8
fix infinite loop
HansVRP Dec 11, 2025
4fc299d
wrapper to abstract multiple threadpools
HansVRP Dec 15, 2025
382eae4
coupling task type to seperate pool
HansVRP Dec 15, 2025
ab9914a
include unit test for dict of pools
HansVRP Dec 15, 2025
1fce77b
tmp_path usage and renaming
HansVRP Dec 16, 2025
21992fa
fix documentation
HansVRP Dec 16, 2025
cdc3748
StacDummyBuilder.asset: "data" role by default and add "proj:" helper…
soxofaan Dec 12, 2025
97d155d
Add 502/503/504 to the default request retry list
soxofaan Dec 15, 2025
1b31b19
fixup! Add 502/503/504 to the default request retry list
soxofaan Dec 15, 2025
fc29125
Remove deprecated load_disk_collection/load_disk_data
soxofaan Dec 16, 2025
615d821
Support UDF based spatial/temporal extents in load_collection/load_st…
soxofaan Dec 2, 2025
5387f89
`MultiBackendJobManager`: don't count "created" jobs as "running"
soxofaan Dec 17, 2025
5d22482
test_start_job_thread_basic: more robust against randomness of last job
soxofaan Dec 17, 2025
e18c820
Revert 706faf and 3d3ca1 for #839/#840
soxofaan Dec 17, 2025
be579f0
MultiBackendJobManager: keep "queued" under 10 for better CDSE compat…
HansVRP Dec 17, 2025
0b4cc09
Release 0.47.0
soxofaan Dec 17, 2025
2746cf2
Bump version to 0.48.0a1
soxofaan Dec 17, 2025
b88db02
keep track of number of assets
HansVRP Dec 19, 2025
02eb78b
avoid abreviation of number
HansVRP Dec 19, 2025
1f027cd
do not expose number of remaining jobs
HansVRP Dec 19, 2025
63d742c
abstract task name in thread pool
HansVRP Dec 19, 2025
fcc0fca
not use remaing in unit test
HansVRP Dec 19, 2025
4d79b87
fix unit tests
HansVRP Dec 19, 2025
597997f
fix
HansVRP Dec 19, 2025
3234f4f
working on unit tests
HansVRP Dec 19, 2025
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -9,15 +9,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

### Changed

### Removed

### Fixed


## [0.47.0] - 2025-12-17

### Added

- `MultiBackendJobManager`: add `download_results` option to enable/disable the automated download of job results once completed by the job manager ([#744](https://github.com/Open-EO/openeo-python-client/issues/744))
- Support UDF based spatial and temporal extents in `load_collection`, `load_stac` and `filter_temporal` ([#831](https://github.com/Open-EO/openeo-python-client/pull/831))
- `MultiBackendJobManager`: keep number of "queued" jobs below 10 for better CDSE compatibility ([#839](https://github.com/Open-EO/openeo-python-client/pull/839), eu-cdse/openeo-cdse-infra#859)
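
A minimal usage sketch tying the entries above together (the backend URL, collection id and dataframe columns are placeholders, and `download_results` is assumed here to be a `MultiBackendJobManager` constructor argument, per #744):

```python
import pandas as pd
import openeo
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

# Placeholder job list: one row per batch job to run.
df = pd.DataFrame({"year": [2023, 2024]})

# download_results=False skips the automated result download (assumed constructor argument).
manager = MultiBackendJobManager(root_dir="jobs", download_results=False)
manager.add_backend(
    "cdse",
    connection=openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc(),
    parallel_jobs=2,
)

def start_job(row, connection, **kwargs):
    # Build a batch job for one dataframe row (collection id is a placeholder).
    cube = connection.load_collection("SENTINEL2_L2A")
    return cube.create_job(title=f"job-{row['year']}")

job_db = CsvJobDatabase(path="jobs.csv").initialize_from_df(df)
manager.run_jobs(start_job=start_job, job_db=job_db)
```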

### Changed

- Internal reorganization of `openeo.extra.job_management` submodule to ease future development ([#741](https://github.com/Open-EO/openeo-python-client/issues/741))
- `openeo.Connection`: add some more HTTP error codes to the default retry list: `502 Bad Gateway`, `503 Service Unavailable` and `504 Gateway Timeout` ([#835](https://github.com/Open-EO/openeo-python-client/issues/835))

### Removed

### Fixed
- Remove `Connection.load_disk_collection` (wrapper for non-standard `load_disk_data` process), deprecated since version 0.25.0 (related to [Open-EO/openeo-geopyspark-driver#1457](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1457))


## [0.46.0] - 2025-10-31
2 changes: 1 addition & 1 deletion openeo/_version.py
@@ -1 +1 @@
__version__ = "0.47.0a3"
__version__ = "0.48.0a1"
89 changes: 65 additions & 24 deletions openeo/extra/job_management/_manager.py
@@ -32,6 +32,7 @@
from openeo.extra.job_management._thread_worker import (
_JobManagerWorkerThreadPool,
_JobStartTask,
_JobDownloadTask
)
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
@@ -60,6 +61,9 @@ class _Backend(NamedTuple):
# Maximum number of jobs to allow in parallel on a backend
parallel_jobs: int

# Maximum number of jobs to allow in queue on a backend
queueing_limit: int = 10


@dataclasses.dataclass(frozen=True)
class _ColumnProperties:
@@ -172,6 +176,7 @@ def start_job(

.. versionchanged:: 0.47.0
Added ``download_results`` parameter.

"""

# Expected columns in the job DB dataframes.
@@ -229,7 +234,13 @@ def add_backend(
parallel_jobs: int = 2,
):
"""
Register a backend with a name and a Connection getter.
Register a backend with a name and a :py:class:`Connection` getter.

.. note::
For optimal throughput and responsiveness, it is recommended
to provide a :py:class:`Connection` instance without its own (blocking) retry behavior,
since the job manager will do retries in a non-blocking way,
allowing it to take care of other tasks before retrying failed requests.

:param name:
Name of the backend.
@@ -246,7 +257,8 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)
# TODO: expose queueing_limit?
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs, queueing_limit=10)
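
A hedged sketch of the recommendation in the note above: register a backend with a connection (or connection getter) that does not carry its own blocking retry loop; the URL is a placeholder:

```python
import openeo
from openeo.extra.job_management import MultiBackendJobManager

manager = MultiBackendJobManager()

def get_connection() -> openeo.Connection:
    # Placeholder URL. Per the note above, keep this connection free of
    # aggressive (blocking) retries; the job manager retries non-blockingly.
    return openeo.connect("https://openeo.example").authenticate_oidc()

# A plain Connection instance is accepted too; it is wrapped in a getter internally.
manager.add_backend("example", connection=get_connection, parallel_jobs=2)
```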

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -363,6 +375,9 @@ def run_loop():
).values()
)
> 0

or (self._worker_pool.number_pending_tasks() > 0)

and not self._stop_thread
):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
@@ -388,7 +403,10 @@ def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):

.. versionadded:: 0.32.0
"""
self._worker_pool.shutdown()
if self._worker_pool is not None:
self._worker_pool.shutdown()
self._worker_pool = None


if self._thread is not None:
self._stop_thread = True
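
For context on the shutdown guard above, a hedged sketch of the background-thread flow that `start_job_thread`/`stop_job_thread` drive (reusing the `manager`, `start_job` and `job_db` names from the sketches further up):

```python
# Run the job manager loop in a background thread.
manager.start_job_thread(start_job=start_job, job_db=job_db)

# ... keep appending rows to the job database while the thread processes them ...

# Stop the loop; the worker pool is shut down as in the diff above.
manager.stop_job_thread(timeout_seconds=60)
```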
@@ -494,13 +512,15 @@ def run_jobs(

self._worker_pool = _JobManagerWorkerThreadPool()


while (
sum(
job_db.count_by_status(
statuses=["not_started", "created", "queued_for_start", "queued", "running"]
).values()
)
> 0
).values()) > 0

or (self._worker_pool.number_pending_tasks() > 0)

):
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
stats["run_jobs loop"] += 1
@@ -510,8 +530,10 @@ def run_jobs(
time.sleep(self.poll_sleep)
stats["sleep"] += 1

# TODO: run post-processing after shutdown once more to ensure completion?


self._worker_pool.shutdown()
self._worker_pool = None

return stats

@@ -534,17 +556,21 @@ def _job_update_loop(

not_started = job_db.get_by_status(statuses=["not_started"], max=200).copy()
if len(not_started) > 0:
# Check number of jobs running at each backend
# Check number of jobs queued/running at each backend
# TODO: should "created" be included in here? Calling this "running" is quite misleading then.
# apparently (see #839/#840) this seemingly simple change makes a lot of MultiBackendJobManager tests flaky
running = job_db.get_by_status(statuses=["created", "queued", "queued_for_start", "running"])
stats["job_db get_by_status"] += 1
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
queued = running[running["status"] == "queued"]
running_per_backend = running.groupby("backend_name").size().to_dict()
queued_per_backend = queued.groupby("backend_name").size().to_dict()
_log.info(f"{running_per_backend=} {queued_per_backend=}")

total_added = 0
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
queue_capacity = self.backends[backend_name].queueing_limit - queued_per_backend.get(backend_name, 0)
run_capacity = self.backends[backend_name].parallel_jobs - running_per_backend.get(backend_name, 0)
to_add = min(queue_capacity, run_capacity)
if to_add > 0:
for i in not_started.index[total_added : total_added + to_add]:
self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats)
stats["job launch"] += 1
@@ -553,7 +579,9 @@
stats["job_db persist"] += 1
total_added += 1

self._process_threadworker_updates(self._worker_pool, job_db=job_db, stats=stats)
if self._worker_pool is not None:
self._process_threadworker_updates(worker_pool=self._worker_pool, job_db=job_db, stats=stats)


# TODO: move this back closer to the `_track_statuses` call above, once job done/error handling is also handled in threads?
for job, row in jobs_done:
@@ -565,6 +593,7 @@
for job, row in jobs_cancel:
self.on_job_cancel(job, row)
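
A worked sketch of the per-backend capacity arithmetic introduced above (all numbers are illustrative):

```python
parallel_jobs = 2      # max jobs allowed in parallel on the backend
queueing_limit = 10    # max jobs allowed in "queued" state (default from the diff above)

running_on_backend = 1  # jobs counted as created/queued_for_start/queued/running
queued_on_backend = 0   # jobs in "queued" state only

queue_capacity = queueing_limit - queued_on_backend  # 10
run_capacity = parallel_jobs - running_on_backend    # 1
to_add = min(queue_capacity, run_capacity)           # launch at most 1 new job this loop
```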


def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None):
"""Helper method for launching jobs

@@ -629,7 +658,7 @@ def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = No
df_idx=i,
)
_log.info(f"Submitting task {task} to thread pool")
self._worker_pool.submit_task(task)
self._worker_pool.submit_task(task=task, pool_name="job_start")

stats["job_queued_for_start"] += 1
df.loc[i, "status"] = "queued_for_start"
@@ -675,7 +704,7 @@ def _process_threadworker_updates(
:param stats: Dictionary accumulating statistic counters
"""
# Retrieve completed task results immediately
results, _ = worker_pool.process_futures(timeout=0)
results = worker_pool.process_futures(timeout=0)

# Collect update dicts
updates: List[Dict[str, Any]] = []
@@ -721,17 +750,28 @@ def on_job_done(self, job: BatchJob, row):
:param job: The job that has finished.
:param row: DataFrame row containing the job's metadata.
"""
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?
if self._download_results:
job_metadata = job.describe()
job_dir = self.get_job_dir(job.job_id)
metadata_path = self.get_job_metadata_path(job.job_id)

job_dir = self.get_job_dir(job.job_id)
self.ensure_job_dir_exists(job.job_id)
job.get_results().download_files(target=job_dir)

with metadata_path.open("w", encoding="utf-8") as f:
json.dump(job_metadata, f, ensure_ascii=False)
# Proactively refresh the bearer token (because the task in the worker thread will not be able to do that)
job_con = job.connection
self._refresh_bearer_token(connection=job_con)

task = _JobDownloadTask(
job_id=job.job_id,
df_idx=row.name,  # this is the index in the "not_started" dataframe; should not be an issue, as there is no job db update for the download task
root_url=job_con.root_url,
bearer_token=job_con.auth.bearer if isinstance(job_con.auth, BearerAuth) else None,
download_dir=job_dir,
)
_log.info(f"Submitting download task {task} to download thread pool")

if self._worker_pool is None:
self._worker_pool = _JobManagerWorkerThreadPool()

self._worker_pool.submit_task(task=task, pool_name="job_download")

def on_job_error(self, job: BatchJob, row):
"""
@@ -783,6 +823,7 @@ def _cancel_prolonged_job(self, job: BatchJob, row):
except Exception as e:
_log.error(f"Unexpected error while handling job {job.job_id}: {e}")

# TODO: pull this functionality away from the manager into a general utility class? Job dir creation could be reused for the _JobDownloadTask.
def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
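
The changes above route job-start and job-download work to separate pools via `pool_name`. A self-contained sketch of that dict-of-pools idea; class and method names are illustrative, not the actual `_JobManagerWorkerThreadPool` implementation, and tasks are assumed to expose an `execute()` method:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Tuple


class MultiPoolWorker:
    """Sketch: one thread pool per task type (e.g. "job_start", "job_download")."""

    def __init__(self, max_workers_per_pool: int = 2):
        self._max_workers = max_workers_per_pool
        self._pools: Dict[str, ThreadPoolExecutor] = {}
        self._futures: List[Tuple[Future, object]] = []

    def submit_task(self, task, pool_name: str) -> None:
        # Lazily create a dedicated pool per task type, so slow downloads
        # cannot starve job-start requests (and vice versa).
        if pool_name not in self._pools:
            self._pools[pool_name] = ThreadPoolExecutor(max_workers=self._max_workers)
        future = self._pools[pool_name].submit(task.execute)
        self._futures.append((future, task))

    def number_pending_tasks(self) -> int:
        return sum(1 for future, _ in self._futures if not future.done())

    def process_futures(self, timeout: float = 0) -> list:
        # Collect results of completed tasks; unfinished ones are kept for a later call.
        # (The timeout is ignored in this sketch.)
        done = [(f, t) for f, t in self._futures if f.done()]
        self._futures = [(f, t) for f, t in self._futures if not f.done()]
        return [f.result() for f, _ in done]

    def shutdown(self) -> None:
        for pool in self._pools.values():
            pool.shutdown(wait=True)
        self._pools = {}
```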