diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index 480160a86c5f..6f1c276ecf27 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -11,16 +11,17 @@ #include #include - namespace DB { struct URIConverter { - static void modifyURI(Poco::URI & uri, std::unordered_map mapper) + static void modifyURI(Poco::URI & uri, std::unordered_map mapper, bool enable_url_encoding) { Macros macros({{"bucket", uri.getHost()}}); - uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery()); + uri = macros.expand(mapper[uri.getScheme()]).empty() + ? uri + : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding); } }; @@ -32,7 +33,7 @@ namespace ErrorCodes namespace S3 { -URI::URI(const std::string & uri_, bool allow_archive_path_syntax) +URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool enable_url_encoding) { /// Case when bucket name represented in domain name of S3 URL. /// E.g. (https://bucket-name.s3.region.amazonaws.com/key) @@ -54,7 +55,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax) else uri_str = uri_; - uri = Poco::URI(uri_str); + uri = Poco::URI(uri_str, enable_url_encoding); std::unordered_map mapper; auto context = Context::getGlobalContextInstance(); @@ -76,7 +77,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax) } if (!mapper.empty()) - URIConverter::modifyURI(uri, mapper); + URIConverter::modifyURI(uri, mapper, enable_url_encoding); } storage_name = "S3"; diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index 8af05c177807..9bb9eec87e1e 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -36,7 +36,7 @@ struct URI bool is_virtual_hosted_style; URI() = default; - explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false); + explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false, bool enable_url_encoding = true); void addRegionToURI(const std::string & region); static void validateBucket(const std::string & bucket, const Poco::URI & uri); diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f9a780b42007..c1754c9978da 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -155,6 +155,9 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::DateTime64: assert_cast &>(column).insertValue(static_cast(value)); break; + case TypeIndex::Time64: + assert_cast &>(column).insertValue(static_cast(value)); + break; case TypeIndex::IPv4: assert_cast(column).insertValue(IPv4(static_cast(value))); break; @@ -303,6 +306,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro return createDecimalDeserializeFn(root_node, target_type, false); if (target.isDateTime64()) return createDecimalDeserializeFn(root_node, target_type, false); + if (target.isTime64()) + return createDecimalDeserializeFn(root_node, target_type, false); break; case avro::AVRO_INT: if (target_type->isValueRepresentedByNumber()) @@ -1282,8 +1287,11 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) { case avro::Type::AVRO_INT: { - if (node->logicalType().type() == avro::LogicalType::DATE) + auto logical_type = node->logicalType(); + if (logical_type.type() == avro::LogicalType::DATE) return {std::make_shared()}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; return {std::make_shared()}; } @@ -1294,6 +1302,10 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) return {std::make_shared(3)}; if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS) return {std::make_shared(6)}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; + if (logical_type.type() == avro::LogicalType::TIME_MICROS) + return {std::make_shared(6)}; return std::make_shared(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 01ad28b2e593..c09bbec5b920 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -117,6 +117,8 @@ bool canDumpIcebergStats(const Field & field, DataTypePtr type) case TypeIndex::Date32: case TypeIndex::Int64: case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: case TypeIndex::String: return true; default: @@ -143,7 +145,9 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Date32: return dumpValue(field.safeGet()); case TypeIndex::Int64: + case TypeIndex::Time: return dumpValue(field.safeGet()); + case TypeIndex::Time64: case TypeIndex::DateTime64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 56a2cd976f6b..457baaece6cf 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -243,7 +243,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont if (type_name == f_date) return std::make_shared(); if (type_name == f_time) - return std::make_shared(); + return std::make_shared(6); if (type_name == f_timestamp) return std::make_shared(6); if (type_name == f_timestamptz) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index f4540f68bb8e..5bbe88148b5a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -302,6 +302,7 @@ std::pair getIcebergType(DataTypePtr type, Int32 & ite case TypeIndex::DateTime64: return {"timestamp", true}; case TypeIndex::Time: + case TypeIndex::Time64: return {"time", true}; case TypeIndex::String: return {"string", true}; diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index c4629582ee28..c0b3b5174262 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -341,7 +341,8 @@ std::pair resolveObjectStorageForPath( { normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key; } - S3::URI s3_uri(normalized_path); + // enable_url_encoding=false, path from metadata must have correct encoding already + S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false); std::string key_to_use = s3_uri.key; @@ -365,7 +366,7 @@ std::pair resolveObjectStorageForPath( { normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; } - S3::URI base_s3_uri(normalized_table_location); + S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false); if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) use_base_storage = true; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 021a7d78d36e..28773c9585d6 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -5,7 +5,7 @@ import random import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time as dtime import pyarrow as pa import pytest @@ -26,7 +26,8 @@ StringType, StructType, TimestampType, - TimestamptzType + TimestamptzType, + TimeType, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER @@ -939,3 +940,95 @@ def test_cluster_select(started_cluster): assert len(cluster_secondary_queries) == 1 assert node2.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`", settings={"parallel_replicas_for_cluster_engines":1, 'enable_parallel_replicas': 2, 'cluster_for_parallel_replicas': 'cluster_simple', 'parallel_replicas_for_cluster_engines' : 1}) == 'pablo\n' + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_time(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_time_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=TimeType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": dtime(12,0,0), "value": "test"}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "12:00:00.000000\ttest\n" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_string(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_string_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=StringType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=3, + name="time_value", + field_type=TimeType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": "a:b,c[d=e/f%g?h", "value": "test", "time_value": dtime(12,0,0)}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\t12:00:00.000000\n"