From 900ce011488b0abe64da509b2a0ce39003309fe3 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 11 Mar 2026 20:22:41 -0700 Subject: [PATCH 1/3] Add enableAt to RuleSet --- .../schema_registry/_async/serde.py | 19 +++++++++---- .../schema_registry/_sync/serde.py | 28 +++++++++++++++---- .../common/schema_registry_client.py | 9 +++++- .../schema_registry/common/serde.py | 3 ++ 4 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/serde.py b/src/confluent_kafka/schema_registry/_async/serde.py index 8e1b58983..d60982ef4 100644 --- a/src/confluent_kafka/schema_registry/_async/serde.py +++ b/src/confluent_kafka/schema_registry/_async/serde.py @@ -376,17 +376,21 @@ def _execute_rules_with_phase( ) -> Any: if message is None or target is None: return message + enabled_env: Optional[str] = None rules: Optional[List[Rule]] = None if rule_mode == RuleMode.UPGRADE: if target is not None and target.rule_set is not None: + enabled_env = target.rule_set.enable_at rules = target.rule_set.migration_rules elif rule_mode == RuleMode.DOWNGRADE: if source is not None and source.rule_set is not None: + enabled_env = source.rule_set.enable_at rules = source.rule_set.migration_rules rules = rules[:] if rules else [] rules.reverse() else: if target is not None and target.rule_set is not None: + enabled_env = target.rule_set.enable_at if rule_phase == RulePhase.ENCODING: rules = target.rule_set.encoding_rules else: @@ -401,7 +405,11 @@ def _execute_rules_with_phase( for index in range(len(rules)): rule = rules[index] - if self._is_disabled(rule): + ctx = RuleContext( + enabled_env, ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, + field_transformer + ) + if self._is_disabled(ctx, rule): continue if rule.mode == RuleMode.WRITEREAD: if rule_mode != RuleMode.READ and rule_mode != RuleMode.WRITE: @@ -411,10 +419,6 @@ def _execute_rules_with_phase( continue elif rule.mode != rule_mode: continue - - ctx = RuleContext( - ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, field_transformer - ) if rule.type is None: self._run_action( ctx, @@ -476,12 +480,15 @@ def _get_on_failure(self, rule: Rule) -> Optional[str]: return override.on_failure return rule.on_failure - def _is_disabled(self, rule: Rule) -> Optional[bool]: + def _is_disabled(self, ctx: RuleContext, rule: Rule) -> Optional[bool]: if rule.type is None: return rule.disabled override = self._rule_registry.get_override(rule.type) if override is not None and override.disabled is not None: return override.disabled + enabled_env = ctx.enabled_env if ctx.enabled_env is not None else "ALL" + if enabled_env != "ALL" and enabled_env != "CLIENT": + return True return rule.disabled def _run_action( diff --git a/src/confluent_kafka/schema_registry/_sync/serde.py b/src/confluent_kafka/schema_registry/_sync/serde.py index 58b835006..7d7d55a02 100644 --- a/src/confluent_kafka/schema_registry/_sync/serde.py +++ b/src/confluent_kafka/schema_registry/_sync/serde.py @@ -376,17 +376,21 @@ def _execute_rules_with_phase( ) -> Any: if message is None or target is None: return message + enabled_env: Optional[str] = None rules: Optional[List[Rule]] = None if rule_mode == RuleMode.UPGRADE: if target is not None and target.rule_set is not None: + enabled_env = target.rule_set.enable_at rules = target.rule_set.migration_rules elif rule_mode == RuleMode.DOWNGRADE: if source is not None and source.rule_set is not None: + enabled_env = source.rule_set.enable_at rules = source.rule_set.migration_rules rules = rules[:] if rules else [] rules.reverse() else: if target is not None and target.rule_set is not None: + enabled_env = target.rule_set.enable_at if rule_phase == RulePhase.ENCODING: rules = target.rule_set.encoding_rules else: @@ -401,7 +405,20 @@ def _execute_rules_with_phase( for index in range(len(rules)): rule = rules[index] - if self._is_disabled(rule): + ctx = RuleContext( + enabled_env, + ser_ctx, + source, + target, + subject, + rule_mode, + rule, + index, + rules, + inline_tags, + field_transformer, + ) + if self._is_disabled(ctx, rule): continue if rule.mode == RuleMode.WRITEREAD: if rule_mode != RuleMode.READ and rule_mode != RuleMode.WRITE: @@ -411,10 +428,6 @@ def _execute_rules_with_phase( continue elif rule.mode != rule_mode: continue - - ctx = RuleContext( - ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, field_transformer - ) if rule.type is None: self._run_action( ctx, @@ -476,12 +489,15 @@ def _get_on_failure(self, rule: Rule) -> Optional[str]: return override.on_failure return rule.on_failure - def _is_disabled(self, rule: Rule) -> Optional[bool]: + def _is_disabled(self, ctx: RuleContext, rule: Rule) -> Optional[bool]: if rule.type is None: return rule.disabled override = self._rule_registry.get_override(rule.type) if override is not None and override.disabled is not None: return override.disabled + enabled_env = ctx.enabled_env if ctx.enabled_env is not None else "ALL" + if enabled_env != "ALL" and enabled_env != "CLIENT": + return True return rule.disabled def _run_action( diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 88819d085..30a34aebd 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -467,6 +467,7 @@ class RuleSet: migration_rules: Optional[List["Rule"]] = _attrs_field(hash=False) domain_rules: Optional[List["Rule"]] = _attrs_field(hash=False) encoding_rules: Optional[List["Rule"]] = _attrs_field(hash=False, default=None) + enable_at: Optional[str] = _attrs_field(default=None) def to_dict(self) -> Dict[str, Any]: _migration_rules: Optional[List[Dict[str, Any]]] = None @@ -498,6 +499,8 @@ def to_dict(self) -> Dict[str, Any]: field_dict["domainRules"] = _domain_rules if _encoding_rules is not None: field_dict["encodingRules"] = _encoding_rules + if self.enable_at is not None: + field_dict["enableAt"] = self.enable_at return field_dict @@ -522,16 +525,20 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: encoding_rules_item = Rule.from_dict(encoding_rules_item_data) encoding_rules.append(encoding_rules_item) + enable_at = d.pop("enableAt", None) + rule_set = cls( # type: ignore[call-arg] migration_rules=migration_rules, domain_rules=domain_rules, encoding_rules=encoding_rules, + enable_at=enable_at, ) return rule_set def __hash__(self): - return hash(frozenset((self.migration_rules or []) + (self.domain_rules or []) + (self.encoding_rules or []))) + return hash((frozenset((self.migration_rules or []) + (self.domain_rules or []) + + (self.encoding_rules or [])), self.enable_at)) @_attrs_define diff --git a/src/confluent_kafka/schema_registry/common/serde.py b/src/confluent_kafka/schema_registry/common/serde.py index decb6e45c..a6c201671 100644 --- a/src/confluent_kafka/schema_registry/common/serde.py +++ b/src/confluent_kafka/schema_registry/common/serde.py @@ -123,6 +123,7 @@ def type_name(self) -> str: class RuleContext(object): __slots__ = [ + 'enabled_env', 'ser_ctx', 'source', 'target', @@ -138,6 +139,7 @@ class RuleContext(object): def __init__( self, + enabled_env: Optional[str], ser_ctx: SerializationContext, source: Optional[Schema], target: Optional[Schema], @@ -149,6 +151,7 @@ def __init__( inline_tags: Optional[Dict[str, Set[str]]], field_transformer, ): + self.enabled_env = enabled_env self.ser_ctx = ser_ctx self.source = source self.target = target From de21d5c437e2b739838ec7ed407bf1ee699f6bc3 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 16 Mar 2026 10:08:13 -0700 Subject: [PATCH 2/3] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a91d3a253..838bd59c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Enhancements - Implement async context manager protocol for AIOProducer and AIOConsumer (#2180) +- Add AssociatedNameStrategy (#2194) +- Add enableAt to RuleSet (#2218) ### Fixes From 35994a4383bf6f3c1ab6aa96cb640945591cbfb8 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 17 Mar 2026 11:54:29 -0700 Subject: [PATCH 3/3] Fix style --- src/confluent_kafka/schema_registry/_async/serde.py | 13 +++++++++++-- .../common/schema_registry_client.py | 8 ++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/serde.py b/src/confluent_kafka/schema_registry/_async/serde.py index d60982ef4..adce5032a 100644 --- a/src/confluent_kafka/schema_registry/_async/serde.py +++ b/src/confluent_kafka/schema_registry/_async/serde.py @@ -406,8 +406,17 @@ def _execute_rules_with_phase( for index in range(len(rules)): rule = rules[index] ctx = RuleContext( - enabled_env, ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, - field_transformer + enabled_env, + ser_ctx, + source, + target, + subject, + rule_mode, + rule, + index, + rules, + inline_tags, + field_transformer, ) if self._is_disabled(ctx, rule): continue diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 30a34aebd..2c3ed1151 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -537,8 +537,12 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: return rule_set def __hash__(self): - return hash((frozenset((self.migration_rules or []) + (self.domain_rules or []) - + (self.encoding_rules or [])), self.enable_at)) + return hash( + ( + frozenset((self.migration_rules or []) + (self.domain_rules or []) + (self.encoding_rules or [])), + self.enable_at, + ) + ) @_attrs_define