Skip to content

Commit 9054f5f

Browse files
committed
More
1 parent 00af42d commit 9054f5f

1 file changed

Lines changed: 9 additions & 5 deletions

File tree

src/workflows/transport/pika_transport.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def broadcast_status(self, status):
324324
def _call_message_callback(
325325
self,
326326
subscription_id: int,
327-
_channel: pika.channel.Channel,
327+
_channel: BlockingChannel,
328328
method: pika.spec.Basic.Deliver,
329329
properties: pika.spec.BasicProperties,
330330
body: bytes,
@@ -395,7 +395,9 @@ def _subscribe(
395395
try:
396396
return self._pika_thread.subscribe_queue(
397397
queue=channel,
398-
callback=functools.partial(self._call_message_callback, sub_id),
398+
callback=lambda ch, m, p, b: self._call_message_callback(
399+
sub_id, ch, m, p, b
400+
),
399401
auto_ack=not acknowledgement,
400402
subscription_id=sub_id,
401403
reconnectable=reconnectable,
@@ -1067,7 +1069,7 @@ def send(
10671069
exchange: str,
10681070
routing_key: str,
10691071
body: str | bytes,
1070-
properties: pika.spec.BasicProperties = None,
1072+
properties: pika.spec.BasicProperties | None = None,
10711073
mandatory: bool = True,
10721074
transaction_id: int | None = None,
10731075
) -> Future[None]:
@@ -1318,7 +1320,9 @@ def connection_alive(self) -> bool:
13181320
# PikaThread Internal methods
13191321

13201322
def _debug_close_connection(self):
1321-
self._connection.add_callback_threadsafe(lambda: self._connection.close())
1323+
connection = self._connection
1324+
assert connection is not None
1325+
connection.add_callback_threadsafe(lambda: connection.close())
13221326

13231327
def _get_shared_channel(self) -> BlockingChannel:
13241328
"""Get the shared (no prefetch) channel. Create if necessary."""
@@ -1389,7 +1393,7 @@ def _add_subscription(self, subscription_id: int, subscription: _PikaSubscriptio
13891393
def _run(self):
13901394
if self._please_stop.is_set():
13911395
# stop() was called before start()... so quit
1392-
self._state == _PikaThreadStatus.STOPPED
1396+
self._state == _PikaThreadStatus.STOPPED # type: ignore
13931397
return
13941398
assert self._state == _PikaThreadStatus.NEW
13951399
assert self._reconnection_allowed, "Should be true until first synchronize"

0 commit comments

Comments
 (0)