From 78fb50803cb2ff929f06738e53dc9eefa49ac1cb Mon Sep 17 00:00:00 2001 From: SAY-5 Date: Sat, 2 May 2026 15:59:42 -0700 Subject: [PATCH 1/2] fix(servicebus): guard async sender against handler race in _open Capture self._handler into a local before awaiting open_async/client_ready_async so a concurrent coroutine cannot null it out mid-flight, addressing the AttributeError reported in #35618. Signed-off-by: SAY-5 --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 6 ++++++ .../azure/servicebus/aio/_servicebus_sender_async.py | 9 ++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index a00f558792d2..49148e0b0c3b 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 7.14.4 (Unreleased) + +### Bugs Fixed + +- Fixed a race condition in the async ServiceBusSender where concurrent coroutines could trigger `AttributeError: 'NoneType' object has no attribute 'client_ready_async'` when reusing a sender. ([#35618](https://github.com/Azure/azure-sdk-for-python/issues/35618)) + ## 7.14.3 (2025-11-11) ### Bugs Fixed diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index f706e3b24f8b..399e2e64b72f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -207,13 +207,16 @@ async def _open(self): await self._handler.close_async() auth = None if self._connection else (await create_authentication(self)) self._create_handler(auth) + # Capture a local reference to the handler to guard against concurrent + # coroutines mutating self._handler across awaits (see issue #35618). + handler = self._handler try: - await self._handler.open_async(connection=self._connection) - while not await self._handler.client_ready_async(): + await handler.open_async(connection=self._connection) + while not await handler.client_ready_async(): await asyncio.sleep(0.05) self._running = True self._max_message_size_on_link = ( - self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES + self._amqp_transport.get_remote_max_message_size(handler) or MAX_MESSAGE_LENGTH_BYTES ) if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM From f568e447b3c5df8b63f05b452f459c68062884b1 Mon Sep 17 00:00:00 2001 From: SAY-5 Date: Sat, 2 May 2026 19:22:16 -0700 Subject: [PATCH 2/2] address review: serialize _open() with asyncio.Lock Capture critical section in a per-sender lock so concurrent coroutines cannot race on creating and closing self._handler. Re-checks _running inside the lock so the loser of the race no-ops cleanly. Signed-off-by: SAY-5 --- .../aio/_servicebus_sender_async.py | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 399e2e64b72f..5446c62d3a95 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -146,6 +146,10 @@ def __init__( self._create_attribute(**kwargs) self._connection = kwargs.get("connection") self._handler: Union["pyamqp_SendClientAsync", "uamqp_SendClientAsync"] + # Serializes _open() so concurrent callers cannot race on creating + # and closing self._handler (see issue #35618). Initialized lazily + # because the constructor may execute outside a running event loop. + self._open_lock: Optional[asyncio.Lock] = None async def __aenter__(self) -> "ServiceBusSender": if self._shutdown.is_set(): @@ -203,28 +207,31 @@ def _create_handler(self, auth: Union["uamqp_JWTTokenAuthAsync", "pyamqp_JWTToke async def _open(self): if self._running: return - if self._handler: - await self._handler.close_async() - auth = None if self._connection else (await create_authentication(self)) - self._create_handler(auth) - # Capture a local reference to the handler to guard against concurrent - # coroutines mutating self._handler across awaits (see issue #35618). - handler = self._handler - try: - await handler.open_async(connection=self._connection) - while not await handler.client_ready_async(): - await asyncio.sleep(0.05) - self._running = True - self._max_message_size_on_link = ( - self._amqp_transport.get_remote_max_message_size(handler) or MAX_MESSAGE_LENGTH_BYTES - ) - if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: - self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM - else: - self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD - except: - await self._close_handler() - raise + if self._open_lock is None: + self._open_lock = asyncio.Lock() + async with self._open_lock: + if self._running: + return + if self._handler: + await self._handler.close_async() + auth = None if self._connection else (await create_authentication(self)) + self._create_handler(auth) + handler = self._handler + try: + await handler.open_async(connection=self._connection) + while not await handler.client_ready_async(): + await asyncio.sleep(0.05) + self._running = True + self._max_message_size_on_link = ( + self._amqp_transport.get_remote_max_message_size(handler) or MAX_MESSAGE_LENGTH_BYTES + ) + if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: + self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM + else: + self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD + except: + await self._close_handler() + raise async def _send( self,