Skip to content

service_server

ROS2ServiceServer - ROS2 Service Server using Zenoh

ServiceRequestKey

Correlation key for service requests, aligned with ros-z QueryKey and rmw_zenoh.

Source code in zenoh_ros2_sdk/service_server.py
25
26
27
28
29
30
31
32
@slotted_dataclass(frozen=True)
class ServiceRequestKey:
    """
    Correlation key for service requests, aligned with ros-z QueryKey and rmw_zenoh.

    The service server builds one key per incoming request from the request
    attachment and uses it as the dictionary key for the pending Zenoh query,
    so the same key must be handed back when the response is sent.
    """

    # Sequence number taken from the request attachment.
    sequence_id: int
    # GID bytes taken from the request attachment.
    gid: bytes

ROS2ServiceServer

ROS2 Service Server using Zenoh - receives requests and sends responses

Source code in zenoh_ros2_sdk/service_server.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
class ROS2ServiceServer:
    """ROS2 Service Server using Zenoh - receives requests and sends responses"""

    def __init__(
        self,
        service_name: str,
        srv_type: str,
        callback: Optional[Callable] = None,
        request_definition: Optional[str] = None,
        response_definition: Optional[str] = None,
        node_name: Optional[str] = None,
        namespace: str = "/",
        domain_id: Optional[int] = None,
        router_ip: str = "127.0.0.1",
        router_port: int = 7447,
        type_hash: Optional[str] = None,
        qos: Optional[object] = None,
        mode: str = "callback",
    ):
        """
        Create a ROS2 service server.

        Modes:
            - `mode="callback"` (default): `callback(request_msg) -> response_msg` is called and the server replies immediately.
            - `mode="queue"`: requests are queued; user calls `take_request()` then `send_response()` with the returned key.

        Attachments:
            Service requests **must** include an attachment (sequence_id + gid). The server uses this to:
            - correlate requests/responses
            - reply with an attachment that contains the same (sequence_id, gid) plus a new timestamp

        Args:
            service_name: ROS2 service name (e.g., "/add_two_ints")
            srv_type: ROS2 service type (e.g., "example_interfaces/srv/AddTwoInts")
            callback: Callback function(request_msg) -> response_msg called when request is received
            request_definition: Request message definition text (None to auto-load)
            response_definition: Response message definition text (None to auto-load)
            node_name: Node name (auto-generated if None)
            namespace: Node namespace
            domain_id: ROS domain ID (defaults to ROS_DOMAIN_ID or 0)
            router_ip: Zenoh router IP
            router_port: Zenoh router port
            type_hash: Service type hash (auto-detected if None)
            qos: QoS used for liveliness discovery tokens.
                Accepts `QosProfile`, an encoded rmw_zenoh QoS string, or `None` for default.
            mode: `callback` or `queue`.

        Raises:
            ValueError: If `srv_type` format is invalid or if mode/callback is inconsistent.
            RuntimeError: If called queue-only APIs while not in queue mode.
        """
        if mode not in ("callback", "queue"):
            raise ValueError(f"Invalid mode: {mode}. Expected 'callback' or 'queue'")
        if mode == "callback" and callback is None:
            raise ValueError("callback must be provided when mode='callback'")

        self.service_name = service_name
        self.srv_type = srv_type
        self.callback = callback
        self.mode = mode
        self.domain_id = resolve_domain_id(domain_id)
        self.namespace = namespace
        self.node_name = node_name or f"zenoh_service_server_{uuid.uuid4().hex[:8]}"
        _, self.qos = self._normalize_qos(qos, default=DEFAULT_QOS_PROFILE, fallback=DEFAULT_QOS_PROFILE.encode())

        # Get or create shared session
        self.session_mgr = ZenohSession.get_instance(router_ip, router_port)

        # Parse service type to get request and response types
        parts = srv_type.split("/")
        if len(parts) != 3:
            raise ValueError(f"Invalid service type format: {srv_type}. Expected format: namespace/srv/ServiceName")

        namespace_part, srv, service_name_part = parts
        self.request_type = f"{namespace_part}/srv/{service_name_part}_Request"
        self.response_type = f"{namespace_part}/srv/{service_name_part}_Response"

        # Register message types (will auto-load from registry if definitions are empty)
        # register_message_type will automatically detect service request/response types
        # and load the service type if needed (like publisher/subscriber do for messages)
        self.request_msg_class = self.session_mgr.register_message_type(request_definition, self.request_type)
        self.response_msg_class = self.session_mgr.register_message_type(response_definition, self.response_type)

        # Get the actual store type names (may be converted for service types)
        # rosbags converts srv/TypeName to srv/msg/TypeName
        self.request_store_type = self.session_mgr._registered_types.get(self.request_type, self.request_type)
        self.response_store_type = self.session_mgr._registered_types.get(self.response_type, self.response_type)

        # Get DDS type name (remove _Response suffix for service type)
        service_dds_type = ros2_to_dds_type(srv_type)
        if service_dds_type.endswith("_Request_"):
            service_dds_type = service_dds_type[:-9]
        elif service_dds_type.endswith("_Response_"):
            service_dds_type = service_dds_type[:-10]

        self.dds_type_name = service_dds_type

        # Get type hash if not provided
        if type_hash is None:
            # Get message definitions for hash computation
            hash_request_def = request_definition
            hash_response_def = response_definition

            if not hash_request_def or not hash_response_def:
                try:
                    registry = get_registry()

                    # get_srv_file_path returns the same .srv file for both request and response
                    # We need to read it and split by '---' to get request and response parts
                    if not hash_request_def or not hash_response_def:
                        srv_file = registry.get_srv_file_path(srv_type, is_request=True)
                        if srv_file and srv_file.exists():
                            with open(srv_file, 'r') as f:
                                srv_content = f.read()

                            # Split by '---' separator
                            # ROS2 .srv files MUST have a '---' separator between request and response
                            parts = srv_content.split('---', 1)
                            if len(parts) < 2:
                                raise ValueError(
                                    f"Invalid service definition file for {srv_type}: "
                                    "missing '---' separator between request and response. "
                                    f"File content: {repr(srv_content[:100])}"
                                )

                            if not hash_request_def:
                                hash_request_def = parts[0].strip()
                            if not hash_response_def:
                                hash_response_def = parts[1].strip()

                            # Validate that we got both parts
                            if not hash_request_def:
                                raise ValueError(
                                    f"Service definition file for {srv_type} has empty request definition"
                                )
                            if not hash_response_def:
                                raise ValueError(
                                    f"Service definition file for {srv_type} has empty response definition"
                                )
                except Exception as e:
                    # Re-raise with more context - don't silently swallow errors
                    raise RuntimeError(
                        f"Failed to load service definitions from registry for {srv_type}: {e}"
                    ) from e

            if not hash_request_def or not hash_response_def:
                raise ValueError(
                    f"Cannot compute type hash for {srv_type}: service definitions not provided. "
                    "Please provide request_definition and response_definition or ensure the service type is loaded in the registry."
                )

            # Get dependencies recursively
            dependencies = None
            try:
                registry = get_registry()
                # Load dependencies for both request and response using shared utility function
                req_deps = load_dependencies_recursive(self.request_type, hash_request_def, registry)
                resp_deps = load_dependencies_recursive(self.response_type, hash_response_def, registry)
                dependencies = {**req_deps, **resp_deps}
            except Exception as e:
                logger.debug(f"Could not load dependencies for {srv_type}: {e}")

            # For services, compute hash from the service type itself (not just request)
            # Services are represented as a type with request_message, response_message, and event_message fields
            type_hash = compute_service_type_hash(
                self.srv_type,
                request_definition=hash_request_def,
                response_definition=hash_response_def,
                dependencies=dependencies
            )

        self.type_hash = type_hash

        # Generate unique GID for this server
        self.server_gid = self.session_mgr.generate_gid()

        # Get node and entity IDs
        self.node_id = self.session_mgr.get_next_node_id()
        self.entity_id = self.session_mgr.get_next_entity_id()

        # Build keyexpr for service (used for queryable)
        self.keyexpr = topic_keyexpr(self.domain_id, service_name, self.dds_type_name, type_hash)
        logger.info(f"Service keyexpr: {self.keyexpr}")
        logger.info(f"Service type hash: {type_hash}")

        # Declare liveliness tokens
        self._declare_liveliness_tokens()

        # Create queryable for receiving requests
        queryable_ke = zenoh.KeyExpr(self.keyexpr)
        logger.info(f"Declaring queryable on keyexpr: {self.keyexpr}")

        # Zenoh Python API: declare_queryable(key_expr, handler=None, *, complete=None, allowed_origin=None)
        self.queryable = self.session_mgr.session.declare_queryable(
            queryable_ke,
            self._query_handler,
            complete=True
        )
        logger.info(f"Queryable declared successfully on: {self.keyexpr}")

        # Queue-mode state (ros-z style)
        self._lock = threading.Lock()
        self._cv = threading.Condition(self._lock)
        self._queue: deque[Tuple[ServiceRequestKey, object]] = deque()
        self._pending_queries: Dict[ServiceRequestKey, zenoh.Query] = {}

        self._closed = False

    @staticmethod
    def _normalize_qos(
        qos: Optional[object],
        *,
        default: QosProfile,
        fallback: str,
    ) -> tuple[QosProfile, str]:
        if qos is None:
            return default, fallback
        if isinstance(qos, QosProfile):
            return qos, qos.encode()
        if isinstance(qos, str):
            return QosProfile.decode(qos), qos
        return default, fallback

    def _declare_liveliness_tokens(self):
        """
        Declare the liveliness tokens used for ROS2 discovery.

        Builds a node entity and a service endpoint entity from this server's
        attributes, then declares one liveliness token for each and stores them
        as `self.node_token` and `self.service_token`.
        """
        node_entity = NodeEntity(
            domain_id=self.domain_id,
            session_id=self.session_mgr.session_id,
            node_id=self.node_id,
            node_name=self.node_name,
            namespace=self.namespace,
        )
        service_endpoint = EndpointEntity(
            node=node_entity,
            entity_id=self.entity_id,
            kind=EntityKind.SERVICE,
            name=self.service_name,
            dds_type_name=self.dds_type_name,
            type_hash=self.type_hash,
            qos=self.qos,
            gid=self.server_gid,
        )

        # Token for the node itself, followed by the token for the service endpoint.
        node_ke = node_liveliness_keyexpr(node_entity)
        self.node_token = self.session_mgr.liveliness.declare_token(node_ke)

        endpoint_ke = endpoint_liveliness_keyexpr(service_endpoint)
        self.service_token = self.session_mgr.liveliness.declare_token(endpoint_ke)

    def _create_response_attachment(self, request_seq_num: int, request_gid: bytes) -> bytes:
        """
        Create rmw_zenoh attachment for service response (seq + new_ts + same_gid).

        Args:
            request_seq_num: Sequence id copied from the request being answered.
            request_gid: GID bytes copied from the request being answered.

        Returns:
            The serialized attachment bytes, stamped with the current time in nanoseconds.
        """
        # time.time_ns() returns an exact integer. Scaling the float returned by
        # time.time() by 1e9 cannot represent current epoch values to the nanosecond.
        timestamp_ns = time.time_ns()
        return Attachment(sequence_id=request_seq_num, timestamp_ns=timestamp_ns, gid=request_gid).to_bytes()

    def take_request(self, timeout: Optional[float] = None) -> Tuple[ServiceRequestKey, object]:
        """
        Queue-mode API (ros-z style): block until a request is available, then return (key, request_msg).
        """
        if self.mode != "queue":
            raise RuntimeError("take_request() is only available when mode='queue'")
        with self._cv:
            if timeout is None:
                while not self._queue:
                    self._cv.wait()
            else:
                end = time.time() + timeout
                while not self._queue:
                    remaining = end - time.time()
                    if remaining <= 0:
                        raise TimeoutError("Timed out waiting for service request")
                    self._cv.wait(timeout=remaining)
            key, msg = self._queue.popleft()
            return key, msg

    def send_response(self, key: ServiceRequestKey, response_msg: object) -> None:
        """
        Queue-mode API (ros-z style): reply to a previously taken request using its correlation key.
        """
        if self.mode != "queue":
            raise RuntimeError("send_response() is only available when mode='queue'")
        with self._lock:
            query = self._pending_queries.pop(key, None)
        if query is None:
            raise KeyError(f"No pending query found for key={key}")

        response_cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(response_msg, self.response_store_type))
        response_attachment = self._create_response_attachment(key.sequence_id, key.gid)
        query.reply(
            zenoh.KeyExpr(self.keyexpr),
            zenoh.ZBytes(response_cdr_bytes),
            encoding=Encoding("application/cdr"),
            attachment=zenoh.ZBytes(response_attachment),
        )

    def _query_handler(self, query: zenoh.Query):
        """
        Handle one incoming service request query.

        The request is validated (payload and attachment present and parseable),
        deserialized, and then either queued for `take_request()` (queue mode) or
        passed to the user callback with the result sent back immediately
        (callback mode). Every failure is logged and answered with `reply_err`.
        """
        # Logged at debug level so a busy service does not spam production logs.
        if hasattr(query, 'key_expr'):
            received_ke = str(query.key_expr)
        else:
            received_ke = 'unknown'
        logger.debug(f"Service request received. Query keyexpr: {received_ke}, Expected: {self.keyexpr}")
        try:
            # Verified in-container: query.payload is a ZBytes and supports to_bytes().
            raw_payload = getattr(query, "payload", None)
            if raw_payload is None or not hasattr(raw_payload, "to_bytes"):
                problem = (
                    "Service request has unsupported payload shape. Expected query.payload with to_bytes(). "
                    f"payload_type={type(raw_payload)}"
                )
                logger.error(problem)
                query.reply_err(zenoh.ZBytes(problem.encode()))
                return

            request_bytes = raw_payload.to_bytes()
            if not request_bytes:
                problem = "Service request payload is empty"
                logger.error(problem)
                query.reply_err(zenoh.ZBytes(problem.encode()))
                return

            # The attachment is mandatory (ros-z / rmw_zenoh pattern): the response
            # attachment reuses the request's sequence number and GID with a new timestamp.
            raw_attachment = getattr(query, "attachment", None)
            if raw_attachment is None or not hasattr(raw_attachment, "to_bytes"):
                problem = "Service request attachment is None - attachment is required for service requests"
                logger.error(problem)
                query.reply_err(zenoh.ZBytes(problem.encode()))
                return

            # Strict parse: a malformed attachment rejects the request.
            try:
                parsed = Attachment.from_bytes(raw_attachment.to_bytes())
            except Exception as e:
                problem = f"Failed to parse service request attachment: {e}"
                logger.error(problem, exc_info=True)
                query.reply_err(zenoh.ZBytes(problem.encode()))
                return

            request_key = ServiceRequestKey(sequence_id=int(parsed.sequence_id), gid=bytes(parsed.gid))

            # Deserialize with the store type name, which may differ from the ROS type name.
            request = self.session_mgr.store.deserialize_cdr(request_bytes, self.request_store_type)

            if self.mode != "queue":
                # Callback mode (default): run the user callback and reply right away.
                try:
                    response = self.callback(request)  # type: ignore[misc]
                    if response is None:
                        raise RuntimeError("Service callback returned None")

                    reply_bytes = bytes(self.session_mgr.store.serialize_cdr(response, self.response_store_type))
                    reply_attachment = self._create_response_attachment(
                        request_key.sequence_id, request_key.gid
                    )

                    query.reply(
                        zenoh.KeyExpr(self.keyexpr),
                        zenoh.ZBytes(reply_bytes),
                        encoding=Encoding("application/cdr"),
                        attachment=zenoh.ZBytes(reply_attachment),
                    )
                except Exception as e:
                    logger.error(f"Error in service callback: {e}", exc_info=True)
                    query.reply_err(zenoh.ZBytes(f"Service callback error: {e!s}".encode()))
                return

            # Queue mode (ros-z style): park the query until send_response() is called.
            with self._cv:
                if request_key in self._pending_queries:
                    query.reply_err(zenoh.ZBytes(b"Duplicate service request key"))
                    return

                # Queue depth follows the QoS history settings (KeepAll => unbounded).
                profile = QosProfile.decode(self.qos)
                bounded = (
                    profile.history_kind != profile.history_kind.KEEP_ALL
                    and profile.history_depth > 0
                )
                while bounded and len(self._queue) >= profile.history_depth:
                    evicted_key, _ = self._queue.popleft()
                    self._pending_queries.pop(evicted_key, None)
                    logger.warning(
                        "Service request queue depth reached; dropping oldest request. "
                        f"service={self.service_name} depth={profile.history_depth}"
                    )

                self._pending_queries[request_key] = query
                self._queue.append((request_key, request))
                self._cv.notify()

        except Exception as e:
            logger.error(f"Error handling service request: {e}", exc_info=True)
            try:
                query.reply_err(zenoh.ZBytes(f"Service handler error: {e!s}".encode()))
            except Exception as reply_error:
                logger.debug(f"Failed to send error reply: {reply_error}")

    def close(self):
        """
        Close the service server and undeclare tokens.

        This method is idempotent - it's safe to call multiple times.
        """
        if hasattr(self, '_closed') and self._closed:
            return

        try:
            if hasattr(self, 'node_token') and self.node_token is not None:
                self.node_token.undeclare()
            if hasattr(self, 'service_token') and self.service_token is not None:
                self.service_token.undeclare()
            if hasattr(self, 'queryable') and self.queryable is not None:
                if hasattr(self.queryable, 'undeclare'):
                    self.queryable.undeclare()
                self.queryable = None
            self._closed = True
        except (AttributeError, RuntimeError) as e:
            logger.debug(f"Error during service server cleanup for service {self.service_name}: {e}")
            self._closed = True
        except Exception as e:
            logger.warning(f"Unexpected error during service server cleanup for service {self.service_name}: {e}")
            self._closed = True

