当前位置: 首页> 科技> 数码 > 【从零开始一步步学习VSOA开发】VSOA数据流

【从零开始一步步学习VSOA开发】VSOA数据流

时间:2025/9/18 8:23:58来源:https://blog.csdn.net/ScilogyHunter/article/details/141052477 浏览次数:0次

VSOA数据流

概念

实际业务中常常存在既有实时命令通信,又有非实时的大数据通信,如文件、音视频传输服务等,如果使用常规的 RPC 或订阅/发布功能来实现,将实时命令和大数据传输混在一起,则会影响 RPC 通道响应的实时性,尤其在需要 RPC 实时控制的应用中就显得极不合理;另一种方式是针对此类大数据的每一个通信,均额外实现对应的服务,但这会给开发者带来额外的编程复杂度,同时增加整个系统的资源开销。

VSOA 中提供了全双工高速并行数据流,称为 “VSOA Stream”。它允许程序进行长时间、大量的数据传输,同时可保证 RPC 通道的实时响应。

VSOA Stream 是一个在已有服务链路上建立的传输隧道,具有以下功能特点:

  • 独立性:VSOA Stream 不会影响命令通道的数据交互,即不影响 RPC 的实时性。
  • 临时性:VSOA Stream 按需创建,在数据传输完成后可随时销毁,不持续占用连接资源。
  • 用户无感:VSOA Stream 的传输隧道在 VSOA 内部自动协商建立,用户无需关心创建隧道的细节(例如无需提前约定好一个确定的通信参数),从而避免复杂的设计实现。
  • 全双工:客户端和服务端均可进行读写操作,读写可并行进行。

要使用 VSOA Stream,服务端和客户端需要分别创建 Server Stream 和 Client Stream,创建的流程如下:

  1. 客户端向服务端请求 RPC 服务。
  2. 服务端收到 RPC 请求后,创建 VSOA Server Stream,并获得通道 ID (tunid)。
  3. 服务端将通道 ID 返回客户端。
  4. 服务端等待客户端连接Server Stream。
  5. 客户端收到 RPC 应答,应答中包含 stream 通道 ID ,更具 ID 创建 VSOA Client Stream。
  6. 服务端收到客户端 TCP 连接请求,然后通过 TCP 连接发送数据,关闭服务端Stream。
  7. 客户端接收 TCP 发送内容,删除客户端Stream。

服务端开发

基于原 echo RPC 服务,增加 read RPC 服务。read RPC 服务收到请求后创建server stream,并将 ID号给客户端,然后在新建线程中等到客户端连接server stream 的 TCP 服务端。TCP 服务端收到连接后,发送数据,然后关闭server stream。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "vsoa_platform.h"
#include "vsoa_server.h"#define MY_SERVER_ADDR                      "0.0.0.0"
#define MY_SERVER_PORT                      (4001)
#define MY_SERVER_NAME                      "{\"name\":\"echo_server\"}"
#define MY_SERVER_PASSWD                    "123456"static void *thread_write_stream(void *arg)
{vsoa_server_stream_t   *stream = (vsoa_server_stream_t *)arg;int                     client;uint8_t data[] = "hello vsoa stream";/**等待tcp客户端连接*/client = vsoa_server_stream_accept(stream, NULL, NULL, 0);/** tcp 发送*/send(client, data, sizeof(data), 0);/** 关闭stream*/vsoa_server_stream_close(stream);return NULL;
}static void command_read (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{struct sockaddr_in addr_in;uint32_t           seqno = vsoa_parser_get_seqno(vsoa_hdr);socklen_t          namelen = sizeof(addr_in);pthread_t               thread;vsoa_server_stream_t   *stream;stream = vsoa_server_stream_create(server);pthread_create(&thread, NULL, thread_write_stream, stream);vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);printf("read client %d addr is %s:%d, url:%.*s\n", cid, inet_ntoa(addr_in.sin_addr),ntohs(addr_in.sin_port),(int)url->url_len, url->url);vsoa_server_cli_reply(server, cid, 0, seqno, stream->tunid, NULL);
}static void command_echo (void *arg, vsoa_server_t *server, vsoa_cli_id_t cid,vsoa_header_t *vsoa_hdr, vsoa_url_t *url, vsoa_payload_t *payload)
{struct sockaddr_in addr_in;socklen_t          namelen = sizeof(addr_in);vsoa_server_cli_address(server, cid, (struct sockaddr *)&addr_in, &namelen);printf("echo client %d addr is %s:%d\n", cid, inet_ntoa(addr_in.sin_addr), ntohs(addr_in.sin_port));printf("echo message, url:%.*s, param:%.*s, data:%.*s\n",(int)url->url_len, url->url,(int)payload->param_len, payload->param,(int)payload->data_len, (char *)payload->data);vsoa_server_cli_reply(server, cid, 0, vsoa_parser_get_seqno(vsoa_hdr), 0, payload);
}int main (int argc, char **argv)
{vsoa_server_t *server;/** 创建服务端*/server = vsoa_server_create(MY_SERVER_NAME);if (!server) {fprintf(stderr, "Can not create VSOA server!\n");return  (-1);}/** 设置密码,设置为NULL,表示密码为空,客户端可以不输入密码*/vsoa_server_passwd(server, MY_SERVER_PASSWD);/** 创建RPC服务*/vsoa_url_t url;url.url     = "/echo";url.url_len = strlen(url.url);vsoa_server_add_listener(server, &url, command_echo, NULL);url.url     = "/read";url.url_len = strlen(url.url);vsoa_server_add_listener(server, &url, command_read, NULL);/** 启动微服务*/struct sockaddr_in addr;bzero(&addr, sizeof(struct sockaddr_in));addr.sin_family      = AF_INET;addr.sin_port        = htons(MY_SERVER_PORT);addr.sin_addr.s_addr = inet_addr(MY_SERVER_ADDR);addr.sin_len         = sizeof(struct sockaddr_in);if (!vsoa_server_start(server, (struct sockaddr *)&addr, sizeof(struct sockaddr_in))) {vsoa_server_close(server);fprintf(stderr, "Can not start VSOA server!\n");return  (-1);}/** 进入监听事件循环*/while (1) {int     cnt;int     max_fd;fd_set  fds;struct timespec timeout = {1, 0 };FD_ZERO(&fds);max_fd = vsoa_server_fds(server, &fds);cnt = pselect(max_fd + 1, &fds, NULL, NULL, &timeout, NULL);if (cnt > 0) {vsoa_server_input_fds(server, &fds);}}return (0);
}

