Streaming¶
Real-time event streaming, LLM token deltas, and custom telemetry from handlers. Use streaming when you need live UI updates instead of waiting for the full run to complete.
All invoke/stream methods have async counterparts: ainvoke(), astream_events(), aresume(), astream_resume().
# Async stream with real-time LLM token deltas and passthrough custom frames
from langgraph_events import (
CustomEventFrame,
LLMStreamEnd,
LLMToken,
StateSnapshotFrame,
emit_state_snapshot,
emit_custom,
)
@on(QueryReceived)
def step(event: QueryReceived) -> ReplyProduced:
emit_state_snapshot({"messages": [], "step": "draft"})
emit_custom("tool.progress", {"pct": 50})
return ReplyProduced(...)
async for item in graph.astream_events(
QueryReceived(...),
include_llm_tokens=True,
include_custom_events=True,
):
if isinstance(item, LLMToken):
print(item.content, end="")
elif isinstance(item, LLMStreamEnd):
print("\n[done]", item.message_id)
elif isinstance(item, StateSnapshotFrame):
print("snapshot:", item.data)
elif isinstance(item, CustomEventFrame):
print("custom:", item.name, item.data)
else:
print(item)
Stream Options¶
| Flag | Enables | Frame types yielded |
|---|---|---|
include_reducers=True |
Reducer snapshots alongside events | StreamFrame(event, reducers, changed_reducers) |
include_llm_tokens=True |
Real-time LLM token deltas | LLMToken(run_id, content), LLMStreamEnd(run_id, message_id) |
include_custom_events=True |
Custom event passthrough | CustomEventFrame(name, data), StateSnapshotFrame(data) |
Emission Helpers¶
Emit telemetry from inside handlers without importing LangGraph callback APIs directly:
| Function | Use case |
|---|---|
emit_custom(name, data) |
Arbitrary stream-only telemetry (sync) |
aemit_custom(name, data) |
Async variant |
emit_state_snapshot(data) |
Typed state snapshot for UI (sync) |
aemit_state_snapshot(data) |
Async variant |
These surface in astream_events(..., include_custom_events=True) as CustomEventFrame or StateSnapshotFrame.
Reducer Deltas¶
When include_reducers=True, each StreamFrame includes changed_reducers: frozenset[str] | None — the set of reducer names that the event actually updated. Consumers can use this to skip re-emitting unchanged state. None means delta metadata is not available.
For streaming to AG-UI frontends, see the AG-UI Adapter.