Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Enhancements

- Implement async context manager protocol for AIOProducer and AIOConsumer (#2180)
- Add AssociatedNameStrategy (#2194)
- Add enableAt to RuleSet (#2218)

### Fixes

Expand Down
28 changes: 22 additions & 6 deletions src/confluent_kafka/schema_registry/_async/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@
ser_ctx, subject, RulePhase.DOMAIN, rule_mode, source, target, message, inline_tags, field_transformer
)

def _execute_rules_with_phase(

Check failure on line 365 in src/confluent_kafka/schema_registry/_async/serde.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 62 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2218&issues=e5d65047-e9e4-459e-92db-b39d125820c8&open=e5d65047-e9e4-459e-92db-b39d125820c8
self,
ser_ctx: SerializationContext,
subject: str,
Expand All @@ -376,17 +376,21 @@
) -> Any:
if message is None or target is None:
return message
enabled_env: Optional[str] = None
rules: Optional[List[Rule]] = None
if rule_mode == RuleMode.UPGRADE:
if target is not None and target.rule_set is not None:
enabled_env = target.rule_set.enable_at
rules = target.rule_set.migration_rules
elif rule_mode == RuleMode.DOWNGRADE:
if source is not None and source.rule_set is not None:
enabled_env = source.rule_set.enable_at
rules = source.rule_set.migration_rules
rules = rules[:] if rules else []
rules.reverse()
else:
if target is not None and target.rule_set is not None:
enabled_env = target.rule_set.enable_at
if rule_phase == RulePhase.ENCODING:
rules = target.rule_set.encoding_rules
else:
Expand All @@ -401,7 +405,20 @@

for index in range(len(rules)):
rule = rules[index]
if self._is_disabled(rule):
ctx = RuleContext(
enabled_env,
ser_ctx,
source,
target,
subject,
rule_mode,
rule,
index,
rules,
inline_tags,
field_transformer,
)
if self._is_disabled(ctx, rule):
continue
if rule.mode == RuleMode.WRITEREAD:
if rule_mode != RuleMode.READ and rule_mode != RuleMode.WRITE:
Expand All @@ -411,10 +428,6 @@
continue
elif rule.mode != rule_mode:
continue

ctx = RuleContext(
ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, field_transformer
)
if rule.type is None:
self._run_action(
ctx,
Expand Down Expand Up @@ -476,12 +489,15 @@
return override.on_failure
return rule.on_failure

def _is_disabled(self, rule: Rule) -> Optional[bool]:
def _is_disabled(self, ctx: RuleContext, rule: Rule) -> Optional[bool]:
if rule.type is None:
return rule.disabled
override = self._rule_registry.get_override(rule.type)
if override is not None and override.disabled is not None:
return override.disabled
enabled_env = ctx.enabled_env if ctx.enabled_env is not None else "ALL"
if enabled_env != "ALL" and enabled_env != "CLIENT":
return True
return rule.disabled

def _run_action(
Expand Down
28 changes: 22 additions & 6 deletions src/confluent_kafka/schema_registry/_sync/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@
ser_ctx, subject, RulePhase.DOMAIN, rule_mode, source, target, message, inline_tags, field_transformer
)

def _execute_rules_with_phase(

Check failure on line 365 in src/confluent_kafka/schema_registry/_sync/serde.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 62 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2218&issues=3047dad3-dd5e-4e7e-ac16-84d2f3928400&open=3047dad3-dd5e-4e7e-ac16-84d2f3928400
self,
ser_ctx: SerializationContext,
subject: str,
Expand All @@ -376,17 +376,21 @@
) -> Any:
if message is None or target is None:
return message
enabled_env: Optional[str] = None
rules: Optional[List[Rule]] = None
if rule_mode == RuleMode.UPGRADE:
if target is not None and target.rule_set is not None:
enabled_env = target.rule_set.enable_at
rules = target.rule_set.migration_rules
elif rule_mode == RuleMode.DOWNGRADE:
if source is not None and source.rule_set is not None:
enabled_env = source.rule_set.enable_at
rules = source.rule_set.migration_rules
rules = rules[:] if rules else []
rules.reverse()
else:
if target is not None and target.rule_set is not None:
enabled_env = target.rule_set.enable_at
if rule_phase == RulePhase.ENCODING:
rules = target.rule_set.encoding_rules
else:
Expand All @@ -401,7 +405,20 @@

for index in range(len(rules)):
rule = rules[index]
if self._is_disabled(rule):
ctx = RuleContext(
enabled_env,
ser_ctx,
source,
target,
subject,
rule_mode,
rule,
index,
rules,
inline_tags,
field_transformer,
)
if self._is_disabled(ctx, rule):
continue
if rule.mode == RuleMode.WRITEREAD:
if rule_mode != RuleMode.READ and rule_mode != RuleMode.WRITE:
Expand All @@ -411,10 +428,6 @@
continue
elif rule.mode != rule_mode:
continue

ctx = RuleContext(
ser_ctx, source, target, subject, rule_mode, rule, index, rules, inline_tags, field_transformer
)
if rule.type is None:
self._run_action(
ctx,
Expand Down Expand Up @@ -476,12 +489,15 @@
return override.on_failure
return rule.on_failure

def _is_disabled(self, rule: Rule) -> Optional[bool]:
def _is_disabled(self, ctx: RuleContext, rule: Rule) -> Optional[bool]:
if rule.type is None:
return rule.disabled
override = self._rule_registry.get_override(rule.type)
if override is not None and override.disabled is not None:
return override.disabled
enabled_env = ctx.enabled_env if ctx.enabled_env is not None else "ALL"
if enabled_env != "ALL" and enabled_env != "CLIENT":
return True
return rule.disabled

def _run_action(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ class RuleSet:
migration_rules: Optional[List["Rule"]] = _attrs_field(hash=False)
domain_rules: Optional[List["Rule"]] = _attrs_field(hash=False)
encoding_rules: Optional[List["Rule"]] = _attrs_field(hash=False, default=None)
enable_at: Optional[str] = _attrs_field(default=None)

def to_dict(self) -> Dict[str, Any]:
_migration_rules: Optional[List[Dict[str, Any]]] = None
Expand Down Expand Up @@ -498,6 +499,8 @@ def to_dict(self) -> Dict[str, Any]:
field_dict["domainRules"] = _domain_rules
if _encoding_rules is not None:
field_dict["encodingRules"] = _encoding_rules
if self.enable_at is not None:
field_dict["enableAt"] = self.enable_at

return field_dict

Expand All @@ -522,16 +525,24 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
encoding_rules_item = Rule.from_dict(encoding_rules_item_data)
encoding_rules.append(encoding_rules_item)

enable_at = d.pop("enableAt", None)

rule_set = cls( # type: ignore[call-arg]
migration_rules=migration_rules,
domain_rules=domain_rules,
encoding_rules=encoding_rules,
enable_at=enable_at,
)

return rule_set

def __hash__(self):
return hash(frozenset((self.migration_rules or []) + (self.domain_rules or []) + (self.encoding_rules or [])))
return hash(
(
frozenset((self.migration_rules or []) + (self.domain_rules or []) + (self.encoding_rules or [])),
self.enable_at,
)
)


@_attrs_define
Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/schema_registry/common/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def type_name(self) -> str:

class RuleContext(object):
__slots__ = [
'enabled_env',
'ser_ctx',
'source',
'target',
Expand All @@ -138,6 +139,7 @@ class RuleContext(object):

def __init__(
self,
enabled_env: Optional[str],
ser_ctx: SerializationContext,
source: Optional[Schema],
target: Optional[Schema],
Expand All @@ -149,6 +151,7 @@ def __init__(
inline_tags: Optional[Dict[str, Set[str]]],
field_transformer,
):
self.enabled_env = enabled_env
self.ser_ctx = ser_ctx
self.source = source
self.target = target
Expand Down