在游戏服务端开发中,消息队列是 Actor 模型的核心组件。本文介绍 kapi 框架中消息队列的设计与实现,包括双缓冲设计、overload 保护机制,以及线程安全的考量。

一、为什么需要消息队列?

在 Actor 模型中,每个 Actor(服务)是独立的处理单元,通过消息进行通信:

Actor A --[消息]--> Actor B --[消息]--> Actor C

消息队列的作用:

  1. 解耦:生产者和消费者不需要同步等待
  2. 缓冲:削峰填谷,应对消息突发
  3. 顺序:保证同一 Actor 的消息按顺序处理
  4. 隔离:消息处理在 Actor 内部,避免共享状态

二、设计目标

在设计消息队列时,我们考虑以下几点:

目标 说明
低延迟 消息从生产到消费的延迟要小
高吞吐 支持高并发消息投递
线程安全 多生产者单消费者模型
overload 保护 防止消息堆积导致内存溢出
可监控 提供统计信息,便于排查问题

三、核心设计:双缓冲队列

3.1 传统队列的问题

传统单队列在多线程环境下的瓶颈:

生产者1 ──┐
生产者2 ──┼──> [锁竞争] ──> 单队列 ──> [锁竞争] ──> 消费者
生产者3 ──┘

问题:

  • 生产者和消费者竞争同一把锁
  • 高并发时锁竞争严重,性能下降
3.2 双缓冲设计

参考 Moon 的设计,使用双缓冲减少锁竞争:

生产者 ──> [写队列] ──┐
                     │ swap
消费者 <── [读队列] <─┘

核心思想:

  • 写队列:生产者 push 到写队列
  • 读队列:消费者从读队列读取
  • swap:消费者读取时,交换两个队列

优势:

  • swap 操作极快(指针交换)
  • 消费者处理消息时,不需要持锁
  • 锁竞争时间极短
3.3 代码实现
class MessageQueue {
public:
    explicit MessageQueue(size_t overload_threshold = 1024)
        : overload_threshold_(overload_threshold)
        , overload_(false)
    {}

    // 生产者调用
    size_t push(MessagePtr msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        write_queue_.push_back(std::move(msg));
        return write_queue_.size();
    }

    // 消费者调用
    std::vector<MessagePtr> swap_on_read() {
        std::vector<MessagePtr> result;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            
            // 交换读写队列
            write_queue_.swap(read_queue_);
            
            // overload 检测
            if (read_queue_.size() > overload_threshold_) {
                overload_ = true;
                overload_count_++;
            } else {
                overload_ = false;
            }
            
            // 将读队列内容移到结果
            result.reserve(read_queue_.size());
            while (!read_queue_.empty()) {
                result.push_back(std::move(read_queue_.front()));
                read_queue_.pop_front();
            }
        }
        return result;
    }

private:
    mutable std::mutex mutex_;
    std::deque<MessagePtr> write_queue_;  // 写队列
    std::deque<MessagePtr> read_queue_;   // 读队列
    size_t overload_threshold_;
    bool overload_;
};

四、Overload 保护机制

4.1 问题背景

在高并发场景下,如果消息生产速度 > 消费速度,消息会堆积:

消息堆积 -> 内存增长 -> OOM -> 服务崩溃

参考 Skynet 的设计,我们需要 overload 保护。

4.2 实现方案
  1. 阈值检测:设置 overload 阈值(如 1024 条消息)
  2. 告警标记:超过阈值时标记 overload 状态
  3. 统计信息:记录 overload 次数,便于监控
class MessageQueue {
public:
    bool is_overload() const { return overload_; }
    
    QueueStats stats() const {
        QueueStats s;
        s.size = size();
        s.total_pushed = total_pushed_;
        s.total_popped = total_popped_;
        s.overload_count = overload_count_;
        return s;
    }

private:
    size_t overload_threshold_;
    bool overload_;
    std::atomic<uint64_t> overload_count_;
};
4.3 Overload 处理策略

检测到 overload 后,可以采取以下策略:

策略 说明
丢弃非关键消息 如日志、心跳等
降级处理 减少处理逻辑,快速消费
告警通知 发送告警,人工介入
限流 限制上游发送速率

五、Worker 如何使用消息队列

在 kapi 中,每个 Worker 线程管理多个 Service:

class Worker {
public:
    void send_message(MessagePtr msg) {
        // 投递到消息队列
        mq_.push(msg);
        
        // 通知 io_context 处理
        asio::post(io_ctx_, [this]() {
            process_messages();
        });
    }

private:
    void process_messages() {
        // swap 获取所有消息
        auto messages = mq_.swap_on_read();
        
        // 逐条派发
        for (auto& msg : messages) {
            dispatch_message(std::move(msg));
        }
    }
    
    void dispatch_message(MessagePtr msg) {
        // 找到目标服务
        auto* service = find_service(msg->destination());
        if (service) {
            service->dispatch(std::move(msg));
        }
    }

private:
    asio::io_context io_ctx_;
    MessageQueue mq_;
    std::unordered_map<ServiceId, ServicePtr> services_;
};

流程图:

外部消息 ──> mq_.push(msg)
                      │
                      ▼
           asio::post(io_ctx_, process_messages)
                      │
                      ▼
              mq_.swap_on_read()
                      │
                      ▼
              for msg in messages:
                  service->dispatch(msg)

六、性能测试

6.1 测试场景
  • 生产者线程数:4
  • 每个生产者发送:1000 条消息
  • 总消息数:4000 条
TEST(test_concurrent_access) {
    MessageQueue mq;
    const int producer_count = 4;
    const int messages_per_producer = 1000;
    
    std::vector<std::thread> producers;
    
    for (int p = 0; p < producer_count; ++p) {
        producers.emplace_back([&mq, p, messages_per_producer]() {
            for (int i = 0; i < messages_per_producer; ++i) {
                mq.push(std::make_shared<Message>(...));
            }
        });
    }
    
    for (auto& t : producers) {
        t.join();
    }
    
    auto messages = mq.swap_on_read();
    ASSERT(messages.size() == producer_count * messages_per_producer);
}
6.2 测试结果
=== MessageQueue Unit Tests ===

Testing: test_basic_push_pop...
  PASSED
Testing: test_batch_push...
  PASSED
Testing: test_overload_detection...
  PASSED
Testing: test_concurrent_access...
  PASSED
Testing: test_empty_queue...
  PASSED
Testing: test_stats...
  PASSED

=== Results ===
Passed: 6, Failed: 0

七、与其他实现对比

特性 kapi Skynet Moon
双缓冲
overload 保护
统计信息
语言 C++17 C C++17
依赖 asio asio

设计取舍

  1. 双缓冲 vs 单队列:双缓冲减少锁竞争,但多一次 swap 开销
  2. 阈值检测 vs 动态适应:固定阈值简单可靠,动态适应更灵活但复杂
  3. deque vs ring buffer:deque 内存更灵活,ring buffer 性能更稳定

八、总结

本文介绍了 kapi 框架中消息队列的设计与实现:

  1. 双缓冲设计:减少锁竞争,提高吞吐
  2. overload 保护:防止消息堆积,保证系统稳定
  3. 线程安全:多生产者单消费者模型
  4. 可监控:统计信息便于问题排查

消息队列是 Actor 模型的基础,好的设计能让服务端在高并发场景下保持稳定。希望本文对你有所帮助。

九、参考

  1. Skynet - 云风的 Actor 框架
  2. Moon - 基于 ASIO 的 Actor 框架
  3. kapi 项目源码