Skip to content

📚 08 - 消息倄理流皋 ​

本文档诊细描述 XiaoQing 的消息倄理逻蟑包括消息接收、路由、呜什解析、䌚话管理等完敎流皋。

TIP

想快速定䜍问题跳蜬 9⃣ 完敎倄理流皋囟 查看端到端瀺意囟再按需查阅对应章节。


📑 目圕 ​

  1. 消息倄理总览
  2. 消息接收䞎解析
  3. 觊发条件刀断
  4. 呜什路由䞎参数拆分
  5. 䌚话管理
  6. 闲聊倄理
  7. 静音机制
  8. 并发控制䞎消息队列
  9. 完敎倄理流皋囟

1⃣ 消息倄理总览 ​

XiaoQing 的消息倄理由以䞋栞心暡块协䜜完成

暡块文件职莣
Dispatchercore/dispatcher.py消息分发噚协调敎䞪倄理流皋
CommandRoutercore/router.py呜什路由匹配觊发词并拆分参数
SessionManagercore/session.py䌚话管理支持倚蜮对话
messagecore/message.py消息解析工具凜数

1.1 倄理流皋抂述 ​

OneBot 事件
    ↓
Dispatcher.handle_event()
    ↓
┌─────────────────────────────────────────────────────────────┐
│  1. 事件类型检查仅倄理 message 类型                      │
│  2. 消息解析提取文本、user_id、group_id                  │
│  3. URL 检测党局监听可选                               │
│  4. 决策刀断是吊需芁倄理                                 │
│  5. Handler 铟匏倄理按䌘先级䟝次尝试                     │
│     - BotNameHandler  → 仅机噚人名字                        │
│     - CommandHandler  → 呜什匹配                            │
│     - SessionHandler  → 掻跃䌚话                            │
│     - SmalltalkHandler → 闲聊                               │
└─────────────────────────────────────────────────────────────┘
    ↓
返回 OneBot 消息段列衚

1.2 Handler 铟匏架构 ​

XiaoQing 采甹 莣任铟暡匏 来倄理消息每䞪 Handler 按顺序尝试倄理消息劂果倄理成功则返回吊则䌠递给䞋䞀䞪 Handler。

Handler 铟顺序 ​

python
self._handlers: tuple[MessageHandler, ...] = (
    BotNameHandler(self),      # 1. 倄理仅提及机噚人名字的消息
    CommandHandler(self),       # 2. 匹配并执行呜什
    SessionHandler(self),       # 3. 倄理掻跃䌚话
    SmalltalkHandler(self),    # 4. 倄理闲聊
)

各 Handler 职莣 ​

Handler倄理场景返回条件
BotNameHandler甚户仅发送机噚人名字劂 "小青"文本仅包含 bot_name 或其变䜓
CommandHandler甚户发送呜什劂 "/help"呜什路由匹配成功
SessionHandler甚户倄于掻跃䌚话䞭存圚掻跃䌚话
SmalltalkHandler其他情况闲聊smalltalk_mode 䞺 True

关键特性 ​

  1. 短路机制䞀旊某䞪 Handler 倄理成功后续 Handler 䞍䌚执行
  2. 䌘先级明确呜什䌘先于䌚话䌚话䌘先于闲聊
  3. 独立决策每䞪 Handler 独立决定是吊倄理互䞍圱响

xiaoqing_chat 特殊倄理 ​

圓 smalltalk_provider 讟眮䞺 xiaoqing_chat 时决策逻蟑特殊

python
if self._get_smalltalk_provider() == "xiaoqing_chat":
    return ProcessDecision(True, True)  # 所有消息郜允讞进入 SmalltalkHandler

这意味着

  • random_reply_rate 配眮倱效 - 所有矀聊消息郜䌚进入 xiaoqing_chat
  • 插件自䞻控制 - xiaoqing_chat 插件内郚有自己的频率控制和回倍抂率刀断
  • 返回空列衚䞍回倍 - 劂果插件决定䞍回倍返回 [] 即可

这种讟计让 LLM 暡型胜借根据䞊䞋文智胜刀断是吊需芁回倍比简单的随机抂率曎智胜。


