增城市网站建设物联网的应用

张小明 2026/1/9 6:00:18
增城市网站建设,物联网的应用,阿里云 网站建设,wordpress 个人简洁redis发布订阅是一种消息通知模式#xff0c;发布者发送消息#xff0c;订阅者接收消息。角色说明发布者 (Publisher)向频道发送消息的客户端订阅者 (Subscriber)订阅频道接收消息的客户端频道 (Channel)消息传递的管道/主题基本命令1. 订阅频道#订阅一个或多个频道 SUBSCRIB…redis发布订阅是一种消息通知模式发布者发送消息订阅者接收消息。角色说明发布者 (Publisher)向频道发送消息的客户端订阅者 (Subscriber)订阅频道接收消息的客户端频道 (Channel)消息传递的管道/主题基本命令1. 订阅频道#订阅一个或多个频道 SUBSCRIBE 频道1 频道2 #使用模式匹配订阅 PSUBSCRIBE news.* # 订阅所有以 news. 开头的频道2. 取消订阅# 退订指定频道 UNSUBSCRIBE channel1 # 退订模式匹配的频道 PUNSUBSCRIBE news.*3. 发布消息# 向指定频道发布消息 PUBLISH channel1 Hello, World!4. 查看信息# 查看活跃的频道 PUBSUB CHANNELS [pattern] # 查看频道的订阅者数量 PUBSUB NUMSUB channel1 # 查看模式订阅的数量 PUBSUB NUMPAT 代码示例步骤连接到Redis服务器。bool Redis::connect() { // 负责publish发布消息的上下文连接 _publish_context redisConnect(127.0.0.1, 6379); if (nullptr _publish_context) { cerr connect redis failed! endl; return false; } // 负责subscribe订阅消息的上下文连接 _subcribe_context redisConnect(127.0.0.1, 6379); if (nullptr _subcribe_context) { cerr connect redis failed! endl; return false; } // 在单独的线程中监听通道上的事件有消息给业务层进行上报 thread t([]() { observer_channel_message(); }); t.detach(); cout connect redis-server success! endl; return true; }订阅者订阅一个或多个频道。bool Redis::subscribe(int channel) { // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息这里只做订阅通道不接收通道消息 // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行 // 只负责发送命令不阻塞接收redis server响应消息否则和notifyMsg线程抢占响应资源 if (REDIS_ERR redisAppendCommand(this-_subcribe_context, SUBSCRIBE %d, channel)) { cerr subscribe command failed! endl; return false; } // redisBufferWrite可以循环发送缓冲区直到缓冲区数据发送完毕done被置为1 int done 0; while (!done) { if (REDIS_ERR redisBufferWrite(this-_subcribe_context, done)) { cerr subscribe command failed! endl; return false; } } // redisGetReply return true; }发布者向频道发布消息。bool Redis::publish(int channel, string message) { redisReply *reply (redisReply *)redisCommand(_publish_context, PUBLISH %d %s, channel, message.c_str()); if (nullptr reply) { cerr publish command failed! endl; return false; } freeReplyObject(reply); return true; }订阅者接收并处理消息。void Redis::observer_channel_message() { redisReply *reply nullptr; while (REDIS_OK redisGetReply(this-_subcribe_context, (void **)reply)) { // 订阅收到的消息是一个带三元素的数组 if (reply ! nullptr reply-element[2] ! nullptr reply-element[2]-str ! nullptr) { // 给业务层上报通道上发生的消息 _notify_message_handler(atoi(reply-element[1]-str) , reply-element[2]-str); } freeReplyObject(reply); } cerr observer_channel_message quit endl; }Redis 发布订阅原理深度解析️Redis 发布订阅架构总览┌─────────────────────────────────────────────────────────┐ │ Redis Server │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Pub/Sub 子系统 │ │ │ │ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Channel │ │ Pattern │ │ Clients │ │ │ │ │ │ Hash Table │ │ List │ │ List │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ │ │ │ │ │ │ │ └────────┼───────────────┼───────────────┼────────┘ │ │ │ │ │ │ │ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ │ │ 普通订阅 │ │ 模式订阅 │ │ 客户端状态 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘核心数据结构1.频道订阅表// Redis 源码中的数据结构简化版 typedef struct redisServer { // 普通频道订阅字典键是频道名值是客户端链表 dict *pubsub_channels; // Mapchannel_name, Listclient // 模式订阅链表每个节点包含模式和客户端链表 list *pubsub_patterns; // List{pattern, Listclient} // ... 其他字段 } redisServer;2.客户端订阅状态typedef struct client { // 客户端订阅的频道列表 dict *pubsub_channels; // Setchannel_name // 客户端订阅的模式列表 list *pubsub_patterns; // Listpattern // ... 其他字段 } client;消息传递流程发布消息的完整流程1. 客户端发送: PUBLISH channel message ↓ 2. Redis 接收命令并解析 ↓ 3. 查找 channel 的订阅者 ├── 3.1 在 pubsub_channels 字典中查找 channel ├── 3.2 获取该 channel 的客户端链表 └── 3.3 遍历链表向每个客户端发送消息 ↓ 4. 查找模式匹配的订阅者 ├── 4.1 遍历 pubsub_patterns 链表 ├── 4.2 对每个 pattern检查是否匹配 channel └── 4.3 向匹配的客户端发送消息 ↓ 5. 返回接收消息的客户端数量源码级解析基于 Redis 6.x// pubsub.c - 发布消息的核心函数 void publishCommand(client *c) { int receivers pubsubPublishMessage(c-argv[1], c-argv[2]); addReplyLongLong(c, receivers); } int pubsubPublishMessage(robj *channel, robj *message) { int receivers 0; // 1. 向普通频道订阅者发送 dictEntry *de dictFind(server.pubsub_channels, channel); if (de) { list *list dictGetVal(de); receivers dictSize(list); // 遍历所有订阅该频道的客户端 dictIterator *di dictGetIterator(list); dictEntry *de; while ((de dictNext(di)) ! NULL) { client *c dictGetKey(de); addReplyPubsubMessage(c, channel, message); } dictReleaseIterator(di); } // 2. 向模式订阅者发送 if (listLength(server.pubsub_patterns)) { listIter li; listNode *ln; listRewind(server.pubsub_patterns, li); while ((ln listNext(li)) ! NULL) { pubsubPattern *pat ln-value; // 检查频道是否匹配模式 if (stringmatchlen(pat-pattern-ptr, sdslen(pat-pattern-ptr), channel-ptr, sdslen(channel-ptr),0)) { addReplyPubsubPatMessage(pat-client, pat-pattern, channel, message); receivers; } } } return receivers; }订阅机制详解1. 普通订阅流程// 客户端订阅频道的处理 void subscribeCommand(client *c) { for (int j 1; j c-argc; j) { pubsubSubscribeChannel(c, c-argv[j]); } } void pubsubSubscribeChannel(client *c, robj *channel) { // 1. 将频道添加到客户端的订阅集合 if (dictAdd(c-pubsub_channels, channel, NULL) DICT_OK) { incrRefCount(channel); // 2. 将客户端添加到服务器的频道订阅列表 dictEntry *de dictFind(server.pubsub_channels, channel); if (de NULL) { // 频道不存在创建新的客户端列表 clients dictCreate(pubsubDictType); dictAdd(server.pubsub_channels, channel, clients); incrRefCount(channel); } else { clients dictGetVal(de); } // 3. 将客户端添加到频道的订阅者列表 dictAdd(clients, c, NULL); } }2. 模式订阅流程// 客户端模式订阅的处理 void psubscribeCommand(client *c) { for (int j 1; j c-argc; j) { pubsubSubscribePattern(c, c-argv[j]); } } void pubsubSubscribePattern(client *c, robj *pattern) { // 1. 检查客户端是否已经订阅了此模式 listIter li; listNode *ln; listRewind(c-pubsub_patterns, li); while ((ln listNext(li)) ! NULL) { pubsubPattern *pat ln-value; if (equalStringObjects(pattern, pat-pattern)) { return; // 已订阅 } } // 2. 创建新的模式订阅节点 pubsubPattern *pat; pat zmalloc(sizeof(*pat)); pat-pattern getDecodedObject(pattern); pat-client c; // 3. 添加到客户端的模式列表 listAddNodeTail(c-pubsub_patterns, pat); // 4. 添加到服务器的模式列表 listAddNodeTail(server.pubsub_patterns, pat); }内存数据结构图示普通订阅数据结构server.pubsub_channels (字典) | ├── news → dict(clients) │ ├── client1 │ ├── client2 │ └── client3 │ ├── sports → dict(clients) │ ├── client1 │ └── client4 │ └── weather → dict(clients) └── client2 client.pubsub_channels (字典客户端视角) | ├── client1: {news, sports} ├── client2: {news, weather} ├── client3: {news} └── client4: {sports}模式订阅数据结构server.pubsub_patterns (链表) | ├── node1: {patternnews.*, clientclient1} ├── node2: {pattern*.sports, clientclient2} ├── node3: {patternweather.*, clientclient3} └── node4: {patternnews.*, clientclient4} client.pubsub_patterns (链表客户端视角) | ├── client1: [news.*] ├── client2: [*.sports] ├── client3: [weather.*] └── client4: [news.*]高性能设计要点1. 零拷贝消息传递// Redis 使用引用计数避免消息内容的复制 void addReplyPubsubMessage(client *c, robj *channel, robj *message) { // 增加引用计数而不是复制消息内容 incrRefCount(channel); incrRefCount(message); // 构建响应但不复制数据 addReply(c, shared.mbulkhdr[3]); addReply(c, shared.messagebulk); addReplyBulk(c, channel); addReplyBulk(c, message); // 减少引用计数 decrRefCount(channel); decrRefCount(message); }2. 批量消息发送优化// 当多个客户端订阅同一频道时优化网络发送 for (每个订阅者客户端) { if (clientHasPendingReplies(c)) { // 如果客户端输出缓冲区已有数据先发送 writeToClient(c, 0); } // 添加新消息到缓冲区 addReplyPubsubMessage(c, channel, message); // 非阻塞发送尝试 if (clientHasPendingReplies(c)) { writeToClient(c, 0); } }内存与性能统计PUBSUB 命令实现// PUBSUB CHANNELS [pattern] - 获取活跃频道 void pubsubChannelsCommand(client *c) { dictIterator *di; dictEntry *de; long mblen 0; void *replylen NULL; // 遍历所有频道 di dictGetIterator(server.pubsub_channels); while((de dictNext(di)) ! NULL) { robj *cobj dictGetKey(de); char *channel cobj-ptr; // 如果有模式参数进行匹配 if (pattern !stringmatchlen(pattern, patlen, channel, clen, 0)) continue; addReplyBulk(c, cobj); } dictReleaseIterator(di); } // PUBSUB NUMSUB [channel ...] - 获取订阅者数量 // PUBSUB NUMPAT - 获取模式订阅数量⚠️重要限制与边界条件1. 消息不持久化// Redis 不会将发布的消息写入磁盘 // 消息只在内存中传递 // 服务器重启或崩溃会导致消息丢失2. 无消息确认机制发布者 → Redis → 订阅者 发送 接收 ↓ ↓ 无确认 ←───── 无ACK3. 客户端缓冲区限制// 每个客户端有输出缓冲区限制 #define PROTO_REPLY_CHUNK_BYTES (16*1024) // 16KB 块 // 如果订阅者处理速度慢缓冲区可能满 // Redis 会断开慢客户端连接 if (client-bufpos client-buf_soft_limit) { freeClient(client); }4. 订阅数量限制// 理论上无限制但受内存限制 // 每个订阅在内存中占用 // - 频道名共享字符串对象 // - 客户端指针8字节 // - 字典条目开销~24字节与 Redis Streams 的对比特性Pub/SubStreams消息持久化❌ 不持久化✅ 可持久化消费者组❌ 不支持✅ 支持消息确认❌ 不支持✅ 支持历史消息❌ 不保留✅ 可回溯性能⭐⭐⭐⭐⭐ 极高⭐⭐⭐⭐ 高内存使用临时内存持久内存设计哲学1. 简单性优于复杂性// Redis 选择简单的广播模型而不是复杂的消息队列 // 设计目标高吞吐量、低延迟2. 内存速度优先// 所有操作在内存中完成 // 使用高效的数据结构字典、链表 // 避免磁盘I/O和复杂的事务3. 客户端驱动模型// Redis 不主动推送而是响应客户端命令 // 但 Pub/Sub 是例外服务器主动向订阅者推送
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