take_request(timeout: Optional[float] = None) -> Tuple[ServiceRequestKey, object]

Queue-mode API (ros-z style): block until a request is available, then return (key, request_msg).

Source code in zenoh_ros2_sdk/service_server.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def take_request(self, timeout: Optional[float] = None) -> Tuple[ServiceRequestKey, object]:
    """
    Queue-mode API (ros-z style): block until a request is available, then return (key, request_msg).

    Args:
        timeout: Maximum time to wait in seconds; `None` waits indefinitely.

    Returns:
        The oldest queued `(key, request_msg)` pair.

    Raises:
        RuntimeError: If the server is not in queue mode.
        TimeoutError: If no request arrives within `timeout`.
    """
    if self.mode != "queue":
        raise RuntimeError("take_request() is only available when mode='queue'")
    with self._cv:
        # wait_for() re-checks the predicate after every wakeup and tracks the
        # deadline itself, so the wait is not affected by wall-clock adjustments.
        if not self._cv.wait_for(lambda: bool(self._queue), timeout=timeout):
            raise TimeoutError("Timed out waiting for service request")
        key, msg = self._queue.popleft()
        return key, msg

send_response(key: ServiceRequestKey, response_msg: object) -> None

Queue-mode API (ros-z style): reply to a previously taken request using its correlation key.

