当前位置: 首页 > news >正文

【从零开始一步步学习VSOA开发】发布订阅重连时同步

发布订阅重连时同步

概念

数据同步是指在数据发布与订阅场景中,当客户端因故障断开重连后,需要立即获取当前最新数据的需求。
如前面开发示例中的 axis_server 陀螺仪服务,若产生故障断开重连,则需要客户端上线后立即获取 /axis 的最新状态,以保证数据的一致性。通常情况下,客户端会在断开重连后,主动发起一次 RPC 请求以获取数据的最新状态,但如果需要获取的数据量较大,则会给代码编程带来更多的复杂性。此时,可以使用客户端机器人带有的自动数据同步接口进行处理。

程序源码

发布订阅重连时同步需要服务端和客户端都进行支持。服务端需要增加一个 RPC 服务,用于返回最后一次发布数据。客户端需要调用自动数据同步接口 vsoa_client_auto_consistent以便重连后能及时请求最新数据。

修改后的服务端源码如下,注意 RPC 服务的添加,同时为了测试效果明显,将发布周期由 1 秒改为了 10 秒。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"#define MY_SERVER_ADDR                      "0.0.0.0"
#define MY_SERVER_PORT                      (4002)
#define MY_SERVER_NAME                      "{\"name\":\"axis_server\"}"
#define MY_SERVER_PASSWD                    "123456"#define AXIS_SER_BUF_LEN                    100static int roll = 1, pitch = 1, yaw = 1;static void *publish_axis_thread (void *arg)
{vsoa_url_t url;vsoa_payload_t payload;vsoa_server_t *server = arg;char param[AXIS_SER_BUF_LEN + 1];url.url     = "/axis";url.url_len = strlen(url.url);payload.data = NULL;payload.data_len  = 0;payload.param = param;roll  = 1;pitch = 1;yaw   = 1;while (TRUE) {sleep(10);if (!vsoa_server_is_subscribed(server, &url)) {continue;}payload.param_len = snprintf(param, AXIS_SER_BUF_LEN,"{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",roll++, pitch++, yaw++);printf("publish:%s\n", param);vsoa_server_publish(server, &url, &payload);}return (NULL);
}static void command_axis (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,vsoa_header_t *vsoa_hdr, vsoa_url_t *url,vsoa_payload_t *payload)
{vsoa_payload_t send;char param[100];uint32_t  seqno = vsoa_parser_get_seqno(vsoa_hdr);sprintf(param, "{\"roll\": %d, \"pitch\": %d, \"yaw\": %d}",roll, pitch, yaw);send.data = NULL;send.data_len = 0;send.param = param;send.param_len = strlen(send.param);vsoa_server_cli_reply(server, cid, 0, seqno, 0, &send);
}int main (int argc, char **argv)
{vsoa_server_t *server;/** 创建服务端*/server = vsoa_server_create(MY_SERVER_NAME);if (!server) {fprintf(stderr, "Can not create VSOA server!\n");return  (-1);}/** 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码*/vsoa_server_passwd(server, MY_SERVER_PASSWD);vsoa_url_t url;url.url     = "/axis";url.url_len = strlen(url.url);vsoa_server_add_listener(server, &url, command_axis, NULL);/** 启动微服务*/struct sockaddr_in addr;bzero(&addr, sizeof(struct sockaddr_in));addr.sin_family      = AF_INET;addr.sin_port        = htons(MY_SERVER_PORT);addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);addr.sin_len         = sizeof(struct sockaddr_in);if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {vsoa_server_close(server);fprintf(stderr, "Can not start VSOA server!\n");return  (-1);}/** Create publish thread*/pthread_t pub_threadid;pthread_create(&pub_threadid, NULL, publish_axis_thread, server);/** 进入监听事件循环*/while (1) {int     cnt;int     max_fd;fd_set  fds;struct timespec timeout = {1, 0 };FD_ZERO(&fds);max_fd = vsoa_server_fds(server, &fds);cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);if (cnt > 0) {vsoa_server_input_fds(server, &fds);}}return (0);
}

