Skip to content

Commit 00af42d

Browse files
committed
More type linting
1 parent be2d143 commit 00af42d

6 files changed

Lines changed: 23 additions & 25 deletions

File tree

src/workflows/services/sample_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class SampleConsumer(CommonService):
1919

2020
def initializing(self):
2121
"""Subscribe to a channel."""
22-
self._transport.subscribe("transient.destination", self.consume_message)
22+
self.transport.subscribe("transient.destination", self.consume_message)
2323

2424
def consume_message(self, header, message):
2525
"""Consume a message"""

src/workflows/services/sample_pipethrough.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class SamplePipethrough(CommonService):
2121
def initializing(self):
2222
"""Subscribe to a channel."""
2323
workflows.recipe.wrap_subscribe(
24-
self._transport,
24+
self.transport,
2525
"transient.destination",
2626
self.process,
2727
)

src/workflows/services/sample_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def create_message(self):
3030
"""Create and send a unique message for this service."""
3131
self.counter += 1
3232
self.log.info("Sending message #%d", self.counter)
33-
self._transport.send(
33+
self.transport.send(
3434
"transient.destination",
3535
"Message #%d\n++++++++Produced @%10.3f ms"
3636
% (self.counter, (time.time() % 1000) * 1000),

src/workflows/services/sample_transaction.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class SampleTxn(CommonService):
1818

1919
def initializing(self):
2020
"""Subscribe to a channel. Received messages must be acknowledged."""
21-
self.subid = self._transport.subscribe(
21+
self.subid = self.transport.subscribe(
2222
"transient.transaction",
2323
self.receive_message,
2424
acknowledgement=True,
@@ -41,29 +41,29 @@ def receive_message(self, header, message):
4141
self.log.info("MsgID: {}".format(header["message-id"]))
4242
assert header["message-id"]
4343

44-
txn = self._transport.transaction_begin()
44+
txn = self.transport.transaction_begin()
4545
self.log.info(f" 1. Txn: {txn}")
4646
if self.crashpoint():
47-
self._transport.transaction_abort(txn)
47+
self.transport.transaction_abort(txn)
4848
self.log.info("--- Abort ---")
4949
return
5050

51-
self._transport.ack(header["message-id"], self.subid, transaction=txn)
51+
self.transport.ack(header["message-id"], self.subid, transaction=txn)
5252
self.log.info(" 2. Ack")
5353
if self.crashpoint():
54-
self._transport.transaction_abort(txn)
54+
self.transport.transaction_abort(txn)
5555
self.log.info("--- Abort ---")
5656
return
5757

58-
self._transport.send("transient.destination", message, transaction=txn)
58+
self.transport.send("transient.destination", message, transaction=txn)
5959
self.log.info(" 3. Send")
6060

6161
if self.crashpoint():
62-
self._transport.transaction_abort(txn)
62+
self.transport.transaction_abort(txn)
6363
self.log.info("--- Abort ---")
6464
return
6565

66-
self._transport.transaction_commit(txn)
66+
self.transport.transaction_commit(txn)
6767
self.log.info(" 4. Commit")
6868
self.log.info("=== Done ===")
6969

@@ -88,7 +88,7 @@ def initializing(self):
8888
def create_message(self):
8989
"""Create and send a unique message for this service."""
9090
self.counter += 1
91-
self._transport.send(
91+
self.transport.send(
9292
"transient.transaction",
9393
"TXMessage #%d\n++++++++Produced@ %f"
9494
% (self.counter, (time.time() % 1000) * 1000),

src/workflows/transport/offline_transport.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class OfflineTransport(CommonTransport):
2828
config: dict[Any, Any] = {}
2929

3030
def __init__(
31-
self, middleware: list[type[middleware.BaseTransportMiddleware]] | None = None
31+
self, middleware: list[middleware.BaseTransportMiddleware] | None = None
3232
):
3333
self._connected = False
3434
super().__init__(middleware=middleware)
@@ -78,10 +78,8 @@ def _subscribe_broadcast(self, sub_id, channel, callback, **kwargs):
7878
f"subscription ID {sub_id}, callback function {callback}, further keywords: {kwargs}",
7979
)
8080

81-
def _unsubscribe(self, subscription, **kwargs):
82-
self._output(
83-
f"Ending subscription #{subscription}", f"further keywords: {kwargs}"
84-
)
81+
def _unsubscribe(self, sub_id: int, **kwargs):
82+
self._output(f"Ending subscription #{sub_id}", f"further keywords: {kwargs}")
8583

8684
def _send(
8785
self, destination, message, headers=None, delay=None, expiration=None, **kwargs

src/workflows/transport/pika_transport.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
from enum import Enum, auto
1616
from typing import Any
1717

18+
import pika.channel
1819
import pika.exceptions
20+
import pika.spec
1921
from bidict import bidict
2022
from pika.adapters.blocking_connection import BlockingChannel
2123

@@ -63,7 +65,7 @@ class PikaTransport(CommonTransport):
6365
config: dict[Any, Any] = {}
6466

6567
def __init__(
66-
self, middleware: list[type[middleware.BaseTransportMiddleware]] | None = None
68+
self, middleware: list[middleware.BaseTransportMiddleware] | None = None
6769
):
6870
self._channel = None
6971
self._conn = None
@@ -242,17 +244,15 @@ def set_parameter(option, opt, value, parser):
242244
)
243245

244246
def _generate_connection_parameters(self) -> list[pika.ConnectionParameters]:
245-
username = self.config.get("--rabbit-user", self.defaults.get("--rabbit-user"))
246-
password = self.config.get("--rabbit-pass", self.defaults.get("--rabbit-pass"))
247+
username = self.config.get("--rabbit-user", self.defaults["--rabbit-user"])
248+
password = self.config.get("--rabbit-pass", self.defaults["--rabbit-pass"])
247249
credentials = pika.PlainCredentials(username, password)
248250

249-
host_string = self.config.get(
250-
"--rabbit-host", self.defaults.get("--rabbit-host")
251-
)
251+
host_string = self.config.get("--rabbit-host", self.defaults["--rabbit-host"])
252252
port_string = str(
253-
self.config.get("--rabbit-port", self.defaults.get("--rabbit-port"))
253+
self.config.get("--rabbit-port", self.defaults["--rabbit-port"])
254254
)
255-
vhost = self.config.get("--rabbit-vhost", self.defaults.get("--rabbit-vhost"))
255+
vhost = self.config.get("--rabbit-vhost", self.defaults["--rabbit-vhost"])
256256
if "," in host_string:
257257
host = host_string.split(",")
258258
else:

0 commit comments

Comments
 (0)