Skip to content

🏗 02 - 系统架构 ​

本章把 XiaoQing 的内郚架构和工䜜原理拆匀诎明。

NOTE

本章偏向框架内郚实现。只写普通插件时可以先阅读 03-plugin-development.md。


🔭 架构总览 ​

XiaoQing 的栞心架构分成䞉层。

  1. 协议接入层server.py 和 onebot.py 莟莣接收 OneBot 事件、绎技 WebSocket 连接和发送 OneBot API 请求。
  2. 框架调床层app.py、dispatcher.py、router.py、plugin_manager.py、session.py、scheduler.py 莟莣生呜呚期、消息分发、呜什匹配、插件加蜜、倚蜮䌚话和定时任务。
  3. 插件䞚务层plugins/ 内的插件实现具䜓胜力。蜻量插件通垞只需芁 plugin.json + main.py倧型插件劂 xiaoqing_chat、pendo 和 codex 拥有自己的服务层、状态层、Web/API、LLM 子系统或后台任务队列。

栞心框架䞍盎接理解 Pendo 的莊本暡型也䞍盎接生成 xiaoqing_chat 的拟人回倍也䞍调床 Codex CLI 的内郚任务队列。它提䟛统䞀的事件、䞊䞋文、路由和发送胜力䞚务插件圚这䞪蟹界内自行组织曎倍杂的内郚架构。

                              ┌─────────────────┐
                              │   QQ 服务噚     │
                              └────────┬────────┘
                                       │
                              ┌────────▌────────┐
                              │  OneBot 实现    │
                              │  (NapCat等)     │
                              └────────┬────────┘
                                       │
              ┌────────────────────────┌────────────────────────┐
              │                        │                        │
              ▌                        ▌                        ▌
    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
    │   HTTP POST     │    │   WebSocket     │    │   HTTP API      │
    │  (事件掚送)     │    │  (双向通信)     │    │  (发送消息)     │
    └────────┬────────┘    └────────┬────────┘    └────────▲────────┘
             │                      │                      │
             │                      │                      │
┌────────────┌──────────────────────┌──────────────────────┌────────────┐
│            │         XiaoQing 框架    │                      │            │
│            â–Œ                      â–Œ                      │            │
│  ┌─────────────────┐    ┌─────────────────┐             │            │
│  │ InboundServer   │    │ OneBotWsClient  │             │            │
│  │ (server.py)     │    │ (onebot.py)     │             │            │
│  └────────┬────────┘    └────────┬────────┘             │            │
│           │                      │                      │            │
│           └──────────┬───────────┘                      │            │
│                      │ 事件                             │            │
│                      â–Œ                                  │            │
│           ┌──────────────────────────────────────────────            │
│           │         Dispatcher (dispatcher.py)          │            │
│           │  • 消息解析                                 │            │
│           │  • 觊发条件刀断                             │            │
│           │  • 䌚话管理                                 │            │
│           │  • 呜什/闲聊路由                           │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │            Router (router.py)               │            │
│           │  • 呜什觊发词匹配                           │            │
│           │  • 䌘先级排序                               │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │       PluginManager (plugin_manager.py)      │            │
│           │  • 插件加蜜/卞蜜                            │            │
│           │  • 热重蜜监控                               │            │
│           │  • Context 构建                             │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │           Plugin.handle()                    │            │
│           │           䜠的插件代码                       │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            │ 消息段                                  │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │        OneBotHttpSender (onebot.py)         ├────────────┘
│           │           发送响应消息                       │
│           └─────────────────────────────────────────────┘
│
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  │ SessionManager  │    │ SchedulerManager│    │ ConfigManager   │
│  │ (session.py)    │    │ (scheduler.py)  │    │ (config.py)     │
│  │ 倚蜮对话管理    │    │ 定时任务管理    │    │ 配眮热重蜜      │
│  └─────────────────┘    └─────────────────┘    └─────────────────┘
│
└──────────────────────────────────────────────────────────────────────┘

⚙ 栞心组件 ​

圚圓前项目䞭core/ 的职莣蟹界保持皳定。它倄理所有插件共享的通甚问题䞍把某䞪䞚务插件的规则写进栞心。少数特殊分支服务于枅晰的职莣划分䟋劂 smalltalk_provider = "xiaoqing_chat" 时让所有矀聊消息进入 SmalltalkHandler由聊倩插件刀断回倍时机栞心只莟莣入口分发。

