在上一篇里,我们用Rust从0写了一个完整Agent:有LLM、有工具、有记忆、有Plan→Act→Observe循环。但如果你真把它跑在生产环境,很快就会遇到这些问题:❌工具调用是串行的,明明可以并发却在“排队等”❌一个工具卡住,整个Agent就卡死❌
在上一篇里,我们用 Rust 从 0 写了一个完整 Agent: 有 LLM、有工具、有记忆、有 Plan → Act → Observe 循环。
但如果你真把它跑在生产环境,很快就会遇到这些问题:
本质原因只有一个:
大多数 Agent 实现,根本没有一个“像样的执行器(Executor)”。
而 Rust + Tokio,恰好非常适合干这件事。
我们先退一步看 Agent 在“工程上”到底是什么。
不是一个 while loop 而是一个不断生成和执行「任务(Task)」的系统
User Goal
↓
Planner (LLM)
↓
生成一组 Actions(可能有依赖)
↓
Executor
↓
并发执行 Tools
↓
Collect Observations
↓
再喂给 Planner
⚠️ 关键变化在这里:
一次规划,不一定只产生一个 Action
比如 LLM 可能说:
“先并行请求 A / B / C 三个接口,等结果回来再综合分析”
如果你还是用“一个 loop + 一个 tool call”,性能和成本都会很差。
在代码层面,我们要把 “LLM 的想法” 和 “真正执行的任务” 分开。
pub enum AgentAction {
ToolCall { tool: String, input: Value },
Parallel { actions: Vec<AgentAction> },
Final { answer: String },
}
pub struct Task {
pub id: Uuid,
pub tool: String,
pub input: Value,
pub timeout: Duration,
}
👉 LLM 决定做什么(Action) 👉 Executor 决定怎么、何时、并发多少去做(Task)
这是 Agent 工程化的分水岭。
pub struct AgentExecutor {
semaphore: Arc<Semaphore>,
tool_registry: ToolRegistry,
}
impl AgentExecutor {
pub async fn run_task(&self, task: Task) -> TaskResult {
let _permit = self.semaphore.acquire().await.unwrap();
let tool = match self.tool_registry.get(&task.tool) {
Some(t) => t,
None => return TaskResult::error(task.id, "tool not found"),
};
let fut = tool.call(task.input);
match tokio::time::timeout(task.timeout, fut).await {
Ok(Ok(output)) => TaskResult::success(task.id, output),
Ok(Err(e)) => TaskResult::error(task.id, e.to_string()),
Err(_) => TaskResult::error(task.id, "timeout"),
}
}
}
💡 这一小段代码,已经解决了 80% Agent 在生产中的问题。
当 LLM 给出一个并行 Action:
{
"type": "Parallel",
"actions": [
{"type":"ToolCall","tool":"http_get","input":{"url":"..."}},
{"type":"ToolCall","tool":"http_get","input":{"url":"..."}}
]
}
fn actions_to_tasks(actions: Vec<AgentAction>) -> Vec<Task> {
actions.into_iter().map(|a| {
match a {
AgentAction::ToolCall { tool, input } => Task {
id: Uuid::new_v4(),
tool,
input,
timeout: Duration::from_secs(10),
},
_ => unreachable!(),
}
}).collect()
}
use futures::stream::{FuturesUnordered, StreamExt};
pub async fn run_parallel(
executor: Arc<AgentExecutor>,
tasks: Vec<Task>,
) -> Vec<TaskResult> {
let mut futs = FuturesUnordered::new();
for task in tasks {
let exec = executor.clone();
futs.push(tokio::spawn(async move {
exec.run_task(task).await
}));
}
let mut results = vec![];
while let Some(res) = futs.next().await {
if let Ok(task_res) = res {
results.push(task_res);
}
}
results
}
📌 注意:
很多 Agent 的一个致命问题是:
👉 把工具的原始 JSON 全量丢回给 LLM
这样会导致:
pub struct Observation {
pub summary: String,
pub raw: Option<Value>,
}
你可以做:
fn summarize_results(results: &[TaskResult]) -> Observation {
let success = results.iter().filter(|r| r.is_ok()).count();
let failed = results.len() - success;
Observation {
summary: format!(
"Executed {} tasks: {} success, {} failed",
results.len(), success, failed
),
raw: None,
}
}
👉 LLM 负责“思考”,Executor 负责“脏活累活”
这是血泪总结,特别适合写在公众号里:
if step > MAX_STEPS {
return Final("Stopped: max steps reached");
}
allowed_tools = ["http_get", "read_file"];
如果你用过 Python Agent 框架,会很熟悉这些痛点:
| 问题 | Python 常见情况 | Rust 方案 |
|---|---|---|
| 并发 | async 混乱 | Tokio + Semaphore |
| 内存 | 随跑随涨 | 明确所有权 |
| 失败 | try/except 吞掉 | Result 强制处理 |
| 执行结构 | 动态、隐式 | 明确 Task / Executor |
| 长期运行 | 容易泄漏 | 稳定 |
Agent 一旦“长期跑”,就已经是系统工程,而不是 Prompt 工程。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!