Skip to content

service_client

ROS2ServiceClient - ROS2 Service Client using Zenoh

ROS2ServiceClient

ROS2 Service Client using Zenoh - sends requests and receives responses

Source code in zenoh_ros2_sdk/service_client.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
class ROS2ServiceClient:
    """ROS2 Service Client using Zenoh - sends requests and receives responses"""

    def __init__(
        self,
        service_name: str,
        srv_type: str,
        request_definition: Optional[str] = None,
        response_definition: Optional[str] = None,
        node_name: Optional[str] = None,
        namespace: str = "/",
        domain_id: Optional[int] = None,
        router_ip: str = "127.0.0.1",
        router_port: int = 7447,
        type_hash: Optional[str] = None,
        timeout: float = 10.0,
        qos: Optional[object] = None,
    ):
        """
        Create a ROS2 service client.

        This client sends requests using Zenoh queries and expects:
        - `Reply.ok` to be a Zenoh `Sample` with `.payload.to_bytes()`
        - `Reply.err` to be a Zenoh `ReplyError` with `.payload.to_bytes()`

        Args:
            service_name: ROS2 service name (e.g., "/add_two_ints")
            srv_type: ROS2 service type (e.g., "example_interfaces/srv/AddTwoInts")
            request_definition: Request message definition text (None to auto-load)
            response_definition: Response message definition text (None to auto-load)
            node_name: Node name (auto-generated if None)
            namespace: Node namespace
            domain_id: ROS domain ID (defaults to ROS_DOMAIN_ID or 0)
            router_ip: Zenoh router IP
            router_port: Zenoh router port
            type_hash: Service type hash (auto-detected if None)
            timeout: Timeout for service calls in seconds (default: 10.0)
            qos: QoS used for liveliness discovery tokens.
                Accepts `QosProfile`, an encoded rmw_zenoh QoS string, or `None` for default.

        Raises:
            ValueError: If `srv_type` format is invalid.
            RuntimeError: If service definitions cannot be loaded to compute the type hash.
        """
        self.service_name = service_name
        self.srv_type = srv_type
        self.domain_id = resolve_domain_id(domain_id)
        self.namespace = namespace
        self.node_name = node_name or f"zenoh_service_client_{uuid.uuid4().hex[:8]}"
        self.timeout = timeout
        _, self.qos = self._normalize_qos(qos, default=DEFAULT_QOS_PROFILE, fallback=DEFAULT_QOS_PROFILE.encode())

        # Get or create shared session
        self.session_mgr = ZenohSession.get_instance(router_ip, router_port)

        # Parse service type to get request and response types
        # Service types are like "example_interfaces/srv/AddTwoInts"
        # Request type: "example_interfaces/srv/AddTwoInts_Request"
        # Response type: "example_interfaces/srv/AddTwoInts_Response"
        parts = srv_type.split("/")
        if len(parts) != 3:
            raise ValueError(f"Invalid service type format: {srv_type}. Expected format: namespace/srv/ServiceName")

        namespace_part, srv, service_name_part = parts
        self.request_type = f"{namespace_part}/srv/{service_name_part}_Request"
        self.response_type = f"{namespace_part}/srv/{service_name_part}_Response"

        # Register message types (will auto-load from registry if definitions are empty)
        # register_message_type will automatically detect service request/response types
        # and load the service type if needed (like publisher/subscriber do for messages)
        self.request_msg_class = self.session_mgr.register_message_type(request_definition, self.request_type)
        self.response_msg_class = self.session_mgr.register_message_type(response_definition, self.response_type)

        # Get the actual store type names (may be converted for service types)
        # rosbags converts srv/TypeName to srv/msg/TypeName
        self.request_store_type = self.session_mgr._registered_types.get(self.request_type, self.request_type)
        self.response_store_type = self.session_mgr._registered_types.get(self.response_type, self.response_type)

        # Get DDS type name (remove _Request suffix for service type)
        # Service type name should be without Request_ or Response_ suffix
        service_dds_type = ros2_to_dds_type(srv_type)
        # Remove the last underscore and Request/Response suffix if present
        if service_dds_type.endswith("_Request_"):
            service_dds_type = service_dds_type[:-9]  # Remove "_Request_"
        elif service_dds_type.endswith("_Response_"):
            service_dds_type = service_dds_type[:-10]  # Remove "_Response_"

        self.dds_type_name = service_dds_type

        # Get type hash if not provided
        if type_hash is None:
            # Get message definitions for hash computation
            hash_request_def = request_definition
            hash_response_def = response_definition

            if not hash_request_def or not hash_response_def:
                try:
                    registry = get_registry()

                    # get_srv_file_path returns the same .srv file for both request and response
                    # We need to read it and split by '---' to get request and response parts
                    if not hash_request_def or not hash_response_def:
                        srv_file = registry.get_srv_file_path(srv_type, is_request=True)
                        if srv_file and srv_file.exists():
                            with open(srv_file, 'r') as f:
                                srv_content = f.read()

                            # Split by '---' separator
                            # ROS2 .srv files MUST have a '---' separator between request and response
                            parts = srv_content.split('---', 1)
                            if len(parts) < 2:
                                raise ValueError(
                                    f"Invalid service definition file for {srv_type}: "
                                    "missing '---' separator between request and response. "
                                    f"File content: {repr(srv_content[:100])}"
                                )

                            if not hash_request_def:
                                hash_request_def = parts[0].strip()
                            if not hash_response_def:
                                hash_response_def = parts[1].strip()

                            # Validate that we got both parts
                            if not hash_request_def:
                                raise ValueError(
                                    f"Service definition file for {srv_type} has empty request definition"
                                )
                            if not hash_response_def:
                                raise ValueError(
                                    f"Service definition file for {srv_type} has empty response definition"
                                )
                except Exception as e:
                    # Re-raise with more context - don't silently swallow errors
                    raise RuntimeError(
                        f"Failed to load service definitions from registry for {srv_type}: {e}"
                    ) from e

            if not hash_request_def or not hash_response_def:
                raise ValueError(
                    f"Cannot compute type hash for {srv_type}: service definitions not provided. "
                    "Please provide request_definition and response_definition or ensure the service type is loaded in the registry."
                )

            # Get dependencies recursively
            dependencies = None
            try:
                registry = get_registry()
                # Load dependencies for both request and response using shared utility function
                req_deps = load_dependencies_recursive(self.request_type, hash_request_def, registry)
                resp_deps = load_dependencies_recursive(self.response_type, hash_response_def, registry)
                dependencies = {**req_deps, **resp_deps}
            except Exception as e:
                logger.debug(f"Could not load dependencies for {srv_type}: {e}")

            # For services, compute hash from the service type itself (not just request)
            # Services are represented as a type with request_message, response_message, and event_message fields
            type_hash = compute_service_type_hash(
                self.srv_type,
                request_definition=hash_request_def,
                response_definition=hash_response_def,
                dependencies=dependencies
            )

        self.type_hash = type_hash

        # Generate unique GID for this client
        self.client_gid = self.session_mgr.generate_gid()

        # Get node and entity IDs
        self.node_id = self.session_mgr.get_next_node_id()
        self.entity_id = self.session_mgr.get_next_entity_id()

        # Build keyexpr for service (used for queries)
        # Format: domain_id/service_name/dds_type_name/type_hash
        self.keyexpr = topic_keyexpr(self.domain_id, service_name, self.dds_type_name, type_hash)

        # Declare liveliness tokens
        self._declare_liveliness_tokens()

        # Create querier for sending requests
        # Zenoh Python API: declare_querier(key_expr, *, target=None, consolidation=None, timeout=None, ...)
        querier_ke = zenoh.KeyExpr(self.keyexpr)

        self.querier = self.session_mgr.session.declare_querier(
            querier_ke,
            target=zenoh.QueryTarget.ALL_COMPLETE,
            timeout=int(self.timeout * 1000),
            consolidation=zenoh.ConsolidationMode.NONE
        )

        # Sequence tracking
        self.sequence_number = 1
        self._lock = threading.Lock()
        self._closed = False

    @staticmethod
    def _normalize_qos(
        qos: Optional[object],
        *,
        default: QosProfile,
        fallback: str,
    ) -> tuple[QosProfile, str]:
        if qos is None:
            return default, fallback
        if isinstance(qos, QosProfile):
            return qos, qos.encode()
        if isinstance(qos, str):
            return QosProfile.decode(qos), qos
        return default, fallback

    def _declare_liveliness_tokens(self):
        """Declare liveliness tokens for ROS2 discovery"""
        node = NodeEntity(
            domain_id=self.domain_id,
            session_id=self.session_mgr.session_id,
            node_id=self.node_id,
            node_name=self.node_name,
            namespace=self.namespace,
        )
        ep = EndpointEntity(
            node=node,
            entity_id=self.entity_id,
            kind=EntityKind.CLIENT,
            name=self.service_name,
            dds_type_name=self.dds_type_name,
            type_hash=self.type_hash,
            qos=self.qos,
            gid=self.client_gid,
        )

        self.node_token = self.session_mgr.liveliness.declare_token(node_liveliness_keyexpr(node))
        self.client_token = self.session_mgr.liveliness.declare_token(endpoint_liveliness_keyexpr(ep))

    def _create_attachment(self, seq_num: int, timestamp_ns: int) -> bytes:
        """Create rmw_zenoh attachment for service request"""
        return Attachment(sequence_id=seq_num, timestamp_ns=timestamp_ns, gid=self.client_gid).to_bytes()

    def call(self, **kwargs: Any) -> Optional[object]:
        """
        Call the service synchronously

        Args:
            **kwargs: Request field values

        Returns:
            Response message object or None if timeout/error
        """
        # Create request message instance
        request_msg = self.request_msg_class(**kwargs)

        # Serialize to CDR (use store type name which may be converted)
        cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

        # Create attachment
        timestamp_ns = int(time.time() * 1e9)
        sequence_id = self.sequence_number
        self.sequence_number += 1
        attachment = self._create_attachment(sequence_id, timestamp_ns)

        # Response event for synchronization
        response_event = threading.Event()
        response_data = {"response": None, "error": None}

        def reply_callback(reply: zenoh.Reply):
            """Callback for receiving service response"""
            try:
                # Verified in-container:
                # - reply.ok is a builtins.Sample, with ok.payload: ZBytes
                # - reply.err is a builtins.ReplyError, with err.payload: ZBytes
                if reply.err is not None:
                    err = reply.err
                    payload = getattr(err, "payload", None)
                    if payload is None or not hasattr(payload, "to_bytes"):
                        raise TypeError(
                            "Unexpected ReplyError shape. Expected err.payload with to_bytes(). "
                            f"err_type={type(err)} payload_type={type(payload)}"
                        )
                    error_msg = payload.to_bytes().decode("utf-8", errors="ignore")
                    logger.error(f"Service call failed: {error_msg}")
                    response_data["error"] = error_msg
                    response_event.set()
                    return

                if reply.ok is not None:
                    ok = reply.ok
                    payload = getattr(ok, "payload", None)
                    if payload is None or not hasattr(payload, "to_bytes"):
                        raise TypeError(
                            "Unexpected Reply ok shape. Expected ok.payload with to_bytes(). "
                            f"ok_type={type(ok)} payload_type={type(payload)}"
                        )
                    cdr_bytes = payload.to_bytes()

                    # Deserialize response (use store type name which may be converted)
                    response_msg = self.session_mgr.store.deserialize_cdr(cdr_bytes, self.response_store_type)
                    response_data["response"] = response_msg
                    response_event.set()
                else:
                    logger.error("Reply has neither ok nor err")
                    response_data["error"] = "Invalid reply format"
                    response_event.set()
            except Exception as e:
                logger.error(f"Error processing service response: {e}", exc_info=True)
                response_data["error"] = str(e)
                response_event.set()

        # Send request using querier
        # Zenoh Python API: get(handler=None, *, parameters=None, payload=None, encoding=None, attachment=None, ...)
        self.querier.get(
            reply_callback,
            parameters="",
            payload=zenoh.ZBytes(cdr_bytes),
            encoding=Encoding("application/cdr"),
            attachment=zenoh.ZBytes(attachment)
        )

        # querier.get() doesn't return a result code in Python API - it's fire-and-forget
        # Errors will be reported in the reply_callback

        # Wait for response with timeout
        if response_event.wait(timeout=self.timeout):
            if response_data["error"]:
                logger.error(f"Service call error: {response_data['error']}")
                return None
            return response_data["response"]
        else:
            logger.warning(f"Service call timed out after {self.timeout} seconds")
            return None

    def call_async(self, callback: Callable, **kwargs: Any) -> None:
        """
        Call the service asynchronously

        Args:
            callback: Callback function(response_msg) called when response is received
            **kwargs: Request field values
        """
        # Create request message instance
        request_msg = self.request_msg_class(**kwargs)

        # Serialize to CDR (use store type name which may be converted)
        cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

        # Create attachment
        timestamp_ns = int(time.time() * 1e9)
        sequence_id = self.sequence_number
        self.sequence_number += 1
        attachment = self._create_attachment(sequence_id, timestamp_ns)

        def reply_callback(reply: zenoh.Reply):
            """Callback for receiving service response"""
            try:
                if reply.err is not None:
                    err = reply.err
                    payload = getattr(err, "payload", None)
                    if payload is None or not hasattr(payload, "to_bytes"):
                        raise TypeError(
                            "Unexpected ReplyError shape. Expected err.payload with to_bytes(). "
                            f"err_type={type(err)} payload_type={type(payload)}"
                        )
                    error_msg = payload.to_bytes().decode("utf-8", errors="ignore")
                    logger.error(f"Async service call failed: {error_msg}")
                    callback(None)
                    return

                if reply.ok is not None:
                    ok = reply.ok
                    payload = getattr(ok, "payload", None)
                    if payload is None or not hasattr(payload, "to_bytes"):
                        raise TypeError(
                            "Unexpected Reply ok shape. Expected ok.payload with to_bytes(). "
                            f"ok_type={type(ok)} payload_type={type(payload)}"
                        )
                    cdr_bytes = payload.to_bytes()

                    # Deserialize response (use store type name which may be converted)
                    response_msg = self.session_mgr.store.deserialize_cdr(cdr_bytes, self.response_store_type)
                    callback(response_msg)
                else:
                    logger.error("Reply has neither ok nor err")
                    callback(None)
            except Exception as e:
                logger.error(f"Error processing async service response: {e}", exc_info=True)
                callback(None)

        # Send request using querier
        # Zenoh Python API: get(handler=None, *, parameters=None, payload=None, encoding=None, attachment=None, ...)
        # querier.get() doesn't return a result code in Python API - it's fire-and-forget
        # Errors will be reported in the reply_callback
        self.querier.get(
            reply_callback,
            parameters="",
            payload=zenoh.ZBytes(cdr_bytes),
            encoding=Encoding("application/cdr"),
            attachment=zenoh.ZBytes(attachment)
        )

    def close(self):
        """
        Close the service client and undeclare tokens.

        This method is idempotent - it's safe to call multiple times.
        """
        if hasattr(self, '_closed') and self._closed:
            return

        try:
            if hasattr(self, 'node_token') and self.node_token is not None:
                self.node_token.undeclare()
            if hasattr(self, 'client_token') and self.client_token is not None:
                self.client_token.undeclare()
            if hasattr(self, 'querier') and self.querier is not None:
                if hasattr(self.querier, 'undeclare'):
                    self.querier.undeclare()
                self.querier = None
            self._closed = True
        except (AttributeError, RuntimeError) as e:
            logger.debug(f"Error during service client cleanup for service {self.service_name}: {e}")
            self._closed = True
        except Exception as e:
            logger.warning(f"Unexpected error during service client cleanup for service {self.service_name}: {e}")
            self._closed = True

