Skip to content

Commit a47dd04

Browse files
authored
fix: ZeroMQ broker (#511)
1 parent 87d5394 commit a47dd04

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

taskiq/brokers/zmq_broker.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ async def startup(self) -> None:
6262
self.socket.bind(self.pub_host)
6363
await super().startup()
6464

65+
async def shutdown(self) -> None:
66+
"""
67+
Shutdown for zmq broker.
68+
69+
This function closes actual connections to sockets
70+
"""
71+
if not self.is_worker_process:
72+
self.socket.unbind(self.pub_host)
73+
return await super().shutdown()
74+
6575
async def kick(self, message: BrokerMessage) -> None:
6676
"""
6777
Kicking message.
@@ -78,8 +88,7 @@ async def kick(self, message: BrokerMessage) -> None:
7888
]
7989
for idx in range(math.ceil(len(message.message) / part_len))
8090
]
81-
with self.socket.connect(self.pub_host) as sock:
82-
await sock.send_multipart(parts)
91+
await self.socket.send_multipart(parts)
8392

8493
async def listen(self) -> AsyncGenerator[bytes, None]:
8594
"""

0 commit comments

Comments
 (0)