Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <Poco/Util/AbstractConfiguration.h>


namespace DB
{

struct URIConverter
{
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper, bool enable_url_encoding)
{
Macros macros({{"bucket", uri.getHost()}});
uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
uri = macros.expand(mapper[uri.getScheme()]).empty()
? uri
: Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery(), enable_url_encoding);
}
};

Expand All @@ -32,7 +33,7 @@ namespace ErrorCodes
namespace S3
{

URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool enable_url_encoding)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
Expand All @@ -54,7 +55,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
else
uri_str = uri_;

uri = Poco::URI(uri_str);
uri = Poco::URI(uri_str, enable_url_encoding);

std::unordered_map<std::string, std::string> mapper;
auto context = Context::getGlobalContextInstance();
Expand All @@ -76,7 +77,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
}

if (!mapper.empty())
URIConverter::modifyURI(uri, mapper);
URIConverter::modifyURI(uri, mapper, enable_url_encoding);
}

storage_name = "S3";
Expand Down
2 changes: 1 addition & 1 deletion src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct URI
bool is_virtual_hosted_style;

URI() = default;
explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false);
explicit URI(const std::string & uri_, bool allow_archive_path_syntax = false, bool enable_url_encoding = true);
void addRegionToURI(const std::string & region);

static void validateBucket(const std::string & bucket, const Poco::URI & uri);
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/ObjectStorage/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
{
normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key;
}
S3::URI s3_uri(normalized_path);
// enable_url_encoding=false, path from metadata must have correct encoding already
S3::URI s3_uri(normalized_path, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false);

std::string key_to_use = s3_uri.key;

Expand All @@ -365,7 +366,7 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
{
normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key;
}
S3::URI base_s3_uri(normalized_table_location);
S3::URI base_s3_uri(normalized_table_location, /*allow_archive_path_syntax*/ false, /*enable_url_encoding*/ false);

if (s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized))
use_base_storage = true;
Expand Down
93 changes: 91 additions & 2 deletions tests/integration/test_database_iceberg/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import random
import time
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, time as dtime

import pyarrow as pa
import pytest
Expand All @@ -26,7 +26,8 @@
StringType,
StructType,
TimestampType,
TimestamptzType
TimestamptzType,
TimeType,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER

Expand Down Expand Up @@ -939,3 +940,91 @@ def test_cluster_select(started_cluster):
assert len(cluster_secondary_queries) == 1

assert node2.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`", settings={"parallel_replicas_for_cluster_engines":1, 'enable_parallel_replicas': 2, 'cluster_for_parallel_replicas': 'cluster_simple', 'parallel_replicas_for_cluster_engines' : 1}) == 'pablo\n'


@pytest.mark.parametrize("storage_type", ["s3"])
def test_partitioning_by_time(started_cluster, storage_type):
    """Create an Iceberg table identity-partitioned by a TimeType column,
    append one row via pyiceberg, then read it back through ClickHouse.
    """
    node = started_cluster.instances["node1"]

    # Unique names per run so repeated test runs never collide in the catalog.
    test_ref = f"test_partitioning_by_time_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"
    namespace = f"{root_namespace}.A"

    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(namespace)

    # Two nullable columns: a time-typed partition key and a string payload.
    key_field = NestedField(
        field_id=1,
        name="key",
        field_type=TimeType(),
        required=False,
    )
    value_field = NestedField(
        field_id=2,
        name="value",
        field_type=StringType(),
        required=False,
    )
    schema = Schema(key_field, value_field)

    # Partition on the raw "key" value (identity transform).
    spec = PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
        )
    )

    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=spec)
    table.append(pa.Table.from_pylist([{"key": dtime(12, 0, 0), "value": "test"}]))

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    # Fix test when https://github.com/Altinity/ClickHouse/issues/15355 is resolved
    # Must be 43200
    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "43200000000\ttest\n"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_partitioning_by_string(started_cluster, storage_type):
    """Create an Iceberg table identity-partitioned by a string column whose
    value contains URL-special characters, and verify ClickHouse reads the
    row back with the key intact.
    """
    node = started_cluster.instances["node1"]

    # Unique names per run so repeated test runs never collide in the catalog.
    test_ref = f"test_partitioning_by_string_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"
    namespace = f"{root_namespace}.A"

    catalog = load_catalog_impl(started_cluster)
    catalog.create_namespace(namespace)

    # Two nullable string columns: the partition key and a payload.
    schema = Schema(
        NestedField(field_id=1, name="key", field_type=StringType(), required=False),
        NestedField(field_id=2, name="value", field_type=StringType(), required=False),
    )

    # Partition on the raw "key" value (identity transform).
    spec = PartitionSpec(
        PartitionField(
            source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key"
        )
    )

    table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=spec)
    # Key deliberately packed with characters that need URL encoding in object-store paths.
    table.append(pa.Table.from_pylist([{"key": "a:b,c[d=e/f%g?h", "value": "test"}]))

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\n"
Loading