Human-in-the-Loop (HITL,人机协同) 是 AI 工作流中一种强大的机制,通过将人类干预融入系统来提高可靠性、可控性和准确性。在 LangGraph(一个用于构建状态化、基于图的 AI 代理的框架)中,HITL 通过状态管理和中断机制得以实现。本文将详细讲解 LangGraph 中 HITL 的实现原理,澄清
interrupt()
和检查点(checkpoint)的角色,介绍支持的数据库选项,并强调关键注意事项。
HITL 概述
HITL 的核心目标是让 AI 系统在关键步骤暂停,等待人类输入(例如批准、编辑或补充信息),然后根据反馈继续执行。这种机制特别适用于需要人类监督的场景,如金融交易、医疗诊断或内容生成。
在 LangGraph 中,HITL 依赖以下核心组件:
- 状态持久化:通过检查点保存工作流状态,确保暂停后可恢复。
- 中断机制:使用
interrupt()
函数暂停执行,等待人类输入。 - 命令机制:通过
Command
对象传递人类反馈,动态控制工作流走向。
HITL 的实现原理
LangGraph 的 HITL 实现基于其状态图(StateGraph
)架构,通过节点、边和检查点协调 AI 代理与人类交互。以下是实现 HITL 的关键步骤:
1. 状态定义
状态(State
)是一个存储工作流上下文的结构,通常使用 TypedDict
定义,包含消息历史、工具调用结果等。状态通过 add_messages
等注解更新,确保人类输入可以追加或修改上下文。
示例:
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
2. 中断节点
中断通过 interrupt()
函数或 interrupt_before
/interrupt_after
配置实现,在特定节点暂停工作流,等待人类输入。interrupt()
抛出 GraphInterrupt
异常,暂停执行并保存状态。
示例:
from langgraph.types import interrupt
def human_node(state: State):
value = interrupt({"text_to_review": state["messages"][-1].content})
return {"messages": [{"role": "assistant", "content": value}]}
3. 检查点与持久化
检查点是 HITL 的核心基础设施,负责保存和恢复状态。LangGraph 使用检查点器(如 MemorySaver
或 SqliteSaver
)持久化状态,确保工作流可以在长时间暂停后恢复。
示例:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
workflow = StateGraph(State)
workflow.add_node("human_node", human_node)
checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer, interrupt_before=["human_node"])
4. 运行与恢复
工作流运行到中断点时暂停,人类通过 Command
对象提供输入,系统从检查点恢复状态并继续执行。
示例:
from langgraph.types import Command
thread = {"configurable": {"thread_id": "1"}}
for event in graph.stream({"messages": [HumanMessage(content="Hello")]}, thread):
print(event)
# 提供人类输入并恢复
human_input = "Approved"
graph.stream(Command(resume={"data": human_input}), thread)
5. 条件边
条件边根据人类输入动态决定工作流走向。例如,若人类批准操作,进入工具调用节点;若拒绝,则结束。
示例:
def should_continue(state: State):
last_message = state["messages"][-1]
return "action" if last_message.content == "Approved" else "end"
workflow.add_conditional_edges("human_node", should_continue, {"action": "tool_node", "end": "END"})
interrupt()
与检查点的关系
在 HITL 中,interrupt()
和检查点相辅相成,但角色不同:
interrupt()
:触发暂停的机制,抛出GraphInterrupt
异常,标记工作流在特定节点停止,等待人类输入。- 检查点:保存暂停时的状态快照(包括
State
中的消息、工具调用等),并在恢复时加载,确保上下文不丢失。
类比:
interrupt()
像汽车的刹车,负责暂停。- 检查点像引擎,保存状态并支持恢复。
为什么检查点是核心?
检查点提供了状态持久化和恢复的基础,支持异步操作、时间旅行(回滚到之前状态)和多轮交互。没有检查点,interrupt()
仅能暂停,但无法保存上下文,导致恢复失败。
支持的数据库选项
检查点通过 Checkpointer
接口持久化状态,LangGraph 支持以下数据库:
1. 内存存储(MemorySaver)
- 描述:将状态保存在内存中,适合开发和测试。
- 特点:
- 快速、轻量,无需外部数据库。
- 不支持跨进程持久化,进程重启后状态丢失。
- 适用场景:本地调试、短期运行。
- 示例:
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
2. SQLite(SqliteSaver)
- 描述:使用 SQLite 数据库存储检查点,适合轻量级持久化。
- 特点:
- 存储在文件(如
checkpoints.sqlite
)。 - 轻量、易部署,适合单机环境。
- 不适合高并发或分布式系统。
- 存储在文件(如
- 适用场景:中小型项目、单机部署。
- 示例:
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")
3. PostgreSQL(PostgresSaver)
- 描述:使用 PostgreSQL 存储检查点,适合生产环境。
- 特点:
- 支持高并发和分布式系统。
- 提供事务支持和数据一致性。
- 需要配置数据库实例。
- 适用场景:生产环境、高可用性需求。
- 示例:
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://user:password@localhost:5432/dbname")
4. 自定义检查点器
- 描述:通过继承
BaseCheckpointSaver
实现自定义存储,适配任意数据库(如 Redis、MongoDB)。 - 特点:灵活,可集成现有基础设施。
- 适用场景:特殊存储需求。
示例(Redis):
from langgraph.checkpoint.base import BaseCheckpointSaver
import redis
import json
class RedisSaver(BaseCheckpointSaver):
def __init__(self, redis_client):
super().__init__()
self.client = redis_client
def put_state(self, config, state):
self.client.set(config["thread_id"], json.dumps(state))
return config
def get_state(self, config):
state = self.client.get(config["thread_id"])
return json.loads(state) if state else None
def list_states(self, config):
return []
redis_client = redis.Redis(host="localhost", port=6379, db=0)
checkpointer = RedisSaver(redis_client)
如何选择数据库
选择数据库需根据应用场景:
- 开发/测试:
MemorySaver
,简单快速。 - 单机应用:
SqliteSaver
,轻量持久化。 - 生产环境:
PostgresSaver
或自定义检查点器,适合高并发和分布式系统。 - 特殊需求:自定义检查点器,适配 Redis、MongoDB 等。
关键注意事项
避免捕获
GraphInterrupt
异常:使用interrupt()
时,避免在try/catch
中捕获GraphInterrupt
异常,否则可能破坏中断逻辑,导致工作流无法正确暂停或恢复。# 错误示例
def human_node(state: State):
try:
value = interrupt({"text_to_review": state["messages"][-1].content})
except GraphInterrupt:
pass # 捕获 GraphInterrupt 会破坏中断逻辑
return {"messages": [{"role": "assistant", "content": value}]}
# 正确示例
def human_node(state: State):
value = interrupt({"text_to_review": state["messages"][-1].content})
return {"messages": [{"role": "assistant", "content": value}]}
- 线程隔离:使用
thread_id
区分会话,确保状态隔离。 - 序列化:状态需序列化为 JSON 或其他格式。
- 性能:内存最快,SQLite 次之,PostgreSQL 适合高并发。
- 一致性:生产环境中需确保事务一致性。
示例:使用 SQLite 持久化 HITL
以下是一个使用 SqliteSaver
的 HITL 工作流示例:
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import HumanMessage, AIMessage
from typing import Annotated
from typing_extensions import TypedDict
# 定义状态
class State(TypedDict):
messages: Annotated[list, add_messages]
# 节点:AI 代理
def agent_node(state: State):
return {"messages": [AIMessage(content="等待人类批准")]}
# 节点:人类输入
def human_node(state: State):
human_input = interrupt({"message": "请批准或拒绝"})
return {"messages": [AIMessage(content=f"人类回应:{human_input}")]}
# 构建工作流
workflow = StateGraph(State)
workflow.add_node("agent", agent_node)
workflow.add_node("human", human_node)
workflow.add_edge(START, "agent")
workflow.add_edge("agent", "human")
workflow.add_edge("human", END)
# 使用 SQLite 检查点
checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")
graph = workflow.compile(checkpointer=checkpointer, interrupt_before=["human"])
# 运行到中断
thread = {"configurable": {"thread_id": "session_1"}}
for event in graph.stream({"messages": [HumanMessage(content="开始")]}, thread):
print(event)
# 检查状态
state = graph.get_state(thread)
print("保存的状态:", state)
# 模拟人类输入并恢复
graph.stream(Command(resume={"data": "已批准"}), thread)
运行结果:
- 状态保存在
checkpoints.sqlite
中,即使进程重启,状态可恢复。 - 中断后,人类输入通过
Command
传递,系统从 SQLite 加载状态继续执行。
总结
LangGraph 的 HITL 通过 interrupt()
和检查点实现人机协同:
interrupt()
触发暂停,标记等待人类输入的节点。- 检查点 保存状态快照,支持暂停后的恢复、异步操作和时间旅行。
- 支持的数据库:
MemorySaver
(开发)、SqliteSaver
(单机)、PostgresSaver
(生产)、自定义检查点器(特殊需求)。 - 关键提示:避免在
try/catch
中捕获GraphInterrupt
异常,以确保中断逻辑正常运行。
通过合理配置,LangGraph 的 HITL 可广泛应用于需要人类监督的场景,如工具调用验证、内容审核和复杂决策支持。选择合适的数据库可确保工作流的高效性和可靠性。