1. XiaoQingAppapp.py ​

职莣应甚入口管理所有组件的生呜呚期。

python
class XiaoQingApp:
    def __init__(self, root: Path):
        # 初始化配眮
        self.config_manager = ConfigManager(...)
        
        # 初始化各组件
        self.router = CommandRouter()
        self.plugin_manager = PluginManager(...)
        self.scheduler = SchedulerManager(...)
        self.session_manager = SessionManager(...)
        self.dispatcher = Dispatcher(...)
        
    async def start(self):
        # 1. 初始化并发控制
        concurrency = self.config.get("max_concurrency", 5)
        self.dispatcher.semaphore = asyncio.Semaphore(concurrency)

        # 2. 创建 HTTP 䌚话
        self.http_session = aiohttp.ClientSession()
        
        # 3. 加蜜所有插件
        self.plugin_manager.load_all()
        
        # 4. 启劚通信服务
        if enable_ws_client:
            self.ws_client.connect_and_listen(...)
        if enable_inbound_server:
            self.inbound_server.start()
            
    async def stop(self):
        # 䌘雅关闭所有组件
        if self.ws_client:
            await self.ws_client.stop()
        # ...

关键属性

  • config - 配眮字兞
  • secrets - 敏感配眮
  • is_admin(user_id) - 刀断是吊管理员

2. Dispatcherdispatcher.py ​

职莣消息分发的栞心采甚 Handler 铟匏倄理暡匏。

python
class Dispatcher:
    def __init__(self, ...):
        # Handler 铟按䌘先级䟝次尝试倄理
        self._handlers: tuple[MessageHandler, ...] = (
            BotNameHandler(self),      # 1. 倄理仅提及机噚人名字
            CommandHandler(self),       # 2. 呜什匹配
            SessionHandler(self),       # 3. 掻跃䌚话
            SmalltalkHandler(self),    # 4. 闲聊
        )
    
    async def handle_event(self, event: Dict) -> List[Dict]:
        # 1. 并发控制
        async with self.semaphore:
            return await self._handle_event(event)
    
    async def _handle_event(self, event: Dict) -> List[Dict]:
        # 2. 解析消息
        text, user_id, group_id = normalize_message(event)
        
        # 3. 决策刀断
        decision = self._make_decision(text, user_id, group_id)
        if not decision.should_process:
            return []
        
        # 4. URL 检测党局监听
        if url_match and not has_prefix:
            result = await url_plugin.handle_url(url, event, context)
            if result:
                return result
        
        # 5. Handler 铟匏倄理
        for handler in self._handlers:
            result = await handler.handle(text, event, context)
            if result is not None:
                return result
        
        return []

Handler 铟工䜜原理

每䞪 Handler 实现盞同的接口按顺序尝试倄理

python
class MessageHandler(ABC):
    @abstractmethod
    async def handle(self, text: str, event: Dict, context) -> Optional[List[Dict]]:
        """倄理消息返回消息段列衚或 None衚瀺䞍倄理"""
        pass

IMPORTANT

短路机制䞀旊某䞪 Handler 返回非 None 结果后续 Handler 䞍䌚执行。呜什䌘先于闲聊䌚话倄理䌘先于普通匹配。

消息倄理决策树

收到消息
    │
    ├─ 私聊消息 ─────────────────────────────────> 进入 Handler 铟
    │
    └─ 矀聊消息
         │
         ├─ 存圚呜什前猀劂 /help────────────> 进入 Handler 铟呜什䌘先
         │
         ├─ 包含机噚人名字劂"小青"──────────> 进入 Handler 铟可闲聊
         │
         ├─ 矀被静音 ──────────────────────────> 䞍倄理呜什陀倖
         │
         ├─ 存圚掻跃䌚话 ──────────────────────> 进入 Handler 铟䌚话䌘先
         │
         ├─ 随机觊发random_reply_rate──────> 进入 Handler 铟闲聊暡匏
         │
         └─ 吊则 ──────────────────────────────> 䞍倄理

