Trusted by companies shaping the future of agents – including Klarna, Replit, Elastic, and more – LangGraph is a low-level orchestration framework for building, managing, and deploying long-running, stateful agents.
Install LangGraph:
pip install -U langgraph
Create a simple workflow:
from langgraph.graph import START, StateGraph
from typing_extensions import TypedDict


class State(TypedDict):
    text: str


def node_a(state: State) -> dict:
    return {"text": state["text"] + "a"}


def node_b(state: State) -> dict:
    return {"text": state["text"] + "b"}


graph = StateGraph(State)
graph.add_node("node_a", node_a)
graph.add_node("node_b", node_b)
graph.add_edge(START, "node_a")
graph.add_edge("node_a", "node_b")

print(graph.compile().invoke({"text": ""}))
# {'text': 'ab'}
Get started with the LangGraph Quickstart.
To quickly build agents with LangChain's create_agent (built on LangGraph), see the LangChain Agents documentation.
LangGraph provides low-level supporting infrastructure for any long-running, stateful workflow or agent. It does not abstract prompts or architecture.
While LangGraph can be used standalone, it also integrates with any LangChain product, giving developers a full suite of tools for building agents.
Note: Looking for the JS version of LangGraph? See the JS repo and the JS docs.
LangGraph is inspired by Pregel and Apache Beam. The public interface draws inspiration from NetworkX. LangGraph is built by LangChain Inc, the creators of LangChain, but can be used without LangChain.
Read-only execution info/metadata for the execution of current thread/run/node.
Convenience class that bundles run-scoped context and other runtime utilities.
Payload for a task start event.
Payload for a task result event.
A task entry within a CheckpointPayload.
Payload for a checkpoint event.
Stream part emitted for stream_mode="values".
Stream part emitted for stream_mode="updates".
Stream part emitted for stream_mode="messages".
Stream part emitted for stream_mode="custom".
Stream part emitted for stream_mode="checkpoints".
Stream part emitted for stream_mode="tasks".
Stream part emitted for stream_mode="debug".
Typed container returned by invoke() / ainvoke() with version="v2".
Configuration for retrying nodes.
Configuration for caching nodes.
Information about an interrupt that occurred in a node.
A Pregel task.
Cache key for a task.
Snapshot of the state of the graph at the beginning of a step.
A message or packet to send to a specific node in the graph.
One or more commands to update the graph's state and send messages to nodes.
Bypass a reducer and write the wrapped value directly to a BinaryOperatorAggregate channel.
A LangGraph-specific deprecation warning.
A specific LangGraphDeprecationWarning subclass defining functionality deprecated since LangGraph v0.5.0.
A specific LangGraphDeprecationWarning subclass defining functionality deprecated since LangGraph v1.0.0.
A specific LangGraphDeprecationWarning subclass defining functionality deprecated since LangGraph v1.1.0.
Raised when the graph has exhausted the maximum number of steps.
Raised when attempting to update a channel with an invalid set of updates.
Raised when a subgraph is interrupted, suppressed by the root graph.
Raised when graph receives an empty input.
Raised when the executor is unable to find a task (for distributed mode).
Define a LangGraph workflow using the entrypoint decorator.
A primitive that can be returned from an entrypoint.
A message type for UI updates in LangGraph.
A message type for removing UI components in LangGraph.
A graph whose nodes communicate by reading and writing to a shared state.
Stores the value received in the step immediately preceding, clears after.
A configurable PubSub Topic.
Stores the last value received, never checkpointed.
Stores the last value received, can receive at most one value per step.
Stores the last value received, but only made available after finish().
Stores the result of applying a binary operator to the current value and each new value.
Base class for all channels.
A channel that waits until all named values are received before making the value available.
A channel that waits until all named values are received before the value becomes ready. The value is made available only after finish() is called.
Stores the last value received, assumes that if multiple values are
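The channel behaviours listed above can be illustrated with a small stand-alone sketch. The two classes below are hypothetical simplifications written for this illustration. They are not LangGraph's channel classes and do not share their interface. They only contrast a last-value channel that accepts at most one write per step with a channel that folds every write into its current value using a binary operator.

```python
import operator
from typing import Any, Callable, Sequence


class LastValueSketch:
    """Keeps the most recent value; rejects more than one write per step."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, writes: Sequence[Any]) -> bool:
        if not writes:
            return False  # nothing written this step, value unchanged
        if len(writes) > 1:
            raise ValueError("at most one value per step")
        self.value = writes[0]
        return True


class BinaryOperatorSketch:
    """Folds every write into the current value with a binary operator."""

    def __init__(self, op: Callable[[Any, Any], Any], initial: Any) -> None:
        self.op = op
        self.value = initial

    def update(self, writes: Sequence[Any]) -> bool:
        for write in writes:
            self.value = self.op(self.value, write)
        return bool(writes)


last = LastValueSketch()
last.update(["draft"])

total = BinaryOperatorSketch(operator.add, initial=[])
total.update([["a"], ["b"]])  # two writers in the same step
total.update([["c"]])         # one writer in the next step

print(last.value)   # draft
print(total.value)  # ['a', 'b', 'c']
```

In this sketch, passing two writes to `LastValueSketch.update` in one step raises `ValueError`, which mirrors the "at most one value per step" wording of the description.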
Tracks which subgraphs have already loaded their pre-replay checkpoint.
Protocol to represent types that behave like TypedDicts
Protocol to represent types that behave like TypedDicts
Protocol to represent types that behave like dataclasses.
TypedDict to use for extra keyword arguments, enabling type checking warnings for deprecated arguments.
Async unbounded FIFO queue with a wait() method.
Semaphore subclass with a wait() method.
Unbounded FIFO queue with a wait() method.
A string enum.
A much simpler version of RunnableLambda that requires sync and async functions.
Sequence of Runnable, where the output of each is the input of the next.
Pregel manages the runtime behavior for LangGraph applications.
Responsible for executing a set of Pregel tasks concurrently, committing
Exception raised when an error occurs in the remote graph.
The RemoteGraph class is a client implementation for calling remote
Protocol for objects containing writes to be applied to checkpoint.
Simplest implementation of WritesProtocol, for usage with writes that
A callback handler that implements stream_mode=messages.
Get the nonlocal variables accessed by a function.
Get nonlocal variables accessed.
Implements the logic for reading state from CONFIG_KEY_READ.
A node in a Pregel graph. This won't be invoked as a runnable by the graph
A context manager that runs sync tasks in the background.
A context manager that runs async tasks in the background.
Implements the logic for sending writes to CONFIG_KEY_SEND.
Raised by a node to interrupt execution.
A StateGraph where every node receives a list of messages as input and returns one or more messages as output.
Get the runtime for the current graph run.
Interrupt the graph with a resumable exception from within a node.
Access LangGraph store from inside a graph node or entrypoint task at runtime.
Access LangGraph StreamWriter from inside a graph node or entrypoint task at runtime.
Define a LangGraph task using the task decorator.
Push a new UI message to update the UI state.
Delete a UI message by ID from the UI state.
Merge two lists of UI messages, supporting removing UI messages.
Merges two lists of messages, updating existing messages by ID.
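As a rough illustration of "updating existing messages by ID", the sketch below merges two lists of plain dictionaries. It is an assumption-laden simplification: the hypothetical `merge_by_id` helper works on dicts with an `"id"` key, whereas the library function operates on message objects and also supports removals, which this sketch does not attempt.

```python
from typing import Dict, List

Message = Dict[str, str]


def merge_by_id(left: List[Message], right: List[Message]) -> List[Message]:
    """Return a new list: messages in `right` replace same-ID messages in `left`."""
    merged = list(left)  # copy so the caller's list is not mutated
    index = {message["id"]: i for i, message in enumerate(merged)}
    for message in right:
        if message["id"] in index:
            merged[index[message["id"]]] = message  # update in place
        else:
            index[message["id"]] = len(merged)
            merged.append(message)  # new ID, append at the end
    return merged


history = [{"id": "1", "content": "hi"}, {"id": "2", "content": "helo"}]
update = [{"id": "2", "content": "hello"}, {"id": "3", "content": "bye"}]

print(merge_by_id(history, update))
# [{'id': '1', 'content': 'hi'}, {'id': '2', 'content': 'hello'}, {'id': '3', 'content': 'bye'}]
```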
Write a message manually to the messages / messages-tuple stream mode.
Remove task IDs from checkpoint namespace.
Merge multiple configs into one.
Patch a config with new values.
Get a callback manager for a config.
Get an async callback manager for a config.
Return a config with all keys, merging any provided configs.
Submit a coroutine object to a given event loop.
Determine the default value for a field in a state schema.
Attempt to extract default values and descriptions from provided type, used for config schema.
Get Pydantic state update as a list of (key, value) tuples.
Return cached annotated keys for a Python class.
Get the field names of a Pydantic model.
Create a pydantic model with the given field definitions.
Check if a given "complex" type is supported by pydantic.
Set the child Runnable config + tracing context.
Check if a function is async.
Check if a function is an async generator.
Coerce a runnable-like object into a Runnable.
Default cache key function that uses the arguments and keyword arguments to generate a hashable key.
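One way to picture a cache key built from positional and keyword arguments is sketched below. The names `make_cache_key` and `cached_call` are hypothetical, and the approach (a tuple of the arguments plus the sorted keyword items) is an assumption made for illustration, not a description of how the library derives its keys. It only works when every argument is itself hashable.

```python
from typing import Any, Callable, Dict, Hashable, Tuple


def make_cache_key(*args: Hashable, **kwargs: Hashable) -> Tuple[Any, ...]:
    """Build a hashable key; keyword order does not affect the result."""
    return (args, tuple(sorted(kwargs.items())))


_cache: Dict[Tuple[Any, ...], Any] = {}


def cached_call(fn: Callable[..., Any], *args: Hashable, **kwargs: Hashable) -> Any:
    """Call `fn` once per distinct argument set (keyed by arguments only)."""
    key = make_cache_key(*args, **kwargs)
    if key not in _cache:
        _cache[key] = fn(*args, **kwargs)
    return _cache[key]


calls = []


def slow_square(x: int, offset: int = 0) -> int:
    calls.append(x)  # record real invocations
    return x * x + offset


print(cached_call(slow_square, 4, offset=1))  # 17
print(cached_call(slow_square, 4, offset=1))  # 17, served from the cache
print(len(calls))                             # 1
```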
Get the graph for this Pregel instance.
Add an edge to the graph.
Run a task with retries.
Run a task asynchronously with retries.
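The idea behind running a task with retries can be sketched as a loop with exponential backoff. The function below is a hypothetical stand-in, not the library's runner. Its parameter names (`max_attempts`, `initial_interval`, `backoff_factor`, `retry_on`) and default values are assumptions chosen for the example, and the `sleep` parameter exists only so the demonstration does not actually wait.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fn`, retrying on the given exceptions with growing delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts, surface the last error
            sleep(interval)
            interval *= backoff_factor  # wait longer before the next try
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits: List[float] = []
result = run_with_retries(flaky, sleep=waits.append)  # record delays instead of sleeping
print(result, waits)  # ok [0.5, 1.0]
```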
Map input chunk to a sequence of pending writes in the form (channel, value).
Map input chunk to a sequence of pending writes in the form (channel, value).
Map pending writes (a sequence of tuples (channel, value)) to output chunk.
Map pending writes (a sequence of tuples (channel, value)) to output chunk.
Return the module and name of an object.
Check if the graph should be interrupted based on current state.
Function injected under CONFIG_KEY_READ in task config, to read current state.
Default channel versioning function, increments the current int version.
Apply writes from a set of tasks (usually the tasks from a Pregel step)
Prepare the set of tasks that will make up the next Pregel step.
Prepares a single task for the next Pregel step, given a task path, which
Prepare a push task with an attached caller. Used for the functional API.
Get the null version for the checkpoint, if available.
Generate a string representation of the task path.
Pop any values belonging to UntrackedValue channels in Send.arg for safe checkpointing.
Create a checkpoint for the given channels.
Get channels from a checkpoint.
Get subset of current_versions that are newer than previous_versions.
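The versioning helpers in this list (an incrementing integer version, and a comparison of current versions against previous ones) can be pictured with the sketch below. Both functions are hypothetical re-creations for illustration. Treating a missing channel as version 0 is an assumption of this sketch.

```python
from typing import Dict, Optional


def increment(current: Optional[int]) -> int:
    """Return the next integer version, starting at 1 for an unseen channel."""
    return 1 if current is None else current + 1


def newer_versions(
    previous: Dict[str, int], current: Dict[str, int]
) -> Dict[str, int]:
    """Keep only the channels whose version advanced since `previous`."""
    return {
        channel: version
        for channel, version in current.items()
        if version > previous.get(channel, 0)  # assumption: unseen means 0
    }


previous = {"text": 1, "count": 2}
current = dict(previous)
current["text"] = increment(current.get("text"))  # existing channel updated
current["log"] = increment(current.get("log"))    # new channel written

print(newer_versions(previous, current))  # {'text': 2, 'log': 1}
```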
Get the nonlocal variables accessed by a function.
Check if the given string matches the format of xxh3_128_hexdigest.
A coroutine that waits for a semaphore before running another coroutine.
A function that yields control to other threads before running another function.
Produce "task" events for stream_mode=debug.
Return True if the payload already wraps multiple writes from the same channel.
Folds task writes into a result dict and aggregates multiple writes to the same channel.
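Folding a task's writes into a result dictionary might look like the sketch below. The hypothetical `fold_writes` helper collects repeated writes to one channel into a plain list. That representation is an assumption made for this example; the library may wrap aggregated writes differently.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def fold_writes(writes: List[Tuple[str, Any]]) -> Dict[str, Any]:
    """Fold (channel, value) pairs into a dict, grouping repeated channels."""
    counts = Counter(channel for channel, _ in writes)
    result: Dict[str, Any] = {}
    for channel, value in writes:
        if counts[channel] > 1:
            # Several writes hit this channel: collect them in order.
            result.setdefault(channel, []).append(value)
        else:
            result[channel] = value
    return result


writes = [("messages", "hi"), ("count", 1), ("messages", "there")]
print(fold_writes(writes))  # {'messages': ['hi', 'there'], 'count': 1}
```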
Produce "task_result" events for stream_mode=debug.
Remove pregel-specific keys from the config.
Produce "checkpoint" events for stream_mode=debug.
Apply writes / subgraph states to tasks to be returned in a StateSnapshot.
Get colored text.
Get bolded text.