🧪 07 - 高级主题
本章收纳多轮对话、定时任务、URL 解析、闲聊等进阶用法。
💬 多轮对话
多轮对话允许机器人与用户进行持续的交互,记住对话状态。
工作原理
1. 用户发送命令(如 /猜数字)
│
▼
2. 插件调用 context.create_session() 创建会话
│
▼
3. 会话存储在 SessionManager 中
│
▼
4. 用户后续消息被路由到 handle_session()
│
├─ 继续对话 → 更新会话数据
│
└─ 结束对话 → context.end_session()完整示例:猜数字游戏
plugin.json:
{
"name": "guess_number",
"version": "1.0.0",
"entry": "main.py",
"commands": [{
"name": "guess",
"triggers": ["猜数字", "guess"],
"help": "猜数字游戏"
}]
}main.py:
import random
from typing import Any, Dict, List
from core.plugin_base import segments
# 游戏配置
MIN_NUM = 1
MAX_NUM = 100
MAX_ATTEMPTS = 7
TIMEOUT = 180.0 # 3 分钟
async def handle(command: str, args: str, event: Dict, context) -> List:
"""处理初始命令,创建会话"""
# 检查是否已有游戏
existing = await context.get_session()
if existing:
return segments(
f"你已经有一个游戏在进行中!\n"
f"当前范围: {existing.get('min')}-{existing.get('max')}\n"
f"剩余次数: {existing.get('remaining')}\n"
f"发送数字继续猜测,或发送「退出」放弃"
)
# 生成目标数字
target = random.randint(MIN_NUM, MAX_NUM)
# 创建会话
await context.create_session(
initial_data={
"target": target,
"min": MIN_NUM,
"max": MAX_NUM,
"attempts": 0,
"remaining": MAX_ATTEMPTS,
"history": []
},
timeout=TIMEOUT
)
context.logger.info(f"Game started: target={target}")
return segments(
f"🎮 猜数字游戏开始!\n"
f"我想了一个 {MIN_NUM} 到 {MAX_NUM} 之间的数字\n"
f"你有 {MAX_ATTEMPTS} 次机会\n"
f"请发送一个数字开始猜测!"
)
async def handle_session(text: str, event: Dict, context, session) -> List:
"""处理会话中的消息"""
# 获取会话数据
target = session.get("target")
remaining = session.get("remaining")
history = session.get("history", [])
# 解析输入
try:
guess = int(text.strip())
except ValueError:
return segments("请输入一个数字!")
# 验证范围
min_num = session.get("min")
max_num = session.get("max")
if guess < min_num or guess > max_num:
return segments(f"请输入 {min_num} 到 {max_num} 之间的数字!")
# 更新尝试
remaining -= 1
history.append(guess)
session.set("remaining", remaining)
session.set("history", history)
# 判断结果
if guess == target:
# 猜对了
await context.end_session()
return segments(
f"🎉 恭喜你猜对了!答案是 {target}\n"
f"用了 {len(history)} 次猜测\n"
f"历史: {' → '.join(map(str, history))}"
)
if remaining <= 0:
# 次数用尽
await context.end_session()
return segments(
f"💔 游戏结束!答案是 {target}\n"
f"你的猜测: {' → '.join(map(str, history))}"
)
# 给出提示
if guess < target:
hint = "太小了!"
session.set("min", max(min_num, guess + 1))
else:
hint = "太大了!"
session.set("max", min(max_num, guess - 1))
return segments(f"{hint} 剩余 {remaining} 次机会")退出会话
用户发送退出关键字时,Dispatcher 会自动结束会话:
# dispatcher.py 中的处理
exit_commands = {"退出", "取消", "exit", "quit", "q"}
if text.strip().lower() in exit_commands:
await self.session_manager.delete(user_id, group_id)
return [{"type": "text", "data": {"text": "已退出当前对话"}}]Session API
# 创建会话
session = await context.create_session(
initial_data={"key": "value"},
timeout=300.0
)
# 获取会话
session = await context.get_session()
# 读写数据
value = session.get("key", default)
session.set("key", value)
session.clear()
# 检查状态
if session.is_expired():
...
# 结束会话
await context.end_session()后台任务队列
多轮会话不是所有“持续工作”的唯一方案。对于 Codex 这类可能执行几分钟甚至更久的任务,更合适的模式是插件自己维护业务会话和队列,然后在完成时主动发送结果。
plugins/codex 的设计可以作为参考:
/codex create <label> [cwd:<path>]创建一个业务会话标签,不创建框架 Session。/codex <label> <任务>将任务加入标签队列,handler 立即返回“已收到”。- 同一标签内任务串行运行,避免同一 Codex thread 被并发 resume;不同标签之间按
max_parallel_jobs并行。 - 任务完成、失败、超时或取消后,插件用
context.send_action()主动回发[codex:<label> #<job_id>]消息;如果任务生成图片,会随文字一起发送 QQ image 段。 - 会话索引保存在
plugins/codex/data/sessions.json,对话记录、图片副本和任务 artifacts 保存在plugins/codex/data/session/<label>/,删除归档保存在plugins/codex/data/deleted_sessions/,重启后可以继续知道 label、cwd 和 Codex thread id。 - 业务插件可以把自己的长任务作为侧路投递给 Codex,例如
arxiv_filter在发送论文列表后,把所有 positive arXiv 链接交给固定astro-ph会话;摘要成功、失败或历史重发都不影响论文列表消息。
这种模式不会占用框架活跃会话,因此用户可以继续使用其他命令或闲聊。
⏰ 定时任务
使用 APScheduler 执行定时任务。
配置方式
在 plugin.json 中声明:
{
"name": "daily",
"entry": "main.py",
"schedule": [
{
"id": "morning_greeting",
"handler": "send_morning",
"cron": {"hour": 8, "minute": 0},
"group_ids": [123456, 789012]
},
{
"id": "weekly_report",
"handler": "send_weekly",
"cron": {"day_of_week": "mon", "hour": 9, "minute": 0}
}
]
}处理函数
async def send_morning(context) -> List[Dict]:
"""每天 8:00 发送"""
return segments("☀️ 早上好!新的一天开始了")
async def send_weekly(context) -> List[Dict]:
"""每周一 9:00 发送"""
# 生成报告
report = await generate_report()
return segments(f"📊 周报\n{report}")Cron 表达式
支持 APScheduler 的所有 cron 字段:
| 字段 | 说明 | 示例 |
|---|---|---|
year | 年 | 2026 |
month | 月 | 1-12 或 jan-dec |
day | 日 | 1-31 |
week | 周数 | 1-53 |
day_of_week | 星期 | 0-6 或 mon-sun |
hour | 时 | 0-23 |
minute | 分 | 0-59 |
second | 秒 | 0-59 |
示例:
// 每天 8:00
{"hour": 8, "minute": 0}
// 每 2 小时整点
{"hour": "*/2", "minute": 0}
// 工作日 9:00
{"day_of_week": "mon-fri", "hour": 9, "minute": 0}
// 每月 1 日 0:00
{"day": 1, "hour": 0, "minute": 0}
// 每分钟(调试用)
{"minute": "*"}
// 每 30 分钟
{"minute": "*/30"}指定发送目标
{
"schedule": [{
"id": "task1",
"handler": "func",
"cron": {...},
"group_ids": [123456] // 指定群
}]
}如果不指定 group_ids,使用 config.json 中的 default_group_ids。
动态定时任务
动态任务需要在 init() 钩子中通过 app 实例操作调度器(context 不直接暴露 scheduler):
_app = None
async def init(context):
# 保存 app 引用(通过 context.reload_plugins 的闭包可获取)
# 推荐在 init 时注册静态任务,动态任务场景较少
pass
# 更推荐的方式:在 plugin.json schedule 字段声明所有定时任务如确需动态注册,可将 app 引用通过 init(context) 外部传入(框架扩展场景)。
URL 自动解析
当消息包含 URL 时,自动调用 URL 解析插件。
实现方式
在插件中实现 handle_url() 函数:
async def handle_url(url: str, event: Dict, context) -> List[Dict]:
"""
自动解析 URL
Args:
url: 提取到的 URL
event: 原始事件
context: 插件上下文
Returns:
消息段列表,或 None/[] 表示不处理
"""
context.logger.info(f"Parsing URL: {url}")
# 只处理特定域名
if "bilibili.com" not in url:
return None
try:
# 获取视频信息
async with context.http_session.get(url) as resp:
html = await resp.text()
title = extract_title(html)
return segments(f"🎬 B站视频: {title}")
except Exception as e:
context.logger.warning(f"URL parsing failed: {e}")
return None触发条件
- 消息包含
http://或https://开头的 URL - 消息不以命令前缀开头
默认 URL 解析插件
框架查找名为 url_parser 的插件:
url_plugin = self.app.plugin_manager.get("url_parser")
if url_plugin and hasattr(url_plugin.module, "handle_url"):
result = await url_plugin.module.handle_url(url, event, context)闲聊功能
当消息不是命令时,可以进行闲聊回复。
实现方式
实现 handle_smalltalk() 函数:
async def handle_smalltalk(text: str, event: Dict, context) -> List[Dict]:
"""
处理闲聊消息
Args:
text: 用户消息文本
event: 原始事件
context: 插件上下文
Returns:
消息段列表,或 None 表示不处理
"""
# 关键词匹配
if "天气" in text:
return segments("今天天气不错呢~")
if "你好" in text:
return segments("你好呀!有什么可以帮你的?")
# 调用 AI 模型
response = await call_ai_model(text)
if response:
return segments(response)
# 不处理
return None触发条件
- 私聊:命令未匹配、没有活跃会话消费时进入 smalltalk 回落
- 群聊:消息带命令前缀、包含 bot_name、@ 机器人,或
require_bot_name_in_group=false时进入处理流程 - 群聊静音:只跳过 smalltalk 回落,不影响 URL-only、命令、只喊名字和活跃会话
配置闲聊提供者
在 config.json 中:
{
"plugins": {
"smalltalk_provider": "xiaoqing_chat"
}
}框架会优先调用指定插件的 handle_smalltalk()。
只叫机器人名字
当用户只发送机器人名字(如"小青")时,调用 call_bot_name_only():
def call_bot_name_only(context) -> List[Dict]:
"""只叫机器人名字时的响应"""
responses = ["叫我干嘛?", "嗯?", "在的~"]
return segments(random.choice(responses))Dispatcher 消息分发顺序
Dispatcher 使用固定线性流程处理消息。插件通过约定函数接入,无需注册自定义处理链。
线性流程
消息到达 Dispatcher
↓
解析 MessageContext
↓
URL-only 短路
↓
处理门控(私聊 / require_bot_name_in_group=false / has_prefix / 活跃会话)
↓
只喊机器人名字或只 @ 机器人
↓
命令匹配并调用 handle()
↓
未知命令提示(仅严格命令前缀)
↓
活跃会话并调用 handle_session()
↓
smalltalk 回落并调用 handle_smalltalk()短路规则
某个步骤返回结果后,后续步骤不会继续执行。
示例:用户发送 /help
1. URL-only:不是完整单 URL,跳过
2. 处理门控:has_prefix=True,放行
3. 只喊名字:否
4. 命令匹配:匹配 /help
5. 调用 help.handle() 并返回帮助信息
6. 直接返回,不进入会话或 smalltalk 回落扩展边界
插件应优先使用现有接入点:
handle():命令处理handle_session():多轮会话handle_smalltalk():smalltalk 回落call_bot_name_only():只喊机器人名字或只 @ 机器人observe_message():观察消息、维护上下文,不直接决定 dispatcher 是否回复
如果需要新增全局消息处理能力,应在 core/dispatcher.py 的线性流程中显式设计步骤和优先级,并同步更新架构文档与回归测试。
调试
启用 DEBUG 日志可以查看 dispatcher 的关键分发决策:
# config.json
{
"log_level": "DEBUG"
}日志会包含收到的消息、命令匹配、URL 处理、会话处理、群聊静音跳过 smalltalk 等信息。
xiaoqing_chat 智能对话
xiaoqing_chat 提供基于 LLM 的智能对话能力。
核心特性
1. 长期记忆 + 人物资料
xiaoqing_chat 会同时维护会话记忆、人物资料和摘要记忆。当前回复阶段会把最近对话、相关长期记忆、当前说话人的已知事实一起送入 prompt。
工作原理:
用户发送消息
↓
写入会话历史
↓
提取人物事实 / 主题摘要
↓
查询相关长期记忆
↓
构建回复 prompt(记忆 + 人设 + 表达习惯 + 当前目标)
↓
调用 LLM 生成回复并写回会话相关配置主要在 plugins/xiaoqing_chat/config/xiaoqing_config.json 的 memory / summarizer / goal 段。
2. 拟人化表达系统
拟人化由多层机制共同实现,单句 system prompt 只承担一部分约束。
personality.identity和随机states- 从对话里学到的表达方式
- 黑话/梗解释
- 人设提示、pre-heuristic 和 reply checker,把回复压回“短、口语、像真人”
- PFC / goal / brain chat,让它知道这轮为什么要说话;私聊深度对话时还能切到更高 think level
3. 图片与表情包进入正常对话流
启用媒体能力后,用户发来的图片、NapCat mface、QQ 原生 face 会作为有效消息进入解析。
工作原理:
用户发送图片 / 表情包
↓
解析 OneBot 消息段
↓
必要时下载或回收真实图片
↓
视觉模型分析成 [图片:...] / [表情包:...] / [QQ表情:...]
↓
按原始 segment 顺序拼回有效用户输入
↓
进入和纯文本相同的频率控制、记忆、回复检查链路识别成表情包的图片会进入 plugins/xiaoqing_chat/data/media/library/,后续可作为本地表情包回复素材。
回复阶段由主 LLM 直接决定是否带出站媒体。模型可以在自然文本里附一个 [想发表情:hint]、[想发QQ表情:hint] 或 [想发图片:hint] marker。插件会剥离这个控制 marker,再按 hint 到本地表情包库、图片目录或 QQ face 目录里解析成实际发送段。旧图库里缺失的元数据会在后台补修,不会卡住当前回复。
4. 深度对话模式与 think level
xiaoqing_chat 的普通回复和私聊深度对话使用不同思考强度。
- 普通模式下,
planner.think_mode = "dynamic"会按近期上下文长度自动映射到不同 think level - 私聊启用
brain_chat后,会优先使用brain_think_level - 如果
private_planner_always_on = true,深度对话模式下即使普通 planner 关闭,也会保持 planner 链开启
因此普通短对话会更轻快,私聊深聊和长上下文场景会更容易得到更深入的回复。
智能回复控制
xiaoqing_chat 内部实现智能回复频率控制,优于简单的 random_reply_rate。
控制策略:
- attention gate:先判断这条消息是不是冲小青来的。
/xc、私聊、被@、直接叫名字、只喊名字后的追问、reply 引用小青、以及近期上下文锚定小青的“她/ta”共指召唤,都会跳过普通概率门。 - 硬频控:普通群聊参与前先检查最小回复间隔、每分钟上限、连续回复冷却。
- 活跃话题:近期目标仍活跃时可使用更短的
active_topic_min_reply_interval,并提高普通参与概率。 - soft gate:普通群聊插话概率
reply_probability_base叠加 heartflow 和连续未回复补偿。
配置边界:reply_probability_private、heartflow.threshold、heartflow.enable_random_gate、heartflow.weight_mentioned、heartflow.weight_private、heartflow.weight_rate_limit、heartflow.weight_cooldown、heartflow.weight_interval 不属于当前回复主路径。私聊、点名和共指由 attention gate 处理;速率限制由硬频控处理。
示例:
async def handle_smalltalk(text: str, event: Dict, context) -> List:
"""示意:xiaoqing_chat 自己决定是否回复"""
runtime = load_xiaoqing_runtime(context)
state = get_state()
chat_id = resolve_chat_id(event)
attention = await decide_attention(text, event, state, chat_id)
if not attention.forced and not await should_reply(runtime, state, chat_id, text):
await state.heartflow.on_no_reply_async(chat_id=chat_id)
return []
draft = await generate_reply_draft(text, event, context)
await save_reply_to_memory(chat_id, draft)
return draft.parts扩展 xiaoqing_chat
扩展 xiaoqing_chat 时,可以在现有链路上增加后处理或外部数据源。
添加自定义后处理
async def handle_smalltalk(text: str, event: Dict, context) -> List:
"""扩展现有 xiaoqing_chat"""
# 调用原始 xiaoqing_chat
raw_response = await call_original_xiaoqing_chat(text, context)
# 自定义后处理
processed = custom_post_process(raw_response, context)
return segments(processed)
def custom_post_process(text: str, context) -> str:
"""自定义后处理"""
# 添加时间戳
from datetime import datetime
return f"[{datetime.now().strftime('%H:%M')}] {text}"集成其他数据源
async def handle_smalltalk(text: str, event: Dict, context) -> List:
"""集成外部数据源"""
# 检查是否需要查询外部数据
if "天气" in text:
# 调用天气 API
weather = await fetch_weather(text, context)
# 将天气信息作为上下文
response = await generate_llm_response(
text=text,
context=f"当前天气:{weather}",
history=get_history(context)
)
return segments(response)
# 正常对话
return await call_xiaoqing_chat(text, context)NOTE
扩展图片或表情相关行为时,优先阅读 plugins/xiaoqing_chat/media/event_media.py、emoji_library.py、marker_resolver.py。当前设计是主回复模型输出至多一个出站媒体 marker,解析与落盘路径由 marker_resolver.py 统一处理。
静音控制
管理员可以让机器人在群里静音。
使用命令
/闭嘴 30 # 静音 30 分钟
/闭嘴 1h # 静音 1 小时
/说话 # 解除静音静音期间的行为
- ✅ 仍然响应命令
- ❌ 不进行随机回复
- ❌ 不进行闲聊
API
# 静音群
context.mute_group(group_id, duration_minutes)
# 解除静音
context.unmute_group(group_id)
# 检查是否静音
is_muted = context.is_group_muted(group_id)
# 获取剩余时间
remaining = context.get_mute_remaining(group_id) # 分钟插件间调用
获取其他插件
async def handle(command: str, args: str, event: Dict, context) -> List:
# 获取插件管理器
pm = context.app.plugin_manager
# 获取另一个插件
other_plugin = pm.get("other_plugin")
if other_plugin:
# 调用其函数
if hasattr(other_plugin.module, "some_function"):
result = await other_plugin.module.some_function(args)共享数据
通过文件或数据库共享数据:
# 插件 A 写入
write_json(context.data_dir.parent / "shared" / "data.json", data)
# 插件 B 读取
data = load_json(Path("plugins/shared/data.json"))错误处理实践
1. 捕获特定异常
async def handle(command: str, args: str, event: Dict, context) -> List:
try:
result = await fetch_data(args)
return segments(result)
except ValueError as e:
return segments(f"参数错误: {e}")
except aiohttp.ClientError as e:
context.logger.warning(f"网络错误: {e}")
return segments("网络请求失败,请稍后重试")
except Exception as e:
context.logger.error(f"未知错误: {e}", exc_info=True)
return segments("发生未知错误")2. 超时处理
import asyncio
async def handle(command: str, args: str, event: Dict, context) -> List:
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=10.0
)
return segments(result)
except asyncio.TimeoutError:
return segments("操作超时,请稍后重试")3. 降级处理
async def handle(command: str, args: str, event: Dict, context) -> List:
# 尝试主 API
try:
return segments(await primary_api())
except Exception:
context.logger.warning("主 API 失败")
# 降级到备用 API
try:
return segments(await backup_api())
except Exception:
context.logger.error("备用 API 也失败")
# 最终降级
return segments("服务暂时不可用")性能优化
1. 使用缓存
from functools import lru_cache
import time
# 内存缓存(简单场景)
_cache = {}
_cache_time = {}
CACHE_TTL = 300 # 5 分钟
async def get_data_cached(key: str):
now = time.time()
if key in _cache and now - _cache_time[key] < CACHE_TTL:
return _cache[key]
data = await fetch_data(key)
_cache[key] = data
_cache_time[key] = now
return data2. 并发请求
import asyncio
async def handle(command: str, args: str, event: Dict, context) -> List:
# 并发获取多个数据
results = await asyncio.gather(
fetch_data_1(),
fetch_data_2(),
fetch_data_3(),
return_exceptions=True
)
# 处理结果
...3. 避免阻塞
from core.plugin_base import run_sync
# 阻塞操作放到线程池
result = await run_sync(blocking_function, arg1, arg2)调试技巧
1. 启用 DEBUG 日志
{"log_level": "DEBUG"}2. 在插件中打印详细信息
async def handle(command: str, args: str, event: Dict, context) -> List:
context.logger.debug(f"收到事件: {event}")
context.logger.debug(f"命令: {command}, 参数: {args}")
# 处理逻辑
result = ...
context.logger.debug(f"返回结果: {result}")
return result3. 使用 test.ipynb
import asyncio
from core.app import XiaoQingApp
from pathlib import Path
async def test():
app = XiaoQingApp(Path("path/to/XiaoQing"))
await app.start()
# 模拟事件
event = {
"post_type": "message",
"message_type": "private",
"user_id": 123456,
"message": [{"type": "text", "data": {"text": "/echo test"}}]
}
result = await app.dispatcher.handle_event(event)
print(result)
await app.stop()
asyncio.run(test())部署建议
1. 使用 systemd(Linux)
创建 /etc/systemd/system/xiaoqing.service:
[Unit]
Description=XiaoQing Service
After=network.target
[Service]
Type=simple
User=xiaoqing
WorkingDirectory=/opt/xiaoqing
ExecStart=/usr/bin/python3 main.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.targetsudo systemctl enable xiaoqing
sudo systemctl start xiaoqing2. 使用 Docker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]docker build -t xiaoqing .
docker run -d --name xiaoqing -v ./config:/app/config xiaoqing3. 使用 PM2(Node.js 进程管理器)
pm2 start main.py --interpreter python3 --name xiaoqing
pm2 save
pm2 startup结语
完成 00-09 文档后,读者应能掌握 XiaoQing 的运行方式、配置路径、插件开发入口和常见排障流程。
快速回顾:
- 00-overview.md - 项目概览
- 01-getting-started.md - 快速开始
- 02-architecture.md - 系统架构
- 03-plugin-development.md - 插件开发
- 04-core-modules.md - 核心模块
- 05-api-reference.md - API 参考
- 06-configuration.md - 配置详解
- 08-message-flow.md - 消息处理与并发控制
- 09-plugins.md - 内置插件说明
后续问题可通过 Issue 跟踪。