Skip to content

Graph Definitions

Graphs are the core abstraction of LangGraph. Each StateGraph implementation is used to create graph workflows. Once compiled, you can invoke the CompiledGraph to run the application.

Graph

Source code in langgraph/graph/graph.py
class Graph:
    """Builder for a directed graph of runnables.

    Nodes are registered with `add_node`; control flow is declared with
    `add_edge` (static edges) and `add_conditional_edges` (branches decided
    at runtime). Once the topology is complete, `compile` validates it and
    produces an executable `CompiledGraph`.
    """

    def __init__(self) -> None:
        # node name -> runnable executed for that node
        self.nodes: dict[str, Runnable] = {}
        # static (start, end) edges
        self.edges = set[tuple[str, str]]()
        # source node -> branch name -> Branch (conditional edges)
        self.branches: defaultdict[str, dict[str, Branch]] = defaultdict(dict)
        # subclasses (e.g. StateGraph) may allow fan-out from one node
        self.support_multiple_edges = False
        # set by validate(); later mutations only emit a warning
        self.compiled = False

    @property
    def _all_edges(self) -> set[tuple[str, str]]:
        # Subclasses may contribute additional edge kinds (e.g. waiting edges).
        return self.edges

    def add_node(self, key: str, action: RunnableLike) -> None:
        """Add a node named `key` that runs `action`.

        Args:
            key (str): Unique name for the node. May not be START or END.
            action (RunnableLike): Callable or Runnable executed when the
                node runs; coerced to a Runnable on registration.

        Raises:
            ValueError: If `key` is already present or is a reserved name.
        """
        if self.compiled:
            logger.warning(
                "Adding a node to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        if key in self.nodes:
            raise ValueError(f"Node `{key}` already present.")
        if key == END or key == START:
            raise ValueError(f"Node `{key}` is reserved.")

        self.nodes[key] = coerce_to_runnable(action, name=key, trace=False)

    def add_edge(self, start_key: str, end_key: str) -> None:
        """Add a static edge: after `start_key` runs, `end_key` runs next.

        Args:
            start_key (str): Source node (may be START, never END).
            end_key (str): Target node (may be END, never START).

        Raises:
            ValueError: If START/END are used illegally, or if `start_key`
                already has an outgoing edge and multiple outgoing edges
                are not supported by this graph type.
        """
        if self.compiled:
            logger.warning(
                "Adding an edge to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        if start_key == END:
            raise ValueError("END cannot be a start node")
        if end_key == START:
            raise ValueError("START cannot be an end node")
        if not self.support_multiple_edges and start_key in set(
            start for start, _ in self.edges
        ):
            raise ValueError(
                f"Already found path for node '{start_key}'.\n"
                "For multiple edges, use StateGraph with an annotated state key."
            )

        self.edges.add((start_key, end_key))

    def add_conditional_edges(
        self,
        source: str,
        path: Union[
            Callable[..., Union[str, list[str]]],
            Callable[..., Awaitable[Union[str, list[str]]]],
            Runnable[Any, Union[str, list[str]]],
        ],
        path_map: Optional[Union[dict[str, str], list[str]]] = None,
        then: Optional[str] = None,
    ) -> None:
        """Add a conditional edge from the starting node to any number of destination nodes.

        Args:
            source (str): The starting node. This conditional edge will run when
                exiting this node.
            path (Union[Callable, Runnable]): The callable that determines the next
                node or nodes. If not specifying `path_map` it should return one or
                more nodes. If it returns END, the graph will stop execution.
            path_map (Optional[dict[str, str]]): Optional mapping of paths to node
                names. If omitted the paths returned by `path` should be node names.
            then (Optional[str]): The name of a node to execute after the nodes
                selected by `path`.

        Returns:
            None
        """  # noqa: E501
        if self.compiled:
            logger.warning(
                "Adding an edge to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        # coerce path_map to a dictionary
        if isinstance(path_map, dict):
            pass
        elif isinstance(path_map, list):
            path_map = {name: name for name in path_map}
        elif rtn_type := get_type_hints(path).get("return"):
            # Infer destinations from a Literal[...] return annotation.
            if get_origin(rtn_type) is Literal:
                path_map = {name: name for name in get_args(rtn_type)}
        # find a name for the condition
        path = coerce_to_runnable(path, name=None, trace=True)
        name = path.name or "condition"
        # validate the condition
        if name in self.branches[source]:
            # BUG FIX: report the registered branch name (`name`), not
            # `path.name`, which may be None after the fallback above.
            raise ValueError(
                f"Branch with name `{name}` already exists for node `{source}`"
            )
        # save it
        self.branches[source][name] = Branch(path, path_map, then)

    def set_entry_point(self, key: str) -> None:
        """Specifies the first node to be called in the graph.

        Implemented as a static edge from the virtual START node to `key`.

        Parameters:
            key (str): The key of the node to set as the entry point.

        Returns:
            None
        """
        return self.add_edge(START, key)

    def set_conditional_entry_point(
        self,
        path: Union[
            Callable[..., str], Callable[..., Awaitable[str]], Runnable[Any, str]
        ],
        path_map: Optional[Dict[str, str]] = None,
        then: Optional[str] = None,
    ) -> None:
        """Sets a conditional entry point in the graph.

        Implemented as a conditional edge leaving the virtual START node.

        Args:
            path (Union[Callable, Runnable]): The callable that determines the next
                node or nodes. If not specifying `path_map` it should return one or
                more nodes. If it returns END, the graph will stop execution.
            path_map (Optional[dict[str, str]]): Optional mapping of paths to node
                names. If omitted the paths returned by `path` should be node names.
            then (Optional[str]): The name of a node to execute after the nodes
                selected by `path`.

        Returns:
            None
        """
        return self.add_conditional_edges(START, path, path_map, then)

    def set_finish_point(self, key: str) -> None:
        """Marks a node as a finish point of the graph.

        If the graph reaches this node, it will cease execution.

        Parameters:
            key (str): The key of the node to set as the finish point.

        Returns:
            None
        """
        return self.add_edge(key, END)

    def validate(self, interrupt: Optional[Sequence[str]] = None) -> None:
        """Check the graph is well-formed: no dead-ends, no unknown nodes,
        every node reachable, and all interrupt nodes exist.

        Args:
            interrupt (Optional[Sequence[str]]): Node names that will be
                interrupted on; each must be a registered node.

        Raises:
            ValueError: On the first structural problem found.
        """
        # assemble sources
        all_sources = {src for src, _ in self._all_edges}
        for start, branches in self.branches.items():
            for cond, branch in branches.items():
                all_sources.add(start)
                if branch.then is not None:
                    if branch.ends is not None:
                        for end in branch.ends.values():
                            all_sources.add(end)
                    else:
                        # Without an explicit path_map, any node except the
                        # source and the `then` target may be selected.
                        for node in self.nodes:
                            if node != start and node != branch.then:
                                all_sources.add(node)
        # validate sources
        for node in self.nodes:
            if node not in all_sources:
                raise ValueError(f"Node '{node}' is a dead-end")
        for source in all_sources:
            # BUG FIX: previously this tested the stale loop variable `node`
            # (left over from the loop above, always a known node), so edges
            # starting at unknown nodes were never detected.
            if source not in self.nodes and source != START:
                raise ValueError(f"Found edge starting at unknown node '{source}'")

        # assemble targets
        all_targets = {end for _, end in self._all_edges}
        for start, branches in self.branches.items():
            for cond, branch in branches.items():
                if branch.then is not None:
                    all_targets.add(branch.then)
                if branch.ends is not None:
                    for end in branch.ends.values():
                        if end not in self.nodes and end != END:
                            raise ValueError(
                                f"At '{start}' node, '{cond}' branch found unknown target '{end}'"
                            )
                        all_targets.add(end)
                else:
                    all_targets.add(END)
                    for node in self.nodes:
                        if node != start and node != branch.then:
                            all_targets.add(node)
        # validate targets
        for node in self.nodes:
            if node not in all_targets:
                raise ValueError(f"Node `{node}` is not reachable")
        for target in all_targets:
            if target not in self.nodes and target != END:
                raise ValueError(f"Found edge ending at unknown node `{target}`")
        # validate interrupts
        if interrupt:
            for node in interrupt:
                if node not in self.nodes:
                    raise ValueError(f"Interrupt node `{node}` not found")

        self.compiled = True

    def compile(
        self,
        checkpointer: Optional[BaseCheckpointSaver] = None,
        interrupt_before: Optional[Union[All, Sequence[str]]] = None,
        interrupt_after: Optional[Union[All, Sequence[str]]] = None,
        debug: bool = False,
    ) -> "CompiledGraph":
        """Validate the graph and build an executable `CompiledGraph`.

        Args:
            checkpointer (Optional[BaseCheckpointSaver]): Optional saver used
                to persist and resume graph state.
            interrupt_before (Optional[Union[All, Sequence[str]]]): Node names
                (or "*") to interrupt before.
            interrupt_after (Optional[Union[All, Sequence[str]]]): Node names
                (or "*") to interrupt after.
            debug (bool): Enable debug mode on the compiled graph.

        Returns:
            CompiledGraph: The validated, executable graph.
        """
        # assign default values
        interrupt_before = interrupt_before or []
        interrupt_after = interrupt_after or []

        # validate the graph
        # NOTE(review): by conditional-expression precedence this evaluates to
        # [] whenever interrupt_after == "*", which also discards
        # interrupt_before from the validation list — confirm intended.
        self.validate(
            interrupt=(interrupt_before if interrupt_before != "*" else [])
            + interrupt_after
            if interrupt_after != "*"
            else []
        )

        # create empty compiled graph
        compiled = CompiledGraph(
            builder=self,
            nodes={},
            channels={START: EphemeralValue(Any), END: EphemeralValue(Any)},
            input_channels=START,
            output_channels=END,
            stream_mode="values",
            stream_channels=[],
            checkpointer=checkpointer,
            interrupt_before_nodes=interrupt_before,
            interrupt_after_nodes=interrupt_after,
            auto_validate=False,
            debug=debug,
        )

        # attach nodes, edges, and branches
        for key, node in self.nodes.items():
            compiled.attach_node(key, node)

        for start, end in self.edges:
            compiled.attach_edge(start, end)

        for start, branches in self.branches.items():
            for name, branch in branches.items():
                compiled.attach_branch(start, name, branch)

        # validate the compiled graph
        return compiled.validate()

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[str, list[str]]],
        Callable[..., Awaitable[Union[str, list[str]]]],
        Runnable[Any, Union[str, list[str]]],
    ],
    path_map: Optional[Union[dict[str, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Route from `source` to destination node(s) chosen at runtime.

    Args:
        source (str): Node whose completion triggers this branch.
        path (Union[Callable, Runnable]): Sync/async callable or Runnable
            returning the next node name(s); returning END stops the graph.
        path_map (Optional[dict[str, str]]): Optional mapping from values
            returned by `path` to node names; when omitted, `path` must
            return node names directly.
        then (Optional[str]): Optional node to run after the node(s)
            selected by `path`.

    Returns:
        None
    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # Normalize path_map into dict form (or leave as None).
    if isinstance(path_map, list):
        path_map = {entry: entry for entry in path_map}
    elif not isinstance(path_map, dict):
        # Fall back to a Literal[...] return annotation on `path`, if any.
        if rtn_type := get_type_hints(path).get("return"):
            if get_origin(rtn_type) is Literal:
                path_map = {entry: entry for entry in get_args(rtn_type)}
    # Derive a name for this branch from the coerced runnable.
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # Reject duplicate branch names on the same source node.
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node `{source}`"
        )
    # Register the branch.
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Declare `key` as the node the graph runs first.

    Implemented as a static edge from the virtual START node to `key`.

    Parameters:
        key (str): Name of the entry node.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., str], Callable[..., Awaitable[str]], Runnable[Any, str]
    ],
    path_map: Optional[Dict[str, str]] = None,
    then: Optional[str] = None,
) -> None:
    """Choose the graph's first node via a runtime condition.

    Registers `path` as a conditional edge leaving the virtual START node,
    so the entry node is decided when the graph is invoked.

    Args:
        path (Union[Callable, Runnable]): Sync/async callable or Runnable
            returning the name(s) of the node(s) to run first; returning
            END halts execution.
        path_map (Optional[dict[str, str]]): Optional mapping from values
            returned by `path` to node names.
        then (Optional[str]): Optional node to run after the node(s)
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Declare `key` as a terminal node of the graph.

    Implemented as a static edge from `key` to the virtual END node, so
    execution stops after `key` completes.

    Parameters:
        key (str): Name of the finish node.

    Returns:
        None
    """
    return self.add_edge(key, END)