2⃣ 消息接收䞎解析 ​

2.1 事件栌匏 ​

XiaoQing 接收 OneBot 标准栌匏的消息事件

json
{
    "post_type": "message",
    "message_type": "group",
    "user_id": 123456789,
    "group_id": 987654321,
    "message": [
        {"type": "text", "data": {"text": "/help 查看垮助"}}
    ]
}

2.2 消息解析 ​

normalize_message() 凜数从事件䞭提取关键信息

python
text, user_id, group_id = normalize_message(event)
# text: "/help 查看垮助"
# user_id: 123456789
# group_id: 987654321 (私聊时䞺 None)

2.3 文本提取 ​

extract_text() 凜数从 OneBot 消息段䞭提取纯文本

  • 字笊䞲消息: 盎接返回
  • 消息段数组: 提取所有 type: "text" 段的文本并拌接
  • 其他类型囟片、@等: 被応略
python
# 蟓入
message = [
    {"type": "at", "data": {"qq": "123"}},
    {"type": "text", "data": {"text": "䜠奜"}},
    {"type": "image", "data": {"file": "abc.jpg"}},
    {"type": "text", "data": {"text": "侖界"}}
]

# 蟓出
text = "䜠奜䞖界"

3⃣ 决策刀断 ​

3.1 决策逻蟑 ​

_decide_process() 方法刀断消息是吊需芁倄理返回 ProcessDecision(should_process, smalltalk_mode)

场景should_processsmalltalk_mode诎明
私聊✅ True✅ True私聊消息始终倄理可闲聊
矀聊 + 呜什前猀✅ True❌ False劂 /help䞍觊发闲聊
矀聊 + 包含 bot_name✅ True⚠ 取决于静音劂 小青 䜠奜
矀聊 + 随机觊发✅ True✅ True按 random_reply_rate 抂率
矀聊 + 静音䞭❌ False❌ False陀非有呜什前猀或 bot_name

3.2 决策䞎 Handler 铟的关系 ​

重芁理解决策刀断的结果䌚圱响 Handler 铟的执行

  1. should_process = False盎接返回 []所有 Handler 郜䞍䌚执行
  2. should_process = True进入 Handler 铟按顺序尝试各䞪 Handler

特殊场景

  • 掻跃䌚话存圚无论 should_process 劂䜕SessionHandler 郜䌚倄理䌚话䌘先级最高
  • xiaoqing_chat 䜜䞺 smalltalk_provider决策逻蟑特殊所有矀聊消息郜返回 (True, True)random_reply_rate 倱效

3.3 配眮项 ​

json
{
    "bot_name": "小青",
    "command_prefixes": ["/"],
    "require_bot_name_in_group": true,
    "random_reply_rate": 0.05
}
  • bot_name: 机噚人名称矀聊䞭提及时觊发
  • command_prefixes: 呜什前猀列衚通垞䞺 ["/"]
  • require_bot_name_in_group: 矀聊是吊需芁 @ 或提及 bot_name
  • random_reply_rate: 无觊发条件时随机回倍的抂率 (0-1)

3.4 前猀剥犻 ​

parse_text_command_context() 凜数按照以䞋顺序䞥栌倄理前猀剥犻

  1. 去陀 @机噚人䟋劂 [CQ:at,qq=123] 
  2. 去陀 bot_name䟋劂 小青支持暡糊匹配及其后的标点
  3. 去陀 command_prefixes䟋劂 /

⚠ 重芁解析规则

圓甚户蟓入 小青配眮 时

  1. bot_name小青銖先被检测并移陀剩䜙文本变䞺 配眮。
  2. 随后尝试移陀呜什前猀劂 /因䞍匹配而跳过。
  3. 最终䌠递给 Router 的文本是 配眮。

这意味着劂果䜠的插件呜什觊发词定义䞺 ["小青配眮"]将䌚匹配倱莥因䞺 Router 看到的是 "配眮"。

最䜳实践 建议圚 plugin.json 䞭定义觊发词时包含剥犻 bot 名后的版本。

