Streaming
The Chat API streaming endpoint (POST /chat/stream) uses Server-Sent Events (SSE) to deliver the agent's response incrementally.
SSE Event Format
Each event has the following structure:
event: new_message
id: {message_id}:{event_index}
data: {"sender": "bot", "content": "...", "content_parts": [...], ...}
retry: 15000
| Field | Description |
|---|---|
| event | new_message for message events (an error event is sent on failure; see Error Handling) |
| id | Composite ID: {message_id}:{event_index}, where event_index is a sequential integer starting at 0 |
| data | A complete ChatMessage JSON object |
| retry | Reconnection interval hint in milliseconds |
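To make the field layout concrete, here is a minimal sketch that splits one raw event block into its fields. The helper name parse_sse_event and the sample payload are illustrative assumptions, and the sketch ignores multi-line data fields; in practice an SSE library such as sseclient does this parsing for you.

```python
import json


def parse_sse_event(raw: str) -> dict:
    """Parse one raw SSE event block into a dict of its fields (hypothetical helper)."""
    fields = {}
    for line in raw.strip().splitlines():
        if not line or line.startswith(":"):
            continue  # skip blank lines and SSE comment lines
        name, _, value = line.partition(":")
        # The SSE format allows one optional space after the colon.
        if value.startswith(" "):
            value = value[1:]
        fields[name] = value
    return fields


# Sample event text, made up for illustration.
raw_event = (
    "event: new_message\n"
    "id: abc123:4\n"
    'data: {"sender": "bot", "content": "RAG stands for"}\n'
    "retry: 15000\n"
)

fields = parse_sse_event(raw_event)
message = json.loads(fields["data"])       # the ChatMessage object
retry_seconds = int(fields["retry"]) / 1000  # retry is given in milliseconds
print(fields["event"], fields["id"], message["content"], retry_seconds)
```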
Incremental Content Delivery
Each SSE event carries the complete message so far, not a delta. As the agent generates tokens:
- Early events have partial content and no evidences.
- Intermediate events may contain content_parts with type: "tool" showing tool execution state.
- The final event contains the fully formed response: complete content, evidences, and content_parts.
This design means reconnecting clients can resume from the latest event without needing to reassemble deltas.
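Because each event is a full snapshot, a client that wants to print text as it arrives has to work out what is new. The sketch below shows one way to do that, assuming content only grows by appending between events (the API description does not guarantee this, so the helper falls back to the full text otherwise). The name new_text and the sample snapshots are illustrative.

```python
def new_text(previous: str, current: str) -> str:
    """Return the part of `current` that was not already in `previous`."""
    if current.startswith(previous):
        return current[len(previous):]
    # Assumption: if the snapshot was rewritten rather than extended,
    # fall back to returning the whole current text.
    return current


# Made-up snapshots standing in for message["content"] of successive events.
snapshots = [
    "RAG",
    "RAG stands for",
    "RAG stands for retrieval-augmented generation.",
]

shown = ""
chunks = []
for content in snapshots:
    chunks.append(new_text(shown, content))
    shown = content

print(chunks)
```

A client that only needs the latest state can skip this step and simply replace what it displays with each new snapshot.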
Basic Streaming Client
import json
import os

import requests
import sseclient

TENANT = "zetaalpha"
url = f"https://api.zeta-alpha.com/v0/service/chat/stream?tenant={TENANT}"

response = requests.post(
    url,
    headers={
        "accept": "text/event-stream",
        "Content-Type": "application/json",
        "X-Auth": os.getenv("ZETA_ALPHA_API_KEY"),
    },
    json={
        "agent_identifier": "chat_with_dynamic_retrieval",
        "conversation": [
            {"sender": "user", "content": "What is RAG?"}
        ],
    },
    stream=True,
)
response.raise_for_status()

client = sseclient.SSEClient(response)
for event in client.events():
    message = json.loads(event.data)

    # Check for tool execution status
    for part in message.get("content_parts") or []:
        if part["type"] == "tool":
            tool = part["tool"]
            print(f"[{tool['status']}] {tool['name']}: {tool.get('display_text', '')}")

    # Check for dynamically retrieved context
    for part in message.get("content_parts") or []:
        if part["type"] == "context" and part.get("context"):
            print(f"Agent found context: {part['context']}")

# Final message
print(message["content"])
print(message.get("evidences"))
Reconnection
If a client disconnects mid-stream (network issue, timeout), it can reconnect and resume from where it left off using the GET /chat/stream/{message_id} endpoint:
GET /chat/stream/{message_id}?tenant={tenant}&start_index={last_event_index + 1}
| Parameter | Description |
|---|---|
| message_id | The message_id from the SSE event id field (the part before the colon) |
| start_index | The event index to resume from (0-based). Pass last_received_index + 1 to avoid duplicates. |
The server buffers events for a limited time after the stream completes. If the buffer has been cleaned up, the reconnection will return an error.
Extracting the message_id
The SSE event id field has the format {message_id}:{event_index}. Parse the message_id from the first event:
event_id = event.id # e.g. "abc123:0"
message_id, event_index = event_id.rsplit(":", 1)
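Note that rsplit returns the index as a string. A small sketch of a helper that converts it to an integer and derives the start_index for a later reconnection is shown here; the function names parse_event_id and next_start_index are illustrative, not part of the API.

```python
from typing import Tuple


def parse_event_id(event_id: str) -> Tuple[str, int]:
    """Split an SSE id of the form {message_id}:{event_index}."""
    # rsplit keeps any colons inside message_id intact.
    message_id, index = event_id.rsplit(":", 1)
    return message_id, int(index)


def next_start_index(event_id: str) -> int:
    """Return the start_index to pass when resuming after this event."""
    _, index = parse_event_id(event_id)
    return index + 1


print(parse_event_id("abc123:0"))
print(next_start_index("abc123:4"))
```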
Reconnection Example
import json
import os

import requests
import sseclient

TENANT = "zetaalpha"
BASE_URL = "https://api.zeta-alpha.com/v0/service/chat/stream"
headers = {
    "accept": "text/event-stream",
    "Content-Type": "application/json",
    "X-Auth": os.getenv("ZETA_ALPHA_API_KEY"),
}

# Resume from event index 5
message_id = "previously-received-message-id"
start_index = 5

response = requests.get(
    f"{BASE_URL}/{message_id}?tenant={TENANT}&start_index={start_index}",
    headers=headers,
    stream=True,
)
response.raise_for_status()

client = sseclient.SSEClient(response)
for event in client.events():
    message = json.loads(event.data)
    print(message["content"])
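The example above hardcodes the resume point. One possible way to track it automatically is sketched below: a generator that remembers the last event id it saw and, on a connection error, reopens the stream at the next index. The callables start and resume are hypothetical wrappers around the POST and GET requests, the retry limit is an arbitrary choice, and the exception types to catch depend on your HTTP library (with requests, something like requests.exceptions.RequestException would be a closer fit than the built-in ConnectionError used here).

```python
from typing import Callable, Iterable, Iterator, Optional, Tuple

Event = Tuple[str, str]  # (SSE id, SSE data)


def stream_with_resume(
    start: Callable[[], Iterable[Event]],
    resume: Callable[[str, int], Iterable[Event]],
    max_reconnects: int = 3,
    retry_on: tuple = (ConnectionError,),
) -> Iterator[Event]:
    """Yield events, resuming from the last seen index after a disconnect."""
    message_id: Optional[str] = None
    last_index = -1
    reconnects = 0
    source = start()
    while True:
        try:
            for event_id, data in source:
                message_id, index = event_id.rsplit(":", 1)
                last_index = int(index)
                yield event_id, data
            return  # the stream ended normally
        except retry_on:
            # Give up if nothing was received yet or the limit is reached.
            if message_id is None or reconnects >= max_reconnects:
                raise
            reconnects += 1
            source = resume(message_id, last_index + 1)


# Fake streams standing in for the real HTTP calls.
def fake_start():
    yield "m1:0", "a"
    yield "m1:1", "ab"
    raise ConnectionError("connection dropped")


calls = []


def fake_resume(message_id, start_index):
    calls.append((message_id, start_index))
    yield "m1:2", "abc"


events = list(stream_with_resume(fake_start, fake_resume))
print(events, calls)
```

Passing the HTTP calls in as callables keeps the resume logic separate from the transport, so it can be exercised without a network connection.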
Cancellation
To cancel a running stream before the agent finishes:
DELETE /chat/stream/{message_id}?tenant={tenant}
This stops the agent's generation and cleans up server-side resources. The endpoint returns 204 No Content on success.
import os

import requests

TENANT = "zetaalpha"
message_id = "the-message-id-from-stream"

requests.delete(
    f"https://api.zeta-alpha.com/v0/service/chat/stream/{message_id}?tenant={TENANT}",
    headers={"X-Auth": os.getenv("ZETA_ALPHA_API_KEY")},
)
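The snippet above discards the response. A sketch of a wrapper that checks for the documented 204 No Content status follows. The helper name cancel_stream is illustrative, and the HTTP function is passed in as a parameter (for example requests.delete) so the sketch does not depend on a particular client.

```python
from typing import Callable

BASE_URL = "https://api.zeta-alpha.com/v0/service/chat/stream"


def cancel_stream(
    http_delete: Callable, message_id: str, tenant: str, api_key: str
) -> bool:
    """Send the DELETE request and report whether the server answered 204."""
    response = http_delete(
        f"{BASE_URL}/{message_id}",
        params={"tenant": tenant},
        headers={"X-Auth": api_key},
    )
    return response.status_code == 204


# Usage with requests (not executed here):
# cancelled = cancel_stream(
#     requests.delete, message_id, TENANT, os.getenv("ZETA_ALPHA_API_KEY")
# )
```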
Inactivity Timeout
Streams are automatically cancelled if no client is consuming events for a configured period (default: 30 seconds). This prevents resource leaks from abandoned connections. Reconnecting resets the inactivity timer.
Error Handling
If the agent encounters an error during streaming, an error event is sent:
event: error
data: Internal streaming error
Clients should handle this event by closing the connection and optionally retrying the request.
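Since the error event's data is plain text rather than JSON, passing it to json.loads would fail. A minimal sketch of a decoder that checks the event type first is shown here; the names StreamError and decode_event are illustrative, and with sseclient the two arguments would come from event.event and event.data.

```python
import json


class StreamError(Exception):
    """Raised when the server sends an SSE error event (hypothetical name)."""


def decode_event(event_type: str, data: str) -> dict:
    """Return the parsed ChatMessage, or raise if this is an error event."""
    if event_type == "error":
        # The payload is a plain-text description, not JSON.
        raise StreamError(data)
    return json.loads(data)


message = decode_event("new_message", '{"sender": "bot", "content": "hi"}')
print(message["content"])

try:
    decode_event("error", "Internal streaming error")
except StreamError as exc:
    error_text = str(exc)
    print("stream failed:", error_text)
```

The caller can catch StreamError, close the connection, and decide whether to retry the original request.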