Skip to content

Commit 3d407b3

Browse files
committed
fix: query error when stream_consume_batch_size_hint is not 0
1 parent 6fbb28e commit 3d407b3

File tree

2 files changed

+50
-44
lines changed

2 files changed

+50
-44
lines changed

src/query/service/src/sessions/query_ctx.rs

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,21 @@ impl QueryContext {
512512
table: &str,
513513
max_batch_size: Option<u64>,
514514
) -> Result<Arc<dyn Table>> {
515+
let max_batch_size = match max_batch_size {
516+
Some(v) => {
517+
// use the batch size specified in the statement
518+
Some(v)
519+
}
520+
None => {
521+
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
522+
info!("Overriding stream max_batch_size with setting value: {}", v);
523+
Some(v)
524+
} else {
525+
None
526+
}
527+
}
528+
};
529+
515530
let table = self
516531
.shared
517532
.get_table(catalog, database, table, max_batch_size)
@@ -1425,23 +1440,7 @@ impl TableContext for QueryContext {
14251440
database: &str,
14261441
table: &str,
14271442
) -> Result<Arc<dyn Table>> {
1428-
// Queries to non-internal system_history databases require license checks to be enabled.
1429-
if database.eq_ignore_ascii_case("system_history")
1430-
&& ThreadTracker::capture_log_settings().is_none()
1431-
{
1432-
LicenseManagerSwitch::instance()
1433-
.check_enterprise_enabled(self.get_license_key(), Feature::SystemHistory)?;
1434-
1435-
if GlobalConfig::instance().log.history.is_invisible(table) {
1436-
return Err(ErrorCode::InvalidArgument(format!(
1437-
"history table `{}` is configured as invisible",
1438-
table
1439-
)));
1440-
}
1441-
}
1442-
1443-
let batch_size = self.get_settings().get_stream_consume_batch_size_hint()?;
1444-
self.get_table_from_shared(catalog, database, table, batch_size)
1443+
self.get_table_with_batch(catalog, database, table, None)
14451444
.await
14461445
}
14471446

@@ -1457,41 +1456,41 @@ impl TableContext for QueryContext {
14571456
table: &str,
14581457
max_batch_size: Option<u64>,
14591458
) -> Result<Arc<dyn Table>> {
1460-
let max_batch_size = {
1461-
match max_batch_size {
1462-
Some(v) => {
1463-
// use the batch size specified in the statement
1464-
Some(v)
1465-
}
1466-
None => {
1467-
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
1468-
info!("Overriding stream max_batch_size with setting value: {}", v);
1469-
Some(v)
1470-
} else {
1471-
None
1472-
}
1473-
}
1459+
// Queries to non-internal system_history databases require license checks to be enabled.
1460+
if database.eq_ignore_ascii_case("system_history")
1461+
&& ThreadTracker::capture_log_settings().is_none()
1462+
{
1463+
LicenseManagerSwitch::instance()
1464+
.check_enterprise_enabled(self.get_license_key(), Feature::SystemHistory)?;
1465+
1466+
if GlobalConfig::instance().log.history.is_invisible(table) {
1467+
return Err(ErrorCode::InvalidArgument(format!(
1468+
"history table `{}` is configured as invisible",
1469+
table
1470+
)));
14741471
}
1475-
};
1472+
}
14761473

14771474
let table = self
14781475
.get_table_from_shared(catalog, database, table, max_batch_size)
14791476
.await?;
1480-
if table.is_stream() {
1481-
let stream = StreamTable::try_from_table(table.as_ref())?;
1482-
let actual_batch_limit = stream.max_batch_size();
1483-
if actual_batch_limit != max_batch_size {
1477+
if max_batch_size.is_some() {
1478+
if table.is_stream() {
1479+
let stream = StreamTable::try_from_table(table.as_ref())?;
1480+
let actual_batch_limit = stream.max_batch_size();
1481+
if actual_batch_limit != max_batch_size {
1482+
return Err(ErrorCode::StorageUnsupported(
1483+
format!(
1484+
"Stream batch size must be consistent within transaction: actual={:?}, requested={:?}",
1485+
actual_batch_limit, max_batch_size
1486+
)
1487+
));
1488+
}
1489+
} else {
14841490
return Err(ErrorCode::StorageUnsupported(
1485-
format!(
1486-
"Stream batch size must be consistent within transaction: actual={:?}, requested={:?}",
1487-
actual_batch_limit, max_batch_size
1488-
)
1491+
"MAX_BATCH_SIZE parameter only supported for STREAM tables",
14891492
));
14901493
}
1491-
} else if max_batch_size.is_some() {
1492-
return Err(ErrorCode::StorageUnsupported(
1493-
"MAX_BATCH_SIZE parameter only supported for STREAM tables",
1494-
));
14951494
}
14961495
Ok(table)
14971496
}

tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,13 @@ select c from s_t_settings order by c;
325325
1
326326
2
327327

328+
query I
329+
select c from t_settings order by c;
330+
----
331+
1
332+
2
333+
3
334+
328335
#######################################################
329336
# max_batch_size specified in query has high priority #
330337
#######################################################

0 commit comments

Comments
 (0)