-
Notifications
You must be signed in to change notification settings - Fork 945
DGS-23437: Enable client omit Confluent-Identity-Pool-Id #2182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
1728c15
b0940f3
efd2db4
da6ece1
2f45d49
001f1fa
1e86713
2f5b75a
ad173f9
a8d4194
7257b6e
98ff9c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |||||||||||||||
| full_jitter, | ||||||||||||||||
| is_retriable, | ||||||||||||||||
| is_success, | ||||||||||||||||
| normalize_identity_pool, | ||||||||||||||||
| ) | ||||||||||||||||
| from confluent_kafka.schema_registry.error import OAuthTokenError, SchemaRegistryError | ||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -97,10 +98,10 @@ def __init__( | |||||||||||||||
| scope: str, | ||||||||||||||||
| token_endpoint: str, | ||||||||||||||||
| logical_cluster: str, | ||||||||||||||||
| identity_pool: str, | ||||||||||||||||
| max_retries: int, | ||||||||||||||||
| retries_wait_ms: int, | ||||||||||||||||
| retries_max_wait_ms: int, | ||||||||||||||||
| identity_pool: Optional[str] = None, | ||||||||||||||||
| ): | ||||||||||||||||
| self.token = None | ||||||||||||||||
| self.logical_cluster = logical_cluster | ||||||||||||||||
|
|
@@ -113,11 +114,13 @@ def __init__( | |||||||||||||||
| self.token_expiry_threshold = 0.8 | ||||||||||||||||
|
|
||||||||||||||||
| async def get_bearer_fields(self) -> dict: | ||||||||||||||||
| return { | ||||||||||||||||
| fields = { | ||||||||||||||||
| 'bearer.auth.token': await self.get_access_token(), | ||||||||||||||||
| 'bearer.auth.logical.cluster': self.logical_cluster, | ||||||||||||||||
| 'bearer.auth.identity.pool.id': self.identity_pool, | ||||||||||||||||
| } | ||||||||||||||||
| if self.identity_pool is not None: | ||||||||||||||||
| fields['bearer.auth.identity.pool.id'] = self.identity_pool | ||||||||||||||||
| return fields | ||||||||||||||||
|
|
||||||||||||||||
| def token_expired(self) -> bool: | ||||||||||||||||
| if self.token is None: | ||||||||||||||||
|
|
@@ -283,20 +286,14 @@ def __init__(self, conf: dict): | |||||||||||||||
| self.auth = None | ||||||||||||||||
|
|
||||||||||||||||
| if self.bearer_auth_credentials_source in {'OAUTHBEARER', 'STATIC_TOKEN'}: | ||||||||||||||||
| headers = ['bearer.auth.logical.cluster', 'bearer.auth.identity.pool.id'] | ||||||||||||||||
| missing_headers = [header for header in headers if header not in conf_copy] | ||||||||||||||||
| if missing_headers: | ||||||||||||||||
| raise ValueError( | ||||||||||||||||
| "Missing required bearer configuration properties: {}".format(", ".join(missing_headers)) | ||||||||||||||||
| ) | ||||||||||||||||
| if 'bearer.auth.logical.cluster' not in conf_copy: | ||||||||||||||||
| raise ValueError("Missing required bearer configuration property: bearer.auth.logical.cluster") | ||||||||||||||||
|
|
||||||||||||||||
| logical_cluster = conf_copy.pop('bearer.auth.logical.cluster') | ||||||||||||||||
| if not isinstance(logical_cluster, str): | ||||||||||||||||
| raise TypeError("logical cluster must be a str, not " + str(type(logical_cluster))) | ||||||||||||||||
|
|
||||||||||||||||
|
||||||||||||||||
| # Note: identity_pool is always provided and validated as a single | |
| # string. For union-of-pools use cases, multiple identity pool | |
| # IDs should be encoded as a comma-separated list within this | |
| # string. No additional parsing or validation of the individual | |
| # comma-separated values is performed here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should logical.cluster be optional as well, since it's only required for CC not but CP? I know we discussed this for the OAuth CPP implementation (https://github.com/confluentinc/libschemaregistry/pull/13/changes) and decided and it should be optional (cc Justin Wang (@Claimundefine)). Wondering if we should fix it in python client as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logical cluster is always required for SDS, maybe I can just include a comment instructing users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think this should be extracted into a separate function in example. Combining this with the existing one might be confusing, since it's currently building two separate SR clients + calling get_subjects() twice in one call