From 32f6b1cd60c54b4ada3870752625d6915941c4d2 Mon Sep 17 00:00:00 2001 From: Andy Stark Date: Tue, 19 May 2026 13:19:19 +0100 Subject: [PATCH 1/3] DOC-6331 Python async intro page --- content/develop/clients/redis-py/async.md | 171 ++++++++++++++++++ .../client-specific/redis-py/async_intro.py | 152 ++++++++++++++++ 2 files changed, 323 insertions(+) create mode 100644 content/develop/clients/redis-py/async.md create mode 100644 local_examples/client-specific/redis-py/async_intro.py diff --git a/content/develop/clients/redis-py/async.md b/content/develop/clients/redis-py/async.md new file mode 100644 index 0000000000..b862a9d445 --- /dev/null +++ b/content/develop/clients/redis-py/async.md @@ -0,0 +1,171 @@ +--- +categories: +- docs +- develop +- stack +- oss +- rs +- rc +- oss +- kubernetes +- clients +description: Use redis-py with asyncio for non-blocking Redis access +linkTitle: Async operations +title: Asynchronous operations with redis-py +weight: 22 +--- + +`redis-py` provides an asyncio-compatible API under the +[`redis.asyncio`](https://redis.readthedocs.io/en/stable/examples/asyncio_examples.html) +namespace. It mirrors the synchronous client API, so most code patterns +translate directly — you `await` commands instead of calling them. + +Use the async client for I/O-bound workloads, for integration with async web +frameworks (such as [FastAPI](https://fastapi.tiangolo.com/), [Starlette](https://www.starlette.io/), [aiohttp](https://docs.aiohttp.org/en/stable/), or [Sanic](https://sanic.dev/en/), or when you need +to run many concurrent Redis operations from a single process. For simple +scripts, CPU-bound work, or codebases without an existing event loop, the +synchronous client is usually a better choice. + +The examples on the other pages in this section use the synchronous client, +but you can translate any of them to async by following the rules in +[Translating sync examples](#translating-sync-examples) below. + +## Basic connection + +Import the async client from `redis.asyncio` and `await` each command. The +constructor itself is *not* awaitable — the connection is established lazily +the first time you issue a command. Call `aclose()` when you're done to +release the underlying socket. + +{{< clients-example set="async_intro" step="connect" lang_filter="Python" description="Foundational: Connect to Redis with the async client and run a basic SET/GET" difficulty="beginner" >}} +{{< /clients-example >}} + +The recommended pattern is to use the client as an async context manager, +which ensures `aclose()` runs even if an exception is raised: + +{{< clients-example set="async_intro" step="context_manager" lang_filter="Python" description="Foundational: Use the async client as a context manager for automatic cleanup" difficulty="beginner" >}} +{{< /clients-example >}} + +## Connection pools + +For production usage, you should manage connections with a connection pool +rather than opening and closing them individually. +See [Connection pools and multiplexing]({{< relref "/develop/clients/pools-and-muxing" >}}) +for more information about how this works. + +In a long-running async application, create a single +`redis.asyncio.ConnectionPool` at startup and share it across requests by +passing it to each `Redis(connection_pool=...)` instance. Avoid creating a +new `Redis()` per request — it defeats pooling and pays the connection cost +on every call. + +{{< clients-example set="async_intro" step="pool" lang_filter="Python" description="Foundational: Create and share an async connection pool" difficulty="beginner" >}} +{{< /clients-example >}} + +Tune `max_connections` to the maximum number of concurrent Redis operations +you expect from the process. A `BlockingConnectionPool` is also available +if you'd rather block on pool exhaustion than raise an error. + +## Awaiting commands + +Every command method on the async client returns a coroutine — each call +must be `await`ed. Forgetting `await` returns a coroutine object instead of +a result, which is the most common async mistake to watch for. + +Because each command is a coroutine, you can run several concurrently with +`asyncio.gather()`: + +{{< clients-example set="async_intro" step="gather" lang_filter="Python" description="Run several Redis commands concurrently with asyncio.gather" difficulty="intermediate" >}} +{{< /clients-example >}} + +Each `gather()` argument uses its own connection from the pool, so size the +pool to match your peak concurrency. + +## Pipelines and transactions + +Pipelines and transactions work the same way as in the synchronous client +(see [Pipelines and transactions]({{< relref "/develop/clients/redis-py/transpipe" >}}) +for the conceptual background). The only difference is that you create the +pipeline inside an `async with` block and `await pipe.execute()`. + +{{< clients-example set="async_intro" step="pipeline" lang_filter="Python" description="Foundational: Execute commands in an async pipeline" difficulty="beginner" >}} +{{< /clients-example >}} + +`WATCH`/`MULTI`/`EXEC` for optimistic locking also has an async form. +`watch()` and `execute()` are coroutines; `multi()` remains synchronous +because it only toggles internal pipeline state. + +{{< clients-example set="async_intro" step="watch" lang_filter="Python" description="Optimistic locking with an async pipeline and WATCH" difficulty="intermediate" >}} +{{< /clients-example >}} + +## Pub/Sub + +The async pub/sub object follows the same shape as the sync version. Call +`await pubsub.subscribe(...)` to register channels, then iterate messages +with `async for message in pubsub.listen():`. Call `await pubsub.aclose()` +to release the connection. + +{{< clients-example set="async_intro" step="pubsub" lang_filter="Python" description="Subscribe and receive messages with the async pub/sub API" difficulty="intermediate" >}} +{{< /clients-example >}} + +A single `PubSub` object isn't safe to share across tasks — give each +consuming task its own subscription. + +## Cluster connections + +To connect to a Redis cluster asynchronously, import `RedisCluster` from +`redis.asyncio.cluster`. The API matches the synchronous cluster client +(see [Connect to a Redis cluster]({{< relref "/develop/clients/redis-py/connect#connect-to-a-redis-cluster" >}})), +with `await` in front of each command. + +{{< clients-example set="async_intro" step="cluster" lang_filter="Python" description="Foundational: Connect to a Redis cluster with the async client" difficulty="beginner" >}} +{{< /clients-example >}} + +## Cleanup and lifecycle + +Always close clients and pools when you're done: + +- Use `async with Redis(...) as r:` whenever the client's lifetime fits a + single scope. +- For longer-lived clients, call `await r.aclose()` explicitly. (The older + `close()` method is deprecated.) +- For frameworks with startup/shutdown hooks — for example FastAPI's + `lifespan` — create the client or pool at startup and close it at + shutdown so connections aren't leaked between process restarts. + +## Cancellation and timeouts + +You can cancel an in-flight command by wrapping it in `asyncio.timeout()` +or `asyncio.wait_for()`. If the command is cancelled mid-flight, `redis-py` +disconnects the underlying connection to avoid response/request misalignment +on subsequent reads. The next command transparently picks up a new +connection from the pool. + +{{< clients-example set="async_intro" step="timeout" lang_filter="Python" description="Apply an asyncio timeout to a Redis command" difficulty="intermediate" >}} +{{< /clients-example >}} + +A `Redis` client instance is safe to share across tasks (the pool handles +concurrency), but stateful objects derived from it — pipelines and pub/sub +subscriptions — are not. Give each task its own pipeline or `PubSub`. + +## Translating sync examples + +To adapt any synchronous example elsewhere in this guide to the async +client, apply these rules: + +- Replace `import redis` with `import redis.asyncio as redis`. +- Wrap the example in an `async def` function and call it with + `asyncio.run(...)`. +- Add `await` in front of every command call. +- Replace `with r.pipeline(...) as pipe:` with + `async with r.pipeline(...) as pipe:`, and `await pipe.execute()`. +- Replace `r.close()` with `await r.aclose()`, or use + `async with redis.Redis(...) as r:` as a context manager. + +## More information + +- The [`redis-py` asyncio examples](https://redis.readthedocs.io/en/stable/examples/asyncio_examples.html) + on Read the Docs cover further patterns. +- See [Error handling]({{< relref "/develop/clients/redis-py/error-handling" >}}) and + [Client-side geographic failover]({{< relref "/develop/clients/redis-py/failover" >}}) for + resiliency patterns that apply to both sync and async clients. diff --git a/local_examples/client-specific/redis-py/async_intro.py b/local_examples/client-specific/redis-py/async_intro.py new file mode 100644 index 0000000000..bcea8daa99 --- /dev/null +++ b/local_examples/client-specific/redis-py/async_intro.py @@ -0,0 +1,152 @@ +# EXAMPLE: async_intro +import asyncio + +# STEP_START connect +import redis.asyncio as redis + +async def basic_example(): + r = redis.Redis(host='localhost', port=6379, decode_responses=True) + await r.set('foo', 'bar') + value = await r.get('foo') + print(value) + # bar + await r.aclose() + +asyncio.run(basic_example()) +# STEP_END + +# STEP_START context_manager +async def context_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + await r.set('foo', 'bar') + value = await r.get('foo') + print(value) + # bar + +asyncio.run(context_example()) +# STEP_END + +# STEP_START pool +async def pool_example(): + pool = redis.ConnectionPool( + host='localhost', port=6379, decode_responses=True, + max_connections=10, + ) + r = redis.Redis(connection_pool=pool) + await r.set('foo', 'bar') + await r.aclose() + await pool.aclose() + +asyncio.run(pool_example()) +# STEP_END + +# STEP_START gather +async def gather_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + await r.mset({'a': '1', 'b': '2', 'c': '3'}) + a, b, c = await asyncio.gather( + r.get('a'), + r.get('b'), + r.get('c'), + ) + print(a, b, c) + # 1 2 3 + +asyncio.run(gather_example()) +# STEP_END + +# STEP_START pipeline +async def pipeline_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + async with r.pipeline(transaction=True) as pipe: + pipe.set('a', '1') + pipe.set('b', '2') + pipe.get('a') + pipe.get('b') + results = await pipe.execute() + print(results) + # [True, True, '1', '2'] + +asyncio.run(pipeline_example()) +# STEP_END + +# STEP_START watch +from redis.exceptions import WatchError + +async def watch_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + await r.set('counter', '0') + async with r.pipeline(transaction=True) as pipe: + while True: + try: + await pipe.watch('counter') + current = int(await pipe.get('counter')) + pipe.multi() + pipe.set('counter', str(current + 1)) + await pipe.execute() + break + except WatchError: + continue + print(await r.get('counter')) + # 1 + +asyncio.run(watch_example()) +# STEP_END + +# STEP_START pubsub +async def pubsub_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + pubsub = r.pubsub() + await pubsub.subscribe('channel-1') + + async def reader(): + async for message in pubsub.listen(): + if message['type'] == 'message': + print(message['data']) + # hello + break + + reader_task = asyncio.create_task(reader()) + await asyncio.sleep(0.1) + await r.publish('channel-1', 'hello') + await reader_task + await pubsub.aclose() + +asyncio.run(pubsub_example()) +# STEP_END + +# STEP_START cluster +from redis.asyncio.cluster import RedisCluster + +async def cluster_example(): + rc = RedisCluster(host='localhost', port=16379, decode_responses=True) + await rc.set('foo', 'bar') + value = await rc.get('foo') + print(value) + # bar + await rc.aclose() + +asyncio.run(cluster_example()) +# STEP_END + +# STEP_START timeout +async def timeout_example(): + async with redis.Redis( + host='localhost', port=6379, decode_responses=True + ) as r: + try: + async with asyncio.timeout(0.001): + await r.set('foo', 'bar') + except TimeoutError: + print('command cancelled') +# STEP_END From 6c3a1920708b283c13ea0fc814fd6c67f816afb9 Mon Sep 17 00:00:00 2001 From: Andy Stark Date: Tue, 19 May 2026 14:18:50 +0100 Subject: [PATCH 2/3] DOC-6331 issues found in Codex review --- content/develop/clients/redis-py/async.md | 33 +++++++++-------- .../client-specific/redis-py/async_intro.py | 35 ++++++++++--------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/content/develop/clients/redis-py/async.md b/content/develop/clients/redis-py/async.md index b862a9d445..36b73b045b 100644 --- a/content/develop/clients/redis-py/async.md +++ b/content/develop/clients/redis-py/async.md @@ -32,10 +32,10 @@ but you can translate any of them to async by following the rules in ## Basic connection -Import the async client from `redis.asyncio` and `await` each command. The -constructor itself is *not* awaitable — the connection is established lazily -the first time you issue a command. Call `aclose()` when you're done to -release the underlying socket. +Import the async client from `redis.asyncio` and `await` each command. +Construction is synchronous and doesn't open a connection — the pool +establishes one lazily the first time you issue a command. Call `aclose()` +when you're done to release the underlying socket. {{< clients-example set="async_intro" step="connect" lang_filter="Python" description="Foundational: Connect to Redis with the async client and run a basic SET/GET" difficulty="beginner" >}} {{< /clients-example >}} @@ -78,8 +78,10 @@ Because each command is a coroutine, you can run several concurrently with {{< clients-example set="async_intro" step="gather" lang_filter="Python" description="Run several Redis commands concurrently with asyncio.gather" difficulty="intermediate" >}} {{< /clients-example >}} -Each `gather()` argument uses its own connection from the pool, so size the -pool to match your peak concurrency. +Each in-flight command consumes one pooled connection while it's executing, +so size `max_connections` to match your peak concurrency. (If you instantiate +the client with `single_connection_client=True`, all commands serialize +through a single connection instead.) ## Pipelines and transactions @@ -102,8 +104,9 @@ because it only toggles internal pipeline state. The async pub/sub object follows the same shape as the sync version. Call `await pubsub.subscribe(...)` to register channels, then iterate messages -with `async for message in pubsub.listen():`. Call `await pubsub.aclose()` -to release the connection. +with `async for message in pubsub.listen():`. Use the `PubSub` object as an +async context manager (`async with r.pubsub() as pubsub:`) or call +`await pubsub.aclose()` explicitly to release the connection. {{< clients-example set="async_intro" step="pubsub" lang_filter="Python" description="Subscribe and receive messages with the async pub/sub API" difficulty="intermediate" >}} {{< /clients-example >}} @@ -135,11 +138,11 @@ Always close clients and pools when you're done: ## Cancellation and timeouts -You can cancel an in-flight command by wrapping it in `asyncio.timeout()` -or `asyncio.wait_for()`. If the command is cancelled mid-flight, `redis-py` -disconnects the underlying connection to avoid response/request misalignment -on subsequent reads. The next command transparently picks up a new -connection from the pool. +You can cancel an in-flight command by wrapping it in `asyncio.wait_for()` +(or `asyncio.timeout()` on Python 3.11+). If the command is cancelled +mid-flight, `redis-py` disconnects the underlying connection to avoid +response/request misalignment on subsequent reads. The next command +transparently picks up a new connection from the pool. {{< clients-example set="async_intro" step="timeout" lang_filter="Python" description="Apply an asyncio timeout to a Redis command" difficulty="intermediate" >}} {{< /clients-example >}} @@ -156,7 +159,9 @@ client, apply these rules: - Replace `import redis` with `import redis.asyncio as redis`. - Wrap the example in an `async def` function and call it with `asyncio.run(...)`. -- Add `await` in front of every command call. +- Add `await` in front of every command call. (Exception: buffered pipeline + commands such as `pipe.set(...)` stay un-awaited; only `pipe.watch(...)`, + any reads issued before `pipe.multi()`, and `pipe.execute()` are awaited.) - Replace `with r.pipeline(...) as pipe:` with `async with r.pipeline(...) as pipe:`, and `await pipe.execute()`. - Replace `r.close()` with `await r.aclose()`, or use diff --git a/local_examples/client-specific/redis-py/async_intro.py b/local_examples/client-specific/redis-py/async_intro.py index bcea8daa99..24d1f2e4b3 100644 --- a/local_examples/client-specific/redis-py/async_intro.py +++ b/local_examples/client-specific/redis-py/async_intro.py @@ -106,21 +106,20 @@ async def pubsub_example(): async with redis.Redis( host='localhost', port=6379, decode_responses=True ) as r: - pubsub = r.pubsub() - await pubsub.subscribe('channel-1') - - async def reader(): - async for message in pubsub.listen(): - if message['type'] == 'message': - print(message['data']) - # hello - break + async with r.pubsub() as pubsub: + await pubsub.subscribe('channel-1') + + async def reader(): + async for message in pubsub.listen(): + if message['type'] == 'message': + print(message['data']) + # hello + break - reader_task = asyncio.create_task(reader()) - await asyncio.sleep(0.1) - await r.publish('channel-1', 'hello') - await reader_task - await pubsub.aclose() + reader_task = asyncio.create_task(reader()) + await asyncio.sleep(0.1) + await r.publish('channel-1', 'hello') + await reader_task asyncio.run(pubsub_example()) # STEP_END @@ -145,8 +144,10 @@ async def timeout_example(): host='localhost', port=6379, decode_responses=True ) as r: try: - async with asyncio.timeout(0.001): - await r.set('foo', 'bar') - except TimeoutError: + # BLPOP blocks waiting for a value, so the timeout reliably fires. + await asyncio.wait_for(r.blpop('empty-queue'), timeout=0.1) + except asyncio.TimeoutError: print('command cancelled') + +asyncio.run(timeout_example()) # STEP_END From efe5dae014b3fc81a665b518740feffc3921b043 Mon Sep 17 00:00:00 2001 From: andy-stark-redis <164213578+andy-stark-redis@users.noreply.github.com> Date: Tue, 19 May 2026 14:45:38 +0100 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: David Dougherty --- content/develop/clients/redis-py/async.md | 2 +- local_examples/client-specific/redis-py/async_intro.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/content/develop/clients/redis-py/async.md b/content/develop/clients/redis-py/async.md index 36b73b045b..51b4e88ccd 100644 --- a/content/develop/clients/redis-py/async.md +++ b/content/develop/clients/redis-py/async.md @@ -139,7 +139,7 @@ Always close clients and pools when you're done: ## Cancellation and timeouts You can cancel an in-flight command by wrapping it in `asyncio.wait_for()` -(or `asyncio.timeout()` on Python 3.11+). If the command is cancelled +(or `asyncio.timeout()` on Python 3.11+). If the command is canceled mid-flight, `redis-py` disconnects the underlying connection to avoid response/request misalignment on subsequent reads. The next command transparently picks up a new connection from the pool. diff --git a/local_examples/client-specific/redis-py/async_intro.py b/local_examples/client-specific/redis-py/async_intro.py index 24d1f2e4b3..fb7ea7d93b 100644 --- a/local_examples/client-specific/redis-py/async_intro.py +++ b/local_examples/client-specific/redis-py/async_intro.py @@ -147,7 +147,7 @@ async def timeout_example(): # BLPOP blocks waiting for a value, so the timeout reliably fires. await asyncio.wait_for(r.blpop('empty-queue'), timeout=0.1) except asyncio.TimeoutError: - print('command cancelled') + print('command canceled') asyncio.run(timeout_example()) # STEP_END