增城市网站建设,物联网的应用,阿里云 网站建设,wordpress 个人简洁redis发布订阅是一种消息通知模式#xff0c;发布者发送消息#xff0c;订阅者接收消息。角色说明发布者 (Publisher)向频道发送消息的客户端订阅者 (Subscriber)订阅频道接收消息的客户端频道 (Channel)消息传递的管道/主题基本命令1. 订阅频道#订阅一个或多个频道
SUBSCRIB…redis发布订阅是一种消息通知模式发布者发送消息订阅者接收消息。角色说明发布者 (Publisher)向频道发送消息的客户端订阅者 (Subscriber)订阅频道接收消息的客户端频道 (Channel)消息传递的管道/主题基本命令1. 订阅频道#订阅一个或多个频道 SUBSCRIBE 频道1 频道2 #使用模式匹配订阅 PSUBSCRIBE news.* # 订阅所有以 news. 开头的频道2. 取消订阅# 退订指定频道 UNSUBSCRIBE channel1 # 退订模式匹配的频道 PUNSUBSCRIBE news.*3. 发布消息# 向指定频道发布消息 PUBLISH channel1 Hello, World!4. 查看信息# 查看活跃的频道 PUBSUB CHANNELS [pattern] # 查看频道的订阅者数量 PUBSUB NUMSUB channel1 # 查看模式订阅的数量 PUBSUB NUMPAT 代码示例步骤连接到Redis服务器。bool Redis::connect() { // 负责publish发布消息的上下文连接 _publish_context redisConnect(127.0.0.1, 6379); if (nullptr _publish_context) { cerr connect redis failed! endl; return false; } // 负责subscribe订阅消息的上下文连接 _subcribe_context redisConnect(127.0.0.1, 6379); if (nullptr _subcribe_context) { cerr connect redis failed! endl; return false; } // 在单独的线程中监听通道上的事件有消息给业务层进行上报 thread t([]() { observer_channel_message(); }); t.detach(); cout connect redis-server success! endl; return true; }订阅者订阅一个或多个频道。bool Redis::subscribe(int channel) { // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息这里只做订阅通道不接收通道消息 // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行 // 只负责发送命令不阻塞接收redis server响应消息否则和notifyMsg线程抢占响应资源 if (REDIS_ERR redisAppendCommand(this-_subcribe_context, SUBSCRIBE %d, channel)) { cerr subscribe command failed! endl; return false; } // redisBufferWrite可以循环发送缓冲区直到缓冲区数据发送完毕done被置为1 int done 0; while (!done) { if (REDIS_ERR redisBufferWrite(this-_subcribe_context, done)) { cerr subscribe command failed! endl; return false; } } // redisGetReply return true; }发布者向频道发布消息。bool Redis::publish(int channel, string message) { redisReply *reply (redisReply *)redisCommand(_publish_context, PUBLISH %d %s, channel, message.c_str()); if (nullptr reply) { cerr publish command failed! endl; return false; } freeReplyObject(reply); return true; }订阅者接收并处理消息。void Redis::observer_channel_message() { redisReply *reply nullptr; while (REDIS_OK redisGetReply(this-_subcribe_context, (void **)reply)) { // 订阅收到的消息是一个带三元素的数组 if (reply ! nullptr reply-element[2] ! nullptr reply-element[2]-str ! nullptr) { // 给业务层上报通道上发生的消息 _notify_message_handler(atoi(reply-element[1]-str) , reply-element[2]-str); } freeReplyObject(reply); } cerr observer_channel_message quit endl; }Redis 发布订阅原理深度解析️Redis 发布订阅架构总览┌─────────────────────────────────────────────────────────┐ │ Redis Server │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Pub/Sub 子系统 │ │ │ │ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Channel │ │ Pattern │ │ Clients │ │ │ │ │ │ Hash Table │ │ List │ │ List │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ │ │ │ │ │ │ │ └────────┼───────────────┼───────────────┼────────┘ │ │ │ │ │ │ │ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ │ │ 普通订阅 │ │ 模式订阅 │ │ 客户端状态 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘核心数据结构1.频道订阅表// Redis 源码中的数据结构简化版 typedef struct redisServer { // 普通频道订阅字典键是频道名值是客户端链表 dict *pubsub_channels; // Mapchannel_name, Listclient // 模式订阅链表每个节点包含模式和客户端链表 list *pubsub_patterns; // List{pattern, Listclient} // ... 其他字段 } redisServer;2.客户端订阅状态typedef struct client { // 客户端订阅的频道列表 dict *pubsub_channels; // Setchannel_name // 客户端订阅的模式列表 list *pubsub_patterns; // Listpattern // ... 其他字段 } client;消息传递流程发布消息的完整流程1. 客户端发送: PUBLISH channel message ↓ 2. Redis 接收命令并解析 ↓ 3. 查找 channel 的订阅者 ├── 3.1 在 pubsub_channels 字典中查找 channel ├── 3.2 获取该 channel 的客户端链表 └── 3.3 遍历链表向每个客户端发送消息 ↓ 4. 查找模式匹配的订阅者 ├── 4.1 遍历 pubsub_patterns 链表 ├── 4.2 对每个 pattern检查是否匹配 channel └── 4.3 向匹配的客户端发送消息 ↓ 5. 返回接收消息的客户端数量源码级解析基于 Redis 6.x// pubsub.c - 发布消息的核心函数 void publishCommand(client *c) { int receivers pubsubPublishMessage(c-argv[1], c-argv[2]); addReplyLongLong(c, receivers); } int pubsubPublishMessage(robj *channel, robj *message) { int receivers 0; // 1. 向普通频道订阅者发送 dictEntry *de dictFind(server.pubsub_channels, channel); if (de) { list *list dictGetVal(de); receivers dictSize(list); // 遍历所有订阅该频道的客户端 dictIterator *di dictGetIterator(list); dictEntry *de; while ((de dictNext(di)) ! NULL) { client *c dictGetKey(de); addReplyPubsubMessage(c, channel, message); } dictReleaseIterator(di); } // 2. 向模式订阅者发送 if (listLength(server.pubsub_patterns)) { listIter li; listNode *ln; listRewind(server.pubsub_patterns, li); while ((ln listNext(li)) ! NULL) { pubsubPattern *pat ln-value; // 检查频道是否匹配模式 if (stringmatchlen(pat-pattern-ptr, sdslen(pat-pattern-ptr), channel-ptr, sdslen(channel-ptr),0)) { addReplyPubsubPatMessage(pat-client, pat-pattern, channel, message); receivers; } } } return receivers; }订阅机制详解1. 普通订阅流程// 客户端订阅频道的处理 void subscribeCommand(client *c) { for (int j 1; j c-argc; j) { pubsubSubscribeChannel(c, c-argv[j]); } } void pubsubSubscribeChannel(client *c, robj *channel) { // 1. 将频道添加到客户端的订阅集合 if (dictAdd(c-pubsub_channels, channel, NULL) DICT_OK) { incrRefCount(channel); // 2. 将客户端添加到服务器的频道订阅列表 dictEntry *de dictFind(server.pubsub_channels, channel); if (de NULL) { // 频道不存在创建新的客户端列表 clients dictCreate(pubsubDictType); dictAdd(server.pubsub_channels, channel, clients); incrRefCount(channel); } else { clients dictGetVal(de); } // 3. 将客户端添加到频道的订阅者列表 dictAdd(clients, c, NULL); } }2. 模式订阅流程// 客户端模式订阅的处理 void psubscribeCommand(client *c) { for (int j 1; j c-argc; j) { pubsubSubscribePattern(c, c-argv[j]); } } void pubsubSubscribePattern(client *c, robj *pattern) { // 1. 检查客户端是否已经订阅了此模式 listIter li; listNode *ln; listRewind(c-pubsub_patterns, li); while ((ln listNext(li)) ! NULL) { pubsubPattern *pat ln-value; if (equalStringObjects(pattern, pat-pattern)) { return; // 已订阅 } } // 2. 创建新的模式订阅节点 pubsubPattern *pat; pat zmalloc(sizeof(*pat)); pat-pattern getDecodedObject(pattern); pat-client c; // 3. 添加到客户端的模式列表 listAddNodeTail(c-pubsub_patterns, pat); // 4. 添加到服务器的模式列表 listAddNodeTail(server.pubsub_patterns, pat); }内存数据结构图示普通订阅数据结构server.pubsub_channels (字典) | ├── news → dict(clients) │ ├── client1 │ ├── client2 │ └── client3 │ ├── sports → dict(clients) │ ├── client1 │ └── client4 │ └── weather → dict(clients) └── client2 client.pubsub_channels (字典客户端视角) | ├── client1: {news, sports} ├── client2: {news, weather} ├── client3: {news} └── client4: {sports}模式订阅数据结构server.pubsub_patterns (链表) | ├── node1: {patternnews.*, clientclient1} ├── node2: {pattern*.sports, clientclient2} ├── node3: {patternweather.*, clientclient3} └── node4: {patternnews.*, clientclient4} client.pubsub_patterns (链表客户端视角) | ├── client1: [news.*] ├── client2: [*.sports] ├── client3: [weather.*] └── client4: [news.*]高性能设计要点1. 零拷贝消息传递// Redis 使用引用计数避免消息内容的复制 void addReplyPubsubMessage(client *c, robj *channel, robj *message) { // 增加引用计数而不是复制消息内容 incrRefCount(channel); incrRefCount(message); // 构建响应但不复制数据 addReply(c, shared.mbulkhdr[3]); addReply(c, shared.messagebulk); addReplyBulk(c, channel); addReplyBulk(c, message); // 减少引用计数 decrRefCount(channel); decrRefCount(message); }2. 批量消息发送优化// 当多个客户端订阅同一频道时优化网络发送 for (每个订阅者客户端) { if (clientHasPendingReplies(c)) { // 如果客户端输出缓冲区已有数据先发送 writeToClient(c, 0); } // 添加新消息到缓冲区 addReplyPubsubMessage(c, channel, message); // 非阻塞发送尝试 if (clientHasPendingReplies(c)) { writeToClient(c, 0); } }内存与性能统计PUBSUB 命令实现// PUBSUB CHANNELS [pattern] - 获取活跃频道 void pubsubChannelsCommand(client *c) { dictIterator *di; dictEntry *de; long mblen 0; void *replylen NULL; // 遍历所有频道 di dictGetIterator(server.pubsub_channels); while((de dictNext(di)) ! NULL) { robj *cobj dictGetKey(de); char *channel cobj-ptr; // 如果有模式参数进行匹配 if (pattern !stringmatchlen(pattern, patlen, channel, clen, 0)) continue; addReplyBulk(c, cobj); } dictReleaseIterator(di); } // PUBSUB NUMSUB [channel ...] - 获取订阅者数量 // PUBSUB NUMPAT - 获取模式订阅数量⚠️重要限制与边界条件1. 消息不持久化// Redis 不会将发布的消息写入磁盘 // 消息只在内存中传递 // 服务器重启或崩溃会导致消息丢失2. 无消息确认机制发布者 → Redis → 订阅者 发送 接收 ↓ ↓ 无确认 ←───── 无ACK3. 客户端缓冲区限制// 每个客户端有输出缓冲区限制 #define PROTO_REPLY_CHUNK_BYTES (16*1024) // 16KB 块 // 如果订阅者处理速度慢缓冲区可能满 // Redis 会断开慢客户端连接 if (client-bufpos client-buf_soft_limit) { freeClient(client); }4. 订阅数量限制// 理论上无限制但受内存限制 // 每个订阅在内存中占用 // - 频道名共享字符串对象 // - 客户端指针8字节 // - 字典条目开销~24字节与 Redis Streams 的对比特性Pub/SubStreams消息持久化❌ 不持久化✅ 可持久化消费者组❌ 不支持✅ 支持消息确认❌ 不支持✅ 支持历史消息❌ 不保留✅ 可回溯性能⭐⭐⭐⭐⭐ 极高⭐⭐⭐⭐ 高内存使用临时内存持久内存设计哲学1. 简单性优于复杂性// Redis 选择简单的广播模型而不是复杂的消息队列 // 设计目标高吞吐量、低延迟2. 内存速度优先// 所有操作在内存中完成 // 使用高效的数据结构字典、链表 // 避免磁盘I/O和复杂的事务3. 客户端驱动模型// Redis 不主动推送而是响应客户端命令 // 但 Pub/Sub 是例外服务器主动向订阅者推送