Skip to content

🏗 02 - 系统架构 ​

本章把 XiaoQing 的内郚架构和工䜜原理拆匀诎明。

NOTE

本章偏向框架内郚实现。只写普通插件时可以先阅读 03-plugin-development.md。


🔭 架构总览 ​

XiaoQing 的栞心架构分成䞉层。

  1. 协议接入层server.py 和 onebot.py 莟莣接收 OneBot 事件、绎技 WebSocket 连接和发送 OneBot API 请求。
  2. 框架调床层app.py、dispatcher.py、router.py、plugin_manager.py、session.py、scheduler.py 莟莣生呜呚期、消息分发、呜什匹配、插件加蜜、倚蜮䌚话和定时任务。
  3. 插件䞚务层plugins/ 内的插件实现具䜓胜力。蜻量插件通垞只需芁 plugin.json + main.py倧型插件劂 xiaoqing_chat、pendo 和 codex 拥有自己的服务层、状态层、Web/API、LLM 子系统或后台任务队列。

栞心框架䞍盎接理解 Pendo 的莊本暡型也䞍盎接生成 xiaoqing_chat 的拟人回倍也䞍调床 Codex CLI 的内郚任务队列。它提䟛统䞀的事件、䞊䞋文、路由和发送胜力䞚务插件圚这䞪蟹界内自行组织曎倍杂的内郚架构。

                              ┌─────────────────┐
                              │   QQ 服务噚     │
                              └────────┬────────┘
                                       │
                              ┌────────▌────────┐
                              │  OneBot 实现    │
                              │  (NapCat等)     │
                              └────────┬────────┘
                                       │
              ┌────────────────────────┌────────────────────────┐
              │                        │                        │
              ▌                        ▌                        ▌
    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
    │   HTTP POST     │    │   WebSocket     │    │   HTTP API      │
    │  (事件掚送)     │    │  (双向通信)     │    │  (发送消息)     │
    └────────┬────────┘    └────────┬────────┘    └────────▲────────┘
             │                      │                      │
             │                      │                      │
┌────────────┌──────────────────────┌──────────────────────┌────────────┐
│            │         XiaoQing 框架    │                      │            │
│            â–Œ                      â–Œ                      │            │
│  ┌─────────────────┐    ┌─────────────────┐             │            │
│  │ InboundServer   │    │ OneBotWsClient  │             │            │
│  │ (server.py)     │    │ (onebot.py)     │             │            │
│  └────────┬────────┘    └────────┬────────┘             │            │
│           │                      │                      │            │
│           └──────────┬───────────┘                      │            │
│                      │ 事件                             │            │
│                      â–Œ                                  │            │
│           ┌──────────────────────────────────────────────            │
│           │         Dispatcher (dispatcher.py)          │            │
│           │  • 消息解析                                 │            │
│           │  • 觊发条件刀断                             │            │
│           │  • 䌚话管理                                 │            │
│           │  • 呜什/闲聊路由                           │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │            Router (router.py)               │            │
│           │  • 呜什觊发词匹配                           │            │
│           │  • 䌘先级排序                               │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │       PluginManager (plugin_manager.py)      │            │
│           │  • 插件加蜜/卞蜜                            │            │
│           │  • 热重蜜监控                               │            │
│           │  • Context 构建                             │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │           Plugin.handle()                    │            │
│           │           䜠的插件代码                       │            │
│           └────────────────┬────────────────────────────┘            │
│                            │                                         │
│                            │ 消息段                                  │
│                            â–Œ                                         │
│           ┌─────────────────────────────────────────────┐            │
│           │        OneBotHttpSender (onebot.py)         ├────────────┘
│           │           发送响应消息                       │
│           └─────────────────────────────────────────────┘
│
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  │ SessionManager  │    │ SchedulerManager│    │ ConfigManager   │
│  │ (session.py)    │    │ (scheduler.py)  │    │ (config.py)     │
│  │ 倚蜮对话管理    │    │ 定时任务管理    │    │ 配眮热重蜜      │
│  └─────────────────┘    └─────────────────┘    └─────────────────┘
│
└──────────────────────────────────────────────────────────────────────┘

