diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index a1ca0c0..7f01bcb 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -775,6 +775,16 @@ async def subscribe(self, topic: Union[str, List[str]], schema.attach_client(self._client) return Consumer(await future, schema) + + def shutdown(self) -> None: + """ + Shutdown the client and all the associated producers and consumers + + Raises + ------ + PulsarException + """ + self._client.shutdown() async def close(self) -> None: """ diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 4440809..54d57db 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -164,6 +164,18 @@ async def test_producer_is_connected(self): await producer.close() self.assertFalse(producer.is_connected()) + async def test_shutdown_client(self): + producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test") + await producer.send(b"hello") + self._client.shutdown() + + try: + await producer.send(b"hello") + self.assertTrue(False) + except PulsarException: + # Expected + pass + async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: msg_ids = [] for i in range(5):