python
# 蟓入: "小青 /help 查看垮助"
# 1. 剥犻 bot_name -> "/help 查看垮助"
# 2. 剥犻 prefix   -> "help 查看垮助"
# 结果: 匹配 trigger "help"
python
# 蟓入: "小青配眮"
# 1. 剥犻 bot_name -> "配眮"
# 2. 剥犻 prefix   -> "配眮" (无前猀可剥犻)
# 结果: 需匹配 trigger "配眮" (因歀建议圚 json 䞭添加 "配眮" 䜜䞺 trigger)

4⃣ 呜什路由䞎参数拆分 ​

4.1 呜什泚册 ​

每䞪插件圚 plugin.json 䞭声明呜什

json
{
    "commands": [
        {
            "name": "help",
            "triggers": ["help", "h", "垮助"],
            "help": "查看垮助 | /help [关键词]",
            "admin_only": false,
            "priority": 0
        }
    ]
}

4.2 路由匹配 ​

CommandRouter.resolve() 方法匹配呜什

python
resolved = router.resolve("help 查看垮助")
# resolved = (CommandSpec, args)
# spec.name = "help"
# spec.plugin = "core"
# args = "查看垮助"

匹配规则

  1. 遍历所有泚册的呜什
  2. 检查文本是吊以任意 trigger 匀倎
  3. 按䌘先级排序priority 越倧越䌘先
  4. 同䌘先级时trigger 越长越䌘先

4.3 参数拆分 ​

匹配成功后trigger 后面的文本䜜䞺 args 䌠递给 handler

蟓入文本: "echo 䜠奜 侖界"
匹配 trigger: "echo"
args: "䜠奜 侖界"

插件可䜿甚 core.args 暡块进䞀步解析参数

python
from core.args import parse

parsed = parse("䜠奜 侖界 -v --name=test")
# parsed.tokens = ["䜠奜", "侖界"]
# parsed.first = "䜠奜"
# parsed.second = "侖界"
# parsed.opt("v") = "true"
# parsed.opt("name") = "test"

4.4 Handler 调甚 ​

python
async def handle(command: str, args: str, event: dict, context) -> List[dict]:
    """
    Args:
        command: 呜什名plugin.json 侭的 name
        args: 参数字笊䞲trigger 后的郚分
        event: 原始 OneBot 事件
        context: 插件䞊䞋文
    
    Returns:
        OneBot 消息段列衚
    """

5⃣ 䌚话管理 ​

5.1 䌚话觊发 ​

䌚话是 Handler 铟的第䞉环圚呜什匹配倱莥后执行

python
# SessionHandler 倄理逻蟑
session = await session_manager.get(user_id, group_id)
if session:
    # 路由到䌚话插件的 handle_session()

重芁特性

  • 䌘先级䌚话倄理圚呜什之后、闲聊之前
  • 绕过觊发条件即䜿 should_process = False掻跃䌚话仍䌚倄理
  • 独立倄理䌚话倄理䞍受 random_reply_rate 或 bot_name 圱响

5.2 䌚话创建 ​

插件通过 context.create_session() 创建䌚话

python
async def handle(command, args, event, context):
    session = await context.create_session(
        initial_data={"target": 42},
        timeout=180  # 3 分钟超时
    )
    return segments("枞戏匀始")

5.3 䌚话倄理 ​

圓甚户圚䌚话䞭发送消息时调甚 handle_session()

python
async def handle_session(text: str, event: dict, context, session) -> List[dict]:
    """
    Args:
        text: 甚户蟓入的文本
        event: 原始事件
        context: 插件䞊䞋文
        session: 圓前䌚话对象
    """
    guess = int(text)
    target = session.get("target")
    if guess == target:
        await context.end_session()
        return segments("恭喜猜对了")

5.4 退出呜什 ​

以䞋呜什可退出䌚话

  • 退出、取消、exit、quit、q

6⃣ 闲聊倄理 ​

6.1 觊发条件 ​

圓以䞋条件郜满足时进入闲聊暡匏

  1. 没有匹配到呜什
  2. 没有掻跃䌚话
  3. smalltalk_mode = True

6.2 闲聊提䟛者 ​

通过配眮选择闲聊插件

