feat(admin-api): add sync progress endpoint #1528
Conversation
Force-pushed from 7723067 to f21b0eb
LNSD left a comment
In Slack, I suggested using a PUSH model (instead of a POLL model) to expose the table sync progress updates to all the clients (e.g., via a Kafka broker).
This is a secondary priority, but there is value for the engine management use case.
Please review my comments.
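A minimal sketch of what that PUSH model could look like, assuming a Kafka broker and JSON-serialized progress events; the topic name, event shape, and the rdkafka/serde setup below are illustrative assumptions, not part of this PR:

```rust
// Sketch only: publish a sync-progress event to Kafka whenever a table
// advances, so clients subscribe for updates instead of polling the admin API.
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::Serialize;

// Hypothetical event payload; fields mirror the poll endpoint's response.
#[derive(Serialize)]
struct SyncProgressEvent<'a> {
    dataset_namespace: &'a str,
    dataset_name: &'a str,
    table_name: &'a str,
    current_block: u64,
}

fn make_producer(brokers: &str) -> anyhow::Result<FutureProducer> {
    Ok(ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()?)
}

async fn publish_progress(
    producer: &FutureProducer,
    event: &SyncProgressEvent<'_>,
) -> anyhow::Result<()> {
    let payload = serde_json::to_vec(event)?;
    let key = format!("{}/{}", event.dataset_namespace, event.dataset_name);

    // Hypothetical topic name; consumers (e.g. the engine management service)
    // subscribe to it for live updates instead of hitting the endpoint.
    producer
        .send(
            FutureRecord::to("dataset-sync-progress")
                .key(&key)
                .payload(&payload),
            Duration::from_secs(1),
        )
        .await
        .map_err(|(err, _msg)| anyhow::anyhow!(err))?;

    Ok(())
}
```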
crates/services/admin-api/src/handlers/datasets/sync_progress.rs
```rust
    })?;

// Query active tables info from metadata database (job_id, status)
let writer_infos = metadata_db::sync_progress::get_active_tables_with_writer_info(
```
We should not query this information directly from the metadata_db. Actually, no direct metadata_db interactions should happen in the admin API handlers. This should be part of the amp-data-store component.
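A possible shape for that refactor, as a sketch only: the handler depends on a data-store abstraction, and the metadata_db query moves behind it inside amp-data-store. The trait name, method, and return type here are assumptions for illustration:

```rust
// Sketch only: the admin API handler talks to this abstraction, and the
// metadata_db query lives behind it inside amp-data-store. `TableWriterInfo`
// and the method name are hypothetical.
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct TableWriterInfo {
    pub job_id: i64,
    pub job_status: String,
}

#[async_trait::async_trait]
pub trait SyncProgressStore {
    /// Returns writer info (job id and status) for the active tables of a
    /// dataset revision, keyed by table name. The metadata database is an
    /// implementation detail the handler never touches directly.
    async fn active_table_writer_info(
        &self,
        namespace: &str,
        name: &str,
        revision: &str,
    ) -> anyhow::Result<HashMap<String, TableWriterInfo>>;
}
```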
```rust
for resolved_table in dataset.resolved_tables(partial_ref) {
    let table_name = resolved_table.name().clone();
    let writer_info = writer_info_map.get(table_name.as_str());

    // Get the active physical table if it exists
    let physical_table =
        PhysicalTable::get_active(ctx.data_store.clone(), resolved_table.clone())
            .await
            .map_err(|err| {
                tracing::error!(
                    table = %table_name,
                    error = %err,
                    error_source = logging::error_source(&*err),
                    "failed to get active physical table"
                );
                Error::PhysicalTable(err)
            })?;

    let (current_block, start_block, files_count, total_size_bytes) =
        if let Some(pt) = physical_table {
            // Take a snapshot to get accurate synced range
            let snapshot = pt
                .snapshot(false, ctx.data_store.clone())
                .await
                .map_err(|err| {
                    tracing::error!(
                        table = %table_name,
                        error = %err,
                        error_source = logging::error_source(&*err),
                        "failed to snapshot physical table"
                    );
                    Error::PhysicalTable(err)
                })?;

            let synced_range = snapshot.synced_range();
            let canonical_segments = snapshot.canonical_segments();

            let files_count = canonical_segments.len() as i64;
            let total_size_bytes = canonical_segments
                .iter()
                .map(|s| s.object.size as i64)
                .sum();

            let (start, end) = match synced_range {
                Some(range) => (
                    Some(range.start().try_into().unwrap_or(0)),
                    Some(range.end().try_into().unwrap_or(0)),
                ),
                None => (None, None),
            };

            (end, start, files_count, total_size_bytes)
        } else {
            // Table hasn't been created/synced yet
            (None, None, 0, 0)
        };
```
All this logic seems too ad hoc to be included in the admin API handler. It belongs somewhere else in the data plane, not in the admin API.
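One way this could move into the data plane, sketched under the assumption that a helper hands the handler a plain per-table summary it only serializes; all names and signatures below are hypothetical:

```rust
// Sketch only: a data-plane summary type plus constructor, so the handler
// receives ready-made numbers instead of snapshotting tables itself.
// Every name here is hypothetical.
#[derive(Debug, Clone)]
pub struct TableSyncProgress {
    pub table_name: String,
    pub start_block: Option<u64>,
    pub current_block: Option<u64>,
    pub files_count: i64,
    pub total_size_bytes: i64,
}

impl TableSyncProgress {
    /// `synced_range` is the `(start, end)` of the contiguous synced blocks;
    /// `None` means the table has not been created or synced yet.
    pub fn from_snapshot_stats(
        table_name: String,
        synced_range: Option<(u64, u64)>,
        files_count: i64,
        total_size_bytes: i64,
    ) -> Self {
        let (start_block, current_block) = match synced_range {
            Some((start, end)) => (Some(start), Some(end)),
            None => (None, None),
        };
        Self {
            table_name,
            start_block,
            current_block,
            files_count,
            total_size_bytes,
        }
    }
}

// The handler body above would then reduce to roughly:
//   let tables = ctx.data_store.sync_progress(&dataset, partial_ref).await?;
// with the snapshot/aggregation details owned by the data plane.
```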
In addition to a
I've added an RFC documenting the proposed changes based on all the feedback. @LNSD @leoyvens @Chriswhited
Force-pushed from 27cb3ef to 359a8ac
This has been added.
Force-pushed from 16f4c38 to 408b46c
LNSD left a comment
Please check my comments 🙂
I am still reviewing the implementation.
…p + reorg handling via canonical_chain logic
… for a specific table within a dataset
Force-pushed from 4d1d468 to ed2b026
Summary
- Adds a `GET /datasets/{namespace}/{name}/versions/{revision}/sync-progress` endpoint.
- Reports `current_block`, `start_block`, `job_status`, and file stats for each table.
- Uses `TableSnapshot::synced_range()` with `canonical_chain` logic to accurately report sync progress, handling gaps and reorgs.

Tests
Response format:
{ "dataset_namespace": "ethereum", "dataset_name": "mainnet", "revision": "0.0.0", "manifest_hash": "2dbf16e8a4d1c526e3893341d1945040d51ea1b68d1c420e402be59b0646fcfa", "tables": [ { "table_name": "blocks", "current_block": 950000, "start_block": 0, "job_id": 1, "job_status": "RUNNING", "files_count": 47, "total_size_bytes": 2147483648 } ] }