MessageGraph

Bases: StateGraph

A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages. Each node in a MessageGraph takes a list of messages as input and returns zero or more messages as output. The add_messages function is used to merge the output messages from each node into the existing list of messages in the graph's state.

Examples:

from langgraph.graph.message import MessageGraph

builder = MessageGraph()
builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
builder.compile().invoke([("user", "Hi there.")])
# {'messages': [HumanMessage(content="Hi there.", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
#              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8')]}


from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

from langgraph.graph.message import MessageGraph

builder = MessageGraph()
builder.add_node(
    "chatbot",
    lambda state: [
        AIMessage(
            content="Hello!",
            tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
        )
    ],
)
builder.add_node(
    "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
)
builder.set_entry_point("chatbot")
builder.add_edge("chatbot", "search")
builder.set_finish_point("search")
builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
# {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
#              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
#              ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
Source code in langgraph/graph/message.py
class MessageGraph(StateGraph):
    """A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

    MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages.
    Each node in a MessageGraph takes a list of messages as input and returns zero or more
    messages as output. The `add_messages` function is used to merge the output messages from each node
    into the existing list of messages in the graph's state.

    Examples:

        from langgraph.graph.message import MessageGraph

        builder = MessageGraph()
        builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
        builder.set_entry_point("chatbot")
        builder.set_finish_point("chatbot")
        builder.compile().invoke([("user", "Hi there.")])
        # {'messages': [HumanMessage(content="Hi there.", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
        #              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8')]}


        from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

        from langgraph.graph.message import MessageGraph

        builder = MessageGraph()
        builder.add_node(
            "chatbot",
            lambda state: [
                AIMessage(
                    content="Hello!",
                    tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
                )
            ],
        )
        builder.add_node(
            "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
        )
        builder.set_entry_point("chatbot")
        builder.add_edge("chatbot", "search")
        builder.set_finish_point("search")
        builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
        # {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
        #              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
        #              ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
    """

    def __init__(self) -> None:
        # The entire state is one root-level list of messages; `add_messages`
        # is the reducer that merges each node's output into that list.
        super().__init__(Annotated[list[AnyMessage], add_messages])

add_node(key, action)

Adds a new node to the state graph.

Parameters:

  • key (str) –

    The key of the node.

  • action (RunnableLike) –

    The action associated with the node.

Raises:

  • ValueError

    If the key is already being used as a state key.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_node(self, key: str, action: RunnableLike) -> None:
    """Register a node on the state graph.

    Args:
        key (str): Name under which the node is stored.
        action (RunnableLike): Runnable (or coercible callable) executed
            when the node runs.

    Raises:
        ValueError: If `key` collides with an existing state key.

    Returns:
        None
    """
    # State keys and node names share a namespace; reject collisions.
    if key not in self.channels:
        return super().add_node(key, action)
    raise ValueError(f"'{key}' is already being used as a state key")

add_edge(start_key, end_key)

Adds a directed edge from the start node to the end node.

If the graph transitions to the start_key node, it will always transition to the end_key node next.

Parameters:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.

  • end_key (str) –

    The key of the end node of the edge.

Raises:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Add a directed edge from one or more start nodes to `end_key`.

    With a single string start, this behaves like the base class: after
    `start_key` runs, `end_key` always runs next. With a list of starts,
    `end_key` waits until every listed node has completed.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If a start key is END, or any referenced node has not
            been added to the graph yet.

    Returns:
        None
    """
    # Single-source edges are handled entirely by the base Graph class.
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # Every source must be a real, already-registered node.
    for src in start_key:
        if src == END:
            raise ValueError("END cannot be a start node")
        if src not in self.nodes:
            raise ValueError(f"Need to add_node `{src}` first")
    # The target must be a real node too (END is not allowed here).
    if end_key == END:
        raise ValueError("END cannot be an end node")
    if end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    # Record as a waiting edge: fires only after all sources finish.
    self.waiting_edges.add((tuple(start_key), end_key))

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[str, list[str]]],
        Callable[..., Awaitable[Union[str, list[str]]]],
        Runnable[Any, Union[str, list[str]]],
    ],
    path_map: Optional[Union[dict[str, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Register a runtime-decided branch leaving `source`.

    Args:
        source (str): Node whose completion triggers this branch.
        path (Union[Callable, Runnable]): Sync/async callable or Runnable
            that returns the next node name(s); returning END stops the
            graph.
        path_map (Optional[dict[str, str]]): Optional mapping from the
            values returned by `path` to node names.
        then (Optional[str]): Optional node to run after the node(s)
            selected by `path`.

    Returns:
        None
    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # Coerce path_map into a dict unless it already is one.
    if not isinstance(path_map, dict):
        if isinstance(path_map, list):
            path_map = {dest: dest for dest in path_map}
        elif hint := get_type_hints(path).get("return"):
            # A Literal[...] return annotation enumerates the destinations.
            if get_origin(hint) is Literal:
                path_map = {dest: dest for dest in get_args(hint)}
    # Name the branch after the coerced runnable.
    path = coerce_to_runnable(path, name=None, trace=True)
    branch_name = path.name or "condition"
    existing = self.branches[source]
    if branch_name in existing:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node `{source}`"
        )
    # Store the branch under its derived name.
    existing[branch_name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Make `key` the first node executed when the graph is invoked.

    A convenience wrapper that adds a static edge START -> `key`.

    Parameters:
        key (str): Name of the node to run first.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., str], Callable[..., Awaitable[str]], Runnable[Any, str]
    ],
    path_map: Optional[Dict[str, str]] = None,
    then: Optional[str] = None,
) -> None:
    """Pick the graph's entry node dynamically.

    A convenience wrapper that registers `path` as a conditional edge
    leaving the virtual START node.

    Args:
        path (Union[Callable, Runnable]): Sync/async callable or Runnable
            returning the node name(s) to start with; returning END halts
            execution immediately.
        path_map (Optional[dict[str, str]]): Optional mapping from values
            returned by `path` to node names.
        then (Optional[str]): Optional node to run after the node(s)
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Mark `key` as a node after which the graph stops.

    A convenience wrapper that adds a static edge `key` -> END.

    Parameters:
        key (str): Name of the node to treat as a finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

