Human-in-the-Loop (HITL,人机协同) 是 AI 工作流中一种强大的机制,通过将人类干预融入系统来提高可靠性、可控性和准确性。在 LangGraph(一个用于构建状态化、基于图的 AI 代理的框架)中,HITL 通过状态管理和中断机制得以实现。本文将详细讲解 LangGraph 中 HITL 的实现原理,澄清 interrupt() 和检查点(checkpoint)的角色,介绍支持的数据库选项,并强调关键注意事项。

HITL 概述

HITL 的核心目标是让 AI 系统在关键步骤暂停,等待人类输入(例如批准、编辑或补充信息),然后根据反馈继续执行。这种机制特别适用于需要人类监督的场景,如金融交易、医疗诊断或内容生成。

在 LangGraph 中,HITL 依赖以下核心组件:

  • 状态持久化:通过检查点保存工作流状态,确保暂停后可恢复。
  • 中断机制:使用 interrupt() 函数暂停执行,等待人类输入。
  • 命令机制:通过 Command 对象传递人类反馈,动态控制工作流走向。

HITL 的实现原理

LangGraph 的 HITL 实现基于其状态图(StateGraph)架构,通过节点、边和检查点协调 AI 代理与人类交互。以下是实现 HITL 的关键步骤:

1. 状态定义

状态(State)是一个存储工作流上下文的结构,通常使用 TypedDict 定义,包含消息历史、工具调用结果等。状态通过 add_messages 等注解更新,确保人类输入可以追加或修改上下文。

示例

  1. from typing import Annotated
  2. from typing_extensions import TypedDict
  3. from langgraph.graph.message import add_messages
  4. class State(TypedDict):
  5. messages: Annotated[list, add_messages]

2. 中断节点

中断通过 interrupt() 函数或 interrupt_before/interrupt_after 配置实现,在特定节点暂停工作流,等待人类输入。interrupt() 抛出 GraphInterrupt 异常,暂停执行并保存状态。

示例

  1. from langgraph.types import interrupt
  2. def human_node(state: State):
  3. value = interrupt({"text_to_review": state["messages"][-1].content})
  4. return {"messages": [{"role": "assistant", "content": value}]}

3. 检查点与持久化

检查点是 HITL 的核心基础设施,负责保存和恢复状态。LangGraph 使用检查点器(如 MemorySaverSqliteSaver)持久化状态,确保工作流可以在长时间暂停后恢复。

示例

  1. from langgraph.checkpoint.memory import MemorySaver
  2. from langgraph.graph import StateGraph
  3. workflow = StateGraph(State)
  4. workflow.add_node("human_node", human_node)
  5. checkpointer = MemorySaver()
  6. graph = workflow.compile(checkpointer=checkpointer, interrupt_before=["human_node"])

4. 运行与恢复

工作流运行到中断点时暂停,人类通过 Command 对象提供输入,系统从检查点恢复状态并继续执行。

示例

  1. from langgraph.types import Command
  2. thread = {"configurable": {"thread_id": "1"}}
  3. for event in graph.stream({"messages": [HumanMessage(content="Hello")]}, thread):
  4. print(event)
  5. # 提供人类输入并恢复
  6. human_input = "Approved"
  7. graph.stream(Command(resume={"data": human_input}), thread)

5. 条件边

条件边根据人类输入动态决定工作流走向。例如,若人类批准操作,进入工具调用节点;若拒绝,则结束。

示例

  1. def should_continue(state: State):
  2. last_message = state["messages"][-1]
  3. return "action" if last_message.content == "Approved" else "end"
  4. workflow.add_conditional_edges("human_node", should_continue, {"action": "tool_node", "end": "END"})

interrupt() 与检查点的关系

在 HITL 中,interrupt() 和检查点相辅相成,但角色不同:

  • interrupt():触发暂停的机制,抛出 GraphInterrupt 异常,标记工作流在特定节点停止,等待人类输入。
  • 检查点:保存暂停时的状态快照(包括 State 中的消息、工具调用等),并在恢复时加载,确保上下文不丢失。

类比

  • interrupt() 像汽车的刹车,负责暂停。
  • 检查点像引擎,保存状态并支持恢复。

为什么检查点是核心? 检查点提供了状态持久化和恢复的基础,支持异步操作、时间旅行(回滚到之前状态)和多轮交互。没有检查点,interrupt() 仅能暂停,但无法保存上下文,导致恢复失败。

支持的数据库选项

检查点通过 Checkpointer 接口持久化状态,LangGraph 支持以下数据库:

1. 内存存储(MemorySaver)

  • 描述:将状态保存在内存中,适合开发和测试。
  • 特点
    • 快速、轻量,无需外部数据库。
    • 不支持跨进程持久化,进程重启后状态丢失。
  • 适用场景:本地调试、短期运行。
  • 示例
    1. from langgraph.checkpoint.memory import MemorySaver
    2. checkpointer = MemorySaver()

2. SQLite(SqliteSaver)

  • 描述:使用 SQLite 数据库存储检查点,适合轻量级持久化。
  • 特点
    • 存储在文件(如 checkpoints.sqlite)。
    • 轻量、易部署,适合单机环境。
    • 不适合高并发或分布式系统。
  • 适用场景:中小型项目、单机部署。
  • 示例
    1. from langgraph.checkpoint.sqlite import SqliteSaver
    2. checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")

