Fix HTTP streaming fetch_size handling #181

Open
iskakaushik wants to merge 9 commits into main from fix/http-streaming-state

Conversation

@iskakaushik
Collaborator

Fix HTTP streaming handle ownership and cancellation cleanup, and add basic fetch_size and cancel regression coverage.

The new fetch_size HTTP scan path did not preserve parser state
safely. It reinitialized the read buffer inside the per-tuple temp
context, could stop on arbitrary libcurl chunk boundaries, and
skipped the progress callback wiring used by the non-streaming
HTTP path. That left it vulnerable to stale offsets between
batches, partial-row parsing, and broken cancellation.

Track buffer lengths explicitly and only expose complete rows
from each streaming batch. Keep the parser state in the cursor
context, carry the HTTP connection on cursors so cancellation
kills the right query, and reuse the existing curl progress and
error-buffer setup for streaming requests. Also validate
fetch_size and add regression coverage for multi-batch HTTP
scans and HTTP query cancellation.
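
A minimal sketch of the carry-over step this describes: expose only complete rows from the batch, then slide the unparsed tail to the front of the buffer so the next batch appends after it (struct and field names here are illustrative, not the driver's actual ones):

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the streaming read buffer: write_pos is the
 * explicitly tracked length of valid bytes in buf. */
struct stream_buf
{
	char		buf[256];
	size_t		write_pos;
};

/* After the caller has parsed batch_end bytes of complete rows, keep the
 * partial trailing row for the next batch instead of leaving a stale
 * offset behind. */
static void
consume_batch(struct stream_buf *s, size_t batch_end)
{
	memmove(s->buf, s->buf + batch_end, s->write_pos - batch_end);
	s->write_pos -= batch_end;
}
```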

Tested with Docker-backed ClickHouse and local PG 18 via
`make tempcheck`; all 23 regression tests passed.
The current default of 0 disables HTTP streaming and buffers the full
response unless every server or table overrides the option. That makes
large result sets riskier than they need to be and turns an omitted
setting into unbounded memory growth.

Default fetch_size to 50000 instead, while preserving 0 as the
explicit opt-out for full-response buffering. Only the HTTP driver
uses the streaming path, so binary queries keep their existing
behavior.

Document the option at both the server and table levels so operators
can see the new default and know how to override it.
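
The validation could look roughly like this. A standalone sketch using strtol with hypothetical names; the extension itself would use pg_strtoint32 and report failures via ereport:

```c
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/* Accept 0 (buffer the whole response) or a positive row count; reject
 * everything else. Returns 0 on success, -1 on invalid input. */
static int
parse_fetch_size(const char *value, int *out)
{
	char	   *end;
	long		v;

	errno = 0;
	v = strtol(value, &end, 10);
	if (errno != 0 || end == value || *end != '\0' || v < 0 || v > INT_MAX)
		return -1;				/* caller reports the option error */
	*out = (int) v;
	return 0;
}
```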
The streaming handle and cancellation fixes changed both the
exercised code paths and the expected output, but the regression
suite still only covered the streaming HTTP cancel path and several
versioned http expected files were stale. That left older Postgres
and ClickHouse matrix cells failing even when the implementation
was correct.

Add fetch_size coverage to http.sql, add a buffered HTTP cancel
case to query_cancel.sql, and update the versioned expected files
for the new ft1_stream coverage, CRLF database-name validation
output, and the ClickHouse 23 wording difference.

Tested with:
uv run pgch test --repo /Users/kaushik/git/pg_clickhouse --no-cache

Run ID: f7b5cadd
Results: 26 passed, 0 failed
The streaming follow-up patch did not match the project's pg_bsd_indent
output, so the lint workflow rewrote the touched files and failed.
Record the canonical formatter output for the affected declarations and
continuation lines so the branch matches CI.
RAII (constructors/destructors) is incompatible with PostgreSQL's
palloc/longjmp error handling — longjmp skips C++ destructors, leaking
CURL handles and malloc'd buffers.

Replace the HttpStream C++ class with a plain C struct allocated via
calloc. All internal allocations use malloc/free (no palloc), eliminating
longjmp risk inside the streaming driver. Replace psprintf calls in
setup_curl with snprintf to stack buffers.

In pglink.c, wrap the cursor setup in PG_TRY/FINALLY so that if any
palloc throws before the MemoryContextResetCallback is registered, the
stream is still cleaned up. Set stream = NULL after ownership transfer
to prevent double-free.
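
The failure mode is easy to reproduce with plain setjmp/longjmp, which is what PostgreSQL's error handling is built on. A self-contained sketch, not the extension's code; names are illustrative:

```c
#include <setjmp.h>
#include <stdlib.h>

static jmp_buf err_env;

/* Stand-in for a palloc-style allocator that reports failure via
 * longjmp, the way ereport(ERROR) does. Any C++ destructors on the
 * stack would be skipped by the jump. */
static void *
alloc_or_jump(size_t n, int fail)
{
	if (fail)
		longjmp(err_env, 1);
	return malloc(n);
}

/* Returns 1 when the explicit cleanup path ran. Because longjmp runs no
 * destructors, the handler must free whatever was allocated before the
 * error -- the PG_TRY/PG_FINALLY wrapping described above plays the
 * same role for the stream. */
static int
run_with_cleanup(void)
{
	char *volatile buf = NULL;	/* volatile: survives the longjmp */

	if (setjmp(err_env) == 0)
	{
		buf = alloc_or_jump(64, 0); /* succeeds */
		(void) alloc_or_jump(64, 1);	/* simulated error: jumps away */
		return 0;				/* unreachable */
	}

	/* PG_FINALLY analog: explicit cleanup, no RAII to rely on. */
	free(buf);
	return 1;
}
```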

Additional fixes:
- Free prior error_msg before overwriting in pump error paths
- Set http_status = 419 on curl_multi_perform failure so the caller's
  status check catches it
- Add NULL check for curl_multi_init return value
- Improve error message from "out of memory" to a descriptive message
@theory theory added enhancement New feature or request drivers Improve binary and/or http driver support labels Apr 3, 2026
@iskakaushik iskakaushik force-pushed the fix/http-streaming-state branch from 228ff00 to c13d5d6 Compare April 3, 2026 20:33
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = get_fetch_size_option(def);
Collaborator

Add break?

Collaborator Author

done

Member

@serprex serprex Apr 3, 2026

If you scan the table options first, you can skip scanning the server options once a table-level fetch_size is found, but the code gets ugly without goto.
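
A goto-free shape for that precedence could look like this (hypothetical types and names, not the actual DefElem-based option lists):

```c
#include <stddef.h>
#include <string.h>

struct opt
{
	const char *name;
	const char *value;
};

/* Linear scan with an early return, so finding the option on the table
 * stops the search immediately. */
static const char *
find_option(const struct opt *opts, size_t n, const char *name)
{
	for (size_t i = 0; i < n; i++)
		if (strcmp(opts[i].name, name) == 0)
			return opts[i].value;
	return NULL;
}

/* Table options win; fall back to server options only when the table
 * did not set fetch_size. */
static const char *
resolve_fetch_size(const struct opt *table, size_t nt,
				   const struct opt *server, size_t ns)
{
	const char *v = find_option(table, nt, "fetch_size");

	return v ? v : find_option(server, ns, "fetch_size");
}
```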

src/fdw.c.in Outdated
Comment on lines +927 to +929
cursor_fetch_row_method fetch_fn = fsstate->is_streaming
? fsstate->conn.methods->streaming_fetch_row
: fsstate->conn.methods->fetch_row;
Collaborator

Do we still need two paths? Could the streaming API handle the non-streaming API simply by grabbing everything if the max is 0?

Collaborator Author

I'll do it as a follow-up.

src/fdw.c.in Outdated
: fsstate->conn.methods->fetch_row;

if (fetch_fn(&ctx) == NULL)
goto cleanup;
Collaborator

Can we avoid goto?

Collaborator Author

done

src/fdw.c.in Outdated
fsstate->ch_cursor = fsstate->conn.methods->simple_query(fsstate->conn.conn,
&query);
if (fsstate->fetch_size > 0
&& fsstate->conn.methods->streaming_query != NULL)
Collaborator

Is this likely to happen? Under what circumstances would we not be streaming a query when the fetch size wants it?

Collaborator

Oh I see, no implementation for the binary driver, so you have to do this (for now).

Comment on lines -50 to -54
case '\0':
/* unexpected end */
state->done = true;
state->curpos++;
return CH_EOF;
Collaborator

Does this not happen anymore?

Collaborator Author

done

Comment on lines -54 to -56
-- HTTP: a multi-way cross join produces a huge result that will take far
-- longer than the 10 ms timeout, exercising the curl progress-callback
-- cancel path.
Collaborator

Would be nice to retain this comment describing how the cross-join produces a large resultset.

Collaborator Author

done

Comment on lines +97 to +101
SELECT count(*) FROM cancel_http_buf_ft;
count
-------
100
(1 row)
Collaborator

What's the point of this test?

Collaborator Author

done


/* ----------------------------------------------------------------
* setup_curl — configure the CURL easy handle for this query.
* Mirrors the setup portion of ch_http_simple_query() in http.c.
Collaborator

Can we eliminate duplication? Do we need the non-batch code anymore?


/* ----------------------------------------------------------------
* http_stream_pump — drive curl_multi until the next batch is ready
* or the transfer completes. Returns 0 on success, -1 on error.
Collaborator

Use a boolean?

Comment on lines +460 to +467
/*
* ch_http_stream_pump — public wrapper for http_stream_pump.
*/
int
ch_http_stream_pump(HttpStream * stream)
{
return http_stream_pump(stream);
}
Collaborator

Why not just rename http_stream_pump to ch_http_stream_pump?

Collaborator Author

done


for (size_t i = 0; i < stream->write_pos; i++)
{
if (stream->buf[i] == '\n' && ++rows >= (size_t) stream->fetch_size)
Member

@serprex serprex Apr 3, 2026

fetch_size being based on rows instead of bytes seems odd to me. You could drop rows and just use i: start the scan at fetch_size and stop at the first newline (though I'd want memrchr to find the last newline as a fallback when no newline is found ahead).

Why do we return 0 instead of the last newline when buf doesn't have fetch_size rows? I'd need to see how this is called to tell whether streaming rows at too granular a pace is a legitimate worry, but it seems unlikely.

Member

It also seems like, when we decide we've read enough to pause the stream, we should process as much of it as we've read; then you can use memrchr to process everything up to the last newline in the buffer.
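
That shape can be sketched as follows: count complete rows with memchr and, when the buffer holds fewer than fetch_size rows, fall back to the last newline seen so no partial row is ever exposed (hypothetical signature, assuming newline-delimited rows):

```c
#include <stddef.h>
#include <string.h>

/* Returns the offset one past the newline ending the batch, or 0 when
 * the buffer contains no complete row. memchr lets libc use vectorized
 * scanning rather than a byte-at-a-time loop. */
static size_t
find_batch_end(const char *buf, size_t len, size_t fetch_size)
{
	size_t		rows = 0;
	const char *p = buf;
	const char *end = buf + len;
	const char *last_nl = NULL;

	while (p < end)
	{
		const char *nl = memchr(p, '\n', (size_t) (end - p));

		if (nl == NULL)
			break;
		last_nl = nl;
		if (++rows >= fetch_size)
			return (size_t) (nl - buf) + 1;
		p = nl + 1;
	}

	/* Fewer than fetch_size complete rows: expose everything up to the
	 * last newline instead of returning nothing. */
	return last_nl ? (size_t) (last_nl - buf) + 1 : 0;
}
```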

@serprex
Member

serprex commented Apr 3, 2026

Some nits, and I question the design decision of tying fetch_size to row counts (as a memory optimization, I've seen row counts consistently be too unreliable for managing resource consumption), but the memory management looks good. Reminds me of the tedious code I dealt with at Citus.

…etch_size

- Consolidate param_buf/header_buf/name_buf into single temp_buf in setup_curl
- Extract count_newlines() helper using memchr for SIMD-optimized scanning
- Use count_newlines() in write_callback and count_buffered_rows
- Rewrite find_batch_end with memchr; return up to last newline to reduce
  pause/resume cycles
- Change fetch_size type from int to int32 for pg_strtoint32 consistency
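
The count_newlines() helper described above could be sketched like this (a hypothetical version, assuming the same newline-delimited row format):

```c
#include <stddef.h>
#include <string.h>

/* Count '\n' bytes in buf[0..len). Using memchr instead of a manual
 * loop lets libc apply its SIMD-optimized scan. */
static size_t
count_newlines(const char *buf, size_t len)
{
	size_t		n = 0;
	const char *p = buf;
	const char *end = buf + len;
	const char *nl;

	while (p < end && (nl = memchr(p, '\n', (size_t) (end - p))) != NULL)
	{
		n++;
		p = nl + 1;
	}
	return n;
}
```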