diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index a00f558792d2..49148e0b0c3b 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +## 7.14.4 (Unreleased) + +### Bugs Fixed + +- Fixed a race condition in the async ServiceBusSender where concurrent coroutines could trigger `AttributeError: 'NoneType' object has no attribute 'client_ready_async'` when reusing a sender. ([#35618](https://github.com/Azure/azure-sdk-for-python/issues/35618)) + ## 7.14.3 (2025-11-11) ### Bugs Fixed diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index f706e3b24f8b..5446c62d3a95 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -146,6 +146,10 @@ def __init__( self._create_attribute(**kwargs) self._connection = kwargs.get("connection") self._handler: Union["pyamqp_SendClientAsync", "uamqp_SendClientAsync"] + # Serializes _open() so concurrent callers cannot race on creating + # and closing self._handler (see issue #35618). Initialized lazily + # because the constructor may execute outside a running event loop. + self._open_lock: Optional[asyncio.Lock] = None async def __aenter__(self) -> "ServiceBusSender": if self._shutdown.is_set(): @@ -203,25 +207,31 @@ def _create_handler(self, auth: Union["uamqp_JWTTokenAuthAsync", "pyamqp_JWTToke async def _open(self): if self._running: return - if self._handler: - await self._handler.close_async() - auth = None if self._connection else (await create_authentication(self)) - self._create_handler(auth) - try: - await self._handler.open_async(connection=self._connection) - while not await self._handler.client_ready_async(): - await asyncio.sleep(0.05) - self._running = True - self._max_message_size_on_link = ( - self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES - ) - if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: - self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM - else: - self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD - except: - await self._close_handler() - raise + if self._open_lock is None: + self._open_lock = asyncio.Lock() + async with self._open_lock: + if self._running: + return + if self._handler: + await self._handler.close_async() + auth = None if self._connection else (await create_authentication(self)) + self._create_handler(auth) + handler = self._handler + try: + await handler.open_async(connection=self._connection) + while not await handler.client_ready_async(): + await asyncio.sleep(0.05) + self._running = True + self._max_message_size_on_link = ( + self._amqp_transport.get_remote_max_message_size(handler) or MAX_MESSAGE_LENGTH_BYTES + ) + if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: + self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM + else: + self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD + except: + await self._close_handler() + raise async def _send( self,