json
{
    "plugins": {
        "smalltalk_provider": "xiaoqing_chat"
    }
}

支持的提䟛者

  • smalltalk: 基于规则的简单闲聊
  • xiaoqing_chat: 基于 LLM 的智胜对话

6.3 xiaoqing_chat 特殊倄理 ​

圓 smalltalk_provider 讟眮䞺 xiaoqing_chat 时

  • random_reply_rate 䞍生效 - 所有矀聊消息郜䌚进入 xiaoqing_chat 倄理
  • 插件自行决定是吊回倍 - xiaoqing_chat 有自己的频率控制和回倍抂率刀断
  • 返回空列衚衚瀺䞍回倍 - 劂果插件决定䞍回倍返回 [] 即可

这样讟计的原因是 LLM 暡型可以根据䞊䞋文刀断是吊需芁回倍比简单的随机抂率曎智胜。

6.4 倄理凜数 ​

闲聊插件需实现 handle_smalltalk()

python
async def handle_smalltalk(text: str, event: dict, context) -> List[dict]:
    """
    Args:
        text: 甚户蟓入已去陀前猀
        event: 原始事件
        context: 插件䞊䞋文
    
    Returns:
        回倍消息段或 None/[] 衚瀺䞍回倍
    """

7⃣ 静音机制 ​

7.1 静音呜什 ​

/闭嘎 30      # 静音 30 分钟
/闭嘎 1h      # 静音 1 小时
/诎话         # 解陀静音

7.2 静音圱响 ​

消息类型静音时是吊倄理
垊呜什前猀的消息✅ 倄理
䞻劚 @ 机噚人✅ 倄理呜什❌ 䞍闲聊
随机回倍❌ 䞍回倍
定时任务❌ 䞍发送由插件自行刀断

8⃣ 并发控制䞎消息队列 ​

8.1 抂述 ​

XiaoQing 䜿甚倚层并发控制机制来管理消息倄理确保系统皳定性和响应性胜。

8.2 OneBot WebSocket Client 倄理流皋 ​

┌──────────────────────────────────────────────────────┐
│  OneBot 服务噚 (NapCatQQ/go-cqhttp)                  │
└─────────────────────┬────────────────────────────────┘
                      │ WebSocket 消息掚送
                      ↓
┌──────────────────────────────────────────────────────┐
│  OneBotWsClient._listen()                            │
│  接收并解析 WebSocket 消息                            │
└─────────────────────┬────────────────────────────────┘
                      ↓
┌──────────────────────────────────────────────────────┐
│  第䞀层控制_pending_semaphore                       │
│  最倚 100 䞪消息等埅分发硬猖码䞍可配眮            │
│                                                      │
│  async with self._pending_semaphore:                │
│      await self._dispatch_event(handler, event)     │
└─────────────────────┬────────────────────────────────┘
                      ↓
┌──────────────────────────────────────────────────────┐
│  按甚户/矀分队列 (智胜讟计)                           │
│                                                      │
│  根据 queue_key 分发到䞍同队列                      │
│  - group:123:user:456 → Queue1 [event1, event2]     │
│  - user:789          → Queue2 [event3]              │
│  - group:999:user:111 → Queue3 [event4, event5]     │
│                                                      │
│  每䞪队列有独立的 _drain_queue() 协皋䞲行倄理          │
│  保证同䞀甚户圚同䞀矀的消息按顺序倄理                 │
│  允讞䞍同甚户/矀的消息并行倄理                       │
└─────────────────────┬────────────────────────────────┘
                      ↓
┌──────────────────────────────────────────────────────┐
│  _drain_queue() → handler() → app._process_event()  │
└─────────────────────┬────────────────────────────────┘
                      ↓
┌──────────────────────────────────────────────────────┐
│  第二层控制max_concurrency (默讀 5) 🔥             │
│                                                      │
│  Dispatcher.handle_event():                         │
│  async with self.semaphore:                         │
│      return await self._process_event(event)        │
│                                                      │
│  ✅ 党局并发控制的栞心                                │
│  ✅ 对所有接收方匏WS Client/Inbound郜生效         │
└─────────────────────┬────────────────────────────────┘
                      ↓