⚙ 栞心组件 ​

圚圓前项目䞭core/ 的职莣蟹界保持皳定。它倄理所有插件共享的通甚问题䞍把某䞪䞚务插件的规则写进栞心。smalltalk_provider = "xiaoqing_chat" 时栞心先调甚插件的 observe_message() 观察消息只有通过 dispatcher 闚控并萜到 smalltalk 回萜时才由聊倩插件刀断是吊实际回倍。

1. XiaoQingAppapp.py ​

职莣应甚入口管理所有组件的生呜呚期。

python
class XiaoQingApp:
    def __init__(self, root: Path):
        # 初始化配眮
        self.config_manager = ConfigManager(...)
        
        # 初始化各组件
        self.router = CommandRouter()
        self.plugin_manager = PluginManager(...)
        self.scheduler = SchedulerManager(...)
        self.session_manager = SessionManager(...)
        self.dispatcher = Dispatcher(...)
        
    async def start(self):
        # 1. 初始化并发控制
        concurrency = self.config.get("max_concurrency", 5)
        self.dispatcher.semaphore = asyncio.Semaphore(concurrency)

        # 2. 创建 HTTP 䌚话
        self.http_session = aiohttp.ClientSession()
        
        # 3. 加蜜所有插件
        self.plugin_manager.load_all()
        
        # 4. 启劚通信服务
        if enable_ws_client:
            self.ws_client.connect_and_listen(...)
        if enable_inbound_server:
            self.inbound_server.start()
            
    async def stop(self):
        # 䌘雅关闭所有组件
        if self.ws_client:
            await self.ws_client.stop()
        # ...

关键属性

  • config - 配眮字兞
  • secrets - 敏感配眮
  • is_admin(user_id) - 刀断是吊管理员

2. Dispatcherdispatcher.py ​

职莣消息分发的栞心。Dispatcher 䜿甚单䞪线性流皋倄理消息倄理状态保存圚 MessageContext 䞎局郚控制流䞭。

python
class Dispatcher:
    async def _process_event(self, event: dict[str, Any]) -> list[dict[str, Any]]:
        ctx = self.parser.parse(event)
        if ctx is None:
            return []

        await self._observe_message(ctx)

        if ctx.is_url_only:
            return await self._invoke_url_parser(ctx, ctx.clean_text.strip()) or []

        should_process = (
            ctx.is_private
            or not config.get("require_bot_name_in_group", True)
            or ctx.has_prefix
            or has_active_session(ctx)
        )
        if not should_process:
            return []

        if ctx.is_only_bot_name:
            return await self._handle_bot_name_only(ctx)

        resolved = self.router.resolve(ctx.clean_text)
        if resolved:
            return await self._execute_command(resolved, ctx) or []

        if ctx.has_command_prefix:
            return unknown_command_hint(ctx)

        if ctx.cached_session:
            return await self._try_handle_session(ctx) or []

        if self.is_muted(ctx.group_id):
            return []

        return await self._handle_smalltalk(ctx)

关键解析信号

  • has_prefix 衚瀺消息以呜什前猀默讀 /匀倎或包含 bot_name任意䜍眮或包含 @机噚人任意䜍眮。
  • has_command_prefix 单独标识䞥栌以呜什前猀匀倎。未知呜什提瀺只看这䞪字段。
  • URL 倄理改甚 ctx.is_url_only仅圓 clean_text strip 后敎䜓匹配 ^https?://\S+$ 时调床到 url_parser。文本䞭倹垊 URL 䞍䌚觊发 URL 短路。

线性倄理顺序

Step A: URL short-circuitclean_text 单 URL → url_parsermute 䞍圱响
Step B: 倄理闚控私聊、require_bot_name_in_group=False、has_prefix、掻跃 session
Step C: is_only_bot_name → 默讀回应 / call_bot_name_only
Step D: router 呜䞭 → 执行呜什
Step E: has_command_prefix 䞔呜什未呜䞭 → 未知呜什提瀺
Step F: 掻跃 session → 蜬 session 插件
Step G: 回萜 smalltalk providermute 仅圚歀步阻塞

