Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 7.14.4 (Unreleased)

### Bugs Fixed

- Fixed a race condition in the async ServiceBusSender where concurrent coroutines could trigger `AttributeError: 'NoneType' object has no attribute 'client_ready_async'` when reusing a sender. ([#35618](https://github.com/Azure/azure-sdk-for-python/issues/35618))

## 7.14.3 (2025-11-11)

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ def __init__(
self._create_attribute(**kwargs)
self._connection = kwargs.get("connection")
self._handler: Union["pyamqp_SendClientAsync", "uamqp_SendClientAsync"]
# Serializes _open() so concurrent callers cannot race on creating
# and closing self._handler (see issue #35618). Initialized lazily
# because the constructor may execute outside a running event loop.
self._open_lock: Optional[asyncio.Lock] = None

async def __aenter__(self) -> "ServiceBusSender":
if self._shutdown.is_set():
Expand Down Expand Up @@ -203,25 +207,31 @@ def _create_handler(self, auth: Union["uamqp_JWTTokenAuthAsync", "pyamqp_JWTToke
async def _open(self):
if self._running:
return
if self._handler:
await self._handler.close_async()
auth = None if self._connection else (await create_authentication(self))
self._create_handler(auth)
try:
await self._handler.open_async(connection=self._connection)
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
self._running = True
self._max_message_size_on_link = (
self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES
)
if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM:
self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM
else:
self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD
except:
await self._close_handler()
raise
if self._open_lock is None:
self._open_lock = asyncio.Lock()
async with self._open_lock:
if self._running:
return
if self._handler:
await self._handler.close_async()
auth = None if self._connection else (await create_authentication(self))
self._create_handler(auth)
handler = self._handler
try:
await handler.open_async(connection=self._connection)
while not await handler.client_ready_async():
await asyncio.sleep(0.05)
self._running = True
self._max_message_size_on_link = (
self._amqp_transport.get_remote_max_message_size(handler) or MAX_MESSAGE_LENGTH_BYTES
)
if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM:
self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM
else:
self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD
except:
await self._close_handler()
raise

async def _send(
self,
Expand Down
Loading