call(**kwargs: Any) -> Optional[object]

Call the service synchronously

Parameters:

Name Type Description Default
**kwargs Any

Request field values

{}

Returns:

Type Description
Optional[object]

Response message object or None if timeout/error

Source code in zenoh_ros2_sdk/service_client.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def call(self, **kwargs: Any) -> Optional[object]:
    """
    Call the service synchronously

    Args:
        **kwargs: Request field values

    Returns:
        Response message object or None if timeout/error
    """
    # Create request message instance
    request_msg = self.request_msg_class(**kwargs)

    # Serialize to CDR (use store type name which may be converted)
    cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

    # Create attachment
    timestamp_ns = int(time.time() * 1e9)
    sequence_id = self.sequence_number
    self.sequence_number += 1
    attachment = self._create_attachment(sequence_id, timestamp_ns)

    # Response event for synchronization
    response_event = threading.Event()
    response_data = {"response": None, "error": None}

    def reply_callback(reply: zenoh.Reply):
        """Callback for receiving service response"""
        try:
            # Verified in-container:
            # - reply.ok is a builtins.Sample, with ok.payload: ZBytes
            # - reply.err is a builtins.ReplyError, with err.payload: ZBytes
            if reply.err is not None:
                err = reply.err
                payload = getattr(err, "payload", None)
                if payload is None or not hasattr(payload, "to_bytes"):
                    raise TypeError(
                        "Unexpected ReplyError shape. Expected err.payload with to_bytes(). "
                        f"err_type={type(err)} payload_type={type(payload)}"
                    )
                error_msg = payload.to_bytes().decode("utf-8", errors="ignore")
                logger.error(f"Service call failed: {error_msg}")
                response_data["error"] = error_msg
                response_event.set()
                return

            if reply.ok is not None:
                ok = reply.ok
                payload = getattr(ok, "payload", None)
                if payload is None or not hasattr(payload, "to_bytes"):
                    raise TypeError(
                        "Unexpected Reply ok shape. Expected ok.payload with to_bytes(). "
                        f"ok_type={type(ok)} payload_type={type(payload)}"
                    )
                cdr_bytes = payload.to_bytes()

                # Deserialize response (use store type name which may be converted)
                response_msg = self.session_mgr.store.deserialize_cdr(cdr_bytes, self.response_store_type)
                response_data["response"] = response_msg
                response_event.set()
            else:
                logger.error("Reply has neither ok nor err")
                response_data["error"] = "Invalid reply format"
                response_event.set()
        except Exception as e:
            logger.error(f"Error processing service response: {e}", exc_info=True)
            response_data["error"] = str(e)
            response_event.set()

    # Send request using querier
    # Zenoh Python API: get(handler=None, *, parameters=None, payload=None, encoding=None, attachment=None, ...)
    self.querier.get(
        reply_callback,
        parameters="",
        payload=zenoh.ZBytes(cdr_bytes),
        encoding=Encoding("application/cdr"),
        attachment=zenoh.ZBytes(attachment)
    )

    # querier.get() doesn't return a result code in Python API - it's fire-and-forget
    # Errors will be reported in the reply_callback

    # Wait for response with timeout
    if response_event.wait(timeout=self.timeout):
        if response_data["error"]:
            logger.error(f"Service call error: {response_data['error']}")
            return None
        return response_data["response"]
    else:
        logger.warning(f"Service call timed out after {self.timeout} seconds")
        return None

