Human-in-the-loop¶
When creating LangGraph agents, it is often useful to add a human-in-the-loop component. This can be especially helpful when giving agents access to tools, since you may want to manually approve an action before it is taken.
This can be done in several ways, but the primary supported way is to add an "interrupt" before a node is executed. This pauses execution at that node; you can then resume from that point to continue.
Setup¶
First, we need to install the required packages.
%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai
Next, we need to set the API key for OpenAI (the LLM we will use).
import os
import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
Optionally, we can set the API key for LangSmith tracing, which will give us best-in-class observability.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
_set_env("LANGCHAIN_API_KEY")
Set up the State¶
The state is the interface for all the nodes.
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph.message import add_messages
# `add_messages` essentially does this
# (with more robust handling)
# def add_messages(left: list, right: list):
# return left + right
class State(TypedDict):
messages: Annotated[list, add_messages]
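To see how the reducer behaves, we can call add_messages directly. This is purely illustrative and not part of the graph; unlike plain list concatenation, add_messages also assigns message IDs and can replace existing messages by ID.
from langchain_core.messages import AIMessage, HumanMessage
# Illustrative: the reducer appends the update to the existing history
existing = [HumanMessage(content="hi")]
update = [AIMessage(content="hello!")]
print(add_messages(existing, update))  # both messages, in order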
from langchain_core.tools import tool
@tool
def search(query: str):
"""Call to surf the web."""
# This is a placeholder for the actual implementation
# Don't let the LLM know this though 😊
return [
"It's sunny in San Francisco, but you better look out if you're a Gemini 😈."
]
tools = [search]
We can now wrap these tools in a simple ToolExecutor.
This is a simple class that takes in a ToolInvocation, calls the corresponding tool, and returns the output.
A ToolInvocation is any class with tool and tool_input attributes.
from langgraph.prebuilt import ToolExecutor
tool_executor = ToolExecutor(tools)
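To see what the executor does, we can invoke it directly with a hand-built ToolInvocation. This is just a sanity check, not part of the graph, and the query string here is arbitrary.
from langgraph.prebuilt import ToolInvocation
# The executor looks up the tool by name and calls it with the given input
result = tool_executor.invoke(ToolInvocation(tool="search", tool_input={"query": "weather in sf"}))
print(result)  # the placeholder response from our `search` tool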
Set up the model¶
Now we need to load the chat model we want to use. Since we are creating a tool-using ReAct agent, we want to make sure the model supports Tool Calling and works with chat messages.
Note: these model requirements are not requirements for using LangGraph - they are just requirements for this one example.
from langchain_openai import ChatOpenAI
model = ChatOpenAI(temperature=0)
After we've done this, we should make sure the model knows that it has these tools available to call. We can do this by converting the LangChain tools into the format for OpenAI tool calling and then binding them to the model class.
model = model.bind_tools(tools)
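As an optional sanity check (not required for the rest of the notebook), we can invoke the bound model and confirm it emits a tool call; the exact call ID and arguments will vary.
from langchain_core.messages import HumanMessage
# The bound model should respond with a `tool_calls` entry for our `search` tool
response = model.invoke([HumanMessage(content="what's the weather in sf?")])
print(response.tool_calls)  # e.g. [{"name": "search", "args": {"query": "..."}, "id": "..."}]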
Define the nodes¶
We now need to define a few different nodes in our graph.
In LangGraph, a node can be either a function or a runnable.
There are two main nodes we need for this:
- The agent: responsible for deciding what (if any) actions to take.
- A function to invoke tools: if the agent decides to take an action, this node will then execute that action.
We will also need to define some edges. Some of these edges may be conditional. The reason they are conditional is that based on the output of a node, one of several paths may be taken. The path that is taken is not known until that node is run (the LLM decides).
- Conditional Edge: after the agent is called, we should either:
  a. call the function to invoke tools, if the agent said to take an action, or
  b. finish, if the agent said that it was finished.
- Normal Edge: after the tools are invoked, the graph should always go back to the agent to decide what to do next.
Let's define the nodes, as well as a function to decide which conditional edge to take.
from langgraph.prebuilt import ToolInvocation
from langchain_core.messages import ToolMessage
# Define the function that determines whether to continue or not
def should_continue(state):
messages = state["messages"]
last_message = messages[-1]
# If there is no function call, then we finish
if not last_message.tool_calls:
return "end"
# Otherwise if there is, we continue
else:
return "continue"
# Define the function that calls the model
def call_model(state):
messages = state["messages"]
response = model.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
# Define the function to execute tools
def call_tool(state):
messages = state["messages"]
# Based on the continue condition
# we know the last message involves a function call
last_message = messages[-1]
    # We construct a ToolInvocation from the tool call
tool_call = last_message.tool_calls[0]
action = ToolInvocation(
tool=tool_call["name"],
tool_input=tool_call["args"],
)
# We call the tool_executor and get back a response
response = tool_executor.invoke(action)
# We use the response to create a ToolMessage
tool_message = ToolMessage(
content=str(response), name=action.tool, tool_call_id=tool_call["id"]
)
# We return a list, because this will get added to the existing list
return {"messages": [tool_message]}
Define the graph¶
We can now put it all together and define the graph!
from langgraph.graph import StateGraph, END
# Define a new graph
workflow = StateGraph(State)
# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")
# We now add a conditional edge
workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
# Finally we pass in a mapping.
# The keys are strings, and the values are other nodes.
# END is a special node marking that the graph should finish.
# What will happen is we will call `should_continue`, and then the output of that
# will be matched against the keys in this mapping.
# Based on which one it matches, that node will then be called.
{
        # If `continue`, then we call the tool node.
"continue": "action",
# Otherwise we finish.
"end": END,
},
)
# We now add a normal edge from `action` to `agent`.
# This means that after `action` is called, the `agent` node is called next.
workflow.add_edge("action", "agent")
Persistence¶
To add in persistence, we pass in a checkpointer when compiling the graph. Persistence is required to support interrupts, since the graph must save its state in order to stop executing while it is interrupted and resume later.
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
Interrupt¶
To always interrupt before a particular node, pass the name of that node to interrupt_before when compiling the graph.
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])
Preview the graph¶
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))
Interacting with the Agent¶
We can now interact with the agent and see that it stops before calling a tool.
from langchain_core.messages import HumanMessage
thread = {"configurable": {"thread_id": "2"}}
inputs = [HumanMessage(content="hi! I'm bob")]
for event in app.stream({"messages": inputs}, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
================================ Human Message =================================

hi! I'm bob
================================== Ai Message ==================================

Hello Bob! How can I assist you today?
inputs = [HumanMessage(content="What did I tell you my name was?")]
for event in app.stream({"messages": inputs}, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
================================ Human Message =================================

What did I tell you my name was?
================================== Ai Message ==================================

You mentioned that your name is Bob. How can I help you, Bob?
inputs = [HumanMessage(content="what's the weather in sf now?")]
for event in app.stream({"messages": inputs}, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
================================ Human Message =================================

what's the weather in sf now?
================================== Ai Message ==================================
Tool Calls:
  search (call_yBVzSaJu8hvdunRIklHNLGhV)
 Call ID: call_yBVzSaJu8hvdunRIklHNLGhV
  Args:
    query: weather in San Francisco
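Because we compiled with interrupt_before=["action"], the stream stops right after the tool call is emitted. We can confirm the graph is paused by inspecting the checkpoint: get_state returns a snapshot whose next field lists the node(s) waiting to run. A small illustrative check; the expected value is shown as a comment.
snapshot = app.get_state(thread)
# `next` names the node the graph will execute when we resume
print(snapshot.next)  # ('action',)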
Resume¶
We can now call the agent again with no inputs to continue, i.e., run the tool as requested.
Running an interrupted graph with None in the inputs means to "proceed as if the interruption didn't occur."
for event in app.stream(None, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
================================= Tool Message =================================
Name: search

["It's sunny in San Francisco, but you better look out if you're a Gemini 😈."]
================================== Ai Message ==================================

It seems like it's sunny in San Francisco at the moment. Enjoy the weather! If you need more specific details, feel free to ask.
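As a final optional check, the same get_state call as above now shows no pending node, confirming the run completed:
# After resuming to completion, nothing is left to run
print(app.get_state(thread).next)  # ()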