Skip to content

qos

QoS model and encoding compatible with rmw_zenoh_cpp / ros-z.

rmw_zenoh encodes QoS into liveliness tokens as a compact string:

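<reliability>:<durability>:<history_kind>,<history_depth>:<deadline_sec>,<deadline_nsec>:<lifespan_sec>,<lifespan_nsec>:<liveliness_kind>,<lease_sec>,<lease_nsec>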

Empty fields mean "use default" (as in rmw_zenoh / ros-z).

QosProfile

QoS profile compatible with rmw_zenoh / ros-z QoS token encoding.

Notes
  • This SDK primarily uses QoS to populate @ros2_lv/.../<qos> liveliness tokens.
  • Some QoS fields may not have a runtime effect unless the underlying Zenoh API supports it.
  • encode()/decode() implement the same compact format used by rmw_zenoh_cpp and ros-z.
Source code in zenoh_ros2_sdk/qos.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@slotted_dataclass(frozen=True)
class QosProfile:
    """Immutable set of QoS settings that serialises to the rmw_zenoh / ros-z QoS token.

    Notes:
        - Within this SDK the profile mainly fills the `<qos>` part of `@ros2_lv/.../<qos>` liveliness tokens.
        - A QoS field only has a runtime effect where the underlying Zenoh API supports it.
        - `encode()`/`decode()` follow the compact format used by `rmw_zenoh_cpp` and `ros-z`.
    """
    reliability: QosReliability = QosReliability.RELIABLE
    durability: QosDurability = QosDurability.VOLATILE
    history_kind: QosHistoryKind = QosHistoryKind.KEEP_LAST
    history_depth: int = 10
    deadline: Duration = DURATION_INFINITE
    lifespan: Duration = DURATION_INFINITE
    liveliness: QosLiveliness = QosLiveliness.AUTOMATIC
    liveliness_lease_duration: Duration = DURATION_INFINITE

    def encode(self, *, default: Optional["QosProfile"] = None) -> str:
        """
        Serialise this profile into the six-field, colon-separated QoS token.

        Values are compared against `default`; when it is omitted, a
        default-constructed `QosProfile` is the baseline.

        Field layout and elision rules:
            - reliability, durability: empty when equal to the baseline.
            - history: `<kind>,<depth>`. For KEEP_LAST the depth is always
              written and the kind is left empty when it matches the baseline.
              Any other kind is written as `<kind>,` with no depth.
            - deadline, lifespan: `<sec>,<nsec>`, or a bare `,` when equal to
              the baseline.
            - liveliness: `<kind>,<sec>,<nsec>`, or `,,` when both the kind and
              the lease duration equal the baseline.

        Returns:
            The encoded token string.
        """
        baseline = default or QosProfile()

        def _duration_field(value: Duration, fallback: Duration) -> str:
            # A duration matching its baseline collapses to the bare separator.
            if value == fallback:
                return ","
            return f"{value.sec},{value.nsec}"

        reliability_s = ""
        if not (self.reliability == baseline.reliability):
            reliability_s = str(self.reliability.value)

        durability_s = ""
        if not (self.durability == baseline.durability):
            durability_s = str(self.durability.value)

        is_keep_last = self.history_kind == QosHistoryKind.KEEP_LAST
        if not is_keep_last:
            # Kinds other than KEEP_LAST carry no depth.
            history_s = f"{self.history_kind.value},"
        elif self.history_kind == baseline.history_kind:
            # Kind matches the baseline: omit it, but still write the depth.
            history_s = f",{self.history_depth}"
        else:
            history_s = f"{self.history_kind.value},{self.history_depth}"

        deadline_s = _duration_field(self.deadline, baseline.deadline)
        lifespan_s = _duration_field(self.lifespan, baseline.lifespan)

        lease = self.liveliness_lease_duration
        liveliness_unchanged = (self.liveliness == baseline.liveliness) and (
            lease == baseline.liveliness_lease_duration
        )
        if liveliness_unchanged:
            liveliness_s = ",,"
        else:
            liveliness_s = f"{self.liveliness.value},{lease.sec},{lease.nsec}"

        return ":".join(
            (reliability_s, durability_s, history_s, deadline_s, lifespan_s, liveliness_s)
        )

    @staticmethod
    def decode(encoded: str) -> "QosProfile":
        """
        Parse a colon-separated QoS token back into a `QosProfile`.

        Parsing is lenient: empty fields (and empty sub-fields) take the value
        from a default-constructed `QosProfile`, an unrecognised liveliness
        kind falls back to the default liveliness, and fields after the sixth
        are ignored.

        Raises:
            ValueError: if fewer than six fields are present, if the
                reliability, durability or history kind is not a recognised
                code, or if a numeric sub-field is not a valid integer.
        """
        baseline = QosProfile()
        fields = (encoded or "").split(":")
        if len(fields) < 6:
            raise ValueError(f"Incomplete QoS string (expected 6 fields): {encoded!r}")

        def _parse_reliability(token: str) -> QosReliability:
            if token == "2":
                return QosReliability.BEST_EFFORT
            if token in ("", "0", "1"):
                return baseline.reliability
            raise ValueError(f"Invalid reliability: {token!r}")

        def _parse_durability(token: str) -> QosDurability:
            if token == "1":
                return QosDurability.TRANSIENT_LOCAL
            if token in ("", "0", "2"):
                return baseline.durability
            raise ValueError(f"Invalid durability: {token!r}")

        def _parse_history(token: str) -> Tuple[QosHistoryKind, int]:
            if token in ("", ","):
                return baseline.history_kind, baseline.history_depth
            kind_tok, _, depth_tok = token.partition(",")
            if kind_tok == "2":
                # KEEP_ALL: any depth text is ignored and reported as 0.
                return QosHistoryKind.KEEP_ALL, 0
            if kind_tok in ("", "0", "1"):
                # An empty kind is treated as KEEP_LAST.
                depth = baseline.history_depth if depth_tok == "" else int(depth_tok)
                return QosHistoryKind.KEEP_LAST, depth
            raise ValueError(f"Invalid history: {token!r}")

        def _parse_duration(token: str, fallback: Duration) -> Duration:
            if token in ("", ","):
                return fallback
            sec_tok, _, nsec_tok = token.partition(",")
            return Duration(
                sec=fallback.sec if sec_tok == "" else int(sec_tok),
                nsec=fallback.nsec if nsec_tok == "" else int(nsec_tok),
            )

        def _parse_liveliness(token: str) -> Tuple[QosLiveliness, Duration]:
            fallback_lease = baseline.liveliness_lease_duration
            if token in ("", ",,"):
                return baseline.liveliness, fallback_lease
            kind_tok, _, remainder = token.partition(",")
            sec_tok, _, nsec_tok = remainder.partition(",")
            if kind_tok == "3":
                kind = QosLiveliness.MANUAL_BY_TOPIC
            elif kind_tok == "2":
                kind = QosLiveliness.MANUAL_BY_NODE
            elif kind_tok in ("", "0", "1"):
                kind = QosLiveliness.AUTOMATIC
            else:
                # Unknown kinds are tolerated rather than rejected.
                kind = baseline.liveliness
            lease = Duration(
                sec=fallback_lease.sec if sec_tok == "" else int(sec_tok),
                nsec=fallback_lease.nsec if nsec_tok == "" else int(nsec_tok),
            )
            return kind, lease

        # Fields are parsed strictly left to right so the first malformed one raises.
        reliability = _parse_reliability(fields[0])
        durability = _parse_durability(fields[1])
        history_kind, history_depth = _parse_history(fields[2])
        deadline = _parse_duration(fields[3], baseline.deadline)
        lifespan = _parse_duration(fields[4], baseline.lifespan)
        liveliness, lease_duration = _parse_liveliness(fields[5])

        return QosProfile(
            reliability=reliability,
            durability=durability,
            history_kind=history_kind,
            history_depth=history_depth,
            deadline=deadline,
            lifespan=lifespan,
            liveliness=liveliness,
            liveliness_lease_duration=lease_duration,
        )

encode(*, default: Optional['QosProfile'] = None) -> str

Encode into the rmw_zenoh QoS token format (compatible with ros-z).

If default is provided, fields equal to default are elided (empty).

Source code in zenoh_ros2_sdk/qos.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def encode(self, *, default: Optional["QosProfile"] = None) -> str:
    """
    Serialise this profile into the six-field, colon-separated QoS token.

    Values are compared against `default`; when it is omitted, a
    default-constructed `QosProfile` is the baseline.

    Field layout and elision rules:
        - reliability, durability: empty when equal to the baseline.
        - history: `<kind>,<depth>`. For KEEP_LAST the depth is always
          written and the kind is left empty when it matches the baseline.
          Any other kind is written as `<kind>,` with no depth.
        - deadline, lifespan: `<sec>,<nsec>`, or a bare `,` when equal to
          the baseline.
        - liveliness: `<kind>,<sec>,<nsec>`, or `,,` when both the kind and
          the lease duration equal the baseline.

    Returns:
        The encoded token string.
    """
    baseline = default or QosProfile()

    def _duration_field(value: Duration, fallback: Duration) -> str:
        # A duration matching its baseline collapses to the bare separator.
        if value == fallback:
            return ","
        return f"{value.sec},{value.nsec}"

    reliability_s = ""
    if not (self.reliability == baseline.reliability):
        reliability_s = str(self.reliability.value)

    durability_s = ""
    if not (self.durability == baseline.durability):
        durability_s = str(self.durability.value)

    is_keep_last = self.history_kind == QosHistoryKind.KEEP_LAST
    if not is_keep_last:
        # Kinds other than KEEP_LAST carry no depth.
        history_s = f"{self.history_kind.value},"
    elif self.history_kind == baseline.history_kind:
        # Kind matches the baseline: omit it, but still write the depth.
        history_s = f",{self.history_depth}"
    else:
        history_s = f"{self.history_kind.value},{self.history_depth}"

    deadline_s = _duration_field(self.deadline, baseline.deadline)
    lifespan_s = _duration_field(self.lifespan, baseline.lifespan)

    lease = self.liveliness_lease_duration
    liveliness_unchanged = (self.liveliness == baseline.liveliness) and (
        lease == baseline.liveliness_lease_duration
    )
    if liveliness_unchanged:
        liveliness_s = ",,"
    else:
        liveliness_s = f"{self.liveliness.value},{lease.sec},{lease.nsec}"

    return ":".join(
        (reliability_s, durability_s, history_s, deadline_s, lifespan_s, liveliness_s)
    )

decode(encoded: str) -> 'QosProfile' staticmethod

Decode from the rmw_zenoh QoS token format.

This is intentionally lenient and treats empty fields as defaults.

Source code in zenoh_ros2_sdk/qos.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@staticmethod
def decode(encoded: str) -> "QosProfile":
    """
    Parse a colon-separated QoS token back into a `QosProfile`.

    Parsing is lenient: empty fields (and empty sub-fields) take the value
    from a default-constructed `QosProfile`, an unrecognised liveliness
    kind falls back to the default liveliness, and fields after the sixth
    are ignored.

    Raises:
        ValueError: if fewer than six fields are present, if the
            reliability, durability or history kind is not a recognised
            code, or if a numeric sub-field is not a valid integer.
    """
    baseline = QosProfile()
    fields = (encoded or "").split(":")
    if len(fields) < 6:
        raise ValueError(f"Incomplete QoS string (expected 6 fields): {encoded!r}")

    def _parse_reliability(token: str) -> QosReliability:
        if token == "2":
            return QosReliability.BEST_EFFORT
        if token in ("", "0", "1"):
            return baseline.reliability
        raise ValueError(f"Invalid reliability: {token!r}")

    def _parse_durability(token: str) -> QosDurability:
        if token == "1":
            return QosDurability.TRANSIENT_LOCAL
        if token in ("", "0", "2"):
            return baseline.durability
        raise ValueError(f"Invalid durability: {token!r}")

    def _parse_history(token: str) -> Tuple[QosHistoryKind, int]:
        if token in ("", ","):
            return baseline.history_kind, baseline.history_depth
        kind_tok, _, depth_tok = token.partition(",")
        if kind_tok == "2":
            # KEEP_ALL: any depth text is ignored and reported as 0.
            return QosHistoryKind.KEEP_ALL, 0
        if kind_tok in ("", "0", "1"):
            # An empty kind is treated as KEEP_LAST.
            depth = baseline.history_depth if depth_tok == "" else int(depth_tok)
            return QosHistoryKind.KEEP_LAST, depth
        raise ValueError(f"Invalid history: {token!r}")

    def _parse_duration(token: str, fallback: Duration) -> Duration:
        if token in ("", ","):
            return fallback
        sec_tok, _, nsec_tok = token.partition(",")
        return Duration(
            sec=fallback.sec if sec_tok == "" else int(sec_tok),
            nsec=fallback.nsec if nsec_tok == "" else int(nsec_tok),
        )

    def _parse_liveliness(token: str) -> Tuple[QosLiveliness, Duration]:
        fallback_lease = baseline.liveliness_lease_duration
        if token in ("", ",,"):
            return baseline.liveliness, fallback_lease
        kind_tok, _, remainder = token.partition(",")
        sec_tok, _, nsec_tok = remainder.partition(",")
        if kind_tok == "3":
            kind = QosLiveliness.MANUAL_BY_TOPIC
        elif kind_tok == "2":
            kind = QosLiveliness.MANUAL_BY_NODE
        elif kind_tok in ("", "0", "1"):
            kind = QosLiveliness.AUTOMATIC
        else:
            # Unknown kinds are tolerated rather than rejected.
            kind = baseline.liveliness
        lease = Duration(
            sec=fallback_lease.sec if sec_tok == "" else int(sec_tok),
            nsec=fallback_lease.nsec if nsec_tok == "" else int(nsec_tok),
        )
        return kind, lease

    # Fields are parsed strictly left to right so the first malformed one raises.
    reliability = _parse_reliability(fields[0])
    durability = _parse_durability(fields[1])
    history_kind, history_depth = _parse_history(fields[2])
    deadline = _parse_duration(fields[3], baseline.deadline)
    lifespan = _parse_duration(fields[4], baseline.lifespan)
    liveliness, lease_duration = _parse_liveliness(fields[5])

    return QosProfile(
        reliability=reliability,
        durability=durability,
        history_kind=history_kind,
        history_depth=history_depth,
        deadline=deadline,
        lifespan=lifespan,
        liveliness=liveliness,
        liveliness_lease_duration=lease_duration,
    )