diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp
index 480160a86c5f..6f1c276ecf27 100644
--- a/src/IO/S3/URI.cpp
+++ b/src/IO/S3/URI.cpp
@@ -11,16 +11,17 @@
 #include
 #include
-
 namespace DB
 {
 
 struct URIConverter
 {
-    static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
+    static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper, bool enable_url_encoding)
     {
         Macros macros({{"bucket", uri.getHost()}});
-        uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
+        uri = macros.expand(mapper[uri.getScheme()]).empty()
+            ? uri
+            : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding);
     }
 };
@@ -32,7 +33,7 @@ namespace ErrorCodes
 
 namespace S3
 {
-URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
+URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool enable_url_encoding)
 {
     /// Case when bucket name represented in domain name of S3 URL.
     /// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
@@ -54,7 +55,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
     else
         uri_str = uri_;
 
-    uri = Poco::URI(uri_str);
+    uri = Poco::URI(uri_str, enable_url_encoding);
 
     std::unordered_map<std::string, std::string> mapper;
     auto context = Context::getGlobalContextInstance();
@@ -76,7 +77,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
     }
 
     if (!mapper.empty())
-        URIConverter::modifyURI(uri, mapper);
+        URIConverter::modifyURI(uri, mapper, enable_url_encoding);
     }
 
     storage_name = "S3";
diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h
index 8af05c177807..9bb9eec87e1e 100644
--- a/src/IO/S3/URI.h
+++ b/src/IO/S3/URI.h
@@ -36,7 +36,7 @@ struct URI
     bool is_virtual_hosted_style;
 
     URI() = default;
-    explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false);
+    explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false, bool enable_url_encoding = true);
 
     void addRegionToURI(const std::string & region);
     static void validateBucket(const std::string & bucket, const Poco::URI & uri);
diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp
index c4629582ee28..c0b3b5174262 100644
--- a/src/Storages/ObjectStorage/Utils.cpp
+++ b/src/Storages/ObjectStorage/Utils.cpp
@@ -341,7 +341,8 @@ std::pair resolveObjectStorageForPath(
     {
         normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key;
     }
-    S3::URI s3_uri(normalized_path);
+    // enable_url_encoding=false, path from metadata must have correct encoding already
+    S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false);
 
     std::string key_to_use = s3_uri.key;
 
@@ -365,7 +366,7 @@ std::pair resolveObjectStorageForPath(
     {
         normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key;
     }
-    S3::URI base_s3_uri(normalized_table_location);
+    S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false);
 
     if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized))
         use_base_storage = true;
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index 021a7d78d36e..7ca0725a1cc7 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -5,7 +5,7 @@ import random
 import time
 import uuid
 
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, time as dtime
 
 import pyarrow as pa
 import pytest
@@ -26,7 +26,8 @@
     StringType,
     StructType,
     TimestampType,
-    TimestamptzType
+    TimestamptzType,
+    TimeType,
 )
 
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
@@ -939,3 +940,91 @@ def test_cluster_select(started_cluster):
 
     assert len(cluster_secondary_queries) == 1
     assert node2.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`", settings={"parallel_replicas_for_cluster_engines":1, 'enable_parallel_replicas': 2, 'cluster_for_parallel_replicas': 'cluster_simple', 'parallel_replicas_for_cluster_engines' : 1}) == 'pablo\n'
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_partitioning_by_time(started_cluster, storage_type):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_partitioning_by_time_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    namespace = f"{root_namespace}.A"
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1,
+            name="key",
+            field_type=TimeType(),
+            required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="value",
+            field_type=StringType(),
+            required=False,
+        ),
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(
+            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
+        )
+    )
+
+    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec)
+    data = [{"key": dtime(12,0,0), "value": "test"}]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    # Fix test when https://github.com/Altinity/ClickHouse/issues/15355 is resolved
+    # Must be 43200
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "43200000000\ttest\n"
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_partitioning_by_string(started_cluster, storage_type):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_partitioning_by_string_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    namespace = f"{root_namespace}.A"
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1,
+            name="key",
+            field_type=StringType(),
+            required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="value",
+            field_type=StringType(),
+            required=False,
+        ),
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(
+            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
+        )
+    )
+
+    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec)
+    data = [{"key": "a:b,c[d=e/f%g?h", "value": "test"}]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\n"