88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class ZenohSession:
    """Manage a shared Zenoh session together with a message type store.

    The class is normally used as a process-wide singleton obtained through
    :meth:`get_instance`; constructing it directly opens an additional,
    independent Zenoh session.

    Instance attributes set by ``__init__``:
        router_ip / router_port: router address passed by the caller.
        conf: the Zenoh configuration object used to open the session.
        session: the open Zenoh session (``None`` after :meth:`close`).
        store: type store holding every registered message class.
        session_id: string form of the session's Zenoh ID.
        liveliness: the session's liveliness handle.
    """

    # Singleton instance and the lock guarding its creation/reset.
    # NOTE: ``__init__`` also assigns an *instance* attribute called ``_lock``
    # (guarding the ID counters), which shadows this one on instances. Code
    # that needs the singleton lock must go through the class
    # (``cls._lock`` / ``ZenohSession._lock``).
    _instance = None
    _lock = threading.Lock()

    def __init__(self, router_ip: str = "127.0.0.1", router_port: int = 7447):
        """Load the session configuration and open the Zenoh session.

        Args:
            router_ip: address of the Zenoh router to connect to.
            router_port: TCP port of the Zenoh router.

        Raises:
            FileNotFoundError: if no custom config file is usable and the
                default session config file does not exist.
        """
        self.router_ip = router_ip
        self.router_port = router_port

        # Load default session config (rmw_zenoh-aligned) or custom file from
        # ZENOH_SESSION_CONFIG_URI
        config_uri = os.environ.get(ZENOH_SESSION_CONFIG_URI_ENV, "").strip()
        if config_uri and os.path.isfile(config_uri):
            config_path = Path(config_uri)
            logger.debug("Loading Zenoh session config from %s", config_path)
            self.conf = zenoh.Config.from_file(str(config_path))
        else:
            if config_uri:
                # The variable was set but is unusable: say so instead of
                # silently switching to the default configuration.
                logger.warning(
                    "%s=%r is not a file; falling back to the default session config",
                    ZENOH_SESSION_CONFIG_URI_ENV,
                    config_uri,
                )
            if not _DEFAULT_SESSION_CONFIG_PATH.is_file():
                raise FileNotFoundError(
                    f"Default session config not found: {_DEFAULT_SESSION_CONFIG_PATH}"
                )
            self.conf = zenoh.Config.from_file(str(_DEFAULT_SESSION_CONFIG_PATH))

        # Override connect endpoints with SDK router address (and env overrides)
        self.conf.insert_json5(
            "connect/endpoints", f'["tcp/{router_ip}:{router_port}"]'
        )
        override = os.environ.get("ZENOH_CONFIG_OVERRIDE", "").strip()
        if override:
            _apply_zenoh_config_override(self.conf, override)

        self.session = zenoh.open(self.conf)
        self.store = get_typestore(Stores.EMPTY)
        # Maps the caller-facing ROS2 type name to the key actually used in
        # ``self.store.types`` (the two differ for service types).
        self._registered_types = {}
        self._node_counter = 0
        self._entity_counter = 0
        # Per-instance lock for the two counters above (see class-level note).
        self._lock = threading.Lock()

        # Get session ID
        session_info = self.session.info
        self.session_id = str(session_info.zid())
        self.liveliness = self.session.liveliness()

    @classmethod
    def get_instance(
        cls, router_ip: str = "127.0.0.1", router_port: int = 7447
    ) -> "ZenohSession":
        """Return the singleton instance, creating it on first use.

        ``router_ip`` and ``router_port`` are only used when the instance is
        created; later calls return the existing instance unchanged.
        """
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so only one caller constructs it.
                if cls._instance is None:
                    cls._instance = cls(router_ip, router_port)
        return cls._instance

    def register_message_type(self, msg_definition: Optional[str], ros2_type_name: str):
        """Register a ROS2 message type and return its message class.

        Args:
            msg_definition: text of the message definition, or ``None`` to
                resolve the type through the message registry.
            ros2_type_name: ROS2 type name, e.g. ``std_msgs/msg/String`` or
                ``example_interfaces/srv/AddTwoInts_Request``.

        Returns:
            The message class held by the type store for this type.

        Raises:
            ValueError: ``msg_definition`` is ``None`` and the type could not
                be found in, or loaded from, the registry.
            RuntimeError: parsing or registering ``msg_definition`` failed.
            KeyError: registration succeeded but the class cannot be found in
                the store under any expected name.
        """
        msg_class = self._lookup_cached(ros2_type_name)
        if msg_class is not None:
            return msg_class
        if msg_definition is None:
            return self._register_from_registry(ros2_type_name)
        return self._register_from_definition(msg_definition, ros2_type_name)

    def _lookup_in_store(self, ros2_type_name: str, *, converted_first: bool = False):
        """Look the type up by its own name and by its ``/srv/msg/`` variant.

        The store may hold service types under ``pkg/srv/msg/Name`` instead of
        ``pkg/srv/Name``. On a hit, the name-to-store-key mapping is cached.
        Returns the message class, or ``None`` if neither name is present.
        """
        candidates = [ros2_type_name]
        if '/srv/' in ros2_type_name:
            converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
            if converted_first:
                candidates.insert(0, converted_name)
            else:
                candidates.append(converted_name)
        for store_key in candidates:
            msg_class = self.store.types.get(store_key)
            if msg_class is not None:
                self._registered_types[ros2_type_name] = store_key
                return msg_class
        return None

    def _lookup_cached(self, ros2_type_name: str):
        """Return the class of an already-registered type, or ``None``.

        A mapping entry whose class is no longer in the store is dropped so
        the caller re-registers the type.
        """
        if ros2_type_name not in self._registered_types:
            return None
        store_key = self._registered_types[ros2_type_name]
        if isinstance(store_key, str):
            msg_class = self.store.types.get(store_key)
        else:
            # Old-format entry (the value is a types dict, not a store key):
            # fall back to looking the name up directly.
            msg_class = self._lookup_in_store(ros2_type_name)
        if msg_class is not None:
            return msg_class
        # In _registered_types but not in store: something went wrong, clear it.
        logger.warning(
            "Type %s was marked as registered but not in store, re-registering",
            ros2_type_name,
        )
        self._registered_types.pop(ros2_type_name, None)
        return None

    @staticmethod
    def _resolve_store_key(typedefs, ros2_type_name: str):
        """Pick the key in ``typedefs`` that corresponds to ``ros2_type_name``.

        Preference order: the exact name, the ``/srv/msg/`` converted name
        (full name compared, so another package's same-named type cannot
        match), then the first key. Returns ``None`` for an empty mapping.
        """
        if ros2_type_name in typedefs:
            return ros2_type_name
        if '/srv/' in ros2_type_name:
            converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
            if converted_name in typedefs:
                return converted_name
        # If no match found, use the first (and likely only) key.
        return next(iter(typedefs), None)

    def _remember_registered(self, typedefs, ros2_type_name: str):
        """Cache the store key for freshly registered ``typedefs``.

        Returns the message class stored under that key, or ``None``.
        """
        store_key = self._resolve_store_key(typedefs, ros2_type_name)
        if not store_key:
            return None
        self._registered_types[ros2_type_name] = store_key
        return self.store.types.get(store_key)

    @staticmethod
    def _parent_service_type(ros2_type_name: str):
        """Return the service type of a ``_Request``/``_Response`` type name.

        e.g. ``example_interfaces/srv/AddTwoInts_Request`` ->
        ``example_interfaces/srv/AddTwoInts``. Returns ``None`` for any name
        that is not a service request/response type.
        """
        if '/srv/' not in ros2_type_name:
            return None
        for suffix in ('_Request', '_Response'):
            if ros2_type_name.endswith(suffix):
                return ros2_type_name[:-len(suffix)]
        return None

    def _register_if_empty_definition(self, registry, ros2_type_name: str):
        """Register a type whose ``.msg`` file exists but is empty.

        Messages with no fields (like std_msgs/msg/Empty) are registered
        directly here to prevent recursion through the registry. This is
        best-effort: any failure is logged at debug level and ``None`` is
        returned so the caller falls through to normal registry loading.
        """
        try:
            msg_file = registry.get_msg_file_path(ros2_type_name)
            if not (msg_file and msg_file.exists()):
                return None
            with open(msg_file, 'r', encoding='utf-8') as f:
                if f.read().strip():
                    return None
            typedefs = get_types_from_msg("", ros2_type_name)
            self.store.register(typedefs)
        except Exception:
            logger.debug(
                "Empty-definition shortcut failed for %s; using registry loading",
                ros2_type_name,
                exc_info=True,
            )
            return None
        return self._remember_registered(typedefs, ros2_type_name)

    def _register_from_registry(self, ros2_type_name: str):
        """Resolve a type through the message registry (no definition given).

        Raises:
            ValueError: the type could not be found or loaded.
        """
        registry = get_registry()

        msg_class = self._register_if_empty_definition(registry, ros2_type_name)
        if msg_class is not None:
            return msg_class

        # Request/response types are registered by loading their service type.
        srv_type = self._parent_service_type(ros2_type_name)
        if srv_type and load_service_type(srv_type):
            store_key = self._registered_types.get(ros2_type_name)
            if isinstance(store_key, str):
                msg_class = self.store.types.get(store_key)
                if msg_class is not None:
                    return msg_class
            # Not found via the mapping: try the converted name first (service
            # types are stored with /srv/msg/), then the original name.
            msg_class = self._lookup_in_store(ros2_type_name, converted_first=True)
            if msg_class is not None:
                return msg_class

        # Regular message types: use what is already loaded, else load it.
        if registry.is_loaded(ros2_type_name) or registry.load_message_type(ros2_type_name):
            msg_class = self._lookup_in_store(ros2_type_name)
            if msg_class is not None:
                return msg_class

        # If we get here, the type wasn't found or couldn't be loaded
        raise ValueError(
            f"Message type {ros2_type_name} not found in registry and no definition provided. "
            "Please provide msg_definition or ensure the message type is loaded."
        )

    def _register_from_definition(self, msg_definition: str, ros2_type_name: str):
        """Parse ``msg_definition``, register it and return the message class.

        Raises:
            RuntimeError: parsing or registering the definition failed.
            KeyError: the class is not in the store after registration.
        """
        try:
            typedefs = get_types_from_msg(msg_definition, ros2_type_name)
            self.store.register(typedefs)
        except Exception as e:
            raise RuntimeError(
                f"Failed to register message type {ros2_type_name}: {e}"
            ) from e

        # get_types_from_msg may convert the type name
        # (e.g., srv/TypeName -> srv/msg/TypeName), so resolve the real key.
        msg_class = self._remember_registered(typedefs, ros2_type_name)
        if msg_class is None:
            # Fall back to original name first, then converted name.
            msg_class = self._lookup_in_store(ros2_type_name)
        if msg_class is not None:
            return msg_class

        # Provide helpful error message
        tried_names = [ros2_type_name]
        if '/srv/' in ros2_type_name:
            tried_names.append(ros2_type_name.replace('/srv/', '/srv/msg/'))
        short_name = ros2_type_name.split('/')[-1]
        available = [k for k in self.store.types.keys() if short_name in k][:5]
        raise KeyError(
            f"Message type {ros2_type_name} was registered but not found in store. "
            f"Tried: {tried_names}. "
            f"Available matching types: {available}"
        )

    def get_next_node_id(self) -> int:
        """Return the next unused node ID (starts at 0, increments by 1)."""
        with self._lock:
            node_id = self._node_counter
            self._node_counter += 1
            return node_id

    def get_next_entity_id(self) -> int:
        """Return the next unused entity ID (starts at 0, increments by 1)."""
        with self._lock:
            entity_id = self._entity_counter
            self._entity_counter += 1
            return entity_id

    def generate_gid(self) -> bytes:
        """Generate a unique GID (the 16 bytes of a random UUID4)."""
        return uuid.uuid4().bytes

    def close(self) -> None:
        """Close the session; calling it again is a no-op.

        The singleton slot is cleared only when this instance *is* the
        singleton, so closing a directly-constructed session does not orphan
        a live shared one.
        """
        session = self.session
        if session is not None:
            # Drop the reference first so a repeated close() cannot close the
            # same session twice.
            self.session = None
            session.close()
        with ZenohSession._lock:
            if ZenohSession._instance is self:
                ZenohSession._instance = None
|