Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- Ensure normalize.schemas config is passed during Protobuf ref lookup #2214
- Fix type annotations for context manager hooks so that they are correct for subclasses (#2181)
- Fix OAuth callback handling for Async IO clients to prevent initialization failures (#2219)


## v2.13.2 - 2026-03-02
Expand Down
1 change: 0 additions & 1 deletion src/confluent_kafka/aio/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,4 @@ def wrap_common_callbacks(loop: asyncio.AbstractEventLoop, conf: Dict[str, Any])
wrap_conf_callback(loop, conf, 'error_cb')
wrap_conf_callback(loop, conf, 'throttle_cb')
wrap_conf_callback(loop, conf, 'stats_cb')
wrap_conf_callback(loop, conf, 'oauth_cb')
wrap_conf_logger(loop, conf)
145 changes: 1 addition & 144 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest

import confluent_kafka
from confluent_kafka import Consumer, KafkaException, Producer
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient
from tests.common import TestConsumer

Expand Down Expand Up @@ -120,149 +120,6 @@ def test_throttle_event_types():
assert str(throttle_event) == "broker/0 throttled for 10000 ms"


def test_oauth_cb():
"""Tests oauth_cb."""
seen_oauth_cb = False

def oauth_cb(oauth_config):
nonlocal seen_oauth_cb
seen_oauth_cb = True
assert oauth_config == 'oauth_cb'
return 'token', time.time() + 300.0

conf = {
'group.id': 'test',
'security.protocol': 'sasl_plaintext',
'sasl.mechanisms': 'OAUTHBEARER',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'sasl.oauthbearer.config': 'oauth_cb',
'oauth_cb': oauth_cb,
}

kc = TestConsumer(conf)
assert seen_oauth_cb # callback is expected to happen during client init
kc.close()


def test_oauth_cb_principal_sasl_extensions():
"""Tests oauth_cb."""
seen_oauth_cb = False

def oauth_cb(oauth_config):
nonlocal seen_oauth_cb
seen_oauth_cb = True
assert oauth_config == 'oauth_cb'
return 'token', time.time() + 300.0, oauth_config, {"extone": "extoneval", "exttwo": "exttwoval"}

conf = {
'group.id': 'test',
'security.protocol': 'sasl_plaintext',
'sasl.mechanisms': 'OAUTHBEARER',
'session.timeout.ms': 100,
'sasl.oauthbearer.config': 'oauth_cb',
'oauth_cb': oauth_cb,
}

kc = TestConsumer(conf)
assert seen_oauth_cb # callback is expected to happen during client init
kc.close()


def test_oauth_cb_failure():
"""
Tests oauth_cb for a case when it fails to return a token.
We expect the client init to fail
"""

def oauth_cb(oauth_config):
raise Exception

conf = {
'group.id': 'test',
'security.protocol': 'sasl_plaintext',
'sasl.mechanisms': 'OAUTHBEARER',
'session.timeout.ms': 1000,
'sasl.oauthbearer.config': 'oauth_cb',
'oauth_cb': oauth_cb,
}

with pytest.raises(KafkaException):
TestConsumer(conf)


def test_oauth_cb_token_refresh_success():
"""
Tests whether oauth callback gets called multiple times by the background thread
"""
oauth_cb_count = 0

def oauth_cb(oauth_config):
nonlocal oauth_cb_count
oauth_cb_count += 1
assert oauth_config == 'oauth_cb'
return 'token', time.time() + 3 # token is returned with an expiry of 3 seconds

conf = {
'group.id': 'test',
'security.protocol': 'sasl_plaintext',
'sasl.mechanisms': 'OAUTHBEARER',
'session.timeout.ms': 1000,
'sasl.oauthbearer.config': 'oauth_cb',
'oauth_cb': oauth_cb,
}

kc = TestConsumer(conf) # callback is expected to happen during client init
assert oauth_cb_count == 1

# Check every 1 second for up to 5 seconds for callback count to increase
max_wait_sec = 5
elapsed_sec = 0
while oauth_cb_count == 1 and elapsed_sec < max_wait_sec:
time.sleep(1)
elapsed_sec += 1

kc.close()
assert oauth_cb_count > 1


def test_oauth_cb_token_refresh_failure():
"""
Tests whether oauth callback gets called again if token refresh failed in one of the calls after init
"""
oauth_cb_count = 0

def oauth_cb(oauth_config):
nonlocal oauth_cb_count
oauth_cb_count += 1
assert oauth_config == 'oauth_cb'
if oauth_cb_count == 2:
raise Exception
return 'token', time.time() + 3 # token is returned with an expiry of 3 seconds

conf = {
'group.id': 'test',
'security.protocol': 'sasl_plaintext',
'sasl.mechanisms': 'OAUTHBEARER',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'sasl.oauthbearer.config': 'oauth_cb',
'oauth_cb': oauth_cb,
}

kc = TestConsumer(conf) # callback is expected to happen during client init
assert oauth_cb_count == 1

# Check every 1 second for up to 15 seconds for callback count to increase
# Call back failure causes a refresh attempt after 10 secs, so ideally 2 callbacks should happen within 15 secs
max_wait_sec = 15
elapsed_sec = 0
while oauth_cb_count <= 2 and elapsed_sec < max_wait_sec:
time.sleep(1)
elapsed_sec += 1

kc.close()
assert oauth_cb_count > 2


def skip_interceptors():
# Run interceptor test if monitoring-interceptor is found
for path in ["/usr/lib", "/usr/local/lib", "staging/libs", "."]:
Expand Down
Loading