From 11f48ef7c27e0a75e890f25f967eda30af8bfe3c Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Thu, 23 Apr 2026 22:10:05 -0700
Subject: [PATCH] feat: Add azurefunctions-extensions-bindings-kafka package
 for KafkaRecord binding

New package enabling Python users to bind to raw Apache Kafka records with
full metadata access via Protobuf deserialization.

Package structure (follows EventHub pattern):
- kafkaRecord.py: KafkaRecord(SdkType) with Protobuf decoder, KafkaHeader,
  KafkaTimestamp, KafkaTimestampType types
- kafkaRecordConverter.py: InConverter for kafkaTrigger binding, supports
  model_binding_data (single) and collection_model_binding_data (batch)
- 11 unit tests covering full record, null key/value, null leader epoch,
  unknown timestamp type, batch, headers, type annotations
- Sample app with function_app.py, host.json, local.settings.json, README
- CI: Added to build.yml, build-artifacts.yml, unit-tests.yml matrices

No changes needed to python-library or python-worker.

Fixes Azure/azure-functions-python-extensions#155
Relates to Azure/azure-functions-kafka-extension#612

Co-authored-by: Dobby
---
 .../MANIFEST.in | 3 +
 .../README.md | 53 ++++
 .../extensions/bindings/kafka/__init__.py | 15 +
 .../extensions/bindings/kafka/kafkaRecord.py | 253 ++++++++++++++++
 .../bindings/kafka/kafkaRecordConverter.py | 69 +++++
 .../pyproject.toml | 50 +++
 .../kafka_samples_kafkarecord/README.md | 72 +++++
 .../kafka_samples_kafkarecord/function_app.py | 73 +++++
 .../kafka_samples_kafkarecord/host.json | 15 +
 .../local.settings.json | 8 +
 .../requirements.txt | 6 +
 .../tests/__init__.py | 0
 .../tests/test_kafka_record.py | 284 ++++++++++++++++++
 eng/templates/jobs/build.yml | 3 +
 .../official/jobs/build-artifacts.yml | 3 +
 eng/templates/official/jobs/unit-tests.yml | 25 +-
 16 files changed, 931 insertions(+), 1 deletion(-)
 create mode 100644 azurefunctions-extensions-bindings-kafka/MANIFEST.in
 create mode 100644 azurefunctions-extensions-bindings-kafka/README.md
 create mode 100644 azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/__init__.py
 create mode 100644 azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecord.py
 create mode 100644 azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecordConverter.py
 create mode 100644 azurefunctions-extensions-bindings-kafka/pyproject.toml
 create mode 100644 azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/README.md
 create mode 100644 azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/function_app.py
 create mode 100644 azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/host.json
 create mode 100644 azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/local.settings.json
 create mode 100644 azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/requirements.txt
 create mode 100644 azurefunctions-extensions-bindings-kafka/tests/__init__.py
 create mode 100644 azurefunctions-extensions-bindings-kafka/tests/test_kafka_record.py

diff --git a/azurefunctions-extensions-bindings-kafka/MANIFEST.in b/azurefunctions-extensions-bindings-kafka/MANIFEST.in
new file mode 100644
index 00000000..75e636af
--- /dev/null
+++ b/azurefunctions-extensions-bindings-kafka/MANIFEST.in
@@ -0,0 +1,3 @@
+recursive-include azurefunctions *.py *.pyi
+recursive-include tests *.py
+include LICENSE README.md
diff --git a/azurefunctions-extensions-bindings-kafka/README.md
b/azurefunctions-extensions-bindings-kafka/README.md new file mode 100644 index 00000000..5faeafc6 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/README.md @@ -0,0 +1,53 @@ +# azurefunctions-extensions-bindings-kafka + +Kafka Python worker extension for Azure Functions — raw `KafkaRecord` binding with full metadata access. + +## Installation + +```bash +pip install azurefunctions-extensions-bindings-kafka +``` + +## Usage + +```python +import logging +import azure.functions as func +import azurefunctions.extensions.bindings.kafka as kafka + +app = func.FunctionApp() + +@app.kafka_trigger( + arg_name="record", + topic="my-topic", + broker_list="%BrokerList%", + consumer_group="$Default") +def kafka_trigger(record: kafka.KafkaRecord): + logging.info(f"Topic: {record.topic}, Partition: {record.partition}") + logging.info(f"Value: {record.value.decode('utf-8')}") + + for header in record.headers: + logging.info(f"Header: {header.key} = {header.get_value_as_string()}") +``` + +## KafkaRecord Properties + +| Property | Type | Description | +|----------|------|-------------| +| `topic` | `str` | Topic name | +| `partition` | `int` | Partition number | +| `offset` | `int` | Offset within partition | +| `key` | `bytes \| None` | Raw key bytes | +| `value` | `bytes \| None` | Raw value bytes | +| `timestamp` | `KafkaTimestamp` | Timestamp with `unix_timestamp_ms`, `type`, `datetime` | +| `headers` | `list[KafkaHeader]` | Headers with `key` (str) and `value` (bytes \| None) | +| `leader_epoch` | `int \| None` | Leader epoch | + +## Samples + +See [`samples/kafka_samples_kafkarecord/`](./samples/kafka_samples_kafkarecord/) for a complete working example. + +## Related + +- [Azure Functions Kafka Extension](https://github.com/Azure/azure-functions-kafka-extension) +- [Kafka bindings documentation](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka) diff --git a/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/__init__.py b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/__init__.py new file mode 100644 index 00000000..2a72cc5b --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .kafkaRecord import KafkaRecord, KafkaHeader, KafkaTimestamp, KafkaTimestampType +from .kafkaRecordConverter import KafkaRecordConverter + +__all__ = [ + "KafkaRecord", + "KafkaHeader", + "KafkaTimestamp", + "KafkaTimestampType", + "KafkaRecordConverter", +] + +__version__ = "0.1.0b1" diff --git a/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecord.py b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecord.py new file mode 100644 index 00000000..fad921c5 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecord.py @@ -0,0 +1,253 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +""" +KafkaRecord type for raw Apache Kafka record binding. + +Deserializes Protobuf-encoded ModelBindingData from the host-side Kafka Extension +into a Python object with full metadata access. 
+""" + +from datetime import datetime, timezone +from enum import IntEnum +from typing import List, Optional + +from google.protobuf.internal.decoder import _DecodeVarint # type: ignore +from google.protobuf.internal.wire_format import ( # type: ignore + WIRETYPE_VARINT, + WIRETYPE_LENGTH_DELIMITED, +) +from azurefunctions.extensions.base import Datum, SdkType + + +class KafkaTimestampType(IntEnum): + """Defines the type of a Kafka record timestamp.""" + NotAvailable = 0 + CreateTime = 1 + LogAppendTime = 2 + + +class KafkaTimestamp: + """Represents the timestamp of a Kafka record.""" + + __slots__ = ('unix_timestamp_ms', 'type') + + def __init__(self, unix_timestamp_ms: int, ts_type: KafkaTimestampType): + self.unix_timestamp_ms = unix_timestamp_ms + self.type = ts_type + + @property + def datetime(self) -> datetime: + """Returns the timestamp as a UTC datetime.""" + return datetime.fromtimestamp( + self.unix_timestamp_ms / 1000.0, tz=timezone.utc + ) + + +class KafkaHeader: + """Represents a single Kafka record header.""" + + __slots__ = ('key', 'value') + + def __init__(self, key: str, value: Optional[bytes]): + self.key = key + self.value = value + + def get_value_as_string(self, encoding: str = 'utf-8') -> Optional[str]: + """Returns the header value as a string, or None if value is None.""" + return self.value.decode(encoding) if self.value is not None else None + + +class KafkaRecord(SdkType): + """ + Represents a raw Apache Kafka record with full metadata. + Key and value are raw bytes — the user controls deserialization. + """ + + def __init__(self, *, data: Datum) -> None: + self._data = data + self._content = None + self._parsed = False + # Fields populated after parsing + self._topic: str = "" + self._partition: int = 0 + self._offset: int = 0 + self._key: Optional[bytes] = None + self._value: Optional[bytes] = None + self._timestamp: Optional[KafkaTimestamp] = None + self._headers: List[KafkaHeader] = [] + self._leader_epoch: Optional[int] = None + + if self._data: + self._content = data.content + + def get_sdk_type(self) -> 'KafkaRecord': + """Deserialize Protobuf content and return self with populated fields.""" + if not self._parsed: + if not self._content: + raise ValueError( + f"Unable to create {self.__class__.__name__}: " + "content is empty." + ) + self._decode_proto(self._content) + self._parsed = True + return self + + @property + def topic(self) -> str: + return self._topic + + @property + def partition(self) -> int: + return self._partition + + @property + def offset(self) -> int: + return self._offset + + @property + def key(self) -> Optional[bytes]: + return self._key + + @property + def value(self) -> Optional[bytes]: + return self._value + + @property + def timestamp(self) -> Optional[KafkaTimestamp]: + return self._timestamp + + @property + def headers(self) -> List[KafkaHeader]: + return self._headers + + @property + def leader_epoch(self) -> Optional[int]: + return self._leader_epoch + + def _decode_proto(self, content: bytes) -> None: + """ + Decode KafkaRecordProto from Protobuf wire format. 
+ + Proto schema field numbers: + 1: topic (string) + 2: partition (int32) + 3: offset (int64) + 4: key (optional bytes) + 5: value (optional bytes) + 6: timestamp (message) + 7: headers (repeated message) + 8: leader_epoch (optional int32) + """ + pos = 0 + end = len(content) + has_key = False + has_value = False + has_leader_epoch = False + + while pos < end: + # Decode field tag + tag, new_pos = _DecodeVarint(content, pos) + pos = new_pos + field_number = tag >> 3 + wire_type = tag & 0x7 + + if field_number == 1 and wire_type == WIRETYPE_LENGTH_DELIMITED: + # topic: string + length, pos = _DecodeVarint(content, pos) + self._topic = content[pos:pos + length].decode('utf-8') + pos += length + elif field_number == 2 and wire_type == WIRETYPE_VARINT: + # partition: int32 + val, pos = _DecodeVarint(content, pos) + self._partition = val + elif field_number == 3 and wire_type == WIRETYPE_VARINT: + # offset: int64 + val, pos = _DecodeVarint(content, pos) + self._offset = val + elif field_number == 4 and wire_type == WIRETYPE_LENGTH_DELIMITED: + # key: optional bytes + has_key = True + length, pos = _DecodeVarint(content, pos) + self._key = content[pos:pos + length] + pos += length + elif field_number == 5 and wire_type == WIRETYPE_LENGTH_DELIMITED: + # value: optional bytes + has_value = True + length, pos = _DecodeVarint(content, pos) + self._value = content[pos:pos + length] + pos += length + elif field_number == 6 and wire_type == WIRETYPE_LENGTH_DELIMITED: + # timestamp: message + length, pos = _DecodeVarint(content, pos) + ts_end = pos + length + ts_ms = 0 + ts_type = 0 + while pos < ts_end: + ts_tag, pos = _DecodeVarint(content, pos) + ts_field = ts_tag >> 3 + if ts_field == 1: # unix_timestamp_ms + ts_ms, pos = _DecodeVarint(content, pos) + elif ts_field == 2: # type + ts_type, pos = _DecodeVarint(content, pos) + else: + # Skip unknown field + pos = self._skip_field(content, pos, ts_tag & 0x7) + try: + ts_enum = KafkaTimestampType(ts_type) + except ValueError: + ts_enum = KafkaTimestampType.NotAvailable + self._timestamp = KafkaTimestamp(ts_ms, ts_enum) + elif field_number == 7 and wire_type == WIRETYPE_LENGTH_DELIMITED: + # headers: repeated message + length, pos = _DecodeVarint(content, pos) + h_end = pos + length + h_key = "" + h_value: Optional[bytes] = None + h_has_value = False + while pos < h_end: + h_tag, pos = _DecodeVarint(content, pos) + h_field = h_tag >> 3 + if h_field == 1: # key: string + h_len, pos = _DecodeVarint(content, pos) + h_key = content[pos:pos + h_len].decode('utf-8') + pos += h_len + elif h_field == 2: # value: optional bytes + h_has_value = True + h_len, pos = _DecodeVarint(content, pos) + h_value = content[pos:pos + h_len] + pos += h_len + else: + pos = self._skip_field(content, pos, h_tag & 0x7) + self._headers.append( + KafkaHeader(h_key, h_value if h_has_value else None) + ) + elif field_number == 8 and wire_type == WIRETYPE_VARINT: + # leader_epoch: optional int32 + has_leader_epoch = True + val, pos = _DecodeVarint(content, pos) + self._leader_epoch = val + else: + # Skip unknown field + pos = self._skip_field(content, pos, wire_type) + + if not has_key: + self._key = None + if not has_value: + self._value = None + if not has_leader_epoch: + self._leader_epoch = None + + @staticmethod + def _skip_field(content: bytes, pos: int, wire_type: int) -> int: + """Skip a protobuf field based on its wire type.""" + if wire_type == WIRETYPE_VARINT: + _, pos = _DecodeVarint(content, pos) + elif wire_type == WIRETYPE_LENGTH_DELIMITED: + length, pos = 
_DecodeVarint(content, pos) + pos += length + elif wire_type == 5: # 32-bit + pos += 4 + elif wire_type == 1: # 64-bit + pos += 8 + return pos diff --git a/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecordConverter.py b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecordConverter.py new file mode 100644 index 00000000..11b67076 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/azurefunctions/extensions/bindings/kafka/kafkaRecordConverter.py @@ -0,0 +1,69 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import collections.abc +from typing import Any, Optional, get_args, get_origin + +from azurefunctions.extensions.base import Datum, InConverter, OutConverter +from .kafkaRecord import KafkaRecord + + +class KafkaRecordConverter( + InConverter, + OutConverter, + binding="kafka", + trigger="kafkaTrigger", +): + @classmethod + def check_input_type_annotation(cls, pytype: type) -> bool: + if pytype is None: + return False + + if isinstance(pytype, type) and issubclass(pytype, KafkaRecord): + return True + + return cls._is_iterable_supported_type(pytype) + + @classmethod + def _is_iterable_supported_type(cls, annotation: type) -> bool: + base_type = get_origin(annotation) + if base_type is None or not issubclass(base_type, collections.abc.Iterable): + return False + + inner_types = get_args(annotation) + if inner_types is None or len(inner_types) != 1: + return False + + inner_type = inner_types[0] + return isinstance(inner_type, type) and issubclass(inner_type, KafkaRecord) + + @classmethod + def decode(cls, data: Datum, *, trigger_metadata, pytype) -> Optional[Any]: + """ + Kafka allows for batches. This means the cardinality can be one or many. + When the cardinality is one: + - data is of type "model_binding_data" + When the cardinality is many: + - data is of type "collection_model_binding_data" + """ + if data is None or data.type is None: + return None + + if data.type == "collection_model_binding_data": + try: + return [ + KafkaRecord(data=mbd).get_sdk_type() + for mbd in data.value.model_binding_data + ] + except Exception as e: + raise ValueError( + "Failed to decode incoming Kafka batch: " + repr(e) + ) from e + + if data.type == "model_binding_data": + return KafkaRecord(data=data.value).get_sdk_type() + + raise ValueError( + "Unexpected type of data received for the 'kafka' binding: " + + repr(data.type) + ) diff --git a/azurefunctions-extensions-bindings-kafka/pyproject.toml b/azurefunctions-extensions-bindings-kafka/pyproject.toml new file mode 100644 index 00000000..23d23b9d --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/pyproject.toml @@ -0,0 +1,50 @@ +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "azurefunctions-extensions-bindings-kafka" +dynamic = ["version"] +requires-python = ">=3.9" +authors = [{ name = "Azure Functions team at Microsoft Corp.", email = "azurefunctions@microsoft.com"}] +description = "Kafka Python worker extension for Azure Functions." 
+readme = "README.md" +license = {text = "MIT License"} +classifiers= [ + 'License :: OSI Approved :: MIT License', + 'Intended Audience :: Developers', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', + 'Operating System :: Microsoft :: Windows', + 'Operating System :: POSIX', + 'Operating System :: MacOS :: MacOS X', + 'Environment :: Web Environment', + 'Development Status :: 4 - Beta', + ] +dependencies = [ + 'azurefunctions-extensions-base>=1.1.0', + 'protobuf>=4.0.0,<6.0' + ] + +[project.optional-dependencies] +dev = [ + 'pytest', + 'pytest-cov', + 'coverage', + 'pytest-instafail', + 'pre-commit', + 'mypy', + 'flake8' + ] + +[tool.setuptools.dynamic] +version = {attr = "azurefunctions.extensions.bindings.kafka.__version__"} + +[tool.setuptools.packages.find] +exclude = [ + 'azurefunctions.extensions.bindings','azurefunctions.extensions', + 'azurefunctions', 'tests', 'samples' + ] diff --git a/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/README.md b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/README.md new file mode 100644 index 00000000..527a81b0 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/README.md @@ -0,0 +1,72 @@ +# Kafka Record Trigger Sample + +This sample demonstrates how to use the `azurefunctions-extensions-bindings-kafka` extension to bind to raw Apache Kafka records with full metadata access. + +## Overview + +- Access raw key/value as `bytes` (user controls deserialization) +- Read record metadata: topic, partition, offset, timestamp, leader epoch +- Iterate over Kafka headers +- Supports single and batch (cardinality=MANY) modes + +## Prerequisites + +- Python 3.9+ +- [Azure Functions Core Tools](https://learn.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4 +- A Kafka broker (Confluent Cloud, Azure Event Hubs with Kafka endpoint, or local Docker broker) + +## Quick Start + +### 1. Create a virtual environment + +```bash +python -m venv .venv +source .venv/bin/activate # Linux/macOS +# .venv\Scripts\Activate.ps1 # Windows PowerShell +``` + +### 2. Install dependencies + +```bash +pip install -r requirements.txt +``` + +### 3. Configure local settings + +Edit `local.settings.json` and set `BrokerList` to your Kafka broker: + +```json +{ + "Values": { + "BrokerList": "localhost:9092" + } +} +``` + +### 4. Run + +```bash +func start +``` + +### 5. Produce messages + +Send messages to `my-topic` using any Kafka producer. 
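+
+For local testing, the sketch below uses the third-party `kafka-python`
+package; this is an assumption for illustration (it is not part of this
+sample's requirements.txt), and any other Kafka producer works just as well:
+
+```python
+from kafka import KafkaProducer  # third-party client: pip install kafka-python
+
+producer = KafkaProducer(bootstrap_servers="localhost:9092")
+producer.send(
+    "my-topic",
+    key=b"my-key",                         # optional; arrives as record.key bytes
+    value=b'{"name":"test"}',              # arrives as record.value bytes
+    headers=[("trace-id", b"trace-abc")],  # header values are raw bytes
+)
+producer.flush()  # block until delivery so the trigger actually fires
+```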
+
+## KafkaRecord Properties
+
+| Property | Type | Description |
+|----------|------|-------------|
+| `topic` | `str` | Topic name |
+| `partition` | `int` | Partition number |
+| `offset` | `int` | Offset within partition |
+| `key` | `bytes \| None` | Raw key bytes |
+| `value` | `bytes \| None` | Raw value bytes |
+| `timestamp` | `KafkaTimestamp` | Timestamp with `unix_timestamp_ms`, `type`, `datetime` |
+| `headers` | `list[KafkaHeader]` | Headers with `key` (str) and `value` (bytes \| None) |
+| `leader_epoch` | `int \| None` | Leader epoch |
+
+## Related
+
+- [Kafka bindings documentation](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka)
+- [Parent issue: Azure/azure-functions-kafka-extension#612](https://github.com/Azure/azure-functions-kafka-extension/issues/612)
diff --git a/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/function_app.py b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/function_app.py
new file mode 100644
index 00000000..11ad4ce8
--- /dev/null
+++ b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/function_app.py
@@ -0,0 +1,73 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+# --------------------------------------------------------------------------
+
+import logging
+from typing import List
+
+import azure.functions as func
+import azurefunctions.extensions.bindings.kafka as kafka
+
+app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
+
+"""
+FOLDER: kafka_samples_kafkarecord
+DESCRIPTION:
+    These samples demonstrate how to obtain a KafkaRecord from a Kafka Trigger.
+USAGE:
+    Configure your Kafka broker connection in local.settings.json.
+    The BrokerList setting should point to your Kafka bootstrap server(s).
+""" + + +@app.kafka_trigger( + arg_name="record", + topic="my-topic", + broker_list="%BrokerList%", + consumer_group="$Default", +) +def kafka_trigger(record: kafka.KafkaRecord): + logging.info( + "Python Kafka trigger processed a record on topic %s " + "partition %d offset %d", + record.topic, + record.partition, + record.offset, + ) + + if record.key: + logging.info("Key: %s", record.key.decode("utf-8")) + if record.value: + logging.info("Value: %s", record.value.decode("utf-8")) + + logging.info( + "Timestamp: %s (type=%s)", + record.timestamp.datetime.isoformat(), + record.timestamp.type.name, + ) + + if record.leader_epoch is not None: + logging.info("Leader Epoch: %d", record.leader_epoch) + + for header in record.headers: + value_str = header.get_value_as_string() or "(null)" + logging.info("Header: %s = %s", header.key, value_str) + + +@app.kafka_trigger( + arg_name="records", + topic="my-topic", + broker_list="%BrokerList%", + consumer_group="$Default", + cardinality=func.Cardinality.MANY, +) +def kafka_batch_trigger(records: List[kafka.KafkaRecord]): + for record in records: + logging.info( + "Batch: %s:%d:%d value=%s", + record.topic, + record.partition, + record.offset, + record.value.decode("utf-8") if record.value else "(null)", + ) diff --git a/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/host.json b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/host.json new file mode 100644 index 00000000..06d01bda --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/host.json @@ -0,0 +1,15 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/local.settings.json b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/local.settings.json new file mode 100644 index 00000000..f170db9a --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/local.settings.json @@ -0,0 +1,8 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "BrokerList": ":9092" + } +} diff --git a/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/requirements.txt b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/requirements.txt new file mode 100644 index 00000000..891008e3 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/samples/kafka_samples_kafkarecord/requirements.txt @@ -0,0 +1,6 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azurefunctions-extensions-bindings-kafka diff --git a/azurefunctions-extensions-bindings-kafka/tests/__init__.py b/azurefunctions-extensions-bindings-kafka/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/azurefunctions-extensions-bindings-kafka/tests/test_kafka_record.py b/azurefunctions-extensions-bindings-kafka/tests/test_kafka_record.py new file mode 100644 index 00000000..c6626ed8 --- /dev/null +++ b/azurefunctions-extensions-bindings-kafka/tests/test_kafka_record.py @@ -0,0 +1,284 @@ +# Copyright (c) 
Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import struct +import unittest +from typing import List, Optional + +from azurefunctions.extensions.base import Datum +from azurefunctions.extensions.bindings.kafka import ( + KafkaRecord, + KafkaRecordConverter, + KafkaHeader, + KafkaTimestamp, + KafkaTimestampType, +) + + +# ---- Protobuf encoding helpers (mirrors host-side KafkaRecordProtobufSerializer) ---- + +def _encode_varint(value: int) -> bytes: + """Encode an unsigned varint.""" + result = bytearray() + while value > 0x7F: + result.append((value & 0x7F) | 0x80) + value >>= 7 + result.append(value & 0x7F) + return bytes(result) + + +def _encode_tag(field_number: int, wire_type: int) -> bytes: + return _encode_varint((field_number << 3) | wire_type) + + +def _encode_string(field_number: int, value: str) -> bytes: + data = value.encode('utf-8') + return _encode_tag(field_number, 2) + _encode_varint(len(data)) + data + + +def _encode_bytes(field_number: int, value: bytes) -> bytes: + return _encode_tag(field_number, 2) + _encode_varint(len(value)) + value + + +def _encode_varint_field(field_number: int, value: int) -> bytes: + return _encode_tag(field_number, 0) + _encode_varint(value) + + +def _encode_message(field_number: int, data: bytes) -> bytes: + return _encode_tag(field_number, 2) + _encode_varint(len(data)) + data + + +def encode_kafka_record_proto( + topic: str = "", + partition: int = 0, + offset: int = 0, + key: Optional[bytes] = None, + value: Optional[bytes] = None, + timestamp_ms: int = 0, + timestamp_type: int = 0, + headers: Optional[List[tuple]] = None, + leader_epoch: Optional[int] = None, +) -> bytes: + """Encode a KafkaRecordProto message matching the host-side serializer output.""" + result = bytearray() + + if topic: + result += _encode_string(1, topic) + if partition: + result += _encode_varint_field(2, partition) + if offset: + result += _encode_varint_field(3, offset) + if key is not None: + result += _encode_bytes(4, key) + if value is not None: + result += _encode_bytes(5, value) + + # Timestamp message (field 6) + ts_data = _encode_varint_field(1, timestamp_ms) + _encode_varint_field(2, timestamp_type) + result += _encode_message(6, ts_data) + + # Headers (field 7, repeated) + if headers: + for h_key, h_value in headers: + h_data = _encode_string(1, h_key) + if h_value is not None: + h_data += _encode_bytes(2, h_value) + result += _encode_message(7, h_data) + + if leader_epoch is not None: + result += _encode_varint_field(8, leader_epoch) + + return bytes(result) + + +# ---- Mock classes (same pattern as EventHub tests) ---- + +class MockMBD: + """Mock ModelBindingData.""" + def __init__(self, version: str, source: str, content_type: str, content: bytes): + self.version = version + self.source = source + self.content_type = content_type + self.content = content + + +class MockCMBD: + """Mock CollectionModelBindingData.""" + def __init__(self, model_binding_data_list: List[MockMBD]): + self.model_binding_data = model_binding_data_list + + +# ---- Tests ---- + +class TestKafkaRecord(unittest.TestCase): + """Tests for KafkaRecord Protobuf deserialization.""" + + def _make_datum_mbd(self, proto_bytes: bytes) -> Datum: + mbd = MockMBD( + version="1.0", + source="AzureKafkaRecord", + content_type="application/x-protobuf", + content=proto_bytes, + ) + return Datum(value=mbd, type="model_binding_data") + + def test_full_record(self): + proto = encode_kafka_record_proto( + topic="my-topic", + partition=3, + offset=12345, + 
key=b"my-key", + value=b'{"name":"test"}', + timestamp_ms=1700000000000, + timestamp_type=1, + headers=[("trace-id", b"trace-abc")], + leader_epoch=7, + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertIsInstance(result, KafkaRecord) + self.assertEqual(result.topic, "my-topic") + self.assertEqual(result.partition, 3) + self.assertEqual(result.offset, 12345) + self.assertEqual(result.key, b"my-key") + self.assertEqual(result.value, b'{"name":"test"}') + self.assertEqual(result.timestamp.unix_timestamp_ms, 1700000000000) + self.assertEqual(result.timestamp.type, KafkaTimestampType.CreateTime) + self.assertEqual(result.leader_epoch, 7) + self.assertEqual(len(result.headers), 1) + self.assertEqual(result.headers[0].key, "trace-id") + self.assertEqual(result.headers[0].get_value_as_string(), "trace-abc") + + def test_null_key_and_value(self): + proto = encode_kafka_record_proto( + topic="test-topic", + partition=0, + offset=100, + timestamp_ms=1700000000000, + timestamp_type=0, + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertIsNone(result.key) + self.assertIsNone(result.value) + + def test_no_leader_epoch(self): + proto = encode_kafka_record_proto( + topic="test-topic", + value=b"test", + timestamp_ms=0, + timestamp_type=0, + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertIsNone(result.leader_epoch) + + def test_unknown_timestamp_type(self): + proto = encode_kafka_record_proto( + topic="test-topic", + value=b"test", + timestamp_ms=1700000000000, + timestamp_type=99, + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertEqual(result.timestamp.type, KafkaTimestampType.NotAvailable) + + def test_multiple_headers(self): + proto = encode_kafka_record_proto( + topic="test-topic", + value=b"test", + timestamp_ms=0, + timestamp_type=0, + headers=[ + ("correlation-id", b"abc-123"), + ("null-value-header", None), + ], + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertEqual(len(result.headers), 2) + self.assertEqual(result.headers[0].key, "correlation-id") + self.assertEqual(result.headers[0].get_value_as_string(), "abc-123") + self.assertEqual(result.headers[1].key, "null-value-header") + self.assertIsNone(result.headers[1].value) + + def test_timestamp_datetime(self): + proto = encode_kafka_record_proto( + topic="test-topic", + value=b"test", + timestamp_ms=1700000000000, + timestamp_type=2, + ) + datum = self._make_datum_mbd(proto) + result = KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertEqual(result.timestamp.type, KafkaTimestampType.LogAppendTime) + dt = result.timestamp.datetime + self.assertEqual(dt.year, 2023) + self.assertEqual(dt.month, 11) + + def test_batch_collection(self): + proto = encode_kafka_record_proto( + topic="batch-topic", + partition=1, + offset=42, + value=b"msg1", + timestamp_ms=1700000000000, + timestamp_type=1, + ) + mbd = MockMBD("1.0", "AzureKafkaRecord", "application/x-protobuf", proto) + datum = Datum(value=MockCMBD([mbd, mbd]), type="collection_model_binding_data") + + result = 
KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].topic, "batch-topic") + self.assertEqual(result[1].offset, 42) + + def test_none_data(self): + result = KafkaRecordConverter.decode( + data=None, trigger_metadata=None, pytype=KafkaRecord + ) + self.assertIsNone(result) + + def test_invalid_data_type(self): + datum = Datum(value="hello", type="str") + with self.assertRaises(ValueError): + KafkaRecordConverter.decode( + data=datum, trigger_metadata=None, pytype=KafkaRecord + ) + + def test_input_type_annotation(self): + self.assertTrue(KafkaRecordConverter.check_input_type_annotation(KafkaRecord)) + self.assertFalse(KafkaRecordConverter.check_input_type_annotation(str)) + self.assertTrue(KafkaRecordConverter.check_input_type_annotation(List[KafkaRecord])) + self.assertFalse(KafkaRecordConverter.check_input_type_annotation(None)) + + def test_header_get_value_as_string_none(self): + header = KafkaHeader("key", None) + self.assertIsNone(header.get_value_as_string()) + + +if __name__ == "__main__": + unittest.main() diff --git a/eng/templates/jobs/build.yml b/eng/templates/jobs/build.yml index cdf94fb4..608797bf 100644 --- a/eng/templates/jobs/build.yml +++ b/eng/templates/jobs/build.yml @@ -22,6 +22,9 @@ jobs: fastapi_extension: EXTENSION_DIRECTORY: 'azurefunctions-extensions-http-fastapi' EXTENSION_NAME: 'Http' + kafka_extension: + EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-kafka' + EXTENSION_NAME: 'Kafka' steps: - task: UsePythonVersion@0 diff --git a/eng/templates/official/jobs/build-artifacts.yml b/eng/templates/official/jobs/build-artifacts.yml index 08778520..ddd86a8f 100644 --- a/eng/templates/official/jobs/build-artifacts.yml +++ b/eng/templates/official/jobs/build-artifacts.yml @@ -22,6 +22,9 @@ jobs: fastapi_extension: EXTENSION_DIRECTORY: 'azurefunctions-extensions-http-fastapi' EXTENSION_NAME: 'FastAPI' + kafka_extension: + EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-kafka' + EXTENSION_NAME: 'Kafka' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: diff --git a/eng/templates/official/jobs/unit-tests.yml b/eng/templates/official/jobs/unit-tests.yml index cb844de1..fe9742a4 100644 --- a/eng/templates/official/jobs/unit-tests.yml +++ b/eng/templates/official/jobs/unit-tests.yml @@ -168,4 +168,27 @@ jobs: displayName: 'Install ServiceBus Dependencies' - bash: | python -m pytest -q --instafail azurefunctions-extensions-bindings-servicebus/tests/ - displayName: "Running ServiceBus $(PYTHON_VERSION) Python Extension Tests" \ No newline at end of file + displayName: "Running ServiceBus $(PYTHON_VERSION) Python Extension Tests" + + - job: "KafkaTests" + displayName: "Kafka Extension Tests" + dependsOn: [] + strategy: + matrix: ${{ parameters.python_versions }} + condition: always() + steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: $(PYTHON_VERSION) + - task: PipAuthenticate@1 + displayName: 'Pip Authenticate' + inputs: + artifactFeeds: ${{ parameters.ArtifactFeed }} + - bash: | + python -m pip install --upgrade pip + cd azurefunctions-extensions-bindings-kafka + python -m pip install -U -e .[dev] + displayName: 'Install Kafka Dependencies' + - bash: | + python -m pytest -q --instafail azurefunctions-extensions-bindings-kafka/tests/ + displayName: "Running Kafka $(PYTHON_VERSION) Python Extension Tests" \ No newline at end of file
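
As a sanity check of the hand-rolled decoder outside the test suite, here is a minimal round-trip sketch (an illustration, not part of the patch): it hand-encodes a `KafkaRecordProto`, wraps it the way the worker would, and feeds it through `KafkaRecordConverter`. It assumes this package and `azurefunctions-extensions-base` are installed locally; `MockMBD` is a hypothetical stand-in mirroring the test double above, not a real worker type.

```python
from azurefunctions.extensions.base import Datum
from azurefunctions.extensions.bindings.kafka import KafkaRecord, KafkaRecordConverter


def varint(value: int) -> bytes:
    """Protobuf base-128 varint: 7 payload bits per byte, MSB is a continuation flag."""
    out = bytearray()
    while value > 0x7F:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value & 0x7F)
    return bytes(out)


def field(number: int, wire_type: int, payload: bytes) -> bytes:
    """A field is a varint tag, (field_number << 3) | wire_type, then its payload."""
    return varint((number << 3) | wire_type) + payload


proto = (
    field(1, 2, varint(8) + b"my-topic")  # topic: length-delimited string
    + field(2, 0, varint(3))              # partition: varint
    + field(3, 0, varint(12345))          # offset: varint
    + field(5, 2, varint(4) + b"ping")    # value: length-delimited bytes
    + field(6, 2, varint(0))              # timestamp: empty nested message
)


class MockMBD:
    """Hypothetical stand-in for the worker's ModelBindingData."""

    def __init__(self, content: bytes):
        self.version, self.source = "1.0", "AzureKafkaRecord"
        self.content_type, self.content = "application/x-protobuf", content


record = KafkaRecordConverter.decode(
    data=Datum(value=MockMBD(proto), type="model_binding_data"),
    trigger_metadata=None,
    pytype=KafkaRecord,
)
print(record.topic, record.partition, record.offset, record.value)
# -> my-topic 3 12345 b'ping'
```

Fields 4 (key) and 8 (leader_epoch) are omitted on purpose: `record.key` and `record.leader_epoch` come back as `None`, matching the proto3 optional-field semantics the decoder implements.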
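The batch path can be exercised the same way. A minimal sketch of the `cardinality=MANY` case, where the converter receives `collection_model_binding_data` and returns a list; `MockMBD`/`MockCMBD` are again hypothetical stand-ins mirroring the test doubles, and the wire bytes encode only `topic = "t"`:

```python
from azurefunctions.extensions.base import Datum
from azurefunctions.extensions.bindings.kafka import KafkaRecordConverter

# Field 1 (topic), wire type 2: tag 0x0A, length 0x01, payload b"t".
PROTO = b"\x0a\x01t"


class MockMBD:
    def __init__(self, content: bytes):
        self.version, self.source = "1.0", "AzureKafkaRecord"
        self.content_type, self.content = "application/x-protobuf", content


class MockCMBD:
    def __init__(self, items):
        self.model_binding_data = items  # the attribute decode() iterates over


records = KafkaRecordConverter.decode(
    data=Datum(value=MockCMBD([MockMBD(PROTO)] * 2),
               type="collection_model_binding_data"),
    trigger_metadata=None,
    pytype=None,  # decode() dispatches on data.type, not on pytype
)
assert [r.topic for r in records] == ["t", "t"]
```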