Commit 52f2b135 authored by Jordan Sissel's avatar Jordan Sissel

Fix pushpull example

parent 85db39e7
......@@ -6,7 +6,8 @@
#include <stdlib.h>
#include <unistd.h>
#define ENDPOINT "inproc://foo"
//#define ENDPOINT "tcp://127.0.0.1:12345"
#define ENDPOINT "inproc://asdf"
void free2(void *data, void __attribute__((unused)) *hint) {
free(data);
......@@ -15,7 +16,7 @@ void free2(void *data, void __attribute__((unused)) *hint) {
void *pusher(void *zmq) {
void *socket = zmq_socket(zmq, ZMQ_PUSH);
int rc;
int hwm = 1;
int64_t hwm = 1;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
while (rc = zmq_connect(socket, ENDPOINT), rc != 0) {
......@@ -33,9 +34,14 @@ void *pusher(void *zmq) {
void *puller(void *zmq) {
void *socket = zmq_socket(zmq, ZMQ_PULL);
int hwm = 1;
int64_t hwm = 1;
int rc;
zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
zmq_bind(socket, ENDPOINT);
rc = zmq_bind(socket, ENDPOINT);
if (rc != 0) {
printf("bind(%s) failed: %s\n", ENDPOINT, zmq_strerror(errno));
abort();
}
for (;;) {
zmq_msg_t msg;
......@@ -46,7 +52,6 @@ void *puller(void *zmq) {
}
int main(int argc, char **argv) {
pthread_t p;
void *zmq = zmq_init(0); /* inproc only, no threads needed */
if (argc != 2) {
......@@ -54,16 +59,16 @@ int main(int argc, char **argv) {
return 1;
}
pthread_create(&p, NULL, puller, zmq);
int i = 0;
int threads = atoi(argv[1]);
pthread_t p;
pthread_create(&p, NULL, puller, zmq);
/* Create pusher threads, thread count comes from command args */
for (i = 0; i < atoi(argv[1]); i++) {
for (i = 0; i < threads; i++) {
pthread_t *pushthread = calloc(1, sizeof(pthread_t));
pthread_create(pushthread, NULL, pusher, zmq);
}
pthread_join(p, NULL);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment