Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 127 additions & 23 deletions src/environmentd/src/http/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,10 @@ fn endpoint_instructions(endpoint_type: McpEndpointType) -> Option<String> {
McpEndpointType::Agent => Some(concat!(
"You have access to Materialize data products via MCP. ",
"Prefer indexed objects (served from memory) over unindexed materialized views ",
"(read from persistent storage). Indexes are cluster-local; if a data product's ",
"cluster differs from your session, pass the `cluster` parameter to `read_data_product` ",
"so the index is actually used.",
"(read from persistent storage). `read_data_product` automatically routes the ",
"read to the cluster recorded in the data product catalog so indexes are used; ",
"you only need to set the `cluster` parameter if you intentionally want the ",
"read to run on a different cluster (e.g. one with larger or more replicas).",
).to_string()),
McpEndpointType::Developer => Some(concat!(
"You are connected to the Materialize developer MCP server. ",
Expand Down Expand Up @@ -745,7 +746,7 @@ async fn handle_tools_list(
},
"cluster": {
"type": "string",
"description": "Optional cluster override. If omitted, uses the cluster from the data product catalog."
"description": "Optional override. By default, the read runs on the cluster recorded in the data product catalog (where the index or materialized view dataflow lives), so indexed reads actually hit their arrangement. Set this only to intentionally run the same read on a different cluster — e.g. one with more or larger replicas, or to compare cost/latency."
}
},
"required": ["name"]
Expand Down Expand Up @@ -981,11 +982,21 @@ fn safe_data_product_name(name: &str) -> Result<String, McpRequestError> {

/// Read rows from a data product. Issues a single read-only query.
///
/// When `cluster_override` is provided, sets the cluster explicitly.
/// Otherwise the query runs on the session's default cluster.
/// By default the read is routed to the cluster recorded in the data product
/// catalog (`mz_mcp_data_products.cluster`), so reads of indexed views
/// actually hit the index's in-memory arrangement instead of falling back to
/// a full recompute through persist on whatever cluster the session happens
/// to default to. `cluster_override` bypasses that routing and runs the
/// same read on a named cluster instead — useful for running the read on a
/// differently-sized or differently-replicated cluster.
///
/// The name is expected to come from `get_data_products()` / `get_data_product_details()`.
/// The query runs inside a READ ONLY transaction, preventing mutations.
/// If both the override and the catalog cluster are absent (the catalog
/// cluster column is unexpectedly null), the query falls back to the
/// session's default cluster.
///
/// The name is expected to come from `get_data_products()` /
/// `get_data_product_details()`. The query runs inside a READ ONLY
/// transaction, preventing mutations.
async fn read_data_product(
client: &mut AuthedClient,
name: &str,
Expand All @@ -998,40 +1009,69 @@ async fn read_data_product(
// Parse and safely quote the name for SQL interpolation.
let safe_name = safe_data_product_name(name)?;

// Lightweight existence check: verify the data product is visible in the
// catalog before running the read query. This gives a clean DataProductNotFound
// error for missing or inaccessible products (including RBAC revocations)
// without relying on fragile error code matching.
// TODO: Remove this extra round-trip once catalog errors get specific SQL
// error codes (see TODO in src/adapter/src/error.rs `fn code()`), then we
// can translate the query error directly.
// Lookup query: serves as the existence check (we still error
// `DataProductNotFound` on an empty result) and at the same time
// recovers the catalog cluster for auto-routing. `mz_mcp_data_products`
// can hold multiple rows per object_name when a product is indexed on
// multiple clusters; `ORDER BY cluster NULLS LAST LIMIT 1` picks one
// deterministically. Callers can disambiguate via `cluster_override`.
//
// TODO: Remove this extra round-trip once catalog errors get specific
// SQL error codes (see TODO in src/adapter/src/error.rs `fn code()`),
// then we can translate the query error directly and drop the lookup.
let lookup_query = format!(
"SELECT 1 FROM mz_internal.mz_mcp_data_products WHERE object_name = {}",
"SELECT cluster FROM mz_internal.mz_mcp_data_products \
WHERE object_name = {} \
ORDER BY cluster NULLS LAST \
LIMIT 1",
escaped_string_literal(name)
);
let lookup_rows = execute_sql(client, &lookup_query).await?;
if lookup_rows.is_empty() {
return Err(McpRequestError::DataProductNotFound(name.to_string()));
}
let catalog_cluster: Option<&str> = lookup_rows
.first()
.and_then(|row| row.first())
.and_then(|v| v.as_str());

// Override beats catalog; catalog beats session default. The override
// path is unchanged from before, so explicit callers are not affected
// by the auto-routing change.
let target_cluster = cluster_override.or(catalog_cluster);

// No row cap is applied here: the response is bounded by the size cap
// enforced in format_rows_response (MCP_MAX_RESPONSE_SIZE), and by
// max_result_size at the adapter layer. Mirrors the SQL HTTP endpoint,
// which also leans on a size cap rather than a row cap.
let read_query = match cluster_override {
let read_query = build_read_query(&safe_name, limit, target_cluster);

let rows = execute_sql(client, &read_query).await?;

format_rows_response(rows, max_response_size)
}

/// Builds the SQL the agent runs for `read_data_product`.
///
/// `safe_name` must already be the validated, quoted form produced by
/// [`safe_data_product_name`]. `target_cluster`, when provided, is escaped
/// as a SQL string literal and wrapped in `SET CLUSTER` inside a `BEGIN
/// READ ONLY` transaction so the cluster choice is scoped to this read and
/// does not leak into the session.
///
/// When `target_cluster` is `None` we emit a single bare `SELECT` instead
/// of opening a transaction, which avoids three extra round trips on the
/// hot path of session-default reads.
fn build_read_query(safe_name: &str, limit: u32, target_cluster: Option<&str>) -> String {
match target_cluster {
Some(cluster) => format!(
"BEGIN READ ONLY; SET CLUSTER = {}; SELECT * FROM {} LIMIT {}\n; COMMIT;",
escaped_string_literal(cluster),
safe_name,
limit,
),
// Single statement — skip explicit transaction for better performance.
None => format!("SELECT * FROM {} LIMIT {}", safe_name, limit),
};

let rows = execute_sql(client, &read_query).await?;

format_rows_response(rows, max_response_size)
}
}

/// Validates query is a single SELECT, SHOW, or EXPLAIN statement.
Expand Down Expand Up @@ -1797,6 +1837,70 @@ mod tests {
assert!(safe_data_product_name("my_view WHERE 1=1 --").is_err());
}

// ── build_read_query tests (DEX-27) ────────────────────────────────

/// A read with no target cluster must come back as one plain `SELECT`:
/// no `SET CLUSTER` statement and no explicit transaction wrapper.
#[mz_ore::test]
fn test_build_read_query_no_cluster() {
    let object = r#""db"."sch"."v""#;
    let sql = build_read_query(object, 100, None);

    // Pin the exact statement first; the two checks below spell out the
    // individual guarantees so a failure names the property that broke.
    assert_eq!(sql, r#"SELECT * FROM "db"."sch"."v" LIMIT 100"#);

    let sets_cluster = sql.contains("SET CLUSTER");
    assert!(
        !sets_cluster,
        "session-default reads should not emit SET CLUSTER: {sql}",
    );

    let opens_transaction = sql.contains("BEGIN");
    assert!(
        !opens_transaction,
        "session-default reads should not open a transaction: {sql}",
    );
}

/// When a target cluster is supplied, the generated SQL must contain the
/// `BEGIN READ ONLY` opener, the `SET CLUSTER` for that cluster, the
/// `SELECT` itself, and the closing `COMMIT`.
#[mz_ore::test]
fn test_build_read_query_with_cluster() {
    let sql = build_read_query(r#""db"."sch"."v""#, 50, Some("prod_cluster"));

    // Checked in the same order the statements are expected to appear.
    let expected_fragments = [
        "BEGIN READ ONLY",
        "SET CLUSTER = 'prod_cluster'",
        r#"SELECT * FROM "db"."sch"."v" LIMIT 50"#,
        "COMMIT",
    ];
    for fragment in expected_fragments {
        assert!(sql.contains(fragment), "{sql}");
    }
}

/// A cluster name containing a single quote must not be able to terminate
/// the SQL string literal it is interpolated into. The hostile name below
/// tries to close the literal and smuggle in a `DROP TABLE`; the test checks
/// that the quote is doubled and that no extra statement appears.
#[mz_ore::test]
fn test_build_read_query_escapes_cluster_name() {
    let hostile_cluster = "evil'; DROP TABLE secrets; --";
    let sql = build_read_query(r#""db"."sch"."v""#, 10, Some(hostile_cluster));

    // The quote after `evil` has to show up doubled inside the literal.
    let quote_is_doubled = sql.contains("SET CLUSTER = 'evil''; DROP TABLE secrets; --'");
    assert!(
        quote_is_doubled,
        "single quote should be doubled inside the literal: {sql}",
    );

    // The payload must stay inside the literal: one SET CLUSTER, and the
    // `DROP TABLE` text occurring only the one time it was passed in.
    let set_cluster_count = sql.matches("SET CLUSTER").count();
    assert_eq!(
        set_cluster_count, 1,
        "exactly one SET CLUSTER statement: {sql}",
    );

    let drop_table_count = sql.matches("DROP TABLE").count();
    assert_eq!(
        drop_table_count, 1,
        "DROP TABLE should appear once, inside the quoted literal: {sql}",
    );
}

#[mz_ore::test]
fn test_mcp_error_codes() {
assert_eq!(
Expand Down
61 changes: 61 additions & 0 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5613,6 +5613,67 @@ fn test_mcp_agent_with_data_product() {
let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap();
assert_eq!(rows.as_array().unwrap().len(), 1);

// DEX-27: an MV whose home cluster is NOT the HTTP session's default
// (`quickstart`) must still be readable without the caller passing a
// `cluster` argument. Before the auto-routing change the read would
// execute against `quickstart`, missing the index. Reading without an
// override must still succeed and return the row.
{
let mut super_user = server
.pg_config_internal()
.user(&SYSTEM_USER.name)
.connect(postgres::NoTls)
.unwrap();
super_user
.batch_execute(
"CREATE CLUSTER dex27_other_cluster REPLICAS (r1 (SIZE 'scale=1,workers=1'))",
)
.unwrap();
super_user
.batch_execute(
"CREATE MATERIALIZED VIEW test_off_default IN CLUSTER dex27_other_cluster \
AS SELECT 42::int AS id, 'off_default'::text AS name",
)
.unwrap();
super_user
.batch_execute(&format!(
"GRANT SELECT ON test_off_default TO {}",
&HTTP_DEFAULT_USER.name
))
.unwrap();
super_user
.batch_execute(&format!(
"GRANT USAGE ON CLUSTER dex27_other_cluster TO {}",
&HTTP_DEFAULT_USER.name
))
.unwrap();
}
// The MV was created after the cached `get_data_products` snapshot, so
// build the qualified name directly rather than re-querying.
let off_default_name = r#""materialize"."public"."test_off_default""#.to_string();
let (status, body) = mcp_post(
&agents_url,
serde_json::json!({
"jsonrpc": "2.0",
"id": 41,
"method": "tools/call",
"params": {
"name": "read_data_product",
"arguments": {"name": off_default_name, "limit": 10}
}
}),
);
assert_eq!(status, StatusCode::OK);
assert!(
body["error"].is_null(),
"no-override read on off-default cluster should auto-route, got: {body}",
);
let rows_text = body["result"]["content"][0]["text"].as_str().unwrap();
let rows: serde_json::Value = serde_json::from_str(rows_text).unwrap();
assert_eq!(rows.as_array().unwrap().len(), 1);
assert_eq!(rows[0][0].as_str().unwrap(), "42");
assert_eq!(rows[0][1].as_str().unwrap(), "off_default");

// read_data_product with limit 0 should return no rows.
let (status, body) = mcp_post(
&agents_url,
Expand Down
Loading
Loading