compile(checkpointer=None, interrupt_before=None, interrupt_after=None, debug=False)

Compiles the state graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.

Returns:

  • CompiledGraph ( CompiledGraph ) –

    The compiled state graph.

Source code in langgraph/graph/state.py
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> CompiledGraph:
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
        interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    # NOTE(review): conditional-expression precedence makes this whole
    # expression [] when interrupt_after == "*", which also drops
    # interrupt_before from the validated list — confirm intended.
    self.validate(
        interrupt=(interrupt_before if interrupt_before != "*" else [])
        + interrupt_after
        if interrupt_after != "*"
        else []
    )

    # prepare output channels
    # A lone "__root__" key means the state schema is a single value, so
    # output is that one channel rather than a dict keyed by state keys.
    state_keys = list(self.channels)
    output_channels = state_keys[0] if state_keys == ["__root__"] else state_keys

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        # START gets an ephemeral channel carrying the initial input state.
        channels={**self.channels, START: EphemeralValue(self.schema)},
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=output_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
    )

    # Attach the virtual START node first, then every user-defined node.
    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    # Static single-source edges.
    for start, end in self.edges:
        compiled.attach_edge(start, end)

    # Multi-source (waiting) edges: target runs after all sources finish.
    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    # Conditional branches.
    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

StateGraph

Bases: Graph

A graph whose nodes communicate by reading and writing to a shared state. The signature of each node is State -> Partial.

Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.

Source code in langgraph/graph/state.py
class StateGraph(Graph):
    """A graph whose nodes communicate by reading and writing to a shared state.
    The signature of each node is State -> Partial<State>.

    Each state key can optionally be annotated with a reducer function that
    will be used to aggregate the values of that key received from multiple nodes.
    The signature of a reducer function is (Value, Value) -> Value.
    """

    def __init__(
        self, state_schema: Type[Any], config_schema: Optional[Type[Any]] = None
    ) -> None:
        """Initialize the state graph.

        Args:
            state_schema (Type[Any]): The schema class defining the shared state.
            config_schema (Optional[Type[Any]]): An optional schema for run-time
                configuration.
        """
        super().__init__()
        self.schema = state_schema
        self.config_schema = config_schema
        # one channel per state key, derived from the schema annotations
        self.channels = _get_channels(state_schema)
        # keys with a reducer can aggregate writes from several nodes, so
        # multiple outgoing edges from a single node become legal
        if any(isinstance(c, BinaryOperatorAggregate) for c in self.channels.values()):
            self.support_multiple_edges = True
        # edges that wait for *all* of several start nodes before firing
        self.waiting_edges: set[tuple[tuple[str, ...], str]] = set()

    @property
    def _all_edges(self) -> set[tuple[str, str]]:
        # flatten multi-start waiting edges into plain (start, end) pairs
        return self.edges | {
            (start, end) for starts, end in self.waiting_edges for start in starts
        }

    def add_node(self, key: str, action: RunnableLike) -> None:
        """Adds a new node to the state graph.

        Args:
            key (str): The key of the node.
            action (RunnableLike): The action associated with the node.

        Raises:
            ValueError: If the key is already being used as a state key.

        Returns:
            None
        """
        # node names and state keys share the channel namespace; they must
        # not collide
        if key in self.channels:
            raise ValueError(f"'{key}' is already being used as a state key")
        return super().add_node(key, action)

    def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
        """Adds a directed edge from the start node to the end node.

        If the graph transitions to the start_key node, it will always transition to the end_key node next.

        Args:
            start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
            end_key (str): The key of the end node of the edge.

        Raises:
            ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

        Returns:
            None
        """
        # a single start node is handled by the base class
        if isinstance(start_key, str):
            return super().add_edge(start_key, end_key)

        if self.compiled:
            logger.warning(
                "Adding an edge to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        # multiple start nodes: validate each before recording a waiting edge
        for start in start_key:
            if start == END:
                raise ValueError("END cannot be a start node")
            if start not in self.nodes:
                raise ValueError(f"Need to add_node `{start}` first")
        if end_key == END:
            raise ValueError("END cannot be an end node")
        if end_key not in self.nodes:
            raise ValueError(f"Need to add_node `{end_key}` first")

        self.waiting_edges.add((tuple(start_key), end_key))

    def compile(
        self,
        checkpointer: Optional[BaseCheckpointSaver] = None,
        interrupt_before: Optional[Union[All, Sequence[str]]] = None,
        interrupt_after: Optional[Union[All, Sequence[str]]] = None,
        debug: bool = False,
    ) -> CompiledGraph:
        """Compiles the state graph into a `CompiledGraph` object.

        The compiled graph implements the `Runnable` interface and can be invoked,
        streamed, batched, and run asynchronously.

        Args:
            checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
                This serves as a fully versioned "memory" for the graph, allowing
                the graph to be paused and resumed, and replayed from any point.
            interrupt_before (Optional[Union[All, Sequence[str]]]): An optional list
                of node names to interrupt before, or "*" for all nodes.
            interrupt_after (Optional[Union[All, Sequence[str]]]): An optional list
                of node names to interrupt after, or "*" for all nodes.
            debug (bool): A flag indicating whether to enable debug mode.

        Returns:
            CompiledGraph: The compiled state graph.
        """
        # assign default values
        interrupt_before = interrupt_before or []
        interrupt_after = interrupt_after or []

        # validate the graph. "*" means "all nodes" and contributes no specific
        # node names to check; each list is filtered independently, so names in
        # interrupt_before are still validated when interrupt_after == "*"
        # (the previous single conditional expression dropped both lists in
        # that case due to operator precedence).
        self.validate(
            interrupt=(interrupt_before if interrupt_before != "*" else [])
            + (interrupt_after if interrupt_after != "*" else [])
        )

        # prepare output channels: a bare __root__ schema streams as a single
        # value, otherwise a dict of state keys is streamed
        state_keys = list(self.channels)
        output_channels = state_keys[0] if state_keys == ["__root__"] else state_keys

        compiled = CompiledStateGraph(
            builder=self,
            config_type=self.config_schema,
            nodes={},
            channels={**self.channels, START: EphemeralValue(self.schema)},
            input_channels=START,
            stream_mode="updates",
            output_channels=output_channels,
            stream_channels=output_channels,
            checkpointer=checkpointer,
            interrupt_before_nodes=interrupt_before,
            interrupt_after_nodes=interrupt_after,
            auto_validate=False,
            debug=debug,
        )

        # wire up the virtual START node, then every user node
        compiled.attach_node(START, None)
        for key, node in self.nodes.items():
            compiled.attach_node(key, node)

        for start, end in self.edges:
            compiled.attach_edge(start, end)

        for starts, end in self.waiting_edges:
            compiled.attach_edge(starts, end)

        for start, branches in self.branches.items():
            for name, branch in branches.items():
                compiled.attach_branch(start, name, branch)

        return compiled.validate()

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[str, list[str]]],
        Callable[..., Awaitable[Union[str, list[str]]]],
        Runnable[Any, Union[str, list[str]]],
    ],
    path_map: Optional[Union[dict[str, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    if isinstance(path_map, dict):
        pass
    elif isinstance(path_map, list):
        path_map = {name: name for name in path_map}
    elif rtn_type := get_type_hints(path).get("return"):
        # infer the map from a Literal[...] return annotation, if present
        if get_origin(rtn_type) is Literal:
            path_map = {name: name for name in get_args(rtn_type)}
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # only one branch per (source, name) pair; report the *resolved* name,
    # since path.name may be None when the fallback "condition" was used
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{name}` already exists for node `{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Specifies the first node to be called in the graph.

    Equivalent to adding an edge from the virtual START node to `key`.

    Args:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., str], Callable[..., Awaitable[str]], Runnable[Any, str]
    ],
    path_map: Optional[Dict[str, str]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    Equivalent to adding conditional edges from the virtual START node.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Marks a node as a finish point of the graph.

    If the graph reaches this node, it will cease execution.
    Equivalent to adding an edge from `key` to the virtual END node.

    Args:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

add_node(key, action)

Adds a new node to the state graph.

Parameters:

  • key (str) –

    The key of the node.

  • action (RunnableLike) –

    The action associated with the node.

Raises:

  • ValueError

    If the key is already being used as a state key.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_node(self, key: str, action: RunnableLike) -> None:
    """Register a new node on the state graph.

    Args:
        key (str): The key of the node.
        action (RunnableLike): The action associated with the node.

    Raises:
        ValueError: If the key is already being used as a state key.

    Returns:
        None
    """
    # node names share a namespace with state channels, so reject collisions
    collides_with_state = key in self.channels
    if collides_with_state:
        raise ValueError(f"'{key}' is already being used as a state key")
    return super().add_node(key, action)

add_edge(start_key, end_key)

Adds a directed edge from the start node to the end node.

If the graph transitions to the start_key node, it will always transition to the end_key node next.

Parameters:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.

  • end_key (str) –

    The key of the end node of the edge.

Raises:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Add a directed edge from the start node(s) to the end node.

    After the start_key node(s) run, the graph always transitions to the
    end_key node next.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    # a single start node is plain edge territory: delegate to the base class
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # multiple start nodes: every one of them must be a known, non-END node
    for node_key in start_key:
        if node_key == END:
            raise ValueError("END cannot be a start node")
        if node_key not in self.nodes:
            raise ValueError(f"Need to add_node `{node_key}` first")
    # the destination must likewise be a known, non-END node
    if end_key == END:
        raise ValueError("END cannot be an end node")
    if end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    # record as a waiting edge: fires only once *all* starts have run
    self.waiting_edges.add((tuple(start_key), end_key))

compile(checkpointer=None, interrupt_before=None, interrupt_after=None, debug=False)

Compiles the state graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.

Returns:

  • CompiledGraph ( CompiledGraph ) –

    The compiled state graph.

Source code in langgraph/graph/state.py
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> CompiledGraph:
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Union[All, Sequence[str]]]): An optional list
            of node names to interrupt before, or "*" for all nodes.
        interrupt_after (Optional[Union[All, Sequence[str]]]): An optional list
            of node names to interrupt after, or "*" for all nodes.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph. "*" means "all nodes" and contributes no specific
    # node names to check; each list is filtered independently, so names in
    # interrupt_before are still validated when interrupt_after == "*"
    # (the previous single conditional expression dropped both lists in
    # that case due to operator precedence).
    self.validate(
        interrupt=(interrupt_before if interrupt_before != "*" else [])
        + (interrupt_after if interrupt_after != "*" else [])
    )

    # prepare output channels: a bare __root__ schema streams as a single
    # value, otherwise a dict of state keys is streamed
    state_keys = list(self.channels)
    output_channels = state_keys[0] if state_keys == ["__root__"] else state_keys

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        channels={**self.channels, START: EphemeralValue(self.schema)},
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=output_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
    )

    # wire up the virtual START node, then every user node
    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    for start, end in self.edges:
        compiled.attach_edge(start, end)

    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

