From a46559cc94f2fc3096231fe68f37891b8b3b21b3 Mon Sep 17 00:00:00 2001 From: bobbyiliev Date: Tue, 19 May 2026 12:51:54 +0300 Subject: [PATCH 1/3] mcp: surface data product hydration readiness via mz_mcp_data_product_details --- .../reference/system-catalog/mz_internal.md | 5 +- src/catalog/src/builtin/mz_internal.rs | 52 +++++- src/environmentd/src/http/mcp.rs | 15 +- src/environmentd/tests/server.rs | 57 +++++++ src/environmentd/tests/testdata/mcp/agent | 6 +- .../tests/testdata/mcp/agent_query_tool | 2 +- test/mcp/mzcompose.py | 159 ++++++++++++++++++ .../autogenerated/mz_internal.slt | 1 + 8 files changed, 288 insertions(+), 9 deletions(-) diff --git a/doc/user/content/reference/system-catalog/mz_internal.md b/doc/user/content/reference/system-catalog/mz_internal.md index 229f6bd710b45..d4ec308133ff3 100644 --- a/doc/user/content/reference/system-catalog/mz_internal.md +++ b/doc/user/content/reference/system-catalog/mz_internal.md @@ -561,7 +561,9 @@ schema information. ## `mz_mcp_data_product_details` The `mz_mcp_data_product_details` view extends [`mz_mcp_data_products`](#mz_mcp_data_products) -with a JSON Schema describing each data product's columns and types. +with a JSON Schema describing each data product's columns and types, and a +readiness summary that lets agents tell "still warming up" apart from +"genuinely empty." | Field | Type | Meaning | @@ -570,6 +572,7 @@ with a JSON Schema describing each data product's columns and types. | `cluster` | [`text`] | Cluster where the object computes or its index is hosted. Reads from any cluster work, but only reads on this cluster benefit from the index. | | `description` | [`text`] | Index comment if available, otherwise object comment. Used as data product description. | | `schema` | [`jsonb`]| JSON Schema describing the object's columns and types. | +| `hydration` | [`jsonb`]| Readiness summary as a JSON object with `hydrated` (bool), `replica_count` (int), and `hydrated_replica_count` (int). `hydrated` is true only when the cluster has at least one replica and the dataflow is hydrated on every replica. Agents should back off and retry when `hydrated` is false rather than treating an empty read as final. | ## `mz_object_dependencies` diff --git a/src/catalog/src/builtin/mz_internal.rs b/src/catalog/src/builtin/mz_internal.rs index 1f9050d8f1629..95fa0677f2b7a 100644 --- a/src/catalog/src/builtin/mz_internal.rs +++ b/src/catalog/src/builtin/mz_internal.rs @@ -5323,6 +5323,7 @@ pub static MZ_MCP_DATA_PRODUCT_DETAILS: LazyLock = LazyLock::new(|| .with_column("cluster", SqlScalarType::String.nullable(true)) .with_column("description", SqlScalarType::String.nullable(true)) .with_column("schema", SqlScalarType::Jsonb.nullable(false)) + .with_column("hydration", SqlScalarType::Jsonb.nullable(false)) .with_key(vec![0, 1, 2]) .finish(), column_comments: BTreeMap::from_iter([ @@ -5342,9 +5343,13 @@ pub static MZ_MCP_DATA_PRODUCT_DETAILS: LazyLock = LazyLock::new(|| "schema", "JSON Schema describing the object's columns and types.", ), + ( + "hydration", + "Readiness summary as a JSON object with `hydrated` (bool), `replica_count` (int), and `hydrated_replica_count` (int). `hydrated` is true only when the cluster has at least one replica and the dataflow is hydrated on every replica. Agents should back off and retry when `hydrated` is false rather than treating an empty read as final.", + ), ]), sql: r#" -SELECT * FROM ( +WITH details_raw AS ( SELECT '"' || op.database || '"."' || op.schema || '"."' || op.name || '"' AS object_name, COALESCE(c_idx.name, c_obj.name) AS cluster, @@ -5420,7 +5425,52 @@ WHERE op.privilege_type = 'SELECT' AND (o.type = 'materialized-view' OR (o.type = 'view' AND i.id IS NOT NULL)) AND s.name NOT IN ('mz_catalog', 'mz_internal', 'pg_catalog', 'information_schema', 'mz_introspection') GROUP BY 1, 2, 3 +), +-- Pick the right (object_id, cluster_id) for hydration: the index's id + +-- cluster when an index exists (its arrangement is what the data product +-- reads from), otherwise the materialized view's own id + cluster. +hydration_meta AS ( + SELECT DISTINCT + '"' || db.name || '"."' || s.name || '"."' || o.name || '"' AS object_name, + COALESCE(c_idx.name, c_obj.name) AS cluster, + COALESCE(i.id, o.id) AS hydration_object_id, + COALESCE(i.cluster_id, o.cluster_id) AS cluster_id + FROM mz_objects o + JOIN mz_schemas s ON s.id = o.schema_id + JOIN mz_databases db ON db.id = s.database_id + LEFT JOIN mz_indexes i ON i.on_id = o.id + LEFT JOIN mz_clusters c_idx ON c_idx.id = i.cluster_id + LEFT JOIN mz_clusters c_obj ON c_obj.id = o.cluster_id + WHERE (o.type = 'materialized-view' OR (o.type = 'view' AND i.id IS NOT NULL)) + AND s.name NOT IN ('mz_catalog', 'mz_internal', 'pg_catalog', 'information_schema', 'mz_introspection') +), +hydration AS ( + SELECT + m.object_name, + m.cluster, + COUNT(r.id)::int AS replica_count, + COUNT(*) FILTER (WHERE COALESCE(h.hydrated, false))::int AS hydrated_replica_count + FROM hydration_meta m + LEFT JOIN mz_catalog.mz_cluster_replicas r ON r.cluster_id = m.cluster_id + LEFT JOIN mz_internal.mz_hydration_statuses h + ON h.replica_id = r.id AND h.object_id = m.hydration_object_id + GROUP BY m.object_name, m.cluster ) +SELECT + d.object_name, + d.cluster, + d.description, + d.schema, + jsonb_build_object( + 'hydrated', + COALESCE(h.replica_count > 0 AND h.hydrated_replica_count = h.replica_count, false), + 'replica_count', COALESCE(h.replica_count, 0), + 'hydrated_replica_count', COALESCE(h.hydrated_replica_count, 0) + ) AS hydration +FROM details_raw d +LEFT JOIN hydration h + ON h.object_name = d.object_name + AND h.cluster IS NOT DISTINCT FROM d.cluster "#, access: vec![PUBLIC_SELECT], ontology: None, diff --git a/src/environmentd/src/http/mcp.rs b/src/environmentd/src/http/mcp.rs index fd2f63bc4982a..5ede733e2d637 100644 --- a/src/environmentd/src/http/mcp.rs +++ b/src/environmentd/src/http/mcp.rs @@ -68,7 +68,11 @@ const MCP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); // Discovery uses the lightweight view (no JSON schema computation). const DISCOVERY_QUERY: &str = "SELECT * FROM mz_internal.mz_mcp_data_products"; -// Details uses the full view with JSON schema. +// Details uses the full view, which also exposes a `hydration` JSON column +// (`{hydrated, replica_count, hydrated_replica_count}`) so agents can decide +// whether to back off and retry vs treat empty reads as final. See DEX-30 +// and the comments on `mz_mcp_data_product_details` in +// `src/catalog/src/builtin/mz_internal.rs`. const DETAILS_QUERY_PREFIX: &str = "SELECT * FROM mz_internal.mz_mcp_data_product_details WHERE object_name = "; @@ -657,7 +661,12 @@ fn endpoint_instructions(endpoint_type: McpEndpointType) -> Option { "Prefer indexed objects (served from memory) over unindexed materialized views ", "(read from persistent storage). Indexes are cluster-local; if a data product's ", "cluster differs from your session, pass the `cluster` parameter to `read_data_product` ", - "so the index is actually used.", + "so the index is actually used. ", + "`get_data_product_details` returns a `hydration` object with `hydrated`, ", + "`replica_count`, and `hydrated_replica_count` fields: if `hydrated` is false, ", + "the dataflow is still warming up. Back off and retry rather than looping on ", + "empty results — an empty answer from a not-yet-hydrated product is not the ", + "same as a genuinely empty result.", ).to_string()), McpEndpointType::Developer => Some(concat!( "You are connected to the Materialize developer MCP server. ", @@ -714,7 +723,7 @@ async fn handle_tools_list( ToolDefinition { name: "get_data_product_details".to_string(), title: Some("Get Data Product Details".to_string()), - description: "Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.".to_string(), + description: "Get the complete schema and structure of a specific data product, plus a `hydration` object reporting whether the dataflow is fully hydrated across the cluster's replicas (`{hydrated, replica_count, hydrated_replica_count}`). This shows you exactly what fields are available, their types, what data you can query, and whether the data product is ready to serve fresh results. Use this after finding a data product from get_data_products() to understand how to query it; if `hydrated` is false, back off and retry rather than treating empty reads as final.".to_string(), input_schema: json!({ "type": "object", "properties": { diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 3ba3dbfa08c8c..002767096f7b9 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -5547,6 +5547,53 @@ fn test_mcp_agent_with_data_product() { assert!(body["result"]["content"][0]["text"].as_str().is_some()); assert!(body["error"].is_null()); + // The response should expose a `hydration` field per row (5th cell) so + // agents can decide whether to back off or treat empty reads as final. + // See DEX-30. For an MV that's had time to hydrate on a single-replica + // `quickstart` cluster, expect `hydrated: true` with 1/1 replicas. + let rows_text = body["result"]["content"][0]["text"].as_str().unwrap(); + let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap(); + let rows = rows.as_array().expect("details should return rows"); + assert!(!rows.is_empty(), "details should return at least one row"); + for row in rows { + let row = row.as_array().expect("each row should be an array"); + assert_eq!( + row.len(), + 5, + "each details row should have 5 cells (object_name, cluster, description, schema, hydration), got: {:?}", + row, + ); + let hydration = &row[4]; + assert!( + hydration.is_object(), + "hydration cell should be a JSON object, got: {hydration}", + ); + assert!( + hydration.get("hydrated").is_some_and(|v| v.is_boolean()), + "hydration.hydrated should be a bool, got: {hydration}", + ); + let replica_count = hydration + .get("replica_count") + .and_then(|v| v.as_i64()) + .unwrap_or_else(|| { + panic!("hydration.replica_count should be an int, got: {hydration}") + }); + let hydrated_replica_count = hydration + .get("hydrated_replica_count") + .and_then(|v| v.as_i64()) + .unwrap_or_else(|| { + panic!("hydration.hydrated_replica_count should be an int, got: {hydration}") + }); + assert!( + replica_count >= 0 && hydrated_replica_count >= 0, + "replica counts must be non-negative, got: {hydration}", + ); + assert!( + hydrated_replica_count <= replica_count, + "hydrated_replica_count ({hydrated_replica_count}) cannot exceed replica_count ({replica_count}): {hydration}", + ); + } + // get_data_product_details should also resolve the indexed view, proving // the filter change is applied consistently to mz_mcp_data_product_details. let indexed_view_name = find_product("test_indexed_view").as_array().unwrap()[0] @@ -5571,6 +5618,16 @@ fn test_mcp_agent_with_data_product() { "indexed view should be resolvable via get_data_product_details, got: {body}" ); assert!(body["result"]["content"][0]["text"].as_str().is_some()); + // Indexed view should also report a hydration object. + let rows_text = body["result"]["content"][0]["text"].as_str().unwrap(); + let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap(); + let rows = rows.as_array().expect("details should return rows"); + assert!(!rows.is_empty()); + for row in rows { + let row = row.as_array().expect("each row should be an array"); + assert_eq!(row.len(), 5, "row should include hydration cell: {row:?}"); + assert!(row[4].is_object(), "hydration cell should be an object"); + } // read_data_product should return the row from the view. let (status, body) = mcp_post( diff --git a/src/environmentd/tests/testdata/mcp/agent b/src/environmentd/tests/testdata/mcp/agent index 81221cae24beb..21d60b7692310 100644 --- a/src/environmentd/tests/testdata/mcp/agent +++ b/src/environmentd/tests/testdata/mcp/agent @@ -16,7 +16,7 @@ mcp-agent {"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}} ---- 200 OK -{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{"tools":{}},"serverInfo":{"name":"materialize-mcp-agent","version":""},"instructions":"You have access to Materialize data products via MCP. Prefer indexed objects (served from memory) over unindexed materialized views (read from persistent storage). Indexes are cluster-local; if a data product's cluster differs from your session, pass the `cluster` parameter to `read_data_product` so the index is actually used."}} +{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{"tools":{}},"serverInfo":{"name":"materialize-mcp-agent","version":""},"instructions":"You have access to Materialize data products via MCP. Prefer indexed objects (served from memory) over unindexed materialized views (read from persistent storage). Indexes are cluster-local; if a data product's cluster differs from your session, pass the `cluster` parameter to `read_data_product` so the index is actually used. `get_data_product_details` returns a `hydration` object with `hydrated`, `replica_count`, and `hydrated_replica_count` fields: if `hydrated` is false, the dataflow is still warming up. Back off and retry rather than looping on empty results — an empty answer from a not-yet-hydrated product is not the same as a genuinely empty result."}} # ============================================================================= # Tools List (query tool disabled by default) @@ -27,7 +27,7 @@ mcp-agent {"jsonrpc":"2.0","id":2,"method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product, plus a `hydration` object reporting whether the dataflow is fully hydrated across the cluster's replicas (`{hydrated, replica_count, hydrated_replica_count}`). This shows you exactly what fields are available, their types, what data you can query, and whether the data product is ready to serve fresh results. Use this after finding a data product from get_data_products() to understand how to query it; if `hydrated` is false, back off and retry rather than treating empty reads as final.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # ============================================================================= # Notifications (no id = no response body expected, just 200) @@ -114,7 +114,7 @@ mcp-agent {"jsonrpc":"2.0","id":"my-request-id","method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":"my-request-id","result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":"my-request-id","result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product, plus a `hydration` object reporting whether the dataflow is fully hydrated across the cluster's replicas (`{hydrated, replica_count, hydrated_replica_count}`). This shows you exactly what fields are available, their types, what data you can query, and whether the data product is ready to serve fresh results. Use this after finding a data product from get_data_products() to understand how to query it; if `hydrated` is false, back off and retry rather than treating empty reads as final.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # Null id is treated as a notification by serde (Option deserializes null as None). mcp-agent diff --git a/src/environmentd/tests/testdata/mcp/agent_query_tool b/src/environmentd/tests/testdata/mcp/agent_query_tool index 0a25ee91953fd..d46e258d1d9ca 100644 --- a/src/environmentd/tests/testdata/mcp/agent_query_tool +++ b/src/environmentd/tests/testdata/mcp/agent_query_tool @@ -16,7 +16,7 @@ mcp-agent {"jsonrpc":"2.0","id":1,"method":"tools/list"} ---- 200 OK -{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"query","title":"Query Data Products","description":"Execute SQL queries against real-time data products to retrieve current business information. Use standard PostgreSQL syntax. You can JOIN multiple data products together, but ONLY if they are all hosted on the same cluster. Always specify the cluster parameter from the data product details. This provides fresh, up-to-date results from materialized views. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"cluster":{"type":"string","description":"Exact cluster name from the data product details - required for query execution"},"sql_query":{"type":"string","description":"PostgreSQL-compatible SELECT statement to retrieve data. Use the fully qualified data product name exactly as provided (with double quotes). You can JOIN multiple data products, but only those on the same cluster."}},"required":["cluster","sql_query"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} +{"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_data_products","title":"List Data Products","description":"Discover all available real-time data views (data products) that represent business entities like customers, orders, products, etc. Each data product provides fresh, queryable data with defined schemas. Use this first to see what data is available before querying specific information.","inputSchema":{"type":"object","properties":{},"required":[]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"get_data_product_details","title":"Get Data Product Details","description":"Get the complete schema and structure of a specific data product, plus a `hydration` object reporting whether the dataflow is fully hydrated across the cluster's replicas (`{hydrated, replica_count, hydrated_replica_count}`). This shows you exactly what fields are available, their types, what data you can query, and whether the data product is ready to serve fresh results. Use this after finding a data product from get_data_products() to understand how to query it; if `hydrated` is false, back off and retry rather than treating empty reads as final.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact name of the data product from get_data_products() list"}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"read_data_product","title":"Read Data Product","description":"Read rows from a specific data product. Returns up to `limit` rows (default 500). The data product must exist in the catalog (use get_data_products() to discover available products). Use this to retrieve actual data from a known data product. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"name":{"type":"string","description":"Exact fully-qualified name of the data product (e.g. '\"materialize\".\"schema\".\"view_name\"')"},"limit":{"type":"integer","description":"Maximum number of rows to return (default 500)","default":500},"cluster":{"type":"string","description":"Optional cluster override. If omitted, uses the cluster from the data product catalog."}},"required":["name"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}},{"name":"query","title":"Query Data Products","description":"Execute SQL queries against real-time data products to retrieve current business information. Use standard PostgreSQL syntax. You can JOIN multiple data products together, but ONLY if they are all hosted on the same cluster. Always specify the cluster parameter from the data product details. This provides fresh, up-to-date results from materialized views. Response limit: 1 MB.","inputSchema":{"type":"object","properties":{"cluster":{"type":"string","description":"Exact cluster name from the data product details - required for query execution"},"sql_query":{"type":"string","description":"PostgreSQL-compatible SELECT statement to retrieve data. Use the fully qualified data product name exactly as provided (with double quotes). You can JOIN multiple data products, but only those on the same cluster."}},"required":["cluster","sql_query"]},"annotations":{"readOnlyHint":true,"destructiveHint":false,"idempotentHint":true,"openWorldHint":false}}]}} # ============================================================================= # query tool - valid SELECT queries diff --git a/test/mcp/mzcompose.py b/test/mcp/mzcompose.py index 7981b4e9c37d7..8075f6e352428 100644 --- a/test/mcp/mzcompose.py +++ b/test/mcp/mzcompose.py @@ -9,6 +9,9 @@ """End-to-end tests for the MCP (Model Context Protocol) HTTP endpoints.""" +import json +import time + import requests from materialize import MZ_ROOT @@ -380,3 +383,159 @@ def workflow_default(c: Composition) -> None: r = post_mcp(c, "agent", jsonrpc("tools/list")) assert r.status_code == 200 + + # -- hydration: end-to-end coverage for DEX-30 ---------------------------- + # + # Verifies that `mz_internal.mz_mcp_data_product_details` exposes a + # `hydration` JSON column that the agent endpoint surfaces through + # `get_data_product_details`, so agents can distinguish "still warming + # up" from "genuinely empty." Two scenarios: + # + # 1. MV on `quickstart` (1 replica) → hydrated=true, 1/1 replicas. + # 2. MV on a cluster with zero replicas → hydrated=false, 0/0. + # + # The HTTP user that the MCP server runs as on this no-auth listener is + # `anonymous_http_user`; it gets auto-provisioned on the first MCP + # request, so we provision it eagerly with an `initialize` call and then + # grant the necessary privileges as `mz_system`. + + # First touch the agent endpoint so `anonymous_http_user` exists as a role + # and we can GRANT to it. + post_mcp( + c, + "agent", + jsonrpc( + "initialize", + { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": {"name": "setup", "version": "0.1.0"}, + }, + req_id=999, + ), + ) + + c.sql( + """ + GRANT USAGE ON CLUSTER quickstart TO anonymous_http_user; + GRANT USAGE ON DATABASE materialize TO anonymous_http_user; + GRANT USAGE, CREATE ON SCHEMA materialize.public TO anonymous_http_user; + """, + user="mz_system", + port=6877, + print_statement=False, + ) + + def hydration_for(object_name: str) -> dict: + """Calls `get_data_product_details` and returns the parsed hydration + object from the first row. Asserts the row has the documented 5-cell + shape and that the hydration cell carries the three required keys.""" + r = post_mcp( + c, + "agent", + jsonrpc( + "tools/call", + { + "name": "get_data_product_details", + "arguments": {"name": object_name}, + }, + ), + ) + assert r.status_code == 200, f"unexpected status: {r.status_code} {r.text}" + body = r.json() + assert "error" not in body, f"unexpected error response: {body}" + rows = json.loads(body["result"]["content"][0]["text"]) + assert rows, f"expected at least one details row, got: {rows}" + row = rows[0] + assert ( + len(row) == 5 + ), f"details row should have 5 cells (object_name, cluster, description, schema, hydration), got: {row}" + hydration = row[4] + assert isinstance(hydration, dict), f"hydration should be a dict: {hydration}" + for key in ("hydrated", "replica_count", "hydrated_replica_count"): + assert key in hydration, f"hydration missing `{key}`: {hydration}" + return hydration + + with c.test_case("agent_get_data_product_details_hydrated"): + c.sql( + """ + DROP MATERIALIZED VIEW IF EXISTS public.test_hydration_mv; + CREATE MATERIALIZED VIEW public.test_hydration_mv IN CLUSTER quickstart + AS SELECT 1::int AS id, 'widget'::text AS name; + GRANT SELECT ON public.test_hydration_mv TO anonymous_http_user; + """, + user="mz_system", + port=6877, + print_statement=False, + ) + + # The MV runs on `quickstart`, which has a single ready replica, so + # hydration should converge almost immediately. Poll briefly to + # avoid a race with hydration-status updates. + object_name = '"materialize"."public"."test_hydration_mv"' + deadline = time.monotonic() + 30 + last: dict = {} + while time.monotonic() < deadline: + last = hydration_for(object_name) + if last["hydrated"]: + break + time.sleep(0.5) + assert ( + last.get("hydrated") is True + ), f"expected hydrated=true within 30s, last: {last}" + assert ( + last["replica_count"] == last["hydrated_replica_count"] + ), f"counts should match when hydrated: {last}" + assert last["replica_count"] >= 1, f"quickstart has a replica: {last}" + + with c.test_case("agent_get_data_product_details_zero_replicas"): + # A cluster with no replicas can't hydrate anything, so `hydrated` + # must be false with 0/0 counts. This is the canary case for the + # `replica_count > 0` guard in the view's `hydrated` expression. + c.sql( + """ + DROP MATERIALIZED VIEW IF EXISTS public.test_hydration_empty_mv; + DROP CLUSTER IF EXISTS test_hydration_empty; + CREATE CLUSTER test_hydration_empty REPLICAS (); + CREATE MATERIALIZED VIEW public.test_hydration_empty_mv + IN CLUSTER test_hydration_empty + AS SELECT 1::int AS id; + GRANT USAGE ON CLUSTER test_hydration_empty TO anonymous_http_user; + GRANT SELECT ON public.test_hydration_empty_mv TO anonymous_http_user; + """, + user="mz_system", + port=6877, + print_statement=False, + ) + + hydration = hydration_for( + '"materialize"."public"."test_hydration_empty_mv"', + ) + assert hydration == { + "hydrated": False, + "replica_count": 0, + "hydrated_replica_count": 0, + }, f"expected zero-replica hydration, got: {hydration}" + + with c.test_case("agent_mcp_data_product_details_view_sql"): + # The hydration column should also be queryable directly via SQL, + # not just through MCP. This locks in the catalog surface so users + # and dashboards can build on it independently of the MCP server. + rows = c.sql_query( + """ + SELECT object_name, hydration + FROM mz_internal.mz_mcp_data_product_details + WHERE object_name = '"materialize"."public"."test_hydration_mv"' + """, + ) + assert rows, f"expected at least one row, got: {rows}" + _, hydration = rows[0] + # psycopg decodes jsonb to a Python dict. + assert isinstance(hydration, dict), f"hydration should be dict: {hydration!r}" + assert hydration.get("hydrated") is True, hydration + assert hydration["replica_count"] == hydration["hydrated_replica_count"] + assert set(hydration.keys()) == { + "hydrated", + "replica_count", + "hydrated_replica_count", + }, f"unexpected keys in hydration object: {hydration}" diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index 568cd9f5fd7e7..b2281e1df389a 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -327,6 +327,7 @@ object_name text Fully␠qualified␠object␠name␠(database.schema.name). cluster text Cluster␠where␠the␠object␠computes␠or␠its␠index␠is␠hosted.␠Reads␠from␠any␠cluster␠work,␠but␠only␠reads␠on␠this␠cluster␠benefit␠from␠the␠index. description text Index␠comment␠if␠available,␠otherwise␠object␠comment.␠Used␠as␠data␠product␠description. schema jsonb JSON␠Schema␠describing␠the␠object's␠columns␠and␠types. +hydration jsonb Readiness␠summary␠as␠a␠JSON␠object␠with␠`hydrated`␠(bool),␠`replica_count`␠(int),␠and␠`hydrated_replica_count`␠(int).␠`hydrated`␠is␠true␠only␠when␠the␠cluster␠has␠at␠least␠one␠replica␠and␠the␠dataflow␠is␠hydrated␠on␠every␠replica.␠Agents␠should␠back␠off␠and␠retry␠when␠`hydrated`␠is␠false␠rather␠than␠treating␠an␠empty␠read␠as␠final. query TTT SELECT name, type, comment FROM objects WHERE schema = 'mz_internal' AND object = 'mz_object_dependencies' ORDER BY position From b39cda61b47375daad523676b66137e0793057f8 Mon Sep 17 00:00:00 2001 From: bobbyiliev Date: Tue, 19 May 2026 13:15:44 +0300 Subject: [PATCH 2/3] mcp: drop unprovable key from mz_mcp_data_product_details RelationDesc --- src/catalog/src/builtin/mz_internal.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/catalog/src/builtin/mz_internal.rs b/src/catalog/src/builtin/mz_internal.rs index 95fa0677f2b7a..32a0f6b68e671 100644 --- a/src/catalog/src/builtin/mz_internal.rs +++ b/src/catalog/src/builtin/mz_internal.rs @@ -5318,13 +5318,18 @@ pub static MZ_MCP_DATA_PRODUCT_DETAILS: LazyLock = LazyLock::new(|| name: "mz_mcp_data_product_details", schema: MZ_INTERNAL_SCHEMA, oid: oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID, + // Note: no `.with_key` here. The view's row identity is semantically + // (object_name, cluster, description) — same as the underlying details + // CTE — but the planner can't prove key propagation through the + // `LEFT JOIN ... ON ... IS NOT DISTINCT FROM` to the hydration CTE, + // so declaring it here would diverge from the inferred RelationDesc + // and fail `verify_builtin_descs`. desc: RelationDesc::builder() .with_column("object_name", SqlScalarType::String.nullable(false)) .with_column("cluster", SqlScalarType::String.nullable(true)) .with_column("description", SqlScalarType::String.nullable(true)) .with_column("schema", SqlScalarType::Jsonb.nullable(false)) .with_column("hydration", SqlScalarType::Jsonb.nullable(false)) - .with_key(vec![0, 1, 2]) .finish(), column_comments: BTreeMap::from_iter([ ( From 4e4754f0362a504f82ac52246b5f38b98ca02008 Mon Sep 17 00:00:00 2001 From: bobbyiliev Date: Tue, 19 May 2026 14:10:31 +0300 Subject: [PATCH 3/3] mcp: grant SELECT to materialize user in mzcompose hydration test so the SQL-level case sees the MV --- test/mcp/mzcompose.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/mcp/mzcompose.py b/test/mcp/mzcompose.py index 8075f6e352428..52b8e013ec6f9 100644 --- a/test/mcp/mzcompose.py +++ b/test/mcp/mzcompose.py @@ -457,12 +457,18 @@ def hydration_for(object_name: str) -> dict: return hydration with c.test_case("agent_get_data_product_details_hydrated"): + # Grant SELECT to both `anonymous_http_user` (the MCP server's + # session user on this no-auth listener) and `materialize` (the + # default user for `c.sql_query`, used by the SQL-level test + # below). `mz_mcp_data_product_details` filters by + # `mz_show_my_object_privileges`, which is per-user. c.sql( """ DROP MATERIALIZED VIEW IF EXISTS public.test_hydration_mv; CREATE MATERIALIZED VIEW public.test_hydration_mv IN CLUSTER quickstart AS SELECT 1::int AS id, 'widget'::text AS name; GRANT SELECT ON public.test_hydration_mv TO anonymous_http_user; + GRANT SELECT ON public.test_hydration_mv TO materialize; """, user="mz_system", port=6877,