Skip to content

session

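The SDK uses a default Zenoh session config aligned with rmw_zenoh (copied from ros2/rmw_zenoh DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5), so session behavior (scouting, transport, timings, etc.) matches ROS 2 nodes using rmw_zenoh. The router_ip / router_port passed to ZenohSession.get_instance() override the config’s connect/endpoints. Because the session is a singleton, only the arguments of the call that actually creates the session take effect; later calls return the existing session unchanged until it is closed.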

  • Custom config file: set ZENOH_SESSION_CONFIG_URI to the path of a JSON5 config file to use instead of the bundled default. If the variable is unset or does not point to an existing file, the bundled default is used.
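  • Overrides: set ZENOH_CONFIG_OVERRIDE to apply extra config (e.g. connect/endpoints=["tcp/192.168.1.1:7447"]) on top of the loaded config. Overrides are applied after router_ip / router_port, so a connect/endpoints override takes precedence over them.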

ZenohSession - Manages shared Zenoh session and type store

ZenohSession

Manages a shared Zenoh session and type store

Source code in zenoh_ros2_sdk/session.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class ZenohSession:
    """Manages a shared Zenoh session and type store.

    The class is normally obtained through :meth:`get_instance`, which hands
    out one shared instance. Besides the open Zenoh session it keeps a type
    store of registered ROS2 message classes and two monotonically
    increasing counters used to hand out node and entity IDs.
    """
    # Shared instance returned by get_instance(); reset by close().
    _instance = None
    # Class-level lock guarding creation and reset of the shared instance.
    # NOTE: __init__ also sets an instance attribute named ``_lock`` (used for
    # the ID counters), which shadows this one when accessed through ``self``.
    _lock = threading.Lock()

    def __init__(self, router_ip: str = "127.0.0.1", router_port: int = 7447):
        """Load the session config and open a Zenoh session to the router.

        The config is read from the file named by the environment variable
        whose name is held in ``ZENOH_SESSION_CONFIG_URI_ENV`` when that file
        exists, otherwise from the bundled default. ``connect/endpoints`` is
        then set from ``router_ip`` / ``router_port``, and finally the
        ``ZENOH_CONFIG_OVERRIDE`` environment variable (if non-empty) is
        applied on top.

        Args:
            router_ip: Host or IP address of the Zenoh router. IPv6 literals
                are wrapped in brackets when building the endpoint.
            router_port: TCP port of the Zenoh router.

        Raises:
            FileNotFoundError: No usable custom config file was given and the
                bundled default config file is missing.
        """
        self.router_ip = router_ip
        self.router_port = router_port

        # Load default session config (rmw_zenoh-aligned) or custom file from ZENOH_SESSION_CONFIG_URI
        config_uri = os.environ.get(ZENOH_SESSION_CONFIG_URI_ENV, "").strip()
        if config_uri and os.path.isfile(config_uri):
            config_path = Path(config_uri)
            logger.debug("Loading Zenoh session config from %s", config_path)
            self.conf = zenoh.Config.from_file(str(config_path))
        else:
            if config_uri:
                # A configured-but-missing file is almost certainly a mistake;
                # say so instead of silently using different settings.
                logger.warning(
                    "%s is set to %r, which is not an existing file; "
                    "falling back to the default session config",
                    ZENOH_SESSION_CONFIG_URI_ENV,
                    config_uri,
                )
            if not _DEFAULT_SESSION_CONFIG_PATH.is_file():
                raise FileNotFoundError(
                    f"Default session config not found: {_DEFAULT_SESSION_CONFIG_PATH}"
                )
            self.conf = zenoh.Config.from_file(str(_DEFAULT_SESSION_CONFIG_PATH))

        # Override connect endpoints with SDK router address (and env overrides).
        # An IPv6 literal must be bracketed, otherwise its colons are
        # indistinguishable from the host:port separator.
        if ":" in router_ip and not router_ip.startswith("["):
            endpoint_host = f"[{router_ip}]"
        else:
            endpoint_host = router_ip
        self.conf.insert_json5(
            "connect/endpoints", f'["tcp/{endpoint_host}:{router_port}"]'
        )

        # Applied last so that it can override anything above, including the
        # endpoint that was just inserted.
        override = os.environ.get("ZENOH_CONFIG_OVERRIDE", "").strip()
        if override:
            _apply_zenoh_config_override(self.conf, override)

        self.session = zenoh.open(self.conf)
        self.store = get_typestore(Stores.EMPTY)
        # Maps the requested ROS2 type name to the key used in the type store.
        self._registered_types = {}
        self._node_counter = 0
        self._entity_counter = 0
        # Per-instance lock protecting the two counters above.
        self._lock = threading.Lock()

        # Get session ID
        session_info = self.session.info
        self.session_id = str(session_info.zid())
        self.liveliness = self.session.liveliness()

    @classmethod
    def get_instance(cls, router_ip: str = "127.0.0.1", router_port: int = 7447):
        """Get or create the shared instance.

        ``router_ip`` and ``router_port`` are only used when no shared
        instance exists yet; otherwise the existing instance is returned and
        the arguments are ignored.
        """
        if cls._instance is not None:
            return cls._instance
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the check above and acquiring the lock.
            if cls._instance is None:
                cls._instance = cls(router_ip, router_port)
        return cls._instance

    def register_message_type(self, msg_definition: Optional[str], ros2_type_name: str):
        """Register a ROS2 message type and return its class from the type store.

        Resolution order:

        1. If ``ros2_type_name`` is already recorded in ``_registered_types``
           and its class is still in the store, that class is returned.
        2. If ``msg_definition`` is ``None``, the type is resolved through the
           message registry, with special handling for empty ``.msg`` files
           and for ``*_Request`` / ``*_Response`` service types.
        3. Otherwise ``msg_definition`` is parsed and registered in the store.

        Service types may be stored under a converted key in which ``/srv/``
        is replaced by ``/srv/msg/``; ``_registered_types`` maps the requested
        name to the key that is actually used.

        Args:
            msg_definition: Message definition text, or ``None`` to load the
                type through the message registry.
            ros2_type_name: Type name such as ``"std_msgs/msg/Empty"``.

        Returns:
            The message class held by the type store.

        Raises:
            ValueError: ``msg_definition`` is ``None`` and the type could not
                be found or loaded through the registry.
            RuntimeError: Parsing or registering ``msg_definition`` failed.
            KeyError: The definition was registered but no matching class is
                present in the store afterwards.
        """
        # Check if already registered and in store
        if ros2_type_name in self._registered_types:
            # Get the actual store key (may be converted name for service types)
            actual_key = self._registered_types[ros2_type_name]
            if isinstance(actual_key, str):
                # actual_key is the store key
                msg_class = self.store.types.get(actual_key)
                if msg_class is not None:
                    return msg_class
            else:
                # actual_key is the types dict (old format), try direct lookup
                msg_class = self.store.types.get(ros2_type_name)
                if msg_class is not None:
                    return msg_class
                # Try converted name for service types
                if '/srv/' in ros2_type_name:
                    converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                    msg_class = self.store.types.get(converted_name)
                    if msg_class is not None:
                        # Update mapping to use converted name
                        self._registered_types[ros2_type_name] = converted_name
                        return msg_class

            # If in _registered_types but not in store, something went wrong - clear it
            logger.warning(
                "Type %s was marked as registered but not in store, re-registering",
                ros2_type_name,
            )
            del self._registered_types[ros2_type_name]

        # If msg_definition is None, try to load from message registry
        if msg_definition is None:
            registry = get_registry()

            # Special case: handle messages with empty definitions (like std_msgs/msg/Empty)
            # If the .msg file exists and is empty, register it directly to prevent recursion
            try:
                msg_file = registry.get_msg_file_path(ros2_type_name)
                if msg_file and msg_file.exists():
                    with open(msg_file, 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    # If file is empty (valid for messages with no fields), register it directly
                    if not file_content.strip():
                        types = get_types_from_msg("", ros2_type_name)
                        self.store.register(types)
                        # Prefer the exact name; otherwise take the first registered key
                        if ros2_type_name in types:
                            actual_type_key = ros2_type_name
                        else:
                            actual_type_key = next(iter(types), None)
                        if actual_type_key:
                            self._registered_types[ros2_type_name] = actual_type_key
                            msg_class = self.store.types.get(actual_type_key)
                            if msg_class is not None:
                                return msg_class
            except Exception:
                # Best effort only: fall through to registry loading, but keep
                # a trace of the failure instead of discarding it silently.
                logger.debug(
                    "Empty-definition shortcut failed for %s; "
                    "falling back to registry loading",
                    ros2_type_name,
                    exc_info=True,
                )

            # Check if this is a service request/response type
            # Service types are like "namespace/srv/ServiceName_Request" or "namespace/srv/ServiceName_Response"
            is_service_type = '/srv/' in ros2_type_name and ('_Request' in ros2_type_name or '_Response' in ros2_type_name)

            if is_service_type:
                # For service types, we need to load the service type first
                # Extract service type from request/response type
                # e.g., "example_interfaces/srv/AddTwoInts_Request" -> "example_interfaces/srv/AddTwoInts"
                if ros2_type_name.endswith('_Request'):
                    srv_type = ros2_type_name[:-8]  # Remove "_Request"
                elif ros2_type_name.endswith('_Response'):
                    srv_type = ros2_type_name[:-9]  # Remove "_Response"
                else:
                    srv_type = None

                if srv_type:
                    # Load the service type - this will register both request and response types
                    if load_service_type(srv_type):
                        # After loading, check if the type is now in _registered_types
                        # (load_service_type calls register_message_type which adds it)
                        if ros2_type_name in self._registered_types:
                            # Type was registered, get it from store using the actual key
                            actual_key = self._registered_types[ros2_type_name]
                            if isinstance(actual_key, str):
                                msg_class = self.store.types.get(actual_key)
                                if msg_class is not None:
                                    return msg_class

                        # If not found via _registered_types, try direct lookup (both original and converted names)
                        # Try converted name first (rosbags stores service types with /srv/msg/)
                        converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                        msg_class = self.store.types.get(converted_name)
                        if msg_class is not None:
                            self._registered_types[ros2_type_name] = converted_name
                            return msg_class

                        # Try original name
                        msg_class = self.store.types.get(ros2_type_name)
                        if msg_class is not None:
                            self._registered_types[ros2_type_name] = ros2_type_name
                            return msg_class

            # For regular message types, use the existing logic
            if registry.is_loaded(ros2_type_name):
                # Already loaded, check if it's in the store (try both original and converted names)
                msg_class = self.store.types.get(ros2_type_name)
                if msg_class is None and '/srv/' in ros2_type_name:
                    # Try converted name for service types
                    converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                    msg_class = self.store.types.get(converted_name)
                    if msg_class is not None:
                        self._registered_types[ros2_type_name] = converted_name
                        return msg_class
                if msg_class is not None:
                    # Store the mapping (use original name as key if found directly)
                    self._registered_types[ros2_type_name] = ros2_type_name
                    return msg_class
            elif registry.load_message_type(ros2_type_name):
                # Successfully loaded from registry, try both original and converted names
                msg_class = self.store.types.get(ros2_type_name)
                if msg_class is None and '/srv/' in ros2_type_name:
                    # Try converted name for service types
                    converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                    msg_class = self.store.types.get(converted_name)
                    if msg_class is not None:
                        self._registered_types[ros2_type_name] = converted_name
                        return msg_class
                if msg_class is not None:
                    # Store the mapping (use original name as key if found directly)
                    self._registered_types[ros2_type_name] = ros2_type_name
                    return msg_class

            # If we get here, the type wasn't found or couldn't be loaded
            raise ValueError(
                f"Message type {ros2_type_name} not found in registry and no definition provided. "
                f"Please provide msg_definition or ensure the message type is loaded."
            )

        # Register the type from the provided definition
        try:
            types = get_types_from_msg(msg_definition, ros2_type_name)
            self.store.register(types)

            # get_types_from_msg may convert the type name (e.g., srv/TypeName -> srv/msg/TypeName)
            # Find the actual key that was registered in the store
            actual_type_key = None
            for key in types:
                # Check if this key matches our type name
                if key == ros2_type_name:
                    actual_type_key = key
                    break
                # Handle conversion: srv/TypeName -> srv/msg/TypeName
                # Check if the key is a converted version of our type name
                if '/srv/' in ros2_type_name and '/srv/msg/' in key:
                    # Extract the base name (everything after srv/)
                    our_base = ros2_type_name.split('/srv/')[1]
                    store_base = key.split('/srv/msg/')[1]
                    if our_base == store_base:
                        actual_type_key = key
                        break

            # If no match found, use the first (and likely only) key from types
            if actual_type_key is None and types:
                actual_type_key = next(iter(types))

            # Store mapping: our type name -> actual store key
            if actual_type_key:
                self._registered_types[ros2_type_name] = actual_type_key
                msg_class = self.store.types.get(actual_type_key)
                if msg_class is not None:
                    return msg_class
        except Exception as e:
            raise RuntimeError(
                f"Failed to register message type {ros2_type_name}: {e}"
            ) from e

        # Handle name conversion: rosbags converts srv/TypeName to srv/msg/TypeName
        # Try original name first, then converted name
        msg_class = self.store.types.get(ros2_type_name)
        if msg_class is None:
            # Try with /msg/ inserted (for service types: srv/ -> srv/msg/)
            if '/srv/' in ros2_type_name:
                converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                msg_class = self.store.types.get(converted_name)
                if msg_class is not None:
                    # Cache the mapping for future lookups
                    self._registered_types[ros2_type_name] = converted_name
                    return msg_class

        if msg_class is None:
            # Provide helpful error message
            tried_names = [ros2_type_name]
            if '/srv/' in ros2_type_name:
                tried_names.append(ros2_type_name.replace('/srv/', '/srv/msg/'))
            short_name = ros2_type_name.split('/')[-1]
            available = [k for k in self.store.types if short_name in k][:5]
            raise KeyError(
                f"Message type {ros2_type_name} was registered but not found in store. "
                f"Tried: {tried_names}. "
                f"Available matching types: {available}"
            )
        return msg_class

    def get_next_node_id(self):
        """Return the current node counter value and advance the counter by one."""
        with self._lock:
            allocated = self._node_counter
            self._node_counter = allocated + 1
        return allocated

    def get_next_entity_id(self):
        """Return the current entity counter value and advance the counter by one."""
        with self._lock:
            allocated = self._entity_counter
            self._entity_counter = allocated + 1
        return allocated

    def generate_gid(self) -> bytes:
        """Generate a GID: the 16 raw bytes of a freshly generated UUID4."""
        return uuid.uuid4().bytes

    def close(self):
        """Close the session.

        The shared-instance reference is cleared only when this object *is*
        the shared instance, so closing a separately constructed
        ``ZenohSession`` does not discard a singleton that is still in use.
        """
        if self.session:
            self.session.close()
            # Use the class-level lock explicitly: ``self._lock`` is the
            # per-instance counter lock, not the singleton lock.
            with ZenohSession._lock:
                if ZenohSession._instance is self:
                    ZenohSession._instance = None

get_instance(router_ip: str = '127.0.0.1', router_port: int = 7447) classmethod

Get or create singleton instance

Source code in zenoh_ros2_sdk/session.py
131
132
133
134
135
136
137
138
@classmethod
def get_instance(cls, router_ip: str = "127.0.0.1", router_port: int = 7447):
    """Get or create the shared instance.

    ``router_ip`` and ``router_port`` are only used when no shared instance
    exists yet; otherwise the existing instance is returned and the
    arguments are ignored.
    """
    if cls._instance is not None:
        return cls._instance
    with cls._lock:
        # Re-check under the lock: another thread may have created the
        # instance between the check above and acquiring the lock.
        if cls._instance is None:
            cls._instance = cls(router_ip, router_port)
    return cls._instance

register_message_type(msg_definition: Optional[str], ros2_type_name: str)

Register a ROS2 message type

Source code in zenoh_ros2_sdk/session.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def register_message_type(self, msg_definition: Optional[str], ros2_type_name: str):
    """Register a ROS2 message type and return its class from the type store.

    Resolution order:

    1. If ``ros2_type_name`` is already recorded in ``_registered_types``
       and its class is still in the store, that class is returned.
    2. If ``msg_definition`` is ``None``, the type is resolved through the
       message registry, with special handling for empty ``.msg`` files
       and for ``*_Request`` / ``*_Response`` service types.
    3. Otherwise ``msg_definition`` is parsed and registered in the store.

    Service types may be stored under a converted key in which ``/srv/``
    is replaced by ``/srv/msg/``; ``_registered_types`` maps the requested
    name to the key that is actually used.

    Args:
        msg_definition: Message definition text, or ``None`` to load the
            type through the message registry.
        ros2_type_name: Type name such as ``"std_msgs/msg/Empty"``.

    Returns:
        The message class held by the type store.

    Raises:
        ValueError: ``msg_definition`` is ``None`` and the type could not
            be found or loaded through the registry.
        RuntimeError: Parsing or registering ``msg_definition`` failed.
        KeyError: The definition was registered but no matching class is
            present in the store afterwards.
    """
    # Check if already registered and in store
    if ros2_type_name in self._registered_types:
        # Get the actual store key (may be converted name for service types)
        actual_key = self._registered_types[ros2_type_name]
        if isinstance(actual_key, str):
            # actual_key is the store key
            msg_class = self.store.types.get(actual_key)
            if msg_class is not None:
                return msg_class
        else:
            # actual_key is the types dict (old format), try direct lookup
            msg_class = self.store.types.get(ros2_type_name)
            if msg_class is not None:
                return msg_class
            # Try converted name for service types
            if '/srv/' in ros2_type_name:
                converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                msg_class = self.store.types.get(converted_name)
                if msg_class is not None:
                    # Update mapping to use converted name
                    self._registered_types[ros2_type_name] = converted_name
                    return msg_class

        # If in _registered_types but not in store, something went wrong - clear it
        logger.warning(
            "Type %s was marked as registered but not in store, re-registering",
            ros2_type_name,
        )
        del self._registered_types[ros2_type_name]

    # If msg_definition is None, try to load from message registry
    if msg_definition is None:
        registry = get_registry()

        # Special case: handle messages with empty definitions (like std_msgs/msg/Empty)
        # If the .msg file exists and is empty, register it directly to prevent recursion
        try:
            msg_file = registry.get_msg_file_path(ros2_type_name)
            if msg_file and msg_file.exists():
                with open(msg_file, 'r', encoding='utf-8') as f:
                    file_content = f.read()
                # If file is empty (valid for messages with no fields), register it directly
                if not file_content.strip():
                    types = get_types_from_msg("", ros2_type_name)
                    self.store.register(types)
                    # Prefer the exact name; otherwise take the first registered key
                    if ros2_type_name in types:
                        actual_type_key = ros2_type_name
                    else:
                        actual_type_key = next(iter(types), None)
                    if actual_type_key:
                        self._registered_types[ros2_type_name] = actual_type_key
                        msg_class = self.store.types.get(actual_type_key)
                        if msg_class is not None:
                            return msg_class
        except Exception:
            # Best effort only: fall through to registry loading, but keep
            # a trace of the failure instead of discarding it silently.
            logger.debug(
                "Empty-definition shortcut failed for %s; "
                "falling back to registry loading",
                ros2_type_name,
                exc_info=True,
            )

        # Check if this is a service request/response type
        # Service types are like "namespace/srv/ServiceName_Request" or "namespace/srv/ServiceName_Response"
        is_service_type = '/srv/' in ros2_type_name and ('_Request' in ros2_type_name or '_Response' in ros2_type_name)

        if is_service_type:
            # For service types, we need to load the service type first
            # Extract service type from request/response type
            # e.g., "example_interfaces/srv/AddTwoInts_Request" -> "example_interfaces/srv/AddTwoInts"
            if ros2_type_name.endswith('_Request'):
                srv_type = ros2_type_name[:-8]  # Remove "_Request"
            elif ros2_type_name.endswith('_Response'):
                srv_type = ros2_type_name[:-9]  # Remove "_Response"
            else:
                srv_type = None

            if srv_type:
                # Load the service type - this will register both request and response types
                if load_service_type(srv_type):
                    # After loading, check if the type is now in _registered_types
                    # (load_service_type calls register_message_type which adds it)
                    if ros2_type_name in self._registered_types:
                        # Type was registered, get it from store using the actual key
                        actual_key = self._registered_types[ros2_type_name]
                        if isinstance(actual_key, str):
                            msg_class = self.store.types.get(actual_key)
                            if msg_class is not None:
                                return msg_class

                    # If not found via _registered_types, try direct lookup (both original and converted names)
                    # Try converted name first (rosbags stores service types with /srv/msg/)
                    converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                    msg_class = self.store.types.get(converted_name)
                    if msg_class is not None:
                        self._registered_types[ros2_type_name] = converted_name
                        return msg_class

                    # Try original name
                    msg_class = self.store.types.get(ros2_type_name)
                    if msg_class is not None:
                        self._registered_types[ros2_type_name] = ros2_type_name
                        return msg_class

        # For regular message types, use the existing logic
        if registry.is_loaded(ros2_type_name):
            # Already loaded, check if it's in the store (try both original and converted names)
            msg_class = self.store.types.get(ros2_type_name)
            if msg_class is None and '/srv/' in ros2_type_name:
                # Try converted name for service types
                converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                msg_class = self.store.types.get(converted_name)
                if msg_class is not None:
                    self._registered_types[ros2_type_name] = converted_name
                    return msg_class
            if msg_class is not None:
                # Store the mapping (use original name as key if found directly)
                self._registered_types[ros2_type_name] = ros2_type_name
                return msg_class
        elif registry.load_message_type(ros2_type_name):
            # Successfully loaded from registry, try both original and converted names
            msg_class = self.store.types.get(ros2_type_name)
            if msg_class is None and '/srv/' in ros2_type_name:
                # Try converted name for service types
                converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
                msg_class = self.store.types.get(converted_name)
                if msg_class is not None:
                    self._registered_types[ros2_type_name] = converted_name
                    return msg_class
            if msg_class is not None:
                # Store the mapping (use original name as key if found directly)
                self._registered_types[ros2_type_name] = ros2_type_name
                return msg_class

        # If we get here, the type wasn't found or couldn't be loaded
        raise ValueError(
            f"Message type {ros2_type_name} not found in registry and no definition provided. "
            f"Please provide msg_definition or ensure the message type is loaded."
        )

    # Register the type from the provided definition
    try:
        types = get_types_from_msg(msg_definition, ros2_type_name)
        self.store.register(types)

        # get_types_from_msg may convert the type name (e.g., srv/TypeName -> srv/msg/TypeName)
        # Find the actual key that was registered in the store
        actual_type_key = None
        for key in types:
            # Check if this key matches our type name
            if key == ros2_type_name:
                actual_type_key = key
                break
            # Handle conversion: srv/TypeName -> srv/msg/TypeName
            # Check if the key is a converted version of our type name
            if '/srv/' in ros2_type_name and '/srv/msg/' in key:
                # Extract the base name (everything after srv/)
                our_base = ros2_type_name.split('/srv/')[1]
                store_base = key.split('/srv/msg/')[1]
                if our_base == store_base:
                    actual_type_key = key
                    break

        # If no match found, use the first (and likely only) key from types
        if actual_type_key is None and types:
            actual_type_key = next(iter(types))

        # Store mapping: our type name -> actual store key
        if actual_type_key:
            self._registered_types[ros2_type_name] = actual_type_key
            msg_class = self.store.types.get(actual_type_key)
            if msg_class is not None:
                return msg_class
    except Exception as e:
        raise RuntimeError(
            f"Failed to register message type {ros2_type_name}: {e}"
        ) from e

    # Handle name conversion: rosbags converts srv/TypeName to srv/msg/TypeName
    # Try original name first, then converted name
    msg_class = self.store.types.get(ros2_type_name)
    if msg_class is None:
        # Try with /msg/ inserted (for service types: srv/ -> srv/msg/)
        if '/srv/' in ros2_type_name:
            converted_name = ros2_type_name.replace('/srv/', '/srv/msg/')
            msg_class = self.store.types.get(converted_name)
            if msg_class is not None:
                # Cache the mapping for future lookups
                self._registered_types[ros2_type_name] = converted_name
                return msg_class

    if msg_class is None:
        # Provide helpful error message
        tried_names = [ros2_type_name]
        if '/srv/' in ros2_type_name:
            tried_names.append(ros2_type_name.replace('/srv/', '/srv/msg/'))
        short_name = ros2_type_name.split('/')[-1]
        available = [k for k in self.store.types if short_name in k][:5]
        raise KeyError(
            f"Message type {ros2_type_name} was registered but not found in store. "
            f"Tried: {tried_names}. "
            f"Available matching types: {available}"
        )
    return msg_class

get_next_node_id()

Get next available node ID

Source code in zenoh_ros2_sdk/session.py
343
344
345
346
347
348
def get_next_node_id(self):
    """Return the current node counter value and advance the counter by one."""
    with self._lock:
        allocated = self._node_counter
        self._node_counter = allocated + 1
    return allocated

get_next_entity_id()

Get next available entity ID

Source code in zenoh_ros2_sdk/session.py
350
351
352
353
354
355
def get_next_entity_id(self):
    """Return the current entity counter value and advance the counter by one."""
    with self._lock:
        allocated = self._entity_counter
        self._entity_counter = allocated + 1
    return allocated

generate_gid() -> bytes

Generate a unique GID (16 bytes)

Source code in zenoh_ros2_sdk/session.py
357
358
359
360
361
def generate_gid(self) -> bytes:
    """Generate a GID: the 16 raw bytes of a freshly generated UUID4."""
    return uuid.uuid4().bytes

close()

Close the session

Source code in zenoh_ros2_sdk/session.py
363
364
365
366
367
def close(self):
    """Close the session.

    The shared-instance reference is cleared only when this object *is* the
    shared instance, so closing a separately constructed ``ZenohSession``
    does not discard a singleton that is still in use.
    """
    if self.session:
        self.session.close()
        # Use the class-level lock explicitly: ``self._lock`` is the
        # per-instance counter lock, not the singleton lock.
        with ZenohSession._lock:
            if ZenohSession._instance is self:
                ZenohSession._instance = None