┌──────────────────────────────────────────────────────┐
│  执行呜什/䌚话/闲聊倄理并返回结果                       │
└──────────────────────────────────────────────────────┘

8.3 䞀层并发控制机制 ​

第䞀层_pending_semaphore (max_pending_events = 100) ​

  • 䜍眮core/onebot.py
  • 䜜甚限制同时等埅分发的事件数量
  • 是吊可配眮❌ 吊硬猖码
  • 圱响仅对 OneBot WS Client 有效

第二层max_concurrency (默讀 5) 🔥 ​

  • 䜍眮core/dispatcher.py
  • 䜜甚限制同时执行倄理逻蟑的数量真正的并发控制
  • 是吊可配眮✅ 是config.json
  • 圱响党局生效WS Client、Inbound Server

8.4 按甚户/矀分队列讟计 ​

栞心讟计每䞪 (group_id, user_id) 组合对应䞀䞪独立队列。

queue_key 生成规则

python
# 矀聊消息
queue_key = f"group:{group_id}:user:{user_id}"

# 私聊消息
queue_key = f"user:{user_id}"

䌘势

  1. ✅ 保证顺序同䞀甚户圚同䞀矀的消息䞥栌按顺序倄理
  2. ✅ 提高吞吐䞍同甚户/矀的消息可以并行倄理
  3. ✅ 避免阻塞某䞪甚户的慢操䜜䞍圱响其他甚户

实际运行瀺䟋

假讟 max_concurrency = 5同时收到劂䞋消息

时闎来源queue_key状态
T1矀A甚户1group:A:user:1✅ 获埗第1䞪并发槜
T2矀A甚户1group:A:user:1⏳ 圚队列䞭等埅同䞀甚户䞲行
T3矀B甚户2group:B:user:2✅ 获埗第2䞪并发槜
T4矀C甚户3group:C:user:3✅ 获埗第3䞪并发槜
T5矀D甚户4group:D:user:4✅ 获埗第4䞪并发槜
T6矀E甚户5group:E:user:5✅ 获埗第5䞪并发槜
T7矀F甚户6group:F:user:6⏞ 等埅并发槜释攟
T8矀A甚户1group:A:user:1⏳ 圚队列䞭等埅排圚 T2 后面

8.5 Inbound WebSocket Server 队列机制 ​

对于 Inbound Server被劚接收掚送有额倖的队列机制

WebSocket 消息到蟟
    ↓
攟入队列 (maxsize = ws_queue_size, 默讀 200)
    ↓
inbound_ws_max_workers 䞪 worker 从队列取消息 (默讀 8 䞪)
    ↓
通过 Semaphore 获取倄理讞可 (max_concurrency, 默讀 5)
    ↓
倄理消息 (Dispatcher)

配眮参数

参数默讀倌䜜甚范囎诎明
ws_queue_size200Inbound + OneBot等埅倄理的消息队列长床
inbound_ws_max_workers8Inbound Server并发倄理队列消息的 worker 数

⚠ 泚意inbound_ws_max_workers 仅对 Inbound WS Server 有效ws_queue_size 同时圱响 Inbound WS Server 侎 OneBot WS Client。

8.6 栞心配眮参数 ​

参数默讀倌适甚范囎诎明
max_concurrency5党局🔥 最重芁党局并发控制
inbound_ws_max_workers8Inbound ServerWorker 协皋数
ws_queue_size200Inbound + OneBot队列长床0 衚瀺䞍限制

8.7 配眮建议 ​

䜎莟蜜场景䞪人䜿甚1-3 䞪矀 ​

json
{
  "max_concurrency": 5
}

䞭等莟蜜倚䞪掻跃矀组 ​

json
{
  "max_concurrency": 10,
  "ws_queue_size": 300,
  "inbound_ws_max_workers": 12
}

高莟蜜场景倧量矀组频繁消息 ​

json
{
  "max_concurrency": 20,
  "ws_queue_size": 500,
  "inbound_ws_max_workers": 24
}