call_async(callback: Callable, **kwargs: Any) -> None

Call the service asynchronously

Parameters:

Name Type Description Default
callback Callable

Callback function(response_msg) called when response is received

required
**kwargs Any

Request field values

{}
Source code in zenoh_ros2_sdk/service_client.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def call_async(self, callback: Callable, **kwargs: Any) -> None:
    """
    Call the service asynchronously

    Args:
        callback: Callback function(response_msg) called when response is received
        **kwargs: Request field values
    """
    # Create request message instance
    request_msg = self.request_msg_class(**kwargs)

    # Serialize to CDR (use store type name which may be converted)
    cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

    # Create attachment
    timestamp_ns = int(time.time() * 1e9)
    sequence_id = self.sequence_number
    self.sequence_number += 1
    attachment = self._create_attachment(sequence_id, timestamp_ns)

    def reply_callback(reply: zenoh.Reply):
        """Callback for receiving service response"""
        try:
            if reply.err is not None:
                err = reply.err
                payload = getattr(err, "payload", None)
                if payload is None or not hasattr(payload, "to_bytes"):
                    raise TypeError(
                        "Unexpected ReplyError shape. Expected err.payload with to_bytes(). "
                        f"err_type={type(err)} payload_type={type(payload)}"
                    )
                error_msg = payload.to_bytes().decode("utf-8", errors="ignore")
                logger.error(f"Async service call failed: {error_msg}")
                callback(None)
                return

            if reply.ok is not None:
                ok = reply.ok
                payload = getattr(ok, "payload", None)
                if payload is None or not hasattr(payload, "to_bytes"):
                    raise TypeError(
                        "Unexpected Reply ok shape. Expected ok.payload with to_bytes(). "
                        f"ok_type={type(ok)} payload_type={type(payload)}"
                    )
                cdr_bytes = payload.to_bytes()

                # Deserialize response (use store type name which may be converted)
                response_msg = self.session_mgr.store.deserialize_cdr(cdr_bytes, self.response_store_type)
                callback(response_msg)
            else:
                logger.error("Reply has neither ok nor err")
                callback(None)
        except Exception as e:
            logger.error(f"Error processing async service response: {e}", exc_info=True)
            callback(None)

    # Send request using querier
    # Zenoh Python API: get(handler=None, *, parameters=None, payload=None, encoding=None, attachment=None, ...)
    # querier.get() doesn't return a result code in Python API - it's fire-and-forget
    # Errors will be reported in the reply_callback
    self.querier.get(
        reply_callback,
        parameters="",
        payload=zenoh.ZBytes(cdr_bytes),
        encoding=Encoding("application/cdr"),
        attachment=zenoh.ZBytes(attachment)
    )

close()

Close the service client and undeclare tokens.

This method is idempotent - it's safe to call multiple times.

Source code in zenoh_ros2_sdk/service_client.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
def close(self):
    """
    Close the service client and undeclare tokens.

    This method is idempotent - it's safe to call multiple times.
    """
    if hasattr(self, '_closed') and self._closed:
        return

    try:
        if hasattr(self, 'node_token') and self.node_token is not None:
            self.node_token.undeclare()
        if hasattr(self, 'client_token') and self.client_token is not None:
            self.client_token.undeclare()
        if hasattr(self, 'querier') and self.querier is not None:
            if hasattr(self.querier, 'undeclare'):
                self.querier.undeclare()
            self.querier = None
        self._closed = True
    except (AttributeError, RuntimeError) as e:
        logger.debug(f"Error during service client cleanup for service {self.service_name}: {e}")
        self._closed = True
    except Exception as e:
        logger.warning(f"Unexpected error during service client cleanup for service {self.service_name}: {e}")
        self._closed = True