客户端开发

基于原 echo RPC 客户端,将 echo 服务改为 read 服务。每秒发送一次 RPC 请求,请求应答中带有server stream 的 ID号,创建client stream 会自动连接服务端建立 TCP 连接,然后通过 TCP 接收数据,最后关闭client stream 。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include "vsoa_client.h"
#include "vsoa_cliauto.h"#define MY_SERVER_PASSWD                    "123456"static void on_command_read (void *arg, struct vsoa_client *client, vsoa_header_t *vsoa_hdr, vsoa_payload_t *payload)
{int             tunid;struct timespec timeout = {1,0};int             stream;int             len;uint8_t         buffer[32];tunid = vsoa_parser_get_tunid(vsoa_hdr);stream = vsoa_client_stream_create(client, tunid, &timeout, 0);len = recv(stream, buffer, sizeof(buffer), 0);if (len > 0) {printf("Received %d bytes from VSOA stream: %s\n", len, buffer);}vsoa_client_stream_close(stream);
}int main (int argc, char **argv)
{vsoa_client_t *client;vsoa_client_auto_t *cliauto;/** 创建客户端机器人*/cliauto = vsoa_client_auto_create(NULL, NULL);/** 由客户端机器人获取客户端对象*/client  = vsoa_client_auto_handle(cliauto);/** 启动客户端机器人*/vsoa_client_auto_start(cliauto, "vsoa://echo_server", MY_SERVER_PASSWD, NULL, 0, 1000, 1000, 1000);while (true) {/** 检查客户端是否正常链接到服务端*/if (vsoa_client_is_connect(client) == false) {continue;}/** 注册异步RPC请求*/vsoa_url_t url;url.url     = "/read";url.url_len = strlen(url.url);vsoa_client_call(client, VSOA_CLIENT_RPC_METHOD_GET, &url, NULL, on_command_read, NULL, NULL);sleep(1);}
}

运行效果

这里同时开启 4 个 telnet 命令窗口,分别执行 4 个程序。执行命令及执行效果如下:
命令行 1,启动 server_stream 服务端:
image.png
命令行 2,启动位置服务:
image.png
命令行 3,启动之前的 client_auto 程序,每秒请求一次 echo RPC 服务:
image.png
命令行 4,启动 client_stream 程序,每秒请求一次 read RPC 服务:
image.png

关键字:【从零开始一步步学习VSOA开发】VSOA数据流

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: