ZMQ测试代码样例
背景
有部分网友提到上篇文章ZeroMQ性能分析,没有提供测试代码,想要自己验证测试结果;本篇文章,主要介绍代码层面,如何使用ZeroMQ。测试样例有python版本、C/C++版本。
生产-消费者模型
根据官方介绍,ZeroMQ是支持多语言,也就是说,它支持在多种不同语言的进程之间实现通讯。我们想要验证的场景是,生产者-消费者模型,一个生产者,多个消费者,ZeroMQ是否支持这种模型、数据共享性能如何,这也是常见的编程模型。
这个图的表达有一定的误差,例如生产者是进程,将任务队列替换成ZeroMQ,并且ZeroMQ内部也没有队列,图仅可用于示意。从尽快出验证结果的角度来说,Python语言最简单。
Python验证代码
Python语言客户端代码如下:
import zmq import time context = zmq.Context() socket = context.socket(zmq.PULL) socket.connect("tcp://localhost:5555") mark = "hello" test = str() for i in range(1024*1024*10): test = test + mark test = test.encode("utf-8") start = time.time() while True: response = socket.recv() #print("response: %s" % response) test = "I am test client No.2 recv data " + str(response) #socket.send(test % socket) end = time.time() print(test)
Python语言服务端代码如下:
import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5555") mark = "hello" test = str() for i in range(1024*1024*10): test = test + mark test = test.encode("utf-8") index = 0 while True: test = "Hello I am index " + str(index) test = test.encode("utf-8") print(test) socket.send(test) #message = socket.recv() print("send test data %",index) index += 1 time.sleep(0.5) #socket.send("I am OK!".encode("utf-8"))
这段代码,主要验证ZeroMQ官方提供的分布式用法,一个生产者、多个消费者,竞争消费生产者发送的消息,毫无疑问使用起来没啥问题。只不过,在使用时要安装pip install zmq;要使用python3执行。
在这种场景下,用ptyhon 在同机上使用ZMQ共享10MB数据,耗时达到15ms以上,耗时比较高。
C/C++验证代码
根据以往的经验,C语言的性能会比python好很多,根据官方资料继续使用C语言验证ZeroMQ在同机不同进程之间的数据共享速度
C语言客户端代码如下:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <assert.h> #include <zmq.h> #include <sys/time.h> #define TEST_LEN 30 * 1024 * 1024 uint64_t GetTickCount() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec*1000000 + tv.tv_usec; // us //return tv.tv_sec * 1000 + tv.tv_usec / 1000; // ms } int main (void) { void *context = zmq_ctx_new(); void *responder = zmq_socket (context, ZMQ_REQ); int rc = zmq_connect(responder, "ipc://test.ipc"); printf("rc :%d\n", rc); char tmp[1024] = {0}; char *buff = (char*)malloc(TEST_LEN); char *buff2 = (char*)malloc(TEST_LEN); memset(buff, 1, TEST_LEN); zmq_send(responder, buff, TEST_LEN, 0); uint64_t send_ts = 0; uint64_t send_over_ts = 0; int index = 1; while(1) { memset(buff, index, TEST_LEN); send_ts = GetTickCount(); zmq_send(responder, buff, TEST_LEN, 0); send_over_ts = GetTickCount(); printf("zmq ts diff :%d \n", send_over_ts - send_ts); int len = zmq_recv(responder, tmp, 1024,0); printf("recv data :%s \n", tmp); usleep(1000); // sleep(1); index++; } free(buff); return 0; }
C语言服务端代码如下:
#include <stdio.h> #include <unistd.h> #include <string.h> #include <assert.h> #include <zmq.h> #define MAX_LEN 100 * 1024 * 1024 char buffer[MAX_LEN]; int main (void) { /* Socket to talk to clients */ void *context = zmq_ctx_new (); void *responder = zmq_socket (context, ZMQ_REP); int rc = zmq_bind (responder, "ipc://test.ipc"); assert (rc == 0); while(1) { int len = zmq_recv (responder, buffer, MAX_LEN, 0); printf ("Received data len:%d \n", len); //usleep (1); /*Do some 'work'*/ zmq_send (responder, "World", 5, 0); } return 0; }
使用C语言时,需要通过yum 安装 zmq开发包;在编译的时候,要指定zmq动态库。 gcc -o test_client test_client.cpp -lzmq 。
从测试结果来看,使用C语言以后,共享10MB的数据,平均耗时1.38ms。
验证结论
- 使用C/C++版本相比python而言,性能提升了10倍以上。这也间接说明,要想高性能还是需要使用C/C++ 这种编译型语言。
- ZerMQ这种分布式用法,非常适合和生产者-消费者编程模型结合,间接帮助开发者实现了线程竞争、线程间负载平衡,是一个开发利器。
- 作为一名开发者,希望大家积极留言,交流开发经验心得,实现技术能力的彼此提升。
接下来将要分享后台协议设计的一些关键点,关注个人公众号,获取最新消息。