Implementing Streaming Responses with the Claude API
Conceptual Overview
Streaming enables the Claude API to send back response tokens as they are generated, rather than waiting for the entire completion to finish. Instead of a single JSON response, the server sends a sequence of events over a persistent connection using Server-Sent Events (SSE). Each event represents a part of the message — content block starts, text deltas, and final message metadata.
Why it matters:
- Perceived responsiveness: Users see text appear in real time, similar to a chat interface, dramatically improving the experience for interactive applications.
- Lower time-to-first-byte (TTFB): The client can begin processing or presenting content before the full response is ready.
- Efficient cancellation: If the user wants to stop a generation early, you can close the stream without wasting tokens on a full completion.
- Progressive delivery: Enables building features like live typing indicators or continuous text rendering in UIs.
When Not to Use Streaming
Streaming is not the right choice for every scenario. Consider a standard non-streaming request when:
- Batch processing: You're running multiple generations and don't need to display results in real time. Streaming introduces connection overhead that can complicate batch pipelines.
- Very short responses: If your expected output is a single word, classification label, or JSON object under ~50 tokens, the overhead of parsing SSE events outweighs any perceived speed gain.
- Full-output-first pipelines: Any workflow that requires the complete response before downstream processing — translation, structured extraction, chain-of-thought reranking — should use a standard request. Streaming forces you to reassemble fragments, adding complexity with no benefit.
- Serverless functions with short timeouts: If your runtime has a strict execution limit (e.g., AWS Lambda with a 30-second timeout), a non-streaming request gives you one atomic response instead of managing a connection that might outlive the function.
Rule of thumb: Use streaming when a human is waiting to read the output interactively. Use standard requests when another machine is the consumer.
Prerequisites
- Anthropic API key — create one in the Anthropic Console. Set it as an environment variable and never hardcode it in source files:

  ```shell
  export ANTHROPIC_API_KEY="sk-ant-..."
  ```

  In production, load it from a secrets manager or your platform's secure environment variable store. Hardcoding keys in committed code is the most common cause of leaked credentials.
- curl and/or Python 3.7+ installed
- For Python, install the Anthropic SDK:

  ```shell
  pip install anthropic
  ```
Step-by-Step Guide
Using curl
The CLI approach is excellent for testing and understanding the raw event stream without any abstraction layer.
- Set your API key as an environment variable.
- Create a JSON request body with `"stream": true`.
- Send the request with `-N` (no buffering) to see events as they arrive.
Example
```shell
curl https://api.anthropic.com/v1/messages \
  -H "content-type: application/json" \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -N \
  -d '{
    "model": "claude-sonnet-4-6",
    "max_tokens": 256,
    "stream": true,
    "messages": [{"role": "user", "content": "Explain quantum computing in one sentence."}]
  }'
```
You will see lines like `event: message_start` and `data: {...}`, then several `content_block_delta` events with text fragments, and finally `message_stop`.
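An abridged, illustrative event stream is shown below. The JSON payloads are simplified stand-ins; the real events carry more fields.

```text
event: message_start
data: {"type":"message_start","message":{"id":"msg_...","role":"assistant","content":[]}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Quantum"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":23}}

event: message_stop
data: {"type":"message_stop"}
```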
Using Python
The Anthropic Python SDK handles SSE parsing and provides a high-level iterator so you never need to parse raw events unless you want to.
Synchronous streaming
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=256,
    messages=[{"role": "user", "content": "Tell me a short story about a robot."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
The `text_stream` iterator yields each text delta as it arrives. The context manager (`with`) automatically closes the connection when the block exits.
Accessing raw events
```python
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello, world!"}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print(f"\nStop reason: {event.message.stop_reason}")
```
Asynchronous streaming (asyncio)
```python
import asyncio

from anthropic import AsyncAnthropic


async def main():
    client = AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=256,
        messages=[{"role": "user", "content": "Write a haiku about the sea."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())
```
Complete Example
Below is a single, self-contained Python script that demonstrates streaming end to end. Copy it, set your API key as an environment variable, and run it.
"""
complete_streaming_example.py
A fully executable example of streaming with the Claude API.
Usage:
export ANTHROPIC_API_KEY="sk-ant-..."
python complete_streaming_example.py
"""
import os
import sys
import anthropic
def main():
# Load the API key from the environment — never hardcode it.
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
print("Error: ANTHROPIC_API_KEY environment variable not set.")
print("Set it with: export ANTHROPIC_API_KEY='sk-ant-...'")
sys.exit(1)
client = anthropic.Anthropic(api_key=api_key)
prompt = "Write a short poem about a developer who finally fixed a bug after three days."
print("Streaming response:
")
try:
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=300,
messages=[{"role": "user", "content": prompt}],
) as stream:
for text_delta in stream.text_stream:
print(text_delta, end="", flush=True)
# After the stream completes, the final message is available.
final = stream.get_final_message()
print(f"
---
Stop reason: {final.stop_reason}")
print(f"Input tokens: {final.usage.input_tokens}")
print(f"Output tokens: {final.usage.output_tokens}")
except anthropic.APIStatusError as e:
print(f"
API error: {e.status_code} - {e.message}")
sys.exit(1)
except Exception as e:
print(f"
Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Expected output: You should see the poem appear incrementally, a few tokens at a time, followed by a summary showing `stop_reason: end_turn` and the token counts.
Reference
Request Parameters for Streaming
All standard Messages API parameters apply. The only change to enable streaming is:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `stream` | boolean | Yes (set to `true`) | Enables SSE streaming. If omitted or `false`, a standard non-streaming response is returned. |

When streaming, all other standard parameters work exactly the same: `model` (e.g., `"claude-sonnet-4-6"` or `"claude-opus-4-5"`), `messages` (conversation history), `max_tokens` (required), `system` (system prompt), `temperature`, `top_p`, `top_k`, `stop_sequences`, `metadata`, etc.
Stream Event Types
| Event | Description | Key fields in `data` |
|---|---|---|
| `message_start` | Indicates the beginning of the response. | `message` object with `id`, `model`, `role` |
| `content_block_start` | A new content block is starting (e.g., a text block). | `index`, `content_block` (type, initial text) |
| `content_block_delta` | A chunk of content within a block. | `index`, `delta` (type, text snippet) |
| `content_block_stop` | A content block has been fully emitted. | `index` |
| `message_delta` | A delta to top-level message fields. | `delta` (`stop_reason`, `stop_sequence`), `usage` |
| `message_stop` | The message is finished; the stream ends. | (none; final fields arrive in `message_delta`) |
| `ping` | Periodic keep-alive to prevent timeouts. | (empty) |
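To make the event lifecycle concrete, the following sketch (plain Python, no SDK) folds a sequence of already-parsed event payloads into a final message. The event dicts are simplified, hypothetical stand-ins for the real SSE payloads:

```python
# Simplified, hypothetical stand-ins for parsed stream event payloads.
events = [
    {"type": "message_start", "message": {"role": "assistant"}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello, "}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "world!"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}},
    {"type": "message_stop"},
]

blocks = {}        # content blocks accumulated by index
stop_reason = None

for event in events:
    if event["type"] == "content_block_start":
        # Begin a new block at this index with its initial text.
        blocks[event["index"]] = event["content_block"]["text"]
    elif event["type"] == "content_block_delta":
        # Append each text delta to its block.
        if event["delta"]["type"] == "text_delta":
            blocks[event["index"]] += event["delta"]["text"]
    elif event["type"] == "message_delta":
        # Top-level fields such as stop_reason arrive here.
        stop_reason = event["delta"].get("stop_reason", stop_reason)

print(blocks[0])    # "Hello, world!"
print(stop_reason)  # "end_turn"
```

The SDK's `Stream` class performs this accumulation for you; the sketch only illustrates how the event types compose.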
SDK Helper Methods (Python Stream class)
| Method / Property | Return type | Description |
|---|---|---|
| `text_stream` | `Iterator[str]` | Yields each text delta as it arrives. These are individual fragments; concatenate them yourself or print them sequentially to build the full output. |
| `get_final_message()` | `Message` | Returns the completed `Message` object after the stream finishes. Blocks if the stream is still active. |
| `current_message_snapshot` | `Message` | Live snapshot of the message as it is being built (available during iteration). |

`text_stream` yields raw deltas, not accumulated text. If you want the full message as a single string, either (a) concatenate the deltas yourself, or (b) call `get_final_message()` after the stream completes and access `.content[0].text`.
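Reassembling the full text from deltas is a simple concatenation. In this sketch the `deltas` list is an illustrative stand-in for what `stream.text_stream` would yield:

```python
# Stand-in for the fragments that stream.text_stream would yield.
deltas = ["The quick ", "brown fox ", "jumps over ", "the lazy dog."]

chunks = []
for text in deltas:          # in real code: for text in stream.text_stream:
    chunks.append(text)

full_text = "".join(chunks)  # join once at the end, not in the loop
print(full_text)             # "The quick brown fox jumps over the lazy dog."
```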
Common Errors and Troubleshooting
1. Stream not enabled in the request
- Symptom: You receive a single JSON response instead of SSE events.
- What to do: Verify `"stream": true` is included in the request body. In the Python SDK, use `client.messages.stream(...)` instead of `client.messages.create(...)`.
2. Connection closed prematurely / timeout
- Symptom: `requests.exceptions.ConnectionError` or `httpx.ReadTimeout`.
- What to do: Ensure you consume the stream continuously. Increase the client read timeout: `client = anthropic.Anthropic(timeout=60.0)`. The API sends periodic `ping` events (roughly every 15 seconds), but network appliances may close idle connections sooner.
3. Invalid SSE parsing / missing events
- Symptom: Only partial text is collected, or you see parsing errors when handling events manually.
- What to do: Use a robust SSE parser. The Python SDK's `Stream` class handles this for you. Events may be split across TCP frames; buffer data and split on double newlines. When using `curl`, always include the `-N` flag.
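A minimal buffering sketch (not a full SSE parser; it ignores `\r\n` line endings and multi-line `data:` fields) that shows the split-on-double-newline approach:

```python
def parse_sse(chunks):
    """Fold arbitrary network chunks into a list of SSE events.

    Chunks may split an event anywhere, so we accumulate into a buffer
    and only emit an event once a blank line (\n\n) terminates it.
    """
    buffer = ""
    events = []
    for chunk in chunks:
        buffer += chunk
        while "\n\n" in buffer:
            raw, buffer = buffer.split("\n\n", 1)
            event = {}
            for line in raw.splitlines():
                if ":" in line:
                    field, _, value = line.partition(":")
                    event[field.strip()] = value.lstrip()
            events.append(event)
    return events

# Two events arriving split across three arbitrary chunks:
chunks = ["event: ping\n\nevent: content_block_del", "ta\ndata: {\"x\"", ": 1}\n\n"]
parsed = parse_sse(chunks)
print(parsed[0]["event"])  # "ping"
print(parsed[1]["event"])  # "content_block_delta"
```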
4. content_block_delta received before content_block_start
- Symptom: Your code expects a block start event but encounters a delta first, causing an index or key error.
- What to do: Maintain a map of content blocks keyed by their `index`. The official SDK abstracts this entirely.
5. Rate limiting or permission errors (HTTP 429 / 403)
- Symptom: `Error 429: Too Many Requests` or `403: Forbidden`.
- What to do: Confirm your API key is valid and has available quota. For 429 errors, implement exponential backoff. For streaming, you must handle retries in your own code because a stream cannot be replayed after it begins.
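A sketch of exponential backoff with jitter. Here `start_stream` is a hypothetical callable standing in for whatever opens your stream (for example, a function wrapping `client.messages.stream(...)`); the `flaky` demo function only simulates transient failures:

```python
import random
import time

def with_backoff(start_stream, max_retries=5, base_delay=1.0):
    """Retry a stream-opening callable with exponential backoff plus jitter.

    start_stream is a hypothetical stand-in for the function that opens
    your stream. In production, catch only retryable errors (e.g. 429s),
    not every Exception.
    """
    for attempt in range(max_retries):
        try:
            return start_stream()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            time.sleep(delay)

# Demo with a stand-in that fails twice, then succeeds:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated 429")
    return "stream opened"

result = with_backoff(flaky, base_delay=0.01)
print(result)  # "stream opened"
```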
6. Unexpected stop reason (e.g., max_tokens)
- Symptom: The stream ends with `stop_reason: "max_tokens"` and the text appears truncated.
- What to do: Increase the `max_tokens` parameter. Set a generous `max_tokens` and let the model stop naturally (`stop_reason: "end_turn"`).
7. Mixing streaming and non-streaming SDK methods
- Symptom: `AttributeError: 'Message' object has no attribute 'text_stream'`.
- What to do: Use `client.messages.stream(...)` for streaming. `create()` returns a plain `Message`, not a `Stream`.
8. Asyncio event loop errors
- Symptom: `RuntimeError: There is no current event loop` in async Python code.
- What to do: Use `AsyncAnthropic` with `async with`. Ensure you are inside an event loop (e.g., `asyncio.run(main())`).
9. Hardcoded API key leaked in version control
- Symptom: Your key appears in a public repository or a past commit.
- What to do (immediately): Revoke the key at console.anthropic.com. Generate a new key and store it in an environment variable or a `.env` file (with `.env` added to `.gitignore`). Use a secrets scanning tool (`git-secrets`, `truffleHog`) to catch leaks before they reach a remote. Never write a raw key string inside a source file that could be committed.
For additional details, refer to the official Anthropic streaming documentation and the Python SDK reference.