
Real-time streaming

SSE chat streams, event types, header contract, and client parsing.

Intended audience: Stakeholders, Business analysts, Solution architects, Developers, Testers

Learning outcomes by role

Stakeholders

  • Understand streaming as the default UX for long-running model output.

Business analysts

  • Document client reconnection and error display for streaming failures.

Solution architects

  • Plan proxies, timeouts, and buffering for SSE or chunked responses.

Developers

  • Implement consumers for Cadence streaming contracts and event framing.

Testers

  • Stress streaming endpoints and verify partial output and error events.

Streaming means the server keeps the HTTP response open and sends many small chunks instead of one JSON body. Cadence uses Server-Sent Events (SSE) for chat — each chunk may be message text, a named lifecycle event (agent_start, tool_start, …), or an error envelope.

Each named event is an event: line plus a data: line, terminated by a blank line; the mid-stream error envelope arrives as a bare data: line with no event name:

event: agent_start
data: {"agent": "supervisor", "session_id": "..."}

event: message
data: {"content": "Hello", "session_id": "..."}

event: tool_start
data: {"tool": "search", "input": "..."}

data: {"event":"error","data":{"error":"An internal error occurred"}}
| Event type | Meaning |
| --- | --- |
| message | Token text fragment — append to the visible reply |
| agent_start | An agent phase began |
| tool_start / tool_end | A tool invocation started or completed |
| error | Server error mid-stream — show a failure state rather than hanging |
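Putting the framing and the event table together, a minimal client-side parser can be sketched with the standard library alone. The payload keys here come from the examples above; a production client would also handle comment lines and multi-line data fields.

```python
import json

def parse_sse(lines):
    """Parse an iterable of SSE lines into (event, data) pairs.

    Minimal sketch: an `event:` line names the next event, `data:` lines
    carry the payload, and a blank line terminates the event. Events
    without an explicit name default to "message", per the SSE spec.
    """
    event, data = "message", []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif line == "" and data:
            yield event, json.loads("\n".join(data))
            event, data = "message", []

stream = [
    "event: agent_start",
    'data: {"agent": "supervisor"}',
    "",
    'data: {"content": "Hello"}',
    "",
]
events = list(parse_sse(stream))
```

Note that the error envelope, sent as a bare data: line, surfaces here as a "message" event whose payload contains an "event": "error" key, so clients should check both places.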

If stream is false, the API runs the graph to completion and returns one JSON object with session_id, response, agent_hops, and current_agent.
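For contrast with the SSE framing, the non-streaming reply is a single JSON document. A small sketch of consuming it — the field names are the ones listed above; the values are invented for illustration:

```python
import json

# Hypothetical non-streaming response body. Field names (session_id,
# response, agent_hops, current_agent) follow this page; values are
# made up for the example.
raw = (
    '{"session_id": "s-1", "response": "Hi there.", '
    '"agent_hops": 1, "current_agent": "supervisor"}'
)
body = json.loads(raw)
reply = body["response"]  # the final text, in one piece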

The chat completion endpoint requires X-ORG-ID and one of X-INSTANCE-ID or X-CENTRAL-ID. Ambiguous or missing headers return 400 before any model tokens are consumed.

cadence/api/chat.py
@router.post("/completion")
async def chat_completion(body: ChatCompletionRequest, request: Request):
    org_id = _require_header(request, "X-ORG-ID")
    instance_id = request.headers.get("X-INSTANCE-ID")
    central_id = request.headers.get("X-CENTRAL-ID")
    if not instance_id and not central_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-INSTANCE-ID or X-CENTRAL-ID header required",
        )
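The _require_header helper is not shown on this page. A framework-free sketch of the same contract — the HTTPError class and validate_chat_headers name are illustrative stand-ins, not Cadence's actual code:

```python
class HTTPError(Exception):
    """Stand-in for FastAPI's HTTPException in this standalone sketch."""
    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code, self.detail = status_code, detail

def validate_chat_headers(headers: dict) -> dict:
    """Apply the documented contract: X-ORG-ID is mandatory, plus at
    least one of X-INSTANCE-ID / X-CENTRAL-ID, all checked before any
    model work starts."""
    org_id = headers.get("X-ORG-ID")
    if not org_id:
        raise HTTPError(400, "X-ORG-ID header required")
    instance_id = headers.get("X-INSTANCE-ID")
    central_id = headers.get("X-CENTRAL-ID")
    if not instance_id and not central_id:
        raise HTTPError(400, "X-INSTANCE-ID or X-CENTRAL-ID header required")
    return {
        "org_id": org_id,
        "instance_id": instance_id,
        "central_id": central_id,
    }
```

Failing fast here is what guarantees the 400 arrives before any model tokens are consumed.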

When streaming is enabled, the handler returns a StreamingResponse whose async generator yields to_sse() chunks. Errors inside the generator emit a JSON error payload on the wire.

cadence/api/chat.py
async def event_generator():
    try:
        async for stream_event in orchestrator_service.process_chat_stream(
            org_id=org_id,
            instance_id=instance_id,
            user_id=user_id,
            message=body.message,
            conversation_id=body.conversation_id,
            resource_id=body.resource_id,
        ):
            yield stream_event.to_sse()
            yield "\n\n"
    except Exception as e:
        logger.error("Stream error: %s", e, exc_info=True)
        yield 'data: {"event":"error","data":{"error":"An internal error occurred"}}\n\n'

return StreamingResponse(
    event_generator(),
    media_type="text/event-stream",
    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
  • text/event-stream responses disable caching and set X-Accel-Buffering: no for reverse proxies.
  • Central point chat resolves the backing instance and enforces visibility rules before streaming begins — private central points still require cadence:chat:use.
  • Non-stream responses return a single JSON object with session_id, final text, and metadata fields.
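On the client, parsed events can be folded into render state: append message fragments to the visible reply, treat lifecycle events as progress indicators, and stop on error rather than hanging. A sketch under the same payload-key assumptions as the examples above:

```python
def accumulate(events):
    """Fold (name, payload) pairs from an SSE parser into UI state.

    Sketch only: payload keys ("content") are taken from this page's
    examples, not from a formal schema.
    """
    reply_parts, failed = [], False
    for name, payload in events:
        if name == "message":
            # Token fragment: append to the visible reply.
            reply_parts.append(payload.get("content", ""))
        elif name == "error":
            # Surface a failure state instead of waiting forever.
            failed = True
            break
        # agent_start / tool_start / tool_end: progress indicators only.
    return "".join(reply_parts), failed

text, failed = accumulate([
    ("agent_start", {"agent": "supervisor"}),
    ("message", {"content": "Hel"}),
    ("message", {"content": "lo"}),
])
```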
| Symptom | Cause | Fix |
| --- | --- | --- |
| 400 missing org or instance | X-ORG-ID or instance/central header absent | Add the required headers |
| 503 not loaded | Orchestrator instance not in pool | Trigger load; see pool docs |
| 403 on chat | Missing cadence:chat:use for org, or wrong org header vs instance | Check role and org alignment |
| Stream stops mid-way | Server error; the generator emits an error event | Inspect logs; the error SSE event carries the reason |