xiaoqing_chat 仍然通过 observe_message() 观察已解析消息是吊实际回倍由插件内郚的 attention gate、硬频控、普通矀聊插话抂率、heartflow 和 PFC planner 决定。


3. Routerrouter.py ​

职莣根据觊发词匹配呜什。

python
@dataclass
class CommandSpec:
    plugin: str       # 所属插件名
    name: str         # 呜什名
    triggers: List[str]  # 觊发词列衚
    help_text: str    # 垮助文本
    admin_only: bool  # 是吊仅管理员
    handler: Handler  # 倄理凜数
    priority: int     # 䌘先级

class CommandRouter:
    def register(self, spec: CommandSpec):
        """泚册呜什"""
        self._commands.append(spec)
        
    def resolve(self, text: str) -> Optional[Tuple[CommandSpec, str]]:
        """解析呜什"""
        # 按䌘先级和觊发词长床排序长的䌘先
        for spec in sorted_commands:
            for trigger in spec.triggers:
                if text.startswith(trigger):
                    args = text[len(trigger):].strip()
                    return spec, args
        return None

䌘先级规则

  1. priority 数倌越倧越䌘先
  2. 同䌘先级时觊发词越长越䌘先避免 help 抢走 helpme 的匹配

4. PluginManagerplugin_manager.py ​

职莣管理插件的加蜜、卞蜜和热重蜜。

python
class PluginManager:
    def load_all(self):
        """加蜜 plugins/ 䞋所有插件"""
        for plugin_dir in self.plugins_dir.iterdir():
            if self._is_plugin_dir(plugin_dir):
                self.load_plugin(plugin_dir)
    
    def load_plugin(self, plugin_dir: Path):
        """加蜜单䞪插件"""
        # 1. 读取 plugin.json
        definition = self._load_definition(plugin_dir)
        
        # 2. 富入 main.py 暡块
        module = self._load_module(plugin_dir, definition)
        
        # 3. 泚册呜什到 Router
        self._register_commands(definition, module)
        
        # 4. 调甚 init() 钩子劂果存圚
        #    若返回协皋䌚被纳入 init task 跟螪并等埅完成
        if hasattr(module, "init"):
            result = module.init()
            if asyncio.iscoroutine(result):
                ...
    
    async def reload_plugin(self, name: str):
        """热重蜜插件"""
        await self.unload_plugin(name)
        self.load_plugin(self.plugins_dir / name)
        await self.wait_inits()
    
    async def watch(self):
        """监控插件文件变化自劚重蜜"""
        while True:
            await asyncio.sleep(self._poll_interval)
            # 检查 mtime劂有变化则重蜜

诎明应甚启劚时䌚自劚创建配眮 watcher插件 watcher 仅圚 config.json 里启甚 enable_plugin_watcher 后才䌚启劚。插件匂步 init() 圚重蜜路埄䞊也䌚被等埅劂果初始化倱莥半加蜜插件䌚被立即卞蜜避免继续接流量。

插件加蜜流皋

plugins/echo/
    │
    ├── plugin.json ──> PluginDefinition
    │                   (name, version, commands, schedule...)
    │
    └── main.py ──────> Module
                        (handle, init, shutdown...)
                             │
                             ▌
                      Router.register(CommandSpec)

5. SessionManagersession.py ​

职莣管理倚蜮对话的䌚话状态。

python
@dataclass
class Session:
    user_id: int
    group_id: Optional[int]  # None = 私聊
    plugin_name: str         # 所属插件
    data: Dict[str, Any]     # 䌚话数据
    timeout: float           # è¶…æ—¶æ—¶é—Ž
    
    def get(self, key, default=None): ...
    def set(self, key, value): ...
    def is_expired(self) -> bool: ...

class SessionManager:
    # 䌚话存傚(user_id, group_id) -> Session
    _sessions: Dict[tuple, Session]
    
    async def create(self, user_id, group_id, plugin_name, initial_data, timeout):
        """创建新䌚话"""
        
    async def get(self, user_id, group_id) -> Optional[Session]:
        """获取䌚话自劚枅理过期"""
        
    async def delete(self, user_id, group_id) -> bool:
        """删陀䌚话"""

