44 changes: 33 additions & 11 deletions kafka_actions/assets/configuration/spec.yaml
@@ -275,12 +275,23 @@ files:
description: |
Message value format.

Supported formats:
Built-in formats:
- json: JSON data (strict validation, fails if not valid JSON)
- bson: BSON (Binary JSON) data
- string: Plain UTF-8 text (use for non-JSON text messages)
- protobuf: Protocol Buffers
- avro: Apache Avro
- raw: Pass-through bytes (base64-encoded in output)
- protobuf: Protocol Buffers (value_schema is the base64 FileDescriptorSet)
- avro: Apache Avro (value_schema is the JSON schema)

Plugin formats (require the kafka_deserializers wheel installed
alongside this check; loaded via the
``datadog_kafka_actions.formats`` entry-point group):
- msgpack: MessagePack
- protobuf_msgpack: Protobuf envelope whose bytes fields can
carry msgpack payloads. value_schema is a JSON wrapper of the
form {"schema": "<base64 FileDescriptorSet>",
"msgpack_fields": ["pkg.Msg.field"]}.

Note: If any message fails deserialization, the read_messages action will stop immediately.
Ensure the format matches the actual messages in your topic.
default: json
@@ -307,14 +318,9 @@ files:
- name: key_format
type: string
description: |
Message key format.

Supported formats:
- string: Plain UTF-8 text (most common for keys)
- json: JSON data (strict validation, fails if not valid JSON)
- bson: BSON (Binary JSON) data
- protobuf: Protocol Buffers
- avro: Apache Avro
Message key format. Same supported set as value_format
(including msgpack / protobuf_msgpack via the
kafka_deserializers plugin).
default: json
example: json
- name: key_schema
@@ -331,6 +337,22 @@ files:
deserialization. See value_skip_bytes for details. Default 0.
default: 0
example: 1
- name: value_compression
type: string
description: |
Compression codec to apply to the message value BEFORE format
deserialization. Codecs are loaded from registered plugins under
the ``datadog_kafka_actions.compressions`` entry-point group; no
codecs ship in core. Common values when the ``kafka_deserializers``
plugin is installed: ``gzip``, ``zlib``, ``snappy``, ``lz4``,
``lz4_dd_hdr``, ``zstd``. Empty / unset = no decompression.
example: gzip
- name: key_compression
type: string
description: |
Compression codec to apply to the message key BEFORE format
deserialization. See value_compression for details.
example: snappy
- name: consumer_group_id
type: string
description: |
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/23650.added
@@ -0,0 +1 @@
Add a plugin architecture for message format handlers and payload-compression codecs. Format handlers can be registered via the `datadog_kafka_actions.formats` entry-point group, and compression codecs via `datadog_kafka_actions.compressions`. Built-in formats (json, string, raw, bson, avro, protobuf) are now first-class plugins. New `value_compression` and `key_compression` options on the read_messages action decompress payloads before deserialization. The protobuf decode helpers `get_protobuf_message_class` and `read_protobuf_message_indices` are now public so plugin format handlers can reuse them. No compression codecs ship in core — install a plugin wheel to add them.
8 changes: 7 additions & 1 deletion kafka_actions/datadog_checks/kafka_actions/check.py
@@ -72,7 +72,13 @@ def __init__(self, name, init_config, instances):
if schema_registry_url:
schema_registry = SchemaRegistryClient(self.http, schema_registry_url, self.log, self.instance)

self.deserializer = MessageDeserializer(self.log, schema_registry=schema_registry)
read_messages_config = self.config.read_messages or {}
self.deserializer = MessageDeserializer(
self.log,
schema_registry=schema_registry,
value_compression=read_messages_config.get('value_compression'),
key_compression=read_messages_config.get('key_compression'),
)

self.action_handlers = {
'read_messages': self._action_read_messages,
20 changes: 20 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/compression/__init__.py
@@ -0,0 +1,20 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Compression codec registry for kafka_actions.

Some producers compress message payloads at the application layer (before
handing bytes to the Kafka producer) using a variety of algorithms, separate
from the broker-negotiated ``compression.type`` setting. This module exposes
a pluggable codec interface so consumers can decompress those payloads
before deserialization.

No codecs ship in the core wheel — install a plugin wheel that registers
codecs on the ``datadog_kafka_actions.compressions`` entry-point group, or
register them directly via :func:`register_codec` in tests.
"""

from .base import CompressionCodec
from .registry import get_codec, list_codecs, register_codec

__all__ = ['CompressionCodec', 'get_codec', 'list_codecs', 'register_codec']
19 changes: 19 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/compression/base.py
@@ -0,0 +1,19 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Base class for app-level payload compression codecs."""

from __future__ import annotations

from abc import ABC, abstractmethod


class CompressionCodec(ABC):
"""Plug-in interface for app-level payload decompression."""

name: str = ''

@abstractmethod
def decompress(self, data: bytes) -> bytes:
"""Return the uncompressed payload bytes."""
raise NotImplementedError
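
No codecs ship in core, so as an illustration, a gzip codec in the spirit of what the kafka_deserializers plugin provides could look like the following (a hypothetical sketch; the plugin's actual implementation may differ):

import gzip

from datadog_checks.kafka_actions.compression import CompressionCodec, register_codec


class GzipCodec(CompressionCodec):
    """Decompress gzip-wrapped payloads produced at the application layer."""

    name = 'gzip'

    def decompress(self, data: bytes) -> bytes:
        # gzip.decompress validates the gzip header/trailer and inflates.
        return gzip.decompress(data)


# Direct registration, e.g. in tests. A plugin wheel would instead expose the
# class on the `datadog_kafka_actions.compressions` entry-point group.
register_codec(GzipCodec())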
64 changes: 64 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/compression/registry.py
@@ -0,0 +1,64 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Lazy registry of compression codecs."""

from __future__ import annotations

import logging
from importlib.metadata import entry_points
from threading import Lock

from .base import CompressionCodec

_LOG = logging.getLogger(__name__)
_ENTRY_POINT_GROUP = 'datadog_kafka_actions.compressions'

_lock = Lock()
_codecs: dict[str, CompressionCodec] = {}
_loaded = False


def register_codec(codec: CompressionCodec) -> None:
if not codec.name:
raise ValueError(f"CompressionCodec {type(codec).__name__} has no name set")
with _lock:
_codecs[codec.name] = codec


def _load_entry_points() -> None:
global _loaded
if _loaded:
return
with _lock:
if _loaded:
return
try:
eps = entry_points(group=_ENTRY_POINT_GROUP)
except TypeError: # pragma: no cover
eps = entry_points().get(_ENTRY_POINT_GROUP, [])
for ep in eps:
if ep.name in _codecs:
continue
try:
cls = ep.load()
instance = cls() if isinstance(cls, type) else cls
if not isinstance(instance, CompressionCodec):
_LOG.warning("Entry point %s did not produce a CompressionCodec", ep.name)
continue
if not instance.name:
instance.name = ep.name
_codecs[instance.name] = instance
except Exception as e:
_LOG.warning("Failed to load compression codec '%s': %s", ep.name, e)
_loaded = True


def get_codec(name: str) -> CompressionCodec | None:
_load_entry_points()
return _codecs.get(name)


def list_codecs() -> list[str]:
_load_entry_points()
return sorted(_codecs)
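
As a usage sketch (assuming a 'gzip' codec has been registered, for example the hypothetical GzipCodec above), resolution goes through get_codec, which triggers entry-point loading on first use:

import gzip

from datadog_checks.kafka_actions.compression import get_codec, list_codecs

codec = get_codec('gzip')
if codec is None:
    # None usually means no plugin wheel providing the codec is installed.
    raise RuntimeError(f"unknown codec 'gzip'; available: {list_codecs()}")

payload = codec.decompress(gzip.compress(b'{"status": "ok"}'))
assert payload == b'{"status": "ok"}'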
31 changes: 12 additions & 19 deletions kafka_actions/datadog_checks/kafka_actions/config_models/instance.py
@@ -20,18 +20,6 @@
from . import defaults, validators


SECURE_FIELD_NAMES = frozenset(
[
'schema_registry_tls_ca_cert',
'schema_registry_tls_cert',
'schema_registry_tls_key',
'tls_ca_cert',
'tls_cert',
'tls_private_key',
]
)


class CreateTopic(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
@@ -118,9 +106,14 @@ class ReadMessages(BaseModel):
description='jq-style expression to filter messages (optional).\nFiltering happens AFTER deserialization.\nExamples: \'.value.price > 100\', \'.value.user.country == "US"\'\n',
examples=['.value.status == "failed"'],
)
key_compression: Optional[str] = Field(
None,
description='Compression codec to apply to the message key BEFORE format\ndeserialization. See value_compression for details.\n',
examples=['snappy'],
)
key_format: Optional[str] = Field(
'json',
description='Message key format.\n\nSupported formats:\n- string: Plain UTF-8 text (most common for keys)\n- json: JSON data (strict validation, fails if not valid JSON)\n- bson: BSON (Binary JSON) data\n- protobuf: Protocol Buffers\n- avro: Apache Avro\n',
description='Message key format. Same supported set as value_format\n(including msgpack / protobuf_msgpack via the\nkafka_deserializers plugin).\n',
examples=['json'],
)
key_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro key')
@@ -154,9 +147,14 @@ class ReadMessages(BaseModel):
examples=[1700000000000],
)
topic: str = Field(..., description='Topic to read messages from', examples=['orders'])
value_compression: Optional[str] = Field(
None,
description='Compression codec to apply to the message value BEFORE format\ndeserialization. Codecs are loaded from registered plugins under\nthe ``datadog_kafka_actions.compressions`` entry-point group; no\ncodecs ship in core. Common values when the ``kafka_deserializers``\nplugin is installed: ``gzip``, ``zlib``, ``snappy``, ``lz4``,\n``lz4_dd_hdr``, ``zstd``. Empty / unset = no decompression.\n',
examples=['gzip'],
)
value_format: Optional[str] = Field(
'json',
description='Message value format.\n\nSupported formats:\n- json: JSON data (strict validation, fails if not valid JSON)\n- bson: BSON (Binary JSON) data\n- string: Plain UTF-8 text (use for non-JSON text messages)\n- protobuf: Protocol Buffers\n- avro: Apache Avro\nNote: If any message fails deserialization, the read_messages action will stop immediately.\nEnsure the format matches the actual messages in your topic.\n',
description='Message value format.\n\nBuilt-in formats:\n- json: JSON data (strict validation, fails if not valid JSON)\n- bson: BSON (Binary JSON) data\n- string: Plain UTF-8 text (use for non-JSON text messages)\n- raw: Pass-through bytes (base64-encoded in output)\n- protobuf: Protocol Buffers (value_schema is the base64 FileDescriptorSet)\n- avro: Apache Avro (value_schema is the JSON schema)\n\nPlugin formats (require the kafka_deserializers wheel installed\nalongside this check; loaded via the\n``datadog_kafka_actions.formats`` entry-point group):\n- msgpack: MessagePack\n- protobuf_msgpack: Protobuf envelope whose bytes fields can\n carry msgpack payloads. value_schema is a JSON wrapper of the\n form {"schema": "<base64 FileDescriptorSet>",\n "msgpack_fields": ["pkg.Msg.field"]}.\n\nNote: If any message fails deserialization, the read_messages action will stop immediately.\nEnsure the format matches the actual messages in your topic.\n',
examples=['json'],
)
value_schema: Optional[str] = Field(None, description='Schema definition for protobuf/avro value')
@@ -301,11 +299,6 @@ def _validate(cls, value, info):
field_name = field.alias or info.field_name
if field_name in info.context['configured_fields']:
value = getattr(validators, f'instance_{info.field_name}', identity)(value, field=field)

if info.field_name in SECURE_FIELD_NAMES:
validation.security.check_field_trusted_provider(
info.field_name, value, info.context.get('security_config')
)
else:
value = getattr(defaults, f'instance_{info.field_name}', lambda: value)()

18 changes: 18 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/formats/__init__.py
@@ -0,0 +1,18 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Format handler registry for kafka_actions.

External wheels can register additional handlers by exposing them on the
``datadog_kafka_actions.formats`` entry-point group:

[project.entry-points."datadog_kafka_actions.formats"]
myformat = "my_pkg.handler:MyHandler"

Handlers must subclass :class:`FormatHandler` from ``base``.
"""

from .base import FormatHandler
from .registry import get_handler, list_handlers, register_handler

__all__ = ['FormatHandler', 'get_handler', 'list_handlers', 'register_handler']
42 changes: 42 additions & 0 deletions kafka_actions/datadog_checks/kafka_actions/formats/base.py
@@ -0,0 +1,42 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Base class for kafka_actions format handlers."""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class FormatHandler(ABC):
"""Plug-in interface for message-body deserialization.

Subclasses are instantiated once and reused across messages, so they
should be stateless or maintain only thread-safe caches.
"""

name: str = ''

def build_schema(self, schema_str: str) -> Any:
"""Build a schema object from an inline (config-supplied) schema string.

Override for formats that need a parsed schema (e.g. Avro, Protobuf).
Schemaless formats (json, msgpack, raw) can leave the default.
"""
return None

def build_schema_from_registry(self, schema_str: str, dep_schemas: list) -> Any:
"""Build a schema object from registry-supplied bytes.

``dep_schemas`` is a list of ``(name, base64_bytes)`` tuples for
dependencies (e.g. imported .proto files).

Defaults to :meth:`build_schema` for formats that don't distinguish.
"""
return self.build_schema(schema_str)

@abstractmethod
def deserialize(self, message: bytes, schema: Any, *, log, uses_schema_registry: bool) -> str | None:
"""Decode ``message`` and return a JSON string (or None for empty messages)."""
raise NotImplementedError
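
To make the contract concrete, here is a hypothetical schemaless handler in the spirit of the plugin's msgpack format, assuming the third-party msgpack package and that register_handler mirrors register_codec in accepting an instance (a sketch, not the kafka_deserializers implementation):

from __future__ import annotations

import json

import msgpack  # third-party; would ship in the plugin wheel, not in core

from datadog_checks.kafka_actions.formats import FormatHandler, register_handler


class MsgpackHandler(FormatHandler):
    """Decode MessagePack payloads; schemaless, so the build_schema default applies."""

    name = 'msgpack'

    def deserialize(self, message: bytes, schema, *, log, uses_schema_registry: bool) -> str | None:
        if not message:
            return None
        obj = msgpack.unpackb(message, raw=False)
        # default=repr keeps non-JSON-native values (e.g. embedded bytes)
        # from aborting the whole read.
        return json.dumps(obj, default=repr)


# Direct registration, e.g. in tests. A plugin wheel would instead expose the
# class on the `datadog_kafka_actions.formats` entry-point group.
register_handler(MsgpackHandler())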