Add layered MergePipeline for multi-source entity discovery#246
Add layered MergePipeline for multi-source entity discovery#246
Conversation
There was a problem hiding this comment.
Pull request overview
Introduces a formal, layered discovery merge pipeline for HYBRID discovery mode, replacing the prior ad-hoc hybrid merging/linking approach and exposing pipeline diagnostics via the /health endpoint.
Changes:
- Added
DiscoveryLayer+ concrete layers (ManifestLayer,RuntimeLayer,PluginLayer) and aMergePipelinewith per-field-group merge policies, conflict/collision detection, and gap-fill filtering. - Updated HYBRID discovery to cache merged results and to run a post-merge
RuntimeLinkerstep with deterministic namespace/topic matching and additional conflict reporting. - Exposed merge-pipeline diagnostics in
/api/v1/healthand added extensive unit tests for the pipeline and linker behavior.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/ros2_medkit_gateway/test/test_runtime_linker.cpp | Adds deterministic namespace/topic matching and conflict scenario tests for RuntimeLinker. |
| src/ros2_medkit_gateway/test/test_merge_pipeline.cpp | New unit tests covering merge policies, layers, gap-fill config, plugin layer behavior, and post-merge linking. |
| src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp | Adds API to return introspection providers along with their plugin names. |
| src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp | Adds discovery mode/strategy and merge-pipeline report summary to /health. |
| src/ros2_medkit_gateway/src/gateway_node.cpp | Adds merge-pipeline gap-fill params; registers introspection plugins as pipeline layers in HYBRID; adjusts cache refresh flow. |
| src/ros2_medkit_gateway/src/discovery/merge_pipeline.cpp | Implements merge pipeline execution, per-field-group merges, conflicts, ID collision logging, and post-merge app-to-node linking. |
| src/ros2_medkit_gateway/src/discovery/manifest/runtime_linker.cpp | Adds path-segment-boundary matching, deterministic candidate selection, and binding conflict counters/warnings. |
| src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp | New runtime discovery layer with gap-fill filtering and default merge policies. |
| src/ros2_medkit_gateway/src/discovery/layers/plugin_layer.cpp | New plugin discovery layer wrapping IntrospectionProvider output. |
| src/ros2_medkit_gateway/src/discovery/layers/manifest_layer.cpp | New manifest discovery layer wrapping ManifestManager. |
| src/ros2_medkit_gateway/src/discovery/hybrid_discovery.cpp | Refactors HYBRID discovery to use and cache MergePipeline output. |
| src/ros2_medkit_gateway/src/discovery/discovery_manager.cpp | Constructs pipeline in HYBRID mode, refreshes pipeline on topic-map refresh, supports adding plugin layers, and exposes merge reports. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp | Declares the new named introspection provider API. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_types.hpp | Adds merge policy/types, merge report structure, and gap-fill config. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_pipeline.hpp | Declares the MergePipeline interface and merge execution result. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/manifest/runtime_linker.hpp | Extends linking result stats/summary for conflicts and wildcard multi-match. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/runtime_layer.hpp | Declares the runtime discovery layer wrapper and gap-fill filtering support. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/plugin_layer.hpp | Declares the plugin discovery layer wrapper for introspection providers. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/manifest_layer.hpp | Declares the manifest discovery layer wrapper. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/hybrid_discovery.hpp | Updates HYBRID strategy interface to pipeline-based caching and reporting. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_manager.hpp | Adds merge-pipeline config, plugin-layer API, and merge-report accessors. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_layer.hpp | Introduces the DiscoveryLayer interface and LayerOutput struct. |
| src/ros2_medkit_gateway/config/gateway_params.yaml | Documents new merge-pipeline gap-fill configuration parameters. |
| src/ros2_medkit_gateway/CMakeLists.txt | Adds new pipeline/layer sources and new test_merge_pipeline target. |
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_types.hpp
Outdated
Show resolved
Hide resolved
5484fe5 to
8b47dd7
Compare
| /// Path-segment-boundary namespace match: "/nav" matches "/nav" and "/nav/sub" but NOT "/navigation" | ||
| bool namespace_matches(const std::string & actual_ns, const std::string & expected_ns) { | ||
| if (actual_ns == expected_ns) { | ||
| return true; | ||
| } | ||
| if (actual_ns.size() > expected_ns.size() && actual_ns.compare(0, expected_ns.size(), expected_ns) == 0 && | ||
| actual_ns[expected_ns.size()] == '/') { | ||
| return true; | ||
| } | ||
| return false; |
There was a problem hiding this comment.
This change updates namespace matching to allow path-segment-boundary prefixes (e.g., expected "/nav" matches actual "/nav/sub"). The docs in docs/config/manifest-schema.rst and the manifest discovery tutorial currently describe namespace matching as “exact match”. Please update those docs to reflect the new matching semantics, otherwise users may be surprised by bindings matching deeper namespaces.
| // Start with highest-priority layer's entity as base | ||
| Entity merged = std::move(entries[0].entity); | ||
| size_t owner_layer_idx = entries[0].layer_idx; | ||
| report.entity_source[id] = layers_[owner_layer_idx]->name(); | ||
|
|
There was a problem hiding this comment.
In merge_entities(), the merge policies for subsequent layers are always evaluated against layers_[owner_layer_idx] (the first/initial layer for this entity). If a lower-priority layer wins a field group due to a higher MergePolicy, the code does not update the “owner” for that field group, so a third (or later) layer will be compared against the wrong target policy. This can lead to incorrect precedence/overrides and inaccurate conflict reporting in multi-layer pipelines. Consider tracking the effective/winning layer per FieldGroup (or recomputing target_policy from the current winner) and updating it when SOURCE wins, so later merges use the correct target policy.
docs/config/discovery-options.rst
Outdated
|
|
||
| **Built-in layer policies:** | ||
|
|
||
| - **ManifestLayer** (priority 1): All field groups AUTHORITATIVE |
There was a problem hiding this comment.
This list states that ManifestLayer uses AUTHORITATIVE for all field groups, but the actual defaults in ManifestLayer are LIVE_DATA=ENRICHMENT and STATUS=FALLBACK (with IDENTITY/HIERARCHY/METADATA authoritative). Please align the documentation with the implemented default policies (or adjust the code if the doc is intended).
| - **ManifestLayer** (priority 1): All field groups AUTHORITATIVE | |
| - **ManifestLayer** (priority 1): IDENTITY/HIERARCHY/METADATA are AUTHORITATIVE, LIVE_DATA is ENRICHMENT, STATUS is FALLBACK |
|
|
||
| **Built-in Layers:** | ||
|
|
||
| - ``ManifestLayer`` - Wraps ManifestManager; all fields AUTHORITATIVE (manifest is source of truth) |
There was a problem hiding this comment.
This section says ManifestLayer is AUTHORITATIVE for all fields, but the implemented defaults are LIVE_DATA=ENRICHMENT and STATUS=FALLBACK. Please update this design doc to match the real merge policy defaults to avoid confusion when debugging merge results.
| - ``ManifestLayer`` - Wraps ManifestManager; all fields AUTHORITATIVE (manifest is source of truth) | |
| - ``ManifestLayer`` - Wraps ManifestManager; defaults to AUTHORITATIVE for all field groups except | |
| ``LIVE_DATA`` (ENRICHMENT) and ``STATUS`` (FALLBACK), with the manifest remaining the primary source of truth |
| # Per-layer policy overrides (optional) | ||
| # Defaults: manifest=AUTH for identity/hierarchy, runtime=AUTH for live_data/status |
There was a problem hiding this comment.
gateway_params.yaml documents per-layer merge policy overrides under discovery.merge_pipeline.layers, but this PR only declares/reads gap_fill parameters (no layer policy override params are wired). This is likely to confuse users since it implies a supported configuration. Either implement the policy override parameters in GatewayNode/DiscoveryManager or reword this block as a future/unsupported feature.
| # Per-layer policy overrides (optional) | |
| # Defaults: manifest=AUTH for identity/hierarchy, runtime=AUTH for live_data/status | |
| # Per-layer policy overrides (FUTURE / UNSUPPORTED FEATURE) | |
| # NOTE: The Gateway currently only reads merge_pipeline.gap_fill parameters. | |
| # Any "layers" configuration is ignored in the current implementation | |
| # and is reserved for a future version of the merge pipeline. | |
| # Do NOT rely on these options for behavior in this release. | |
| # | |
| # Intended defaults (subject to change when implemented): | |
| # - manifest = authoritative for identity/hierarchy | |
| # - runtime = authoritative for live_data/status | |
| # | |
| # Example (NOT YET SUPPORTED): |
| std::lock_guard<std::mutex> lock(mutex_); | ||
| cached_result_ = pipeline_.execute(); | ||
| if (node_) { | ||
| RCLCPP_INFO(node_->get_logger(), "Hybrid discovery refreshed: %zu entities", cached_result_.report.total_entities); | ||
| } |
There was a problem hiding this comment.
HybridDiscoveryStrategy::refresh() holds mutex_ while executing pipeline_.execute(). Pipeline execution can involve ROS graph introspection and plugin work and may take a non-trivial amount of time, blocking concurrent discover_* calls (and /health merge report reads). Consider executing the pipeline outside the lock and then swapping cached_result_ under the mutex to reduce contention while still keeping results consistent.
| std::lock_guard<std::mutex> lock(mutex_); | |
| cached_result_ = pipeline_.execute(); | |
| if (node_) { | |
| RCLCPP_INFO(node_->get_logger(), "Hybrid discovery refreshed: %zu entities", cached_result_.report.total_entities); | |
| } | |
| // Run the potentially expensive pipeline execution without holding the mutex | |
| auto new_result = pipeline_.execute(); | |
| if (node_) { | |
| RCLCPP_INFO( | |
| node_->get_logger(), "Hybrid discovery refreshed: %zu entities", | |
| new_result.report.total_entities); | |
| } | |
| // Update the cached result under the mutex to keep readers consistent | |
| std::lock_guard<std::mutex> lock(mutex_); | |
| cached_result_ = std::move(new_result); |
| // Merge with each subsequent (lower-priority) layer | ||
| for (size_t i = 1; i < entries.size(); i++) { | ||
| size_t source_layer_idx = entries[i].layer_idx; | ||
| report.enriched_count++; | ||
|
|
||
| for (auto fg : ALL_FIELD_GROUPS) { | ||
| auto target_policy = layers_[owner_layer_idx]->policy_for(fg); | ||
| auto source_policy = layers_[source_layer_idx]->policy_for(fg); | ||
| auto res = resolve_policies(target_policy, source_policy); | ||
|
|
||
| if (res.is_conflict) { | ||
| report.conflicts.push_back({id, fg, layers_[owner_layer_idx]->name(), layers_[source_layer_idx]->name()}); | ||
| report.conflict_count++; | ||
| } | ||
|
|
||
| apply_field_group_merge(merged, entries[i].entity, fg, res); |
There was a problem hiding this comment.
merge_entities() keeps owner_layer_idx fixed to the first layer that introduced the entity. If a later layer wins a field group (e.g., owner FALLBACK but source AUTHORITATIVE), subsequent merges still compare against the original owner's policy, which can let an even-lower-priority layer override an already-winning AUTHORITATIVE value and also mis-report conflicts. Consider tracking the current “owning layer” per FieldGroup (or performing pairwise merges in layer-priority order for each FieldGroup) so later merges compare against the policy of the layer that currently owns that field group’s value.
| // Merge with each subsequent (lower-priority) layer | |
| for (size_t i = 1; i < entries.size(); i++) { | |
| size_t source_layer_idx = entries[i].layer_idx; | |
| report.enriched_count++; | |
| for (auto fg : ALL_FIELD_GROUPS) { | |
| auto target_policy = layers_[owner_layer_idx]->policy_for(fg); | |
| auto source_policy = layers_[source_layer_idx]->policy_for(fg); | |
| auto res = resolve_policies(target_policy, source_policy); | |
| if (res.is_conflict) { | |
| report.conflicts.push_back({id, fg, layers_[owner_layer_idx]->name(), layers_[source_layer_idx]->name()}); | |
| report.conflict_count++; | |
| } | |
| apply_field_group_merge(merged, entries[i].entity, fg, res); | |
| // Track current owning layer per field group as we merge | |
| std::vector<size_t> owner_layer_idx_per_fg; | |
| owner_layer_idx_per_fg.resize(ALL_FIELD_GROUPS.size(), owner_layer_idx); | |
| // Merge with each subsequent (lower-priority) layer | |
| for (size_t i = 1; i < entries.size(); i++) { | |
| size_t source_layer_idx = entries[i].layer_idx; | |
| report.enriched_count++; | |
| for (size_t fg_index = 0; fg_index < ALL_FIELD_GROUPS.size(); ++fg_index) { | |
| auto fg = ALL_FIELD_GROUPS[fg_index]; | |
| size_t current_owner_idx = owner_layer_idx_per_fg[fg_index]; | |
| auto target_policy = layers_[current_owner_idx]->policy_for(fg); | |
| auto source_policy = layers_[source_layer_idx]->policy_for(fg); | |
| auto res = resolve_policies(target_policy, source_policy); | |
| if (res.is_conflict) { | |
| report.conflicts.push_back({id, fg, layers_[current_owner_idx]->name(), layers_[source_layer_idx]->name()}); | |
| report.conflict_count++; | |
| } | |
| apply_field_group_merge(merged, entries[i].entity, fg, res); | |
| // If the source has a strictly higher-priority policy for this field group and | |
| // there is no conflict, treat the source layer as the new owner for future merges. | |
| if (!res.is_conflict) { | |
| int target_prio = policy_priority(target_policy); | |
| int source_prio = policy_priority(source_policy); | |
| if (source_prio > target_prio) { | |
| owner_layer_idx_per_fg[fg_index] = source_layer_idx; | |
| } | |
| } |
| for (size_t i = 0; i < layers_.size(); ++i) { | ||
| // Build discovery context from entities collected so far (for plugin layers) | ||
| IntrospectionInput context; | ||
| for (const auto & [idx, entities] : area_layers) { | ||
| context.areas.insert(context.areas.end(), entities.begin(), entities.end()); | ||
| } | ||
| for (const auto & [idx, entities] : component_layers) { | ||
| context.components.insert(context.components.end(), entities.begin(), entities.end()); | ||
| } | ||
| for (const auto & [idx, entities] : app_layers) { | ||
| context.apps.insert(context.apps.end(), entities.begin(), entities.end()); | ||
| } | ||
| for (const auto & [idx, entities] : function_layers) { | ||
| context.functions.insert(context.functions.end(), entities.begin(), entities.end()); | ||
| } | ||
| layers_[i]->set_discovery_context(context); |
There was a problem hiding this comment.
MergePipeline::execute() rebuilds IntrospectionInput from scratch on every layer by re-copying all prior layer entity vectors. With multiple plugin layers and large entity sets this becomes O(L^2) copying and can noticeably increase discovery latency. Consider maintaining an accumulating context (append just the previous layer’s output each iteration), or building the context from the already-merged-so-far result to avoid repeated full copies.
| } | ||
| // Default: each node = 1 component (backward compatible) | ||
| return discover_node_components(); | ||
| auto apps = discover_apps(); |
There was a problem hiding this comment.
RuntimeDiscoveryStrategy::discover_components() now always derives Components via discover_synthetic_components(apps) and ignores config_.create_synthetic_components. This makes the "legacy" mode (create_synthetic_components=false => each node is a Component) impossible and changes component IDs/FQNs even when users explicitly disable synthetic components. Consider restoring the conditional: when create_synthetic_components is false, return per-node Components (with per-node fqn/namespace_path and live data) rather than synthetic groupings.
| auto apps = discover_apps(); | |
| auto apps = discover_apps(); | |
| // Honor configuration: when synthetic components are disabled, each App becomes its own Component. | |
| if (!config_.create_synthetic_components) { | |
| std::vector<Component> components; | |
| components.reserve(apps.size()); | |
| for (const auto & app : apps) { | |
| Component component; | |
| // Preserve per-node identity and namespace information where available | |
| component.id = app.id; | |
| component.namespace_path = app.namespace_path; | |
| component.fqn = app.fqn; | |
| component.area_id = app.area_id; | |
| components.push_back(std::move(component)); | |
| } | |
| return components; | |
| } | |
| // Default behavior: group Apps into synthetic Components based on the configured strategy. |
Introduce MergePolicy (AUTHORITATIVE/ENRICHMENT/FALLBACK), FieldGroup enums, MergeReport diagnostics, DiscoveryLayer interface and LayerOutput struct. Implement MergePipeline core with single-layer passthrough and multi-layer field-group merge with conflict resolution.
Implement concrete DiscoveryLayer wrappers: ManifestLayer (delegates to ManifestManager, AUTHORITATIVE for identity/hierarchy/metadata), RuntimeLayer (delegates to RuntimeDiscoveryStrategy, AUTHORITATIVE for live_data/status), PluginLayer (wraps IntrospectionProvider, ENRICHMENT for all fields). Add GapFillConfig for RuntimeLayer namespace filtering in hybrid mode.
…coveryStrategy Implement RuntimeLinker that binds manifest apps (with ros_binding) to runtime-discovered ROS 2 nodes (with bound_fqn) after pipeline merge. Includes multi-match detection, binding conflict reporting, orphan node warnings, and deterministic namespace matching. Wire MergePipeline into HybridDiscoveryStrategy replacing the ad-hoc merge logic.
Integrate IntrospectionProviders as pipeline layers with priority ordering and batch registration. Add YAML configuration for merge pipeline gap-fill. Expose MergeReport and LinkingResult in GET /health endpoint. Pass merged entity context (IntrospectionInput) to plugin layers before discover(). Add comprehensive documentation for merge pipeline architecture, policies, gap-fill options, and merge report diagnostics.
Add VendorExtensionPlugin demo that registers custom entities and routes via the IntrospectionProvider interface. Add integration test validating plugin layer discovery, entity merging, and context passing through the merge pipeline.
Fix linker errors (TPOFF32, R_X86_64_PC32) when gateway_lib.a is linked into test_gateway_plugin.so (MODULE target). Remove static thread_local std::mt19937 in BulkDataStore (incompatible with initial-exec TLS model). Enable POSITION_INDEPENDENT_CODE on ros2_medkit_serialization static lib. Remove @verifies tag referencing nonexistent REQ_DISC_MERGE_FUNCTION.
61874c0 to
e886342
Compare
Pull Request
Summary
Replace ad-hoc merging in HybridDiscoveryStrategy with a formal layered MergePipeline that orchestrates manifest, runtime, and plugin discovery sources with configurable per-field-group merge precedence (AUTHORITATIVE/ENRICHMENT/FALLBACK). Adds conflict detection, gap-fill filtering, runtime linker determinism fixes, and diagnostic reporting via /health endpoint.
Commit overview
Issue
Type
Testing
Checklist