Skip to content

keyexpr

Key expression builders aligned with rmw_zenoh_cpp design.md and ros-z implementation.

topic_keyexpr(domain_id: int, fully_qualified_name: str, dds_type_name: str, type_hash: str) -> str

Data-plane key expression for topics/services.

Format

`<domain_id>/<fully_qualified_name>/<type_name>/<type_hash>`

Source code in zenoh_ros2_sdk/keyexpr.py
14
15
16
17
18
19
20
21
def topic_keyexpr(domain_id: int, fully_qualified_name: str, dds_type_name: str, type_hash: str) -> str:
    """
    Build the data-plane key expression used for a topic or service.

    Layout:
      <domain_id>/<fully_qualified_name>/<type_name>/<type_hash>

    Any leading "/" characters on ``fully_qualified_name`` are removed so the
    name does not introduce an empty segment after the domain id.
    """
    relative_name = fully_qualified_name.lstrip("/")
    return "{}/{}/{}/{}".format(domain_id, relative_name, dds_type_name, type_hash)

node_liveliness_keyexpr(node: NodeEntity) -> str

Liveliness token for a node.

Format

`@ros2_lv/<domain_id>/<session_id>/<node_id>/<node_id>/NN/<enclave>/<namespace>/<node_name>`

Source code in zenoh_ros2_sdk/keyexpr.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def node_liveliness_keyexpr(node: NodeEntity) -> str:
    """
    Build the liveliness-token key expression for a node.

    Layout:
      @ros2_lv/<domain_id>/<session_id>/<node_id>/<node_id>/NN/<enclave>/<namespace>/<node_name>

    The node id appears twice. The namespace and node name are passed through
    ``mangle_name``; a falsy enclave is written as "%".
    """
    segments = (
        ADMIN_SPACE,
        node.domain_id,
        node.session_id,
        node.node_id,
        node.node_id,
        EntityKind.NODE.value,
        node.enclave or "%",
        mangle_name(node.namespace),
        mangle_name(node.node_name),
    )
    # format(x) renders each segment exactly as an f-string "{x}" would.
    return "/".join(map(format, segments))

endpoint_liveliness_keyexpr(ep: EndpointEntity) -> str

Liveliness token for a publisher/subscriber/service/client.

Format

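`@ros2_lv/<domain_id>/<session_id>/<node_id>/<entity_id>/<kind>/<enclave>/<namespace>/<node_name>/<mangled_qualified_name>/<type_name>/<type_hash>/<qos>`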

Source code in zenoh_ros2_sdk/keyexpr.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def endpoint_liveliness_keyexpr(ep: EndpointEntity) -> str:
    """
    Build the liveliness-token key expression for a publisher, subscriber,
    service or client.

    Layout:
      @ros2_lv/<domain_id>/<session_id>/<node_id>/<entity_id>/<kind>/<enclave>/<namespace>/<node_name>/
      <mangled_qualified_name>/<type_name>/<type_hash>/<qos>

    The owning node's namespace and name, and the endpoint name, are passed
    through ``mangle_name``; a falsy enclave is written as "%".
    """
    owner = ep.node
    segments = (
        # Prefix identifying the owning node and this entity.
        ADMIN_SPACE,
        owner.domain_id,
        owner.session_id,
        owner.node_id,
        ep.entity_id,
        ep.kind.value,
        # Node identity.
        owner.enclave or "%",
        mangle_name(owner.namespace),
        mangle_name(owner.node_name),
        # Endpoint name, type and QoS.
        mangle_name(ep.name),
        ep.dds_type_name,
        ep.type_hash,
        ep.qos,
    )
    # format(x) renders each segment exactly as an f-string "{x}" would.
    return "/".join(map(format, segments))