一、ZCM通讯简介
ZCM全称为Zero Communications and Marshalling,是一个开源的轻量级消息传递与数据序列化框架。它源自于2006年MIT的DARPA城市挑战团队的LCM项目,并在此基础上进行了优化和创新。ZCM以其独特的设计理念和广泛的应用场景,正在成为跨平台消息传递与数据处理的新宠。
1. ZCM特性
- 发布/订阅模式与数据编组的无缝集成:
- ZCM的核心在于其发布/订阅模式与数据编组的无缝集成。通过这种模式,不同系统或进程可以在无需直接连接的情况下,通过消息队列进行高效的数据交换。
- 跨平台与多语言支持:
- ZCM支持多种编程语言的绑定,包括C/C++、Java、MATLAB、Node.js/JavaScript、Python和Julia等。这使得开发者可以在不同的语言环境下进行无缝的消息交换。
- ZCM全面支持Linux、Web环境以及各类嵌入式系统,为开发者提供了极大的灵活性和适应性。
- 高效的通信与数据序列化:
- ZCM自动完成类型检查和序列化,确保数据在不同语言环境下的准确性和一致性。
- 通过定义通用的阻塞与非阻塞传输API,ZCM实现了对任何传输层的友好兼容,从POSIX操作系统上的大规模计算集群到无OS的实时嵌入式系统,都能得到很好的支持。
- 广泛的应用场景:
- 在机器人系统中,ZCM能够确保在实时环境中实现高效的数据流转。
- 在跨进程通信(IPC)中,ZCM简化了本地系统间的数据交互。
- ZCM还适用于多线程应用,提供稳定的内部通讯机制。
- 在嵌入式开发中,ZCM的C89兼容性满足严格资源限制的设备需求。
- 安全性与兼容性:
- ZCM提供了类型安全和版本安全的序列化,确保消息交流的准确无误。
- 它还提供了丰富的工具集,包括日志记录、回放、实时消息监控等,以提升开发效率。
- 嵌入式友好与现代C++支撑:
- ZCM的核心代码兼容C89,适用于广泛的嵌入式环境。
- 对于非嵌入式部分,仅需现代C++11编译器即可支持。
2. 特定领域的应用
在智能汽车领域,ZCM也被用作自研智舱计算平台的一部分。通过深度自研的底层软件,ZCM打通了智能车与IoT设备的底层通讯协议,实现了设备无感、可靠的连接。这种技术被称为“生态域”技术,它能够让不同品牌、不同型号的设备自由接入智能汽车系统。
二、zcm编译及DEMO演示
1.使用git 将zcm代码拉取到Ubuntu上
git clone https://github.com/ZeroCM/zcm.git
2.进入到zcm目录中
cd zcm
3.安装编译依赖的环境,仅限ubuntu
./scripts/install-deps.sh
4.因ZCM使用waf构建编译系统,提供了配置,编译跟安装命令,可以输入--help查看命令参数
./waf --help
5.如果不依赖java,使用IPC通讯跑一下例程,可以使用如下配置,配置依赖选项及传输层选项。
./waf configure --use-ipc --use-zmq
./waf configure --use-ipc --use-inproc --use-serial --use-zmq --use-elf
6.配好了zcm编译参数,使用build编译
sudo ./waf build
7.安装编译好的zcm环境
sudo ./waf install
8.编译一个demo,用来测试
a.配置生效demo环境参数
source ./examples/env
b.对demo进行编译配置
./waf configure
c.编译
./waf build_examples
d.运行一个sub或者pub
./build/examples/examples/cpp/sub
e.打开另外一个终端,生效demo环境
source ./examples/env
f.新终端运行sub或者pub
./build/examples/examples/cpp/pub
运行结果
注意:如果在终端中没有生效demo环境,会报错,如下所以
三、zcm在嵌入式MCU上的移植
ZCM以其对嵌入式应用的也是支持的。这种支持使得嵌入式系统能够通过自定义的嵌入式传输协议,构建并发送zcmtypes到其他使用ZCM的系统。可以应用在多种嵌入式芯片及系统中,只需要支持标准的C整数类型。
1. 将ZCM添加到项目中
默认情况下,ZCM的Linux构建会收集与嵌入式相关的源代码,检查其是否符合C89标准,并创建build/zcm/zcm-embed.tar.gz。使用ZCM只需将其解压到项目的源代码树中。我们建议使用zcm子目录来存放所有zcm库代码。为了生成zcmtypes,您需要安装标准的Linux ZCM。只需使用zcm-gen并将生成的文件复制到项目的源代码树中。通过一些脚本,大多数嵌入式环境都可以配置为使用zcm-gen作为构建工具,并且所有的类型生成都可以实现自动化。
在嵌入式ZCM中,没有内置的传输协议。要在嵌入式应用中使用ZCM,您必须实现一个使用平台硬件原语的非阻塞传输。此外,传输注册器和URL系统被禁用。要创建一个zcm_t对象,您必须使用zcm_create_from_trans()。
四、SOC与MCU上zcm通讯demo演示
若需要跑serial程序demo,需要切换到root用户,不然打开/dev/ttyusb*权限不够,报错。运行./build/examples/examples/cpp/serial "serial:///dev/ttyUSB0?baud=115200",就能通过uart进行数据发布及订阅了,UART设备端口跟波特率根据实际情况来定。
使用逻辑分析抓取数据,能看到如下数据,1s发送一次数据
数据协议与嵌入式定义一样
五、嵌入式代码阅读
ZCM通过trans创建一个zcm对象
zcm_t *zcm_create_from_trans(zcm_trans_t *zt) {zcm_t *z = NULL;ZCM_ASSERT(zcm_try_create_from_trans(&z, zt) == ZCM_EOK);ZCM_ASSERT(z);return z;
}
int zcm_try_create_from_trans(zcm_t **z, zcm_trans_t *zt) {//申请ZCM对象空间*z = malloc(sizeof(zcm_t));if (!*z)return ZCM_EMEMORY;//初始化ZCM对象int ret = zcm_init_from_trans(*z, zt);if (ret != ZCM_EOK) {free(*z);*z = NULL;return ret;}return ZCM_EOK;
}
对于嵌入式MCU系统创建非阻塞传输(无法使用非阻塞)
int zcm_init_from_trans(zcm_t *zcm, zcm_trans_t *zt) {/* Check for valid transport */if (zt == NULL) {zcm->impl = NULL;return ZCM_EINVALID;}int ret = ZCM_EUNKNOWN;switch (zt->trans_type) {case ZCM_BLOCKING:zcm->impl = NULL;return ZCM_EINVALID;case ZCM_NONBLOCKING:zcm->type = ZCM_NONBLOCKING;ret = zcm_nonblocking_try_create((zcm_nonblocking_t **) &zcm->impl, zcm, zt);break;}/* Just a safeguard in case impl is accidentally notinitialized to NULL on any failures */if (ret != ZCM_EOK)zcm->impl = NULL;return ret;
}
struct zcm_nonblocking {zcm_t * z;zcm_trans_t *zt;/* TODO speed this up */zcm_sub_t subs[ZCM_NONBLOCK_SUBS_MAX];bool subInUse[ZCM_NONBLOCK_SUBS_MAX];bool subIsRegex[ZCM_NONBLOCK_SUBS_MAX];size_t subInUseEnd;
};
int zcm_nonblocking_try_create(zcm_nonblocking_t **zcm, zcm_t *z, zcm_trans_t *zt) {if (z->type != ZCM_NONBLOCKING)return ZCM_EINVALID;//创建一个非阻塞实例*zcm = malloc(sizeof(zcm_nonblocking_t));(*zcm)->z = z;(*zcm)->zt = zt;for (size_t i = 0; i < ZCM_NONBLOCK_SUBS_MAX; ++i)(*zcm)->subInUse[i] = false;(*zcm)->subInUseEnd = 0;return ZCM_EOK;
}
int zcm_publish(zcm_t *zcm, const char *channel, const uint8_t *data, uint32_t len) {int ret = ZCM_EUNKNOWN;ZCM_ASSERT(zcm->type == ZCM_NONBLOCKING);ret = zcm_nonblocking_publish(zcm->impl, channel, data, len);return ret;
}
此函数在线程中调用,用于处理发送接收及分发数据,对数据回调进行处理
void zcm_flush(zcm_t *zcm) {ZCM_ASSERT(zcm->type == ZCM_NONBLOCKING);zcm_nonblocking_flush(zcm->impl);
}
订阅一个主题
zcm_sub_t *zcm_subscribe(zcm_t *zcm, const char *channel, zcm_msg_handler_t cb, void *usr) {zcm_sub_t *ret = NULL;ZCM_ASSERT(zcm->type == ZCM_NONBLOCKING);ret = zcm_nonblocking_subscribe(zcm->impl, channel, cb, usr);return ret;
}
void zcm_nonblocking_flush(zcm_nonblocking_t *zcm) {/* Call twice because we need to make sure publish and subscribe are both handled */zcm_trans_update(zcm->zt);zcm_trans_update(zcm->zt);zcm_msg_t msg;while (zcm_trans_recvmsg(zcm->zt, &msg, 0) == ZCM_EOK)dispatch_message(zcm, &msg);
}
static void dispatch_message(zcm_nonblocking_t *zcm, zcm_msg_t *msg) {zcm_recv_buf_t rbuf;zcm_sub_t * sub;size_t i;for (i = 0; i < zcm->subInUseEnd; ++i) {if (!zcm->subInUse[i])continue;bool shouldDispatch = false;if (zcm->subIsRegex[i]) {/* This only works because isSupportedRegex() is called on subscribe */if (strlen(msg->channel) > 2 && strncmp(zcm->subs[i].channel, msg->channel, strlen(zcm->subs[i].channel) - 2) == 0) {shouldDispatch = true;}} else {if (strncmp(zcm->subs[i].channel, msg->channel, ZCM_CHANNEL_MAXLEN) == 0) {shouldDispatch = true;}}//回调订阅的回调函数if (shouldDispatch) {rbuf.zcm = zcm->z;rbuf.data = msg->buf;rbuf.data_size = msg->len;rbuf.recv_utime = msg->utime;sub = &zcm->subs[i];sub->callback(&rbuf, msg->channel, sub->usr);}}
}
zcm更新函数中主要是进行数据发送及接收解析并执行对应的回调函数
static INLINE int zcm_trans_update(zcm_trans_t *zt) {return zt->vtbl->update(zt);
}static int _serial_update(zcm_trans_t *zt) {int rxRet = serial_update_rx(zt);int txRet = serial_update_tx(zt);return rxRet == ZCM_EOK ? txRet : rxRet;
}int serial_update_rx(zcm_trans_t *_zt) {zcm_trans_generic_serial_t *zt = cast(_zt);cb_flush_in(&zt->recvBuffer, zt->get, zt->put_get_usr);return ZCM_EOK;
}int serial_update_tx(zcm_trans_t *_zt) {zcm_trans_generic_serial_t *zt = cast(_zt);cb_flush_out(&zt->sendBuffer, zt->put, zt->put_get_usr);return ZCM_EOK;
}#define MIN(a, b) (((a) < (b)) ? (a) : (b))
size_t cb_flush_out(circBuffer_t *cb, size_t (*write)(const uint8_t *data, size_t num, void *usr), void *usr) {size_t written = 0;size_t n;size_t sz = cb_size(cb);if (sz == 0)return 0;size_t contiguous = MIN(cb->capacity - cb->front, sz);size_t wrapped = sz - contiguous;n = write(cb->data + cb->front, contiguous, usr);written += n;cb_pop_front(cb, n);// If we failed to write everything we tried to write, or if there's nothing// left to write, return.if (written != contiguous || wrapped == 0)return written;n = write(cb->data, wrapped, usr);written += n;cb_pop_front(cb, n);return written;
}size_t cb_flush_in(circBuffer_t *cb, size_t (*read)(uint8_t *data, size_t num, void *usr), void *usr) {size_t bytesRead = 0;size_t n;size_t room = cb_room(cb);// Find out how much room is left between back and end of buffer or back and front// of buffer. Because we already know there's room for whatever we're about to place,// if back < front, we can just read in every byte starting at "back".if (cb->back < cb->front) {bytesRead += read(cb->data + cb->back, room, usr);cb->back += bytesRead;return bytesRead;}// Otherwise, we need to be a bit more careful about overflowing the back of the buffer.size_t contiguous = MIN(cb->capacity - cb->back, room);size_t wrapped = room - contiguous;n = read(cb->data + cb->back, contiguous, usr);ASSERT((n <= contiguous) && "cb_flush_in 1");bytesRead += n;cb->back += n;if (n != contiguous)return bytesRead; // back could NOT have hit BUFFER_SIZE in this case// may need to wrap back here (if bytes >= BUFFER_SIZE - cb->back) but not otherwiseASSERT((cb->back <= cb->capacity) && "cb_flush_in 2");if (cb->back == cb->capacity)cb->back = 0;if (wrapped == 0)return bytesRead;n = read(cb->data, wrapped, usr);ASSERT((n <= wrapped) && "cb_flush_in 3");bytesRead += n;cb->back += n;return bytesRead;
}
#undef MIN
六、总结
ZCM不仅是一个技术框架,更是面向未来技术架构的思考与实践。无论是复杂的机器人控制、微妙的Web应用集成,还是嵌入式世界的深度探索,ZCM都提供了强大而灵活的解决方案。它的出现降低了通讯复杂性的门槛,让开发者能够更专注于业务逻辑本身。通过ZCM,我们可以共同探索更多可能性,推动技术的不断进步和发展