Mass transfer scaling #349
📝 Walkthrough

This PR adds automated mass transfer worker scaling infrastructure for production environments. It introduces three new CLI commands for scaling Docker Swarm replicas, configuring cron-based schedules, and cleaning up scheduled jobs. Environment variables control scaling parameters. Development configuration is adjusted, and comprehensive integration tests validate scaling behavior with existing queued tasks and graceful shutdown semantics.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Code Review
This pull request introduces an autoscaling mechanism for the mass transfer worker, featuring new CLI commands to configure cron-based scaling of Docker Swarm services and comprehensive tests for graceful shutdowns. Feedback suggests optimizing test execution speed by reducing sleep intervals, enhancing shell command security through improved quoting and consolidation, and restoring the --autoreload flag in the development environment. Additionally, corrections were requested for a misleading log message and a syntax error in the example environment configuration.
```python
running_task_seconds = 30
graceful_timeout_seconds = 50
```

The test uses a 30-second sleep which significantly slows down the test suite. Consider reducing these values to a few seconds (e.g., 2s and 5s) to maintain test validity while improving performance.

```diff
-running_task_seconds = 30
-graceful_timeout_seconds = 50
+running_task_seconds = 2
+graceful_timeout_seconds = 5
```
```python
helper.execute_cmd(
    f"docker service update --stop-grace-period {shlex.quote(grace_period)} {service_name}"
)

helper.execute_cmd(f"docker service scale {service_name}={replicas}")
```

The `docker service update` and `docker service scale` commands can be combined into a single `docker service update` call. This is more efficient as it triggers only one service reconciliation in Docker Swarm. Additionally, it's safer to quote the service name.

```diff
-helper.execute_cmd(
-    f"docker service update --stop-grace-period {shlex.quote(grace_period)} {service_name}"
-)
-
-helper.execute_cmd(f"docker service scale {service_name}={replicas}")
+helper.execute_cmd(
+    f"docker service update --stop-grace-period {shlex.quote(grace_period)} "
+    f"--replicas {replicas} "
+    f"{shlex.quote(service_name)}"
+)
```
```python
logs_dir = shlex.quote(str(Path(helper.root_path) / "logs"))

# Ensure logs directory exists
Path(helper.root_path).joinpath("logs").mkdir(exist_ok=True)

log_file = f"{logs_dir}/mass_transfer_worker_cron.log"
```

Quoting only the directory part of the log file path can lead to awkward shell commands. It's better to quote the entire path to the log file.

```diff
-logs_dir = shlex.quote(str(Path(helper.root_path) / "logs"))
+logs_dir = str(Path(helper.root_path) / "logs")

 # Ensure logs directory exists
-Path(helper.root_path).joinpath("logs").mkdir(exist_ok=True)
+Path(logs_dir).mkdir(exist_ok=True)

-log_file = f"{logs_dir}/mass_transfer_worker_cron.log"
+log_file = shlex.quote(f"{logs_dir}/mass_transfer_worker_cron.log")
```
```python
        cron_marker_end,
    ]
)
typer.echo("Executed: {}".format(cron_block))
```
```diff
 bash -c "
   wait-for-it -s postgres.local:5432 -t 60 &&
-  ./manage.py bg_worker -l debug -q mass_transfer --autoreload
+  ./manage.py bg_worker -l debug -q mass_transfer
```
```
MASS_TRANSFER_WORKER_REPLICAS_DOWNSCALED=1
MASS_TRANSFER_WORKER_SCALE_UP_CRON="0 18 * * 1-5"
MASS_TRANSFER_WORKER_SCALE_DOWN_CRON="0 8 * * 1-5"
MASS_TRANSFER_WORKER_STOP_GRACE_PERIOD ="2h"
```
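For illustration, here is a minimal sketch of how a consumer might read these variables. The helper name and the fallback defaults are assumptions for this sketch, not code from the PR:

```python
import os


def read_scaling_config() -> dict:
    # Hypothetical helper; the variable names match the example.env snippet
    # above, while the defaults are illustrative only.
    return {
        "replicas_downscaled": int(
            os.environ.get("MASS_TRANSFER_WORKER_REPLICAS_DOWNSCALED", "1")
        ),
        "scale_up_cron": os.environ.get(
            "MASS_TRANSFER_WORKER_SCALE_UP_CRON", "0 18 * * 1-5"
        ),
        "scale_down_cron": os.environ.get(
            "MASS_TRANSFER_WORKER_SCALE_DOWN_CRON", "0 8 * * 1-5"
        ),
        "stop_grace_period": os.environ.get(
            "MASS_TRANSFER_WORKER_STOP_GRACE_PERIOD", "2h"
        ),
    }
```

Note the stray space before `=` on the last line of the snippet above: POSIX shells reject assignments with whitespace around `=`, so sourcing that file as-is would fail, which is presumably why the review flagged it.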
Actionable comments posted: 4

🧹 Nitpick comments (1)

`cli.py` (1)

Line 147: ⚡ Quick win — Use structured logging for cron install/update events.

This operational state change is currently emitted as plain stdout. Structured fields here will make cron-triggered scale changes much easier to query and alert on. As per coding guidelines, "Use structured logging for agent actions and state transitions".

🤖 Prompt for AI Agents

```text
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cli.py` at line 147, replace the plain stdout call that prints cron_block with a structured log entry: instead of typer.echo("Executed: {}".format(cron_block)), emit a structured log (e.g., logger.info) including explicit fields such as event="cron_executed" (or event="cron_install" depending on context), cron_block=cron_block, and any relevant metadata (timestamp, source) so the cron install/update is queryable; update the caller in cli.py to use the logger symbol used elsewhere in the module and keep the human-facing typer.echo only if needed, but ensure operational state changes are recorded via structured logging.
```
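As a sketch of what that suggestion could look like, assuming the stdlib `logging` module; the logger name and the field names `event`/`cron_block` are illustrative, not taken from the project's actual logging setup:

```python
import logging

logger = logging.getLogger("cli")


def log_cron_installed(cron_block: str) -> None:
    # Emit the cron change as a structured record; the extra fields become
    # attributes on the LogRecord, so downstream handlers can serialize
    # event/cron_block for querying and alerting.
    logger.info(
        "cron block installed",
        extra={"event": "cron_install", "cron_block": cron_block},
    )
```

A JSON-emitting handler (or any structured-logging formatter) can then pick up `record.event` and `record.cron_block` directly, while `typer.echo` can stay for the human-facing message if desired.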
ℹ️ Review info

⚙️ Run configuration
- Configuration used: defaults
- Review profile: CHILL
- Plan: Pro
- Run ID: 948e7d57-cfdd-4749-b8f8-91f55d75b000

📒 Files selected for processing (5)
- .gitignore
- adit/mass_transfer/tests/test_scale_mass_transfer_worker.py
- cli.py
- docker-compose.dev.yml
- example.env
```python
def _build_database_url_from_connection() -> str:
    db_settings = connection.settings_dict
    user = db_settings.get("USER") or ""
    password = db_settings.get("PASSWORD") or ""
    host = db_settings.get("HOST") or "localhost"
    port = db_settings.get("PORT") or "5432"
    name = db_settings["NAME"]
    return f"postgres://{user}:{password}@{host}:{port}/{name}"
```
**URL-encode the database credentials in DATABASE_URL.**

Any reserved character in the username, password, or database name will produce an invalid URI and make this test fail on CI environments with non-trivial credentials.

Suggested fix:

```diff
+from urllib.parse import quote
+
 ...
 def _build_database_url_from_connection() -> str:
     db_settings = connection.settings_dict
     user = db_settings.get("USER") or ""
     password = db_settings.get("PASSWORD") or ""
     host = db_settings.get("HOST") or "localhost"
     port = db_settings.get("PORT") or "5432"
     name = db_settings["NAME"]
-    return f"postgres://{user}:{password}@{host}:{port}/{name}"
+    credentials = ""
+    if user:
+        credentials = quote(user, safe="")
+        if password:
+            credentials += f":{quote(password, safe='')}"
+        credentials += "@"
+    return f"postgres://{credentials}{host}:{port}/{quote(name, safe='')}"
```

🤖 Prompt for AI Agents
```text
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@adit/mass_transfer/tests/test_scale_mass_transfer_worker.py` around lines 40-47: the helper _build_database_url_from_connection builds DATABASE_URL with raw credentials, which breaks when USER, PASSWORD or NAME contain reserved URI characters; update it to URL-encode user, password and database name before interpolating into the connection string (use urllib.parse.quote_plus or urllib.parse.quote and import it) so the returned string is a valid postgres URI; reference the function _build_database_url_from_connection and connection.settings_dict when making the change.
```
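To make the encoding behavior concrete, here is a self-contained sketch of the fix; the function name is illustrative and the Django `connection` object is replaced by plain parameters:

```python
from urllib.parse import quote


def build_postgres_url(
    user: str, password: str, host: str, port: str, name: str
) -> str:
    # Percent-encode the userinfo and database name so reserved characters
    # (e.g. "@", ":", "/") cannot corrupt the URI structure.
    credentials = ""
    if user:
        credentials = quote(user, safe="")
        if password:
            credentials += f":{quote(password, safe='')}"
        credentials += "@"
    return f"postgres://{credentials}{host}:{port}/{quote(name, safe='')}"
```

With a username like `adit@prod` and a password containing `:` or `/`, the raw f-string version would produce an ambiguous URI, while the encoded version stays parseable.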
```python
helper.execute_cmd(
    f"docker service update --stop-grace-period {shlex.quote(grace_period)} {service_name}"
)

helper.execute_cmd(f"docker service scale {service_name}={replicas}")
```
🧩 Analysis chain

🏁 Scripts executed (against openradx/adit):

```shell
# First, locate and read the cli.py file
find . -name "cli.py" -type f | head -20

# Read the lines mentioned in the review comment (70-85 for context)
sed -n '70,85p' cli.py | cat -n

# Get the full function context to understand the code better
sed -n '60,95p' cli.py | cat -n
```

🌐 Web query: `docker service update --detach flag atomic operations race condition`

💡 Result: Docker Swarm service updates using `docker service update --detach` (or `-d`) are atomic at the service specification level due to the Raft consensus algorithm used by Swarm managers to replicate the global cluster state consistently across nodes [1]. Each update submits a new service spec version; if accepted by the Raft quorum, it becomes the desired state, ensuring no partial spec application [2][3]. Concurrent updates from multiple clients can result in "update out of sequence" errors if a client uses an outdated version index, preventing conflicting changes [2][3]. The `--detach` flag only affects client-side behavior: it exits immediately without waiting for task convergence, while `--detach=false` (default) waits [4][5]. This does not introduce races in the update itself, as the spec change remains atomic. Rolling updates to tasks are controlled separately (e.g., parallelism, delay) and may pause/rollback on failures but follow the committed spec [6][7][8]. No sources indicate races specifically tied to `--detach`; concurrency issues stem from rapid successive updates, not the flag [9][2].

Citations:
1. https://docs.docker.com/engine/swarm/raft
2. Service update may fail with "update out of sequence" error (moby/moby#30794)
3. Rolling update causes "out of sequence" errors for the client (moby/swarmkit#1379)
4. https://docs.docker.com/reference/cli/docker/service/update/
5. docker service update with --detach=false hangs on services with 0 tasks (docker/cli#627)
6. https://docs.docker.com/engine/swarm/swarm-tutorial/rolling-update/
7. https://docs.docker.com/reference/cli/docker/service/update
8. Partial service --image update with start-first && parallelism on registry timeout (moby/moby#44069)
9. https://stackoverflow.com/questions/53748944/how-to-update-multiple-services-at-the-same-time-in-docker-swarm
**Combine grace-period and replica updates into a single atomic operation.**

These are two separate Swarm updates. While Docker Swarm makes each update atomic via Raft consensus, combining them into one operation ensures the grace-period and replica changes are applied in the same spec version. This prevents intermediate states where the grace period has been updated but scaling hasn't occurred yet, which could cause in-flight transfers to be interrupted if scaling proceeds before the new grace period is fully reconciled.

Suggested fix:

```diff
-helper.execute_cmd(
-    f"docker service update --stop-grace-period {shlex.quote(grace_period)} {service_name}"
-)
-
-helper.execute_cmd(f"docker service scale {service_name}={replicas}")
+helper.execute_cmd(
+    "docker service update "
+    "--detach=false "
+    f"--stop-grace-period {shlex.quote(grace_period)} "
+    f"--replicas {replicas} "
+    f"{service_name}"
+)
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
helper.execute_cmd(
    "docker service update "
    "--detach=false "
    f"--stop-grace-period {shlex.quote(grace_period)} "
    f"--replicas {replicas} "
    f"{service_name}"
)
```
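The combined command can also be sketched as a pure string builder, which makes the quoting easy to unit-test without Docker. The function name is illustrative, and quoting the service name is added here as the earlier review comment recommends:

```python
import shlex


def build_scale_cmd(service_name: str, replicas: int, grace_period: str) -> str:
    # A single `docker service update` applies both the grace period and the
    # replica count in one spec version; shlex.quote guards against shell
    # metacharacters in the interpolated values.
    return (
        "docker service update --detach=false "
        f"--stop-grace-period {shlex.quote(grace_period)} "
        f"--replicas {int(replicas)} "
        f"{shlex.quote(service_name)}"
    )
```

`int(replicas)` is a small extra precaution so a malformed value raises instead of being pasted into the shell line.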
🤖 Prompt for AI Agents

```text
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cli.py` around lines 76-80: current code calls helper.execute_cmd twice (helper.execute_cmd(... stop-grace-period ...) and helper.execute_cmd(... scale ...)), causing two separate Swarm updates; change it to a single atomic update by invoking helper.execute_cmd once with docker service update including both --stop-grace-period and --replicas flags (use shlex.quote on grace_period and replicas and include service_name) so the stop grace period and replica count are applied in the same spec update.
```
```python
scale_up_cmd = (
    f"cd {project_root} && /usr/local/bin/uv run cli scale-mass-transfer-worker {up_replicas}"
    f" >> {log_file} 2>&1"
)
scale_down_cmd = (
    f"cd {project_root} && /usr/local/bin/uv run cli scale-mass-transfer-worker {down_replicas}"
    f" >> {log_file} 2>&1"
)
```
🧩 Analysis chain

🏁 Scripts executed (against openradx/adit):

```shell
cd /repo && find . -name "cli.py" -type f | head -20
pwd && ls -la && find . -name "cli.py" -type f 2>/dev/null | head -20
wc -l cli.py && head -20 cli.py && sed -n '120,145p' cli.py
rg '/usr/local/bin' cli.py && rg -n 'shutil.which|shlex.quote' cli.py
sed -n '100,150p' cli.py
```
**Don't hardcode `/usr/local/bin/uv` into the crontab.**

`uv` is not guaranteed to be at that location on every production host. When installed elsewhere, the cron entries are written successfully but autoscaling never executes. Use `shutil.which("uv")` to locate the executable and `shlex.quote()` to safely pass it to the shell command. The necessary imports (`shutil` and `shlex`) are already available in this file.

Suggested fix:

```diff
+uv_bin = shutil.which("uv")
+if not uv_bin:
+    typer.echo("uv executable not found in PATH")
+    raise typer.Exit(code=1)
+
 scale_up_cmd = (
-    f"cd {project_root} && /usr/local/bin/uv run cli scale-mass-transfer-worker {up_replicas}"
+    f"cd {project_root} && {shlex.quote(uv_bin)} run cli scale-mass-transfer-worker {up_replicas}"
     f" >> {log_file} 2>&1"
 )
 scale_down_cmd = (
-    f"cd {project_root} && /usr/local/bin/uv run cli scale-mass-transfer-worker {down_replicas}"
+    f"cd {project_root} && {shlex.quote(uv_bin)} run cli scale-mass-transfer-worker {down_replicas}"
     f" >> {log_file} 2>&1"
 )
```

📝 Committable suggestion

```python
uv_bin = shutil.which("uv")
if not uv_bin:
    typer.echo("uv executable not found in PATH")
    raise typer.Exit(code=1)

scale_up_cmd = (
    f"cd {project_root} && {shlex.quote(uv_bin)} run cli scale-mass-transfer-worker {up_replicas}"
    f" >> {log_file} 2>&1"
)
scale_down_cmd = (
    f"cd {project_root} && {shlex.quote(uv_bin)} run cli scale-mass-transfer-worker {down_replicas}"
    f" >> {log_file} 2>&1"
)
```
🤖 Prompt for AI Agents

```text
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cli.py` around lines 128-134: replace the hardcoded "/usr/local/bin/uv" in the cron command strings by resolving the uv executable with shutil.which("uv") (e.g. uv_path = shutil.which("uv")) and pass that through shlex.quote() before building the command; then use the quoted uv_path variable in the scale_up_cmd and scale_down_cmd f-strings (which also use project_root, log_file, up_replicas, down_replicas) so the cron entries call the actual installed uv binary safely; if shutil.which returns None, decide on a fallback (e.g. raise or use "uv") before quoting.
```
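A minimal, self-contained sketch of the PATH-resolution pattern described above; the function name and error message are illustrative:

```python
import shlex
import shutil


def resolve_tool(name: str) -> str:
    # Locate the executable on PATH rather than assuming an install prefix,
    # then quote the absolute path for safe interpolation into a cron line.
    path = shutil.which(name)
    if path is None:
        raise RuntimeError(f"{name} executable not found in PATH")
    return shlex.quote(path)
```

Note that `shlex.quote` only wraps the path when it contains characters that are unsafe for the shell, so typical paths like `/usr/local/bin/uv` pass through unchanged while paths with spaces get single-quoted.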
```diff
 command: >
   bash -c "
     wait-for-it -s postgres.local:5432 -t 60 &&
-    ./manage.py bg_worker -l debug -q mass_transfer --autoreload
+    ./manage.py bg_worker -l debug -q mass_transfer
   "
```
**Restore `--autoreload` for the dev mass transfer worker.**

`develop.watch` only syncs files into the container. Without autoreload, this worker keeps running stale code until the container is restarted, unlike the other dev workers.

Suggested fix:

```diff
 command: >
   bash -c "
     wait-for-it -s postgres.local:5432 -t 60 &&
-    ./manage.py bg_worker -l debug -q mass_transfer
+    ./manage.py bg_worker -l debug -q mass_transfer --autoreload
   "
```

📝 Committable suggestion

```yaml
command: >
  bash -c "
    wait-for-it -s postgres.local:5432 -t 60 &&
    ./manage.py bg_worker -l debug -q mass_transfer --autoreload
  "
```
🤖 Prompt for AI Agents

```text
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docker-compose.dev.yml` around lines 72-76: the dev docker-compose command for the mass_transfer worker should include the autoreload flag so it reloads updated code from develop.watch; update the command that runs "./manage.py bg_worker -l debug -q mass_transfer" to add "--autoreload" (preserving the existing wait-for-it usage and options) so the bg_worker process restarts on file changes during development.
```
Summary by CodeRabbit

- New Features
  - New CLI commands: `scale-mass-transfer-worker`, `configure-mass-transfer-worker-cron`, and `remove-mass-transfer-worker-cron`.
- Tests
- Chores