Real-time streaming
SSE chat streams, event types, header contract, and client parsing.
Intended audience: Stakeholders, Business analysts, Solution architects, Developers, Testers
Learning outcomes by role
Stakeholders
- Understand streaming as the default UX for long-running model output.
Business analysts
- Document client reconnection and error display for streaming failures.
Solution architects
- Plan proxies, timeouts, and buffering for SSE or chunked responses.
Developers
- Implement consumers for Cadence streaming contracts and event framing.
Testers
- Stress streaming endpoints and verify partial output and error events.
Streaming means the server keeps the HTTP response open and sends many small chunks instead
of one JSON body. Cadence uses Server-Sent Events (SSE) for chat — each chunk may be message
text, a named lifecycle event (agent_start, tool_start, …), or an error envelope.
The SSE event model
Each event is a pair of `event:` and `data:` lines followed by a blank line:

```
event: agent_start
data: {"agent": "supervisor", "session_id": "..."}

event: message
data: {"content": "Hello", "session_id": "..."}

event: tool_start
data: {"tool": "search", "input": "..."}

data: {"event":"error","data":{"error":"An internal error occurred"}}
```

| Event type | Meaning |
|---|---|
| `message` | Token text fragment — append to the visible reply |
| `agent_start` | An agent phase began |
| `tool_start` / `tool_end` | A tool invocation started or completed |
| `error` | Server error mid-stream — show a failure state rather than hanging |
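The routing rules in the table can be sketched as a small client-side dispatcher. This is an illustrative sketch, not part of the Cadence client; the function and list names are hypothetical, and the error envelope handling assumes the bare `data:` error shape shown above.

```python
import json

def dispatch_sse_frame(frame: str, reply_parts: list, events: list) -> None:
    """Route one SSE frame per the event-type table (names illustrative)."""
    event_type = "message"  # SSE default when no event: line is present
    payload = {}
    for line in frame.splitlines():
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            payload = json.loads(line[len("data:"):].strip())
    # The error envelope arrives as a bare data: line with the type inside it
    if payload.get("event") == "error":
        event_type = "error"
        payload = payload.get("data", {})
    if event_type == "message":
        reply_parts.append(payload.get("content", ""))
    else:
        events.append({"type": event_type, "data": payload})

reply, events = [], []
dispatch_sse_frame('event: message\ndata: {"content": "Hello"}', reply, events)
dispatch_sse_frame('data: {"event":"error","data":{"error":"boom"}}', reply, events)
```

Message text accumulates in one buffer while lifecycle and error events stay structured, mirroring how the UI decoder later in this page separates the two.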
If `stream` is false, the API runs the graph to completion and returns one JSON object with
`session_id`, `response`, `agent_hops`, and `current_agent`.
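In non-stream mode the client simply parses one body. A minimal sketch, with the field names from this page and illustrative values:

```python
import json

# Hypothetical non-stream response body; field names match the docs above,
# values are illustrative only.
raw = json.dumps({
    "session_id": "sess-123",
    "response": "Final answer text",
    "agent_hops": 2,
    "current_agent": "supervisor",
})

body = json.loads(raw)
final_text = body["response"]    # the complete reply, no incremental chunks
session_id = body["session_id"]  # reuse this to continue the conversation
```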
Request header contract
The chat completion endpoint requires `X-ORG-ID` and one of `X-INSTANCE-ID` or
`X-CENTRAL-ID`. Ambiguous or missing headers return 400 before any model tokens are consumed.
```python
@router.post("/completion")
async def chat_completion(body: ChatCompletionRequest, request: Request):
    org_id = _require_header(request, "X-ORG-ID")
    instance_id = request.headers.get("X-INSTANCE-ID")
    central_id = request.headers.get("X-CENTRAL-ID")
    if not instance_id and not central_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-INSTANCE-ID or X-CENTRAL-ID header required",
        )
```

Receiving a stream
When streaming is enabled, the handler returns a StreamingResponse whose async generator
yields to_sse() chunks. Errors inside the generator emit a JSON error payload on the wire.
```python
async def event_generator():
    try:
        async for stream_event in orchestrator_service.process_chat_stream(
            org_id=org_id,
            instance_id=instance_id,
            user_id=user_id,
            message=body.message,
            conversation_id=body.conversation_id,
            resource_id=body.resource_id,
        ):
            yield stream_event.to_sse()
            yield "\n\n"
    except Exception as e:
        logger.error("Stream error: %s", e, exc_info=True)
        yield 'data: {"event":"error","data":{"error":"An internal error occurred"}}\n\n'

return StreamingResponse(
    event_generator(),
    media_type="text/event-stream",
    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
```

The UI decoder walks `event:` and `data:` prefixes, JSON-parses payloads, routes message
content to the live assistant string, and forwards other event types to a structured event list.
Keep a buffer for incomplete lines — do not parse arbitrary byte chunks directly.
```typescript
// SSE line prefixes and the default event type. StreamCallbacks and
// isMessageContentEvent are defined elsewhere in the client.
const SSE_EVENT_PREFIX = 'event:'
const SSE_DATA_PREFIX = 'data:'
const MESSAGE_EVENT_TYPE = 'message'

async function processSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  callbacks: StreamCallbacks
): Promise<void> {
  const decoder = new TextDecoder()
  let buffer = ''
  let currentEventType: string | null = null

  while (true) {
    const {done, value} = await reader.read()
    if (done) break

    buffer += decoder.decode(value, {stream: true})
    const lines = buffer.split('\n')
    buffer = lines.pop() ?? '' // keep the trailing partial line for next read

    for (const line of lines) {
      if (line === '') {
        currentEventType = null // blank line ends the current event
      } else if (line.startsWith(SSE_EVENT_PREFIX)) {
        currentEventType = line.slice(SSE_EVENT_PREFIX.length).trim()
      } else if (line.startsWith(SSE_DATA_PREFIX)) {
        const rawData = line.slice(SSE_DATA_PREFIX.length).trim()
        try {
          const parsed = JSON.parse(rawData) as Record<string, unknown>
          if (isMessageContentEvent(currentEventType, parsed)) {
            callbacks.onContent((parsed.content as string) ?? '')
          } else if (currentEventType && currentEventType !== MESSAGE_EVENT_TYPE) {
            callbacks.onEvent({type: currentEventType, data: parsed})
          }
          if (parsed.session_id) {
            callbacks.onSessionId(parsed.session_id as string)
          }
        } catch {
          /* ignore parse errors */
        }
      }
    }
  }
}
```

Guarantees
- `text/event-stream` responses disable caching and set `X-Accel-Buffering: no` for reverse proxies.
- Central point chat resolves the backing instance and enforces visibility rules before streaming begins — private central points still require `cadence:chat:use`.
- Non-stream responses return a single JSON object with `session_id`, final text, and metadata fields.
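A reverse proxy that buffers responses will hold tokens back and defeat streaming even when the server emits them promptly. A minimal nginx sketch honoring the headers above; the location path and upstream name are illustrative, not Cadence defaults:

```nginx
location /api/chat/ {
    proxy_pass         http://cadence_backend;  # upstream name is illustrative
    proxy_http_version 1.1;
    proxy_buffering    off;   # or rely on the X-Accel-Buffering: no header
    proxy_cache        off;
    proxy_read_timeout 300s;  # long-lived SSE connections need generous timeouts
}
```

`X-Accel-Buffering: no` lets the server opt out per response, so `proxy_buffering off` is only needed when the header cannot be trusted end to end.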
Troubleshooting
| Symptom | Cause | Fix |
|---|---|---|
| 400 missing org or instance | `X-ORG-ID` or instance/central header absent | Add the required headers |
| 503 not loaded | Orchestrator instance not in pool | Trigger load; see pool docs |
| 403 on chat | Missing `cadence:chat:use` for org, or wrong org header vs instance | Check role and org alignment |
| Stream stops mid-way | Server error; the generator emits an error event | Inspect logs; the error SSE event carries the reason |