䌘化原则

  1. inbound_ws_max_workers >= max_concurrency避免 worker 空闲
  2. ws_queue_size 足借倧以吞收突发流量或讟䞺 0 䞍限制
  3. max_concurrency 䞍芁讟眮过高避免资源耗尜

8.8 性胜监控 ​

查看队列状态

GET /health

响应瀺䟋

json
{
  "status": "ok",
  "ws_connections": 1,
  "pending_jobs": 3,
  "active_sessions": 2
}

9⃣ 完敎倄理流皋囟 ​

┌─────────────────────────────────────────────────────────────────┐
│                     OneBot 消息事件到蟟                          │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▌
                    ┌─────────────────┐
                    │ post_type ==    │
                    │  "message" ?    │
                    └────────┬────────┘
                             │
                    ┌────────┮────────┐
                    │                 │
                   Yes               No ──────────────────► 応略
                    │
                    ▌
        ┌───────────────────────┐
        │   normalize_message   │
        │ 提取 text, user_id,   │
        │      group_id         │
        └───────────┬───────────┘
                    │
                    ▌
        ┌───────────────────────┐
        │   URL 检测            │──── 有 URL 䞔无前猀 ────► url_parser
        └───────────┬───────────┘
                    │
                    ▌
        ┌───────────────────────┐
        │   _decide_process    │
        │   刀断是吊倄理消息     │
        └───────────┬───────────┘
                    │
           ┌────────┮────────┐
           │                 │
    should_process        䞍倄理 ──────────────────────► 返回 []
        = True
           │
           ▌
    ┌──────────────────────────────────────────────────────────┐
    │                  Handler 铟匏倄理                          │
    │                   按䌘先级䟝次尝试                          │
    └──────────────────────────────────────────────────────────┘
                    │
           ┌────────┮────────┐
           │                 │
        BotNameHandler      倱莥
        仅机噚人名字?         │
           │                 │
      ┌────┮────┐            â–Œ
     Yes       No    ┌─────────────────┐
      │        │     │ CommandHandler  │
      â–Œ        │     │  呜什匹配?       │
  倄理完成     │     └────────┬────────┘
  _handle_bot  │              │
    _name_only  │      ┌───────┮───────┐
                │   匹配成功         未匹配
                │      │              │
                │      â–Œ              â–Œ
                │  ┌───────────┐  ┌─────────────────┐
                │  │ 权限检查   │  │ SessionHandler  │
                │  └─────┬─────┘  │  掻跃䌚话?       │
                │        │        └────────┬────────┘
                │        â–Œ         ┌────────┮────────┐
                │  ┌───────────┐ 有䌚话          无䌚话
                │  │ 执行呜什   │    │              │
                │  │ handler() │    â–Œ              â–Œ
                │  └─────┬─────┘  ┌──────────┐  ┌─────────────────┐
                │        │        │handle_   │  │ SmalltalkHandler│
                │        │        │session() │  │  smalltalk_mode? │
                │        │        └─────┬────┘  └────────┬────────┘
                │        │              │       ┌────────┮────────┐
                │        │              │      Yes              No
                │        │              │       │                │
                │        │              │       â–Œ                â–Œ
                │        │              │  ┌───────────┐      返回 []
                │        │              │  │_handle_   │
                │        │              │  │smalltalk() │
                │        │              │  └─────┬─────┘
                │        │              │        │
                └────────┎──────────────┌────────┎────────┘
                                       │
                                       ▌
                              ┌─────────────────┐
                              │ 返回消息段列衚   │
                              └─────────────────┘

📎 附圕关键代码䜍眮 ​

功胜文件凜数/方法
消息解析core/message.pynormalize_message(), extract_text()
前猀剥犻 & 䞊䞋文解析core/message.pyparse_text_command_context()
决策刀断core/dispatcher.pyDispatcher._decide_process()
呜什路由core/router.pyCommandRouter.resolve()
䌚话管理core/session.pySessionManager
静音控制core/dispatcher.pymute_group(), is_muted()
Handler 铟core/dispatcher.pyBotNameHandler, CommandHandler, SessionHandler, SmalltalkHandler

基于 MIT 讞可发垃

加蜜䞭...