
[Feature] Add azurefunctions-extensions-bindings-kafka package for KafkaRecord binding #155

@TsuyoshiUshio

Description

Summary

Add a new azurefunctions-extensions-bindings-kafka package that enables Python users to bind to raw Apache Kafka records (KafkaRecord type) with full message metadata access (topic, partition, offset, key/value as bytes, headers, timestamp, leader epoch).

This is the Python implementation of Azure/azure-functions-kafka-extension#612. The host-side Kafka Extension (4.3.1) and the .NET Isolated Worker implementation (PR #3356) are already complete.

Background

The host-side Kafka Extension (4.3.1) serializes IKafkaEventData to Protobuf and sends it as ModelBindingData with:

  • source: "AzureKafkaRecord"
  • content_type: "application/x-protobuf"
  • content: Protobuf-encoded KafkaRecordProto

The Python worker already supports deferred bindings via DEFERRED_BINDING_REGISTRY, which auto-discovers extension packages. The existing KafkaEvent / KafkaTriggerConverter in azure-functions-python-library handles simple types (string, bytes); the new KafkaRecord will instead use the deferred binding path via a new extensions package.
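
For illustration, a minimal sketch of the payload check the new extension package is expected to perform. The attribute names on the binding-data object (source, content_type) mirror the fields listed above but are assumptions about the deferred-binding plumbing, not a confirmed API:

def is_kafka_record_payload(binding_data) -> bool:
    # Hedged sketch: the host-side Kafka Extension (4.3.1) marks the deferred
    # binding payload with these values; the attribute names are assumed.
    return (
        binding_data.source == "AzureKafkaRecord"
        and binding_data.content_type == "application/x-protobuf"
    )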

Protobuf Schema

message KafkaRecordProto {
    string topic = 1;
    int32 partition = 2;
    int64 offset = 3;
    optional bytes key = 4;
    optional bytes value = 5;
    KafkaTimestampProto timestamp = 6;
    repeated KafkaHeaderProto headers = 7;
    optional int32 leader_epoch = 8;
    reserved 9 to 15;
}

message KafkaTimestampProto {
    int64 unix_timestamp_ms = 1;
    int32 type = 2; // 0=NotAvailable, 1=CreateTime, 2=LogAppendTime
}

message KafkaHeaderProto {
    string key = 1;
    optional bytes value = 2;
}
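
As a sketch of consuming this schema, deserializing the payload with the protobuf runtime looks roughly like the following; the generated module name KafkaRecordProto_pb2 is an assumption (it is whatever protoc emits for the .proto above):

# Hedged sketch: assumes protoc has generated KafkaRecordProto_pb2 from the schema above.
from google.protobuf.message import DecodeError

from . import KafkaRecordProto_pb2


def parse_record(content: bytes) -> KafkaRecordProto_pb2.KafkaRecordProto:
    """Deserialize the host's Protobuf payload into the generated message type."""
    proto = KafkaRecordProto_pb2.KafkaRecordProto()
    try:
        proto.ParseFromString(content)
    except DecodeError as exc:
        raise ValueError("Malformed AzureKafkaRecord payload") from exc
    # proto3 'optional' fields carry presence information, so a tombstone record
    # (null value) can be told apart from an empty byte string via proto.HasField("value").
    return proto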

Required Changes

New package: azurefunctions-extensions-bindings-kafka/

Follow the azurefunctions-extensions-bindings-eventhub pattern:

azurefunctions-extensions-bindings-kafka/
├── setup.cfg (or pyproject.toml)
├── azurefunctions/
│   └── extensions/
│       └── bindings/
│           └── kafka/
│               ├── __init__.py              # Export KafkaRecord, KafkaHeader, etc.
│               ├── kafkaRecord.py           # SdkType wrapper — deserializes Protobuf
│               └── kafkaRecordConverter.py  # InConverter (binding='kafkaTrigger')
├── proto/
│   └── KafkaRecordProto.proto              # For protoc code generation
└── tests/
    └── test_kafka_record_converter.py

File descriptions:

  • kafkaRecord.py: KafkaRecord(SdkType) wraps ModelBindingData and deserializes the Protobuf payload in get_sdk_type(). Properties: topic, partition, offset, key (bytes), value (bytes), timestamp, headers, leader_epoch
  • kafkaRecordConverter.py: KafkaRecordConverter(InConverter, binding='kafkaTrigger') handles the model_binding_data and collection_model_binding_data cases
  • __init__.py: exports the public types KafkaRecord, KafkaHeader, KafkaTimestamp, KafkaTimestampType
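
To make the layout concrete, below is a rough sketch of kafkaRecord.py. It follows the eventhub extension's SdkType pattern; the constructor contract (how the ModelBindingData content reaches the class) and the omission of the timestamp/headers wrappers are simplifications, not the final API:

# Hedged sketch of kafkaRecord.py: SdkType is provided by azurefunctions-extensions-base;
# the constructor signature below is an assumption modeled on the eventhub extension.
from typing import Optional

from azurefunctions.extensions.base import SdkType

from . import KafkaRecordProto_pb2


class KafkaRecord(SdkType):
    def __init__(self, *, content: bytes) -> None:
        # 'content' is the Protobuf payload carried by ModelBindingData
        # (source "AzureKafkaRecord", content_type "application/x-protobuf").
        self._proto = KafkaRecordProto_pb2.KafkaRecordProto()
        self._proto.ParseFromString(content)

    @property
    def topic(self) -> str:
        return self._proto.topic

    @property
    def partition(self) -> int:
        return self._proto.partition

    @property
    def offset(self) -> int:
        return self._proto.offset

    @property
    def key(self) -> Optional[bytes]:
        return self._proto.key if self._proto.HasField("key") else None

    @property
    def value(self) -> Optional[bytes]:
        return self._proto.value if self._proto.HasField("value") else None

    @property
    def leader_epoch(self) -> Optional[int]:
        return self._proto.leader_epoch if self._proto.HasField("leader_epoch") else None

    def get_sdk_type(self) -> "KafkaRecord":
        # Deferred bindings call get_sdk_type() to produce the object handed to the
        # function parameter; timestamp and headers wrappers are omitted here for brevity.
        return self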

Dependencies

[options]
install_requires =
    azurefunctions-extensions-base>=1.1.0
    protobuf>=4.0.0,<6.0

No changes needed in other repos

  • azure-functions-python-library: none; the existing KafkaEvent / KafkaTriggerConverter (simple types path) is unchanged
  • azure-functions-python-worker: none; DEFERRED_BINDING_REGISTRY auto-discovers the new extension

User Experience

import logging

import azure.functions as func
from azurefunctions.extensions.bindings.kafka import KafkaRecord

app = func.FunctionApp()

@app.kafka_trigger(
    arg_name="record",
    topic="my-topic",
    broker_list="%BrokerList%",
    consumer_group="$Default")
def kafka_trigger(record: KafkaRecord):
    logging.info(f"Topic: {record.topic}")
    logging.info(f"Partition: {record.partition}")
    logging.info(f"Offset: {record.offset}")
    logging.info(f"Key: {record.key.decode('utf-8')}")
    logging.info(f"Timestamp: {record.timestamp.datetime}")

    for header in record.headers:
        logging.info(f"Header: {header.key} = {header.value.decode('utf-8')}")

# Batch mode
@app.kafka_trigger(..., cardinality=func.Cardinality.MANY)
def kafka_batch(records: list[KafkaRecord]):
    for record in records:
        logging.info(f"{record.topic}:{record.partition}:{record.offset}")

Breaking Changes

None. This is a new package. All existing Kafka bindings (str, bytes, KafkaEvent) continue to work via the existing library path.

Related Issues

  • Azure/azure-functions-kafka-extension#612 (host-side tracking issue)
  • .NET Isolated Worker implementation: PR #3356