Handler 铟倄理流皋
    │
    ├─ BotNameHandler仅机噚人名字 ───────────> 倄理并返回
    │       │
    │       └─ 吊 ────────────────────────────────> 继续䞋䞀䞪 Handler
    │
    ├─ CommandHandler呜什匹配成功 ───────────> 倄理并返回
    │       │
    │       └─ 吊 ────────────────────────────────> 继续䞋䞀䞪 Handler
    │
    ├─ SessionHandler存圚掻跃䌚话 ───────────> 倄理并返回
    │       │
    │       └─ 吊 ────────────────────────────────> 继续䞋䞀䞪 Handler
    │
    └─ SmalltalkHandlersmalltalk_mode=True ─> 倄理并返回
            │
            └─ 吊 ────────────────────────────────> 返回空列衚

xiaoqing_chat 特殊倄理

圓 smalltalk_provider 讟眮䞺 xiaoqing_chat 时决策逻蟑有特殊倄理。

  • 所有矀聊消息郜返回 should_process=True 和 smalltalk_mode=True
  • random_reply_rate 配眮倱效
  • xiaoqing_chat 插件内郚有自己的 attention gate、硬频控、普通矀聊插话抂率、heartflow 和 PFC planner
  • /xc、私聊、@、盎接叫名字、只喊名字后的远问、reply 匕甚小青、以及近期䞊䞋文锚定小青的“她/ta”共指召唀䌚圚插件内标记䞺 forced跳过普通插话抂率

3. Routerrouter.py ​

职莣根据觊发词匹配呜什。

python
@dataclass
class CommandSpec:
    plugin: str       # 所属插件名
    name: str         # 呜什名
    triggers: List[str]  # 觊发词列衚
    help_text: str    # 垮助文本
    admin_only: bool  # 是吊仅管理员
    handler: Handler  # 倄理凜数
    priority: int     # 䌘先级

class CommandRouter:
    def register(self, spec: CommandSpec):
        """泚册呜什"""
        self._commands.append(spec)
        
    def resolve(self, text: str) -> Optional[Tuple[CommandSpec, str]]:
        """解析呜什"""
        # 按䌘先级和觊发词长床排序长的䌘先
        for spec in sorted_commands:
            for trigger in spec.triggers:
                if text.startswith(trigger):
                    args = text[len(trigger):].strip()
                    return spec, args
        return None

䌘先级规则

  1. priority 数倌越倧越䌘先
  2. 同䌘先级时觊发词越长越䌘先避免 help 抢走 helpme 的匹配

4. PluginManagerplugin_manager.py ​

职莣管理插件的加蜜、卞蜜和热重蜜。

python
class PluginManager:
    def load_all(self):
        """加蜜 plugins/ 䞋所有插件"""
        for plugin_dir in self.plugins_dir.iterdir():
            if self._is_plugin_dir(plugin_dir):
                self.load_plugin(plugin_dir)
    
    def load_plugin(self, plugin_dir: Path):
        """加蜜单䞪插件"""
        # 1. 读取 plugin.json
        definition = self._load_definition(plugin_dir)
        
        # 2. 富入 main.py 暡块
        module = self._load_module(plugin_dir, definition)
        
        # 3. 泚册呜什到 Router
        self._register_commands(definition, module)
        
        # 4. 调甚 init() 钩子劂果存圚
        #    若返回协皋䌚被纳入 init task 跟螪并等埅完成
        if hasattr(module, "init"):
            result = module.init()
            if asyncio.iscoroutine(result):
                ...
    
    async def reload_plugin(self, name: str):
        """热重蜜插件"""
        await self.unload_plugin(name)
        self.load_plugin(self.plugins_dir / name)
        await self.wait_inits()
    
    async def watch(self):
        """监控插件文件变化自劚重蜜"""
        while True:
            await asyncio.sleep(self._poll_interval)
            # 检查 mtime劂有变化则重蜜

诎明应甚启劚时䌚自劚创建配眮 watcher插件 watcher 仅圚 config.json 里启甚 enable_plugin_watcher 后才䌚启劚。插件匂步 init() 圚重蜜路埄䞊也䌚被等埅劂果初始化倱莥半加蜜插件䌚被立即卞蜜避免继续接流量。

插件加蜜流皋

plugins/echo/
    │
    ├── plugin.json ──> PluginDefinition
    │                   (name, version, commands, schedule...)
    │
    └── main.py ──────> Module
                        (handle, init, shutdown...)
                             │
                             ▌
                      Router.register(CommandSpec)

5. SessionManagersession.py ​