add_messages(left, right)

Merges two lists of messages, updating existing messages by ID.

By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.

Parameters:

  • left (Messages) –

    The base list of messages.

  • right (Messages) –

    The list of messages (or single message) to merge into the base list.

Returns:

  • Messages –

    A new list of messages with the messages from right merged into left.
    If a message in right has the same ID as a message in left, the
    message from right will replace the message from left.

Examples:

    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [AIMessage(content="Hi there!", id="2")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content="Hello", id="1"), AIMessage(content="Hi there!", id="2")]


    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [HumanMessage(content="Hello again", id="1")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content="Hello again", id="1")]


    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph


    class State(TypedDict):
        messages: Annotated[list, add_messages]


    builder = StateGraph(State)
    builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
    builder.set_entry_point("chatbot")
    builder.set_finish_point("chatbot")
    graph = builder.compile()
    graph.invoke({})
    # {'messages': [AIMessage(content='Hello', id='f657fb65-b6af-4790-a5b5-1d266a2ed26e')]}
Source code in langgraph/graph/message.py
def add_messages(left: Messages, right: Messages) -> Messages:
    """Merge two message lists, letting `right` overwrite `left` by ID.

    The state stays "append-only" except where a message in `right` carries
    the same ID as an existing message in `left`, in which case it replaces it.

    Args:
        left: The base list of messages.
        right: The list of messages (or single message) to merge
            into the base list.

    Returns:
        A new list of messages with the messages from `right` merged into `left`.
        If a message in `right` has the same ID as a message in `left`, the
        message from `right` will replace the message from `left`.

    Examples:

            msgs1 = [HumanMessage(content="Hello", id="1")]
            msgs2 = [AIMessage(content="Hi there!", id="2")]
            add_messages(msgs1, msgs2)
            # [HumanMessage(content="Hello", id="1"), AIMessage(content="Hi there!", id="2")]


            msgs1 = [HumanMessage(content="Hello", id="1")]
            msgs2 = [HumanMessage(content="Hello again", id="1")]
            add_messages(msgs1, msgs2)
            # [HumanMessage(content="Hello again", id="1")]
    """
    # normalize both sides to lists of proper message objects
    left_msgs = left if isinstance(left, list) else [left]
    right_msgs = right if isinstance(right, list) else [right]
    left_msgs = [message_chunk_to_message(m) for m in convert_to_messages(left_msgs)]
    right_msgs = [message_chunk_to_message(m) for m in convert_to_messages(right_msgs)]
    # every message needs an id so that merging by id is well-defined
    for msg in left_msgs + right_msgs:
        if msg.id is None:
            msg.id = str(uuid.uuid4())
    # same-id messages from `right` replace their counterparts in `left`;
    # everything else is appended in order
    position_by_id = {msg.id: i for i, msg in enumerate(left_msgs)}
    merged = list(left_msgs)
    for msg in right_msgs:
        pos = position_by_id.get(msg.id)
        if pos is None:
            merged.append(msg)
        else:
            merged[pos] = msg
    return merged

CompiledGraph

Bases: Pregel

Source code in langgraph/graph/graph.py
class CompiledGraph(Pregel):
    """A `Graph` compiled into a runnable Pregel application.

    Each graph node becomes a Pregel node paired with an ephemeral channel
    of the same name; writing to that channel signals the node has run and
    triggers any subscribed downstream nodes.
    """

    # the builder Graph this application was compiled from
    builder: Graph

    def attach_node(self, key: str, node: Runnable) -> None:
        """Add `node` as a Pregel node that announces completion on channel `key`."""
        self.channels[key] = EphemeralValue(Any)
        # run the node, then write its output to its own channel
        self.nodes[key] = (
            PregelNode(channels=[], triggers=[])
            | node
            | ChannelWrite([ChannelWriteEntry(key)], tags=[TAG_HIDDEN])
        )
        cast(list[str], self.stream_channels).append(key)

    def attach_edge(self, start: str, end: str) -> None:
        """Wire a plain edge from `start` to `end` via channel subscription."""
        if end == END:
            # publish to end channel
            self.nodes[start].writers.append(
                ChannelWrite([ChannelWriteEntry(END)], tags=[TAG_HIDDEN])
            )
        else:
            # subscribe to start channel
            self.nodes[end].triggers.append(start)
            self.nodes[end].channels.append(start)

    def attach_branch(self, start: str, name: str, branch: Branch) -> None:
        """Wire a conditional branch from `start` using dedicated branch channels."""

        def branch_writer(ends: list[str]) -> Optional[ChannelWrite]:
            # selected destinations are signalled on per-branch channels;
            # END is written to directly
            channels = [
                f"branch:{start}:{name}:{end}" if end != END else END for end in ends
            ]
            return ChannelWrite(
                [ChannelWriteEntry(ch) for ch in channels], tags=[TAG_HIDDEN]
            )

        # add hidden start node
        if start == START and start not in self.nodes:
            self.nodes[start] = Channel.subscribe_to(START, tags=[TAG_HIDDEN])

        # attach branch writer
        self.nodes[start] |= branch.run(branch_writer)

        # attach branch readers; with no explicit ends, any node may be a target
        ends = branch.ends.values() if branch.ends else [node for node in self.nodes]
        for end in ends:
            if end != END:
                channel_name = f"branch:{start}:{name}:{end}"
                self.channels[channel_name] = EphemeralValue(Any)
                self.nodes[end].triggers.append(channel_name)
                self.nodes[end].channels.append(channel_name)

    def get_graph(
        self,
        config: Optional[RunnableConfig] = None,
        *,
        xray: Union[int, bool] = False,
    ) -> DrawableGraph:
        """Returns a drawable representation of the computation graph."""
        graph = DrawableGraph()
        start_nodes: dict[str, RunnableGraphNode] = {
            START: graph.add_node(self.get_input_schema(config), START)
        }
        end_nodes: dict[str, RunnableGraphNode] = {
            END: graph.add_node(self.get_output_schema(config), END)
        }

        for key, node in self.builder.nodes.items():
            if xray:
                # expand compiled subgraphs inline; an int xray counts down
                # the remaining recursion depth
                subgraph = (
                    node.get_graph(
                        config=config,
                        xray=xray - 1 if isinstance(xray, int) and xray > 0 else xray,
                    )
                    if isinstance(node, CompiledGraph)
                    else node.get_graph(config=config)
                )
                subgraph.trim_first_node()
                subgraph.trim_last_node()
                if len(subgraph.nodes) > 1:
                    end_nodes[key], start_nodes[key] = graph.extend(
                        subgraph, prefix=key
                    )
                else:
                    # subgraph trimmed to a single node; draw it as one node
                    n = graph.add_node(node, key)
                    start_nodes[key] = n
                    end_nodes[key] = n
            else:
                n = graph.add_node(node, key)
                start_nodes[key] = n
                end_nodes[key] = n
        for start, end in sorted(self.builder._all_edges):
            graph.add_edge(start_nodes[start], end_nodes[end])
        for start, branches in self.builder.branches.items():
            # conditional edges: draw every destination the branch could pick
            default_ends = {
                **{k: k for k in self.builder.nodes if k != start},
                END: END,
            }
            for _, branch in branches.items():
                if branch.ends is not None:
                    ends = branch.ends
                elif branch.then is not None:
                    ends = {k: k for k in default_ends if k not in (END, branch.then)}
                else:
                    ends = default_ends
                for label, end in ends.items():
                    graph.add_edge(
                        start_nodes[start],
                        end_nodes[end],
                        label if label != end else None,
                        conditional=True,
                    )
                    if branch.then is not None:
                        graph.add_edge(start_nodes[end], end_nodes[branch.then])

        return graph

