Skip to content

Commit 6923747

Browse files
author
koval
committed
Edit README.md
1 parent 2ddab10 commit 6923747

1 file changed

Lines changed: 110 additions & 39 deletions

File tree

README.md

Lines changed: 110 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
Async Python client for the [Huntflow API](https://api.huntflow.ai/v2/docs). It wraps [httpx](https://www.python-httpx.org/), adds Bearer authentication, optional automatic token refresh, and typed helpers for major resources.
88

9-
**Async-only:** every `request()` and entity method is `async` — call them from `async def` code (`asyncio.run`, FastAPI routes, your own event loop, etc.). There is no synchronous client.
10-
119
## Installation
1210

1311
```bash
@@ -136,49 +134,47 @@ Seed the JSON file once with `access_token` and `refresh_token` from Huntflow be
136134
The package does **not** depend on Redis; install it separately (`pip install "redis>=4.2"`) so that `redis.asyncio` and async locks behave consistently. Use one async Redis client for both storage and the lock. **Populate the token key** before the first API call (same JSON shape as the file storage).
137135

138136
```python
137+
import asyncio
139138
import json
139+
import time
140+
from typing import Any, Dict, Optional
140141

141142
from redis.asyncio import Redis
143+
from redis.asyncio.lock import Lock
142144
from redis.exceptions import LockError
143145

144146
from huntflow_api_client import HuntflowAPI
145147
from huntflow_api_client.tokens.locker import AbstractLocker
146-
from huntflow_api_client.tokens.proxy import HuntflowTokenProxy
147-
from huntflow_api_client.tokens.storage import AbstractHuntflowTokenStorage
148+
from huntflow_api_client.tokens.proxy import (
149+
AbstractTokenProxy,
150+
convert_refresh_result_to_hf_token,
151+
get_auth_headers,
152+
get_refresh_token_data,
153+
)
148154
from huntflow_api_client.tokens.token import ApiToken
149155

150-
151-
class HuntflowTokenRedisStorage(AbstractHuntflowTokenStorage):
152-
def __init__(self, redis: Redis, key: str = "huntflow:token") -> None:
153-
self._redis = redis
154-
self._key = key
155-
156-
async def get(self) -> ApiToken:
157-
raw = await self._redis.get(self._key)
158-
if raw is None:
159-
msg = (
160-
f"Redis key {self._key!r} is empty. "
161-
"SET JSON with access_token and refresh_token before use."
162-
)
163-
raise KeyError(msg)
164-
return ApiToken.from_dict(json.loads(raw))
165-
166-
async def update(self, token: ApiToken) -> None:
167-
await self._redis.set(self._key, json.dumps(token.dict()))
156+
POLL_INTERVAL = 0.2
168157

169158

170159
class RedisLockLocker(AbstractLocker):
171-
"""Distributed lock compatible with HuntflowTokenProxy (multi-worker)."""
160+
"""Coordinates token refresh across concurrent workers.
161+
162+
One caller acquires the lock and performs refresh; others wait until
163+
the lock is released and then continue with updated token data.
164+
"""
172165

173166
def __init__(self, redis: Redis, name: str = "huntflow:token_refresh") -> None:
174-
self._lock = redis.lock(name, timeout=30.0, blocking_timeout=60.0)
167+
self._lock = Lock(redis, name=name, timeout=30.0, blocking=False)
175168

176169
async def acquire(self) -> bool:
177-
return bool(await self._lock.acquire(blocking=False))
170+
try:
171+
return bool(await self._lock.acquire())
172+
except LockError:
173+
return False
178174

179175
async def wait_for_lock(self) -> None:
180-
async with self._lock:
181-
pass
176+
while await self._lock.locked():
177+
await asyncio.sleep(POLL_INTERVAL)
182178

183179
async def release(self) -> None:
184180
try:
@@ -187,27 +183,102 @@ class RedisLockLocker(AbstractLocker):
187183
return
188184

189185

186+
class RedisTokenAccessor:
187+
"""Layer for token read/update operations.
188+
189+
Keeps Redis calls in one place and exposes lock-related operations
190+
used by the proxy.
191+
"""
192+
193+
def __init__(
194+
self,
195+
redis: Redis,
196+
locker: AbstractLocker,
197+
token_key: str = "huntflow:token",
198+
) -> None:
199+
self._redis = redis
200+
self._locker = locker
201+
self._token_key = token_key
202+
203+
async def get(self, bypass_lock: bool = False) -> Optional[Dict[str, Any]]:
204+
if not bypass_lock:
205+
await self._locker.wait_for_lock()
206+
raw = await self._redis.get(self._token_key)
207+
if not raw:
208+
return None
209+
return json.loads(raw)
210+
211+
async def update(self, token: ApiToken) -> None:
212+
await self._redis.set(self._token_key, json.dumps(token.dict()))
213+
214+
async def lock_for_update(self) -> bool:
215+
return await self._locker.acquire()
216+
217+
async def release_lock(self) -> None:
218+
await self._locker.release()
219+
220+
221+
class RedisTokenProxy(AbstractTokenProxy):
222+
"""`AbstractTokenProxy` implementation over accessor + locker.
223+
224+
Returns auth headers, provides refresh payload, saves refreshed token,
225+
and checks whether another worker has already updated the token.
226+
"""
227+
228+
def __init__(self, accessor: RedisTokenAccessor) -> None:
229+
self._accessor = accessor
230+
self._token: Optional[ApiToken] = None
231+
self._last_read_timestamp: Optional[float] = None
232+
233+
async def get_auth_header(self) -> Dict[str, str]:
234+
data = await self._accessor.get()
235+
if data is None:
236+
raise KeyError("Token not found in Redis. Seed access_token and refresh_token first.")
237+
self._token = ApiToken.from_dict(data)
238+
self._last_read_timestamp = time.time()
239+
return get_auth_headers(self._token)
240+
241+
async def get_refresh_data(self) -> Dict[str, str]:
242+
if self._token is None:
243+
data = await self._accessor.get()
244+
if data is None:
245+
raise KeyError("Token not found in Redis. Seed access_token and refresh_token first.")
246+
self._token = ApiToken.from_dict(data)
247+
return get_refresh_token_data(self._token)
248+
249+
async def update(self, refresh_result: dict) -> None:
250+
assert self._token is not None
251+
self._token = convert_refresh_result_to_hf_token(refresh_result, self._token)
252+
await self._accessor.update(self._token)
253+
254+
async def lock_for_update(self) -> bool:
255+
return await self._accessor.lock_for_update()
256+
257+
async def release_lock(self) -> None:
258+
await self._accessor.release_lock()
259+
260+
async def is_updated(self) -> bool:
261+
if self._last_read_timestamp is None:
262+
return False
263+
current_data = await self._accessor.get(bypass_lock=True)
264+
if current_data is None:
265+
return False
266+
current = ApiToken.from_dict(current_data)
267+
last_refresh_timestamp = current.last_refresh_timestamp or 0.0
268+
return last_refresh_timestamp > self._last_read_timestamp
269+
270+
190271
def build_api(redis: Redis) -> HuntflowAPI:
191-
storage = HuntflowTokenRedisStorage(redis, key="huntflow:token")
192272
locker = RedisLockLocker(redis, name="huntflow:token_refresh")
193-
token_proxy = HuntflowTokenProxy(storage, locker=locker)
273+
accessor = RedisTokenAccessor(redis, locker=locker, token_key="huntflow:token")
274+
token_proxy = RedisTokenProxy(accessor)
194275
return HuntflowAPI(
195276
"https://api.huntflow.ai",
196277
token_proxy=token_proxy,
197278
auto_refresh_tokens=True,
198279
)
199-
200-
201-
# redis = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
202-
# try:
203-
# api = build_api(redis)
204-
# ...
205-
# finally:
206-
# await redis.aclose()
207280
```
208281

209-
Tune lock **`timeout`** / **`blocking_timeout`** for your network and refresh latency. Keep the **`Redis`** instance for the app lifetime and **`await redis.aclose()`** on shutdown. For fully custom behavior (e.g. KMS-wrapped secrets), subclass **`AbstractTokenProxy`** instead of `HuntflowTokenProxy`.
210-
211282
## Raw HTTP access
212283

213284
Every method on entities ultimately uses `HuntflowAPI.request`, which mirrors [`httpx.AsyncClient.request`](https://www.python-httpx.org/api/#asyncclient) (`json`, `params`, `files`, `timeout`, etc.). Entity methods usually serialize typed request models (for example `ApplicantCreateRequest.jsonable_dict(...)`); with `request()` you build the JSON yourself.

0 commit comments

Comments
 (0)