Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "clickhouse-cpp"]
path = vendor/clickhouse-cpp
url = https://github.com/ClickHouse/clickhouse-cpp.git
url = git@github.com:iskakaushik/clickhouse-cpp.git
9 changes: 9 additions & 0 deletions doc/pg_clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ The supported options are:
* `dbname`: The ClickHouse database to use upon connecting. Defaults to
"default".
* `host`: The host name of the ClickHouse server. Defaults to "localhost".
* `fetch_size`: The number of remote rows to buffer per streamed fetch while
scanning foreign tables with the `binary` driver. Defaults to `50000`. Set
it to `0` to disable streaming for tables that use this server and fall
back to buffering the full result set in memory. The `http` driver always
uses the buffered query path.
* `port`: The port to connect to on the ClickHouse server. Defaults as
follows:
* 9440 if `driver` is "binary" and `host` is a ClickHouse Cloud host
Expand Down Expand Up @@ -299,6 +304,10 @@ The supported table options are:
defined for the foreign server.
* `table_name`: The name of the remote table. Defaults to the name specified
for the foreign table.
* `fetch_size`: Overrides the server-level `fetch_size` for this table only.
Use it to make scans buffer fewer or more remote rows at a time, or set it
to `0` to disable streaming for this table. Streaming currently applies
only to tables that use the `binary` driver.
* `engine`: The [table engine] used by the ClickHouse table. For
`CollapsingMergeTree()` and `AggregatingMergeTree()`, pg_clickhouse
automatically applies the parameters to function expressions executed on
Expand Down
25 changes: 16 additions & 9 deletions src/binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ extern "C"
#include "utils/uuid.h"

#include "binary.hh"

using namespace clickhouse;
#include "binary_internal.hh"

#if defined(__APPLE__) /* Byte ordering on macOS */
#include <libkern/OSByteOrder.h>
Expand All @@ -55,6 +54,8 @@ extern "C"
#define BIG_ENDIAN_64_TO_HOST(x) be64toh(x)
#endif

using namespace clickhouse;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad indent

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I couldn't figure out how to get clang-format not to do that. You can ignore it.


#define THROW_UNEXPECTED_COLUMN(exp_type, col) \
throw std::runtime_error("unexpected column type for " + std::string(exp_type) + ": " + col->Type()->GetName())

