rosbag: expose latching & callerid connection headers
This commit is contained in:
parent
1ad0baf671
commit
9a5d6720c5
|
@ -67,6 +67,9 @@ public:
|
|||
std::string const& getMessageDefinition() const;
|
||||
ros::M_string const& getConnectionHeader() const;
|
||||
|
||||
std::string getCallerId() const;
|
||||
bool isLatching() const;
|
||||
|
||||
//! Templated call to instantiate a message
|
||||
/*!
|
||||
* returns NULL pointer if incompatible
|
||||
|
|
|
@ -44,6 +44,16 @@ string const& MessageInstance::getMessageDefinition() const { return conn
|
|||
ros::M_string const& MessageInstance::getConnectionHeader() const { return connection_info_->header; }
|
||||
Time const& MessageInstance::getTime() const { return index_entry_.time; }
|
||||
|
||||
string MessageInstance::getCallerId() const {
|
||||
ros::M_string::const_iterator header_iter = connection_info_->header.find("callerid");
|
||||
return header_iter != connection_info_->header.end() ? header_iter->second : string("");
|
||||
}
|
||||
|
||||
bool MessageInstance::isLatching() const {
|
||||
ros::M_string::const_iterator header_iter = connection_info_->header.find("latching");
|
||||
return header_iter != connection_info_->header.end() && header_iter->second == "1";
|
||||
}
|
||||
|
||||
uint32_t MessageInstance::size() const {
|
||||
return bag_->readMessageDataSize(index_entry_);
|
||||
}
|
||||
|
|
|
@ -161,24 +161,14 @@ void Player::publish() {
|
|||
void Player::doPublish(MessageInstance const& m) {
|
||||
string const& topic = m.getTopic();
|
||||
ros::Time const& time = m.getTime();
|
||||
|
||||
// Make a unique id composed of the callerid and the topic allowing separate advertisers for separate latching topics
|
||||
|
||||
ros::M_string const& header = m.getConnectionHeader();
|
||||
|
||||
ros::M_string::const_iterator header_iter = header.find("callerid");
|
||||
string callerid;
|
||||
if (header_iter != header.end())
|
||||
callerid = header_iter->second;
|
||||
string callerid = m.getCallerId();
|
||||
|
||||
string callerid_topic = callerid + topic;
|
||||
|
||||
map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
|
||||
if (pub_iter == publishers_.end()) {
|
||||
ros::AdvertiseOptions opts = createAdvertiseOptions(m, options_.queue_size);
|
||||
|
||||
ros::M_string::const_iterator header_iter = header.find("latching");
|
||||
opts.latch = (header_iter != header.end() && header_iter->second == "1");
|
||||
opts.latch = m.isLatching();
|
||||
|
||||
ros::Publisher pub = node_handle_->advertise(opts);
|
||||
publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));
|
||||
|
|
|
@ -121,9 +121,6 @@ class Bag(object):
|
|||
self._file_header_pos = None
|
||||
self._index_data_pos = 0 # (1.2+)
|
||||
|
||||
self._topic_connection_ids = {}
|
||||
self._connections = {} # id -> ConnectionInfo
|
||||
|
||||
self._connection_indexes = {} # id -> IndexEntry[] (1.2+)
|
||||
self._topic_indexes = {} # topic -> IndexEntry[] (1.2+)
|
||||
|
||||
|
@ -974,34 +971,32 @@ class _BagReader(object):
|
|||
self.bag = bag
|
||||
|
||||
def start_reading(self): pass
|
||||
def read_messages(self, topics, start_time, end_time, topic_filter, raw): pass
|
||||
def read_messages(self, topics, start_time, end_time, connection_filter, raw): pass
|
||||
|
||||
def get_entries(self, topics, start_time, end_time, topic_filter):
|
||||
for entry, _ in _mergesort(self.get_indexes(topics, topic_filter), key=lambda entry: entry.time):
|
||||
def get_entries(self, topics, start_time, end_time, connection_filter):
|
||||
for entry, _ in _mergesort(self.get_indexes(topics, connection_filter), key=lambda entry: entry.time):
|
||||
if start_time and entry.time < start_time:
|
||||
continue
|
||||
if end_time and entry.time > end_time:
|
||||
return
|
||||
yield entry
|
||||
|
||||
def get_indexes(self, topics, topic_filter):
|
||||
if topics is None and topic_filter is None:
|
||||
return self.bag._topic_indexes.values()
|
||||
def get_indexes(self, topics, connection_filter):
|
||||
if topics:
|
||||
topics_set = set(topics)
|
||||
else:
|
||||
topics_set = set()
|
||||
|
||||
filtered_connections = []
|
||||
for connection_info in self.bag._connections.values():
|
||||
if connection_info.topic not in topics_set:
|
||||
continue
|
||||
if not connection_filter:
|
||||
continue
|
||||
if connection_filter(connection_info.topic, connection_info.datatype, connection_info.md5sum, connection_info.msg_def):
|
||||
filtered_connections.append(connection_info.id)
|
||||
|
||||
if topics is None:
|
||||
topics = self.bag._topic_indexes.keys()
|
||||
|
||||
if topic_filter is not None:
|
||||
filtered_topics = []
|
||||
for topic in topics:
|
||||
if topic not in self.bag._topic_infos:
|
||||
continue
|
||||
topic_info = self.bag._topic_infos[topic]
|
||||
if topic_filter(topic, topic_info.datatype, topic_info.md5sum, topic_info.msg_def):
|
||||
filtered_topics.append(topic)
|
||||
topics = filtered_topics
|
||||
|
||||
return [index for topic, index in self.bag._topic_indexes.items() if topic in topics]
|
||||
return [index for connection_id, index in self.bag._connection_indexes.items() if connection_id in filtered_connections]
|
||||
|
||||
def read_message_definition_record(self, header=None):
|
||||
if not header:
|
||||
|
@ -1016,9 +1011,9 @@ class _BagReader(object):
|
|||
|
||||
connection_header = { 'topic' : topic, 'type' : datatype, 'md5sum' : md5sum, 'message_definition' : msg_def }
|
||||
|
||||
id = len(self._connections)
|
||||
id = len(self.bag._connections)
|
||||
|
||||
return _ConnectionInfo(id, topic, datatype, md5sum, msg_def, connection_header)
|
||||
return _ConnectionInfo(id, topic, connection_header)
|
||||
|
||||
def get_message_type(self, info):
|
||||
md5sum, datatype, msg_def = info.md5sum, info.datatype, info.msg_def
|
||||
|
@ -1131,21 +1126,26 @@ class _BagReader102_Unindexed(_BagReader):
|
|||
if op != _OP_MSG_DEF:
|
||||
break
|
||||
|
||||
info = self.read_message_definition_record(header)
|
||||
self.bag._connections[info.topic] = info
|
||||
connection_info = self.read_message_definition_record(header)
|
||||
|
||||
if connection_info.topic not in self.bag._topic_connections:
|
||||
self.bag._topic_connections[connection_info.topic] = connection_info.id
|
||||
|
||||
self.bag._connections[connection_info.id] = connection_info
|
||||
|
||||
# Check that we have a MSG_DATA record
|
||||
if op != _OP_MSG_DATA:
|
||||
raise ROSBagFormatException('Expecting OP_MSG_DATA, got %d' % op)
|
||||
|
||||
topic = _read_str_field(header, 'topic')
|
||||
info = self.bag._topic_infos[topic]
|
||||
|
||||
# todo: get connection_info from topic_info
|
||||
|
||||
connection_id = self.bag._topic_connections[topic]
|
||||
|
||||
info = self.bag._connections[connection_id]
|
||||
|
||||
# Get the message type
|
||||
try:
|
||||
msg_type = self.get_message_type(topic_info)
|
||||
msg_type = self.get_message_type(info)
|
||||
except KeyError:
|
||||
raise ROSBagException('Cannot deserialize messages of type [%s]. Message was not preceeded in bagfile by definition' % topic_info.datatype)
|
||||
|
||||
|
@ -1158,7 +1158,7 @@ class _BagReader102_Unindexed(_BagReader):
|
|||
data = _read_record_data(f)
|
||||
|
||||
if raw:
|
||||
msg = (topic_info.datatype, data, topic_info.md5sum, position, msg_type)
|
||||
msg = (info.datatype, data, info.md5sum, position, msg_type)
|
||||
else:
|
||||
# Deserialize the message
|
||||
msg = msg_type()
|
||||
|
|
Loading…
Reference in New Issue