Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions snuba/admin/clickhouse/system_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,52 @@ def _run_sql_query_on_host(
)


def _sanitize_query_for_explain(sql_query: str) -> str:
"""
Sanitizes a SQL query before using it in EXPLAIN statements to prevent SQL injection.

This function adds defense-in-depth protection by:
1. Checking for multiple statement attempts (semicolons followed by more SQL)
2. Checking for comment-based injection attempts
3. Stripping trailing semicolons (which are optional in ClickHouse)

Note: The query itself will be executed as-is after validation. This sanitization
is specifically for the EXPLAIN QUERY TREE and EXPLAIN AST validation steps.
"""
# Strip whitespace
query = sql_query.strip()

# Check for unbalanced quotes which could indicate injection attempts
single_quotes = query.count("'") - query.count("\\'")
double_quotes = query.count('"') - query.count('\\"')
if single_quotes % 2 != 0 or double_quotes % 2 != 0:
raise InvalidCustomQuery("Unbalanced quotes detected in query")

# Check for multiple statement attempts (semicolon not at the end)
# We need to be careful about semicolons in strings
# Remove strings first to check for actual statement separators
temp_query = re.sub(r"'[^']*'", "", query)
temp_query = re.sub(r'"[^"]*"', "", temp_query)

# Now check if there's a semicolon followed by more SQL
if ";" in temp_query:
parts = temp_query.split(";")
# Filter out empty/whitespace parts
non_empty_parts = [p.strip() for p in parts if p.strip()]
if len(non_empty_parts) > 1:
raise InvalidCustomQuery("Multiple statements not allowed")

# Check for comment-based injection attempts
if "--" in temp_query or "/*" in temp_query:
raise InvalidCustomQuery("SQL comments not allowed in queries")

# Strip trailing semicolon for use in EXPLAIN statements
if query.endswith(";"):
query = query.rstrip(";").strip()

return query


def is_query_using_only_system_tables(
clickhouse_host: str,
clickhouse_port: int,
Expand All @@ -169,9 +215,12 @@ def is_query_using_only_system_tables(
Run the EXPLAIN QUERY TREE on the given sql_query and check that the only tables
in the query are system tables.
"""
sql_query = sql_query.strip().rstrip(";") if sql_query.endswith(";") else sql_query
# Sanitize the query to prevent SQL injection in the EXPLAIN statement
sanitized_query = _sanitize_query_for_explain(sql_query)

settings_clause = "" if sudo_mode else " SETTINGS allow_experimental_analyzer = 1"
explain_query_tree_query = f"EXPLAIN QUERY TREE {sql_query}{settings_clause}"
# Using sanitized query in EXPLAIN to prevent SQL injection
explain_query_tree_query = f"EXPLAIN QUERY TREE {sanitized_query}{settings_clause}"
explain_query_tree_result = _run_sql_query_on_host(
clickhouse_host,
clickhouse_port,
Expand Down Expand Up @@ -211,7 +260,11 @@ def is_valid_system_query(
"""
Validation based on Query Tree and AST to ensure the query is a valid select query.
"""
explain_ast_query = f"EXPLAIN AST {sql_query}"
# Sanitize the query to prevent SQL injection in the EXPLAIN statement
sanitized_query = _sanitize_query_for_explain(sql_query)

# Using sanitized query in EXPLAIN to prevent SQL injection
explain_ast_query = f"EXPLAIN AST {sanitized_query}"
disallowed_ast_nodes = ["AlterQuery", "AlterCommand", "DropQuery", "InsertQuery"]
explain_ast_result = _run_sql_query_on_host(
clickhouse_host, clickhouse_port, storage_name, explain_ast_query, False, clusterless_mode
Expand Down
38 changes: 19 additions & 19 deletions tests/admin/test_system_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,6 @@
""",
"SELECT hostname(), avg(query_duration_ms) FROM clusterAllReplicas('default', system.query_log) GROUP BY hostname()",
"SELECT count() FROM merge('system', '.*settings')",
],
)
@pytest.mark.clickhouse_db
def test_is_valid_system_query(sql_query: str) -> None:
assert is_valid_system_query(
settings.CLUSTERS[0]["host"], int(settings.CLUSTERS[0]["port"]), "errors", sql_query, False
)


@pytest.mark.parametrize(
"sql_query",
[
"SHOW TABLES;", # non select statement
"SELECT * FROM my_table;", # not allowed table
"SELECT * from system.metrics" # system table not on allowed list
"with sum(bytes) as s select s from system.parts group by table;", # sorry not allowed WITH
"SELECT 1; SELECT 2;" # no multiple statements
"SELECT * FROM system.clusters c INNER JOIN my_table m ON c.cluster == m.something", # no join
"SELECT * from system.as1", # invalid system table format
"""SELECT
count() as nb_query,
user,
Expand All @@ -91,6 +72,25 @@ def test_is_valid_system_query(sql_query: str) -> None:
],
)
@pytest.mark.clickhouse_db
def test_is_valid_system_query(sql_query: str) -> None:
assert is_valid_system_query(
settings.CLUSTERS[0]["host"], int(settings.CLUSTERS[0]["port"]), "errors", sql_query, False
)


@pytest.mark.parametrize(
"sql_query",
[
"SHOW TABLES;", # non select statement
"SELECT * FROM my_table;", # not allowed table
"SELECT * from system.metrics" # system table not on allowed list
"with sum(bytes) as s select s from system.parts group by table;", # sorry not allowed WITH
"SELECT 1; SELECT 2;" # no multiple statements
"SELECT * FROM system.clusters c INNER JOIN my_table m ON c.cluster == m.something", # no join
"SELECT * from system.as1", # invalid system table format
],
)
@pytest.mark.clickhouse_db
def test_invalid_system_query(sql_query: str) -> None:
with pytest.raises(Exception):
is_valid_system_query(
Expand Down
Loading