视频网站亏损也做百度搜索指数和资讯指数

BotW存档管理器:跨平台游戏进度迁移解决方案 【免费下载链接】BotW-Save-Manager BOTW Save Manager for Switch and Wii U 项目地址: https://gitcode.com/gh_mirrors/bo/BotW-Save-Manager 《塞尔达传说:旷野之息》作为任天堂的经典作品&#x…

张小明 2026/1/9 4:02:42 网站建设

用angularjs做的网站百度推广代理商有哪些

目录具体实现截图项目介绍论文大纲核心代码部分展示可定制开发之亮点部门介绍结论源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作具体实现截图 本系统(程序源码数据库调试部署讲解)同时还支持Python(flask,django)、…

张小明 2026/1/7 15:42:23 网站建设

织梦模板网站好吗多少钱可以做网站

极速上手CreamApi:3步搞定游戏DLC自动解锁难题 【免费下载链接】CreamApi 项目地址: https://gitcode.com/gh_mirrors/cr/CreamApi 还在为心仪游戏的付费DLC望而却步?CreamApi作为一款专业的DLC自动解锁器配置工具,能够帮助普通玩家轻…

张小明 2026/1/7 15:10:38 网站建设

网站建设都有那些费用中国建设银行报网站

基于QT开发的ImageViewer轻量级图片查看器 前言: 学习QT的时候,简单实现了一个好玩的图片查看器,这个是基于QT开发的,无边框、可拖动、支持全屏以及本地目录加载。方便学习一些事件的简单处理. 一、环境要求 Qt 5.15c11windows上的vs2022 二、项目的学习点 Qt资…

张小明 2026/1/6 0:19:24 网站建设

建设部网站监理工程师公司网站开发费用入什么科目

第一章:Dify React 19.2.3 安全更新概述Dify React 19.2.3 是一次专注于安全加固与依赖项清理的补丁版本,主要修复了多个潜在的安全漏洞,并提升了前端运行时的稳定性。此次更新适用于所有使用 Dify 框架构建的 React 应用,尤其是在…

张小明 2026/1/7 0:42:56 网站建设

秦皇岛学网站建设扬州高邮网站建设

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个极简的Qt打包教学工具,要求:1. 提供step-by-step向导界面;2. 自动检测系统Qt环境;3. 内置简单的Qt示例项目;4. 可…

张小明 2026/1/6 0:18:19 网站建设