䌚话生呜呚期

1. 甚户发送呜什劂 /猜数字
       │
       ▌
2. 插件调甚 context.create_session()
       │
       ▌
3. 䌚话创建存傚初始数据
       │
       ▌
4. 甚户后续消息被路由到 handle_session()
       │
       ▌
5. 插件曎新䌚话数据 session.set()
       │
       ├─ 继续对话 ──> 回到步骀 4
       │
       └─ 对话结束 ──> context.end_session()
                           │
                           ▌
                      䌚话被删陀

6. SchedulerManagerscheduler.py ​

职莣管理定时任务。

python
class SchedulerManager:
    def __init__(self, timezone: str):
        self.scheduler = AsyncIOScheduler(timezone=timezone)
        self.scheduler.start()
    
    def add_job(self, job_id: str, func, cron: Dict):
        """添加定时任务"""
        self.scheduler.add_job(func, trigger="cron", id=job_id, **cron)
    
    def remove_job(self, job_id: str):
        """移陀任务"""
        
    def clear_prefix(self, prefix: str):
        """移陀某前猀的所有任务甚于插件卞蜜"""

Cron 衚蟟匏瀺䟋

python
# 每倩 8:00
{"hour": 8, "minute": 0}

# 每 2 小时
{"hour": "*/2"}

# 工䜜日 9:00
{"day_of_week": "mon-fri", "hour": 9}

# 每月 1 号 0:00
{"day": 1, "hour": 0, "minute": 0}

7. OneBot 通信onebot.py + server.py ​

䞀种通信方匏

OneBotHttpSender - 发送消息 ​

python
class OneBotHttpSender:
    async def send_action(self, action: Dict):
        """发送 OneBot Action"""
        url = f"{self.http_base}/{action['action']}"
        await self.session.post(url, json=action['params'], headers=headers)

OneBotWsClient - WebSocket 双向通信 ​

python
class OneBotWsClient:
    async def connect_and_listen(self, handler):
        """连接并持续监听"""
        async with websockets.connect(self.ws_uri) as ws:
            async for message in ws:
                event = json.loads(message)
                await handler(event)
    
    async def send_action(self, action: Dict):
        """通过 WS 发送"""
        await self._ws.send(json.dumps(action))

InboundServer - 被劚接收 ​

python
class InboundServer:
    """HTTP 服务噚接收 OneBot 掚送"""
    
    async def post_event(self, request):
        """POST /event - 接收事件"""
        payload = await request.json()
        actions = await self.handler(payload)
        return web.json_response({"actions": actions})
    
    async def ws_handler(self, request):
        """WebSocket 端点"""
        # 持久连接倄理

🔄 数据流诊解 ​

完敎请求流皋 ​

1. OneBot 掚送事件
   POST http://127.0.0.1:12000/event
   {
     "post_type": "message",
     "message_type": "group",
     "group_id": 123456,
     "user_id": 789,
     "message": [{"type": "text", "data": {"text": "/echo hello"}}]
   }

2. InboundServer 接收
   └─ 验证 Authorization Token
   └─ 解析 JSON
   └─ 调甚 handler(event)

3. Dispatcher 倄理
   └─ MessageParser.parse() 构建 MessageContext
   └─ ctx.has_command_prefix=Truectx.clean_text="echo hello"
   └─ ctx.is_url_only=False跳过 URL 短路
   └─ 倄理闚控通过
   └─ router.resolve("echo hello") 埗到 (echo插件, "hello")
   └─ 权限检查通过
   └─ 构建 context
   └─ 调甚 echo.handle("echo", "hello", event, context)

4. 插件倄理
   └─ 返回 [{"type": "text", "data": {"text": "hello"}}]

5. 构建响应
   └─ build_action(segs, user_id, group_id)
   └─ {
        "action": "send_group_msg",
        "params": {
          "group_id": 123456,
          "message": [{"type": "text", "data": {"text": "hello"}}]
        }
      }

6. 返回给 OneBot
   └─ InboundServer 返回 {"actions": [...]}
   └─ OneBot 执行 action发送消息到 QQ

䌚话倄理流皋瀺䟋 ​

1. 甚户发送 /guess 启劚猜数字枞戏
   └─ guess.handle() 创建䌚话
   └─ context.create_session(initial_data={"target": 42})

2. 甚户后续消息 "50"
   └─ Dispatcher 倄理
   └─ 呜什未呜䞭
   └─ Step F 发现掻跃䌚话
   └─ 调甚 guess.handle_session("50", event, context, session)
   └─ 返回 ["倪倧了"]

3. 甚户猜测正确 "42"
   └─ Step F 䌚话倄理
   └─ guess.handle_session() 刀断正确
   └─ context.end_session() 删陀䌚话
   └─ 返回 ["恭喜䜠猜对了"]

⚡ 并发控制 ​

XiaoQing 䜿甚 asyncio.Semaphore 控制并发

python
# app.py
concurrency = int(config.get("max_concurrency", 5))
self.dispatcher = Dispatcher(..., semaphore=asyncio.Semaphore(concurrency))

# dispatcher.py
async def handle_event(self, event):
    async with self.semaphore:  # 最倚同时倄理 5 条消息
        return await self._handle_event(event)

🧩 插件内嵌服务 ​

郚分插件可以圚框架之倖独立运行附加服务。兞型案䟋是 pendo 插件

XiaoQing 䞻进皋
├── 正垞消息倄理流皋Dispatcher → Plugin
└── pendo 插件main.py
        └── 插件初始化或 /pendo web start
                └── FastAPI Web Serveruvicorn
                        ├── /api/*  # REST APIJWT 鉎权、CRUD、统计、Bundle、widget
                        └── /*      # 静态 SPA 文件

特点

  • Web Server 圚独立后台线皋䞭运行䞍阻塞消息倄理
  • 插件初始化䌚尝试自劚启劚也可以通过 /pendo web start 手劚重试通过 /pendo web stop 关闭
  • 应甚退出、插件卞蜜或 Ctrl+C 时䌚先请求 Pendo Web 䌘雅停止再枅理数据库和运行时状态
  • 支持通过 nginx 圚子路埄劂 /pendo/䞋反向代理访问
  • Pendo Web 䞎聊倩呜什共甚 plugins/pendo/services/db.py、utils/validators.py 和事件囟/提醒服务避免 Web 侎 CLI 各自绎技䞀套字段语义

及䞀类独立服务是 codex 插件的后台队列。它䞍䜿甚 SessionManager 捕获甚户后续消息而是圚插件内郚绎技 label -> session/thread/queue

  • /codex create <label> [cwd:<path>] 创建䞚务䌚话标筟
  • /codex <label> <任务> 将任务攟入该标筟队列handler 立即返回“已收到”
  • 同䞀标筟内任务䞲行执行䞍同标筟受 max_parallel_jobs 限制并行执行
  • 任务完成后通过 context.send_action() 䞻劚发送文字和囟片结果底层仍走统䞀 OneBot 发送铟路
  • 䌚话玢匕保存圚 plugins/codex/data/sessions.json每䞪标筟的记圕、囟片和任务 artifacts 保存圚 plugins/codex/data/session/<label>/删陀䌚话时旧目圕䌚園档到 plugins/codex/data/deleted_sessions/

arxiv_filter 的每日摘芁就是这䞪暡匏的䞚务化甚法筛选插件先返回论文列衚再通过后台䟧路把所有 positive arXiv 铟接投递到 Codex astro-ph 䌚话。astro-ph 銖次没有 Codex thread 时䌚先执行静默初始化任务之后摘芁任务倍甚同䞀 thread 和工䜜目圕䞭的 arxiv-summary-methodology.md。

这种方匏适合耗时蟃长䜆䞍应占甚 bot 倚蜮䌚话的后台工䜜。


➡ 䞋䞀步 ​

基于 MIT 讞可发垃

加蜜䞭...