[system tests] Filter APM connector-generated data streams in OTel system tests #3442

Merged
teresaromero merged 12 commits into elastic:main from
mrodm:filter_datastreams_system_tests
Apr 10, 2026

Contributor

@mrodm mrodm commented Apr 8, 2026

Fixes #3434

When running system tests for otelcol input packages with dynamic data
stream discovery, the APM connector generates rollup metrics data streams
(e.g. metrics-service_destination.1m.otel-<namespace>) that are not
produced directly by the OTel receiver. These streams do not accumulate
enough documents to satisfy test assertions.

Add filterOtelAPMRollupDataStreams to skip any discovered data stream
matching ^metrics-.*\.(1m|10m|60m)\.otel- during buildDataStreamScenarios,
logging an info message for each skipped stream.

Example output:

2026/04/08 13:44:03 DEBUG Waiting for data streams matching *-*-90334 (timeout: 10m0s, or results are stable for: 1m0s)...
2026/04/08 13:46:11 DEBUG Discovery finished; found 5 data stream(s) matching *-*-90334: logs-generic.otel-90334, metrics-service_summary.1m.otel-90334, metrics-service_transaction.1m.otel-90334, metrics-transaction.1m.otel-90334, traces-generic.otel-90334
2026/04/08 13:46:11  INFO Skipping APM connector-generated data stream "metrics-service_summary.1m.otel-90334"
2026/04/08 13:46:11  INFO Skipping APM connector-generated data stream "metrics-service_transaction.1m.otel-90334"
2026/04/08 13:46:11  INFO Skipping APM connector-generated data stream "metrics-transaction.1m.otel-90334"
2026/04/08 13:46:11 DEBUG Testing 2 data stream(s): logs-generic.otel-90334, traces-generic.otel-90334
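
The filtering described above can be sketched in a few lines of Go. This is a simplified illustration, not the PR's code: it works on plain stream names rather than the discoveredDataStream type, and the names filterRollupStreams and apmRollupPattern are made up for the example. Only the regular expression is taken from the description.

```go
package main

import (
	"fmt"
	"regexp"
	"slices"
)

// apmRollupPattern mirrors the expression quoted in the PR description.
// The variable name is hypothetical.
var apmRollupPattern = regexp.MustCompile(`^metrics-.*\.(1m|10m|60m)\.otel-`)

// filterRollupStreams returns the names that do not look like APM connector
// rollup streams. The input is cloned first because slices.DeleteFunc
// modifies the slice it is given in place.
func filterRollupStreams(names []string) []string {
	return slices.DeleteFunc(slices.Clone(names), func(name string) bool {
		return apmRollupPattern.MatchString(name)
	})
}

func main() {
	// Stream names taken from the example output above.
	discovered := []string{
		"logs-generic.otel-90334",
		"metrics-service_summary.1m.otel-90334",
		"metrics-service_transaction.1m.otel-90334",
		"metrics-transaction.1m.otel-90334",
		"traces-generic.otel-90334",
	}
	fmt.Println(filterRollupStreams(discovered))
}
```

With the five streams from the example output, only the logs and traces streams should remain. A metrics stream without a rollup interval segment (for example a hypothetical metrics-generic.otel-90334) would not match the pattern and would be kept.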

Other changes in this PR:

  • Increase to 1 minute the default time to wait before concluding that no more data streams will be created.

  • Update the delete and search data stream API calls to include hidden data streams (WithExpandWildcards("all")).

    • Some executions left hidden indexes behind in Elasticsearch:
      • metrics-service_destination.10m.otel-19528
      • metrics-service_summary.10m.otel-19528
      • metrics-service_transaction.10m.otel-19528
      • metrics-service_destination.60m.otel-19528
      • metrics-service_summary.60m.otel-19528
      • metrics-service_transaction.60m.otel-19528
      • metrics-transaction.10m.otel-19528
      • metrics-transaction.60m.otel-19528
    • Screenshot from another test execution:
      image
  • Reorder the test teardown so that deleting data streams is one of the final steps.

  • Run all teardown handlers even if some of them fail.

Author's Checklist


Generated with the assistance of Claude

mrodm and others added 2 commits April 8, 2026 12:30
When running system tests for otelcol input packages with dynamic data
stream discovery, the APM connector generates rollup metrics data streams
(e.g. metrics-service_destination.1m.otel-<namespace>) that are not
produced directly by the OTel receiver. These streams do not accumulate
enough documents to satisfy test assertions.

Add filterOtelAPMRollupDataStreams to skip any discovered data stream
matching ^metrics-.*\.(1m|10m|60m)\.otel- during buildDataStreamScenarios,
logging an info message for each skipped stream.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
When validating documents, the same "not able to add key" warning could
be logged once per document, spamming the output. Track already-emitted
warnings in a per-validator map (loggedWarnings) keyed on the full
message, so each unique key+error combination is logged exactly once.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
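
A minimal sketch of the "log each unique message once" idea from the commit message above. The warnOnce type and its methods are hypothetical names for illustration; the PR describes a loggedWarnings map on the validator, whose surrounding code is not shown here. The warning text in main is a placeholder.

```go
package main

import "fmt"

// warnOnce is a hypothetical helper that remembers which messages it has
// already emitted, keyed on the full message text.
type warnOnce struct {
	seen map[string]struct{}
	emit func(msg string)
}

func newWarnOnce(emit func(string)) *warnOnce {
	return &warnOnce{seen: make(map[string]struct{}), emit: emit}
}

// Warn emits msg the first time it is seen and reports whether it was emitted.
// It is not safe for concurrent use.
func (w *warnOnce) Warn(msg string) bool {
	if _, ok := w.seen[msg]; ok {
		return false
	}
	w.seen[msg] = struct{}{}
	w.emit(msg)
	return true
}

func main() {
	w := newWarnOnce(func(msg string) { fmt.Println("WARN", msg) })
	// The same message repeated for several documents is emitted only once.
	for i := 0; i < 3; i++ {
		w.Warn(`not able to add key "a.b": placeholder error`)
	}
	w.Warn(`not able to add key "c.d": placeholder error`)
}
```

Keying on the full message means that the same key failing with a different error is still reported, which matches the "unique key+error combination" wording above.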
mrodm and others added 5 commits April 8, 2026 16:34
APM connector transforms create rollup data streams asynchronously after
traces are ingested, so a single delete pass can miss streams that appear
after the cleanup runs. Replace the single delete call with a retry loop
(up to 3 attempts): delete all streams matching the namespace pattern,
sleep 5s to allow async streams to appear, then search for any remaining.
If streams are still present after all attempts, log a warning rather than
failing the test.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously, tearDownTest returned on the first handler error, leaving
subsequent handlers (e.g. data stream cleanup, agent shutdown) unexecuted.
Collect errors with multierror so all handlers always run regardless of
individual failures, returning the combined error at the end.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
APM connector rollup data streams are marked as hidden in Elasticsearch.
The default expand_wildcards=open silently excludes hidden streams, causing
deleteDataStream and searchDataStreams to miss them entirely — leaving them
undeleted after test cleanup.

Pass WithExpandWildcards("all") to both the DeleteDataStream and
GetDataStream ES API calls so hidden streams are included in both deletion
and discovery.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
The retry loop was added to handle APM connector rollup streams not being
cleaned up. The root cause was that those streams are hidden in ES and
expand_wildcards=open (the default) silently skipped them. Since
deleteDataStream now uses expand_wildcards=all, the single delete call is
sufficient and the retry loop is no longer needed.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment on lines +164 to +166
// loggedWarnings tracks keys for which a warning has already been emitted, to
// avoid repeating the same message for every validated document.
loggedWarnings map[string]struct{}
Contributor Author

These changes about loggedWarnings have been moved to a new PR: #3441

 	if r.resetAgentPolicyHandler != nil {
 		if err := r.resetAgentPolicyHandler(cleanupCtx); err != nil {
-			return err
+			merr = append(merr, err)
Contributor Author

Updated the teardown process so that all handlers are executed even if one of them fails. WDYT?

Member

Ok, so it does a best-effort cleanup 👍

Though I wonder if there can be chains of errors if some cleanup handler fails, and others depend on it.

Comment on lines +1496 to +1497
// Other signal types (logs-*, traces-*) may need adding here if future APM connector
// versions generate rollup streams of those types.
Contributor Author

For instance, the zipkin test package with use_apm: false creates documents in logs-* and traces-*.

@mrodm mrodm force-pushed the filter_datastreams_system_tests branch from 07d2e26 to 21da7d2 Compare April 8, 2026 17:53
mrodm and others added 3 commits April 8, 2026 20:01
Replace the manual [:0:0] loop in filterOtelAPMRollupDataStreams with
slices.DeleteFunc, which is already used elsewhere in the codebase.

Improve the WithExpandWildcards("all") comments to explain why the default
expand_wildcards=open is insufficient (APM connector rollup streams are
hidden in ES and silently excluded by default).

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
-	waitForDynamicStreamsStableDuration = 10 * time.Second
 	dataStreamDiscoveryPollInterval = 1 * time.Second
 	waitForDataDefaultTimeout = 10 * time.Minute
+	waitForDynamicStreamsStableDefaultDuration = 60 * time.Second
Contributor Author

I had to increase this duration so that elastic-package could detect all the data streams in the tests that I ran locally.

Contributor Author

mrodm commented Apr 8, 2026

test integrations

@elastic-vault-github-plugin-prod

Created or updated PR in integrations repository to test this version. Check elastic/integrations#18295

@mrodm mrodm marked this pull request as ready for review April 8, 2026 18:51
@mrodm mrodm requested a review from a team April 8, 2026 18:51
merr = append(merr, err)
}
r.cleanTestScenarioHandler = nil
}
Contributor Author

Taking advantage of the multierror change, the handler in charge of deleting data streams could be moved to the end, so that it deletes all data streams.

if err != nil {
return nil, err
}
discovered = filterOtelAPMRollupDataStreams(discovered)
Contributor Author

This only takes effect if canHaveMultipleDataStreams is true:

	canHaveMultipleDataStreams := policyTemplate.DynamicSignalTypes || (policyTemplate.Input == otelCollectorInputName && policyTemplate.Type == "traces")

Comment on lines +8 to +9
# required to increase waiting time to detect all data streams (specially logs and traces)
wait_for_dynamic_streams_stable: 90s
Contributor Author

Based on my local tests with zipkin and kafka, packages like these that create several data streams are likely to need more than 60s or 90s before all the required data streams are available.

func filterOtelAPMRollupDataStreams(streams []discoveredDataStream) []discoveredDataStream {
	return slices.DeleteFunc(streams, func(s discoveredDataStream) bool {
		if apmRollupDataStreamPattern.MatchString(s.name) {
			logger.Infof("Skipping APM connector-generated data stream %q", s.name)
Member

Should we log this at the debug level?

Collaborator

elasticmachine commented Apr 10, 2026

💛 Build succeeded, but was flaky

Failed CI Steps

History

cc @mrodm

@teresaromero
Contributor

I am merging this as it is needed for my changes! Thanks!

@teresaromero teresaromero merged commit 234024f into elastic:main Apr 10, 2026
5 checks passed
@mrodm mrodm deleted the filter_datastreams_system_tests branch April 14, 2026 08:32
Development

Successfully merging this pull request may close these issues.

[system tests] Filter metrics data streams generated by traces (APM) in OTEL packages

4 participants