[Feature] Kafka Share Group support for Routine Load #63190

@Shekharrajak

Kafka 4.0 introduced Share Groups (KIP-932): a consumption model where multiple consumers in a group share access to the same partitions without exclusive ownership. The broker tracks per-record delivery state (ACQUIRED → ACCEPTED/RELEASED/REJECTED) with an acquisition lock timeout as the crash-safety mechanism.
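To make the per-record delivery state concrete, here is a toy, in-memory model of one record's lifecycle. It is a sketch only, not the broker implementation: the class name `ToyShareRecord`, the `AVAILABLE` state, and the choice to model a released or lock-expired record as simply returning to the available pool are assumptions made for illustration.

```python
from enum import Enum


class State(Enum):
    AVAILABLE = "available"   # assumed name for "can be delivered"
    ACQUIRED = "acquired"
    ACCEPTED = "accepted"
    REJECTED = "rejected"


class ToyShareRecord:
    """Toy model of one record's delivery state (hypothetical, for illustration)."""

    def __init__(self, lock_timeout_s: float):
        self.state = State.AVAILABLE
        self.lock_timeout_s = lock_timeout_s
        self.lock_deadline = None
        self.delivery_count = 0

    def expire(self, now: float) -> None:
        # Crash safety: if the consumer never acknowledges, the lock times out
        # and the record becomes deliverable again.
        if self.state is State.ACQUIRED and now >= self.lock_deadline:
            self.state = State.AVAILABLE
            self.lock_deadline = None

    def acquire(self, now: float) -> bool:
        self.expire(now)
        if self.state is not State.AVAILABLE:
            return False
        self.state = State.ACQUIRED
        self.lock_deadline = now + self.lock_timeout_s
        self.delivery_count += 1
        return True

    def acknowledge(self, kind: str, now: float) -> None:
        self.expire(now)
        if self.state is not State.ACQUIRED:
            raise RuntimeError("record is not acquired")
        if kind == "ACCEPT":
            self.state = State.ACCEPTED
        elif kind == "RELEASE":
            self.state = State.AVAILABLE   # modeled as requeued
        elif kind == "REJECT":
            self.state = State.REJECTED
        else:
            raise ValueError(f"unknown acknowledgement: {kind}")
        self.lock_deadline = None
```

In this model, a consumer that crashes after `acquire()` never blocks the record forever: once `lock_timeout_s` has elapsed, another `acquire()` succeeds and `delivery_count` increases.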

Doris Routine Load currently consumes through the traditional Kafka assign() API, where parallelism is capped by the partition count and partition skew cannot be addressed.
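A rough way to see both limits is to compare how a backlog spreads over tasks in each mode. The sketch below is illustrative only: the round-robin partition assignment is an assumption, not Doris's actual scheduling, the even split in share mode is an idealization that the broker does not guarantee, and the backlog numbers are made up.

```python
def assign_mode_task_loads(partition_backlogs, num_tasks):
    """Each partition has exactly one owning task (round-robin, assumed)."""
    loads = [0] * num_tasks
    for i, backlog in enumerate(partition_backlogs):
        loads[i % num_tasks] += backlog
    return loads


def share_mode_task_loads(partition_backlogs, num_tasks):
    """Idealized share group: records spread evenly, ignoring partitions."""
    total = sum(partition_backlogs)
    base, extra = divmod(total, num_tasks)
    return [base + (1 if i < extra else 0) for i in range(num_tasks)]


# Hypothetical topic: 3 partitions, one of them hot, consumed by 6 tasks.
backlogs = [900, 50, 50]
print(assign_mode_task_loads(backlogs, 6))  # three tasks stay idle
print(share_mode_task_loads(backlogs, 6))   # all six tasks get work
```

With exclusive ownership, tasks beyond the partition count receive nothing and the hot partition is stuck on a single task; under the idealized share model the same backlog is split across all tasks.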

Supporting Kafka queue semantics (the Share Group implementation) would benefit use cases such as append-only tables, log/metrics ingestion, and high-throughput, order-insensitive workloads.

Proposal

Add an optional LOAD_MODE = "share_group" property to CREATE ROUTINE LOAD. When set:

• divideRoutineLoadJob() creates N tasks with no partition assignment — all tasks subscribe to the same topic via KafkaShareConsumer with the same group.id (job id).
• The BE task loop becomes a single sequential poll() → pipe → commit_txn → ack cycle per task thread, replacing the current N-thread BlockingQueue model.
• On Doris txn commit success: send ACCEPT ack + commitSync().
• On Doris txn commit failure: send RELEASE ack + commitSync(). The broker requeues the records, so a subsequent poll (by any task in the group) can receive them again.
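The per-task cycle described above can be sketched as follows. This is a minimal simulation under stated assumptions, not the proposed BE code: `FakeShareConsumer` is a hypothetical in-memory stand-in whose `poll`, `acknowledge`, and `commit_sync` methods only mimic the shape of a share consumer, and `commit_txn` is a placeholder callable for the pipe plus Doris transaction commit step.

```python
from collections import deque


class FakeShareConsumer:
    """Hypothetical in-memory stand-in for a share consumer (illustration only)."""

    def __init__(self, records):
        self.available = deque(records)
        self.pending_acks = []   # (record, kind) pairs waiting for commit_sync
        self.accepted = []

    def poll(self, max_records):
        batch = []
        while self.available and len(batch) < max_records:
            batch.append(self.available.popleft())
        return batch

    def acknowledge(self, record, kind):
        self.pending_acks.append((record, kind))

    def commit_sync(self):
        for record, kind in self.pending_acks:
            if kind == "ACCEPT":
                self.accepted.append(record)
            elif kind == "RELEASE":
                self.available.append(record)   # requeued for a later poll
        self.pending_acks.clear()


def run_one_cycle(consumer, commit_txn, max_records=100):
    """One poll -> pipe -> commit_txn -> ack cycle; returns True on commit."""
    batch = consumer.poll(max_records)
    if not batch:
        return False
    try:
        commit_txn(batch)        # placeholder for pipe + Doris txn commit
        kind = "ACCEPT"
    except Exception:
        kind = "RELEASE"         # let the broker redeliver the batch
    for record in batch:
        consumer.acknowledge(record, kind)
    consumer.commit_sync()
    return kind == "ACCEPT"
```

Note that in this sketch released records rejoin the queue behind records that were never polled, so ordering is not preserved. That matches the order-insensitive workloads this mode targets.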

Discussion: #63265