3. PostgreSQL(PostgresSaver)

  • 描述:使用 PostgreSQL 存储检查点,适合生产环境。
  • 特点
    • 支持高并发和分布式系统。
    • 提供事务支持和数据一致性。
    • 需要配置数据库实例。
  • 适用场景:生产环境、高可用性需求。
  • 示例
    1. from langgraph.checkpoint.postgres import PostgresSaver
    2. checkpointer = PostgresSaver.from_conn_string("postgresql://user:password@localhost:5432/dbname")

4. 自定义检查点器

  • 描述:通过继承 BaseCheckpointSaver 实现自定义存储,适配任意数据库(如 Redis、MongoDB)。
  • 特点:灵活,可集成现有基础设施。
  • 适用场景:特殊存储需求。
  • 示例(Redis)

    1. from langgraph.checkpoint.base import BaseCheckpointSaver
    2. import redis
    3. import json
    4. class RedisSaver(BaseCheckpointSaver):
    5. def __init__(self, redis_client):
    6. super().__init__()
    7. self.client = redis_client
    8. def put_state(self, config, state):
    9. self.client.set(config["thread_id"], json.dumps(state))
    10. return config
    11. def get_state(self, config):
    12. state = self.client.get(config["thread_id"])
    13. return json.loads(state) if state else None
    14. def list_states(self, config):
    15. return []
    16. redis_client = redis.Redis(host="localhost", port=6379, db=0)
    17. checkpointer = RedisSaver(redis_client)

如何选择数据库

选择数据库需根据应用场景:

  • 开发/测试MemorySaver,简单快速。
  • 单机应用SqliteSaver,轻量持久化。
  • 生产环境PostgresSaver 或自定义检查点器,适合高并发和分布式系统。
  • 特殊需求:自定义检查点器,适配 Redis、MongoDB 等。

关键注意事项

  • 避免捕获 GraphInterrupt 异常:使用 interrupt() 时,避免在 try/catch 中捕获 GraphInterrupt 异常,否则可能破坏中断逻辑,导致工作流无法正确暂停或恢复。

    1. # 错误示例
    2. def human_node(state: State):
    3. try:
    4. value = interrupt({"text_to_review": state["messages"][-1].content})
    5. except GraphInterrupt:
    6. pass # 捕获 GraphInterrupt 会破坏中断逻辑
    7. return {"messages": [{"role": "assistant", "content": value}]}
    8. # 正确示例
    9. def human_node(state: State):
    10. value = interrupt({"text_to_review": state["messages"][-1].content})
    11. return {"messages": [{"role": "assistant", "content": value}]}
  • 线程隔离:使用 thread_id 区分会话,确保状态隔离。
  • 序列化:状态需序列化为 JSON 或其他格式。
  • 性能:内存最快,SQLite 次之,PostgreSQL 适合高并发。
  • 一致性:生产环境中需确保事务一致性。

示例:使用 SQLite 持久化 HITL

以下是一个使用 SqliteSaver 的 HITL 工作流示例:

  1. from langgraph.graph import StateGraph, START, END
  2. from langgraph.types import interrupt, Command
  3. from langgraph.checkpoint.sqlite import SqliteSaver
  4. from langchain_core.messages import HumanMessage, AIMessage
  5. from typing import Annotated
  6. from typing_extensions import TypedDict
  7. # 定义状态
  8. class State(TypedDict):
  9. messages: Annotated[list, add_messages]
  10. # 节点:AI 代理
  11. def agent_node(state: State):
  12. return {"messages": [AIMessage(content="等待人类批准")]}
  13. # 节点:人类输入
  14. def human_node(state: State):
  15. human_input = interrupt({"message": "请批准或拒绝"})
  16. return {"messages": [AIMessage(content=f"人类回应:{human_input}")]}
  17. # 构建工作流
  18. workflow = StateGraph(State)
  19. workflow.add_node("agent", agent_node)
  20. workflow.add_node("human", human_node)
  21. workflow.add_edge(START, "agent")
  22. workflow.add_edge("agent", "human")
  23. workflow.add_edge("human", END)
  24. # 使用 SQLite 检查点
  25. checkpointer = SqliteSaver.from_conn_string("checkpoints.sqlite")
  26. graph = workflow.compile(checkpointer=checkpointer, interrupt_before=["human"])
  27. # 运行到中断
  28. thread = {"configurable": {"thread_id": "session_1"}}
  29. for event in graph.stream({"messages": [HumanMessage(content="开始")]}, thread):
  30. print(event)
  31. # 检查状态
  32. state = graph.get_state(thread)
  33. print("保存的状态:", state)
  34. # 模拟人类输入并恢复
  35. graph.stream(Command(resume={"data": "已批准"}), thread)

运行结果

  • 状态保存在 checkpoints.sqlite 中,即使进程重启,状态可恢复。
  • 中断后,人类输入通过 Command 传递,系统从 SQLite 加载状态继续执行。

总结

LangGraph 的 HITL 通过 interrupt() 和检查点实现人机协同:

  • interrupt() 触发暂停,标记等待人类输入的节点。
  • 检查点 保存状态快照,支持暂停后的恢复、异步操作和时间旅行。
  • 支持的数据库MemorySaver(开发)、SqliteSaver(单机)、PostgresSaver(生产)、自定义检查点器(特殊需求)。
  • 关键提示:避免在 try/catch 中捕获 GraphInterrupt 异常,以确保中断逻辑正常运行。

通过合理配置,LangGraph 的 HITL 可广泛应用于需要人类监督的场景,如工具调用验证、内容审核和复杂决策支持。选择合适的数据库可确保工作流的高效性和可靠性。