这是最完整的技术版本。架构演进、代码片段、数据库Schema、97个commit里每一次翻车——全部展开讲。如果你想复刻类似项目,这篇应该能让你少走几周弯路。
客户的真实痛点
客户是悉尼一家做定制铝合金门窗的公司,老板是华人。
很多人第一反应会觉得痛点是"邮件太多处理不过来"——不是。真实的痛点是两个:
1. 跨国习惯差异。 澳洲的B2B业务高度依赖邮件。客户发询盘、发图纸、确认报价、跟进进度,全都通过邮件。但老板在国内做生意多年,习惯的是微信——语音消息、截图、直接打电话。邮件这套工作流对他来说不是"太多",而是"不习惯"。他不知道什么时候该回、回什么格式、怎么追一封两周前的邮件线程。
2. 语言障碍。 老板能日常交流英语,但写不出专业的商务邮件。"请在方便时确认报价"、"附件是修改后的图纸,请查收"——这种句式他写不出来。之前的做法是把内容一条条喂给ChatGPT,复制粘贴回Outlook。能用,但一封邮件要折腾十几分钟。
所以需求不是"帮我自动处理邮件",而是:帮我用我习惯的方式(中文、消息平台)来驱动一个我不习惯的工作流(英文邮件+文件管理)。
这个需求定义决定了后面所有的架构选择。Agent的核心价值不是"自动化",是"桥接"——桥接语言、桥接工作习惯、桥接平台。
整体架构
┌─────────────┐
│ Outlook │
│ (邮箱) │
└──────┬──────┘
│ Microsoft Graph API
▼
┌────────────────────────┐
│ 邮件处理流水线 │
│ (每5分钟) │
│ │
│ 1. 增量拉取 │
│ 2. 规则预过滤 │
│ 3. LLM分类 (A/B/R/C) │
│ 4. 字段提取 │
│ 5. 去重 & 合并 │
│ 6. 存储 → Supabase │
│ 7. 通知老板 │
└────────────┬───────────┘
│
┌────────────▼───────────┐
│ Supabase PostgreSQL │
│ │
│ inquiries │
│ bot_sessions │
│ memories_global │
│ customer_memories │
└────────────┬───────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│ Discord │ │Telegram │ │WhatsApp │
│ Bot │ │ Bot │ │ Webhook │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────┬────────┘─────────┬───────┘
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ Chat Agent │ │ OneDrive │
│ (LLM) │ │ Watcher │
│ 20+ 工具 │ │ (每10分钟) │
└─────────────┘ └─────────────┘
三层架构:
- 数据摄入层:邮件流水线 + OneDrive监听,自动拉取外部数据
- 存储层:Supabase存储所有客户状态、会话历史、提取的记忆
- 交互层:老板通过任意平台和Agent对话,Agent通过20+工具执行操作
架构演进:三个阶段,三次推翻
不是一开始就想清楚了才动手的。整个项目经历了三次架构级别的推翻重来。
第一阶段:分类器 + 路由器(v1,活了一天)
v1的思路很"教科书":
老板发消息 → 分类器判断意图 → 路由到对应处理器 → 返回结果
分类器输出一个JSON:{"action": "stats"} 或 {"action": "advance"}。然后硬编码的处理器执行对应操作。
代码大概长这样:
# v1: classifier + router(已废弃)
async def handle_message(message):
intent = await classify_intent(message.content)
# intent = {"action": "stats"} or {"action": "advance"} etc.
handler = ROUTE_MAP.get(intent["action"])
if handler:
return await handler(message, intent)
else:
return "不支持的操作"
ROUTE_MAP = {
"stats": handle_stats,
"advance": handle_advance,
"draft": handle_draft,
"files": handle_files,
}
问题是什么?真实场景不是一问一答的。
老板说"看一下Steward的平面图"。分类器把它路由到"files"动作,去OneDrive找文件——找不到。结束了。
但实际上正确的做法是:先搜客户 → 检查OneDrive → 没有 → 搜邮件附件 → 找到了 → 返回。这是一个多步推理链,分类器做不到。
v1的代码在上线第一天就暴露了这个问题,我在commit 6a3df39 里把整个分类器架构推翻了。
第二阶段:Agentic Tool-Calling(活到现在)
推翻之后换成了ReAct模式:Agent拿到一堆工具,自己决定调用顺序和策略。
同样是"看一下Steward的平面图":
Agent → search_customer("Steward") → 找到客户
Agent → list_customer_files(customer_id) → OneDrive没有
Agent → search_email_attachments(customer_id) → 邮箱里找到了PDF
Agent → download_and_show_attachments(attachment_id) → 转为图片发给老板
四步自主完成。不需要任何硬编码路由。
核心循环的实现:
async def agent_loop(user_message, session, channel):
messages = build_context(session, user_message)
for round_num in range(MAX_ROUNDS := 15):
response = await llm_call(
MODEL_TIER_3,
messages=messages,
tools=ALL_TOOL_DEFINITIONS
)
if response.has_tool_calls:
for tool_call in response.tool_calls:
result = await execute_tool(tool_call, channel)
messages.append({"role": "tool", "content": result,
"tool_call_id": tool_call.id})
messages.append(response.assistant_message)
else:
# Agent决定不需要更多工具调用,返回最终回复
await channel.send_text(response.content)
return
# 达到15轮上限
await channel.send_text("⚠️ 操作步骤过多,请简化请求")
代价是什么?成本和安全性。 每次LLM调用都有成本,一个任务调四五次工具成本就会累积。而且Agent有"自由意志"之后,犯错的方式也更多了——这个后面专门讲。
第三阶段:平台扩展(Discord → Telegram → WhatsApp)
老板一开始在Discord上用(因为我在Discord上开发的)。但他日常用Telegram和微信。
这导致了消息抽象层的出现:
class MessageChannel(ABC):
"""所有消息平台的统一接口"""
@abstractmethod
async def send_text(self, text: str) -> None:
"""发送纯文本,自动处理平台字符限制"""
...
@abstractmethod
async def send_file(self, file_path: str, filename: str) -> None:
"""发送文件附件"""
...
@abstractmethod
async def send_preview(self, long_text: str) -> None:
"""发送长文本预览,按段落边界切块"""
...
三个实现:
| 平台 | 字符限制 | 关键实现细节 |
|---|---|---|
| Discord | 1990字符 | 原生Thread支持、Emoji格式化 |
| Telegram | 4096字符 | 群聊支持、老板ID白名单审批、HTML格式化 |
| 4096字符 | Meta Cloud API webhook、媒体上传、电话验证 |
send_preview()是最巧妙的方法——它把长回复按段落边界切块,确保不会在句子中间截断。切块逻辑根据每个平台的字符限制自动调整。
添加新平台(比如Slack、Teams)只需要实现这个接口,不需要改任何Agent逻辑。
教训:先在用户所在的平台上做MVP,不是你习惯的平台。 我先做了Discord,等于白做了一版。
三级模型路由:成本控制的关键
第一个月全用最强模型处理所有任务,账单出来就知道不行了。
核心思路:每个任务有一个"最低智力门槛"。能用便宜模型搞定的不用贵的。
| 层级 | 定位 | 用途 |
|---|---|---|
| Tier 1 | 快且便宜的小模型 | 邮件分类、记忆去重、规则过滤 |
| Tier 2 | 中档模型 | 字段提取、邮件写作 |
| Tier 3 | 完整推理能力的大模型 | Agent推理、复杂决策 |
配置和调用:
# config.py
MODEL_TIER_1 = "..." # 快且便宜
MODEL_TIER_2 = "..." # 中档
MODEL_TIER_3 = "..." # 完整推理
# 邮件流水线里:
classification = await llm_call(MODEL_TIER_1, classify_prompt) # A/B/C?
extracted = await llm_call(MODEL_TIER_2, extract_prompt) # 姓名、电话等
# Agent交互里:
response = await llm_call(MODEL_TIER_3, agent_prompt, tools=ALL_TOOLS)
所有调用走统一的LLM路由API,底层共享httpx连接池(复用TCP/TLS连接),异步信号量限制最多10个并发,429/5xx自动指数退避重试(3次)。
# llm_client.py - 核心调用逻辑
_semaphore = asyncio.Semaphore(10)
async def llm_call(model, messages, tools=None, max_retries=3):
async with _semaphore:
for attempt in range(max_retries):
try:
resp = await _http_client.post(
LLM_API_ENDPOINT,
json={"model": model, "messages": messages, "tools": tools},
headers={"Authorization": f"Bearer {API_KEY}"}
)
if resp.status_code == 429:
await asyncio.sleep(2 ** attempt)
continue
return resp.json()
except httpx.TimeoutException:
if attempt == max_retries - 1:
raise
判断邮件是A类还是C类这种任务,给小模型一个好prompt就够了,准确率和大模型差不多,但成本是百分之一。
JSONB vs 关系表:数据库设计的核心决策
核心表inquiries最重要的列是context_data(JSONB类型):
CREATE TABLE inquiries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email_id TEXT UNIQUE NOT NULL,
classification TEXT CHECK (classification IN ('A', 'B', 'C')),
confidence FLOAT,
status TEXT CHECK (status IN ('new', 'pending', 'scheduled', 'quoted', 'closed')),
customer_name TEXT,
customer_email TEXT,
customer_phone TEXT,
address TEXT,
suburb TEXT,
product_types TEXT[],
product_description TEXT,
dimensions TEXT,
urgency TEXT DEFAULT 'medium',
notes TEXT,
raw_subject TEXT,
raw_body TEXT, -- 最多5000字符
context_data JSONB,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
context_data的结构:
{
"emails": [
{
"email_id": "AAMkAGI...",
"subject": "Quote request - sliding doors",
"body": "Hi, I need 3 sliding doors...",
"received_at": "2024-03-15T10:00:00Z"
}
],
"pipeline": {
"current_stage": "inquiry",
"stage_updated_at": "2024-03-15T10:00:00Z",
"history": [
{
"from": "new",
"to": "inquiry",
"at": "2024-03-15T10:00:00Z",
"reason": "Auto-classified as A-type",
"triggered_by": "pipeline"
}
]
},
"memories": [
{"type": "customer_note", "content": "预算有限,要基础款"}
],
"has_dimensions": true,
"glass_type": "double IGU",
"language": "en"
}
为什么不用关系表? 我一开始做了标准关系设计——emails表、pipeline_history表、memories表,外键关联。做了一周之后发现问题:Agent每次要回复客户,需要把完整邮件线程、客户状态、历史记忆全部加载到prompt里。这意味着每次都要JOIN三四张表。
改成JSONB之后,一条查询拿到所有上下文:
async def get_customer_context(customer_id):
row = await db.fetchone(
"SELECT * FROM inquiries WHERE id = $1", customer_id
)
# row.context_data 包含所有邮件、pipeline历史、客户记忆
# 一条查询,零JOIN
return row
代价是丧失了跨客户的关系查询能力(比如"所有subject含某关键词的邮件")。但对这个业务来说,这种查询需求不存在——我们永远是在单个客户维度上操作。
数据库设计要跟着访问模式走,不是跟着范式走。
邮件处理流水线:逐步拆解
每5分钟跑一次,流程:
Step 1:增量拉取
# 通过Microsoft Graph API拉取未读邮件
messages = await graph_client.get(
"/me/mailFolders/inbox/messages",
params={
"$filter": "isRead eq false",
"$orderby": "receivedDateTime desc",
"$top": 50,
"$select": "id,subject,body,from,receivedDateTime,hasAttachments"
}
)
坑: Graph API的分页用@odata.nextLink,必须显式处理。$top默认值是10,很容易漏邮件。时区方面,API返回UTC,老板思维是AEST,要在存储层统一转换。
Step 2:规则预过滤(零LLM成本)
def pre_filter(email):
if email.headers.get("X-Auto-Reply"):
return "skip"
if email.from_domain in KNOWN_SPAM_DOMAINS:
return "skip"
if "out of office" in email.subject.lower():
return "skip"
return "needs_llm"
Step 3:LLM分类
通过规则层的交给Tier 1小模型分类:
- A类:需要上门量尺(有地址,定制产品)
- B类:可以远程报价(有尺寸,标准产品)
- R类:已有客户的回复(不是新询盘)
- C类:不是询盘(营销、供应商、内部邮件)
坑:R类 vs A类的边界。 老客户回复旧邮件但说的是全新项目 → 被错误分为R → 合并到旧记录 → 差点漏单。修复:二次检查,如果回复内容包含新地址或新产品类型,即使是回复也当A类新询盘处理。
Step 4:字段提取
Tier 2中档模型解析:姓名、邮箱、电话、地址、suburb、产品类型、尺寸、紧急度、语言。
坑:澳洲电话号码。 手机04xx、座机(02) xxxx、国际+61 4xx,同一号码三种写法。统一转E.164格式存储,否则去重会把同一客户当不同人。
Step 5:去重与合并
existing = await db.search_by_email(sender_email)
if existing:
# 追加到已有记录的emails数组
existing.context_data["emails"].append(new_email_data)
await db.update_inquiry(existing.id, context_data=existing.context_data)
else:
await db.create_inquiry(extracted_fields)
Step 6:剥离回复链
收到的邮件经常包含完整回复历史。如果全部传给LLM,就是在为已经存在于context_data.emails[]里的重复内容付费。
REPLY_PATTERNS = [
r"^>.*$", # Gmail风格
r"^From:.*\nSent:.*\nTo:.*", # Outlook风格
r"^-+\s*Original Message\s*-+", # 通用风格
r"^On .+ wrote:$", # Apple Mail风格
]
没有通用标准,不同邮件客户端的引用格式都不一样。我的多模式stripper覆盖了主要情况但边缘case还是会漏。在这个场景下good enough > perfect。
从Git历史看到的真实教训
接下来的内容不是事后总结出来的"最佳实践"。是97个commit里每一次翻车后的修复记录。
教训1:Agent会说谎——"邮件已发送"幻觉
commit bed80c7:严禁谎报发送状态
Agent起草了一封邮件,工具返回了草稿内容。然后Agent在回复老板时说"邮件已发送给客户"。
实际上根本没发。Agent把"起草"和"发送"搞混了。
这不是偶发问题。在B2B场景下,老板以为邮件发了就不会再跟进了。一个五万澳元的订单可能因为这个"幻觉"丢掉。
修复方法不是靠prompt说"不要说谎"。而是从工具层面强制:
# draft_email 工具的返回值
return "✅ DRAFT_PENDING_APPROVAL — 邮件已起草,尚未发送。老板回复「发送」确认。"
这段文字不是给人看的,是给Agent看的——它在下一轮推理时会读到这个工具返回值,就不会误判状态。
同时send_email只在匹配到正确的draft_id时才执行:
async def send_email(draft_id: str):
# 只有存在且状态为PENDING的草稿才能发送
draft = await get_draft(draft_id)
if not draft or draft.status != "PENDING_APPROVAL":
return "❌ 没有找到待发送的草稿"
await outlook.send(draft.message_id)
draft.status = "SENT"
return f"✅ 邮件已发送给 {draft.to_email}"
从代码层面堵死了"没审批就发送"的可能。
教训2:模糊确认导致意外发送
commit 7399e7b:收紧发送触发词
老板说"好"、"可以"、"ok"——在中文语境里这可能只是"我知道了"的意思,不是"确认发送"。但Agent把它当成了发送指令。
修复:发送触发词收紧到只有 "发送"、"发"、"send"、"确认发送"。"好"、"可以"、"ok"、"yes"全部移除。
SEND_TRIGGERS = {"发送", "发", "send", "确认发送"}
# 不再包含:"好", "可以", "ok", "yes", "行"
同时加了sent-items去重检查:如果14天内已经给这个客户发过邮件,draft_email会主动提醒。
教训3:Agent"失忆"——不记得在讨论哪个客户
commit 7195bf8:注入当前客户上下文
老板先问了"Billy的情况怎么样",Agent查了Billy的资料回复了。然后老板说"帮他回封邮件"。Agent懵了——"他是谁?"
原因是上下文构建的时机问题。每轮对话重新构建context时,没有把"当前正在讨论的客户"带进去。
修复:
def build_context(session, new_message):
context = []
# 注入当前讨论的客户
if session.current_customer:
context.append({
"role": "system",
"content": f"[当前讨论客户: {session.current_customer.name}]"
})
# 注入记忆
context.extend(format_memories(session.memories))
# 会话历史
context.extend(compress_session(session.messages))
# 新消息
context.append({"role": "user", "content": new_message})
return context
跨消息边界保持客户指代。
教训4:Notion是多余的——去掉一整层
commit 486fc1a:移除Notion依赖
v1同时用了Discord + OneDrive + Supabase + Notion。Notion作为CRM仪表盘。
问题是:老板根本不看Notion。他所有的交互都在Discord/Telegram上。Notion变成了一个只有我在维护、没人在看的数据同步目标。每次pipeline状态变更都要同步一遍Notion,同步逻辑又因为Notion API的限制写得很复杂。
第三天就把整个Notion层删了。数据库只留Supabase,界面只留消息平台。
教训:不要因为"看起来专业"就加一个组件。用户不用的东西,维护成本是纯负债。
教训5:Context管理是一场持续的战斗
跟context相关的commit有十几个,贯穿整个项目。几个关键的:
286e30c:P0+P1 context优化
- 发现context里有重复的boss message(build_context时机错误)
- customer search查询返回了完整的context_data JSONB(几KB),但只需要name和email
- 会话历史没有时间限制,一个月前的对话还在context里
修复后估算每次调用省了约200 token + 数据库传输量减少80%。
889c09c:增大context保留限制
- 之前为了省token把context压得太狠,Agent回复变得"无脑"——没有足够上下文做出好判断
- 最终找到平衡点:最近20条完整保留 + 更早的压缩摘要
d433fed:保留最后一轮Agent完整回复
- 老板经常发很短的跟进消息:"然后呢"、"报价多少"
- 如果Agent自己上一轮的回复被截断了,它就接不上这种短消息
这不是一次性解决的问题。context管理是每加一个功能都要重新平衡的事情。
教训6:batch操作不能交给Agent
commit cfe0e41:拆分batch_draft_emails
老板要给92个联系人群发邮件。我最初把这个功能做成了一个Agent工具——Agent读CSV、查去重、生成邮件、批量发送。
结果Agent在处理92行CSV时,因为context太长,开始编造邮箱地址。不是格式错误,是凭空生成了看起来合理但完全不存在的邮箱。
修复:把batch操作拆成确定性的Python工作流:
# workflows/batch_email.py
async def batch_email_workflow(csv_path, template):
# Step 1: 解析CSV — 纯Python,零LLM
contacts = parse_csv(csv_path)
# Step 2: 查已发邮件 — 纯API调用,零LLM
sent = await get_sent_emails_last_n_days(14)
sent_addresses = {e.to_email for e in sent}
# Step 3: 过滤
to_send = [c for c in contacts if c.email not in sent_addresses]
# Step 4: 生成邮件 — 只有这一步用LLM,且每封独立调用
for contact in to_send:
draft = await llm_call(MODEL_TIER_2,
compose_prompt(template, contact))
await save_draft(contact, draft)
return f"已生成 {len(to_send)} 封草稿,待审批"
Agent负责启动工作流,但数据流水线是确定性的Python代码,不经过LLM的context window。
核心原则:LLM做决策,Python做数据搬运。数据从A到B不需要"创造力"的环节,绝对不能让LLM碰。
教训7:消息格式不能依赖prompt
commit e1917d1:三层Telegram格式化
一开始我在system prompt里写"请用Telegram HTML格式回复"。有时候Agent输出的是Markdown,有时候是HTML,有时候是混合体。Telegram的HTML解析器非常严格,一个未关闭的<b>标签就会导致整条消息发送失败。
最终方案是三层降级,完全不依赖prompt:
def _md_to_tg_html(text: str) -> str:
"""Markdown → Telegram HTML,代码层完成,不依赖LLM"""
# 1. 保护代码块(不被后续转义影响)
code_blocks = []
text = re.sub(r'```(\w*)\n(.*?)```',
lambda m: _save_code_block(m, code_blocks), text, flags=re.DOTALL)
# 2. 转义HTML特殊字符
text = text.replace('&', '&').replace('<', '<').replace('>', '>')
# 3. 转换Markdown语法
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
text = re.sub(r'\*(.+?)\*', r'<i>\1</i>', text)
text = re.sub(r'`(.+?)`', r'<code>\1</code>', text)
# 4. 表格转移动端友好格式
text = convert_tables_to_dots(text) # "cell · cell" 格式
# 5. 还原代码块
text = restore_code_blocks(text, code_blocks)
return text
async def send_text(self, text: str):
html = _md_to_tg_html(text)
try:
# Layer 2: 尝试HTML发送
await self.bot.send_message(self.chat_id, html, parse_mode="HTML")
except TelegramError:
# Layer 3: HTML失败 → 纯文本降级
plain = strip_all_tags(html)
await self.bot.send_message(self.chat_id, plain)
教训:永远不要依赖LLM的输出格式。让LLM自由输出,格式转换在代码层做。
教训8:Discord的并发Bug连环坑
这是Git历史里最密集的一段fix(commits 2b8011c → eb0bf9c → 4f03d6f),三天内连修三个bug,全都是Discord bot的并发问题:
- 加了一个锁防止并发处理 → 结果锁把所有@mention都阻塞了
- 改成
_should_respond标记 → 标记逻辑有race condition,所有消息都被标记为"不处理" - 改成
_imlang_handled属性 → discord.py的Message对象不支持自定义属性,直接crash
最终修复:删掉所有这些"聪明"的并发控制,用最简单的方式——每条消息独立处理,通过asyncio.Semaphore控制并发数量。
_processing_semaphore = asyncio.Semaphore(3)
async def on_message(message):
if not should_respond(message):
return
async with _processing_semaphore:
await process_message(message)
教训:在并发场景下,"简单粗暴"往往比"精巧设计"更可靠。
Agent工具全览
20+工具分四类:
查询类: search_customer(模糊搜索)、query_stats(今日统计)、get_pending(待处理)、scan_mailbox(全量邮件线程分析)、search_emails(条件搜索)
文件类: list_customer_files(OneDrive列表)、search_email_attachments(查邮件附件)、download_and_show_attachments(下载转预览图)、read_onedrive_file(读Excel/PDF/文本)、create_customer_folder(创建客户文件夹)
流水线类: advance_stage(推进状态)、audit_pipeline(审计一致性)
邮件类: draft_email(起草双语邮件)、send_email(发送已确认草稿)、forward_to_factory(打包转发工厂)、batch_draft_emails(批量起草)、create_calendar_event(创建量尺日历事件)
工具定义用OpenAI function-calling格式:
{
"name": "search_customer",
"description": "Fuzzy search customers by name, email, or phone",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Customer name, email, or phone to search"
}
},
"required": ["query"]
}
}
工具数量从v1的0个(硬编码动作)到现在的20+个,是逐步积累的。每个工具的加入都是因为老板提出了一个现有工具无法满足的需求。
草稿审批机制
这是整个系统最关键的安全设计,值得单独拿出来讲:
1. 老板:"帮我给Billy回邮件,报价15000"
2. Agent → search_customer("Billy") → 找到客户
3. Agent → draft_email(to=billy@..., body=...) → 生成双语预览
4. Agent向老板展示:
┌──────────────────────────────┐
│ 📧 邮件草稿 │
│ │
│ [中文翻译] 供老板确认内容 │
│ [英文正文] 实际发给客户的版本 │
│ │
│ ⚠️ 未发送 — 回复「发送」确认 │
└──────────────────────────────┘
5. 状态 → DRAFT_PENDING_APPROVAL
6. 老板说"发送" → Agent → send_email(draft_id=xxx)
老板说"改一下" → 回到步骤3
永远不自动发送。 在B2B制造业,一封错误的邮件可能丢掉一个$50,000的订单。
记忆系统
每次对话后异步运行MemoryWatcher(不阻塞回复),提取三类记忆:
# memory_watcher.py
async def extract_and_store_memories(conversation, customer_id=None):
memories = await llm_call(
MODEL_TIER_1, # 用最便宜的模型提取
f"从以下对话中提取值得长期记住的信息:\n{conversation}"
)
for memory in memories:
# 语义去重:检查是否已有类似记忆
if await is_semantic_duplicate(memory):
continue
if memory.type == "boss_pref":
# "报价都要含GST" → 全局记忆
await store_global_memory(memory)
elif memory.type == "customer_note":
# "Billy预算有限" → 客户专属记忆
await store_customer_memory(customer_id, memory)
elif memory.type == "business_rule":
# "发墨尔本+$200运费" → 全局记忆
await store_global_memory(memory)
记忆注入到下次会话的system prompt。Agent随使用积累业务知识。
语义去重用小模型检查——"Billy预算有限"和"Billy的budget比较紧"是同一条,存储前做相似度检查。不完美但防止记忆库膨胀。
这个功能我第三周才加。前两周老板重复说了20多次同样的偏好,每次Agent都像第一天上班一样什么都不知道。如果重来,记忆系统第一天就做。
会话管理:渐进式压缩
# 会话历史结构
session = {
"messages": deque(maxlen=40), # 最近40条消息
"summary": "", # 旧消息的摘要
"customer_context": {} # 当前客户上下文
}
def compress_session(messages):
msgs = list(messages)
# 最旧的10条 → 摘要(~200 token)
old = msgs[:10]
summary = await summarize(old)
# 中间20条 → 保留原文但截断工具返回值
middle = msgs[10:30]
for msg in middle:
if msg["role"] == "tool":
msg["content"] = truncate(msg["content"], max_chars=500)
# 最新10条 → 完整保留(包括Agent自己的完整回复)
recent = msgs[30:]
return [{"role": "system", "content": summary}] + middle + recent
坑: 压缩是有损的。重要信息可能被摘要掉。缓解方案:MemoryWatcher在压缩窗口移动前就提取关键事实到持久化memories。这是"不完美但可用"的典型工程折中。
流水线追踪与OneDrive同步
8阶段状态机,全量审计日志:
inquiry → measure → quoting → follow_up → ordered → production → install → done
每个阶段有超时阈值:
| 阶段 | 超时天数 | 超时动作 |
|---|---|---|
| inquiry | 2天 | 提醒老板处理 |
| measure | 5天 | 提醒安排量尺 |
| quoting | 7天 | 提醒跟进报价 |
| follow_up | 14天 | 建议再次联系 |
| ordered | 3天 | 确认工厂收到 |
| production | 21天 | 跟进生产进度 |
| install | 7天 | 安排安装时间 |
每次状态变更写入审计日志:
{
"from": "inquiry",
"to": "measure",
"at": "2024-03-16T09:00:00+11:00",
"reason": "量尺安排在3月18日",
"triggered_by": "agent"
}
OneDrive双向同步
客户文件夹按阶段目录组织:
OneDrive/
├── 未报价顾客/ ← inquiry, measure阶段
│ ├── Billy Smith/
│ └── John Lee/
├── 厂家已报价/ ← quoting阶段
│ └── Sarah Chen/
├── 已报价顾客/ ← follow_up阶段
│ └── Mike Wang/
└── 已确定并付定金顾客/ ← ordered, production, install, done阶段
└── David Liu/
OneDrive Watcher每10分钟检查delta API。
OneDrive delta API的坑: 它能检测"有变更"但不告诉你"变了什么"。文件重命名和文件移动在响应里一模一样。我写了路径对比逻辑:提取父文件夹名,如果从未报价顾客/变成厂家已报价/就是阶段变更,否则是重命名,不触发状态更新。
语音消息转写
老板习惯发语音而不是打字。加了语音转写功能(commit bd8bcdc → 5605df3):
用Whisper模型做语音识别,接入免费的API调用。
处理流程:收到语音消息 → 下载音频 → Whisper转文字 → 当作文本消息处理。老板感知不到差异,发语音和打字效果一样。
中英双语的全局影响
这不是某一个模块的问题,它渗透到每个角落:
- Prompt两套:中文理解指令 + 英文生成邮件
- 错误消息中文:老板看到英文报错会懵
- 邮件预览双语:中文给老板确认 + 英文发给客户
- 商务文化差异:"麻烦您尽快回复" ≠ "Please reply at your earliest convenience"
- 产品术语对照:推拉门 = sliding door,平开窗 = casement window
System prompt里嵌入了产品知识(AS2047标准、产品规格、建筑规范),这样Agent在回复客户时能引用正确的技术参数。
工作量是纯英文/纯中文项目的2-3倍。在排期时直接乘系数。
定时任务编排
# main.py
scheduler = AsyncIOScheduler(timezone="Australia/Sydney")
scheduler.add_job(run_email_check, "interval", minutes=5) # 增量邮件处理
scheduler.add_job(run_onedrive_watch, "interval", minutes=10) # OneDrive变更监控
scheduler.add_job(run_evening_schedule, "cron", hour=20) # 每晚8点简报+路线
# 凌晨0点-早上6点不运行(节省API调用)
# 在任务内部检查:if 0 <= now.hour < 6: return
scheduler.start()
每晚8点AEST自动生成:
- 查询所有待量尺的A类询盘
- 根据地址数据生成优化路线
- 汇编简报:客户名、地址、产品需求、备注
- 推送到老板活跃的消息平台
老板第二天早上看简报直接出发。
部署
| 项目 | 选择 |
|---|---|
| Agent运行时 | Railway(Python worker) |
| 数据库 | Supabase |
| LLM调用 | 统一路由API |
| 语音转写 | Whisper API |
| 网站 | Vercel(Next.js) |
Railway用Procfile部署:worker: python main.py。Nixpacks构建,额外安装了CJK字体支持(因为Agent需要生成中文PDF)。
坑:Railway冷启动。 空闲后首次请求等待10-15秒。定时任务不受影响,但老板发消息时赶上冷启动就要等。解决:心跳保活,worker永不休眠。
技术栈
| 组件 | 技术 |
|---|---|
| 运行时 | Python 3.12, asyncio |
| LLM路由 | 统一路由API(三级模型) |
| 数据库 | Supabase PostgreSQL |
| 邮件 | Microsoft Graph API (Outlook) |
| 文件 | OneDrive API |
| 消息 | Discord.py / python-telegram-bot / Meta WhatsApp Cloud API |
| 语音 | Whisper API |
| 调度 | APScheduler |
| HTTP | httpx(异步,连接池复用) |
| 部署 | Railway |
| 网站 | Next.js, React, Tailwind CSS, Vercel |
如果重来
- 先做Telegram/WhatsApp。用户在哪就先做哪。
- 记忆系统第一天就做。Agent不记事,用户体验断崖式下降。
- 一开始就用JSONB。在关系表上浪费了一周。
- batch操作从第一天就用workflow不用Agent。LLM碰大量数据会编造。
- 格式化在代码层做。永远不要通过prompt控制输出格式。
- context管理是持续工程。不是做一次就完了,每加一个功能都要重新平衡。
- 双语排期×2.5。不只是翻译,是整个交互链路的双轨化。
总结
97个commit,12天(从3月25号v1到4月6号的最新commit)。从一个分类器+路由器的玩具,到一个跑在生产环境里、老板每天在用的系统。
核心体会:AI Agent的难点不在模型调用,在于——
- 理解用户的真实痛点(不是"邮件太多",是"习惯不同+语言不通")
- 处理LLM的不可靠性(幻觉、格式不确定、context膨胀)
- 在"Agent自由度"和"系统安全性"之间找平衡
- 持续迭代(97个commit里,fix比feat多得多)
有问题欢迎交流。
邝奇峰 Parker | QKAi Studio | qkai.com.au
继续阅读
把这篇放回更大的业务判断里
从第一性原理到 Agent Harness
我一开始并不知道 LangGraph 或 agent harness 这些概念。我只是从一个真实业务系统出发,自己解决调度、工具编排、消息抽象和外部 API 接入,最后回头看,才发现我已经搭出了一套 agent harness。
机器人的飞轮,其实从机器人出现之前就开始了
真正的机器人飞轮,往往先从流程重构、数据结构、AI Agent 和运营纪律开始运转——硬件本体只是最后一步。
在澳洲,为什么高人工成本正在让 AI 自动化变成企业经营优先级
澳洲高人工成本正推动越来越多中小企业重新评估行政自动化、运营自动化、线索筛选、报价自动化与 AI Agent 工作流,在继续加人之前先做流程重构。