output_channels: Union[str, Sequence[str]] instance-attribute

Channels to output, defaults to channel named 'output'.

stream_channels: Optional[Union[str, Sequence[str]]] = None class-attribute instance-attribute

Channels to stream, defaults to all channels not in reserved channels

is_lc_serializable() classmethod

Return whether the graph can be serialized by Langchain.

Source code in langgraph/pregel/__init__.py
@classmethod
def is_lc_serializable(cls) -> bool:
    """Return whether the graph can be serialized by Langchain."""
    # NOTE(review): unconditionally True — presumably opts this class into
    # LangChain's serialization protocol; confirm against the base class.
    return True

get_state(config)

Get the current state of the graph.

Source code in langgraph/pregel/__init__.py
def get_state(self, config: RunnableConfig) -> StateSnapshot:
    """Get the current state of the graph.

    Args:
        config: Identifies which saved thread/checkpoint to read.

    Raises:
        ValueError: If no checkpointer was configured at compile time.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    # load the latest checkpoint for this config (empty if none saved yet)
    saved = self.checkpointer.get_tuple(config)
    checkpoint = saved.checkpoint if saved else empty_checkpoint()
    with ChannelsManager(self.channels, checkpoint) as channels:
        # compute which nodes would run next, without executing them
        _, next_tasks = _prepare_next_tasks(
            checkpoint, self.nodes, channels, for_execution=False
        )
        return StateSnapshot(
            read_channels(channels, self.stream_channels_asis),
            tuple(name for name, _ in next_tasks),
            saved.config if saved else config,
            saved.metadata if saved else None,
            saved.parent_config if saved else None,
        )

aget_state(config) async

Get the current state of the graph.

Source code in langgraph/pregel/__init__.py
async def aget_state(self, config: RunnableConfig) -> StateSnapshot:
    """Get the current state of the graph. Async version of `get_state`.

    Raises:
        ValueError: If no checkpointer was configured at compile time.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    # load the latest checkpoint for this config (empty if none saved yet)
    saved = await self.checkpointer.aget_tuple(config)
    checkpoint = saved.checkpoint if saved else empty_checkpoint()
    async with AsyncChannelsManager(self.channels, checkpoint) as channels:
        # compute which nodes would run next, without executing them
        _, next_tasks = _prepare_next_tasks(
            checkpoint, self.nodes, channels, for_execution=False
        )
        return StateSnapshot(
            read_channels(channels, self.stream_channels_asis),
            tuple(name for name, _ in next_tasks),
            saved.config if saved else config,
            saved.metadata if saved else None,
            saved.parent_config if saved else None,
        )

get_state_history(config, *, before=None, limit=None)

Get the history of the state of the graph.

