Skip to content

Commit 42f4249

Browse files
authored
Merge pull request #101 from mujin/revert-97-silence_zmq_bullseye
Revert "Silence ZMQ related warnings"
2 parents 7a02a7b + 9a918a0 commit 42f4249

4 files changed

Lines changed: 106 additions & 78 deletions

File tree

include/mujincontrollerclient/mujinjson.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ inline void ParseJson(rapidjson::Document& d, const std::string& str) {
164164
inline void ParseJson(rapidjson::Document& d, std::istream& is) {
165165
rapidjson::IStreamWrapper isw(is);
166166
// see note in: void ParseJson(rapidjson::Document& d, const std::string& str)
167-
rapidjson::Document tempDoc;
167+
rapidjson::Document(tempDoc);
168168
tempDoc.ParseStream<rapidjson::kParseFullPrecisionFlag>(isw); // parse float in full precision mode
169169
if (tempDoc.HasParseError()) {
170170
throw MujinJSONException(boost::str(boost::format("Json stream is invalid (offset %u) %s")%((unsigned)d.GetErrorOffset())%GetParseError_En(d.GetParseError())), MJE_Failed);

src/binpickingtask.cpp

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,12 +1785,14 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
17851785
socket.reset();
17861786
}
17871787
socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB));
1788-
socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect
1789-
socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
1790-
socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
1791-
socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
1792-
socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort));
1793-
socket->set(zmq::sockopt::subscribe, "");
1788+
socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect
1789+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
1790+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
1791+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
1792+
std::stringstream ss; ss << std::setprecision(std::numeric_limits<double>::digits10+1);
1793+
ss << _heartbeatPort;
1794+
socket->connect(("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str());
1795+
socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
17941796

17951797
zmq::pollitem_t pollitem;
17961798
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
@@ -1799,12 +1801,13 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
17991801

18001802
unsigned long long lastheartbeat = GetMilliTime();
18011803
while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) {
1802-
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
1804+
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
18031805
if (pollitem.revents & ZMQ_POLLIN) {
18041806
zmq::message_t reply;
1805-
socket->recv(reply);
1807+
socket->recv(&reply);
1808+
std::string replystring((char *)reply.data (), (size_t)reply.size());
18061809
//if ((size_t)reply.size() == 1 && ((char *)reply.data())[0]==255) {
1807-
if (reply.to_string() == "255") {
1810+
if (replystring == "255") {
18081811
lastheartbeat = GetMilliTime();
18091812
}
18101813
}
@@ -1824,36 +1827,37 @@ void BinPickingTaskResource::_HeartbeatMonitorThread(const double reinitializeti
18241827
std::string utils::GetHeartbeat(const std::string& endpoint) {
18251828
zmq::context_t zmqcontext(1);
18261829
zmq::socket_t socket(zmqcontext, ZMQ_SUB);
1827-
socket.connect(endpoint);
1828-
socket.set(zmq::sockopt::subscribe, "");
1830+
socket.connect(endpoint.c_str());
1831+
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
18291832

18301833
zmq::pollitem_t pollitem;
18311834
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
18321835
pollitem.socket = socket;
18331836
pollitem.events = ZMQ_POLLIN;
18341837

1835-
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
1838+
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
18361839
if (!(pollitem.revents & ZMQ_POLLIN)) {
18371840
return "";
18381841
}
18391842

18401843
zmq::message_t reply;
1841-
socket.recv(reply);
1844+
socket.recv(&reply);
1845+
const std::string received((char *)reply.data (), (size_t)reply.size());
18421846
#ifndef _WIN32
1843-
return reply.to_string();
1847+
return received;
18441848
#else
18451849
// sometimes buffer can container \n or \\, which windows does not like
18461850
std::string newbuffer;
18471851
std::vector< std::pair<std::string, std::string> > serachpairs(2);
18481852
serachpairs[0].first = "\n"; serachpairs[0].second = "";
18491853
serachpairs[1].first = "\\"; serachpairs[1].second = "";
1850-
SearchAndReplace(newbuffer, reply.to_string(), serachpairs);
1854+
SearchAndReplace(newbuffer, received, serachpairs);
18511855
return newbuffer;
18521856
#endif
18531857
}
18541858

1855-
namespace {
18561859

1860+
namespace {
18571861
std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) {
18581862
// get all slave request ids
18591863
std::vector<std::string> slavereqids;
@@ -1894,11 +1898,13 @@ std::string FindSmallestSlaveRequestId(const rapidjson::Value& pt) {
18941898
return slavereqids[smallest_suffix_index];
18951899
}
18961900

1897-
std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, const std::string& key)
1901+
std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat,
1902+
const std::string& key)
18981903
{
18991904

19001905
rapidjson::Document pt(rapidjson::kObjectType);
1901-
ParseJson(pt, heartbeat);
1906+
std::stringstream ss(heartbeat);
1907+
ParseJson(pt, ss.str());
19021908
try {
19031909
const std::string slavereqid = FindSmallestSlaveRequestId(pt);
19041910
std::string result;
@@ -1910,8 +1916,8 @@ std::string GetValueForSmallestSlaveRequestId(const std::string& heartbeat, cons
19101916
}
19111917

19121918
}
1919+
}
19131920

1914-
} // anonymous namespace
19151921

19161922
std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heartbeat) {
19171923
static const std::string prefix("mujin:/");
@@ -1920,7 +1926,8 @@ std::string mujinclient::utils::GetScenePkFromHeartbeat(const std::string& heart
19201926

19211927
std::string utils::GetSlaveRequestIdFromHeartbeat(const std::string& heartbeat) {
19221928
rapidjson::Document pt;
1923-
ParseJson(pt, heartbeat);
1929+
std::stringstream ss(heartbeat);
1930+
ParseJson(pt, ss.str());
19241931
try {
19251932
static const std::string prefix("slaverequestid-");
19261933
return FindSmallestSlaveRequestId(pt).substr(prefix.length());

src/binpickingtaskzmq.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -267,26 +267,30 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
267267
socket.reset();
268268
}
269269
socket.reset(new zmq::socket_t((*_zmqcontext.get()),ZMQ_SUB));
270-
socket->set(zmq::sockopt::tcp_keepalive, 1); // turn on tcp keepalive, do these configuration before connect
271-
socket->set(zmq::sockopt::tcp_keepalive_idle, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
272-
socket->set(zmq::sockopt::tcp_keepalive_intvl, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
273-
socket->set(zmq::sockopt::tcp_keepalive_cnt, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
274-
socket->connect("tcp://" + _mujinControllerIp + ":" + std::to_string(_heartbeatPort));
275-
socket->set(zmq::sockopt::subscribe, "");
270+
socket->setsockopt(ZMQ_TCP_KEEPALIVE, 1); // turn on tcp keepalive, do these configuration before connect
271+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_IDLE, 2); // the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; after the connection is marked to need keepalive, this counter is not used any further
272+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_INTVL, 2); // the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime
273+
socket->setsockopt(ZMQ_TCP_KEEPALIVE_CNT, 2); // the number of unacknowledged probes to send before considering the connection dead and notifying the application layer
274+
std::stringstream ss; ss << std::setprecision(std::numeric_limits<double>::digits10+1);
275+
ss << _heartbeatPort;
276+
socket->connect (("tcp://"+ _mujinControllerIp+":"+ss.str()).c_str());
277+
socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
276278

277279
zmq::pollitem_t pollitem;
278280
memset(&pollitem, 0, sizeof(zmq::pollitem_t));
279281
pollitem.socket = socket->operator void*();
280282
pollitem.events = ZMQ_POLLIN;
281283
unsigned long long lastheartbeat = GetMilliTime();
282284
while (!_bShutdownHeartbeatMonitor && (GetMilliTime() - lastheartbeat) / 1000.0f < reinitializetimeout) {
283-
zmq::poll(&pollitem, 1, std::chrono::milliseconds{50}); // wait 50 ms for message
285+
zmq::poll(&pollitem,1, 50); // wait 50 ms for message
284286
if (pollitem.revents & ZMQ_POLLIN) {
285287
zmq::message_t reply;
286-
socket->recv(reply);
288+
socket->recv(&reply);
289+
std::string replystring((char *)reply.data (), (size_t)reply.size());
287290
rapidjson::Document pt(rapidjson::kObjectType);
288291
try{
289-
ParseJson(pt, reply.to_string());
292+
std::stringstream replystring_ss(replystring);
293+
ParseJson(pt, replystring_ss.str());
290294
heartbeat.Parse(pt);
291295
{
292296
boost::mutex::scoped_lock lock(_mutexTaskState);
@@ -300,7 +304,7 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
300304
}
301305
catch (std::exception const &e) {
302306
MUJIN_LOG_ERROR("HeartBeat reply is not JSON");
303-
MUJIN_LOG_ERROR(reply.to_string());
307+
MUJIN_LOG_ERROR(replystring);
304308
MUJIN_LOG_ERROR(e.what());
305309
continue;
306310
}
@@ -315,4 +319,6 @@ void BinPickingTaskZmqResource::_HeartbeatMonitorThread(const double reinitializ
315319
MUJIN_LOG_DEBUG(str(boost::format("Stopped controller %s monitoring thread on port %d for slaverequestid=%s.")%_mujinControllerIp%_heartbeatPort%_slaverequestid));
316320
}
317321

322+
323+
318324
} // end namespace mujinclient

0 commit comments

Comments
 (0)