Skip to content

subscriber

ROS2Subscriber - ROS2 Subscriber using Zenoh

ROS2Subscriber

ROS2 Subscriber using Zenoh

Source code in zenoh_ros2_sdk/subscriber.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class ROS2Subscriber:
    """ROS2 Subscriber using Zenoh"""

    def __init__(
        self,
        topic: str,
        msg_type: str,
        callback: Callable,
        msg_definition: Optional[str] = None,
        node_name: Optional[str] = None,
        namespace: str = "/",
        domain_id: Optional[int] = None,
        router_ip: str = "127.0.0.1",
        router_port: int = 7447,
        type_hash: Optional[str] = None,
        qos: Optional[object] = None,
    ):
        """
        Create a ROS2 subscriber

        Args:
            topic: ROS2 topic name (e.g. `/chatter`).
            msg_type: ROS2 message type (e.g. `std_msgs/msg/String`).
            callback: Callback function(msg) called when message is received
            msg_definition: Message definition text (None to auto-load from registry)
            node_name: Node name (auto-generated if None)
            namespace: Node namespace
            domain_id: ROS domain ID (defaults to ROS_DOMAIN_ID or 0)
            router_ip: Zenoh router IP
            router_port: Zenoh router port
            type_hash: Message type hash (auto-detected if None)
            qos: QoS used for liveliness discovery tokens.
                Accepts `QosProfile`, an encoded rmw_zenoh QoS string, or `None` for default.

        Raises:
            ValueError: If the type hash cannot be computed because message definitions are missing.
            TypeError: If Zenoh delivers an unexpected payload type (see `_listener`).
        """
        self.topic = topic
        self.msg_type = msg_type
        self.callback = callback
        self.domain_id = resolve_domain_id(domain_id)
        self.namespace = namespace
        self.node_name = node_name or f"zenoh_subscriber_{uuid.uuid4().hex[:8]}"
        # QoS is only used for liveliness discovery tokens. It does not affect the
        # data keyexpr subscription (which is topic/type-hash based).
        _, self.qos = self._normalize_qos(qos, default=DEFAULT_QOS_PROFILE, fallback=DEFAULT_QOS_PROFILE.encode())

        # Get or create shared session
        self.session_mgr = ZenohSession.get_instance(router_ip, router_port)

        # Register message type
        self.session_mgr.register_message_type(msg_definition, msg_type)

        # Get DDS type name
        self.dds_type_name = ros2_to_dds_type(msg_type)

        # Get type hash if not provided
        if type_hash is None:
            # Get message definition for hash computation
            # If msg_definition is None, load from registry (same as register_message_type does)
            # Empty string ("") is valid for messages with no fields (like std_msgs/msg/Empty)
            hash_msg_definition = msg_definition
            if hash_msg_definition is None:
                # Load from registry (same logic as register_message_type)
                try:
                    registry = get_registry()
                    msg_file = registry.get_msg_file_path(msg_type)
                    if msg_file and msg_file.exists():
                        with open(msg_file, 'r') as f:
                            hash_msg_definition = f.read()
                except Exception as e:
                    # Registry not available or file not found - will raise ValueError below
                    logger.debug(f"Could not load message definition from registry for {msg_type}: {e}")
                    pass

            # If still None after trying to load, raise error
            if hash_msg_definition is None:
                raise ValueError(
                    f"Cannot compute type hash for {msg_type}: message definition not provided. "
                    "Please provide msg_definition or ensure the message type is loaded in the registry."
                )

            # Get dependencies from message registry if available (recursively)
            dependencies = None
            try:
                registry = get_registry()
                # Load all dependencies recursively using shared utility function
                dependencies = load_dependencies_recursive(msg_type, hash_msg_definition, registry)
            except Exception as e:
                # If dependency loading fails, continue without dependencies
                # Type hash computation will still work, just without nested type info
                logger.debug(f"Could not load dependencies for {msg_type}: {e}")
                pass

            type_hash = get_type_hash(msg_type, msg_definition=hash_msg_definition, dependencies=dependencies)
        self.type_hash = type_hash

        # Generate IDs used in ROS2 liveliness discovery (mirrors publisher/service patterns)
        self.subscriber_gid = self.session_mgr.generate_gid()
        self.node_id = self.session_mgr.get_next_node_id()
        self.entity_id = self.session_mgr.get_next_entity_id()

        # Build keyexpr
        self.keyexpr = topic_keyexpr(self.domain_id, topic, self.dds_type_name, type_hash)

        # Declare liveliness tokens so ROS2 nodes that publish-on-subscribe (e.g., image_transport)
        # will see this subscriber.
        self._declare_liveliness_tokens()

        # Create subscriber
        self.sub = self.session_mgr.session.declare_subscriber(self.keyexpr, self._listener)
        self._closed = False

        # For transient_local durability, query for cached/historical data
        # This mimics rmw_zenoh's AdvancedSubscriber behavior
        self.qos_profile, _ = self._normalize_qos(qos, default=DEFAULT_QOS_PROFILE, fallback=DEFAULT_QOS_PROFILE.encode())
        if self.qos_profile.durability == QosDurability.TRANSIENT_LOCAL:
            self._query_historical_data()

    @staticmethod
    def _normalize_qos(
        qos: Optional[object],
        *,
        default: QosProfile,
        fallback: str,
    ) -> tuple[QosProfile, str]:
        """
        Subscriber only needs the encoded QoS string for tokens, but we normalize
        similarly to publisher for API consistency.
        """
        if qos is None:
            return default, fallback
        if isinstance(qos, QosProfile):
            return qos, qos.encode()
        if isinstance(qos, str):
            # User provided an authoritative encoded QoS string. It must be parseable.
            return QosProfile.decode(qos), qos
        return default, fallback

    def _declare_liveliness_tokens(self):
        """Declare liveliness tokens for ROS2 discovery"""
        node = NodeEntity(
            domain_id=self.domain_id,
            session_id=self.session_mgr.session_id,
            node_id=self.node_id,
            node_name=self.node_name,
            namespace=self.namespace,
        )
        ep = EndpointEntity(
            node=node,
            entity_id=self.entity_id,
            kind=EntityKind.SUBSCRIPTION,
            name=self.topic,
            dds_type_name=self.dds_type_name,
            type_hash=self.type_hash,
            qos=self.qos,
            gid=self.subscriber_gid,
        )

        self.node_token = self.session_mgr.liveliness.declare_token(node_liveliness_keyexpr(node))
        self.subscriber_token = self.session_mgr.liveliness.declare_token(endpoint_liveliness_keyexpr(ep))

    def _listener(self, sample):
        """Internal message listener"""
        self._process_sample(sample)

    def _process_sample(self, sample):
        """Process a Zenoh sample (from subscription or query reply)"""
        try:
            payload = getattr(sample, "payload", None)
            if payload is None:
                raise ValueError("Zenoh sample has no payload")

            # Verified in-container: sample.payload is a ZBytes and supports to_bytes().
            # Prefer to_bytes() because it is explicit and version-stable.
            if hasattr(payload, "to_bytes"):
                cdr_bytes = payload.to_bytes()
                if not isinstance(cdr_bytes, (bytes, bytearray)):
                    raise TypeError(
                        "Zenoh payload.to_bytes() returned a non-bytes value: "
                        f"type={type(cdr_bytes)}"
                    )
                cdr_bytes = bytes(cdr_bytes)
            elif isinstance(payload, (bytes, bytearray)):
                cdr_bytes = bytes(payload)
            elif isinstance(payload, memoryview):
                cdr_bytes = payload.tobytes()
            else:
                raise TypeError(
                    "Unsupported Zenoh payload type. Expected an object with to_bytes(), "
                    f"or bytes-like payload. Got type={type(payload)}. "
                    f"Has to_bytes={hasattr(payload, 'to_bytes')}. "
                    f"Payload repr={repr(payload)[:200]}"
                )

            if not cdr_bytes:
                raise ValueError("Received empty payload")

            msg = self.session_mgr.store.deserialize_cdr(cdr_bytes, self.msg_type)
            self.callback(msg)
        except Exception as e:
            logger.error(f"Error deserializing message on topic {self.topic}: {e}", exc_info=True)

    def _query_historical_data(self):
        """Query for cached data from transient_local publishers.

        rmw_zenoh uses AdvancedPublisher which caches messages at:
            <key_expr>/@adv/pub/<zenoh_id>/<entity_id>/_

        The `_anyke` selector parameter allows replies on any key expression.
        Zenoh uses ';' as parameter separator (not '&').
        """
        publishers = self._discover_publishers()
        if not publishers:
            logger.debug(f"No transient_local publishers for {self.topic}")
            return

        max_samples = self.qos_profile.history_depth
        total_replies = 0

        for zenoh_id in publishers:
            # Query the AdvancedPublisher cache with _anyke to accept any reply key
            selector = f"{self.keyexpr}/@adv/pub/{zenoh_id}/**?_anyke;_max={max_samples}"

            try:
                for reply in self.session_mgr.session.get(selector, timeout=2.0):
                    if reply.ok is not None:
                        total_replies += 1
                        self._process_sample(reply.ok)
                    elif reply.err is not None:
                        logger.warning(f"Query error for {self.topic}: {reply.err}")
            except Exception as e:
                logger.warning(f"Failed to query publisher {zenoh_id}: {e}")

        if total_replies > 0:
            logger.debug(f"Received {total_replies} cached samples for {self.topic}")

    def _discover_publishers(self) -> list[str]:
        """Discover publishers for this topic via liveliness tokens.

        Returns:
            List of zenoh_id strings for each publisher found.
        """
        publishers = set()

        # rmw_zenoh liveliness token format:
        # @ros2_lv/<domain>/<zenoh_id>/<seq>/<entity_id>/MP/<gid_hi>/<gid_lo>/<node>/%<topic>/<dds_type>/<type_hash>/<qos>
        lv_pattern = (
            f"@ros2_lv/{self.domain_id}/*/*/*/MP/*/*/*"
            f"/%{self.topic.lstrip('/')}/{self.dds_type_name}/{self.type_hash}/*"
        )

        try:
            replies = self.session_mgr.liveliness.get(lv_pattern, timeout=2.0)
            for reply in replies:
                if reply.ok is not None:
                    parts = str(reply.ok.key_expr).split('/')
                    if len(parts) > 2:
                        publishers.add(parts[2])  # zenoh_id is at index 2
        except Exception as e:
            logger.warning(f"Failed to discover publishers for {self.topic}: {e}")

        return list(publishers)

    def close(self):
        """
        Close the subscriber and cleanup resources.

        This method is idempotent - it's safe to call multiple times.
        """
        # Check if already closed
        if hasattr(self, '_closed') and self._closed:
            return

        try:
            # Undeclare liveliness tokens first (best-effort)
            if hasattr(self, "subscriber_token") and self.subscriber_token is not None:
                self.subscriber_token.undeclare()
                self.subscriber_token = None
            if hasattr(self, "node_token") and self.node_token is not None:
                self.node_token.undeclare()
                self.node_token = None

            if hasattr(self, 'sub') and self.sub is not None:
                # Zenoh subscribers have an undeclare() method to explicitly remove them
                # This is the proper way to clean up a subscriber
                if hasattr(self.sub, 'undeclare'):
                    self.sub.undeclare()
                # Mark as closed
                self.sub = None
            self._closed = True
        except (AttributeError, RuntimeError) as e:
            # AttributeError: subscriber doesn't exist or undeclare method not available
            # RuntimeError: Zenoh runtime errors
            logger.debug(f"Error during subscriber cleanup for topic {self.topic}: {e}")
            # Mark as closed even if undeclare failed to prevent retry loops
            self._closed = True
        except Exception as e:
            # Catch any other unexpected exceptions during cleanup
            # Log at warning level since this is unexpected
            logger.warning(f"Unexpected error during subscriber cleanup for topic {self.topic}: {e}")
            self._closed = True

close()

Close the subscriber and cleanup resources.

This method is idempotent - it's safe to call multiple times.

Source code in zenoh_ros2_sdk/subscriber.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def close(self):
    """
    Close the subscriber and cleanup resources.

    This method is idempotent - it's safe to call multiple times.
    """
    # Check if already closed
    if hasattr(self, '_closed') and self._closed:
        return

    try:
        # Undeclare liveliness tokens first (best-effort)
        if hasattr(self, "subscriber_token") and self.subscriber_token is not None:
            self.subscriber_token.undeclare()
            self.subscriber_token = None
        if hasattr(self, "node_token") and self.node_token is not None:
            self.node_token.undeclare()
            self.node_token = None

        if hasattr(self, 'sub') and self.sub is not None:
            # Zenoh subscribers have an undeclare() method to explicitly remove them
            # This is the proper way to clean up a subscriber
            if hasattr(self.sub, 'undeclare'):
                self.sub.undeclare()
            # Mark as closed
            self.sub = None
        self._closed = True
    except (AttributeError, RuntimeError) as e:
        # AttributeError: subscriber doesn't exist or undeclare method not available
        # RuntimeError: Zenoh runtime errors
        logger.debug(f"Error during subscriber cleanup for topic {self.topic}: {e}")
        # Mark as closed even if undeclare failed to prevent retry loops
        self._closed = True
    except Exception as e:
        # Catch any other unexpected exceptions during cleanup
        # Log at warning level since this is unexpected
        logger.warning(f"Unexpected error during subscriber cleanup for topic {self.topic}: {e}")
        self._closed = True