修改后的客户端源码如下,注意第 31 行。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"#define MY_SERVER_PASSWD                    "123456"static void onmessage (void *arg, struct vsoa_client *client, vsoa_url_t *url, vsoa_payload_t *payload, bool quick)
{printf("subscribe message, url:%.*s, quick:%s\n",(int)url->url_len, url->url,quick ? "ture":"false");printf("subscribe message, param:%.*s, data:%.*s\n",(int)payload->param_len, payload->param,(int)payload->data_len, (char *)payload->data);
}int main (int argc, char **argv)
{vsoa_client_auto_t *cliauto;static char *sub_urls[] = { "/axis" };/** 创建客户端机器人*/cliauto = vsoa_client_auto_create(onmessage, NULL);vsoa_client_auto_consistent(cliauto, sub_urls, 1, 1000);/** 启动客户端机器人127.0.0.1:4001  vsoa://axis_server*/vsoa_client_auto_start(cliauto, "vsoa://axis_server", MY_SERVER_PASSWD, sub_urls, 1, 1000, 1000, 1000);while (true) {sleep(1);}
}

执行效果

先启动服务端再启动客户端,在客户端刚收到某次发布消息后立即“ctrl+c”退出客户端并重新执行,这个过程不会超过 1 秒,案例约 10 秒后才能收到下一次发布消息,但因为同步功能的原因会立即得到一次数据。
服务端执行效果:
image.png
客户端执行效果:
image.png

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 代码随想录算法训练营Day26 | Leetcode 455 分发饼干 Leetcode 376 摆动序列 Leetcode 53 最大子序和
  • 【CSharp】简单定义一个异步方法
  • python连接MySQL数据库使用pymysql
  • 嵌入式day25
  • SAP与网易大数据系统集成案例
  • C++实现单例模式/工厂模式
  • 质量管理理论(至简)
  • Latex或者word里面mathtype类型的数学公式如何变成mathematica里面的形式
  • 学习笔记--算法(双指针)7
  • 控制反转(IOC)VS 依赖注入(DI)
  • Go 语言常量 6
  • 反射---Java
  • 达梦数据库的系统视图v$sql_stat
  • Element-UI自学实践
  • 【数据库】MySql深度分页SQL查询优化
  • CSS选择器——伪元素选择器之处理父元素高度及外边距溢出
  • ES6核心特性
  • Git 使用集
  • If…else
  • JavaScript-Array类型
  • Magento 1.x 中文订单打印乱码
  • magento 货币换算
  • miniui datagrid 的客户端分页解决方案 - CS结合
  • Nginx 通过 Lua + Redis 实现动态封禁 IP
  • Node项目之评分系统(二)- 数据库设计
  • Python学习笔记 字符串拼接
  • React 快速上手 - 07 前端路由 react-router
  • Vue ES6 Jade Scss Webpack Gulp
  • Webpack 4 学习01(基础配置)
  • 阿里云购买磁盘后挂载
  • 理解IaaS, PaaS, SaaS等云模型 (Cloud Models)
  • 容器服务kubernetes弹性伸缩高级用法
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 世界上最简单的无等待算法(getAndIncrement)
  • 跳前端坑前,先看看这个!!
  • 应用生命周期终极 DevOps 工具包
  • 原生 js 实现移动端 Touch 滑动反弹
  • # 数仓建模:如何构建主题宽表模型?
  • #1014 : Trie树
  • #git 撤消对文件的更改
  • $LayoutParams cannot be cast to android.widget.RelativeLayout$LayoutParams
  • (2015)JS ES6 必知的十个 特性
  • (3)nginx 配置(nginx.conf)
  • (day6) 319. 灯泡开关
  • (PADS学习)第二章:原理图绘制 第一部分
  • (二)Linux——Linux常用指令
  • (七)MySQL是如何将LRU链表的使用性能优化到极致的?
  • (三)模仿学习-Action数据的模仿
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • (源码分析)springsecurity认证授权
  • (转)树状数组
  • (转)重识new
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • .360、.halo勒索病毒的最新威胁:如何恢复您的数据?
  • .NET CORE 第一节 创建基本的 asp.net core