24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
class ROS2ServiceClient:
    """ROS2 Service Client using Zenoh - sends requests and receives responses.

    Requests are sent as Zenoh queries through a declared querier; replies are
    decoded from CDR into the registered response message type. The client also
    declares node and endpoint liveliness tokens on construction and releases
    them in `close()`.
    """

    def __init__(
        self,
        service_name: str,
        srv_type: str,
        request_definition: Optional[str] = None,
        response_definition: Optional[str] = None,
        node_name: Optional[str] = None,
        namespace: str = "/",
        domain_id: Optional[int] = None,
        router_ip: str = "127.0.0.1",
        router_port: int = 7447,
        type_hash: Optional[str] = None,
        timeout: float = 10.0,
        qos: Optional[object] = None,
    ):
        """
        Create a ROS2 service client.

        This client sends requests using Zenoh queries and expects:
        - `Reply.ok` to be a Zenoh `Sample` with `.payload.to_bytes()`
        - `Reply.err` to be a Zenoh `ReplyError` with `.payload.to_bytes()`

        Args:
            service_name: ROS2 service name (e.g., "/add_two_ints")
            srv_type: ROS2 service type (e.g., "example_interfaces/srv/AddTwoInts")
            request_definition: Request message definition text (None to auto-load)
            response_definition: Response message definition text (None to auto-load)
            node_name: Node name (auto-generated if None)
            namespace: Node namespace
            domain_id: ROS domain ID (resolved through `resolve_domain_id`)
            router_ip: Zenoh router IP
            router_port: Zenoh router port
            type_hash: Service type hash (computed from the definitions if None)
            timeout: Timeout for service calls in seconds (default: 10.0)
            qos: QoS used for liveliness discovery tokens.
                Accepts `QosProfile`, an encoded QoS string, or `None` for default.

        Raises:
            ValueError: If `srv_type` does not have exactly three "/"-separated
                parts, or if `type_hash` is None and no definitions are available.
            RuntimeError: If loading the service definitions from the registry fails.
        """
        self.service_name = service_name
        self.srv_type = srv_type
        self.domain_id = resolve_domain_id(domain_id)
        self.namespace = namespace
        self.node_name = node_name or f"zenoh_service_client_{uuid.uuid4().hex[:8]}"
        self.timeout = timeout
        _, self.qos = self._normalize_qos(
            qos, default=DEFAULT_QOS_PROFILE, fallback=DEFAULT_QOS_PROFILE.encode()
        )

        # Sequence tracking and lifecycle state are set up first so that close()
        # is safe to call even if construction fails part-way through.
        self.sequence_number = 1
        self._lock = threading.Lock()
        self._closed = False
        self.node_token = None
        self.client_token = None
        self.querier = None

        # Get or create shared session
        self.session_mgr = ZenohSession.get_instance(router_ip, router_port)

        # Service types look like "example_interfaces/srv/AddTwoInts"; the
        # request/response types append "_Request" / "_Response".
        type_parts = srv_type.split("/")
        if len(type_parts) != 3:
            raise ValueError(f"Invalid service type format: {srv_type}. Expected format: namespace/srv/ServiceName")
        namespace_part, _, service_name_part = type_parts
        self.request_type = f"{namespace_part}/srv/{service_name_part}_Request"
        self.response_type = f"{namespace_part}/srv/{service_name_part}_Response"

        # Register message types (definitions may be None to auto-load).
        self.request_msg_class = self.session_mgr.register_message_type(request_definition, self.request_type)
        self.response_msg_class = self.session_mgr.register_message_type(response_definition, self.response_type)

        # The store may know the types under a converted name; fall back to ours.
        self.request_store_type = self.session_mgr._registered_types.get(self.request_type, self.request_type)
        self.response_store_type = self.session_mgr._registered_types.get(self.response_type, self.response_type)

        # DDS type name of the service itself, without any Request/Response suffix.
        service_dds_type = ros2_to_dds_type(srv_type)
        if service_dds_type.endswith("_Request_"):
            service_dds_type = service_dds_type[:-9]  # Remove "_Request_"
        elif service_dds_type.endswith("_Response_"):
            service_dds_type = service_dds_type[:-10]  # Remove "_Response_"
        self.dds_type_name = service_dds_type

        # Get type hash if not provided
        if type_hash is None:
            type_hash = self._resolve_type_hash(request_definition, response_definition)
        self.type_hash = type_hash

        # Generate unique GID for this client
        self.client_gid = self.session_mgr.generate_gid()

        # Get node and entity IDs
        self.node_id = self.session_mgr.get_next_node_id()
        self.entity_id = self.session_mgr.get_next_entity_id()

        # Build keyexpr for service (used for queries)
        self.keyexpr = topic_keyexpr(self.domain_id, service_name, self.dds_type_name, type_hash)

        try:
            # Declare liveliness tokens
            self._declare_liveliness_tokens()

            # Create querier for sending requests (timeout is passed in milliseconds).
            querier_ke = zenoh.KeyExpr(self.keyexpr)
            self.querier = self.session_mgr.session.declare_querier(
                querier_ke,
                target=zenoh.QueryTarget.ALL_COMPLETE,
                timeout=int(self.timeout * 1000),
                consolidation=zenoh.ConsolidationMode.NONE,
            )
        except Exception:
            # Do not leave liveliness tokens declared for a client that never
            # finished constructing.
            self.close()
            raise

    def _resolve_type_hash(
        self,
        request_definition: Optional[str],
        response_definition: Optional[str],
    ) -> str:
        """Compute the service type hash, loading missing definitions from the registry.

        Args:
            request_definition: Request definition text, or None/empty to load it.
            response_definition: Response definition text, or None/empty to load it.

        Returns:
            The value returned by `compute_service_type_hash` for this service.

        Raises:
            RuntimeError: If anything fails while loading the .srv file.
            ValueError: If a definition is still missing after the registry lookup.
        """
        hash_request_def = request_definition
        hash_response_def = response_definition

        if not hash_request_def or not hash_response_def:
            try:
                registry = get_registry()
                # The same .srv file holds both halves, separated by '---'.
                srv_file = registry.get_srv_file_path(self.srv_type, is_request=True)
                if srv_file and srv_file.exists():
                    with open(srv_file, "r", encoding="utf-8") as f:
                        srv_content = f.read()
                    sections = srv_content.split('---', 1)
                    if len(sections) < 2:
                        raise ValueError(
                            f"Invalid service definition file for {self.srv_type}: "
                            "missing '---' separator between request and response. "
                            f"File content: {repr(srv_content[:100])}"
                        )
                    if not hash_request_def:
                        hash_request_def = sections[0].strip()
                    if not hash_response_def:
                        hash_response_def = sections[1].strip()
                    # NOTE(review): a .srv file with a legitimately empty request or
                    # response section is rejected here - confirm whether
                    # compute_service_type_hash can accept empty definitions.
                    if not hash_request_def:
                        raise ValueError(
                            f"Service definition file for {self.srv_type} has empty request definition"
                        )
                    if not hash_response_def:
                        raise ValueError(
                            f"Service definition file for {self.srv_type} has empty response definition"
                        )
            except Exception as e:
                # Re-raise with more context - don't silently swallow errors
                raise RuntimeError(
                    f"Failed to load service definitions from registry for {self.srv_type}: {e}"
                ) from e

        if not hash_request_def or not hash_response_def:
            raise ValueError(
                f"Cannot compute type hash for {self.srv_type}: service definitions not provided. "
                "Please provide request_definition and response_definition or ensure the service type is loaded in the registry."
            )

        # Dependencies are best-effort: on failure the hash is computed without them.
        dependencies = None
        try:
            registry = get_registry()
            req_deps = load_dependencies_recursive(self.request_type, hash_request_def, registry)
            resp_deps = load_dependencies_recursive(self.response_type, hash_response_def, registry)
            dependencies = {**req_deps, **resp_deps}
        except Exception as e:
            logger.debug(f"Could not load dependencies for {self.srv_type}: {e}")

        # The hash is computed from the service type itself, not just the request.
        return compute_service_type_hash(
            self.srv_type,
            request_definition=hash_request_def,
            response_definition=hash_response_def,
            dependencies=dependencies,
        )

    @staticmethod
    def _normalize_qos(
        qos: Optional[object],
        *,
        default: "QosProfile",
        fallback: str,
    ) -> "tuple[QosProfile, str]":
        """Return `(profile, encoded)` for a QoS given as profile, encoded string or None.

        `None` and any unsupported type yield `(default, fallback)`.
        """
        if qos is None:
            return default, fallback
        if isinstance(qos, QosProfile):
            return qos, qos.encode()
        if isinstance(qos, str):
            return QosProfile.decode(qos), qos
        return default, fallback

    def _declare_liveliness_tokens(self):
        """Declare the node and client-endpoint liveliness tokens for ROS2 discovery."""
        node = NodeEntity(
            domain_id=self.domain_id,
            session_id=self.session_mgr.session_id,
            node_id=self.node_id,
            node_name=self.node_name,
            namespace=self.namespace,
        )
        ep = EndpointEntity(
            node=node,
            entity_id=self.entity_id,
            kind=EntityKind.CLIENT,
            name=self.service_name,
            dds_type_name=self.dds_type_name,
            type_hash=self.type_hash,
            qos=self.qos,
            gid=self.client_gid,
        )
        self.node_token = self.session_mgr.liveliness.declare_token(node_liveliness_keyexpr(node))
        self.client_token = self.session_mgr.liveliness.declare_token(endpoint_liveliness_keyexpr(ep))

    def _create_attachment(self, seq_num: int, timestamp_ns: int) -> bytes:
        """Create the attachment bytes (sequence id, timestamp, client GID) for a request."""
        return Attachment(sequence_id=seq_num, timestamp_ns=timestamp_ns, gid=self.client_gid).to_bytes()

    def _next_sequence_id(self) -> int:
        """Return the next request sequence number; safe to call from several threads."""
        with self._lock:
            sequence_id = self.sequence_number
            self.sequence_number += 1
        return sequence_id

    @staticmethod
    def _payload_bytes(part: object, *, label: str, attr: str) -> bytes:
        """Return `part.payload.to_bytes()`, raising TypeError if the shape is unexpected.

        Args:
            part: The `reply.ok` or `reply.err` object.
            label: Human-readable name used in the error message.
            attr: Attribute name on the reply ("ok" or "err"), used in the error message.
        """
        payload = getattr(part, "payload", None)
        if payload is None or not hasattr(payload, "to_bytes"):
            raise TypeError(
                f"Unexpected {label} shape. Expected {attr}.payload with to_bytes(). "
                f"{attr}_type={type(part)} payload_type={type(payload)}"
            )
        return payload.to_bytes()

    def call(self, **kwargs: Any) -> Optional[object]:
        """
        Call the service synchronously.

        Blocks until the first reply is processed or `self.timeout` seconds elapse.

        Args:
            **kwargs: Request field values

        Returns:
            Response message object or None if timeout/error
        """
        # Create request message instance and serialize it to CDR
        # (use store type name which may be converted)
        request_msg = self.request_msg_class(**kwargs)
        cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

        # time_ns() avoids the precision loss of float seconds * 1e9.
        timestamp_ns = time.time_ns()
        attachment = self._create_attachment(self._next_sequence_id(), timestamp_ns)

        # Response event for synchronization
        response_event = threading.Event()
        response_data = {"response": None, "error": None}

        def reply_callback(reply: zenoh.Reply):
            """Decode one reply into response_data and wake the waiting caller."""
            try:
                if reply.err is not None:
                    error_msg = self._payload_bytes(
                        reply.err, label="ReplyError", attr="err"
                    ).decode("utf-8", errors="ignore")
                    logger.error(f"Service call failed: {error_msg}")
                    response_data["error"] = error_msg
                elif reply.ok is not None:
                    reply_bytes = self._payload_bytes(reply.ok, label="Reply ok", attr="ok")
                    # Deserialize response (use store type name which may be converted)
                    response_data["response"] = self.session_mgr.store.deserialize_cdr(
                        reply_bytes, self.response_store_type
                    )
                else:
                    logger.error("Reply has neither ok nor err")
                    response_data["error"] = "Invalid reply format"
            except Exception as e:
                logger.error(f"Error processing service response: {e}", exc_info=True)
                response_data["error"] = str(e)
            finally:
                response_event.set()

        # Send request using querier; failures are reported through reply_callback.
        self.querier.get(
            reply_callback,
            parameters="",
            payload=zenoh.ZBytes(cdr_bytes),
            encoding=Encoding("application/cdr"),
            attachment=zenoh.ZBytes(attachment),
        )

        # Wait for response with timeout
        if not response_event.wait(timeout=self.timeout):
            logger.warning(f"Service call timed out after {self.timeout} seconds")
            return None
        # `is not None` so that an error reply with an empty payload is still an error.
        if response_data["error"] is not None:
            logger.error(f"Service call error: {response_data['error']}")
            return None
        return response_data["response"]

    def call_async(self, callback: Callable, **kwargs: Any) -> None:
        """
        Call the service asynchronously.

        Args:
            callback: Callback function(response_msg) called once per reply with the
                decoded response, or with None if the reply was an error or could
                not be decoded.
            **kwargs: Request field values
        """
        # Create request message instance and serialize it to CDR
        # (use store type name which may be converted)
        request_msg = self.request_msg_class(**kwargs)
        cdr_bytes = bytes(self.session_mgr.store.serialize_cdr(request_msg, self.request_store_type))

        timestamp_ns = time.time_ns()
        attachment = self._create_attachment(self._next_sequence_id(), timestamp_ns)

        def reply_callback(reply: zenoh.Reply):
            """Decode one reply, then hand the result to the user callback exactly once."""
            response_msg = None
            try:
                if reply.err is not None:
                    error_msg = self._payload_bytes(
                        reply.err, label="ReplyError", attr="err"
                    ).decode("utf-8", errors="ignore")
                    logger.error(f"Async service call failed: {error_msg}")
                elif reply.ok is not None:
                    reply_bytes = self._payload_bytes(reply.ok, label="Reply ok", attr="ok")
                    # Deserialize response (use store type name which may be converted)
                    response_msg = self.session_mgr.store.deserialize_cdr(
                        reply_bytes, self.response_store_type
                    )
                else:
                    logger.error("Reply has neither ok nor err")
            except Exception as e:
                logger.error(f"Error processing async service response: {e}", exc_info=True)
                response_msg = None

            # The user callback runs outside the decoding try-block so that a
            # failure inside it cannot trigger a second invocation with None.
            try:
                callback(response_msg)
            except Exception as e:
                logger.error(f"Async service callback raised an exception: {e}", exc_info=True)

        # Send request using querier; failures are reported through reply_callback.
        self.querier.get(
            reply_callback,
            parameters="",
            payload=zenoh.ZBytes(cdr_bytes),
            encoding=Encoding("application/cdr"),
            attachment=zenoh.ZBytes(attachment),
        )

    def close(self):
        """
        Close the service client and undeclare tokens.

        Each handle (node token, client token, querier) is released independently,
        so a failure on one does not prevent the others from being released.
        This method is idempotent - it's safe to call multiple times.
        """
        if getattr(self, '_closed', False):
            return

        for attr_name in ('node_token', 'client_token', 'querier'):
            handle = getattr(self, attr_name, None)
            if handle is None:
                continue
            try:
                # The querier is only undeclared when it exposes undeclare().
                if attr_name != 'querier' or hasattr(handle, 'undeclare'):
                    handle.undeclare()
            except (AttributeError, RuntimeError) as e:
                logger.debug(f"Error during service client cleanup for service {self.service_name}: {e}")
            except Exception as e:
                logger.warning(f"Unexpected error during service client cleanup for service {self.service_name}: {e}")

        self.querier = None
        self._closed = True
|