Source code in langgraph/pregel/__init__.py
def get_state_history(
    self,
    config: RunnableConfig,
    *,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> Iterator[StateSnapshot]:
    """Get the history of the state of the graph.

    Args:
        config: Identifies which saved thread to list checkpoints for.
        before: If given, only yield snapshots from checkpoints saved
            before this config's checkpoint.
        limit: Maximum number of snapshots to yield.

    Yields:
        StateSnapshot: One snapshot per saved checkpoint, in the order
            returned by the checkpointer.

    Raises:
        ValueError: If no checkpointer was configured at compile time.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    for config, checkpoint, metadata, parent_config in self.checkpointer.list(
        config, before=before, limit=limit
    ):
        # reconstruct the channel state of each checkpoint to snapshot it
        with ChannelsManager(self.channels, checkpoint) as channels:
            _, next_tasks = _prepare_next_tasks(
                checkpoint, self.nodes, channels, for_execution=False
            )
            yield StateSnapshot(
                read_channels(channels, self.stream_channels_asis),
                tuple(name for name, _ in next_tasks),
                config,
                metadata,
                parent_config,
            )

aget_state_history(config, *, before=None, limit=None) async

Get the history of the state of the graph.

Source code in langgraph/pregel/__init__.py
async def aget_state_history(
    self,
    config: RunnableConfig,
    *,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[StateSnapshot]:
    """Get the history of the state of the graph.

    Async version of `get_state_history`; see that method for parameter
    semantics.

    Raises:
        ValueError: If no checkpointer was configured at compile time.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    async for (
        config,
        checkpoint,
        metadata,
        parent_config,
    ) in self.checkpointer.alist(config, before=before, limit=limit):
        # reconstruct the channel state of each checkpoint to snapshot it
        async with AsyncChannelsManager(self.channels, checkpoint) as channels:
            _, next_tasks = _prepare_next_tasks(
                checkpoint, self.nodes, channels, for_execution=False
            )
            yield StateSnapshot(
                read_channels(channels, self.stream_channels_asis),
                tuple(name for name, _ in next_tasks),
                config,
                metadata,
                parent_config,
            )

update_state(config, values, as_node=None)

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

Source code in langgraph/pregel/__init__.py
def update_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: Optional[str] = None,
) -> RunnableConfig:
    """Update the state of the graph with the given values, as if they came from
    node `as_node`. If `as_node` is not provided, it will be set to the last node
    that updated the state, if not ambiguous.

    Args:
        config: Config identifying the checkpoint/thread to update.
        values: The update payload, passed as input to `as_node`'s writers.
        as_node: Optional name of the node the update is attributed to.

    Returns:
        RunnableConfig: The config returned by the checkpointer for the
            newly saved checkpoint.

    Raises:
        ValueError: If no checkpointer is set.
        InvalidUpdateError: If `as_node` cannot be determined unambiguously,
            or the chosen node has no writers.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    # get last checkpoint
    saved = self.checkpointer.get_tuple(config)
    checkpoint = copy_checkpoint(saved.checkpoint) if saved else empty_checkpoint()
    # find last node that updated the state, if not provided
    if as_node is None and not saved:
        # no history yet: if the graph's single input channel is also a node
        # name, attribute the update to that node
        if (
            isinstance(self.input_channels, str)
            and self.input_channels in self.nodes
        ):
            as_node = self.input_channels
    elif as_node is None:
        # sort (version, node) pairs so the last entry holds the highest
        # version seen, i.e. the most recent writer
        last_seen_by_node = sorted(
            (v, n)
            for n, seen in checkpoint["versions_seen"].items()
            for v in seen.values()
        )
        # if two nodes updated the state at the same time, it's ambiguous
        if last_seen_by_node:
            if len(last_seen_by_node) == 1:
                as_node = last_seen_by_node[0][1]
            elif last_seen_by_node[-1][0] != last_seen_by_node[-2][0]:
                as_node = last_seen_by_node[-1][1]
    if as_node is None:
        raise InvalidUpdateError("Ambiguous update, specify as_node")
    # update channels
    with ChannelsManager(self.channels, checkpoint) as channels:
        # create task to run all writers of the chosen node
        writers = self.nodes[as_node].get_writers()
        if not writers:
            raise InvalidUpdateError(f"Node {as_node} has no writers")
        task = PregelExecutableTask(
            as_node,
            values,
            RunnableSequence(*writers) if len(writers) > 1 else writers[0],
            deque(),
            None,
            [INTERRUPT],
        )
        # execute task
        task.proc.invoke(
            task.input,
            patch_config(
                config,
                run_name=self.name + "UpdateState",
                configurable={
                    # deque.extend is thread-safe
                    CONFIG_KEY_SEND: task.writes.extend,
                    CONFIG_KEY_READ: partial(
                        _local_read, checkpoint, channels, task.writes
                    ),
                },
            ),
        )
        # apply to checkpoint and save
        _apply_writes(checkpoint, channels, task.writes)
        return self.checkpointer.put(
            saved.config if saved else config,
            create_checkpoint(checkpoint, channels),
            {
                "source": "update",
                "step": saved.metadata.get("step", 0) + 1 if saved else 0,
            },
        )

stream(input, config=None, *, stream_mode=None, output_keys=None, input_keys=None, interrupt_before=None, interrupt_after=None, debug=None)

Stream graph steps for a single input.

Source code in langgraph/pregel/__init__.py
def stream(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[StreamMode] = None,
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    input_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
) -> Iterator[Union[dict[str, Any], Any]]:
    """Stream graph steps for a single input.

    Implements the Bulk Synchronous Parallel / Pregel loop: each step
    prepares the tasks triggered by the previous step's channel writes,
    runs them in parallel on an executor, applies their writes to the
    channels, and yields output according to `stream_mode` ("values"
    yields channel values, "debug" yields debug events, any other mode
    yields per-node updates). When a checkpointer is configured,
    checkpoints are saved via background futures after mapping the input
    and after every step.

    Args:
        input: The input data for the graph; mapped onto `input_keys`.
            An empty input signals proceeding past a previous interrupt.
        config: Optional. The configuration for the graph run.
        stream_mode: Optional. The stream mode for the graph run.
        output_keys: Optional. The output keys to read from the channels.
        input_keys: Optional. The channel keys to map the input onto.
        interrupt_before: Optional. The nodes to interrupt the graph run before.
        interrupt_after: Optional. The nodes to interrupt the graph run after.
        debug: Optional. Enable debug printing for the graph run.

    Yields:
        Output chunks, shaped according to `stream_mode`.

    Raises:
        ValueError: If `recursion_limit` < 1, if a checkpointer is set but
            no 'configurable' keys are supplied, or if step 0 has no tasks.
        GraphRecursionError: If the loop reaches `recursion_limit` without
            hitting a stop condition.
    """
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name", self.get_name()),
        run_id=config.get("run_id"),
    )
    try:
        # background checkpoint-save futures; awaited in the finally block
        bg: list[concurrent.futures.Future] = []
        if config["recursion_limit"] < 1:
            raise ValueError("recursion_limit must be at least 1")
        if self.checkpointer and not config.get("configurable"):
            raise ValueError(
                f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}"
            )
        # assign defaults
        (
            debug,
            stream_mode,
            input_keys,
            output_keys,
            interrupt_before,
            interrupt_after,
        ) = self._defaults(
            config,
            stream_mode=stream_mode,
            input_keys=input_keys,
            output_keys=output_keys,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            debug=debug,
        )
        # copy nodes to ignore mutations during execution
        processes = {**self.nodes}
        # get checkpoint from saver, or create an empty one
        saved = self.checkpointer.get_tuple(config) if self.checkpointer else None
        checkpoint = saved.checkpoint if saved else empty_checkpoint()
        checkpoint_config = saved.config if saved else config
        # fresh runs start at -1 (input step); resumed runs continue after
        # the saved step
        start = saved.metadata.get("step", -2) + 1 if saved else -1
        # create channels from checkpoint
        with ChannelsManager(
            self.channels, checkpoint
        ) as channels, get_executor_for_config(config) as executor:
            # map inputs to channel updates
            if input_writes := deque(map_input(input_keys, input)):
                # discard any unfinished tasks from previous checkpoint
                checkpoint, _ = _prepare_next_tasks(
                    checkpoint, processes, channels, for_execution=True
                )
                # apply input writes
                _apply_writes(checkpoint, channels, input_writes)
                # save input checkpoint
                if self.checkpointer is not None:
                    checkpoint = create_checkpoint(checkpoint, channels)
                    bg.append(
                        executor.submit(
                            self.checkpointer.put,
                            checkpoint_config,
                            copy_checkpoint(checkpoint),
                            {"source": "input", "step": start},
                        )
                    )
                    # point subsequent saves at the checkpoint just written
                    checkpoint_config = {
                        "configurable": {
                            "thread_id": checkpoint_config["configurable"][
                                "thread_id"
                            ],
                            "thread_ts": checkpoint["ts"],
                        }
                    }
                # increment start to 0
                start += 1
            else:
                # if received no input, take that as signal to proceed
                # past previous interrupt, if any
                checkpoint = copy_checkpoint(checkpoint)
                for k in self.stream_channels_list:
                    version = checkpoint["channel_versions"][k]
                    checkpoint["versions_seen"][INTERRUPT][k] = version

            # Similarly to Bulk Synchronous Parallel / Pregel model
            # computation proceeds in steps, while there are channel updates
            # channel updates from step N are only visible in step N+1
            # channels are guaranteed to be immutable for the duration of the step,
            # with channel updates applied only at the transition between steps
            stop = start + config["recursion_limit"] + 1
            for step in range(start, stop):
                next_checkpoint, next_tasks = _prepare_next_tasks(
                    checkpoint, processes, channels, for_execution=True
                )

                # if no more tasks, we're done
                if not next_tasks:
                    if step == 0:
                        raise ValueError("No tasks to run in graph.")
                    else:
                        break

                # before execution, check if we should interrupt
                if _should_interrupt(
                    checkpoint,
                    interrupt_before,
                    self.stream_channels_list,
                    next_tasks,
                ):
                    break
                else:
                    checkpoint = next_checkpoint

                if debug:
                    print_step_tasks(step, next_tasks)
                if stream_mode == "debug":
                    for chunk in map_debug_tasks(step, next_tasks):
                        yield chunk

                # prepare tasks with config
                tasks_w_config = [
                    (
                        proc,
                        input,
                        patch_config(
                            merge_configs(config, proc_config),
                            run_name=name,
                            callbacks=run_manager.get_child(f"graph:step:{step}"),
                            configurable={
                                # deque.extend is thread-safe
                                CONFIG_KEY_SEND: writes.extend,
                                CONFIG_KEY_READ: partial(
                                    _local_read, checkpoint, channels, writes
                                ),
                            },
                        ),
                    )
                    for name, input, proc, writes, proc_config, _ in next_tasks
                ]

                futures = [
                    executor.submit(proc.invoke, input, config)
                    for proc, input, config in tasks_w_config
                ]

                # execute tasks, and wait for one to fail or all to finish.
                # each task is independent from all other concurrent tasks
                done, inflight = concurrent.futures.wait(
                    futures,
                    return_when=concurrent.futures.FIRST_EXCEPTION,
                    timeout=self.step_timeout,
                )

                # panic on failure or timeout
                _panic_or_proceed(done, inflight, step)

                # combine pending writes from all tasks
                pending_writes = deque[tuple[str, Any]]()
                for _, _, _, writes, _, _ in next_tasks:
                    pending_writes.extend(writes)

                if debug:
                    print_step_writes(
                        step, pending_writes, self.stream_channels_list
                    )

                # apply writes to channels
                _apply_writes(checkpoint, channels, pending_writes)

                if debug:
                    print_step_checkpoint(step, channels, self.stream_channels_list)

                # yield current value or updates
                if stream_mode == "values":
                    yield from map_output_values(
                        output_keys, pending_writes, channels
                    )
                elif stream_mode == "debug":
                    yield from map_debug_task_results(
                        step, next_tasks, self.stream_channels_list
                    )
                else:
                    yield from map_output_updates(output_keys, next_tasks)

                # save end of step checkpoint
                if self.checkpointer is not None:
                    checkpoint = create_checkpoint(checkpoint, channels)
                    bg.append(
                        executor.submit(
                            self.checkpointer.put,
                            checkpoint_config,
                            copy_checkpoint(checkpoint),
                            {"source": "loop", "step": step},
                        )
                    )
                    # point subsequent saves at the checkpoint just written
                    checkpoint_config = {
                        "configurable": {
                            "thread_id": checkpoint_config["configurable"][
                                "thread_id"
                            ],
                            "thread_ts": checkpoint["ts"],
                        }
                    }
                # yield debug checkpoint
                if stream_mode == "debug":
                    yield map_debug_checkpoint(
                        step,
                        checkpoint_config if self.checkpointer else None,
                        channels,
                        self.stream_channels_asis,
                    )

                # after execution, check if we should interrupt
                if _should_interrupt(
                    checkpoint,
                    interrupt_after,
                    self.stream_channels_list,
                    next_tasks,
                ):
                    break
            else:
                # for/else: the range was exhausted without a break.
                # NOTE: fixed missing space after "reached" — the adjacent
                # string literals previously concatenated to "reachedwithout".
                raise GraphRecursionError(
                    f"Recursion limit of {config['recursion_limit']} reached "
                    "without hitting a stop condition. You can increase the "
                    "limit by setting the `recursion_limit` config key."
                )

            # set final channel values as run output
            run_manager.on_chain_end(read_channels(channels, output_keys))
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    finally:
        # cancel any pending tasks when generator is interrupted
        try:
            for task in futures:
                task.cancel()
        except NameError:
            # futures was never assigned: the step loop did not run
            pass
        # wait for all background tasks to finish
        done, _ = concurrent.futures.wait(
            bg, return_when=concurrent.futures.ALL_COMPLETED
        )
        for task in done:
            task.result()

invoke(input, config=None, *, stream_mode='values', output_keys=None, input_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs)

Run the graph with a single input and config.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input data for the graph. It can be a dictionary or any other type.

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the graph run.

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]. The stream mode for the graph run. Default is "values".

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to retrieve from the graph run.

  • input_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The input keys to provide for the graph run.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run before.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run after.

  • debug (Optional[bool], default: None ) –

    Optional. Enable debug mode for the graph run.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to the graph run.

Returns:

  • Union[dict[str, Any], Any]

    The output of the graph run. If stream_mode is "values", it returns the latest output.

  • Union[dict[str, Any], Any]

    If stream_mode is not "values", it returns a list of output chunks.

Source code in langgraph/pregel/__init__.py
def invoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    input_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Run the graph to completion on a single input.

    Consumes `self.stream(...)` and reduces its chunks: in "values" mode
    only the last chunk is kept and returned; in any other mode every
    chunk is collected into a list.

    Args:
        input: The input data for the graph. It can be a dictionary or any other type.
        config: Optional. The configuration for the graph run.
        stream_mode: Optional[str]. The stream mode for the graph run. Default is "values".
        output_keys: Optional. The output keys to retrieve from the graph run.
        input_keys: Optional. The input keys to provide for the graph run.
        interrupt_before: Optional. The nodes to interrupt the graph run before.
        interrupt_after: Optional. The nodes to interrupt the graph run after.
        debug: Optional. Enable debug mode for the graph run.
        **kwargs: Additional keyword arguments to pass to the graph run.

    Returns:
        The output of the graph run. If stream_mode is "values", it returns the latest output.
        If stream_mode is not "values", it returns a list of output chunks.
    """
    if output_keys is None:
        output_keys = self.output_channels

    collect_all = stream_mode != "values"
    result: Union[dict[str, Any], Any] = [] if collect_all else None
    for chunk in self.stream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        input_keys=input_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if collect_all:
            result.append(chunk)
        else:
            result = chunk
    return result

ainvoke(input, config=None, *, stream_mode='values', output_keys=None, input_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs) async

Asynchronously invoke the graph on a single input.

Parameters:

  • input (Union[dict[str, Any], Any]) –

    The input data for the computation. It can be a dictionary or any other type.

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the computation.

  • stream_mode (StreamMode, default: 'values' ) –

    Optional. The stream mode for the computation. Default is "values".

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to include in the result. Default is None.

  • input_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The input keys to provide for the computation. Default is None.

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt before. Default is None.

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt after. Default is None.

  • debug (Optional[bool], default: None ) –

    Optional. Whether to enable debug mode. Default is None.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments.

Returns:

  • Union[dict[str, Any], Any]

    The result of the computation. If stream_mode is "values", it returns the latest value.

  • Union[dict[str, Any], Any]

    If stream_mode is not "values", it returns a list of output chunks.

Source code in langgraph/pregel/__init__.py
async def ainvoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    input_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Asynchronously run the graph to completion on a single input.

    Consumes `self.astream(...)` and reduces its chunks: in "values" mode
    only the last chunk is kept and returned; in any other mode every
    chunk is collected into a list.

    Args:
        input: The input data for the computation. It can be a dictionary or any other type.
        config: Optional. The configuration for the computation.
        stream_mode: Optional. The stream mode for the computation. Default is "values".
        output_keys: Optional. The output keys to include in the result. Default is None.
        input_keys: Optional. The input keys to provide for the computation. Default is None.
        interrupt_before: Optional. The nodes to interrupt before. Default is None.
        interrupt_after: Optional. The nodes to interrupt after. Default is None.
        debug: Optional. Whether to enable debug mode. Default is None.
        **kwargs: Additional keyword arguments.

    Returns:
        The result of the computation. If stream_mode is "values", it returns the latest value.
        If stream_mode is not "values", it returns a list of output chunks.
    """
    if output_keys is None:
        output_keys = self.output_channels

    collect_all = stream_mode != "values"
    result: Union[dict[str, Any], Any] = [] if collect_all else None
    async for chunk in self.astream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        input_keys=input_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if collect_all:
            result.append(chunk)
        else:
            result = chunk
    return result

get_graph(config=None, *, xray=False)

Returns a drawable representation of the computation graph.

Source code in langgraph/graph/graph.py
def get_graph(
    self,
    config: Optional[RunnableConfig] = None,
    *,
    xray: Union[int, bool] = False,
) -> DrawableGraph:
    """Returns a drawable representation of the computation graph.

    Args:
        config: Optional run configuration, used to resolve the input and
            output schemas for the START and END nodes.
        xray: If True, expand subgraphs at every nesting level. If a
            positive int, expand subgraphs up to that many levels deep.

    Returns:
        A DrawableGraph mirroring this graph's nodes, edges and branches.
    """
    graph = DrawableGraph()
    start_nodes: dict[str, RunnableGraphNode] = {
        START: graph.add_node(self.get_input_schema(config), START)
    }
    end_nodes: dict[str, RunnableGraphNode] = {
        END: graph.add_node(self.get_output_schema(config), END)
    }

    for key, node in self.builder.nodes.items():
        if xray:
            # Only decrement the depth budget for true ints: bool is a
            # subclass of int, so the previous `xray - 1` turned True into
            # 0 and stopped the recursion after a single level, even though
            # True means "expand all levels".
            if isinstance(xray, bool):
                sub_xray: Union[int, bool] = xray
            elif xray > 0:
                sub_xray = xray - 1
            else:
                sub_xray = xray
            subgraph = (
                node.get_graph(config=config, xray=sub_xray)
                if isinstance(node, CompiledGraph)
                else node.get_graph(config=config)
            )
            # drop the subgraph's own START/END so it splices in cleanly
            subgraph.trim_first_node()
            subgraph.trim_last_node()
            if len(subgraph.nodes) > 1:
                end_nodes[key], start_nodes[key] = graph.extend(
                    subgraph, prefix=key
                )
            else:
                n = graph.add_node(node, key)
                start_nodes[key] = n
                end_nodes[key] = n
        else:
            n = graph.add_node(node, key)
            start_nodes[key] = n
            end_nodes[key] = n
    # plain edges
    for start, end in sorted(self.builder._all_edges):
        graph.add_edge(start_nodes[start], end_nodes[end])
    # conditional edges (branches)
    for start, branches in self.builder.branches.items():
        default_ends = {
            **{k: k for k in self.builder.nodes if k != start},
            END: END,
        }
        for _, branch in branches.items():
            if branch.ends is not None:
                ends = branch.ends
            elif branch.then is not None:
                ends = {k: k for k in default_ends if k not in (END, branch.then)}
            else:
                ends = default_ends
            for label, end in ends.items():
                graph.add_edge(
                    start_nodes[start],
                    end_nodes[end],
                    label if label != end else None,
                    conditional=True,
                )
                if branch.then is not None:
                    graph.add_edge(start_nodes[end], end_nodes[branch.then])

    return graph

MessageGraph

Bases: StateGraph

A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages. Each node in a MessageGraph takes a list of messages as input and returns zero or more messages as output. The add_messages function is used to merge the output messages from each node into the existing list of messages in the graph's state.

Examples:

from langgraph.graph.message import MessageGraph

builder = MessageGraph()
builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
builder.compile().invoke([("user", "Hi there.")])
# {'messages': [HumanMessage(content="Hi there.", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
#              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8')]}


from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

from langgraph.graph.message import MessageGraph

builder = MessageGraph()
builder.add_node(
    "chatbot",
    lambda state: [
        AIMessage(
            content="Hello!",
            tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
        )
    ],
)
builder.add_node(
    "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
)
builder.set_entry_point("chatbot")
builder.add_edge("chatbot", "search")
builder.set_finish_point("search")
builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
# {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
#              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
#              ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
Source code in langgraph/graph/message.py
class MessageGraph(StateGraph):
    """A StateGraph whose entire state is a single, append-only* list of messages.

    Every node in a `MessageGraph` receives the current list of messages as
    input and returns zero or more messages as output. The `add_messages`
    reducer merges each node's output into the message list held in the
    graph's state.

    Examples:

        from langgraph.graph.message import MessageGraph

        builder = MessageGraph()
        builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
        builder.set_entry_point("chatbot")
        builder.set_finish_point("chatbot")
        builder.compile().invoke([("user", "Hi there.")])
        # {'messages': [HumanMessage(content="Hi there.", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
        #              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8')]}


        from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

        from langgraph.graph.message import MessageGraph

        builder = MessageGraph()
        builder.add_node(
            "chatbot",
            lambda state: [
                AIMessage(
                    content="Hello!",
                    tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
                )
            ],
        )
        builder.add_node(
            "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
        )
        builder.set_entry_point("chatbot")
        builder.add_edge("chatbot", "search")
        builder.set_finish_point("search")
        builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
        # {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
        #              AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
        #              ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
    """

    def __init__(self) -> None:
        # The whole state is one channel: a message list whose updates are
        # merged by the `add_messages` reducer.
        super().__init__(Annotated[list[AnyMessage], add_messages])

add_node(key, action)

Adds a new node to the state graph.

Parameters:

  • key (str) –

    The key of the node.

  • action (RunnableLike) –

    The action associated with the node.

Raises:

  • ValueError

    If the key is already being used as a state key.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_node(self, key: str, action: RunnableLike) -> None:
    """Adds a new node to the state graph.

    Args:
        key (str): The key of the node. Must not collide with a state key,
            since node names and state channels share a namespace.
        action (RunnableLike): The action associated with the node.

    Raises:
        ValueError: If the key is already being used as a state key.

    Returns:
        None
    """
    if key not in self.channels:
        return super().add_node(key, action)
    raise ValueError(f"'{key}' is already being used as a state key")

add_edge(start_key, end_key)

Adds a directed edge from the start node to the end node.

If the graph transitions to the start_key node, it will always transition to the end_key node next.

Parameters:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.

  • end_key (str) –

    The key of the end node of the edge.

Raises:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.

Returns:

  • None

    None

Source code in langgraph/graph/state.py
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Adds a directed edge from the start node(s) to the end node.

    If the graph transitions to the start_key node, it will always transition
    to the end_key node next. A list of start keys is stored as a waiting
    edge instead of a plain edge.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    # a single start key is a plain edge, handled by the base class
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # validate every start node first, then the end node
    for node in start_key:
        if node == END:
            raise ValueError("END cannot be a start node")
        if node not in self.nodes:
            raise ValueError(f"Need to add_node `{node}` first")
    if end_key == END:
        raise ValueError("END cannot be an end node")
    if end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    # NOTE(review): presumably the end node waits for all start nodes to
    # complete — confirm against the compile logic.
    self.waiting_edges.add((tuple(start_key), end_key))

add_conditional_edges(source, path, path_map=None, then=None)

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[str, list[str]]],
        Callable[..., Awaitable[Union[str, list[str]]]],
        Runnable[Any, Union[str, list[str]]],
    ],
    path_map: Optional[Union[dict[str, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[Union[dict[str, str], list[str]]]): Optional mapping
            of paths to node names. If omitted the paths returned by `path`
            should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Raises:
        ValueError: If a branch with the same name already exists for `source`.

    Returns:
        None
    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    if isinstance(path_map, dict):
        pass
    elif isinstance(path_map, list):
        path_map = {name: name for name in path_map}
    else:
        # No explicit map: try to infer one from a Literal[...] return
        # annotation. get_type_hints raises TypeError for objects that are
        # not modules, classes, methods or functions (e.g. a Runnable
        # instance passed as `path`), so treat failure as "no annotation"
        # rather than crashing.
        try:
            rtn_type = get_type_hints(path).get("return")
        except TypeError:
            rtn_type = None
        if rtn_type and get_origin(rtn_type) is Literal:
            path_map = {name: name for name in get_args(rtn_type)}
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # validate the condition: report the key actually registered (`name`,
    # which falls back to "condition"), not `path.name` which may be None
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{name}` already exists for node `{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)

Specifies the first node to be called in the graph.

Parameters:

  • key (str) –

    The key of the node to set as the entry point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_entry_point(self, key: str) -> None:
    """Designate `key` as the node the graph invokes first.

    This is shorthand for adding an edge from the virtual START node
    to the given node.

    Args:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)

