Rust + Tokio:把 Agent 从 Demo 升级为“生产级执行器”

  • King
  • 发布于 9小时前
  • 阅读 20

在上一篇里,我们用Rust从0写了一个完整Agent:有LLM、有工具、有记忆、有Plan→Act→Observe循环。但如果你真把它跑在生产环境,很快就会遇到这些问题:❌工具调用是串行的,明明可以并发却在“排队等”❌一个工具卡住,整个Agent就卡死❌

在上一篇里,我们用 Rust 从 0 写了一个完整 Agent: 有 LLM、有工具、有记忆、有 Plan → Act → Observe 循环。

但如果你真把它跑在生产环境,很快就会遇到这些问题:

  • ❌ 工具调用是串行的,明明可以并发却在“排队等”
  • ❌ 一个工具卡住,整个 Agent 就卡死
  • ❌ LLM 疯狂重复调用同一个工具,token 和钱一起烧
  • ❌ 多个 Agent 同时跑,RPC / API 被打爆
  • ❌ 没有“任务级结构”,Debug 像在破案

本质原因只有一个:

大多数 Agent 实现,根本没有一个“像样的执行器(Executor)”。

而 Rust + Tokio,恰好非常适合干这件事。


从「循环」到「执行器」:认清 Agent 的真实形态

我们先退一步看 Agent 在“工程上”到底是什么。

不是一个 while loop 而是一个不断生成和执行「任务(Task)」的系统

抽象一下 Agent 的运行模型

User Goal
   ↓
Planner (LLM)
   ↓
生成一组 Actions(可能有依赖)
   ↓
Executor
   ↓
并发执行 Tools
   ↓
Collect Observations
   ↓
再喂给 Planner

⚠️ 关键变化在这里:

一次规划,不一定只产生一个 Action

比如 LLM 可能说:

“先并行请求 A / B / C 三个接口,等结果回来再综合分析”

如果你还是用“一个 loop + 一个 tool call”,性能和成本都会很差。


引入核心概念:Action ≠ Task

在代码层面,我们要把 “LLM 的想法”“真正执行的任务” 分开。

Action:LLM 视角(逻辑层)

pub enum AgentAction {
    ToolCall { tool: String, input: Value },
    Parallel { actions: Vec<AgentAction> },
    Final { answer: String },
}

Task:执行器视角(物理层)

pub struct Task {
    pub id: Uuid,
    pub tool: String,
    pub input: Value,
    pub timeout: Duration,
}

👉 LLM 决定做什么(Action) 👉 Executor 决定怎么、何时、并发多少去做(Task)

这是 Agent 工程化的分水岭。


用 Tokio 写一个 Agent Executor

Executor 的职责只有三件事

  1. 控制并发(不能无限起任务)
  2. 控制失败(超时 / 重试 / 降级)
  3. 汇总结果(Observation)

Executor 结构

pub struct AgentExecutor {
    semaphore: Arc<Semaphore>,
    tool_registry: ToolRegistry,
}

执行单个 Task(带限流 + 超时)

impl AgentExecutor {
    pub async fn run_task(&self, task: Task) -> TaskResult {
        let _permit = self.semaphore.acquire().await.unwrap();

        let tool = match self.tool_registry.get(&task.tool) {
            Some(t) => t,
            None => return TaskResult::error(task.id, "tool not found"),
        };

        let fut = tool.call(task.input);
        match tokio::time::timeout(task.timeout, fut).await {
            Ok(Ok(output)) => TaskResult::success(task.id, output),
            Ok(Err(e)) => TaskResult::error(task.id, e.to_string()),
            Err(_) => TaskResult::error(task.id, "timeout"),
        }
    }
}

💡 这一小段代码,已经解决了 80% Agent 在生产中的问题。


并发执行:Parallel Action 怎么落地?

当 LLM 给出一个并行 Action:

{
  "type": "Parallel",
  "actions": [
    {"type":"ToolCall","tool":"http_get","input":{"url":"..."}},
    {"type":"ToolCall","tool":"http_get","input":{"url":"..."}}
  ]
}

转换为 Tasks

fn actions_to_tasks(actions: Vec<AgentAction>) -> Vec<Task> {
    actions.into_iter().map(|a| {
        match a {
            AgentAction::ToolCall { tool, input } => Task {
                id: Uuid::new_v4(),
                tool,
                input,
                timeout: Duration::from_secs(10),
            },
            _ => unreachable!(),
        }
    }).collect()
}

用 FuturesUnordered 并发跑

use futures::stream::{FuturesUnordered, StreamExt};

pub async fn run_parallel(
    executor: Arc<AgentExecutor>,
    tasks: Vec<Task>,
) -> Vec<TaskResult> {
    let mut futs = FuturesUnordered::new();

    for task in tasks {
        let exec = executor.clone();
        futs.push(tokio::spawn(async move {
            exec.run_task(task).await
        }));
    }

    let mut results = vec![];
    while let Some(res) = futs.next().await {
        if let Ok(task_res) = res {
            results.push(task_res);
        }
    }
    results
}

📌 注意:

  • 并发数由 Semaphore 控制
  • 不会因为某个 task 卡住而阻塞全部

Observation 汇总:喂给 LLM 的不是“原始输出”

很多 Agent 的一个致命问题是:

👉 把工具的原始 JSON 全量丢回给 LLM

这样会导致:

  • prompt 巨大
  • token 成本爆炸
  • LLM 反而抓不住重点

正确做法:Observation 是“执行器加工后的结果”

pub struct Observation {
    pub summary: String,
    pub raw: Option<Value>,
}

你可以做:

  • 截断
  • 结构化
  • 错误归类
  • 多结果合并
fn summarize_results(results: &[TaskResult]) -> Observation {
    let success = results.iter().filter(|r| r.is_ok()).count();
    let failed = results.len() - success;

    Observation {
        summary: format!(
            "Executed {} tasks: {} success, {} failed",
            results.len(), success, failed
        ),
        raw: None,
    }
}

👉 LLM 负责“思考”,Executor 负责“脏活累活”


防止 Agent 跑飞:生产环境必备的 5 条铁律

这是血泪总结,特别适合写在公众号里:

① 最大步数(Hard Stop)

if step > MAX_STEPS {
    return Final("Stopped: max steps reached");
}

② 重复 Action 检测

  • 连续 N 次调用同一 tool
  • 输入高度相似 → 强制总结并结束

③ Tool 白名单

allowed_tools = ["http_get", "read_file"];

④ 成本预算

  • token 上限
  • tool 调用次数上限

⑤ 人类兜底(Human-in-the-loop)

  • 高风险 action(转账 / 写文件)
  • 必须人工确认

为什么这套 Agent 更“Rust 风格”?

如果你用过 Python Agent 框架,会很熟悉这些痛点:

问题 Python 常见情况 Rust 方案
并发 async 混乱 Tokio + Semaphore
内存 随跑随涨 明确所有权
失败 try/except 吞掉 Result 强制处理
执行结构 动态、隐式 明确 Task / Executor
长期运行 容易泄漏 稳定

Agent 一旦“长期跑”,就已经是系统工程,而不是 Prompt 工程。

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
King
King
0x56af...a0dd
擅长Rust/Solidity/FunC/Move开发