Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions content/develop/clients/redis-py/async.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
---
categories:
- docs
- develop
- stack
- oss
- rs
- rc
- oss
- kubernetes
- clients
description: Use redis-py with asyncio for non-blocking Redis access
linkTitle: Async operations
title: Asynchronous operations with redis-py
weight: 22
---

`redis-py` provides an asyncio-compatible API under the
[`redis.asyncio`](https://redis.readthedocs.io/en/stable/examples/asyncio_examples.html)
namespace. It mirrors the synchronous client API, so most code patterns
translate directly — you `await` commands instead of calling them.

Use the async client for I/O-bound workloads, for integration with async web
frameworks (such as [FastAPI](https://fastapi.tiangolo.com/), [Starlette](https://www.starlette.io/), [aiohttp](https://docs.aiohttp.org/en/stable/), or [Sanic](https://sanic.dev/en/), or when you need
to run many concurrent Redis operations from a single process. For simple
scripts, CPU-bound work, or codebases without an existing event loop, the
synchronous client is usually a better choice.

The examples on the other pages in this section use the synchronous client,
but you can translate any of them to async by following the rules in
[Translating sync examples](#translating-sync-examples) below.

## Basic connection

Import the async client from `redis.asyncio` and `await` each command. The
constructor itself is *not* awaitable — the connection is established lazily
the first time you issue a command. Call `aclose()` when you're done to
release the underlying socket.

{{< clients-example set="async_intro" step="connect" lang_filter="Python" description="Foundational: Connect to Redis with the async client and run a basic SET/GET" difficulty="beginner" >}}
{{< /clients-example >}}

The recommended pattern is to use the client as an async context manager,
which ensures `aclose()` runs even if an exception is raised:

{{< clients-example set="async_intro" step="context_manager" lang_filter="Python" description="Foundational: Use the async client as a context manager for automatic cleanup" difficulty="beginner" >}}
{{< /clients-example >}}

## Connection pools

For production usage, you should manage connections with a connection pool
rather than opening and closing them individually.
See [Connection pools and multiplexing]({{< relref "/develop/clients/pools-and-muxing" >}})
for more information about how this works.

In a long-running async application, create a single
`redis.asyncio.ConnectionPool` at startup and share it across requests by
passing it to each `Redis(connection_pool=...)` instance. Avoid creating a
new `Redis()` per request — it defeats pooling and pays the connection cost
on every call.

{{< clients-example set="async_intro" step="pool" lang_filter="Python" description="Foundational: Create and share an async connection pool" difficulty="beginner" >}}
{{< /clients-example >}}

Tune `max_connections` to the maximum number of concurrent Redis operations
you expect from the process. A `BlockingConnectionPool` is also available
if you'd rather block on pool exhaustion than raise an error.

## Awaiting commands

Every command method on the async client returns a coroutine — each call
must be `await`ed. Forgetting `await` returns a coroutine object instead of
a result, which is the most common async mistake to watch for.

Because each command is a coroutine, you can run several concurrently with
`asyncio.gather()`:

{{< clients-example set="async_intro" step="gather" lang_filter="Python" description="Run several Redis commands concurrently with asyncio.gather" difficulty="intermediate" >}}
{{< /clients-example >}}

Each `gather()` argument uses its own connection from the pool, so size the
pool to match your peak concurrency.

## Pipelines and transactions

Pipelines and transactions work the same way as in the synchronous client
(see [Pipelines and transactions]({{< relref "/develop/clients/redis-py/transpipe" >}})
for the conceptual background). The only difference is that you create the
pipeline inside an `async with` block and `await pipe.execute()`.

{{< clients-example set="async_intro" step="pipeline" lang_filter="Python" description="Foundational: Execute commands in an async pipeline" difficulty="beginner" >}}
{{< /clients-example >}}

`WATCH`/`MULTI`/`EXEC` for optimistic locking also has an async form.
`watch()` and `execute()` are coroutines; `multi()` remains synchronous
because it only toggles internal pipeline state.

{{< clients-example set="async_intro" step="watch" lang_filter="Python" description="Optimistic locking with an async pipeline and WATCH" difficulty="intermediate" >}}
{{< /clients-example >}}

## Pub/Sub

The async pub/sub object follows the same shape as the sync version. Call
`await pubsub.subscribe(...)` to register channels, then iterate messages
with `async for message in pubsub.listen():`. Call `await pubsub.aclose()`
to release the connection.

{{< clients-example set="async_intro" step="pubsub" lang_filter="Python" description="Subscribe and receive messages with the async pub/sub API" difficulty="intermediate" >}}
{{< /clients-example >}}

A single `PubSub` object isn't safe to share across tasks — give each
consuming task its own subscription.

## Cluster connections

To connect to a Redis cluster asynchronously, import `RedisCluster` from
`redis.asyncio.cluster`. The API matches the synchronous cluster client
(see [Connect to a Redis cluster]({{< relref "/develop/clients/redis-py/connect#connect-to-a-redis-cluster" >}})),
with `await` in front of each command.

{{< clients-example set="async_intro" step="cluster" lang_filter="Python" description="Foundational: Connect to a Redis cluster with the async client" difficulty="beginner" >}}
{{< /clients-example >}}

## Cleanup and lifecycle

Always close clients and pools when you're done:

- Use `async with Redis(...) as r:` whenever the client's lifetime fits a
single scope.
- For longer-lived clients, call `await r.aclose()` explicitly. (The older
`close()` method is deprecated.)
- For frameworks with startup/shutdown hooks — for example FastAPI's
`lifespan` — create the client or pool at startup and close it at
shutdown so connections aren't leaked between process restarts.

## Cancellation and timeouts

You can cancel an in-flight command by wrapping it in `asyncio.timeout()`
or `asyncio.wait_for()`. If the command is cancelled mid-flight, `redis-py`
disconnects the underlying connection to avoid response/request misalignment
on subsequent reads. The next command transparently picks up a new
connection from the pool.

{{< clients-example set="async_intro" step="timeout" lang_filter="Python" description="Apply an asyncio timeout to a Redis command" difficulty="intermediate" >}}
{{< /clients-example >}}

A `Redis` client instance is safe to share across tasks (the pool handles
concurrency), but stateful objects derived from it — pipelines and pub/sub
subscriptions — are not. Give each task its own pipeline or `PubSub`.

## Translating sync examples

To adapt any synchronous example elsewhere in this guide to the async
client, apply these rules:

- Replace `import redis` with `import redis.asyncio as redis`.
- Wrap the example in an `async def` function and call it with
`asyncio.run(...)`.
- Add `await` in front of every command call.
- Replace `with r.pipeline(...) as pipe:` with
`async with r.pipeline(...) as pipe:`, and `await pipe.execute()`.
- Replace `r.close()` with `await r.aclose()`, or use
`async with redis.Redis(...) as r:` as a context manager.

## More information

- The [`redis-py` asyncio examples](https://redis.readthedocs.io/en/stable/examples/asyncio_examples.html)
on Read the Docs cover further patterns.
- See [Error handling]({{< relref "/develop/clients/redis-py/error-handling" >}}) and
[Client-side geographic failover]({{< relref "/develop/clients/redis-py/failover" >}}) for
resiliency patterns that apply to both sync and async clients.
152 changes: 152 additions & 0 deletions local_examples/client-specific/redis-py/async_intro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# EXAMPLE: async_intro
import asyncio

# STEP_START connect
import redis.asyncio as redis

async def basic_example():
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
await r.set('foo', 'bar')
value = await r.get('foo')
print(value)
# bar
await r.aclose()

asyncio.run(basic_example())
# STEP_END

# STEP_START context_manager
async def context_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
await r.set('foo', 'bar')
value = await r.get('foo')
print(value)
# bar

asyncio.run(context_example())
# STEP_END

# STEP_START pool
async def pool_example():
pool = redis.ConnectionPool(
host='localhost', port=6379, decode_responses=True,
max_connections=10,
)
r = redis.Redis(connection_pool=pool)
await r.set('foo', 'bar')
await r.aclose()
await pool.aclose()

asyncio.run(pool_example())
# STEP_END

# STEP_START gather
async def gather_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
await r.mset({'a': '1', 'b': '2', 'c': '3'})
a, b, c = await asyncio.gather(
r.get('a'),
r.get('b'),
r.get('c'),
)
print(a, b, c)
# 1 2 3

asyncio.run(gather_example())
# STEP_END

# STEP_START pipeline
async def pipeline_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
async with r.pipeline(transaction=True) as pipe:
pipe.set('a', '1')
pipe.set('b', '2')
pipe.get('a')
pipe.get('b')
results = await pipe.execute()
print(results)
# [True, True, '1', '2']

asyncio.run(pipeline_example())
# STEP_END

# STEP_START watch
from redis.exceptions import WatchError

async def watch_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
await r.set('counter', '0')
async with r.pipeline(transaction=True) as pipe:
while True:
try:
await pipe.watch('counter')
current = int(await pipe.get('counter'))
pipe.multi()
pipe.set('counter', str(current + 1))
await pipe.execute()
break
except WatchError:
continue
print(await r.get('counter'))
# 1

asyncio.run(watch_example())
# STEP_END

# STEP_START pubsub
async def pubsub_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
pubsub = r.pubsub()
await pubsub.subscribe('channel-1')

async def reader():
async for message in pubsub.listen():
if message['type'] == 'message':
print(message['data'])
# hello
break

reader_task = asyncio.create_task(reader())
await asyncio.sleep(0.1)
await r.publish('channel-1', 'hello')
await reader_task
await pubsub.aclose()

asyncio.run(pubsub_example())
# STEP_END

# STEP_START cluster
from redis.asyncio.cluster import RedisCluster

async def cluster_example():
rc = RedisCluster(host='localhost', port=16379, decode_responses=True)
await rc.set('foo', 'bar')
value = await rc.get('foo')
print(value)
# bar
await rc.aclose()

asyncio.run(cluster_example())
# STEP_END

# STEP_START timeout
async def timeout_example():
async with redis.Redis(
host='localhost', port=6379, decode_responses=True
) as r:
try:
async with asyncio.timeout(0.001):
await r.set('foo', 'bar')
except TimeoutError:
print('command cancelled')
Comment thread
andy-stark-redis marked this conversation as resolved.
Outdated
# STEP_END
Comment thread
cursor[bot] marked this conversation as resolved.
Loading