From 972cb2826c026e232804d09d2d61a995a25c7c45 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Fri, 14 Nov 2025 20:16:41 +0530 Subject: [PATCH 1/2] add kafka configs and tests --- azure/functions/decorators/function_app.py | 45 ++++++++++++- azure/functions/decorators/kafka.py | 73 +++++++++++++++------- tests/decorators/test_kafka.py | 70 ++++++++++++++++++++- 3 files changed, 165 insertions(+), 23 deletions(-) diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index 16482aa3..c0a19d9a 100644 --- a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -30,7 +30,8 @@ from azure.functions.decorators.http import HttpTrigger, HttpOutput, \ HttpMethod from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ - BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod + BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod, \ + KafkaMessageKeyType from azure.functions.decorators.queue import QueueTrigger, QueueOutput from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \ ServiceBusQueueOutput, ServiceBusTopicTrigger, \ @@ -1244,12 +1245,18 @@ def kafka_trigger(self, event_hub_connection_string: Optional[str] = None, consumer_group: Optional[str] = None, avro_schema: Optional[str] = None, + key_avro_schema: Optional[str] = None, + key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None, username: Optional[str] = None, password: Optional[str] = None, ssl_key_location: Optional[str] = None, ssl_ca_location: Optional[str] = None, ssl_certificate_location: Optional[str] = None, ssl_key_password: Optional[str] = None, + ssl_certificate_pem: Optional[str] = None, + ssl_key_pem: Optional[str] = None, + ssl_ca_pem: Optional[str] = None, + ssl_certificate_and_key_pem: Optional[str] = None, schema_registry_url: Optional[str] = None, schema_registry_username: Optional[str] = None, schema_registry_password: Optional[str] = None, @@ -1287,6 +1294,10 @@ def kafka_trigger(self, Azure Event Hubs). :param consumer_group: Kafka consumer group used by the trigger. :param avro_schema: Used only if a generic Avro record should be generated. + :param key_avro_schema: Avro schema for the message key. Used only if a + generic Avro record should be generated for the key. + :param key_data_type: Data type of the message key. Valid values: Int, Long, + String, Binary. Default is String. Ignored if key_avro_schema is set. :param username: SASL username for use with the PLAIN or SASL-SCRAM mechanisms. Equivalent to 'sasl.username' in librdkafka. Default is empty string. :param password: SASL password for use with the PLAIN or SASL-SCRAM mechanisms. @@ -1338,12 +1349,19 @@ def decorator(): event_hub_connection_string=event_hub_connection_string, # noqa: E501 consumer_group=consumer_group, avro_schema=avro_schema, + key_avro_schema=key_avro_schema, + key_data_type=parse_singular_param_to_enum( + key_data_type, KafkaMessageKeyType), username=username, password=password, ssl_key_location=ssl_key_location, ssl_ca_location=ssl_ca_location, ssl_certificate_location=ssl_certificate_location, ssl_key_password=ssl_key_password, + ssl_certificate_pem=ssl_certificate_pem, + ssl_key_pem=ssl_key_pem, + ssl_ca_pem=ssl_ca_pem, + ssl_certificate_and_key_pem=ssl_certificate_and_key_pem, schema_registry_url=schema_registry_url, schema_registry_username=schema_registry_username, schema_registry_password=schema_registry_password, @@ -2588,12 +2606,18 @@ def kafka_output(self, topic: str, broker_list: str, avro_schema: Optional[str] = None, + key_avro_schema: Optional[str] = None, + key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None, username: Optional[str] = None, password: Optional[str] = None, ssl_key_location: Optional[str] = None, ssl_ca_location: Optional[str] = None, ssl_certificate_location: Optional[str] = None, ssl_key_password: Optional[str] = None, + ssl_certificate_pem: Optional[str] = None, + ssl_key_pem: Optional[str] = None, + ssl_ca_pem: Optional[str] = None, + ssl_certificate_and_key_pem: Optional[str] = None, schema_registry_url: Optional[str] = None, schema_registry_username: Optional[str] = None, schema_registry_password: Optional[str] = None, @@ -2630,6 +2654,10 @@ def kafka_output(self, :param topic: The Kafka topic to which messages are published. :param broker_list: The list of Kafka brokers to which the producer connects. :param avro_schema: Optional. Avro schema to generate a generic record. + :param key_avro_schema: Avro schema for the message key. Used only if a + generic Avro record should be generated for the key. + :param key_data_type: Data type of the message key. Valid values: Int, Long, + String, Binary. Default is String. Ignored if key_avro_schema is set. :param username: SASL username for use with the PLAIN and SASL-SCRAM mechanisms. Equivalent to `'sasl.username'` in librdkafka. :param password: SASL password for use with the PLAIN and SASL-SCRAM @@ -2642,6 +2670,14 @@ def kafka_output(self, Equivalent to `'ssl.certificate.location'` in librdkafka. :param ssl_key_password: Password for the client's SSL key. Equivalent to `'ssl.key.password'` in librdkafka. + :param ssl_certificate_pem: Client certificate in PEM format. + Equivalent to 'ssl.certificate.pem' in librdkafka. + :param ssl_key_pem: Client private key in PEM format. + Equivalent to 'ssl.key.pem' in librdkafka. + :param ssl_ca_pem: CA certificate for verifying the broker's certificate in PEM format. + Equivalent to 'ssl.ca.pem' in librdkafka. + :param ssl_certificate_and_key_pem: Client certificate and key in PEM format. + Additional configuration for KeyVault support (certificate with private key). :param schema_registry_url: URL of the Avro Schema Registry. :param schema_registry_username: Username for accessing the Schema Registry. :param schema_registry_password: Password for accessing the Schema Registry. @@ -2695,12 +2731,19 @@ def decorator(): topic=topic, broker_list=broker_list, avro_schema=avro_schema, + key_avro_schema=key_avro_schema, + key_data_type=parse_singular_param_to_enum( + key_data_type, KafkaMessageKeyType), username=username, password=password, ssl_key_location=ssl_key_location, ssl_ca_location=ssl_ca_location, ssl_certificate_location=ssl_certificate_location, ssl_key_password=ssl_key_password, + ssl_certificate_pem=ssl_certificate_pem, + ssl_key_pem=ssl_key_pem, + ssl_ca_pem=ssl_ca_pem, + ssl_certificate_and_key_pem=ssl_certificate_and_key_pem, schema_registry_url=schema_registry_url, schema_registry_username=schema_registry_username, schema_registry_password=schema_registry_password, diff --git a/azure/functions/decorators/kafka.py b/azure/functions/decorators/kafka.py index 3e726f88..c22007f5 100644 --- a/azure/functions/decorators/kafka.py +++ b/azure/functions/decorators/kafka.py @@ -29,6 +29,13 @@ class OAuthBearerMethod(StringifyEnum): OIDC = 1 +class KafkaMessageKeyType(StringifyEnum): + INT = 0 + LONG = 1 + STRING = 2 + BINARY = 3 + + class KafkaOutput(OutputBinding): @staticmethod def get_binding_name() -> str: @@ -39,15 +46,21 @@ def __init__(self, topic: str, broker_list: str, avro_schema: Optional[str], - username: Optional[str], - password: Optional[str], - ssl_key_location: Optional[str], - ssl_ca_location: Optional[str], - ssl_certificate_location: Optional[str], - ssl_key_password: Optional[str], - schema_registry_url: Optional[str], - schema_registry_username: Optional[str], - schema_registry_password: Optional[str], + key_avro_schema: Optional[str] = None, + key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING, + username: Optional[str] = None, + password: Optional[str] = None, + ssl_key_location: Optional[str] = None, + ssl_ca_location: Optional[str] = None, + ssl_certificate_location: Optional[str] = None, + ssl_key_password: Optional[str] = None, + ssl_certificate_pem: Optional[str] = None, + ssl_key_pem: Optional[str] = None, + ssl_ca_pem: Optional[str] = None, + ssl_certificate_and_key_pem: Optional[str] = None, + schema_registry_url: Optional[str] = None, + schema_registry_username: Optional[str] = None, + schema_registry_password: Optional[str] = None, o_auth_bearer_method: Optional[OAuthBearerMethod] = None, o_auth_bearer_client_id: Optional[str] = None, o_auth_bearer_client_secret: Optional[str] = None, @@ -68,12 +81,18 @@ def __init__(self, self.topic = topic self.broker_list = broker_list self.avro_schema = avro_schema + self.key_avro_schema = key_avro_schema + self.key_data_type = key_data_type self.username = username self.password = password self.ssl_key_location = ssl_key_location self.ssl_ca_location = ssl_ca_location self.ssl_certificate_location = ssl_certificate_location self.ssl_key_password = ssl_key_password + self.ssl_certificate_pem = ssl_certificate_pem + self.ssl_key_pem = ssl_key_pem + self.ssl_ca_pem = ssl_ca_pem + self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem self.schema_registry_url = schema_registry_url self.schema_registry_username = schema_registry_username self.schema_registry_password = schema_registry_password @@ -104,18 +123,24 @@ def __init__(self, name: str, topic: str, broker_list: str, - event_hub_connection_string: Optional[str], - consumer_group: Optional[str], - avro_schema: Optional[str], - username: Optional[str], - password: Optional[str], - ssl_key_location: Optional[str], - ssl_ca_location: Optional[str], - ssl_certificate_location: Optional[str], - ssl_key_password: Optional[str], - schema_registry_url: Optional[str], - schema_registry_username: Optional[str], - schema_registry_password: Optional[str], + event_hub_connection_string: Optional[str] = None, + consumer_group: Optional[str] = None, + avro_schema: Optional[str] = None, + key_avro_schema: Optional[str] = None, + key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING, + username: Optional[str] = None, + password: Optional[str] = None, + ssl_key_location: Optional[str] = None, + ssl_ca_location: Optional[str] = None, + ssl_certificate_location: Optional[str] = None, + ssl_key_password: Optional[str] = None, + ssl_certificate_pem: Optional[str] = None, + ssl_key_pem: Optional[str] = None, + ssl_ca_pem: Optional[str] = None, + ssl_certificate_and_key_pem: Optional[str] = None, + schema_registry_url: Optional[str] = None, + schema_registry_username: Optional[str] = None, + schema_registry_password: Optional[str] = None, o_auth_bearer_method: Optional[OAuthBearerMethod] = None, o_auth_bearer_client_id: Optional[str] = None, o_auth_bearer_client_secret: Optional[str] = None, @@ -133,12 +158,18 @@ def __init__(self, self.event_hub_connection_string = event_hub_connection_string self.consumer_group = consumer_group self.avro_schema = avro_schema + self.key_avro_schema = key_avro_schema + self.key_data_type = key_data_type self.username = username self.password = password self.ssl_key_location = ssl_key_location self.ssl_ca_location = ssl_ca_location self.ssl_certificate_location = ssl_certificate_location self.ssl_key_password = ssl_key_password + self.ssl_certificate_pem = ssl_certificate_pem + self.ssl_key_pem = ssl_key_pem + self.ssl_ca_pem = ssl_ca_pem + self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem self.schema_registry_url = schema_registry_url self.schema_registry_username = schema_registry_username self.schema_registry_password = schema_registry_password diff --git a/tests/decorators/test_kafka.py b/tests/decorators/test_kafka.py index 6f0257e8..84eecd6b 100644 --- a/tests/decorators/test_kafka.py +++ b/tests/decorators/test_kafka.py @@ -6,7 +6,7 @@ from azure.functions.decorators.core import BindingDirection, Cardinality, \ DataType from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \ - BrokerAuthenticationMode, BrokerProtocol + BrokerAuthenticationMode, BrokerProtocol, KafkaMessageKeyType class TestKafka(unittest.TestCase): @@ -102,3 +102,71 @@ def test_kafka_output_valid_creation(self): 'topic': 'topic', 'type': KAFKA, 'username': 'username'}) + + def test_kafka_trigger_with_key_data_type_and_pem(self): + trigger = KafkaTrigger(name="arg_name", + topic="topic", + broker_list="broker_list", + key_avro_schema="key_avro_schema", + key_data_type=KafkaMessageKeyType.LONG, + ssl_certificate_pem="cert_pem", + ssl_key_pem="key_pem", + ssl_ca_pem="ca_pem", + ssl_certificate_and_key_pem="cert_and_key_pem", + data_type=DataType.UNDEFINED) + + self.assertEqual(trigger.get_binding_name(), "kafkaTrigger") + dict_repr = trigger.get_dict_repr() + self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema") + self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.LONG) + self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem") + self.assertEqual(dict_repr["sslKeyPem"], "key_pem") + self.assertEqual(dict_repr["sslCaPem"], "ca_pem") + self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem") + + def test_kafka_output_with_key_data_type_and_pem(self): + output = KafkaOutput(name="arg_name", + topic="topic", + broker_list="broker_list", + key_avro_schema="key_avro_schema", + key_data_type=KafkaMessageKeyType.BINARY, + ssl_certificate_pem="cert_pem", + ssl_key_pem="key_pem", + ssl_ca_pem="ca_pem", + ssl_certificate_and_key_pem="cert_and_key_pem", + data_type=DataType.UNDEFINED) + + self.assertEqual(output.get_binding_name(), "kafka") + dict_repr = output.get_dict_repr() + self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema") + self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.BINARY) + self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem") + self.assertEqual(dict_repr["sslKeyPem"], "key_pem") + self.assertEqual(dict_repr["sslCaPem"], "ca_pem") + self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem") + + def test_kafka_message_key_type_enum(self): + """Test that KafkaMessageKeyType enum has the correct values""" + self.assertEqual(KafkaMessageKeyType.INT, 0) + self.assertEqual(KafkaMessageKeyType.LONG, 1) + self.assertEqual(KafkaMessageKeyType.STRING, 2) + self.assertEqual(KafkaMessageKeyType.BINARY, 3) + + def test_kafka_trigger_key_data_type_default(self): + """Test that key_data_type defaults to STRING""" + trigger = KafkaTrigger(name="arg_name", + topic="topic", + broker_list="broker_list") + + dict_repr = trigger.get_dict_repr() + self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING) + + def test_kafka_output_key_data_type_default(self): + """Test that key_data_type defaults to STRING""" + output = KafkaOutput(name="arg_name", + topic="topic", + broker_list="broker_list", + avro_schema="schema") + + dict_repr = output.get_dict_repr() + self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING) From b0edc4437e1977c56c8507fb31bb04b094591bd6 Mon Sep 17 00:00:00 2001 From: Pranava <68387945+aloiva@users.noreply.github.com> Date: Tue, 2 Dec 2025 20:53:10 +0530 Subject: [PATCH 2/2] change defaults --- azure/functions/decorators/function_app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/azure/functions/decorators/function_app.py b/azure/functions/decorators/function_app.py index c0a19d9a..c9b77bb2 100644 --- a/azure/functions/decorators/function_app.py +++ b/azure/functions/decorators/function_app.py @@ -1246,7 +1246,7 @@ def kafka_trigger(self, consumer_group: Optional[str] = None, avro_schema: Optional[str] = None, key_avro_schema: Optional[str] = None, - key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None, + key_data_type: Optional[Union[KafkaMessageKeyType, str]] = KafkaMessageKeyType.STRING, username: Optional[str] = None, password: Optional[str] = None, ssl_key_location: Optional[str] = None, @@ -2607,7 +2607,7 @@ def kafka_output(self, broker_list: str, avro_schema: Optional[str] = None, key_avro_schema: Optional[str] = None, - key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None, + key_data_type: Optional[Union[KafkaMessageKeyType, str]] = KafkaMessageKeyType.STRING, username: Optional[str] = None, password: Optional[str] = None, ssl_key_location: Optional[str] = None,