Source code in zenoh_ros2_sdk/service_server.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def send_response(self, key: ServiceRequestKey, response_msg: object) -> None:
    """
    Queue-mode API (ros-z style): reply to a previously taken request using its correlation key.

    Args:
        key: The key returned by `take_request()` for the request being answered.
        response_msg: Response message to serialize and send.

    Raises:
        RuntimeError: If the server is not in queue mode.
        KeyError: If no pending query is stored under `key`.
        Exception: Whatever serialization raises; the pending query is kept so
            the caller can retry with a corrected response.
    """
    if self.mode != "queue":
        raise RuntimeError("send_response() is only available when mode='queue'")
    with self._lock:
        query = self._pending_queries.pop(key, None)
    if query is None:
        raise KeyError(f"No pending query found for key={key}")

    try:
        response_cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(response_msg, self.response_store_type))
        response_attachment = self._create_response_attachment(key.sequence_id, key.gid)
    except Exception:
        # The query is the only handle for answering this request. Put it back
        # if the reply could not be built, so a bad response message does not
        # make the request unanswerable.
        with self._lock:
            self._pending_queries.setdefault(key, query)
        raise

    query.reply(
        zenoh.KeyExpr(self.keyexpr),
        zenoh.ZBytes(response_cdr_bytes),
        encoding=Encoding("application/cdr"),
        attachment=zenoh.ZBytes(response_attachment),
    )

close()

Close the service server and undeclare tokens.

This method is idempotent - it's safe to call multiple times.

Source code in zenoh_ros2_sdk/service_server.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def close(self):
    """
    Close the service server and undeclare tokens.

    The node token, the service token and the queryable are released
    independently: a failure while undeclaring one of them is logged and does
    not prevent the others from being released.

    This method is idempotent - it's safe to call multiple times.
    """
    if getattr(self, '_closed', False):
        return

    # close() may run on a partially initialized instance, so every attribute
    # is looked up defensively.
    service_label = getattr(self, 'service_name', '<unknown>')
    for attr_name in ('node_token', 'service_token', 'queryable'):
        handle = getattr(self, attr_name, None)
        if handle is None:
            continue
        if attr_name == 'queryable' and not hasattr(handle, 'undeclare'):
            continue
        try:
            handle.undeclare()
        except (AttributeError, RuntimeError) as e:
            logger.debug(f"Error during service server cleanup for service {service_label}: {e}")
        except Exception as e:
            logger.warning(f"Unexpected error during service server cleanup for service {service_label}: {e}")

    if getattr(self, 'queryable', None) is not None:
        self.queryable = None
    self._closed = True