-
Notifications
You must be signed in to change notification settings - Fork 0
/
zmqprotoSocket.cxx
141 lines (118 loc) · 3.25 KB
/
zmqprotoSocket.cxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#include <iostream>
#include <sstream>
#include "zmqprotoContext.h"
#include "zmqprotoSocket.h"
using namespace std;
zmqprotoSocket::zmqprotoSocket (void* fContext, const string& type, int num) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
fMessagesRx(0)
{
stringstream id;
id << type << "." << num;
fId = id.str();
fSocket = zmq_socket(fContext, GetConstant(type));
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
if (rc != 0) {
cout << "failed setting socket option, reason: " << zmq_strerror(errno) << endl;
}
if (type == "sub") {
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0) {
cout << "failed setting socket option, reason: " << zmq_strerror(errno) << endl;
}
}
cout << "created socket #" << fId << endl;
}
unsigned long zmqprotoSocket::GetBytesTx()
{
return fBytesTx;
}
unsigned long zmqprotoSocket::GetBytesRx()
{
return fBytesRx;
}
unsigned long zmqprotoSocket::GetMessagesTx()
{
return fMessagesTx;
}
unsigned long zmqprotoSocket::GetMessagesRx()
{
return fMessagesRx;
}
void zmqprotoSocket::Close()
{
if (fSocket == NULL){
return;
}
int rc = zmq_close (fSocket);
if (rc != 0) {
cout << "failed closing socket, reason: " << zmq_strerror(errno) << endl;
}
fSocket = NULL;
}
void* zmqprotoSocket::GetSocket()
{
return fSocket;
}
size_t zmqprotoSocket::Send(zmq_msg_t *msg, const string& flag)
{
int nbytes = zmq_msg_send (msg, fSocket, GetConstant(flag));
if (nbytes >= 0){
fBytesTx += nbytes;
++fMessagesTx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
cout << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno) << endl;
return nbytes;
}
size_t zmqprotoSocket::Receive(zmq_msg_t *msg, const string& flag)
{
int nbytes = zmq_msg_recv (msg, fSocket, GetConstant(flag));
if (nbytes >= 0){
fBytesRx += nbytes;
++fMessagesRx;
return nbytes;
}
if (zmq_errno() == EAGAIN){
return false;
}
cout << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno) << endl;
return nbytes;
}
void zmqprotoSocket::Bind(const string& address)
{
// cout << "bind socket " << fId << " on " << address << endl;
int rc = zmq_bind (fSocket, address.c_str());
if (rc != 0) {
cout << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno) << endl;
}
}
void zmqprotoSocket::Connect(const string& address)
{
// cout << "connect socket #" << fId << " on " << address << endl;
int rc = zmq_connect (fSocket, address.c_str());
if (rc != 0) {
cout << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno) << endl;
}
}
int zmqprotoSocket::GetConstant(const string& constant)
{
if (constant == "") return 0;
if (constant == "sub") return ZMQ_SUB;
if (constant == "pub") return ZMQ_PUB;
if (constant == "xsub") return ZMQ_XSUB;
if (constant == "xpub") return ZMQ_XPUB;
if (constant == "push") return ZMQ_PUSH;
if (constant == "pull") return ZMQ_PULL;
if (constant == "snd-hwm") return ZMQ_SNDHWM;
if (constant == "rcv-hwm") return ZMQ_RCVHWM;
if (constant == "snd-more") return ZMQ_SNDMORE;
if (constant == "rcv-more") return ZMQ_RCVMORE;
if (constant == "no-block") return ZMQ_NOBLOCK;
return -1;
}