Server-Sent Events (SSE) have become a staple for building real-time applications, especially with the rise of Generative AI and streaming LLM responses. However, as any developer who has deployed SSE in production knows, maintaining a stable connection can be a nightmare.
Whether it is a flaky mobile network, a serverless function timeout, or a load balancer cycling connections, disconnections are inevitable. When they happen, the client usually has to restart the entire stream from scratch—wasting resources and creating a poor user experience.
Today, I’m excited to announce the release of resumable-stream, a Python port of Vercel’s resumable-stream. This library brings the same resilient streaming architecture that powers the Vercel AI SDK to the Python ecosystem.
The Problem: Fragile Streams
Standard SSE streams are stateless and ephemeral. If a client disconnects at chunk 50 of 100, the server typically stops the generator. When the client reconnects, it has no way of telling the server “pick up where I left off.”
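To make the failure concrete, here is a small self-contained simulation (plain Python, no library involved) of exactly this waste: after a disconnect at chunk 50, a reconnecting client has no choice but to consume chunks 0–49 all over again before it sees anything new.

```python
import asyncio


# A stateless SSE producer: every connection restarts the generator at chunk 0.
async def sse_producer(total: int = 100):
    for i in range(total):
        yield f"data: chunk {i}\n\n"


async def fragile_client():
    # First connection: drops after receiving 50 of 100 chunks.
    first = []
    async for event in sse_producer():
        first.append(event)
        if len(first) == 50:
            break  # simulated network failure

    # Reconnect: the server has no notion of an offset, so the client
    # re-downloads everything it already had before reaching chunk 50.
    second = [event async for event in sse_producer()]
    return first, second


first, second = asyncio.run(fragile_client())
print(len(first), len(second))  # 50 100
print(second[:50] == first)     # True: chunks 0-49 were wastefully re-sent
```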
In a distributed environment (like multiple Kubernetes pods or serverless functions), the reconnection might hit a different server instance that has no context of the previous stream.
The Solution: Redis-Powered Resumption
resumable-stream solves this by using Redis as a coordination layer. When a stream is initiated, it creates a “sentinel” in Redis to track the stream’s state (active vs. finished). The producer then runs as a managed background task, ensuring that even if the client’s HTTP connection is severed, the data generation continues until completion.
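As a mental model, the "managed background task" behavior can be sketched in a few lines of plain asyncio. The `BackgroundStream` class below is illustrative only, not the library's actual internals: the producer runs as a detached task that keeps writing into a buffer, so generation finishes even if no consumer is attached.

```python
import asyncio


class BackgroundStream:
    """Producer runs as a detached task writing into a buffer,
    so generation outlives any single consumer connection."""

    def __init__(self, producer):
        self.buffer: list[str] = []
        self.done = asyncio.Event()  # stands in for the Redis sentinel
        self._task = asyncio.create_task(self._run(producer))

    async def _run(self, producer):
        async for chunk in producer():
            self.buffer.append(chunk)
        self.done.set()  # sentinel flips from "active" to "DONE"


async def producer():
    for i in range(5):
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(0)  # pretend each chunk takes time to generate


async def main():
    stream = BackgroundStream(producer)
    # Simulate the client vanishing immediately: nobody consumes the stream...
    await stream.done.wait()
    # ...yet the producer still ran to completion in the background.
    return len(stream.buffer)


result = asyncio.run(main())
print(result)  # 5
```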
When a client reconnects with a resume_at offset, the library:
- State check: the consumer inspects the sentinel to see whether the stream is still active or already done.
- Handshake: the consumer subscribes to a dedicated Redis channel and notifies the producer of its intent to resume.
- Buffer replay: the producer fetches the necessary chunks from its internal buffer and replays them to the new consumer.
- Live streaming: delivery transitions seamlessly back into real-time streaming via Redis Pub/Sub.
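The four steps above can be simulated in memory with plain asyncio. In this sketch a `Coordinator` stands in for the Redis layer (buffer, sentinel, and Pub/Sub); all names are illustrative, not the library's API. The key detail is subscribing before replaying, so no chunk falls into the gap between replay and live delivery.

```python
import asyncio


class Coordinator:
    """In-memory stand-in for the Redis layer: a chunk buffer plus
    live Pub/Sub-style subscribers."""

    def __init__(self):
        self.buffer: list[str] = []
        self.active = True  # the "sentinel"
        self.subscribers: list[asyncio.Queue] = []

    def publish(self, chunk: str):
        self.buffer.append(chunk)
        for q in self.subscribers:
            q.put_nowait(chunk)

    def finish(self):
        self.active = False
        for q in self.subscribers:
            q.put_nowait(None)  # sentinel value: end of stream


async def resume(coord: Coordinator, resume_at: int):
    # 1. State check: is the stream still active?
    if not coord.active:
        for chunk in coord.buffer[resume_at:]:
            yield chunk
        return
    # 2. Handshake: subscribe *before* replaying so no chunk is missed.
    q: asyncio.Queue = asyncio.Queue()
    coord.subscribers.append(q)
    seen = len(coord.buffer)
    # 3. Buffer replay: chunks produced while we were disconnected.
    for chunk in coord.buffer[resume_at:seen]:
        yield chunk
    # 4. Live streaming: continue in real time from the subscription.
    while (chunk := await q.get()) is not None:
        yield chunk


async def demo():
    coord = Coordinator()
    for i in range(5):
        coord.publish(f"chunk {i}")  # produced before the reconnect

    async def consume():
        return [c async for c in resume(coord, resume_at=3)]

    task = asyncio.create_task(consume())
    await asyncio.sleep(0)       # let the consumer subscribe and replay
    coord.publish("chunk 5")     # a live chunk after reconnection
    coord.finish()
    return await task


events = asyncio.run(demo())
print(events)  # ['chunk 3', 'chunk 4', 'chunk 5']
```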
Tip (Serverless & Edge Ready)
This architecture is specifically designed for serverless environments (like AWS Lambda, Vercel Functions, or Cloud Run) where you don’t have control over sticky sessions. A client can start a stream on Instance A, lose connection, and resume exactly where they left off on Instance B.
Getting Started
You can install the package via pip:
```bash
pip install resumable-stream
```

Basic Usage with FastAPI
The easiest way to use the library is with the idempotent resumable_stream API. It automatically handles both the creation of new streams and the resumption of existing ones.
```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse

from resumable_stream import create_resumable_stream_context

app = FastAPI()
ctx = create_resumable_stream_context(redis_url="redis://localhost:6379")


async def my_producer():
    for i in range(10):
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(1)


@app.get("/stream/{stream_id}")
async def stream_endpoint(stream_id: str, resume_at: int | None = None):
    stream = await ctx.resumable_stream(
        stream_id, my_producer, skip_characters=resume_at
    )

    if stream is None:
        # Explicit JSONResponse so the 422 status code is actually set
        return JSONResponse({"error": "Stream already finished"}, status_code=422)

    return StreamingResponse(stream, media_type="text/event-stream")
```

Case Study: Resilient AI Chat
The most impactful use case for resumable-stream is in Generative AI. LLM responses are often long, expensive to generate, and mission-critical. If a client loses connection halfway through a 500-token response, you don’t want to re-run the inference.
Here is how you can implement a professional AI chat endpoint that supports resumption:
1. The Initial Request (POST)
When the user starts a chat, we generate a unique stream_id and initialize the resumable stream.
```python
@router.post("/chat")
async def chat_stream(request: ChatRequest):
    stream_id = str(uuid.uuid4())

    def stream_factory():
        # Your LLM logic here (e.g., LangChain graph.astream)
        return chat_event_generator(
            graph.astream(input=state, config=config),
            stream_id=stream_id,
        )

    # The producer starts in the background, coordinated via Redis
    stream_iterator = await stream_context.create_new_resumable_stream(
        stream_id, stream_factory
    )

    return EventSourceResponse(stream_iterator)
```

2. The Resumption Endpoint (GET)
If the connection drops, the frontend can reconnect to a dedicated stream endpoint using the stream_id it received in the first request.
```python
@router.get("/chat/{stream_id}/stream")
async def resume_chat_stream(
    stream_id: str,
    resume_at: str | None = None,
):
    # The library automatically picks up from the Redis buffer
    stream = await stream_context.resume_existing_stream(
        stream_id, int(resume_at) if resume_at else None
    )

    if not stream:
        return Response("Stream is already done", status_code=422)

    return EventSourceResponse(stream)
```

Important (Why two endpoints?)
Using a POST for the initial generation and a GET for resumption is a best practice. It separates the side-effect (starting the AI generation) from the idempotent read (resuming the stream).
Behind the Scenes: The Redis Key Structure
To keep things fast and lightweight, the library uses a specialized key structure:
- Sentinel (`{prefix}:rs:sentinel:{streamId}`): tracks whether the stream is `1` (active) or `DONE`.
- Request channel (`{prefix}:rs:request:{streamId}`): the signal box where new consumers ask producers to replay data.
- Chunk channel (`{prefix}:rs:chunk:{listenerId}`): the fast path for delivering data to individual consumers.
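For illustration, the key formats above are trivial to reproduce as small helpers, handy when poking around a live instance with `redis-cli`. The function names here are hypothetical, not part of the library:

```python
# Hypothetical helpers reproducing the documented key formats.
def sentinel_key(prefix: str, stream_id: str) -> str:
    return f"{prefix}:rs:sentinel:{stream_id}"


def request_channel(prefix: str, stream_id: str) -> str:
    return f"{prefix}:rs:request:{stream_id}"


def chunk_channel(prefix: str, listener_id: str) -> str:
    return f"{prefix}:rs:chunk:{listener_id}"


print(sentinel_key("myapp", "abc123"))  # myapp:rs:sentinel:abc123
```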
By using Redis Pub/Sub for the actual data delivery, we keep the database overhead minimal, performing only `INCR` and `GET`/`SET` operations for state management.
Features at a Glance
- Resumable SSE streams: Pick up right where you left off.
- Redis-based persistence: Cross-instance communication out of the box.
- Low Latency: Optimized for the common case with minimal Redis operations.
- Flexible: Use with FastAPI, Starlette, or any ASGI framework.
Conclusion
Streaming is no longer just a “nice-to-have” feature; for modern AI apps, it’s a requirement. resumable-stream ensures that your streams are as robust as the rest of your stack.
Check out the project on GitHub: hieunguyen1053/resumable-stream
Happy streaming! 🚀