Paulo Duarte
Senior Technical Writer · API Documentation
Technical Writing Portfolio
Last updated: 2026-05-06   ·   API version: Anthropic API 2023-06-01 · claude-sonnet-4-6

Implementing Streaming Responses with the Claude API

Conceptual Overview

Streaming enables the Claude API to send back response tokens as they are generated, rather than waiting for the entire completion to finish. Instead of a single JSON response, the server sends a sequence of events over a persistent connection using Server-Sent Events (SSE). Each event represents a part of the message — content block starts, text deltas, and final message metadata.

Why it matters:

  • Perceived responsiveness: Users see text appear in real time, similar to a chat interface, dramatically improving the experience for interactive applications.
  • Lower time-to-first-byte (TTFB): The client can begin processing or presenting content before the full response is ready.
  • Efficient cancellation: If the user wants to stop a generation early, you can close the stream without wasting tokens on a full completion.
  • Progressive delivery: Enables building features like live typing indicators or continuous text rendering in UIs.

When Not to Use Streaming

Streaming is not the right choice for every scenario. Consider a standard non-streaming request when:

  • Batch processing: You're running multiple generations and don't need to display results in real time. Streaming introduces connection overhead that can complicate batch pipelines.
  • Very short responses: If your expected output is a single word, classification label, or JSON object under ~50 tokens, the overhead of parsing SSE events outweighs any perceived speed gain.
  • Full-output-first pipelines: Any workflow that requires the complete response before downstream processing — translation, structured extraction, chain-of-thought reranking — should use a standard request. Streaming forces you to reassemble fragments, adding complexity with no benefit.
  • Serverless functions with short timeouts: If your runtime has a strict execution limit (e.g., AWS Lambda with a 30-second timeout), a non-streaming request gives you one atomic response instead of managing a connection that might outlive the function.

Rule of thumb: Use streaming when a human is waiting to read the output interactively. Use standard requests when another machine is the consumer.

Prerequisites

  • Anthropic API keyCreate one here. Set it as an environment variable and never hardcode it in source files:
    export ANTHROPIC_API_KEY="sk-ant-..."
    In production, load it from a secrets manager or your platform's secure environment variable store. Hardcoding keys in committed code is the most common cause of leaked credentials.
  • curl and/or Python 3.7+ installed
  • For Python, install the Anthropic SDK:
    pip install anthropic

Step-by-Step Guide

Using curl

The CLI approach is excellent for testing and understanding the raw event stream without any abstraction layer.

  1. Set your API key as an environment variable.
  2. Create a JSON request body with "stream": true.
  3. Send the request with -N (no buffering) to see events as they arrive.

Example

curl https://api.anthropic.com/v1/messages     -H "content-type: application/json"     -H "x-api-key: $ANTHROPIC_API_KEY"     -H "anthropic-version: 2023-06-01"     -N     -d '{
    "model": "claude-sonnet-4-6",
    "max_tokens": 256,
    "stream": true,
    "messages": [{"role": "user", "content": "Explain quantum computing in one sentence."}]
}'

You will see lines like event: message_start, data: {...}, then several content_block_delta events with text fragments, and finally message_stop.

Using Python

The Anthropic Python SDK handles SSE parsing and provides a high-level iterator so you never need to parse raw events unless you want to.

