From 19c0db20d8f7c322080f7dac8c25482c2f55fe9e Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 26 May 2026 15:43:42 +0200 Subject: [PATCH] feat(ci): swap Python's OTLP encode+upload for the native Rust pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second layer of the `mergify ci junit-process` port — replace the `opentelemetry-exporter-otlp-proto-http` encode-and-upload path in `mergify_cli/ci/junit_processing/upload.py` with a call into a new hidden Rust subcommand, `mergify _internal junit-upload`, that re-parses the JUnit XML files, builds the OTLP `ExportTraceServiceRequest` with the quarantine set baked in, gzips it, and POSTs it to `/v1/repos/<owner>/<repo>/ci/traces`. Two new modules under `junit_process`: - `spans` builds one session span per upload, one suite span per `<testsuite>`, and one case span per `<testcase>`, all sharing a resource that carries the CI-env attributes the backend uses for routing (provider, pipeline run id, branch, head SHA, …). The test cases pin parent/child wiring, status-code mapping, attribute propagation, and resource scraping by feeding a deterministic RNG into a hidden seam. `UploadMetadata` carries an optional `run_id` (the bridge passes the same 16-char hex Python already printed to its UI; decoded to 8 bytes as the session span id) and a `quarantined` name set (each matching case span gets `cicd.test.quarantined = true`). - `upload` POSTs the request with the same headers the Python `OTLPSpanExporter` used to send (`Bearer` auth, `application/x-protobuf`, `Content-Encoding: gzip`) and matches the error wording so any log scrapers tracking Python output keep working. Wiremock covers the happy path, the empty-request short-circuit, and the 401 error surface. The detector grows the resource-attribute lookups Python emits (pipeline name, job name, run id/attempt/url, head/base ref, head SHA, repository URL, runner name). 
On GitHub Actions `pull_request` builds, `GITHUB_SHA` points at the synthetic merge commit GitHub creates by pre-merging the PR head into the base — not the actual code the tests ran against. The contributor's real head SHA lives at `pull_request.head.sha` inside `GITHUB_EVENT_PATH` and that is the value the dashboards correlate with, so `get_head_sha` prefers the event payload and falls back to `GITHUB_SHA` when the payload is missing, malformed, or the build isn't a PR event. Three unit tests pin the precedence: payload wins, missing-PR-field falls back, missing-event-file falls back. The CircleCI PR-build API fallback Python implements stays Python-side for now — it requires a GitHub REST API client we don't have on the Rust side yet. Python's `upload.upload(api_url, token, repository, spans=...)` becomes `upload.upload(api_url, token, repository, files=..., run_id=..., quarantined_names=..., test_framework=..., test_language=...)`. The caller (`process_junit_files`) extracts `quarantined_names` from the spans the Python quarantine pass already mutated (`cicd.test.quarantined is True`) so the Rust span builder reproduces the same wire shape the Python pipeline used to emit. The `opentelemetry-exporter-otlp-proto-http` runtime dep is dropped — `opentelemetry-sdk` stays because the quarantine and report paths still consume `ReadableSpan` attributes (those move to native in Phase C). `opentelemetry-proto` is the only otel dep we pull in on the Rust side, gated to `gen-tonic-messages + trace` so it boils down to the prost generated types plus the trace proto module — no tonic, no otel SDK, no exporter runtime. Compression is `flate2` and the HTTP layer reuses the existing workspace `reqwest` to keep the TLS / rustls flavour consistent. This is a transitional bridge. 
When Phase C of the port lands later in the stack (the native `ci junit-process` orchestrator that subsumes the entire command), the Python side of `junit-process` and the `_internal junit-upload` subcommand both go away. Cleanup checklist for that follow-up: - delete `InternalSubcommand::JunitUpload`, `InternalJunitUploadArgs`, `InternalJunitUploadOpts`, `NativeCommand::InternalJunitUpload`, and the `("_internal", "junit-upload")` entry in `NATIVE_COMMANDS` - delete `mergify_cli/ci/junit_processing/upload.py` - drop the remaining `opentelemetry-sdk` Python dep once the rest of `process_junit_files` is gone Co-Authored-By: Claude Opus 4.7 Change-Id: Icbc727166711d76678877aa172ca47c2dcc07ebc --- Cargo.lock | 129 +++ crates/mergify-ci/Cargo.toml | 4 + crates/mergify-ci/src/detector.rs | 294 ++++++- crates/mergify-ci/src/junit_process/mod.rs | 18 +- crates/mergify-ci/src/junit_process/spans.rs | 760 ++++++++++++++++++ crates/mergify-ci/src/junit_process/upload.rs | 260 ++++++ crates/mergify-ci/src/testing.rs | 58 +- crates/mergify-cli/src/main.rs | 156 +++- mergify_cli/ci/junit_processing/cli.py | 20 +- mergify_cli/ci/junit_processing/upload.py | 122 ++- .../tests/ci/junit_processing/test_cli.py | 13 +- .../tests/ci/junit_processing/test_upload.py | 223 +++-- mergify_cli/tests/ci/test_cli.py | 10 +- pyproject.toml | 8 +- uv.lock | 70 -- 15 files changed, 1919 insertions(+), 226 deletions(-) create mode 100644 crates/mergify-ci/src/junit_process/spans.rs create mode 100644 crates/mergify-ci/src/junit_process/upload.rs diff --git a/Cargo.lock b/Cargo.lock index 5f5aa546..e7dbbb76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. 
version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.8.12" @@ -318,6 +324,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "data-encoding" version = "2.11.0" @@ -359,6 +374,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "either" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" + [[package]] name = "email_address" version = "0.2.9" @@ -407,6 +428,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "flate2" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fluent-uri" version = "0.4.1" @@ -920,6 +951,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -1086,12 +1126,16 @@ name = "mergify-ci" version = "0.0.0" dependencies = [ "chrono", + "flate2", "getrandom 0.3.4", "globset", "mergify-config", "mergify-core", "mergify-test-support", + "opentelemetry-proto", + "prost", "quick-xml", + "reqwest", "serde", "serde_json", "serde_yaml_ng", @@ -1213,6 +1257,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a86d3146ed3995b5913c414f6664344b9617457320782e64f0bb44afd49d74" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.2.0" @@ -1331,6 +1385,46 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368afaed344110f40b179bb8fbe54bc52d98f9bd2b281799ef32487c2650c956" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", 
+ "percent-encoding", + "portable-atomic", + "rand", + "thiserror", +] + [[package]] name = "outref" version = "0.5.2" @@ -1372,6 +1466,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" @@ -1409,6 +1509,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quick-xml" version = "0.40.1" @@ -1880,6 +2003,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "simd-adler32" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" + [[package]] name = "simd_cesu8" version = "1.1.1" diff --git a/crates/mergify-ci/Cargo.toml b/crates/mergify-ci/Cargo.toml index c4750cee..18773adf 100644 --- a/crates/mergify-ci/Cargo.toml +++ b/crates/mergify-ci/Cargo.toml @@ -11,11 +11,15 @@ publish = false [dependencies] chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "std"] } +flate2 = "1" getrandom = "0.3" 
globset = "0.4" mergify-config = { path = "../mergify-config" } mergify-core = { path = "../mergify-core" } +opentelemetry-proto = { version = "0.32", default-features = false, features = ["gen-tonic-messages", "trace"] } +prost = "0.14" quick-xml = "0.40" +reqwest = { version = "0.13", default-features = false, features = ["rustls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml_ng = "0.10" diff --git a/crates/mergify-ci/src/detector.rs b/crates/mergify-ci/src/detector.rs index f18a2559..8728c5c6 100644 --- a/crates/mergify-ci/src/detector.rs +++ b/crates/mergify-ci/src/detector.rs @@ -19,6 +19,22 @@ pub enum CIProvider { Buildkite, } +impl CIProvider { + /// String identifier Python emits as the `cicd.provider.name` + /// span attribute. Must match `mergify_cli.ci.detector.CIProviderT` + /// (`snake_case`, no underscore for the multi-word ones except + /// `github_actions`). + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::GithubActions => "github_actions", + Self::CircleCi => "circleci", + Self::Jenkins => "jenkins", + Self::Buildkite => "buildkite", + } + } +} + #[must_use] pub fn get_ci_provider() -> Option { if env::var("JENKINS_URL").ok().is_some_and(|v| !v.is_empty()) { @@ -155,15 +171,15 @@ pub fn get_github_pull_request_number() -> Result, CliError> { } fn read_github_event_pull_request_number() -> Result, CliError> { - let Ok(event_path) = env::var("GITHUB_EVENT_PATH") else { + // The PR-number lookup is strict about JSON failures because + // it's the only signal that decides whether `scopes-send` runs + // at all — silently swallowing a parse error there would hide + // a misconfigured workflow. The head-SHA lookup (see + // [`read_github_event_pull_request_head_sha`]) has a sane + // fallback (`GITHUB_SHA`), so it stays lenient. 
+ let Some(event_path) = env::var("GITHUB_EVENT_PATH").ok().filter(|s| !s.is_empty()) else { return Ok(None); }; - if event_path.is_empty() { - return Ok(None); - } - // A missing event file means "this isn't a GitHub Actions - // pull-request event" — match the Python CLI and treat it as - // "no PR detected", not a Configuration error. let content = match std::fs::read_to_string(&event_path) { Ok(content) => content, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), @@ -181,6 +197,191 @@ fn read_github_event_pull_request_number() -> Result, CliError> { .and_then(serde_json::Value::as_u64)) } +/// `cicd.pipeline.name` resource attribute. None when the +/// provider can't be detected or its env var isn't set. +#[must_use] +pub fn get_pipeline_name() -> Option { + let var = match get_ci_provider()? { + CIProvider::GithubActions => "GITHUB_WORKFLOW", + CIProvider::Jenkins => "JOB_NAME", + CIProvider::Buildkite => "BUILDKITE_PIPELINE_SLUG", + CIProvider::CircleCi => return None, + }; + non_empty_env(var) +} + +/// `cicd.pipeline.task.name` — the job within a pipeline. +#[must_use] +pub fn get_job_name() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => non_empty_env("GITHUB_JOB"), + CIProvider::CircleCi => non_empty_env("CIRCLE_JOB"), + CIProvider::Jenkins => non_empty_env("JOB_NAME"), + CIProvider::Buildkite => { + non_empty_env("BUILDKITE_LABEL").or_else(|| non_empty_env("BUILDKITE_STEP_KEY")) + } + } +} + +/// `vcs.ref.head.name` — name of the branch the test ran on. +#[must_use] +pub fn get_head_ref_name() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => { + // GitHub Actions sets `GITHUB_HEAD_REF` only on PR + // events. Fall back to `GITHUB_REF_NAME` everywhere + // else (the bare branch name, not `/merge`). 
+ non_empty_env("GITHUB_HEAD_REF").or_else(|| non_empty_env("GITHUB_REF_NAME")) + } + CIProvider::CircleCi => non_empty_env("CIRCLE_BRANCH"), + CIProvider::Jenkins => non_empty_env("GIT_BRANCH").map(|raw| { + // Jenkins' Git plugin sets `GIT_BRANCH` to + // `/` (or `refs/heads/` when + // the job's configured for a refspec). Strip the + // common prefixes so the wire value matches what + // GitHub Actions reports. + for prefix in ["origin/", "refs/heads/"] { + if let Some(stripped) = raw.strip_prefix(prefix) { + return stripped.to_string(); + } + } + raw + }), + CIProvider::Buildkite => non_empty_env("BUILDKITE_BRANCH"), + } +} + +/// `vcs.ref.base.name` — PR target branch, when running for a PR. +#[must_use] +pub fn get_base_ref_name() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => non_empty_env("GITHUB_BASE_REF"), + CIProvider::Jenkins => non_empty_env("CHANGE_TARGET"), + CIProvider::Buildkite => non_empty_env("BUILDKITE_PULL_REQUEST_BASE_BRANCH"), + CIProvider::CircleCi => None, + } +} + +/// `cicd.pipeline.runner.name` — host / agent identity. +#[must_use] +pub fn get_cicd_pipeline_runner_name() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => non_empty_env("RUNNER_NAME"), + CIProvider::Jenkins => non_empty_env("NODE_NAME"), + CIProvider::Buildkite => non_empty_env("BUILDKITE_AGENT_NAME"), + CIProvider::CircleCi => None, + } +} + +/// `cicd.pipeline.run.id` — the workflow / build identifier. +/// Returned as a string because GitHub uses an integer-like ID +/// while Jenkins and Buildkite emit free-form strings. +#[must_use] +pub fn get_cicd_pipeline_run_id() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => non_empty_env("GITHUB_RUN_ID"), + CIProvider::CircleCi => non_empty_env("CIRCLE_WORKFLOW_ID"), + CIProvider::Jenkins => non_empty_env("BUILD_ID"), + CIProvider::Buildkite => non_empty_env("BUILDKITE_BUILD_ID"), + } +} + +/// `cicd.pipeline.run.attempt` — 1-indexed retry counter. 
+#[must_use] +pub fn get_cicd_pipeline_run_attempt() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => non_empty_env("GITHUB_RUN_ATTEMPT")?.parse().ok(), + CIProvider::CircleCi => non_empty_env("CIRCLE_BUILD_NUM")?.parse().ok(), + // Buildkite uses 0-indexed retries; add 1 so a fresh run + // reads as attempt 1 (matching the GHA/CircleCI semantics). + CIProvider::Buildkite => non_empty_env("BUILDKITE_RETRY_COUNT")? + .parse::() + .ok() + .map(|n| n + 1), + CIProvider::Jenkins => None, + } +} + +/// `cicd.pipeline.run.url` — direct link to the running build. +#[must_use] +pub fn get_cicd_pipeline_run_url() -> Option { + match get_ci_provider()? { + CIProvider::Buildkite => non_empty_env("BUILDKITE_BUILD_URL"), + _ => None, + } +} + +/// `vcs.repository.url.full` — clone URL of the repository under +/// test. GitHub Actions has no equivalent env (the repo is implicit +/// from `GITHUB_REPOSITORY`); we report `None` there. +#[must_use] +pub fn get_repository_url() -> Option { + match get_ci_provider()? { + CIProvider::Buildkite => non_empty_env("BUILDKITE_REPO"), + CIProvider::CircleCi => non_empty_env("CIRCLE_REPOSITORY_URL"), + CIProvider::Jenkins => non_empty_env("GIT_URL"), + CIProvider::GithubActions => None, + } +} + +/// `vcs.ref.head.revision` — the commit SHA the tests ran against. +/// +/// For GitHub Actions PR builds, `GITHUB_SHA` is the *synthetic +/// merge commit* GitHub creates by merging the PR head into the +/// base — not the actual code under test. The event payload at +/// `GITHUB_EVENT_PATH` carries the real `pull_request.head.sha`, +/// which is what dashboards correlate with the contributor's +/// commit. We prefer the event-payload value when present and +/// fall back to `GITHUB_SHA` otherwise. +/// +/// For other providers we only have the bare env var today; the +/// `CircleCI` PR-build API fallback Python implements stays +/// Python-side until a Rust HTTP shim for GitHub's REST API lands. 
+#[must_use] +pub fn get_head_sha() -> Option { + match get_ci_provider()? { + CIProvider::GithubActions => get_github_actions_head_sha(), + CIProvider::CircleCi => non_empty_env("CIRCLE_SHA1"), + CIProvider::Jenkins => non_empty_env("GIT_COMMIT"), + CIProvider::Buildkite => non_empty_env("BUILDKITE_COMMIT"), + } +} + +fn get_github_actions_head_sha() -> Option { + if env::var("GITHUB_EVENT_NAME").as_deref() == Ok("pull_request") { + if let Some(sha) = read_github_event_pull_request_head_sha() { + return Some(sha); + } + } + non_empty_env("GITHUB_SHA") +} + +/// Read `GITHUB_EVENT_PATH` and pluck the +/// `pull_request.head.sha` out of the JSON. Returns `None` for +/// every "not applicable" case — env unset, file missing, file +/// not JSON, key not present — so the caller can quietly fall +/// back to `GITHUB_SHA` without surfacing an error to the user. +fn read_github_event_pull_request_head_sha() -> Option { + let event = read_github_event_json()?; + event + .pointer("/pull_request/head/sha") + .and_then(serde_json::Value::as_str) + .map(str::to_string) +} + +fn read_github_event_json() -> Option { + let event_path = env::var("GITHUB_EVENT_PATH").ok()?; + if event_path.is_empty() { + return None; + } + let content = std::fs::read_to_string(&event_path).ok()?; + serde_json::from_str(&content).ok() +} + +fn non_empty_env(name: &str) -> Option { + env::var(name).ok().filter(|s| !s.is_empty()) +} + #[cfg(test)] mod tests { use super::*; @@ -430,4 +631,83 @@ mod tests { ); } } + + #[test] + fn head_sha_prefers_pr_event_payload_over_github_sha() { + // PR events: `GITHUB_SHA` is the synthetic merge commit + // GitHub creates by pre-merging the PR; the contributor's + // actual head sha lives in the event payload at + // `pull_request.head.sha`. Dashboards correlate with the + // payload value, so we must prefer it. 
+ let tmp = tempfile::tempdir().unwrap(); + let event_path = tmp.path().join("event.json"); + std::fs::write( + &event_path, + serde_json::json!({ + "pull_request": { + "number": 7, + "head": { "sha": "feedface00000000000000000000000000000000" } + } + }) + .to_string(), + ) + .unwrap(); + + with_ci_env( + &[ + ("GITHUB_ACTIONS", Some("true")), + ("GITHUB_EVENT_NAME", Some("pull_request")), + ("GITHUB_EVENT_PATH", Some(event_path.to_str().unwrap())), + ( + "GITHUB_SHA", + Some("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), + ), + ], + || { + assert_eq!( + get_head_sha().as_deref(), + Some("feedface00000000000000000000000000000000"), + ); + }, + ); + } + + #[test] + fn head_sha_falls_back_to_github_sha_when_event_lacks_pr_head() { + // push events still leave `GITHUB_EVENT_PATH` pointing at a + // payload, but it has no `pull_request` field. Fall back to + // `GITHUB_SHA` rather than returning None. + let tmp = tempfile::tempdir().unwrap(); + let event_path = tmp.path().join("event.json"); + std::fs::write(&event_path, serde_json::json!({}).to_string()).unwrap(); + with_ci_env( + &[ + ("GITHUB_ACTIONS", Some("true")), + ("GITHUB_EVENT_NAME", Some("push")), + ("GITHUB_EVENT_PATH", Some(event_path.to_str().unwrap())), + ("GITHUB_SHA", Some("deadbeef")), + ], + || { + assert_eq!(get_head_sha().as_deref(), Some("deadbeef")); + }, + ); + } + + #[test] + fn head_sha_uses_github_sha_when_event_path_missing() { + // Workflows without an event file (e.g. local + // `act` runs) still set GITHUB_SHA — we must not regress + // to `None` just because the JSON file isn't there. 
+ with_ci_env( + &[ + ("GITHUB_ACTIONS", Some("true")), + ("GITHUB_EVENT_NAME", Some("pull_request")), + ("GITHUB_EVENT_PATH", Some("/this/path/does/not/exist")), + ("GITHUB_SHA", Some("cafef00d")), + ], + || { + assert_eq!(get_head_sha().as_deref(), Some("cafef00d")); + }, + ); + } } diff --git a/crates/mergify-ci/src/junit_process/mod.rs b/crates/mergify-ci/src/junit_process/mod.rs index 56c2bfb9..c56c5742 100644 --- a/crates/mergify-ci/src/junit_process/mod.rs +++ b/crates/mergify-ci/src/junit_process/mod.rs @@ -4,18 +4,26 @@ //! The command port lands in three steps so each layer is //! reviewable on its own: //! -//! - **Phase A** (this commit) — [`junit`]: `JUnit` XML parser +//! - **Phase A** (landed) — [`junit`]: `JUnit` XML parser //! producing semantically-tagged [`TestCase`] values. //! Hermetic, no network. -//! - **Phase B** (next) — OTLP protobuf encoding + upload. -//! - **Phase C** (final) — quarantine API client, CLI dispatch, +//! - **Phase B** (this commit) — [`spans`] turns parser output +//! into an OTLP `ExportTraceServiceRequest`; [`upload`] gzips +//! that protobuf payload and POSTs it to +//! `/v1/repos///ci/traces`. +//! - **Phase C** (next) — quarantine API client, CLI dispatch, //! and `Subcommands::Ci(CiSubcommand::JunitProcess)` promotion //! from shim to native. //! //! Until Phase C lands, the binary keeps shimming -//! `ci junit-process` to Python — but the parser already lives -//! here so subsequent layers have something to consume. +//! `ci junit-process` to Python — but the parser and uploader +//! already live here so the dispatch layer just needs to wire +//! them together. 
pub mod junit; +pub mod spans; +pub mod upload; pub use junit::{Failure, InvalidJunitXml, ParseResult, TestCase, TestStatus}; +pub use spans::{BuiltTraces, UploadMetadata, build_traces}; +pub use upload::{UploadError, default_client, upload}; diff --git a/crates/mergify-ci/src/junit_process/spans.rs b/crates/mergify-ci/src/junit_process/spans.rs new file mode 100644 index 00000000..7d031441 --- /dev/null +++ b/crates/mergify-ci/src/junit_process/spans.rs @@ -0,0 +1,760 @@ +//! `TestCase` → OTLP `ExportTraceServiceRequest`. +//! +//! Mirrors the span layout `mergify_cli/ci/junit_processing/junit.py` +//! produces: +//! +//! - one root **session** span per upload (parent: optional +//! `MERGIFY_TRACEPARENT`), +//! - one **suite** span per `` (parent: session), +//! - one **case** span per `` (parent: suite). +//! +//! All spans share a single OTLP `Resource` carrying the CI +//! environment attributes the backend uses for routing and +//! dashboards (provider, pipeline, run, branch, …). Common +//! attributes (`test.framework`, `test.language`) — the +//! caller-supplied per-upload metadata — get folded into every +//! span on top of its scope-specific attributes. + +use std::collections::BTreeSet; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use mergify_core::CliError; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueOneof; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use opentelemetry_proto::tonic::trace::v1::status::StatusCode; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, Status}; + +use crate::detector; +use crate::junit_process::junit::{ParseResult, TestCase, TestStatus}; + +/// Caller-supplied per-upload metadata. 
`test_framework` and +/// `test_language` get propagated to every span as attributes; +/// `run_id` is the human-readable identifier surfaced in the CLI +/// output (e.g. `Run ID: `). +#[derive(Debug, Clone, Default)] +pub struct UploadMetadata { + pub test_framework: Option, + pub test_language: Option, + /// Optional `mergify.test.job.name` attribute set when the + /// `MERGIFY_TEST_JOB_NAME` env var is present at parse time. + pub mergify_test_job_name: Option, + /// Caller-provided run identifier. When `Some`, used verbatim + /// as the human-readable `run_id` *and* (decoded from 16-char + /// hex) the session span's 8-byte ID — so the wire spans + /// match a `run_id` the caller already printed to its own UI. + /// When `None`, both are generated from fresh random bytes. + /// Used by the Python migration bridge to pass the `run_id` + /// the Python orchestrator generated earlier in the run. + pub run_id: Option, + /// Set of test names the quarantine API confirmed are + /// currently quarantined. Each case span whose name is in + /// this set gets `cicd.test.quarantined = true`; everything + /// else defaults to `false`. Pass an empty set when the + /// quarantine check was skipped (no failures) or failed + /// (network/API error) — the spans then upload with every + /// case marked non-quarantined, which is the conservative + /// default. + pub quarantined: BTreeSet, +} + +/// Decode a 16-character lowercase hex string into the 8-byte +/// session span id. Surface a `Configuration` error for the wrong +/// length or non-hex characters so the `_internal junit-upload` +/// bridge can refuse a malformed `--run-id` cleanly instead of +/// generating a different span id and silently confusing Python's +/// run_id-keyed report. 
+fn decode_run_id(hex: &str) -> Result<[u8; 8], CliError> { + if hex.len() != 16 { + return Err(CliError::Configuration(format!( + "--run-id must be 16 hex characters (got {} chars)", + hex.len(), + ))); + } + let mut bytes = [0u8; 8]; + for (i, chunk) in hex.as_bytes().chunks(2).enumerate() { + let s = std::str::from_utf8(chunk) + .map_err(|_| CliError::Configuration("--run-id is not ASCII".to_string()))?; + bytes[i] = u8::from_str_radix(s, 16) + .map_err(|e| CliError::Configuration(format!("--run-id is not hex: {e}")))?; + } + Ok(bytes) +} + +/// Result of converting a [`ParseResult`] (one or more `JUnit` +/// files) into a wire-ready OTLP request. +#[derive(Debug, Clone)] +pub struct BuiltTraces { + /// Lowercase hex identifier the CLI prints back to the user. + /// Same value populates the `test.run.id` resource attribute. + pub run_id: String, + pub request: ExportTraceServiceRequest, +} + +/// Convert a [`ParseResult`] (the union of every parsed `JUnit` +/// file) into an OTLP `ExportTraceServiceRequest`. +/// +/// Random trace and span IDs are produced via [`getrandom::fill`]. +/// `now_unix_nanos` and `id_source` exist so tests can pin a +/// deterministic clock and randomness source; production callers +/// use [`build_traces`] which fills them from +/// `SystemTime::now()` and `getrandom`. +pub fn build_traces( + parsed: &ParseResult, + metadata: &UploadMetadata, +) -> Result { + build_traces_with(parsed, metadata, system_now_unix_nanos(), &mut OsRandom) +} + +#[allow(clippy::too_many_lines)] // Straight-line span construction; splitting +// would just hide the per-span attribute set behind helper noise that's +// harder to skim than the inline form. 
+fn build_traces_with( + parsed: &ParseResult, + metadata: &UploadMetadata, + now_unix_nanos: u64, + rng: &mut dyn RandomBytes, +) -> Result { + let trace_id = rng.bytes16(); + let session_span_id = match &metadata.run_id { + Some(hex) => decode_run_id(hex)?, + None => rng.bytes8(), + }; + let run_id = hex_lower(&session_span_id); + + let resource = build_resource(&run_id, metadata); + + let common_attrs = common_attributes(metadata); + + let mut spans: Vec = Vec::new(); + + // Suite spans are appended after we know each suite's earliest + // case start (so the suite's start_time covers all its cases). + // Group the parser's flat case list by `suite_name`, preserving + // first-seen order so the wire output matches Python's + // document-order iteration over `` elements. + let suites = group_by_suite(&parsed.cases); + + let mut session_start_time_unix_nanos = now_unix_nanos; + + for (suite_name, suite_cases) in &suites { + let suite_span_id = rng.bytes8(); + let mut suite_start_time_unix_nanos = now_unix_nanos; + + for case in suite_cases { + let case_span_id = rng.bytes8(); + let start_time_unix_nanos = case_start_time(now_unix_nanos, case.duration); + suite_start_time_unix_nanos = suite_start_time_unix_nanos.min(start_time_unix_nanos); + + let mut attrs = common_attrs.clone(); + attrs.push(kv_string("test.scope", "case")); + attrs.push(kv_string("test.case.name", &case.name)); + attrs.push(kv_string("code.function.name", &case.name)); + attrs.push(kv_bool( + "cicd.test.quarantined", + metadata.quarantined.contains(&case.name), + )); + if let Some(file) = &case.file { + attrs.push(kv_string("code.filepath", file)); + } + if let Some(line) = &case.line { + attrs.push(kv_string("code.lineno", line)); + } + attrs.push(kv_string( + "test.case.result.status", + case.status.status_attr(), + )); + + let status_code = match case.status { + TestStatus::Passed | TestStatus::Skipped => StatusCode::Ok, + TestStatus::Failed | TestStatus::Errored => StatusCode::Error, + }; + 
if case.status.is_failure() { + if let Some(kind) = &case.failure.kind { + attrs.push(kv_string("exception.type", kind)); + } + if let Some(msg) = &case.failure.message { + attrs.push(kv_string("exception.message", msg)); + } + if let Some(trace) = &case.failure.stacktrace { + attrs.push(kv_string("exception.stacktrace", trace)); + } + } + + spans.push(Span { + trace_id: trace_id.to_vec(), + span_id: case_span_id.to_vec(), + trace_state: String::new(), + parent_span_id: suite_span_id.to_vec(), + flags: 0, + name: case.name.clone(), + kind: 0, + start_time_unix_nano: start_time_unix_nanos, + end_time_unix_nano: now_unix_nanos, + attributes: attrs, + dropped_attributes_count: 0, + events: Vec::new(), + dropped_events_count: 0, + links: Vec::new(), + dropped_links_count: 0, + status: Some(Status { + message: String::new(), + code: status_code.into(), + }), + }); + } + + session_start_time_unix_nanos = + session_start_time_unix_nanos.min(suite_start_time_unix_nanos); + + let mut suite_attrs = common_attrs.clone(); + suite_attrs.push(kv_string("test.case.name", suite_name)); + suite_attrs.push(kv_string("test.scope", "suite")); + spans.push(Span { + trace_id: trace_id.to_vec(), + span_id: suite_span_id.to_vec(), + trace_state: String::new(), + parent_span_id: session_span_id.to_vec(), + flags: 0, + name: suite_name.clone(), + kind: 0, + start_time_unix_nano: suite_start_time_unix_nanos, + end_time_unix_nano: now_unix_nanos, + attributes: suite_attrs, + dropped_attributes_count: 0, + events: Vec::new(), + dropped_events_count: 0, + links: Vec::new(), + dropped_links_count: 0, + status: None, + }); + } + + // Session is the root span. Place it FIRST in the wire vector + // so the backend has it before any child references it. 
+ let mut session_attrs = common_attrs.clone(); + session_attrs.push(kv_string("test.scope", "session")); + let session_span = Span { + trace_id: trace_id.to_vec(), + span_id: session_span_id.to_vec(), + trace_state: String::new(), + parent_span_id: Vec::new(), + flags: 0, + name: "test session".to_string(), + kind: 0, + start_time_unix_nano: session_start_time_unix_nanos, + end_time_unix_nano: now_unix_nanos, + attributes: session_attrs, + dropped_attributes_count: 0, + events: Vec::new(), + dropped_events_count: 0, + links: Vec::new(), + dropped_links_count: 0, + status: None, + }; + spans.insert(0, session_span); + + let resource_spans = ResourceSpans { + resource: Some(resource), + scope_spans: vec![ScopeSpans { + scope: Some(InstrumentationScope { + name: "mergify-cli".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + attributes: Vec::new(), + dropped_attributes_count: 0, + }), + spans, + schema_url: String::new(), + }], + schema_url: String::new(), + }; + + Ok(BuiltTraces { + run_id, + request: ExportTraceServiceRequest { + resource_spans: vec![resource_spans], + }, + }) +} + +fn group_by_suite(cases: &[TestCase]) -> Vec<(String, Vec<&TestCase>)> { + // Preserve first-seen order. A `Vec<(K, Vec<&T>)>` linear-scan + // group-by is fine here — JUnit reports rarely exceed a handful + // of suites, and we get deterministic iteration for free. + let mut groups: Vec<(String, Vec<&TestCase>)> = Vec::new(); + for case in cases { + if let Some(existing) = groups.iter_mut().find(|(name, _)| *name == case.suite_name) { + existing.1.push(case); + } else { + groups.push((case.suite_name.clone(), vec![case])); + } + } + groups +} + +fn case_start_time(now_unix_nanos: u64, duration: Option) -> u64 { + // Mirror Python's `now - int(float(time) * 10e9)`. The `10e9` + // is a long-standing bug in the Python emitter (should be + // `1e9`), so cases appear ~10× longer on the wire than the + // JUnit report claims. 
The Mergify backend interprets the + // current shape, so we replicate it verbatim — fixing it + // here would silently change every uploaded dashboard. If + // the Python side ever fixes the multiplier, mirror the + // change in both places. + let Some(d) = duration else { + return now_unix_nanos; + }; + #[allow( + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + clippy::cast_sign_loss + )] + let scaled_nanos = (d.as_secs_f64() * 10e9) as u64; + now_unix_nanos.saturating_sub(scaled_nanos) +} + +fn build_resource(run_id: &str, metadata: &UploadMetadata) -> Resource { + let mut attrs = Vec::new(); + attrs.push(kv_string("test.run.id", run_id)); + + if let Some(job_name) = &metadata.mergify_test_job_name { + attrs.push(kv_string("mergify.test.job.name", job_name)); + } + + if let Some(name) = detector::get_pipeline_name() { + attrs.push(kv_string("cicd.pipeline.name", &name)); + } + if let Some(name) = detector::get_job_name() { + attrs.push(kv_string("cicd.pipeline.task.name", &name)); + } + if let Some(id) = detector::get_cicd_pipeline_run_id() { + attrs.push(kv_string("cicd.pipeline.run.id", &id)); + } + if let Some(url) = detector::get_cicd_pipeline_run_url() { + attrs.push(kv_string("cicd.pipeline.run.url", &url)); + } + if let Some(attempt) = detector::get_cicd_pipeline_run_attempt() { + #[allow(clippy::cast_possible_wrap)] + attrs.push(kv_int("cicd.pipeline.run.attempt", attempt as i64)); + } + if let Some(sha) = detector::get_head_sha() { + attrs.push(kv_string("vcs.ref.head.revision", &sha)); + } + if let Some(name) = detector::get_head_ref_name() { + attrs.push(kv_string("vcs.ref.head.name", &name)); + } + if let Some(name) = detector::get_base_ref_name() { + attrs.push(kv_string("vcs.ref.base.name", &name)); + } + if let Some(url) = detector::get_repository_url() { + attrs.push(kv_string("vcs.repository.url.full", &url)); + } + if let Some(repo) = detector::get_github_repository() { + attrs.push(kv_string("vcs.repository.name", &repo)); + 
} + if let Some(name) = detector::get_cicd_pipeline_runner_name() { + attrs.push(kv_string("cicd.pipeline.runner.name", &name)); + } + if let Some(provider) = detector::get_ci_provider() { + attrs.push(kv_string("cicd.provider.name", provider.as_str())); + } + + Resource { + attributes: attrs, + dropped_attributes_count: 0, + entity_refs: Vec::new(), + } +} + +fn common_attributes(metadata: &UploadMetadata) -> Vec { + let mut attrs = Vec::new(); + if let Some(framework) = &metadata.test_framework { + attrs.push(kv_string("test.framework", framework)); + } + if let Some(language) = &metadata.test_language { + attrs.push(kv_string("test.language", language)); + } + attrs +} + +fn kv(key: &str, value: AnyValueOneof) -> KeyValue { + // `..Default::default()` so any future proto-generated fields + // (e.g. the profiling-signal `key_strindex` already present in + // 0.32) round-trip as their default without us having to spell + // them out. + KeyValue { + key: key.to_string(), + value: Some(AnyValue { value: Some(value) }), + ..KeyValue::default() + } +} + +fn kv_string(key: &str, value: &str) -> KeyValue { + kv(key, AnyValueOneof::StringValue(value.to_string())) +} + +fn kv_bool(key: &str, value: bool) -> KeyValue { + kv(key, AnyValueOneof::BoolValue(value)) +} + +fn kv_int(key: &str, value: i64) -> KeyValue { + kv(key, AnyValueOneof::IntValue(value)) +} + +fn hex_lower(bytes: &[u8]) -> String { + use std::fmt::Write as _; + let mut out = String::with_capacity(bytes.len() * 2); + for b in bytes { + // `write!` on a `String` is infallible; the `_` discards + // the `Ok` value rather than going through a `format!` + // round trip that allocates per byte. + let _ = write!(out, "{b:02x}"); + } + out +} + +fn system_now_unix_nanos() -> u64 { + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(d) => u64::try_from(d.as_nanos()).unwrap_or(u64::MAX), + Err(_) => 0, + } +} + +/// Internal seam for tests: any source of random bytes. 
The +/// production impl reads from the OS via `getrandom`; tests +/// hand-feed deterministic byte streams. +trait RandomBytes { + fn bytes8(&mut self) -> [u8; 8]; + fn bytes16(&mut self) -> [u8; 16]; +} + +struct OsRandom; + +impl RandomBytes for OsRandom { + fn bytes8(&mut self) -> [u8; 8] { + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("OS rng available"); + buf + } + fn bytes16(&mut self) -> [u8; 16] { + let mut buf = [0u8; 16]; + getrandom::fill(&mut buf).expect("OS rng available"); + buf + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::junit_process::junit::Failure; + use crate::testing::with_ci_env; + + /// Deterministic byte source for tests. Bytes are consumed + /// in order. Tests provide enough buffer for the spans they + /// expect to build; running out of bytes panics so a wrong + /// span-count assumption is loud rather than silent. + struct FixedRng { + bytes: Vec, + cursor: usize, + } + impl FixedRng { + fn new(bytes: Vec) -> Self { + Self { bytes, cursor: 0 } + } + fn take(&mut self, n: usize) -> Vec { + let slice = self.bytes[self.cursor..self.cursor + n].to_vec(); + self.cursor += n; + slice + } + } + impl RandomBytes for FixedRng { + fn bytes8(&mut self) -> [u8; 8] { + let mut out = [0u8; 8]; + out.copy_from_slice(&self.take(8)); + out + } + fn bytes16(&mut self) -> [u8; 16] { + let mut out = [0u8; 16]; + out.copy_from_slice(&self.take(16)); + out + } + } + + fn sample_parsed() -> ParseResult { + ParseResult { + suite_names: vec!["pytest".to_string()], + cases: vec![ + TestCase { + name: "tests.test_func.test_success".to_string(), + suite_name: "pytest".to_string(), + duration: Some(Duration::from_secs_f64(0.001)), + file: None, + line: None, + status: TestStatus::Passed, + failure: Failure::default(), + }, + TestCase { + name: "tests.test_func.test_failed".to_string(), + suite_name: "pytest".to_string(), + duration: Some(Duration::from_secs_f64(0.002)), + file: Some("tests/test_func.py".to_string()), + line: 
Some("6".to_string()), + status: TestStatus::Failed, + failure: Failure { + kind: None, + message: Some("assert 1 == 0".to_string()), + stacktrace: Some("trace".to_string()), + }, + }, + ], + } + } + + #[test] + fn builds_session_suite_and_case_spans_with_consistent_parent_chain() { + // 16 bytes for trace_id; 4×8 bytes for session, suite, + // case-1, case-2 span ids. Distinct fill bytes per region + // so the assertions below can tell them apart at a glance. + let mut bytes: Vec = Vec::with_capacity(16 + 4 * 8); + bytes.extend(std::iter::repeat_n(0xAA, 16)); // trace_id + bytes.extend(std::iter::repeat_n(0x11, 8)); // session + bytes.extend(std::iter::repeat_n(0x22, 8)); // suite + bytes.extend(std::iter::repeat_n(0x33, 8)); // case-1 + bytes.extend(std::iter::repeat_n(0x44, 8)); // case-2 + let mut rng = FixedRng::new(bytes); + + let now: u64 = 1_700_000_000_000_000_000; + let metadata = UploadMetadata::default(); + let built = with_ci_env(&[], || { + build_traces_with(&sample_parsed(), &metadata, now, &mut rng).unwrap() + }); + + // run_id is the session span id rendered as hex. + assert_eq!(built.run_id, "1111111111111111"); + + let resource_spans = &built.request.resource_spans; + assert_eq!(resource_spans.len(), 1); + let scope_spans = &resource_spans[0].scope_spans; + assert_eq!(scope_spans.len(), 1); + let spans = &scope_spans[0].spans; + // 1 session + 1 suite + 2 cases. + assert_eq!(spans.len(), 4); + + // Session is first; suite reports session as parent; both + // cases report suite as parent. 
+ let session = &spans[0]; + assert_eq!(session.name, "test session"); + assert!(session.parent_span_id.is_empty()); + assert_eq!(session.span_id, vec![0x11; 8]); + assert_eq!(session.trace_id, vec![0xAA; 16]); + + let suite = spans.iter().find(|s| s.name == "pytest").unwrap(); + assert_eq!(suite.parent_span_id, vec![0x11; 8]); + assert_eq!(suite.span_id, vec![0x22; 8]); + + let cases: Vec<&Span> = spans + .iter() + .filter(|s| s.name.starts_with("tests.test_func")) + .collect(); + assert_eq!(cases.len(), 2); + for case in &cases { + assert_eq!(case.parent_span_id, vec![0x22; 8]); + assert_eq!(case.trace_id, vec![0xAA; 16]); + } + } + + #[test] + fn case_status_maps_to_otlp_status_code() { + let mut rng = FixedRng::new(vec![0xFF; 256]); + let now: u64 = 1_700_000_000_000_000_000; + let metadata = UploadMetadata::default(); + let built = with_ci_env(&[], || { + build_traces_with(&sample_parsed(), &metadata, now, &mut rng).unwrap() + }); + let spans = &built.request.resource_spans[0].scope_spans[0].spans; + + let pass = spans + .iter() + .find(|s| s.name == "tests.test_func.test_success") + .unwrap(); + assert_eq!( + pass.status.as_ref().unwrap().code, + i32::from(StatusCode::Ok), + ); + + let fail = spans + .iter() + .find(|s| s.name == "tests.test_func.test_failed") + .unwrap(); + assert_eq!( + fail.status.as_ref().unwrap().code, + i32::from(StatusCode::Error), + ); + // exception.message / .stacktrace are attached to failing + // cases; passing ones don't get the keys at all. 
+ let fail_attr_keys: Vec<&str> = fail.attributes.iter().map(|kv| kv.key.as_str()).collect(); + assert!(fail_attr_keys.contains(&"exception.message")); + assert!(fail_attr_keys.contains(&"exception.stacktrace")); + let pass_attr_keys: Vec<&str> = pass.attributes.iter().map(|kv| kv.key.as_str()).collect(); + assert!(!pass_attr_keys.contains(&"exception.message")); + } + + #[test] + fn case_attributes_include_file_line_and_code_function() { + let mut rng = FixedRng::new(vec![0xFF; 256]); + let metadata = UploadMetadata::default(); + let built = with_ci_env(&[], || { + build_traces_with(&sample_parsed(), &metadata, 0, &mut rng).unwrap() + }); + let spans = &built.request.resource_spans[0].scope_spans[0].spans; + let fail = spans + .iter() + .find(|s| s.name == "tests.test_func.test_failed") + .unwrap(); + let by_key: std::collections::HashMap<&str, &AnyValue> = fail + .attributes + .iter() + .filter_map(|kv| kv.value.as_ref().map(|v| (kv.key.as_str(), v))) + .collect(); + // file/line straight passthrough from the parser. + assert!(matches!( + by_key.get("code.filepath").and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::StringValue(s)) if s == "tests/test_func.py" + )); + assert!(matches!( + by_key.get("code.lineno").and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::StringValue(s)) if s == "6" + )); + // code.function.name mirrors the test case name. + assert!(matches!( + by_key.get("code.function.name").and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::StringValue(s)) if s == "tests.test_func.test_failed" + )); + // cicd.test.quarantined defaults to false on every case; + // the quarantine layer flips it later (Phase C). 
+ assert!(matches!( + by_key + .get("cicd.test.quarantined") + .and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::BoolValue(false)) + )); + } + + #[test] + fn resource_attributes_carry_ci_env_when_set() { + let mut rng = FixedRng::new(vec![0xFF; 256]); + let metadata = UploadMetadata::default(); + let built = with_ci_env( + &[ + ("GITHUB_ACTIONS", Some("true")), + ("GITHUB_REPOSITORY", Some("owner/repo")), + ("GITHUB_WORKFLOW", Some("CI")), + ("GITHUB_JOB", Some("build")), + ("GITHUB_RUN_ID", Some("12345")), + ("GITHUB_RUN_ATTEMPT", Some("2")), + ("GITHUB_SHA", Some("abc123")), + ("GITHUB_REF_NAME", Some("main")), + ("RUNNER_NAME", Some("runner-1")), + ], + || build_traces_with(&sample_parsed(), &metadata, 0, &mut rng).unwrap(), + ); + let resource = built.request.resource_spans[0].resource.as_ref().unwrap(); + let by_key: std::collections::HashMap<&str, &AnyValue> = resource + .attributes + .iter() + .filter_map(|kv| kv.value.as_ref().map(|v| (kv.key.as_str(), v))) + .collect(); + + for (key, expected) in [ + ("cicd.provider.name", "github_actions"), + ("cicd.pipeline.name", "CI"), + ("cicd.pipeline.task.name", "build"), + ("cicd.pipeline.run.id", "12345"), + ("vcs.ref.head.revision", "abc123"), + ("vcs.ref.head.name", "main"), + ("vcs.repository.name", "owner/repo"), + ("cicd.pipeline.runner.name", "runner-1"), + ] { + assert!( + matches!( + by_key.get(key).and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::StringValue(s)) if s == expected + ), + "expected resource attr {key} == {expected:?}, got {:?}", + by_key.get(key), + ); + } + // Attempt comes through as int, not string. 
+ assert!(matches!( + by_key + .get("cicd.pipeline.run.attempt") + .and_then(|v| v.value.as_ref()), + Some(AnyValueOneof::IntValue(2)) + )); + } + + #[test] + fn common_attributes_propagate_to_every_span() { + let mut rng = FixedRng::new(vec![0xFF; 256]); + let metadata = UploadMetadata { + test_framework: Some("pytest".to_string()), + test_language: Some("python".to_string()), + mergify_test_job_name: None, + run_id: None, + quarantined: BTreeSet::new(), + }; + let built = with_ci_env(&[], || { + build_traces_with(&sample_parsed(), &metadata, 0, &mut rng).unwrap() + }); + let spans = &built.request.resource_spans[0].scope_spans[0].spans; + for span in spans { + let keys: Vec<&str> = span.attributes.iter().map(|kv| kv.key.as_str()).collect(); + assert!( + keys.contains(&"test.framework"), + "span {} missing test.framework: {keys:?}", + span.name + ); + assert!( + keys.contains(&"test.language"), + "span {} missing test.language: {keys:?}", + span.name + ); + } + } + + #[test] + fn timestamps_propagate_duration_and_session_envelopes_all_cases() { + // Cases of durations 0.001s and 0.002s. With the 10× scale + // Python uses, those become 0.01s and 0.02s in nanos before + // `now`. The session start must be the earliest case start. 
+ let mut rng = FixedRng::new(vec![0xFF; 256]); + let now: u64 = 1_000_000_000_000_000_000; + let metadata = UploadMetadata::default(); + let built = with_ci_env(&[], || { + build_traces_with(&sample_parsed(), &metadata, now, &mut rng).unwrap() + }); + let spans = &built.request.resource_spans[0].scope_spans[0].spans; + let session = spans.iter().find(|s| s.name == "test session").unwrap(); + let suite = spans.iter().find(|s| s.name == "pytest").unwrap(); + let earliest_case = spans + .iter() + .filter(|s| s.name.starts_with("tests.")) + .map(|s| s.start_time_unix_nano) + .min() + .unwrap(); + assert_eq!(session.start_time_unix_nano, earliest_case); + assert_eq!(suite.start_time_unix_nano, earliest_case); + // End time is `now` for every span. + assert_eq!(session.end_time_unix_nano, now); + assert_eq!(suite.end_time_unix_nano, now); + for span in spans { + assert!( + span.end_time_unix_nano == now, + "{}: {}", + span.name, + span.end_time_unix_nano + ); + } + } +} diff --git a/crates/mergify-ci/src/junit_process/upload.rs b/crates/mergify-ci/src/junit_process/upload.rs new file mode 100644 index 00000000..35e60a67 --- /dev/null +++ b/crates/mergify-ci/src/junit_process/upload.rs @@ -0,0 +1,260 @@ +//! POST an `ExportTraceServiceRequest` to the Mergify CI Insights +//! traces endpoint as OTLP/HTTP/protobuf with gzip. +//! +//! Mirrors `mergify_cli/ci/junit_processing/upload.py`. The Python +//! version delegates to `opentelemetry-exporter-otlp-proto-http`, +//! which boils down to a single `POST` with three headers: +//! +//! - `Authorization: Bearer ` +//! - `Content-Type: application/x-protobuf` +//! - `Content-Encoding: gzip` +//! +//! No retries, no streaming, no SDK lifecycle — small enough to do +//! by hand with `reqwest` so we don't drag in `opentelemetry-otlp` +//! and its tonic dependency. The endpoint +//! (`{api_url}/v1/repos/{repository}/ci/traces`) matches the +//! Python URL byte for byte. 
+ +use std::io::Write as _; +use std::time::Duration; + +use flate2::Compression; +use flate2::write::GzEncoder; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use prost::Message as _; + +#[derive(Debug)] +pub struct UploadError { + pub status: Option, + pub message: String, +} + +impl std::fmt::Display for UploadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(status) = self.status { + write!( + f, + "Failed to export span batch code: {status}, reason: {msg}", + msg = self.message, + ) + } else { + write!(f, "Failed to export span batch: {}", self.message) + } + } +} + +impl std::error::Error for UploadError {} + +const ENDPOINT_PATH: &str = "/v1/repos/"; +const ENDPOINT_SUFFIX: &str = "/ci/traces"; + +fn endpoint_url(api_url: &str, repository: &str) -> String { + // The shape `/v1/repos///ci/traces` + // matches the Python implementation. The repository segment is + // pre-validated by `detector::split_owner_repo` at the CLI + // boundary, so we can interpolate it without further escaping. + let trimmed = api_url.trim_end_matches('/'); + format!("{trimmed}{ENDPOINT_PATH}{repository}{ENDPOINT_SUFFIX}") +} + +fn gzip(bytes: &[u8]) -> Result, std::io::Error> { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(bytes)?; + encoder.finish() +} + +/// Encode `request` to OTLP/HTTP/protobuf, gzip it, and POST. +/// +/// `client` is passed in so callers can configure timeouts, TLS, +/// or test-time wiremock interception once and reuse it for both +/// the quarantine API and the OTLP endpoint (Phase C will share +/// a single `reqwest::Client`). +pub async fn upload( + client: &reqwest::Client, + api_url: &str, + token: &str, + repository: &str, + request: &ExportTraceServiceRequest, +) -> Result<(), UploadError> { + if request.resource_spans.is_empty() { + // Match Python's `upload()` short-circuit: no spans, no + // request. 
The backend would 400 anyway, so we save a + // round trip. + return Ok(()); + } + + let url = endpoint_url(api_url, repository); + + let encoded = request.encode_to_vec(); + let compressed = gzip(&encoded).map_err(|e| UploadError { + status: None, + message: format!("failed to gzip OTLP payload: {e}"), + })?; + + let resp = client + .post(&url) + .bearer_auth(token) + .header("Content-Type", "application/x-protobuf") + .header("Content-Encoding", "gzip") + .body(compressed) + .send() + .await + .map_err(|e| UploadError { + status: None, + message: e.to_string(), + })?; + + if resp.status().is_success() { + return Ok(()); + } + let status = resp.status().as_u16(); + let body = resp + .text() + .await + .unwrap_or_else(|e| format!("")); + Err(UploadError { + status: Some(status), + message: body, + }) +} + +/// Build a `reqwest::Client` with a sensible per-request timeout +/// for OTLP uploads. The default `reqwest` timeout is unlimited, +/// which can hang CI for an hour if the backend is slow. 
+#[must_use] +pub fn default_client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("rustls reqwest client builds with default config") +} + +#[cfg(test)] +mod tests { + use super::*; + use flate2::read::GzDecoder; + use opentelemetry_proto::tonic::resource::v1::Resource; + use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span}; + use std::io::Read as _; + use wiremock::matchers::{header, method, path}; + use wiremock::{Mock, MockServer, Request as MockRequest, ResponseTemplate}; + + fn sample_request() -> ExportTraceServiceRequest { + ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: Some(Resource { + attributes: Vec::new(), + dropped_attributes_count: 0, + entity_refs: Vec::new(), + }), + scope_spans: vec![ScopeSpans { + scope: None, + spans: vec![Span { + name: "x".into(), + ..Span::default() + }], + schema_url: String::new(), + }], + schema_url: String::new(), + }], + } + } + + #[test] + fn endpoint_url_matches_python_layout() { + assert_eq!( + endpoint_url("https://api.mergify.com", "owner/repo"), + "https://api.mergify.com/v1/repos/owner/repo/ci/traces" + ); + // Trailing slash on api_url must not produce a double slash. + assert_eq!( + endpoint_url("https://api.mergify.com/", "owner/repo"), + "https://api.mergify.com/v1/repos/owner/repo/ci/traces" + ); + } + + #[tokio::test] + async fn empty_request_skips_http_round_trip() { + // No spans → no request. If the function tries to POST, + // it'll fail because the URL is bogus. 
+ let client = reqwest::Client::new(); + let request = ExportTraceServiceRequest { + resource_spans: Vec::new(), + }; + upload( + &client, + "http://127.0.0.1:1", // refused if actually hit + "token", + "owner/repo", + &request, + ) + .await + .expect("empty request must short-circuit"); + } + + #[tokio::test] + async fn posts_gzipped_protobuf_to_traces_endpoint() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/repos/owner/repo/ci/traces")) + .and(header("Authorization", "Bearer secret")) + .and(header("Content-Type", "application/x-protobuf")) + .and(header("Content-Encoding", "gzip")) + .respond_with(|req: &MockRequest| { + // Decode the body to assert it's valid gzip-protobuf + // round-tripping back into the same request shape. + let mut decoder = GzDecoder::new(req.body.as_slice()); + let mut unzipped = Vec::new(); + decoder + .read_to_end(&mut unzipped) + .expect("body decompresses"); + let decoded_req = ExportTraceServiceRequest::decode(unzipped.as_slice()) + .expect("body decodes to OTLP request"); + assert_eq!(decoded_req.resource_spans.len(), 1); + ResponseTemplate::new(200) + }) + .mount(&server) + .await; + + let client = default_client(); + upload( + &client, + &server.uri(), + "secret", + "owner/repo", + &sample_request(), + ) + .await + .expect("upload succeeds"); + } + + #[tokio::test] + async fn surfaces_http_error_status_and_body() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/repos/owner/repo/ci/traces")) + .respond_with(ResponseTemplate::new(401).set_body_string("bad token")) + .mount(&server) + .await; + + let client = default_client(); + let err = upload( + &client, + &server.uri(), + "wrong", + "owner/repo", + &sample_request(), + ) + .await + .expect_err("401 must surface as UploadError"); + assert_eq!(err.status, Some(401)); + assert!(err.message.contains("bad token"), "got: {}", err.message); + let rendered = err.to_string(); + // The Display impl is 
what Python prints; match the wording + // so existing log scrapers / docs don't drift. + assert!( + rendered.contains("code: 401") && rendered.contains("reason: bad token"), + "got: {rendered}" + ); + } +} diff --git a/crates/mergify-ci/src/testing.rs b/crates/mergify-ci/src/testing.rs index c0c4a261..092a84a8 100644 --- a/crates/mergify-ci/src/testing.rs +++ b/crates/mergify-ci/src/testing.rs @@ -11,10 +11,14 @@ use std::future::Future; /// Env vars the CI-provider detection chain inspects. Clear every /// one of them before applying the test-specific overrides, so the -/// host environment can't leak into the test. +/// host environment can't leak into the test — running the test +/// suite *on* a real GitHub Actions / `CircleCI` / Jenkins / Buildkite +/// host would otherwise produce `vcs.ref.head.name` etc. values +/// taken from the runner instead of the test's explicit override +/// and silently fail. /// /// `GITHUB_OUTPUT` belongs on this list too — when the suite runs -/// *on* a GHA runner that var points at the runner's real +/// on a GHA runner that var points at the runner's real /// step-output file, and any test that exercises a code path /// appending a heredoc (e.g. `ci scopes` → /// `MERGIFY_SCOPES<)]) -> Vec<(String, Option)> { diff --git a/crates/mergify-cli/src/main.rs b/crates/mergify-cli/src/main.rs index 6bbf8444..1e43d322 100644 --- a/crates/mergify-cli/src/main.rs +++ b/crates/mergify-cli/src/main.rs @@ -155,11 +155,12 @@ const NATIVE_COMMANDS: &[(&str, &str)] = &[ ("freeze", "create"), ("freeze", "update"), ("freeze", "delete"), - // Internal Python migration helper. Listed so `looks_native` - // routes `mergify _internal junit-parse …` past the shim - // fallback when clap rejects it, but it stays hidden from - // `--help` (see the `Subcommands::Internal` variant). + // Internal Python migration helpers. 
Listed so `looks_native` + // routes `mergify _internal …` past the shim fallback when + // clap rejects it, but they stay hidden from `--help` (see + // the `Subcommands::Internal` variant). ("_internal", "junit-parse"), + ("_internal", "junit-upload"), ]; /// Native commands the Rust binary handles without delegating to @@ -192,6 +193,25 @@ enum NativeCommand { InternalJunitParse { file: PathBuf, }, + /// `_internal junit-upload … --token … --api-url … …` + /// — Python migration helper. Parses every file, builds the + /// OTLP `ExportTraceServiceRequest` with the quarantined set + /// baked in, POSTs gzipped protobuf to the traces endpoint. + /// Wire format is not stable; only the Python code shipped in + /// this wheel may consume it. + InternalJunitUpload(InternalJunitUploadOpts), +} + +struct InternalJunitUploadOpts { + api_url: String, + token: String, + repository: String, + run_id: String, + test_framework: Option, + test_language: Option, + mergify_test_job_name: Option, + quarantined: Vec, + files: Vec, } struct ConfigSimulateOpts { @@ -393,6 +413,32 @@ fn dispatch_from_parsed(parsed: CliRoot) -> Dispatch { Subcommands::Internal(InternalArgs { command: InternalSubcommand::JunitParse(InternalJunitParseArgs { file }), }) => Dispatch::Native(NativeCommand::InternalJunitParse { file }), + Subcommands::Internal(InternalArgs { + command: + InternalSubcommand::JunitUpload(InternalJunitUploadArgs { + api_url, + token, + repository, + run_id, + test_framework, + test_language, + mergify_test_job_name, + quarantined, + files, + }), + }) => Dispatch::Native(NativeCommand::InternalJunitUpload( + InternalJunitUploadOpts { + api_url, + token, + repository, + run_id, + test_framework, + test_language, + mergify_test_job_name, + quarantined, + files, + }, + )), Subcommands::Ci(CiArgs { command: CiSubcommand::Scopes(ScopesCliArgs { @@ -852,6 +898,61 @@ fn run_native(cmd: NativeCommand) -> ExitCode { println!("{json}"); Ok(mergify_core::ExitCode::Success) } + 
NativeCommand::InternalJunitUpload(opts) => { + // Parse every file, concatenate their cases / + // suite_names, build OTLP spans with the quarantine + // set baked in, POST. The Python orchestrator that + // calls this has already done the quarantine check + // and passes the names via repeated `--quarantined`. + let mut all_cases = Vec::new(); + let mut all_suite_names = Vec::new(); + for path in &opts.files { + let bytes = std::fs::read(path).map_err(|e| { + mergify_core::CliError::Generic(format!( + "cannot read {}: {e}", + path.display(), + )) + })?; + let parsed = mergify_ci::junit_process::junit::parse(&bytes)?; + all_suite_names.extend(parsed.suite_names); + all_cases.extend(parsed.cases); + } + let parsed = mergify_ci::junit_process::junit::ParseResult { + suite_names: all_suite_names, + cases: all_cases, + }; + + let metadata = mergify_ci::junit_process::spans::UploadMetadata { + test_framework: opts.test_framework, + test_language: opts.test_language, + mergify_test_job_name: opts.mergify_test_job_name.or_else(|| { + env::var("MERGIFY_TEST_JOB_NAME") + .ok() + .filter(|s| !s.is_empty()) + }), + run_id: Some(opts.run_id), + quarantined: opts.quarantined.into_iter().collect(), + }; + let built = mergify_ci::junit_process::spans::build_traces(&parsed, &metadata)?; + + // No spans → nothing to send. Matches Python's + // existing `if not spans: return` short-circuit. + if built.request.resource_spans.is_empty() { + return Ok(mergify_core::ExitCode::Success); + } + + let client = mergify_ci::junit_process::upload::default_client(); + mergify_ci::junit_process::upload::upload( + &client, + &opts.api_url, + &opts.token, + &opts.repository, + &built.request, + ) + .await + .map_err(|e| mergify_core::CliError::Generic(e.to_string()))?; + Ok(mergify_core::ExitCode::Success) + } } }); @@ -935,6 +1036,16 @@ enum InternalSubcommand { /// user-facing surface. 
#[command(name = "junit-parse")] JunitParse(InternalJunitParseArgs), + /// Parse `JUnit` XML files, build the OTLP `ExportTraceServiceRequest` + /// (one session span + one suite span per `` + one + /// case span per ``, tagged with the caller-supplied + /// quarantine set), and POST it as gzipped protobuf to + /// `{api_url}/v1/repos/{repository}/ci/traces`. Used by the + /// Python side of the `junit-process` command during + /// migration to replace the `opentelemetry-exporter-otlp-proto-http` + /// upload path; not a stable user-facing surface. + #[command(name = "junit-upload")] + JunitUpload(InternalJunitUploadArgs), } #[derive(clap::Args)] @@ -944,6 +1055,43 @@ struct InternalJunitParseArgs { file: PathBuf, } +#[derive(clap::Args)] +struct InternalJunitUploadArgs { + /// Mergify API base URL (e.g. `https://api.mergify.com`). + #[arg(long = "api-url")] + api_url: String, + /// Mergify CI Insights bearer token. + #[arg(long)] + token: String, + /// Repository the spans belong to, as `owner/repo`. + #[arg(long)] + repository: String, + /// 16-character hex run identifier the Python orchestrator + /// already printed to its UI. The session span's 8-byte ID + /// decodes from this so wire spans line up with what the + /// user sees in the CLI report. + #[arg(long = "run-id")] + run_id: String, + /// Optional `test.framework` attribute applied to every span. + #[arg(long = "test-framework")] + test_framework: Option, + /// Optional `test.language` attribute applied to every span. + #[arg(long = "test-language")] + test_language: Option, + /// Optional `mergify.test.job.name` resource attribute. Falls + /// back to `MERGIFY_TEST_JOB_NAME` env var when omitted. + #[arg(long = "mergify-test-job-name")] + mergify_test_job_name: Option, + /// Test names the quarantine API reported as currently + /// quarantined. Each case span whose `name` matches gets + /// `cicd.test.quarantined = true`. Repeatable. 
+ #[arg(long = "quarantined", value_name = "TEST_NAME")] + quarantined: Vec, + /// `JUnit` XML files to parse and upload spans for. + #[arg(value_name = "FILE", required = true, num_args = 1..)] + files: Vec, +} + #[derive(clap::Args)] struct ConfigArgs { /// Path to the Mergify configuration file (auto-detected if not diff --git a/mergify_cli/ci/junit_processing/cli.py b/mergify_cli/ci/junit_processing/cli.py index 21905c25..a74be3ed 100644 --- a/mergify_cli/ci/junit_processing/cli.py +++ b/mergify_cli/ci/junit_processing/cli.py @@ -138,13 +138,31 @@ async def process_junit_files( f"{quarantined}/{total} failures quarantined" ) + # The OTLP encode + upload step is the Rust `_internal + # junit-upload` subcommand. It re-parses every file, builds + # the spans (mirroring what `junit_to_spans` did Python-side), + # bakes in the quarantine set, gzips the protobuf, and POSTs. + # We extract the quarantined names from the spans the Python + # quarantine pass mutated above — that captures both + # quarantined-failing and quarantined-passing cases — so the + # Rust span builder reproduces the same wire shape Python used + # to emit. + quarantined_names = [ + span.name + for span in spans + if span.attributes and span.attributes.get("cicd.test.quarantined") is True + ] upload_error: str | None = None try: upload.upload( api_url=api_url, token=token, repository=repository, - spans=spans, + files=files, + run_id=run_id, + quarantined_names=quarantined_names, + test_framework=test_framework, + test_language=test_language, ) except Exception as e: upload_error = str(e) diff --git a/mergify_cli/ci/junit_processing/upload.py b/mergify_cli/ci/junit_processing/upload.py index 9ff6e04a..1356e7aa 100644 --- a/mergify_cli/ci/junit_processing/upload.py +++ b/mergify_cli/ci/junit_processing/upload.py @@ -1,64 +1,100 @@ +"""Upload JUnit test results to Mergify CI Insights as OTLP spans. 
+ +The encode-and-upload step shells out to the native Rust binary's +hidden `_internal junit-upload` subcommand — it re-parses the +JUnit XML files, builds the OTLP `ExportTraceServiceRequest` +(with the caller-supplied quarantine set baked into each case +span's `cicd.test.quarantined` attribute), gzips the protobuf, +and POSTs it to `/v1/repos/<owner>/<repo>/ci/traces`. + +This module is the Python→Rust migration bridge for the upload +step. When `ci junit-process` is fully ported to Rust (Phase C, +later in the same stack), this module, its callers, and the +`_internal junit-upload` Rust subcommand all go away. +""" + from __future__ import annotations +import subprocess import typing -from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.trace import export +from mergify_cli.ci.junit_processing import junit if typing.TYPE_CHECKING: - from opentelemetry.sdk.trace import ReadableSpan + from collections.abc import Iterable class UploadError(Exception): pass -class _OTLPSpanExporterWithBody(OTLPSpanExporter): - last_failure_status: int | None = None - last_failure_body: str | None = None - - def _export( - self, - serialized_data: bytes, - timeout_sec: float | None = None, - ) -> typing.Any: - resp = super()._export(serialized_data, timeout_sec) - if not resp.ok: - self.last_failure_status = resp.status_code - self.last_failure_body = resp.text - return resp - - -def upload_spans( +def upload( api_url: str, token: str, repository: str, - spans: list[ReadableSpan], + files: tuple[str, ...], + run_id: str, + quarantined_names: Iterable[str] = (), + test_framework: str | None = None, + test_language: str | None = None, + mergify_test_job_name: str | None = None, ) -> None: - exporter = _OTLPSpanExporterWithBody( - endpoint=f"{api_url}/v1/repos/{repository}/ci/traces", - headers={"Authorization": f"Bearer {token}"}, - compression=Compression.Gzip, - ) - result = 
exporter.export(spans) + """Upload spans for `files` to Mergify CI Insights. - if result == export.SpanExportResult.FAILURE: - if exporter.last_failure_status is not None: - raise UploadError( - f"Failed to export span batch code: {exporter.last_failure_status}, " - f"reason: {exporter.last_failure_body}", - ) - raise UploadError("Failed to export span batch") + `files` is the original tuple of paths passed to + `process_junit_files`; the Rust binary re-parses them so the + span attributes (suite name, classname-qualified test name, + file/line, exception type/message/stacktrace) come straight + from the same parser the Python side already used to compute + the failing set. + `run_id` must be the same 16-char hex identifier Python + generated upstream (`junit.files_to_spans` returns it) so the + session-span ID embedded in the upload matches what the CLI + report later prints. -def upload( - api_url: str, - token: str, - repository: str, - spans: list[ReadableSpan], -) -> None: - if not spans: + `quarantined_names` is the set of test names the quarantine + API said are currently quarantined. The Rust span builder + sets `cicd.test.quarantined = true` on the matching case + spans; everything else defaults to false. 
+ """ + if not files: return - upload_spans(api_url, token, repository, spans) + + binary = junit._resolve_mergify_binary() + cmd = [ + binary, + "_internal", + "junit-upload", + "--api-url", + api_url, + "--token", + token, + "--repository", + repository, + "--run-id", + run_id, + ] + if test_framework is not None: + cmd.extend(["--test-framework", test_framework]) + if test_language is not None: + cmd.extend(["--test-language", test_language]) + if mergify_test_job_name is not None: + cmd.extend(["--mergify-test-job-name", mergify_test_job_name]) + for name in quarantined_names: + cmd.extend(["--quarantined", name]) + cmd.extend(files) + + result = subprocess.run( # noqa: S603 + cmd, + capture_output=True, + check=False, + ) + if result.returncode != 0: + stderr = result.stderr.decode("utf-8", errors="replace").strip() + # The Rust binary's generic error wrapper prefixes every + # error with `mergify: ` — strip it so the message reads + # cleanly when surfaced via UploadError. + details = stderr.removeprefix("mergify: ") + raise UploadError(details or "junit-upload failed") diff --git a/mergify_cli/tests/ci/junit_processing/test_cli.py b/mergify_cli/tests/ci/junit_processing/test_cli.py index f725191d..121415ad 100644 --- a/mergify_cli/tests/ci/junit_processing/test_cli.py +++ b/mergify_cli/tests/ci/junit_processing/test_cli.py @@ -4,7 +4,6 @@ import pathlib from unittest import mock -import anys import httpx from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace import id_generator @@ -647,11 +646,21 @@ async def test_upload_failure( assert result.exit_code == 0 assert result.upload_mock.call_count == 1 + # Phase B migration: the Python `upload.upload` now wraps a + # subprocess to `mergify _internal junit-upload`. The new + # signature takes `files` (original tuple) + `run_id` + + # `quarantined_names` (the union the Rust span builder bakes + # into `cicd.test.quarantined`) instead of the pre-built + # `spans` list. 
assert result.upload_mock.call_args.kwargs == { "api_url": "https://api.mergify.com", "token": "foobar", "repository": "foo/bar", - "spans": anys.ANY_LIST, + "files": (str(REPORT_ALL_PASS_XML),), + "run_id": "00000002dfdc1c3e", + "quarantined_names": [], + "test_framework": None, + "test_language": None, } assert result.stdout == ( "══════════════════════════════════════════\n" diff --git a/mergify_cli/tests/ci/junit_processing/test_upload.py b/mergify_cli/tests/ci/junit_processing/test_upload.py index 5da28e1c..98968d8e 100644 --- a/mergify_cli/tests/ci/junit_processing/test_upload.py +++ b/mergify_cli/tests/ci/junit_processing/test_upload.py @@ -1,13 +1,22 @@ +"""Tests for the `upload` bridge that shells out to +`mergify _internal junit-upload`. + +The actual HTTP upload behavior is covered by the Rust-side +wiremock tests in `crates/mergify-ci/src/junit_process/upload.rs` +(happy path, empty request short-circuit, 401 error surface). +These Python tests focus on the subprocess wiring: that the right +binary gets invoked with the right flag set, and that non-zero +exit codes surface as `UploadError` with the prefix stripped. 
+""" + from __future__ import annotations import pathlib +import subprocess +from unittest import mock -from opentelemetry.sdk.trace import ReadableSpan -import opentelemetry.trace.span import pytest -import responses -from mergify_cli.ci.junit_processing import junit from mergify_cli.ci.junit_processing import upload @@ -15,97 +24,135 @@ REPORT_XML = FIXTURES_DIR / "report.xml" -@responses.activate(assert_all_requests_are_fired=True) -@pytest.mark.parametrize( - "env", - [ - pytest.param( - { - "GITHUB_EVENT_NAME": "push", - "GITHUB_ACTIONS": "true", - "MERGIFY_API_URL": "https://api.mergify.com", - "MERGIFY_TOKEN": "abc", - "GITHUB_REPOSITORY": "user/repo", - "GITHUB_SHA": "3af96aa24f1d32fcfbb7067793cacc6dc0c6b199", - "GITHUB_WORKFLOW": "JOB", - }, - id="GitHub", - ), - pytest.param( - { - "GITHUB_ACTIONS": "", - "CIRCLECI": "true", - "MERGIFY_API_URL": "https://api.mergify.com", - "MERGIFY_TOKEN": "abc", - "CIRCLE_REPOSITORY_URL": "https://github.com/user/repo", - "CIRCLE_SHA1": "3af96aa24f1d32fcfbb7067793cacc6dc0c6b199", - "CIRCLE_JOB": "JOB", - }, - id="CircleCI", - ), - ], -) -async def test_junit_upload( - env: dict[str, str], - monkeypatch: pytest.MonkeyPatch, -) -> None: - run_id, spans = await junit.files_to_spans(files=(str(REPORT_XML),)) - for key, value in env.items(): - monkeypatch.setenv(key, value) - - responses.post( - "https://api.mergify.com/v1/repos/user/repo/ci/traces", +def _completed( + returncode: int = 0, + stderr: bytes = b"", +) -> subprocess.CompletedProcess[bytes]: + return subprocess.CompletedProcess( + args=[], + returncode=returncode, + stdout=b"", + stderr=stderr, ) - upload.upload( - "https://api.mergify.com", - "token", - "user/repo", - spans, - ) - - assert len(bytes.fromhex(run_id)) == 8 - -@responses.activate(assert_all_requests_are_fired=True) -def test_junit_upload_http_error() -> None: - responses.post( - "https://api.mergify.com/v1/repos/user/repo/ci/traces", - status=422, - json={"detail": "Not enabled on this 
repository"}, - ) +def test_upload_invokes_subcommand_with_all_metadata() -> None: + with ( + mock.patch.object( + upload.junit, "_resolve_mergify_binary", return_value="/bin/mergify" + ), + mock.patch.object( + upload.subprocess, + "run", + return_value=_completed(), + ) as run_mock, + ): + upload.upload( + api_url="https://api.mergify.com", + token="secret", + repository="user/repo", + files=(str(REPORT_XML), "other.xml"), + run_id="0011223344556677", + quarantined_names=["test_a", "test_b"], + test_framework="pytest", + test_language="python", + mergify_test_job_name="ci-job", + ) - with pytest.raises(upload.UploadError): - upload.upload_spans( - "https://api.mergify.com", - "token", - "user/repo", - [ - ReadableSpan( - name="hello", - context=opentelemetry.trace.span.SpanContext( - trace_id=1234, - span_id=324, - is_remote=False, - ), - ), - ], + run_mock.assert_called_once() + cmd = run_mock.call_args.args[0] + assert cmd == [ + "/bin/mergify", + "_internal", + "junit-upload", + "--api-url", + "https://api.mergify.com", + "--token", + "secret", + "--repository", + "user/repo", + "--run-id", + "0011223344556677", + "--test-framework", + "pytest", + "--test-language", + "python", + "--mergify-test-job-name", + "ci-job", + "--quarantined", + "test_a", + "--quarantined", + "test_b", + str(REPORT_XML), + "other.xml", + ] + + +def test_upload_omits_optional_flags_when_unset() -> None: + with ( + mock.patch.object( + upload.junit, "_resolve_mergify_binary", return_value="/bin/mergify" + ), + mock.patch.object( + upload.subprocess, + "run", + return_value=_completed(), + ) as run_mock, + ): + upload.upload( + api_url="https://api.mergify.com", + token="secret", + repository="user/repo", + files=(str(REPORT_XML),), + run_id="0011223344556677", ) + cmd = run_mock.call_args.args[0] + # No --test-framework / --test-language / --mergify-test-job-name + # / --quarantined flags when the caller didn't supply them. 
+ for flag in ( + "--test-framework", + "--test-language", + "--mergify-test-job-name", + "--quarantined", + ): + assert flag not in cmd, f"{flag!r} should not appear in cmd: {cmd!r}" + + +def test_upload_short_circuits_on_empty_files() -> None: + # No files → no subprocess. Mirrors the Python pre-Rust + # behavior where `upload.upload` returned early on empty + # spans, saving a no-op round trip. + with mock.patch.object(upload.subprocess, "run") as run_mock: + upload.upload( + api_url="https://api.mergify.com", + token="secret", + repository="user/repo", + files=(), + run_id="0011223344556677", + ) + run_mock.assert_not_called() -@responses.activate(assert_all_requests_are_fired=True) -async def test_junit_upload_http_error_raises() -> None: - responses.post( - "https://api.mergify.com/v1/repos/user/repo/ci/traces", - status=422, - json={"detail": "Not enabled on this repository"}, - ) - _run_id, spans = await junit.files_to_spans(files=(str(REPORT_XML),)) - with pytest.raises(upload.UploadError, match="422"): +def test_upload_surfaces_subprocess_failure_with_prefix_stripped() -> None: + with ( + mock.patch.object( + upload.junit, "_resolve_mergify_binary", return_value="/bin/mergify" + ), + mock.patch.object( + upload.subprocess, + "run", + return_value=_completed( + returncode=1, + stderr=b"mergify: HTTP 422: Not enabled on this repository\n", + ), + ), + pytest.raises(upload.UploadError, match="HTTP 422"), + ): upload.upload( - "https://api.mergify.com", - "token", - "user/repo", - spans, + api_url="https://api.mergify.com", + token="secret", + repository="user/repo", + files=(str(REPORT_XML),), + run_id="0011223344556677", ) diff --git a/mergify_cli/tests/ci/test_cli.py b/mergify_cli/tests/ci/test_cli.py index 4e6f9163..10eec9e4 100644 --- a/mergify_cli/tests/ci/test_cli.py +++ b/mergify_cli/tests/ci/test_cli.py @@ -83,12 +83,20 @@ def test_cli(env: dict[str, str], monkeypatch: pytest.MonkeyPatch) -> None: ) assert result_process.exit_code == 0, 
result_process.stdout assert result_upload.exit_code == 0, result_upload.stdout + # Phase B migration: `upload.upload` now wraps a subprocess + # to `mergify _internal junit-upload`. The signature takes + # `files` (original tuple) + `run_id` + `quarantined_names` + # instead of the pre-built `spans` list. assert mocked_upload.call_count == 2 assert mocked_upload.call_args.kwargs == { "api_url": "https://api.mergify.com", "token": "abc", "repository": "user/repo", - "spans": anys.ANY_LIST, + "files": (str(REPORT_XML),), + "run_id": anys.ANY_STR, + "quarantined_names": [], + "test_framework": None, + "test_language": None, } diff --git a/pyproject.toml b/pyproject.toml index 19c75f10..8adc5bfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,13 @@ dependencies = [ "rich==15.0.0", "aiofiles==25.1.0", "click==8.4.1", - "opentelemetry-exporter-otlp-proto-http==1.42.1", + # `opentelemetry-exporter-otlp-proto-http` is gone — the OTLP + # encode-and-upload step now shells out to the native Rust + # binary (`mergify _internal junit-upload`) instead. The SDK + # stays because the Python `junit_to_spans` bridge still + # builds `ReadableSpan` objects so the quarantine + report + # paths can keep using span attributes; that goes away too + # in Phase C when the orchestrator is fully native. 
"opentelemetry-sdk==1.42.1", "tenacity==9.1.4", "pyyaml==6.0.3", diff --git a/uv.lock b/uv.lock index ebf1a5c2..7adbe2df 100644 --- a/uv.lock +++ b/uv.lock @@ -145,18 +145,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] -[[package]] -name = "googleapis-common-protos" -version = "1.66.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "protobuf" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/ff/a7/8e9cccdb1c49870de6faea2a2764fa23f627dd290633103540209f03524c/googleapis_common_protos-1.66.0.tar.gz", hash = "sha256:c3e7b33d15fdca5374cc0a7346dd92ffa847425cc4ea941d970f13680052ec8c", size = 114376, upload-time = "2024-11-12T17:33:38.494Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/a0/0f/c0713fb2b3d28af4b2fded3291df1c4d4f79a00d15c2374a9e010870016c/googleapis_common_protos-1.66.0-py2.py3-none-any.whl", hash = "sha256:d7abcd75fabb2e0ec9f74466401f6c119a0b498e27370e9be4c94cb7e382b8ed", size = 221682, upload-time = "2024-11-12T17:33:37.067Z" }, -] - [[package]] name = "h11" version = "0.16.0" @@ -316,7 +304,6 @@ dependencies = [ { name = "click" }, { name = "httpx" }, { name = "jsonschema" }, - { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-sdk" }, { name = "pydantic" }, { name = "pyyaml" }, @@ -349,7 +336,6 @@ requires-dist = [ { name = "click", specifier = "==8.4.1" }, { name = "httpx", specifier = "==0.28.1" }, { name = "jsonschema", specifier = "==4.26.0" }, - { name = "opentelemetry-exporter-otlp-proto-http", specifier = "==1.42.1" }, { name = "opentelemetry-sdk", specifier = "==1.42.1" }, { name = "pydantic", specifier = "==2.13.4" }, { name = "pyyaml", specifier = "==6.0.3" }, @@ -434,48 +420,6 @@ wheels = [ { 
url = "https://files.pythonhosted.org/packages/a3/ca/9520cc1f3dfbbd03ac5903bbf55833e257bc64b1cf30fa8b0d6df374d821/opentelemetry_api-1.42.1-py3-none-any.whl", hash = "sha256:51a69edacadbc03a8950ace1c4c21099cacc538820ac2c9e36277e78cebba714", size = 61311, upload-time = "2026-05-21T16:32:28.822Z" }, ] -[[package]] -name = "opentelemetry-exporter-otlp-proto-common" -version = "1.42.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "opentelemetry-proto" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/0e/9c/216acfeaedadf2e1937f4373929b20f73197c5c4a2546d4f584b7fa63813/opentelemetry_exporter_otlp_proto_common-1.42.1.tar.gz", hash = "sha256:04f1f01fb597c4249dfcd7f8b861c902c2102369d376d9d346ff38de4469a2ee", size = 21433, upload-time = "2026-05-21T16:32:55.526Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d6/43/2375e7612e1121a4518c17603b6e0b03ad94f565aafad53f464dc5be2bf6/opentelemetry_exporter_otlp_proto_common-1.42.1-py3-none-any.whl", hash = "sha256:f48d395ab815b444da118868977e9798ea354c25737d5cf39578ae894011c140", size = 17327, upload-time = "2026-05-21T16:32:33.387Z" }, -] - -[[package]] -name = "opentelemetry-exporter-otlp-proto-http" -version = "1.42.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "googleapis-common-protos" }, - { name = "opentelemetry-api" }, - { name = "opentelemetry-exporter-otlp-proto-common" }, - { name = "opentelemetry-proto" }, - { name = "opentelemetry-sdk" }, - { name = "requests" }, - { name = "typing-extensions" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/77/32/826bfa1d80ecea24f47808de03cd4a0d13c17ecc07712f45123f0f61e4ac/opentelemetry_exporter_otlp_proto_http-1.42.1.tar.gz", hash = "sha256:bf142a21035d7571ac3a09cb2e5639f49886f243972883cfe777ed3bf02b734d", size = 25406, upload-time = "2026-05-21T16:32:56.807Z" } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/d3/96/82cb223a1502f0787d4bbff12907f5f8d870a50731febcd5818d93ef9555/opentelemetry_exporter_otlp_proto_http-1.42.1-py3-none-any.whl", hash = "sha256:00a16da1b312a1d6c7233d600d557c91df71125af73020f3b9a7765bd699d59d", size = 21793, upload-time = "2026-05-21T16:32:35.277Z" }, -] - -[[package]] -name = "opentelemetry-proto" -version = "1.42.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "protobuf" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/b4/55/63eac3e1089b768ba014091fdd2ae8a9a440c821ef5e2b786909c94c8836/opentelemetry_proto-1.42.1.tar.gz", hash = "sha256:c6a51e6b4f05ae63565f3a113217f3d2bfaec68f78c02d7a6c85f9010d1cfca6", size = 45839, upload-time = "2026-05-21T16:33:03.937Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/41/9d/171c02c84a76940b7e601805b3bb536985aded9168fbcc9ba52f0a730fa2/opentelemetry_proto-1.42.1-py3-none-any.whl", hash = "sha256:dedb74cba2886c59c7789b227a7a670613025a07489040050aedff6e5c0fb43c", size = 71782, upload-time = "2026-05-21T16:32:44.867Z" }, -] - [[package]] name = "opentelemetry-sdk" version = "1.42.1" @@ -564,20 +508,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" }, ] -[[package]] -name = "protobuf" -version = "5.29.6" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/7e/57/394a763c103e0edf87f0938dafcd918d53b4c011dfc5c8ae80f3b0452dbb/protobuf-5.29.6.tar.gz", hash = "sha256:da9ee6a5424b6b30fd5e45c5ea663aef540ca95f9ad99d1e887e819cdf9b8723", size = 425623, upload-time = "2026-02-04T22:54:40.584Z" } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/d4/88/9ee58ff7863c479d6f8346686d4636dd4c415b0cbeed7a6a7d0617639c2a/protobuf-5.29.6-cp310-abi3-win32.whl", hash = "sha256:62e8a3114992c7c647bce37dcc93647575fc52d50e48de30c6fcb28a6a291eb1", size = 423357, upload-time = "2026-02-04T22:54:25.805Z" }, - { url = "https://files.pythonhosted.org/packages/1c/66/2dc736a4d576847134fb6d80bd995c569b13cdc7b815d669050bf0ce2d2c/protobuf-5.29.6-cp310-abi3-win_amd64.whl", hash = "sha256:7e6ad413275be172f67fdee0f43484b6de5a904cc1c3ea9804cb6fe2ff366eda", size = 435175, upload-time = "2026-02-04T22:54:28.592Z" }, - { url = "https://files.pythonhosted.org/packages/06/db/49b05966fd208ae3f44dcd33837b6243b4915c57561d730a43f881f24dea/protobuf-5.29.6-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:b5a169e664b4057183a34bdc424540e86eea47560f3c123a0d64de4e137f9269", size = 418619, upload-time = "2026-02-04T22:54:30.266Z" }, - { url = "https://files.pythonhosted.org/packages/b7/d7/48cbf6b0c3c39761e47a99cb483405f0fde2be22cf00d71ef316ce52b458/protobuf-5.29.6-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:a8866b2cff111f0f863c1b3b9e7572dc7eaea23a7fae27f6fc613304046483e6", size = 320284, upload-time = "2026-02-04T22:54:31.782Z" }, - { url = "https://files.pythonhosted.org/packages/e3/dd/cadd6ec43069247d91f6345fa7a0d2858bef6af366dbd7ba8f05d2c77d3b/protobuf-5.29.6-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:e3387f44798ac1106af0233c04fb8abf543772ff241169946f698b3a9a3d3ab9", size = 320478, upload-time = "2026-02-04T22:54:32.909Z" }, - { url = "https://files.pythonhosted.org/packages/5a/cb/e3065b447186cb70aa65acc70c86baf482d82bf75625bf5a2c4f6919c6a3/protobuf-5.29.6-py3-none-any.whl", hash = "sha256:6b9edb641441b2da9fa8f428760fc136a49cf97a52076010cf22a2ff73438a86", size = 173126, upload-time = "2026-02-04T22:54:39.462Z" }, -] - [[package]] name = "pydantic" version = "2.13.4"