Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion doc/user/content/reference/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,9 @@ schema information.
## `mz_mcp_data_product_details`

The `mz_mcp_data_product_details` view extends [`mz_mcp_data_products`](#mz_mcp_data_products)
with a JSON Schema describing each data product's columns and types.
with a JSON Schema describing each data product's columns and types, and a
readiness summary that lets agents tell "still warming up" apart from
"genuinely empty."

<!-- RELATION_SPEC mz_internal.mz_mcp_data_product_details -->
| Field | Type | Meaning |
Expand All @@ -570,6 +572,7 @@ with a JSON Schema describing each data product's columns and types.
| `cluster` | [`text`] | Cluster where the object computes or its index is hosted. Reads from any cluster work, but only reads on this cluster benefit from the index. |
| `description` | [`text`] | Index comment if available, otherwise object comment. Used as data product description. |
| `schema` | [`jsonb`]| JSON Schema describing the object's columns and types. |
| `hydration` | [`jsonb`]| Readiness summary as a JSON object with `hydrated` (bool), `replica_count` (int), and `hydrated_replica_count` (int). `hydrated` is true only when the cluster has at least one replica and the dataflow is hydrated on every replica. Agents should back off and retry when `hydrated` is false rather than treating an empty read as final. |

## `mz_object_dependencies`

Expand Down
59 changes: 57 additions & 2 deletions src/catalog/src/builtin/mz_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5318,12 +5318,18 @@ pub static MZ_MCP_DATA_PRODUCT_DETAILS: LazyLock<BuiltinView> = LazyLock::new(||
name: "mz_mcp_data_product_details",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID,
// Note: no `.with_key` here. The view's row identity is semantically
// (object_name, cluster, description) — same as the underlying details
// CTE — but the planner can't prove key propagation through the
// `LEFT JOIN ... ON ... IS NOT DISTINCT FROM` to the hydration CTE,
// so declaring it here would diverge from the inferred RelationDesc
// and fail `verify_builtin_descs`.
desc: RelationDesc::builder()
.with_column("object_name", SqlScalarType::String.nullable(false))
.with_column("cluster", SqlScalarType::String.nullable(true))
.with_column("description", SqlScalarType::String.nullable(true))
.with_column("schema", SqlScalarType::Jsonb.nullable(false))
.with_key(vec![0, 1, 2])
.with_column("hydration", SqlScalarType::Jsonb.nullable(false))
.finish(),
column_comments: BTreeMap::from_iter([
(
Expand All @@ -5342,9 +5348,13 @@ pub static MZ_MCP_DATA_PRODUCT_DETAILS: LazyLock<BuiltinView> = LazyLock::new(||
"schema",
"JSON Schema describing the object's columns and types.",
),
(
"hydration",
"Readiness summary as a JSON object with `hydrated` (bool), `replica_count` (int), and `hydrated_replica_count` (int). `hydrated` is true only when the cluster has at least one replica and the dataflow is hydrated on every replica. Agents should back off and retry when `hydrated` is false rather than treating an empty read as final.",
),
]),
sql: r#"
SELECT * FROM (
WITH details_raw AS (
SELECT
'"' || op.database || '"."' || op.schema || '"."' || op.name || '"' AS object_name,
COALESCE(c_idx.name, c_obj.name) AS cluster,
Expand Down Expand Up @@ -5420,7 +5430,52 @@ WHERE op.privilege_type = 'SELECT'
AND (o.type = 'materialized-view' OR (o.type = 'view' AND i.id IS NOT NULL))
AND s.name NOT IN ('mz_catalog', 'mz_internal', 'pg_catalog', 'information_schema', 'mz_introspection')
GROUP BY 1, 2, 3
),
-- Pick the right (object_id, cluster_id) for hydration: the index's id +
-- cluster when an index exists (its arrangement is what the data product
-- reads from), otherwise the materialized view's own id + cluster.
hydration_meta AS (
SELECT DISTINCT
'"' || db.name || '"."' || s.name || '"."' || o.name || '"' AS object_name,
COALESCE(c_idx.name, c_obj.name) AS cluster,
COALESCE(i.id, o.id) AS hydration_object_id,
COALESCE(i.cluster_id, o.cluster_id) AS cluster_id
FROM mz_objects o
JOIN mz_schemas s ON s.id = o.schema_id
JOIN mz_databases db ON db.id = s.database_id
LEFT JOIN mz_indexes i ON i.on_id = o.id
LEFT JOIN mz_clusters c_idx ON c_idx.id = i.cluster_id
LEFT JOIN mz_clusters c_obj ON c_obj.id = o.cluster_id
WHERE (o.type = 'materialized-view' OR (o.type = 'view' AND i.id IS NOT NULL))
AND s.name NOT IN ('mz_catalog', 'mz_internal', 'pg_catalog', 'information_schema', 'mz_introspection')
),
hydration AS (
SELECT
m.object_name,
m.cluster,
COUNT(r.id)::int AS replica_count,
COUNT(*) FILTER (WHERE COALESCE(h.hydrated, false))::int AS hydrated_replica_count
FROM hydration_meta m
LEFT JOIN mz_catalog.mz_cluster_replicas r ON r.cluster_id = m.cluster_id
LEFT JOIN mz_internal.mz_hydration_statuses h
ON h.replica_id = r.id AND h.object_id = m.hydration_object_id
GROUP BY m.object_name, m.cluster
)
SELECT
d.object_name,
d.cluster,
d.description,
d.schema,
jsonb_build_object(
'hydrated',
COALESCE(h.replica_count > 0 AND h.hydrated_replica_count = h.replica_count, false),
'replica_count', COALESCE(h.replica_count, 0),
'hydrated_replica_count', COALESCE(h.hydrated_replica_count, 0)
) AS hydration
FROM details_raw d
LEFT JOIN hydration h
ON h.object_name = d.object_name
AND h.cluster IS NOT DISTINCT FROM d.cluster
"#,
access: vec![PUBLIC_SELECT],
ontology: None,
Expand Down
15 changes: 12 additions & 3 deletions src/environmentd/src/http/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ const MCP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

// Discovery uses the lightweight view (no JSON schema computation).
const DISCOVERY_QUERY: &str = "SELECT * FROM mz_internal.mz_mcp_data_products";
// Details uses the full view with JSON schema.
// Details uses the full view, which also exposes a `hydration` JSON column
// (`{hydrated, replica_count, hydrated_replica_count}`) so agents can decide
// whether to back off and retry vs treat empty reads as final. See DEX-30
// and the comments on `mz_mcp_data_product_details` in
// `src/catalog/src/builtin/mz_internal.rs`.
const DETAILS_QUERY_PREFIX: &str =
"SELECT * FROM mz_internal.mz_mcp_data_product_details WHERE object_name = ";

Expand Down Expand Up @@ -657,7 +661,12 @@ fn endpoint_instructions(endpoint_type: McpEndpointType) -> Option<String> {
"Prefer indexed objects (served from memory) over unindexed materialized views ",
"(read from persistent storage). Indexes are cluster-local; if a data product's ",
"cluster differs from your session, pass the `cluster` parameter to `read_data_product` ",
"so the index is actually used.",
"so the index is actually used. ",
"`get_data_product_details` returns a `hydration` object with `hydrated`, ",
"`replica_count`, and `hydrated_replica_count` fields: if `hydrated` is false, ",
"the dataflow is still warming up. Back off and retry rather than looping on ",
"empty results — an empty answer from a not-yet-hydrated product is not the ",
"same as a genuinely empty result.",
).to_string()),
McpEndpointType::Developer => Some(concat!(
"You are connected to the Materialize developer MCP server. ",
Expand Down Expand Up @@ -714,7 +723,7 @@ async fn handle_tools_list(
ToolDefinition {
name: "get_data_product_details".to_string(),
title: Some("Get Data Product Details".to_string()),
description: "Get the complete schema and structure of a specific data product. This shows you exactly what fields are available, their types, and what data you can query. Use this after finding a data product from get_data_products() to understand how to query it.".to_string(),
description: "Get the complete schema and structure of a specific data product, plus a `hydration` object reporting whether the dataflow is fully hydrated across the cluster's replicas (`{hydrated, replica_count, hydrated_replica_count}`). This shows you exactly what fields are available, their types, what data you can query, and whether the data product is ready to serve fresh results. Use this after finding a data product from get_data_products() to understand how to query it; if `hydrated` is false, back off and retry rather than treating empty reads as final.".to_string(),
input_schema: json!({
"type": "object",
"properties": {
Expand Down
57 changes: 57 additions & 0 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5547,6 +5547,53 @@ fn test_mcp_agent_with_data_product() {
assert!(body["result"]["content"][0]["text"].as_str().is_some());
assert!(body["error"].is_null());

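// The response should expose a `hydration` field per row (5th cell) so
// agents can decide whether to back off or treat empty reads as final.
// See DEX-30. An MV that has had time to hydrate on a single-replica
// `quickstart` cluster would report `hydrated: true` with 1/1 replicas,
// but the assertions below only check the field's shape and the count
// invariants (non-negative, hydrated <= total), not those specific values.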
let rows_text = body["result"]["content"][0]["text"].as_str().unwrap();
let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap();
let rows = rows.as_array().expect("details should return rows");
assert!(!rows.is_empty(), "details should return at least one row");
for row in rows {
let row = row.as_array().expect("each row should be an array");
assert_eq!(
row.len(),
5,
"each details row should have 5 cells (object_name, cluster, description, schema, hydration), got: {:?}",
row,
);
let hydration = &row[4];
assert!(
hydration.is_object(),
"hydration cell should be a JSON object, got: {hydration}",
);
assert!(
hydration.get("hydrated").is_some_and(|v| v.is_boolean()),
"hydration.hydrated should be a bool, got: {hydration}",
);
let replica_count = hydration
.get("replica_count")
.and_then(|v| v.as_i64())
.unwrap_or_else(|| {
panic!("hydration.replica_count should be an int, got: {hydration}")
});
let hydrated_replica_count = hydration
.get("hydrated_replica_count")
.and_then(|v| v.as_i64())
.unwrap_or_else(|| {
panic!("hydration.hydrated_replica_count should be an int, got: {hydration}")
});
assert!(
replica_count >= 0 && hydrated_replica_count >= 0,
"replica counts must be non-negative, got: {hydration}",
);
assert!(
hydrated_replica_count <= replica_count,
"hydrated_replica_count ({hydrated_replica_count}) cannot exceed replica_count ({replica_count}): {hydration}",
);
}

// get_data_product_details should also resolve the indexed view, proving
// the filter change is applied consistently to mz_mcp_data_product_details.
let indexed_view_name = find_product("test_indexed_view").as_array().unwrap()[0]
Expand All @@ -5571,6 +5618,16 @@ fn test_mcp_agent_with_data_product() {
"indexed view should be resolvable via get_data_product_details, got: {body}"
);
assert!(body["result"]["content"][0]["text"].as_str().is_some());
// Indexed view should also report a hydration object.
let rows_text = body["result"]["content"][0]["text"].as_str().unwrap();
let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap();
let rows = rows.as_array().expect("details should return rows");
assert!(!rows.is_empty());
for row in rows {
let row = row.as_array().expect("each row should be an array");
assert_eq!(row.len(), 5, "row should include hydration cell: {row:?}");
assert!(row[4].is_object(), "hydration cell should be an object");
}

// read_data_product should return the row from the view.
let (status, body) = mcp_post(
Expand Down
Loading
Loading