Skip to content

Commit

Permalink
[syncd][sairedis] Change pub/sub model to push/pull in zmq notificati…
Browse files Browse the repository at this point in the history
…on (sonic-net#695)

* [syncd][sairedis] Change pub/sub model to push/pull in zmq notification

* [tests] Add zmq push/pull notification unittests

* [tests] Reorder libs in makefile

* [tests] Update libs dir in Makefile.am
  • Loading branch information
kcudnik authored Nov 7, 2020
1 parent 584ce03 commit 74eb14e
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 17 deletions.
47 changes: 33 additions & 14 deletions lib/src/ZeroMQChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "swss/select.h"

#include <zmq.h>
#include <unistd.h>

using namespace sairedis;

Expand Down Expand Up @@ -55,11 +56,11 @@ ZeroMQChannel::ZeroMQChannel(

m_ntfContext = zmq_ctx_new();

m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_SUB);
m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_PULL);

SWSS_LOG_NOTICE("opening zmq ntf endpoint: %s", ntfEndpoint.c_str());

rc = zmq_connect(m_ntfSocket, ntfEndpoint.c_str());
rc = zmq_bind(m_ntfSocket, ntfEndpoint.c_str());

if (rc != 0)
{
Expand All @@ -68,15 +69,6 @@ ZeroMQChannel::ZeroMQChannel(
zmq_errno());
}

rc = zmq_setsockopt(m_ntfSocket, ZMQ_SUBSCRIBE, "", 0);

if (rc != 0)
{
SWSS_LOG_THROW("failed to set sock opt ZMQ_SUBSCRIBE on ntf endpoint %s, zmqerrno: %d",
ntfEndpoint.c_str(),
zmq_errno());
}

// start thread

m_runNotificationThread = true;
Expand All @@ -95,18 +87,42 @@ ZeroMQChannel::~ZeroMQChannel()
zmq_close(m_socket);
zmq_ctx_destroy(m_context);

// create new context, and perform send to break notification recv

void* ctx = zmq_ctx_new();
void* socket = zmq_socket(ctx, ZMQ_PUSH);

int rc = zmq_connect(socket, m_ntfEndpoint.c_str());

if (rc != 0)
{
SWSS_LOG_THROW("failed to open zmq ntf endpoint %s, zmqerrno: %d",
m_ntfEndpoint.c_str(),
zmq_errno());
}

rc = zmq_send(socket, "1", 1, 0);

if (rc < 0)
{
SWSS_LOG_THROW("send error: %d, errno: %d, %s", rc, zmq_errno(), strerror(zmq_errno()));
}

zmq_close(socket);
zmq_ctx_destroy(ctx);

// when zmq context is destroyed, zmq_recv will be interrupted and errno
// will be set to ETERM, so we don't need actual FD to be used in
// selectable event

zmq_close(m_ntfSocket);
zmq_ctx_destroy(m_ntfContext);

SWSS_LOG_NOTICE("join ntf thread begin");

m_notificationThread->join();

SWSS_LOG_NOTICE("join ntf thread end");

zmq_close(m_ntfSocket);
zmq_ctx_destroy(m_ntfContext);
}

void ZeroMQChannel::notificationThreadFunction()
Expand All @@ -126,6 +142,9 @@ void ZeroMQChannel::notificationThreadFunction()

int rc = zmq_recv(m_ntfSocket, buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);

if (!m_runNotificationThread)
break;

if (rc <= 0 && zmq_errno() == ETERM)
{
SWSS_LOG_NOTICE("zmq_recv interrupted with ETERM, ending thread");
Expand Down
2 changes: 1 addition & 1 deletion syncd/ZeroMQNotificationProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ZeroMQNotificationProducer::ZeroMQNotificationProducer(

m_ntfContext = zmq_ctx_new();

m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_PUB);
m_ntfSocket = zmq_socket(m_ntfContext, ZMQ_PUSH);

SWSS_LOG_NOTICE("opening zmq ntf endpoint: %s", ntfEndpoint.c_str());

Expand Down
12 changes: 10 additions & 2 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
AM_CPPFLAGS = -I$(top_srcdir)/vslib/inc -I$(top_srcdir)/lib/inc -I$(top_srcdir)/SAI/inc -I$(top_srcdir)/SAI/meta -I$(top_srcdir)/SAI/experimental

bin_PROGRAMS = vssyncd
bin_PROGRAMS = vssyncd tests

if DEBUG
DBGFLAGS = -ggdb -DDEBUG
Expand All @@ -19,4 +19,12 @@ if SAITHRIFT
vssyncd_LDADD += -lrpcserver -lthrift
endif

TESTS = aspellcheck.pl conflictnames.pl swsslogentercheck.sh BCM56850.pl MLNX2700.pl
tests_SOURCES = tests.cpp
tests_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON)
tests_LDADD = -lhiredis -lswsscommon -lpthread \
$(top_srcdir)/lib/src/libsairedis.la \
$(top_srcdir)/syncd/libSyncd.a \
-L$(top_srcdir)/meta/.libs \
-lsaimetadata -lsaimeta -lzmq

TESTS = aspellcheck.pl conflictnames.pl swsslogentercheck.sh tests BCM56850.pl MLNX2700.pl
97 changes: 97 additions & 0 deletions tests/tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>

#include "lib/inc/ZeroMQChannel.h"
#include "syncd/ZeroMQNotificationProducer.h"

#include "meta/sai_serialize.h"

#include <thread>
#include <memory>

using namespace sairedis;
using namespace syncd;

#define ASSERT_EQ(a,b) if ((a) != (b)) { SWSS_LOG_THROW("ASSERT EQ FAILED: " #a " != " #b); }

/*
* Test if destructor proper clean and join zeromq socket and context, and
* break recv method.
*/
static void test_zeromqchannel_destructor()
{
SWSS_LOG_ENTER();

std::cout << " * " << __FUNCTION__ << std::endl;

for (int i = 0; i < 10; i++)
{
ZeroMQChannel z("ipc:///tmp/feeds1", "ipc:///tmp/feeds2", nullptr);

usleep(10*1000);
}
}

/*
* Test if first notification sent from notification producer will arrive at
* zeromq channel notification thread. There is an issue with PUB/SUB inside
* zeromq, and in our model this was changed to push/pull model.
*/
static void test_zeromqchannel_first_notification()
{
SWSS_LOG_ENTER();

std::cout << " * " << __FUNCTION__ << std::endl;

for (int i = 0; i < 10; i++)
{
std::string rop;
std::string rdata;

bool got = false;

auto callback = [&](
const std::string& op,
const std::string& data,
const std::vector<swss::FieldValueTuple>& values)
{
SWSS_LOG_NOTICE("got: %s %s", op.c_str(), data.c_str());

rop = op;
rdata = data;

got = true;
};

ZeroMQChannel z("ipc:///tmp/feeds1", "ipc:///tmp/feeds2", callback);

ZeroMQNotificationProducer p("ipc:///tmp/feeds2");

p.send("foo", "bar", {});

int count = 0;

while (!got && count++ < 200) // in total we will wait max 2 seconds
{
usleep(10*1000);
}

ASSERT_EQ(rop, "foo");
ASSERT_EQ(rdata, "bar");
}
}

int main()
{
SWSS_LOG_ENTER();

test_zeromqchannel_destructor();

test_zeromqchannel_first_notification();

return 0;
}

0 comments on commit 74eb14e

Please sign in to comment.