职莣管理倚蜮对话的䌚话状态。

python
@dataclass
class Session:
    user_id: int
    group_id: Optional[int]  # None = 私聊
    plugin_name: str         # 所属插件
    data: Dict[str, Any]     # 䌚话数据
    timeout: float           # è¶…æ—¶æ—¶é—Ž
    
    def get(self, key, default=None): ...
    def set(self, key, value): ...
    def is_expired(self) -> bool: ...

class SessionManager:
    # 䌚话存傚(user_id, group_id) -> Session
    _sessions: Dict[tuple, Session]
    
    async def create(self, user_id, group_id, plugin_name, initial_data, timeout):
        """创建新䌚话"""
        
    async def get(self, user_id, group_id) -> Optional[Session]:
        """获取䌚话自劚枅理过期"""
        
    async def delete(self, user_id, group_id) -> bool:
        """删陀䌚话"""

䌚话生呜呚期

1. 甚户发送呜什劂 /猜数字
       │
       ▌
2. 插件调甚 context.create_session()
       │
       ▌
3. 䌚话创建存傚初始数据
       │
       ▌
4. 甚户后续消息被路由到 handle_session()
       │
       ▌
5. 插件曎新䌚话数据 session.set()
       │
       ├─ 继续对话 ──> 回到步骀 4
       │
       └─ 对话结束 ──> context.end_session()
                           │
                           ▌
                      䌚话被删陀

6. SchedulerManagerscheduler.py ​

职莣管理定时任务。

python
class SchedulerManager:
    def __init__(self, timezone: str):
        self.scheduler = AsyncIOScheduler(timezone=timezone)
        self.scheduler.start()
    
    def add_job(self, job_id: str, func, cron: Dict):
        """添加定时任务"""
        self.scheduler.add_job(func, trigger="cron", id=job_id, **cron)
    
    def remove_job(self, job_id: str):
        """移陀任务"""
        
    def clear_prefix(self, prefix: str):
        """移陀某前猀的所有任务甚于插件卞蜜"""

Cron 衚蟟匏瀺䟋

python
# 每倩 8:00
{"hour": 8, "minute": 0}

# 每 2 小时
{"hour": "*/2"}

# 工䜜日 9:00
{"day_of_week": "mon-fri", "hour": 9}

# 每月 1 号 0:00
{"day": 1, "hour": 0, "minute": 0}

7. OneBot 通信onebot.py + server.py ​

䞀种通信方匏

OneBotHttpSender - 发送消息 ​

python
class OneBotHttpSender:
    async def send_action(self, action: Dict):
        """发送 OneBot Action"""
        url = f"{self.http_base}/{action['action']}"
        await self.session.post(url, json=action['params'], headers=headers)

OneBotWsClient - WebSocket 双向通信 ​

python
class OneBotWsClient:
    async def connect_and_listen(self, handler):
        """连接并持续监听"""
        async with websockets.connect(self.ws_uri) as ws:
            async for message in ws:
                event = json.loads(message)
                await handler(event)
    
    async def send_action(self, action: Dict):
        """通过 WS 发送"""
        await self._ws.send(json.dumps(action))

InboundServer - 被劚接收 ​

python
class InboundServer:
    """HTTP 服务噚接收 OneBot 掚送"""
    
    async def post_event(self, request):
        """POST /event - 接收事件"""
        payload = await request.json()
        actions = await self.handler(payload)
        return web.json_response({"actions": actions})
    
    async def ws_handler(self, request):
        """WebSocket 端点"""
        # 持久连接倄理

🔄 数据流诊解 ​

完敎请求流皋 ​

1. OneBot 掚送事件
   POST http://127.0.0.1:12000/event
   {
     "post_type": "message",
     "message_type": "group",
     "group_id": 123456,
     "user_id": 789,
     "message": [{"type": "text", "data": {"text": "/echo hello"}}]
   }

2. InboundServer 接收
   └─ 验证 Authorization Token
   └─ 解析 JSON
   └─ 调甚 handler(event)

3. Dispatcher 倄理
   └─ normalize_message() 提取 text="echo hello", user_id=789, group_id=123456
   └─ _make_decision() 刀断需芁倄理有呜什前猀
   └─ URL 检测无 URL跳过
   └─ Handler 铟倄理
       ├─ BotNameHandler䞍是仅机噚人名字 → None
       ├─ CommandHandler匹配成功
       │   └─ router.resolve("echo hello") 埗到 (echo插件, "hello")
       │   └─ 权限检查通过
       │   └─ 构建 context
       │   └─ 调甚 echo.handle("echo", "hello", event, context)
       └─ 短路后续 Handler 䞍执行

4. 插件倄理
   └─ 返回 [{"type": "text", "data": {"text": "hello"}}]

5. 构建响应
   └─ build_action(segs, user_id, group_id)
   └─ {
        "action": "send_group_msg",
        "params": {
          "group_id": 123456,
          "message": [{"type": "text", "data": {"text": "hello"}}]
        }
      }

6. 返回给 OneBot
   └─ InboundServer 返回 {"actions": [...]}
   └─ OneBot 执行 action发送消息到 QQ

䌚话倄理流皋瀺䟋 ​

1. 甚户发送 /guess 启劚猜数字枞戏
   └─ guess.handle() 创建䌚话
   └─ context.create_session(initial_data={"target": 42})

2. 甚户后续消息 "50"
   └─ Dispatcher 倄理
   └─ Handler 铟
       ├─ BotNameHandlerNone
       ├─ CommandHandler无呜什匹配 → None
       ├─ SessionHandler发现掻跃䌚话
       │   └─ 调甚 guess.handle_session("50", event, context, session)
       │   └─ 返回 ["倪倧了"]
       └─ 短路

3. 甚户猜测正确 "42"
   └─ SessionHandler 倄理
   └─ guess.handle_session() 刀断正确
   └─ context.end_session() 删陀䌚话
   └─ 返回 ["恭喜䜠猜对了"]

⚡ 并发控制 ​

XiaoQing 䜿甚 asyncio.Semaphore 控制并发

python
# app.py
concurrency = int(config.get("max_concurrency", 5))
self.dispatcher = Dispatcher(..., semaphore=asyncio.Semaphore(concurrency))

# dispatcher.py
async def handle_event(self, event):
    async with self.semaphore:  # 最倚同时倄理 5 条消息
        return await self._handle_event(event)

🧩 插件内嵌服务 ​

郚分插件可以圚框架之倖独立运行附加服务。兞型案䟋是 pendo 插件

XiaoQing 䞻进皋
├── 正垞消息倄理流皋Dispatcher → Plugin
└── pendo 插件main.py
        └── 插件初始化或 /pendo web start
                └── FastAPI Web Serveruvicorn
                        ├── /api/*  # REST APIJWT 鉎权、CRUD、统计、Bundle、widget
                        └── /*      # 静态 SPA 文件

特点

  • Web Server 圚独立后台线皋䞭运行䞍阻塞消息倄理
  • 插件初始化䌚尝试自劚启劚也可以通过 /pendo web start 手劚重试通过 /pendo web stop 关闭
  • 应甚退出、插件卞蜜或 Ctrl+C 时䌚先请求 Pendo Web 䌘雅停止再枅理数据库和运行时状态
  • 支持通过 nginx 圚子路埄劂 /pendo/䞋反向代理访问
  • Pendo Web 䞎聊倩呜什共甚 plugins/pendo/services/db.py、utils/validators.py 和事件囟/提醒服务避免 Web 侎 CLI 各自绎技䞀套字段语义

及䞀类独立服务是 codex 插件的后台队列。它䞍䜿甚 SessionManager 捕获甚户后续消息而是圚插件内郚绎技 label -> session/thread/queue

  • /codex create <label> [cwd:<path>] 创建䞚务䌚话标筟
  • /codex <label> <任务> 将任务攟入该标筟队列handler 立即返回“已收到”
  • 同䞀标筟内任务䞲行执行䞍同标筟受 max_parallel_jobs 限制并行执行
  • 任务完成后通过 context.send_action() 䞻劚发送文字和囟片结果底层仍走统䞀 OneBot 发送铟路
  • 䌚话玢匕保存圚 plugins/codex/data/sessions.json每䞪标筟的记圕、囟片和任务 artifacts 保存圚 plugins/codex/data/session/<label>/

这种方匏适合耗时蟃长䜆䞍应占甚 bot 倚蜮䌚话的后台工䜜。


➡ 䞋䞀步 ​

基于 MIT 讞可发垃

加蜜䞭...