From 5f2b4646dff1725511be6566a4c71964dcbecfd8 Mon Sep 17 00:00:00 2001 From: bobbyiliev Date: Tue, 19 May 2026 16:53:12 +0300 Subject: [PATCH] mcp: auto-route read_data_product to the data product's catalog cluster --- src/environmentd/src/http/mcp.rs | 150 +++++++++++++++--- src/environmentd/tests/server.rs | 61 +++++++ src/environmentd/tests/testdata/mcp/agent | 6 +- .../tests/testdata/mcp/agent_query_tool | 2 +- test/mcp/mzcompose.py | 99 ++++++++++++ 5 files changed, 291 insertions(+), 27 deletions(-) diff --git a/src/environmentd/src/http/mcp.rs b/src/environmentd/src/http/mcp.rs index fd2f63bc4982a..b6ea365be0395 100644 --- a/src/environmentd/src/http/mcp.rs +++ b/src/environmentd/src/http/mcp.rs @@ -655,9 +655,10 @@ fn endpoint_instructions(endpoint_type: McpEndpointType) -> Option { McpEndpointType::Agent => Some(concat!( "You have access to Materialize data products via MCP. ", "Prefer indexed objects (served from memory) over unindexed materialized views ", - "(read from persistent storage). Indexes are cluster-local; if a data product's ", - "cluster differs from your session, pass the `cluster` parameter to `read_data_product` ", - "so the index is actually used.", + "(read from persistent storage). `read_data_product` automatically routes the ", + "read to the cluster recorded in the data product catalog so indexes are used; ", + "you only need to set the `cluster` parameter if you intentionally want the ", + "read to run on a different cluster (e.g. one with larger or more replicas).", ).to_string()), McpEndpointType::Developer => Some(concat!( "You are connected to the Materialize developer MCP server. ", @@ -745,7 +746,7 @@ async fn handle_tools_list( }, "cluster": { "type": "string", - "description": "Optional cluster override. If omitted, uses the cluster from the data product catalog." + "description": "Optional override. By default, the read runs on the cluster recorded in the data product catalog (where the index or materialized view dataflow lives), so indexed reads actually hit their arrangement. Set this only to intentionally run the same read on a different cluster — e.g. one with more or larger replicas, or to compare cost/latency." } }, "required": ["name"] @@ -981,11 +982,21 @@ fn safe_data_product_name(name: &str) -> Result { /// Read rows from a data product. Issues a single read-only query. /// -/// When `cluster_override` is provided, sets the cluster explicitly. -/// Otherwise the query runs on the session's default cluster. +/// By default the read is routed to the cluster recorded in the data product +/// catalog (`mz_mcp_data_products.cluster`), so reads of indexed views +/// actually hit the index's in-memory arrangement instead of falling back to +/// a full recompute through persist on whatever cluster the session happens +/// to default to. `cluster_override` bypasses that routing and runs the +/// same read on a named cluster instead — useful for running the read on a +/// differently-sized or differently-replicated cluster. /// -/// The name is expected to come from `get_data_products()` / `get_data_product_details()`. -/// The query runs inside a READ ONLY transaction, preventing mutations. +/// If both the override and the catalog cluster are absent (the catalog +/// cluster column is unexpectedly null), the query falls back to the +/// session's default cluster. +/// +/// The name is expected to come from `get_data_products()` / +/// `get_data_product_details()`. The query runs inside a READ ONLY +/// transaction, preventing mutations. async fn read_data_product( client: &mut AuthedClient, name: &str, @@ -998,40 +1009,69 @@ async fn read_data_product( // Parse and safely quote the name for SQL interpolation. let safe_name = safe_data_product_name(name)?; - // Lightweight existence check: verify the data product is visible in the - // catalog before running the read query. This gives a clean DataProductNotFound - // error for missing or inaccessible products (including RBAC revocations) - // without relying on fragile error code matching. - // TODO: Remove this extra round-trip once catalog errors get specific SQL - // error codes (see TODO in src/adapter/src/error.rs `fn code()`), then we - // can translate the query error directly. + // Lookup query: serves as the existence check (we still error + // `DataProductNotFound` on an empty result) and at the same time + // recovers the catalog cluster for auto-routing. `mz_mcp_data_products` + // can hold multiple rows per object_name when a product is indexed on + // multiple clusters; `ORDER BY cluster NULLS LAST LIMIT 1` picks one + // deterministically. Callers can disambiguate via `cluster_override`. + // + // TODO: Remove this extra round-trip once catalog errors get specific + // SQL error codes (see TODO in src/adapter/src/error.rs `fn code()`), + // then we can translate the query error directly and drop the lookup. let lookup_query = format!( - "SELECT 1 FROM mz_internal.mz_mcp_data_products WHERE object_name = {}", + "SELECT cluster FROM mz_internal.mz_mcp_data_products \ + WHERE object_name = {} \ + ORDER BY cluster NULLS LAST \ + LIMIT 1", escaped_string_literal(name) ); let lookup_rows = execute_sql(client, &lookup_query).await?; if lookup_rows.is_empty() { return Err(McpRequestError::DataProductNotFound(name.to_string())); } + let catalog_cluster: Option<&str> = lookup_rows + .first() + .and_then(|row| row.first()) + .and_then(|v| v.as_str()); + + // Override beats catalog; catalog beats session default. The override + // path is unchanged from before, so explicit callers are not affected + // by the auto-routing change. + let target_cluster = cluster_override.or(catalog_cluster); // No row cap is applied here: the response is bounded by the size cap // enforced in format_rows_response (MCP_MAX_RESPONSE_SIZE), and by // max_result_size at the adapter layer. Mirrors the SQL HTTP endpoint, // which also leans on a size cap rather than a row cap. - let read_query = match cluster_override { + let read_query = build_read_query(&safe_name, limit, target_cluster); + + let rows = execute_sql(client, &read_query).await?; + + format_rows_response(rows, max_response_size) +} + +/// Builds the SQL the agent runs for `read_data_product`. +/// +/// `safe_name` must already be the validated, quoted form produced by +/// [`safe_data_product_name`]. `target_cluster`, when provided, is escaped +/// as a SQL string literal and wrapped in `SET CLUSTER` inside a `BEGIN +/// READ ONLY` transaction so the cluster choice is scoped to this read and +/// does not leak into the session. +/// +/// When `target_cluster` is `None` we emit a single bare `SELECT` instead +/// of opening a transaction, which avoids three extra round trips on the +/// hot path of session-default reads. +fn build_read_query(safe_name: &str, limit: u32, target_cluster: Option<&str>) -> String { + match target_cluster { Some(cluster) => format!( "BEGIN READ ONLY; SET CLUSTER = {}; SELECT * FROM {} LIMIT {}\n; COMMIT;", escaped_string_literal(cluster), safe_name, limit, ), - // Single statement — skip explicit transaction for better performance. None => format!("SELECT * FROM {} LIMIT {}", safe_name, limit), - }; - - let rows = execute_sql(client, &read_query).await?; - - format_rows_response(rows, max_response_size) + } } /// Validates query is a single SELECT, SHOW, or EXPLAIN statement. @@ -1797,6 +1837,70 @@ mod tests { assert!(safe_data_product_name("my_view WHERE 1=1 --").is_err()); } + // ── build_read_query tests (DEX-27) ──────────────────────────────── + + /// Without a target cluster, the read is a single bare `SELECT` so + /// session-default reads avoid the three extra round trips that + /// `BEGIN READ ONLY; SET CLUSTER; ...; COMMIT;` would cost. + #[mz_ore::test] + fn test_build_read_query_no_cluster() { + let sql = build_read_query("\"db\".\"sch\".\"v\"", 100, None); + assert_eq!(sql, "SELECT * FROM \"db\".\"sch\".\"v\" LIMIT 100"); + assert!( + !sql.contains("SET CLUSTER"), + "session-default reads should not emit SET CLUSTER: {sql}", + ); + assert!( + !sql.contains("BEGIN"), + "session-default reads should not open a transaction: {sql}", + ); + } + + /// With a target cluster, the read is wrapped in a `BEGIN READ ONLY` + /// transaction so the `SET CLUSTER` scope is bounded to this read and + /// does not leak into the rest of the session. + #[mz_ore::test] + fn test_build_read_query_with_cluster() { + let sql = build_read_query("\"db\".\"sch\".\"v\"", 50, Some("prod_cluster")); + assert!(sql.contains("BEGIN READ ONLY"), "{sql}"); + assert!(sql.contains("SET CLUSTER = 'prod_cluster'"), "{sql}"); + assert!( + sql.contains("SELECT * FROM \"db\".\"sch\".\"v\" LIMIT 50"), + "{sql}", + ); + assert!(sql.contains("COMMIT"), "{sql}"); + } + + /// Cluster names with single quotes and backslashes must be escaped + /// the same way as the `name` argument, since they end up interpolated + /// into a SQL string literal. Defends against catalog-injection or + /// adversarial cluster names. + #[mz_ore::test] + fn test_build_read_query_escapes_cluster_name() { + let sql = build_read_query( + "\"db\".\"sch\".\"v\"", + 10, + Some("evil'; DROP TABLE secrets; --"), + ); + // The single quote in `evil'` must be doubled inside the literal. + assert!( + sql.contains("SET CLUSTER = 'evil''; DROP TABLE secrets; --'"), + "single quote should be doubled inside the literal: {sql}", + ); + // The injected `DROP TABLE` must remain inside the literal — i.e. + // there is no second statement that escapes the SET CLUSTER call. + assert_eq!( + sql.matches("SET CLUSTER").count(), + 1, + "exactly one SET CLUSTER statement: {sql}", + ); + assert_eq!( + sql.matches("DROP TABLE").count(), + 1, + "DROP TABLE should appear once, inside the quoted literal: {sql}", + ); + } + #[mz_ore::test] fn test_mcp_error_codes() { assert_eq!( diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 3ba3dbfa08c8c..b5cb1c8959413 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -5613,6 +5613,67 @@ fn test_mcp_agent_with_data_product() { let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap(); assert_eq!(rows.as_array().unwrap().len(), 1); + // DEX-27: an MV whose home cluster is NOT the HTTP session's default + // (`quickstart`) must still be readable without the caller passing a + // `cluster` argument. Before the auto-routing change the read would + // execute against `quickstart`, missing the index. Reading without an + // override must still succeed and return the row. + { + let mut super_user = server + .pg_config_internal() + .user(&SYSTEM_USER.name) + .connect(postgres::NoTls) + .unwrap(); + super_user + .batch_execute( + "CREATE CLUSTER dex27_other_cluster REPLICAS (r1 (SIZE 'scale=1,workers=1'))", + ) + .unwrap(); + super_user + .batch_execute( + "CREATE MATERIALIZED VIEW test_off_default IN CLUSTER dex27_other_cluster \ + AS SELECT 42::int AS id, 'off_default'::text AS name", + ) + .unwrap(); + super_user + .batch_execute(&format!( + "GRANT SELECT ON test_off_default TO {}", + &HTTP_DEFAULT_USER.name + )) + .unwrap(); + super_user + .batch_execute(&format!( + "GRANT USAGE ON CLUSTER dex27_other_cluster TO {}", + &HTTP_DEFAULT_USER.name + )) + .unwrap(); + } + // The MV was created after the cached `get_data_products` snapshot, so + // build the qualified name directly rather than re-querying. + let off_default_name = r#""materialize"."public"."test_off_default""#.to_string(); + let (status, body) = mcp_post( + &agents_url, + serde_json::json!({ + "jsonrpc": "2.0", + "id": 41, + "method": "tools/call", + "params": { + "name": "read_data_product", + "arguments": {"name": off_default_name, "limit": 10} + } + }), + ); + assert_eq!(status, StatusCode::OK); + assert!( + body["error"].is_null(), + "no-override read on off-default cluster should auto-route, got: {body}", + ); + let rows_text = body["result"]["content"][0]["text"].as_str().unwrap(); + let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap(); + assert_eq!(rows.as_array().unwrap().len(), 1); + assert_eq!(rows[0][0].as_str().unwrap(), "42"); + assert_eq!(rows[0][1].as_str().unwrap(), "off_default"); + // read_data_product with limit 0 should return no rows. let (status, body) = mcp_post( &agents_url, diff --git a/src/environmentd/tests/testdata/mcp/agent b/src/environmentd/tests/testdata/mcp/agent index 81221cae24beb..0dc3b0ba6df3a 100644 --- a/src/environmentd/tests/testdata/mcp/agent +++ b/src/environmentd/tests/testdata/mcp/agent @@ -16,7 +16,7 @@ mcp-agent {"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}} ---- 200 OK -{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{"tools":{}},"serverInfo":{"name":"materialize-mcp-agent","version":""},"instructions":"You have access to Materialize data products via MCP. Prefer indexed objects (served from memory) over unindexed materialized views (read from persistent storage). Indexes are cluster-local; if a data product's cluster differs from your session, pass the `cluster` parameter to `read_data_product` so the index is actually used."}} +{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{"tools":{}},"serverInfo":{"name":"materialize-mcp-agent","version":""},"instructions":"You have access to Materialize data products via MCP. Prefer indexed objects (served from memory) over unindexed materialized views (read from persistent storage). `read_data_product` automatically routes the read to the cluster recorded in the data product catalog so indexes are used; you only need to set the `cluster` parameter if you intentionally want the read to run on a different cluster (e.g. one with larger or more replicas)."}} # ============================================================================= # Tools List (query tool disabled by default) @@ -27,7 +27,7 @@ mcp-agent {"jsonrpc":"2.0","id":2,"method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional override. By default, the read runs on the cluster recorded in the data product catalog (where the index or materialized view dataflow lives), so indexed reads actually hit their arrangement. Set this only to intentionally run the same read on a different cluster — e.g. one with more or larger replicas, or to compare cost/latency."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # ============================================================================= # Notifications (no id = no response body expected, just 200) @@ -114,7 +114,7 @@ mcp-agent {"jsonrpc":"2.0","id":"my-request-id","method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":"my-request-id","result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":"my-request-id","result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional override. By default, the read runs on the cluster recorded in the data product catalog (where the index or materialized view dataflow lives), so indexed reads actually hit their arrangement. Set this only to intentionally run the same read on a different cluster — e.g. one with more or larger replicas, or to compare cost/latency."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # Null id is treated as a notification by serde (Option deserializes null as None). mcp-agent diff --git a/src/environmentd/tests/testdata/mcp/agent_query_tool b/src/environmentd/tests/testdata/mcp/agent_query_tool index 0a25ee91953fd..b1c66fe684045 100644 --- a/src/environmentd/tests/testdata/mcp/agent_query_tool +++ b/src/environmentd/tests/testdata/mcp/agent_query_tool @@ -16,7 +16,7 @@ mcp-agent {"jsonrpc":"2.0","id":1,"method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"query","title":"Query Data Products","description":"Execute SQL queries against real-time data products to retrieve current business information. Use standard PostgreSQL syntax. You can JOIN multiple data products together, but ONLY if they are all hosted on the same cluster. Always specify the cluster parameter from the data product details. This provides fresh, up-to-date results from materialized views. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"cluster":{"type":"string","description":"Exact cluster name from the data product details - required for query execution"},"sql_query":{"type":"string","description":"PostgreSQL-compatible SELECT statement to retrieve data. Use the fully qualified data product name exactly as provided (with double quotes). You can JOIN multiple data products, but only those on the same cluster."}},"required":["cluster","sql_query"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional override. By default, the read runs on the cluster recorded in the data product catalog (where the index or materialized view dataflow lives), so indexed reads actually hit their arrangement. Set this only to intentionally run the same read on a different cluster — e.g. one with more or larger replicas, or to compare cost/latency."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"query","title":"Query Data Products","description":"Execute SQL queries against real-time data products to retrieve current business information. Use standard PostgreSQL syntax. You can JOIN multiple data products together, but ONLY if they are all hosted on the same cluster. Always specify the cluster parameter from the data product details. This provides fresh, up-to-date results from materialized views. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"cluster":{"type":"string","description":"Exact cluster name from the data product details - required for query execution"},"sql_query":{"type":"string","description":"PostgreSQL-compatible SELECT statement to retrieve data. Use the fully qualified data product name exactly as provided (with double quotes). You can JOIN multiple data products, but only those on the same cluster."}},"required":["cluster","sql_query"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # ============================================================================= # query tool - valid SELECT queries diff --git a/test/mcp/mzcompose.py b/test/mcp/mzcompose.py index 7981b4e9c37d7..23d5292922be5 100644 --- a/test/mcp/mzcompose.py +++ b/test/mcp/mzcompose.py @@ -9,6 +9,9 @@ """End-to-end tests for the MCP (Model Context Protocol) HTTP endpoints.""" +import json +import time + import requests from materialize import MZ_ROOT @@ -352,6 +355,102 @@ def workflow_default(c: Composition) -> None: r = post_mcp(c, "developer", jsonrpc("tools/list")) assert r.status_code == 200 + # -- DEX-27: read_data_product auto-routes to the catalog cluster --------- + # + # Verifies that when an MV lives on a cluster other than the session's + # default, `read_data_product` (called without a `cluster` argument) + # transparently issues `SET CLUSTER` to the data product's home cluster + # so the read actually hits the index/MV's dataflow. We confirm this by + # checking `mz_internal.mz_recent_activity_log` for the SELECT we + # issued and asserting it ran on the off-default cluster. + + with c.test_case("agent_read_data_product_auto_routes_cluster"): + # Provision the off-default cluster + MV + grants for the HTTP user. + c.sql( + """ + DROP MATERIALIZED VIEW IF EXISTS public.dex27_routed_mv; + DROP CLUSTER IF EXISTS dex27_other CASCADE; + CREATE CLUSTER dex27_other REPLICAS (r1 (SIZE 'scale=1,workers=1')); + CREATE MATERIALIZED VIEW public.dex27_routed_mv IN CLUSTER dex27_other + AS SELECT 7::int AS id, 'routed'::text AS name; + GRANT USAGE ON CLUSTER dex27_other TO anonymous_http_user; + GRANT SELECT ON public.dex27_routed_mv TO anonymous_http_user; + -- Make sure statement logging fires for our read so we can + -- inspect mz_recent_activity_log without flakiness. + ALTER SYSTEM SET statement_logging_default_sample_rate = 1.0; + ALTER SYSTEM SET statement_logging_max_sample_rate = 1.0; + """, + user="mz_system", + port=6877, + print_statement=False, + ) + + # First touch the agent endpoint so `anonymous_http_user` is + # auto-provisioned, then call `read_data_product` with NO cluster + # argument — the server should route to `dex27_other` based on the + # catalog row, not the session-default `quickstart`. + post_mcp( + c, + "agent", + jsonrpc( + "initialize", + { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": {"name": "dex27", "version": "0.1.0"}, + }, + req_id=2700, + ), + ) + r = post_mcp( + c, + "agent", + jsonrpc( + "tools/call", + { + "name": "read_data_product", + "arguments": { + "name": '"materialize"."public"."dex27_routed_mv"', + "limit": 5, + }, + }, + req_id=2701, + ), + ) + assert r.status_code == 200, f"unexpected status: {r.status_code} {r.text}" + body = r.json() + assert "error" not in body, f"read_data_product errored: {body}" + rows = json.loads(body["result"]["content"][0]["text"]) + assert rows == [["7", "routed"]], f"unexpected rows: {rows}" + + # Now confirm via the activity log that the read ran on + # `dex27_other`, not the session default. The log is emitted + # asynchronously, so poll briefly. + deadline = time.monotonic() + 30 + observed_cluster: str | None = None + while time.monotonic() < deadline: + rows = c.sql_query( + """ + SELECT cluster_name + FROM mz_internal.mz_recent_activity_log + WHERE application_name = 'mz_mcp_agents' + AND sql ILIKE '%dex27_routed_mv%' + AND finished_status = 'success' + ORDER BY began_at DESC + LIMIT 1 + """, + user="mz_system", + port=6877, + ) + if rows: + observed_cluster = rows[0][0] + break + time.sleep(0.5) + assert observed_cluster == "dex27_other", ( + "no-override read should auto-route to the data product's cluster " + f"(dex27_other), but activity log shows cluster_name = {observed_cluster!r}" + ) + # -- agent: disable/enable via flag ---------------------------------------- with c.test_case("agent_disable_via_flag"):