Synchronous streaming

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=256,
    messages=[{"role": "user", "content": "Tell me a short story about a robot."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

The text_stream iterator yields each text delta as it arrives. The context manager (with) automatically closes the connection when the block exits.

Accessing raw events

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello, world!"}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print(f"
Stop reason: {event.message.stop_reason}")

Asynchronous streaming (asyncio)

import asyncio
from anthropic import AsyncAnthropic

async def main():
    client = AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=256,
        messages=[{"role": "user", "content": "Write a haiku about the sea."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())

Complete Example

Below is a single, self-contained Python script that demonstrates streaming end to end. Copy it, set your API key as an environment variable, and run it.

"""
complete_streaming_example.py

A fully executable example of streaming with the Claude API.
Usage:
    export ANTHROPIC_API_KEY="sk-ant-..."
    python complete_streaming_example.py
"""

import os
import sys
import anthropic


def main():
    # Load the API key from the environment — never hardcode it.
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print("Error: ANTHROPIC_API_KEY environment variable not set.")
        print("Set it with: export ANTHROPIC_API_KEY='sk-ant-...'")
        sys.exit(1)

    client = anthropic.Anthropic(api_key=api_key)

    prompt = "Write a short poem about a developer who finally fixed a bug after three days."

    print("Streaming response:
")

    try:
        with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            for text_delta in stream.text_stream:
                print(text_delta, end="", flush=True)

        # After the stream completes, the final message is available.
        final = stream.get_final_message()
        print(f"

---
Stop reason: {final.stop_reason}")
        print(f"Input tokens: {final.usage.input_tokens}")
        print(f"Output tokens: {final.usage.output_tokens}")

    except anthropic.APIStatusError as e:
        print(f"
API error: {e.status_code} - {e.message}")
        sys.exit(1)
    except Exception as e:
        print(f"
Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

Expected output: You should see the poem appear one word at a time, followed by a summary line showing stop_reason: "end_turn" and the token counts.

Reference

Request Parameters for Streaming

All standard Messages API parameters apply. The only change to enable streaming is:

ParameterTypeRequiredDescription
streambooleanYes (set to true)Enables SSE streaming. If omitted or false, a standard non-streaming response is returned.

When streaming, the following standard parameters work exactly the same: model (e.g., "claude-sonnet-4-6" or "claude-opus-4-5"), messages (conversation history), max_tokens (required), system (system prompt), temperature, top_p, top_k, stop_sequences, metadata, etc.

Stream Event Types

EventDescriptionKey Fields in data
message_startIndicates the beginning of the response.message object with id, model, role
content_block_startA new content block is starting (e.g., text block).index, content_block (type, text initial)
content_block_deltaA chunk of content within a block.index, delta (type, text snippet)
content_block_stopA content block has been fully emitted.index
message_deltaA delta to top-level message fields (stop_reason, usage).delta (stop_reason, stop_sequence), usage
message_stopThe complete message is finished.message object with final fields and usage
pingPeriodic keep-alive to prevent timeouts. No data payload.(empty)

SDK Helper Methods (Python Stream class)

Method / PropertyReturn TypeDescription
text_streamIterator[str]Yields each text delta as it arrives. These are individual fragments — concatenate them yourself or print them sequentially to build the full output.
get_final_message()MessageReturns the completed Message object after the stream finishes. Blocks if the stream is still active.
current_message_snapshotMessageLive snapshot of the message as it is being built (available during iteration).
Important: text_stream yields raw deltas, not accumulated text. If you want the full message as a single string, either (a) concatenate the deltas yourself, or (b) call get_final_message() after the stream completes and access .content[0].text.

Common Errors and Troubleshooting

1. Stream not enabled in the request

  • Symptom: You receive a single JSON response instead of SSE events.
  • What to do: Verify "stream": true is included in the request body. In the Python SDK, use client.messages.stream(...) instead of client.messages.create(...).

2. Connection closed prematurely / timeout

  • Symptom: requests.exceptions.ConnectionError or httpx.ReadTimeout.
  • What to do: Ensure you consume the stream continuously. Increase the client read timeout: client = anthropic.Anthropic(timeout=60.0). The API sends periodic ping events (roughly every 15 seconds), but network appliances may close idle connections sooner.

3. Invalid SSE parsing / missing events

  • Symptom: Only partial text is collected, or you see parsing errors when handling events manually.
  • What to do: Use a robust SSE parser. The Python SDK's Stream class handles this for you. Events may be split across TCP frames; buffer data and split on double newlines. When using curl, always include the -N flag.

4. content_block_delta received before content_block_start

  • Symptom: Your code expects a block start event but encounters a delta first, causing an index or key error.
  • What to do: Maintain a map of content blocks keyed by their index. The official SDK abstracts this entirely.

5. Rate limiting or permission errors (HTTP 429 / 403)

  • Symptom: Error 429: Too Many Requests or 403: Forbidden.
  • What to do: Confirm your API key is valid and has available quota. For 429 errors, implement exponential backoff. For streaming, you must handle retries in your own code because a stream cannot be replayed after it begins.

6. Unexpected stop reason (e.g., max_tokens)

  • Symptom: The stream ends with stop_reason: "max_tokens" and the text appears truncated.
  • What to do: Increase the max_tokens parameter. Set a generous max_tokens and let the model stop naturally (stop_reason: "end_turn").

7. Mixing streaming and non-streaming SDK methods

  • Symptom: AttributeError: 'Message' object has no attribute 'text_stream'.
  • What to do: Use client.messages.stream(...) for streaming. create() returns a plain Message, not a Stream.

8. Asyncio event loop errors

  • Symptom: RuntimeError: There is no current event loop in async Python code.
  • What to do: Use AsyncAnthropic with async with. Ensure you are inside an event loop (e.g., asyncio.run(main())).

9. Hardcoded API key leaked in version control

  • Symptom: Your key appears in a public repository or a past commit.
  • What to do (immediately): Revoke the key at console.anthropic.com. Generate a new key and store it in an environment variable or a .env file (with .env added to .gitignore). Use a secrets scanning tool (git-secrets, truffleHog) to catch leaks before they reach a remote. Never write a raw key string inside a source file that could be committed.

For additional details, refer to the official Anthropic streaming documentation and the Python SDK reference.