Sets a conditional entry point in the graph.

Parameters:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., str], Callable[..., Awaitable[str]], Runnable[Any, str]
    ],
    path_map: Optional[dict[str, str]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    This is shorthand for adding a conditional edge from the virtual
    START node; see `add_conditional_edges` for details.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    # annotation normalized to builtin `dict` for consistency with
    # add_conditional_edges (which uses dict[str, str], not typing.Dict)
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)

Marks a node as a finish point of the graph.

If the graph reaches this node, it will cease execution.

Parameters:

  • key (str) –

    The key of the node to set as the finish point.

Returns:

  • None

    None

Source code in langgraph/graph/graph.py
def set_finish_point(self, key: str) -> None:
    """Designate a node as a terminal node of the graph.

    Execution stops once the graph reaches this node; this is shorthand
    for adding an edge from the node to the virtual END node.

    Args:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

compile(checkpointer=None, interrupt_before=None, interrupt_after=None, debug=False)

Compiles the state graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.

Returns:

  • CompiledGraph ( CompiledGraph ) –

    The compiled state graph.

Source code in langgraph/graph/state.py
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> CompiledGraph:
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Union[All, Sequence[str]]]): An optional list
            of node names to interrupt before, or the "*" (All) sentinel.
        interrupt_after (Optional[Union[All, Sequence[str]]]): An optional list
            of node names to interrupt after, or the "*" (All) sentinel.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledGraph: The compiled state graph.
    """
    # assign default values (None -> []; the "*" sentinel passes through)
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    # NOTE(review): by operator precedence this expression is
    #   ((before if before != "*" else []) + after) if after != "*" else []
    # so when interrupt_after == "*" the interrupt_before names are also
    # excluded from validation — confirm this is the intended behavior.
    self.validate(
        interrupt=(interrupt_before if interrupt_before != "*" else [])
        + interrupt_after
        if interrupt_after != "*"
        else []
    )

    # prepare output channels: a single root channel is exposed directly,
    # otherwise expose the list of state keys
    state_keys = list(self.channels)
    output_channels = state_keys[0] if state_keys == ["__root__"] else state_keys

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        # START is an ephemeral channel holding the graph input for one step
        channels={**self.channels, START: EphemeralValue(self.schema)},
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=output_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
    )

    # attach the virtual START node, then every user-defined node
    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    # plain directed edges
    for start, end in self.edges:
        compiled.attach_edge(start, end)

    # multi-source edges (end node waits on all start nodes)
    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    # conditional branches registered via add_conditional_edges
    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    # final validation of the assembled Pregel graph before returning it
    return compiled.validate()

add_messages

Merges two lists of messages, updating existing messages by ID.

By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.

Parameters:

  • left (Messages) –

    The base list of messages.

  • right (Messages) –

    The list of messages (or single message) to merge into the base list.

Returns:

  • Messages –

    A new list of messages with the messages from right merged into left.
    If a message in right has the same ID as a message in left, the
    message from right will replace the message from left.

Examples:

    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [AIMessage(content="Hi there!", id="2")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content="Hello", id="1"), AIMessage(content="Hi there!", id="2")]


    msgs1 = [HumanMessage(content="Hello", id="1")]
    msgs2 = [HumanMessage(content="Hello again", id="1")]
    add_messages(msgs1, msgs2)
    # [HumanMessage(content="Hello again", id="1")]


    from typing import Annotated
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph


    class State(TypedDict):
        messages: Annotated[list, add_messages]


    builder = StateGraph(State)
    builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
    builder.set_entry_point("chatbot")
    builder.set_finish_point("chatbot")
    graph = builder.compile()
    graph.invoke({})
    # {'messages': [AIMessage(content='Hello', id='f657fb65-b6af-4790-a5b5-1d266a2ed26e')]}
Source code in langgraph/graph/message.py
def add_messages(left: Messages, right: Messages) -> Messages:
    """Merges two lists of messages, updating existing messages by ID.

    By default, this ensures the state is "append-only", unless the
    new message has the same ID as an existing message.

    Args:
        left: The base list of messages.
        right: The list of messages (or single message) to merge
            into the base list.

    Returns:
        A new list of messages with the messages from `right` merged into `left`.
        If a message in `right` has the same ID as a message in `left`, the
        message from `right` will replace the message from `left`.

    Examples:

            msgs1 = [HumanMessage(content="Hello", id="1")]
            msgs2 = [AIMessage(content="Hi there!", id="2")]
            add_messages(msgs1, msgs2)
            # [HumanMessage(content="Hello", id="1"), AIMessage(content="Hi there!", id="2")]


            msgs1 = [HumanMessage(content="Hello", id="1")]
            msgs2 = [HumanMessage(content="Hello again", id="1")]
            add_messages(msgs1, msgs2)
            # [HumanMessage(content="Hello again", id="1")]


            from typing import Annotated
            from typing_extensions import TypedDict
            from langgraph.graph import StateGraph


            class State(TypedDict):
                messages: Annotated[list, add_messages]


            builder = StateGraph(State)
            builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
            builder.set_entry_point("chatbot")
            builder.set_finish_point("chatbot")
            graph = builder.compile()
            graph.invoke({})
            # {'messages': [AIMessage(content='Hello', id='f657fb65-b6af-4790-a5b5-1d266a2ed26e')]}

    """
    # normalize each side to a list of proper message objects
    left = left if isinstance(left, list) else [left]
    right = right if isinstance(right, list) else [right]
    left = [message_chunk_to_message(msg) for msg in convert_to_messages(left)]
    right = [message_chunk_to_message(msg) for msg in convert_to_messages(right)]
    # every message needs an id so replacement-by-id can work
    for msg in left + right:
        if msg.id is None:
            msg.id = str(uuid.uuid4())
    # merge: same-id messages from `right` replace in place, new ones append
    position_by_id = {msg.id: idx for idx, msg in enumerate(left)}
    result = list(left)
    for msg in right:
        idx = position_by_id.get(msg.id)
        if idx is None:
            result.append(msg)
        else:
            result[idx] = msg
    return result