
Streaming

The Chat API streaming endpoint (POST /chat/stream) uses Server-Sent Events (SSE) to deliver the agent's response incrementally.

SSE Event Format

Each event has the following structure:

event: new_message
id: {message_id}:{event_index}
data: {"sender": "bot", "content": "...", "content_parts": [...], ...}
retry: 15000
Field    Description
event    Always new_message
id       Composite ID: {message_id}:{event_index}; event_index is a sequential integer starting at 0
data     A complete ChatMessage JSON object
retry    Reconnection interval hint in milliseconds
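
To make the field layout concrete, here is a minimal sketch of parsing one raw event block by hand, using only the standard library. The helper names parse_sse_event and split_event_id are hypothetical and the sample payload is invented for illustration; in practice an SSE client library does this parsing for you.

```python
import json


def parse_sse_event(raw: str) -> dict:
    """Parse one raw SSE event block into a dict of its fields."""
    fields = {}
    data_lines = []
    for line in raw.splitlines():
        if not line or line.startswith(":"):
            continue  # blank lines and comment lines carry no field
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # SSE strips a single leading space
        if name == "data":
            data_lines.append(value)  # multiple data lines are joined
        else:
            fields[name] = value
    fields["data"] = "\n".join(data_lines)
    return fields


def split_event_id(event_id: str) -> tuple[str, int]:
    """Split '{message_id}:{event_index}' on the last colon."""
    message_id, index = event_id.rsplit(":", 1)
    return message_id, int(index)


# Illustrative event; the id and content values are made up.
raw = (
    "event: new_message\n"
    "id: abc123:0\n"
    'data: {"sender": "bot", "content": "Hel"}\n'
    "retry: 15000\n"
)
event = parse_sse_event(raw)
message_id, event_index = split_event_id(event["id"])
message = json.loads(event["data"])
```

Here message_id is "abc123", event_index is 0, and message is the decoded ChatMessage dictionary.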

Incremental Content Delivery

Each SSE event carries the complete message so far, not a delta. As the agent generates tokens:

  1. Early events have partial content and no evidences.
  2. Intermediate events may contain content_parts with type: "tool" showing tool execution state.
  3. The final event contains the fully formed response: complete content, evidences, and content_parts.

This design means reconnecting clients can resume from the latest event without needing to reassemble deltas.
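
Because every event repeats the full text so far, a client that renders output incrementally has to work out what is new on its own. The sketch below shows one way to do that; new_text is a hypothetical helper, and the fallback for revised text is an assumption rather than documented server behaviour.

```python
def new_text(previous: str, current: str) -> str:
    """Return the part of `current` that has not been displayed yet."""
    if current.startswith(previous):
        return current[len(previous):]
    # Assumption: if earlier text changed, redraw the whole message.
    return current


# Simulated `content` values from three successive events.
snapshots = [
    "Retrieval",
    "Retrieval-augmented",
    "Retrieval-augmented generation",
]

shown = ""
chunks = []
for content in snapshots:
    chunks.append(new_text(shown, content))
    shown = content
```

After the loop, chunks holds only the newly added pieces, while shown always matches the latest complete message.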

Basic Streaming Client

import json
import os

import requests
import sseclient  # the events() API used below matches the sseclient-py package

TENANT = "zetaalpha"
url = f"https://api.zeta-alpha.com/v0/service/chat/stream?tenant={TENANT}"

response = requests.post(
    url,
    headers={
        "accept": "text/event-stream",
        "Content-Type": "application/json",
        "X-Auth": os.getenv("ZETA_ALPHA_API_KEY"),
    },
    json={
        "agent_identifier": "chat_with_dynamic_retrieval",
        "conversation": [
            {"sender": "user", "content": "What is RAG?"}
        ],
    },
    stream=True,
)
response.raise_for_status()

message = None
client = sseclient.SSEClient(response)
for event in client.events():
    # Error events carry plain text, not JSON (see Error Handling)
    if event.event == "error":
        raise RuntimeError(event.data)

    # Each event holds the complete message so far
    message = json.loads(event.data)

    # Check for tool execution status
    for part in message.get("content_parts") or []:
        if part["type"] == "tool":
            tool = part["tool"]
            print(f"[{tool['status']}] {tool['name']}: {tool.get('display_text', '')}")

    # Check for dynamically retrieved context
    for part in message.get("content_parts") or []:
        if part["type"] == "context" and part.get("context"):
            print(f"Agent found context: {part['context']}")

# Final message: the last event received holds the complete response
if message is not None:
    print(message["content"])
    print(message.get("evidences"))

Reconnection

If a client disconnects mid-stream (network issue, timeout), it can reconnect and resume from where it left off using the GET /chat/stream/{message_id} endpoint:

GET /chat/stream/{message_id}?tenant={tenant}&start_index={last_event_index + 1}
Parameter     Description
message_id    The message_id from the SSE event id field (the part before the last colon)
start_index   The event index to resume from (0-based). Pass last_event_index + 1 to avoid duplicates.

The server buffers events for a limited time after the stream completes. If the buffer has been cleaned up, the reconnection will return an error.

Extracting the message_id

The SSE event id field has the format {message_id}:{event_index}. Parse the message_id from the first event:

event_id = event.id  # e.g. "abc123:0"
message_id, event_index = event_id.rsplit(":", 1)  # split on the last colon; event_index is a string
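
Putting the two pieces together, a client can derive the reconnection URL directly from the id of the last event it received. This is a minimal sketch: resume_url is a hypothetical helper, and the base URL is the one used in the examples on this page.

```python
from urllib.parse import quote, urlencode

BASE_URL = "https://api.zeta-alpha.com/v0/service/chat/stream"


def resume_url(last_event_id: str, tenant: str) -> str:
    """Build the GET URL that resumes after the last received event."""
    message_id, last_index = last_event_id.rsplit(":", 1)
    # Ask for the next event so the last one is not delivered twice.
    query = urlencode({"tenant": tenant, "start_index": int(last_index) + 1})
    return f"{BASE_URL}/{quote(message_id, safe='')}?{query}"


url = resume_url("abc123:4", "zetaalpha")
```

Keeping only the most recent event id is enough state to resume, since both the message_id and the next start_index can be recovered from it.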

Reconnection Example

import json
import os

import requests
import sseclient

TENANT = "zetaalpha"
BASE_URL = "https://api.zeta-alpha.com/v0/service/chat/stream"
headers = {
    "accept": "text/event-stream",
    "Content-Type": "application/json",
    "X-Auth": os.getenv("ZETA_ALPHA_API_KEY"),
}

# Resume from event index 5
message_id = "previously-received-message-id"
start_index = 5

response = requests.get(
    f"{BASE_URL}/{message_id}?tenant={TENANT}&start_index={start_index}",
    headers=headers,
    stream=True,
)
response.raise_for_status()

client = sseclient.SSEClient(response)
for event in client.events():
    # Error events carry plain text, not JSON (see Error Handling)
    if event.event == "error":
        raise RuntimeError(event.data)
    message = json.loads(event.data)
    print(message["content"])

Cancellation

To cancel a running stream before the agent finishes:

DELETE /chat/stream/{message_id}?tenant={tenant}

This stops the agent's generation and cleans up server-side resources. The endpoint returns 204 No Content on success.

import os

import requests

TENANT = "zetaalpha"
message_id = "the-message-id-from-stream"

response = requests.delete(
    f"https://api.zeta-alpha.com/v0/service/chat/stream/{message_id}?tenant={TENANT}",
    headers={"X-Auth": os.getenv("ZETA_ALPHA_API_KEY")},
)
response.raise_for_status()  # success is signalled by 204 No Content

Inactivity Timeout

Streams are automatically cancelled if no client is consuming events for a configured period (default: 30 seconds). This prevents resource leaks from abandoned connections. Reconnecting resets the inactivity timer.

Error Handling

If the agent encounters an error during streaming, an error event is sent:

event: error
data: Internal streaming error

Clients should handle this event by closing the connection and optionally retrying the request.
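
Since the error event carries plain text rather than JSON, it should be checked before decoding. The sketch below separates the two event types and wraps a streaming attempt in a simple retry loop. StreamError, decode_event, and run_with_retries are hypothetical names, and the exponential backoff policy is an assumption, not something the API prescribes.

```python
import json
import time


class StreamError(Exception):
    """Raised when the server sends an `error` event."""


def decode_event(event_type: str, data: str) -> dict:
    """Return the decoded message, or raise if this is an error event."""
    if event_type == "error":
        raise StreamError(data)  # data is plain text here, not JSON
    return json.loads(data)


def run_with_retries(stream_once, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `stream_once` until it succeeds or attempts run out.

    `stream_once` is any callable that opens the stream, consumes it,
    and returns the final message. Delays double after each failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return stream_once()
        except StreamError:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

A stream_once callable would typically issue the POST request, pass each event's type and data through decode_event, and return the last decoded message. Passing sleep as a parameter keeps the retry loop easy to test without real delays.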