Expand Down Expand Up @@ -181,7 +182,7 @@ extern "C"
/*
* Converts query->settings to QuerySettings.
*/
static QuerySettings
QuerySettings
ch_binary_settings(const ch_query *query)
{
kv_iter iter;
Expand All @@ -198,7 +199,7 @@ extern "C"
/*
* Converts query->param_values to QueryParams.
*/
static QueryParams
QueryParams
ch_binary_params(const ch_query *query)
{
int i;
Expand Down Expand Up @@ -232,6 +233,12 @@ extern "C"
client->Select(clickhouse::Query(query->sql)
.SetQuerySettings(ch_binary_settings(query))
.SetParams(ch_binary_params(query))
.OnProgress(
[&check_cancel](const Progress &)
{
if (check_cancel && check_cancel())
throw std::runtime_error("query was canceled");
})
.OnDataCancelable(
[&resp, &values, &check_cancel](const Block &block)
{
Expand Down Expand Up @@ -836,8 +843,8 @@ extern "C"
* There is not an adequate (without huge overheads) solution, we just consider
* this state unfixable.
*/
static Datum
make_datum(clickhouse::ColumnRef col, size_t row, Oid *valtype, bool *is_null)
Datum
ch_binary_make_datum(clickhouse::ColumnRef col, size_t row, Oid *valtype, bool *is_null)
{
Datum ret = (Datum)0;

Expand Down Expand Up @@ -1088,7 +1095,7 @@ extern "C"
slot->nulls = (bool *)exc_palloc0(sizeof(bool) * len);

for (size_t i = 0; i < len; ++i)
slot->datums[i] = make_datum(arr, i, &slot->item_type, &slot->nulls[i]);
slot->datums[i] = ch_binary_make_datum(arr, i, &slot->item_type, &slot->nulls[i]);
}

/* this one will need additional work, since we just return raw slot */
Expand All @@ -1112,7 +1119,7 @@ extern "C"
slot->len = len;

for (size_t i = 0; i < len; ++i)
slot->datums[i] = make_datum((*tuple)[i], row, &slot->types[i], &slot->nulls[i]);
slot->datums[i] = ch_binary_make_datum((*tuple)[i], row, &slot->types[i], &slot->nulls[i]);

/* this one will need additional work, since we just return raw slot */
ret = PointerGetDatum(slot);
Expand Down Expand Up @@ -1173,7 +1180,7 @@ extern "C"
for (size_t i = 0; i < state->resp->columns_count; i++)
{
/* fill value and null arrays */
state->values[i] = make_datum(block[i], state->row, &state->coltypes[i], &state->nulls[i]);
state->values[i] = ch_binary_make_datum(block[i], state->row, &state->coltypes[i], &state->nulls[i]);
}
res = true;

Expand Down
272 changes: 272 additions & 0 deletions src/binary_streaming.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
#include <memory>
#include <new>
#include <optional>
#include <string>

#include <clickhouse/client.h>
#include <clickhouse/query.h>

extern "C"
{

#include "postgres.h"
#include "internal.h"
#include "engine.h"

}

#include "binary_internal.hh"

using namespace clickhouse;

static const char kBinaryStreamingCanceled[] = "query was canceled";
static const char kBinaryStreamingOom[] = "out of memory";

/*
 * Per-scan state for a streamed SELECT over the binary driver.
 *
 * Created by ch_binary_begin_streaming() and destroyed by
 * ch_binary_end_streaming().  The C side advances it with
 * ch_binary_fetch_block() / ch_binary_streaming_read_row() and reads
 * converted values back out via ch_binary_streaming_value().
 */
struct ch_binary_streaming_state
{
	/* Current block returned by clickhouse-cpp. */
	std::optional<Block> current_block;
	size_t current_row = 0;		/* next unread row within current_block */
	bool have_block = false;	/* current_block still has unread rows */
	bool done = false;			/* stream exhausted or failed; stop fetching */

	/* Per-column outputs for the most recently read row. */
	std::unique_ptr<Oid[]> coltypes;
	std::unique_ptr<Datum[]> values;
	std::unique_ptr<bool[]> nulls;
	size_t columns_count = 0;	/* set from the first non-empty block */

	/* First error observed; subsequent errors are ignored (see SetError). */
	std::optional<std::string> error;
	bool (*check_cancel) (void) = nullptr;	/* optional cancellation callback */

	/* Borrowed connection (not owned) plus the query captured at begin time. */
	Client *client = nullptr;
	std::string sql;
	QuerySettings settings;
	QueryParams params;

	/* Record only the first error; a NULL message maps to a generic
	 * out-of-memory string so callers always get non-empty text. */
	void
	SetError(const char *message)
	{
		if (error)
			return;
		error.emplace(message ? message : kBinaryStreamingOom);
	}

	/* Return the stored error message, or NULL when no error occurred. */
	const char *
	GetError() const
	{
		return error ? error->c_str() : nullptr;
	}
};

/*
 * Pull the next non-empty data block from the server into *st.
 *
 * Returns true when a block with at least one column is ready for reading;
 * returns false on end-of-stream, cancellation, or error.  On end-of-stream
 * the select is closed via EndSelect() and st->done is set.  Any failure is
 * recorded through st->SetError() and also marks the stream done.
 */
static bool
binary_streaming_fill_block(ch_binary_streaming_state * st)
{
	/* Record the failure and terminate the stream in one step. */
	auto fail = [st](const char *msg) -> bool
	{
		st->SetError(msg);
		st->done = true;
		return false;
	};

	while (true)
	{
		std::optional<Block> next;

		try
		{
			/* Honor cancellation before blocking on the network. */
			if (st->check_cancel && st->check_cancel())
				return fail(kBinaryStreamingCanceled);

			next = st->client->ReceiveSelectBlock();
		}
		catch (const std::exception & e)
		{
			return fail(e.what());
		}

		if (!next)
		{
			/* End of stream: close the select cleanly, keep any error. */
			try
			{
				st->client->EndSelect();
			}
			catch (const std::exception & e)
			{
				st->SetError(e.what());
			}
			st->current_block.reset();
			st->have_block = false;
			st->done = true;
			return false;
		}

		/* Match the old callback path, which ignored zero-column blocks. */
		if (next->GetColumnCount() == 0)
			continue;

		/* All data blocks in one result must agree on column count. */
		if (st->columns_count != 0 &&
			next->GetColumnCount() != st->columns_count)
			return fail("columns mismatch in blocks");

		st->current_block = std::move(next);
		st->columns_count = st->current_block->GetColumnCount();
		st->current_row = 0;
		st->have_block = true;
		return true;
	}
}

extern "C"
{

/*
 * Start a streamed SELECT for the given query.
 *
 * Returns a freshly allocated streaming state, or NULL only when the state
 * itself could not be allocated.  Any other failure (including during query
 * start) is reported through the returned state's error field so the C
 * caller can inspect it with ch_binary_streaming_error().
 *
 * NOTE: this function must never let a C++ exception escape — it is called
 * from PostgreSQL C code, and unwinding across the extern "C" boundary is
 * undefined behavior.  That is why the query capture (string copy, settings
 * and params conversion — all of which may allocate and throw) happens
 * inside the try block, not before it.
 */
ch_binary_streaming_state *
ch_binary_begin_streaming(ch_binary_connection_t * conn,
						  const ch_query * query,
						  bool (*check_cancel) (void))
{
	ch_binary_streaming_state *st;

	st = new (std::nothrow) ch_binary_streaming_state();
	if (!st)
		return NULL;

	st->check_cancel = check_cancel;
	st->client = (Client *) conn->client;

	try
	{
		/* These conversions allocate and may throw; keep them in the try. */
		st->sql = query->sql;
		st->settings = ch_binary_settings(query);
		st->params = ch_binary_params(query);

		st->client->BeginSelect(clickhouse::Query(st->sql)
									.SetQuerySettings(st->settings)
									.SetParams(st->params));
	}
	catch (const std::exception & e)
	{
		st->SetError(e.what());
		st->done = true;
		return st;
	}
	catch (...)
	{
		/* Backstop for non-std exceptions: never unwind into C code. */
		st->SetError("unknown error while starting streamed query");
		st->done = true;
		return st;
	}

	/* Prime the first block; errors land in st->error for the caller. */
	(void) binary_streaming_fill_block(st);

	return st;
}

/*
 * Ensure a block with unread rows is available.
 *
 * Returns true if rows can be read; false when the stream is finished,
 * failed, or st is NULL.  A fresh block is fetched only when the current
 * one has been consumed.
 */
bool
ch_binary_fetch_block(ch_binary_streaming_state * st)
{
	if (st == NULL || st->done)
		return false;

	return st->have_block ? true : binary_streaming_fill_block(st);
}

/*
 * Convert the next row of the current block into Datum form.
 *
 * Returns true and advances the row cursor on success; the converted values
 * are then available via ch_binary_streaming_value().  Returns false when
 * the current block is exhausted (caller should fetch the next block), or
 * on allocation/conversion failure (recorded via SetError).
 */
bool
ch_binary_streaming_read_row(ch_binary_streaming_state * st)
{
	if (st == NULL || !st->have_block || !st->current_block)
		return false;

	Block &blk = *st->current_block;

	if (st->current_row >= blk.GetRowCount())
	{
		/* Block consumed; signal the caller to fetch the next one. */
		st->have_block = false;
		return false;
	}

	/* Lazily allocate the per-column output arrays on the first row. */
	if (!st->coltypes && st->columns_count > 0)
	{
		st->coltypes.reset(new (std::nothrow) Oid[st->columns_count]);
		st->values.reset(new (std::nothrow) Datum[st->columns_count]);
		st->nulls.reset(new (std::nothrow) bool[st->columns_count]);
		if (!st->coltypes || !st->values || !st->nulls)
		{
			st->SetError(kBinaryStreamingOom);
			return false;
		}
	}

	try
	{
		for (size_t col = 0; col < st->columns_count; col++)
			st->values[col] = ch_binary_make_datum(blk[col], st->current_row,
												   &st->coltypes[col],
												   &st->nulls[col]);
	}
	catch (const std::exception & e)
	{
		st->SetError(e.what());
		return false;
	}

	++st->current_row;
	return true;
}

/* Number of columns in the streamed result; 0 when st is NULL or unknown. */
size_t
ch_binary_streaming_columns(ch_binary_streaming_state * st)
{
	if (st == NULL)
		return 0;
	return st->columns_count;
}

/*
 * Fetch one converted column value from the most recently read row.
 *
 * On success fills *valtype and *is_null and returns the Datum.  For a NULL
 * state or an out-of-range column index, reports a NULL value of InvalidOid.
 */
Datum
ch_binary_streaming_value(ch_binary_streaming_state * st, size_t col,
						  Oid * valtype, bool * is_null)
{
	if (st != NULL && col < st->columns_count)
	{
		*valtype = st->coltypes[col];
		*is_null = st->nulls[col];
		return st->values[col];
	}

	*valtype = InvalidOid;
	*is_null = true;
	return (Datum) 0;
}

/* Error message of the first recorded failure, or NULL when none / no st. */
const char *
ch_binary_streaming_error(ch_binary_streaming_state * st)
{
	if (st == NULL)
		return NULL;
	return st->GetError();
}

/*
 * Tear down a streaming state and release it.
 *
 * Attempts a clean EndSelect(); if that throws (e.g. after cancellation or
 * a mid-stream error), the connection is reset as a best effort.  All
 * exceptions are swallowed — this is called from PostgreSQL C code and must
 * never unwind across the extern "C" boundary.
 */
void
ch_binary_end_streaming(ch_binary_streaming_state * st)
{
	if (st == NULL)
		return;

	if (st->client)
	{
		try
		{
			st->client->EndSelect();
		}
		catch (const std::exception &)
		{
			/* Select could not finish cleanly; reset, ignoring failure. */
			try
			{
				st->client->ResetConnection();
			}
			catch (const std::exception &)
			{
			}
		}
	}

	delete st;
}

}
Loading
Loading