Getting Started¶
Minimal Example¶
from langgraph_events import Event, EventGraph, on
class MessageReceived(Event):
text: str
class MessageClassified(Event):
label: str
class ReplyProduced(Event):
text: str
@on(MessageReceived)
def classify(event: MessageReceived) -> MessageClassified:
if "help" in event.text.lower():
return MessageClassified(label="support")
return MessageClassified(label="general")
@on(MessageClassified)
def respond(event: MessageClassified) -> ReplyProduced:
if event.label == "support":
return ReplyProduced(text="Routing you to support...")
return ReplyProduced(text="Thanks for your message!")
graph = EventGraph([classify, respond])
log = graph.invoke(MessageReceived(text="I need help with my order"))
print(log.latest(ReplyProduced))
# ReplyProduced(text='Routing you to support...')
How It Works¶
EventGraph compiles your handlers into a LangGraph StateGraph with a hub-and-spoke reactive loop:
seed event
|
v
[seed] --> [dispatch] --> handler_a --+
^ handler_b --+
| |
[router] <----------------+
|
v
[dispatch] --> handler_c --+
^ |
| |
[router] <-----------------+
|
v
[dispatch] --> END (no pending events)
- A seed event enters the graph.
- The router collects new events, then dispatch matches each to subscribed handlers via
isinstance. Matched handlers run and emit new events. - The loop repeats until no handler matches or a
Haltedevent appears.
Useful Operations¶
# Synchronous
log = graph.invoke(MessageReceived(text="hello"))
# Multiple seed events
log = graph.invoke([
SystemPromptSet.from_str("You are helpful"),
UserMessageReceived(message=HumanMessage(content="Hi")),
])
# Async
log = await graph.ainvoke(MessageReceived(text="hello"))
# Stream produced events
for event in graph.stream_events(MessageReceived(text="hello")):
print(event)
# Stream with reducer snapshots
for frame in graph.stream_events(MessageReceived(text="hello"), include_reducers=True):
print(frame.event, frame.reducers["messages"])
See Concepts for event log queries, reducers, interruption/resume, and fan-out.