Skip to content

p4net.runtime

Runtime primitives: namespaces, links, traffic shaping.

__all__ module-attribute

# Public names exported by the p4net.runtime package.
__all__ = ['BMv2NotFoundError', 'BMv2StartupError', 'BMv2Switch', 'LinkError', 'NSProcess', 'NamespaceError', 'NetworkNamespace', 'P4NetError', 'PrivilegeError', 'TcError', 'VethPair', 'apply_netem', 'clear_qdisc', 'disable_ipv6', 'enable_ipv6']

BMv2Switch

Process lifecycle wrapper for a single simple_switch_grpc instance.

Source code in src/p4net/runtime/bmv2.py
class BMv2Switch:
    """Process lifecycle wrapper for a single `simple_switch_grpc` instance.

    Typical use is either the explicit sequence ``start()`` ->
    ``wait_until_ready()`` -> ``stop()``, or the context-manager form, which
    performs the same three steps and guarantees ``stop()`` on exit.
    """

    def __init__(
        self,
        name: str,
        *,
        device_id: int,
        grpc_port: int,
        thrift_port: int,
        bmv2_json: Path,
        port_to_iface: Mapping[int, str],
        log_dir: Path,
        pcap_dir: Path | None = None,
        cpu_port: int | None = None,
        log_level: str = "info",
        grpc_bind_addr: str = "127.0.0.1",
        binary: str = "simple_switch_grpc",
        startup_timeout: float = 10.0,
    ) -> None:
        """Validate and record the configuration; no process is spawned here.

        Args:
            name: Switch name; also the basename of the log file.
            device_id: Value passed as ``--device-id``.
            grpc_port: Port half of ``--grpc-server-addr``.
            thrift_port: Value passed as ``--thrift-port``.
            bmv2_json: Compiled pipeline config, passed positionally.
            port_to_iface: Maps switch port number to interface name; each
                entry becomes one ``-i PORT@IFACE`` argument.
            log_dir: Directory holding ``<name>.log`` (created by ``start()``).
            pcap_dir: If given, passed as ``--pcap`` (created by ``start()``).
            cpu_port: If given, passed as ``--cpu-port``.
            log_level: Value passed as ``-L``; must be in ``_VALID_LOG_LEVELS``.
            grpc_bind_addr: Host half of ``--grpc-server-addr``.
            binary: Executable name resolved on ``PATH`` by ``start()``.
            startup_timeout: Default timeout (seconds) for
                ``wait_until_ready()``.

        Raises:
            ValueError: if ``name`` is empty, ``log_level`` is not recognised,
                or ``startup_timeout`` is not positive.
        """
        if not name:
            raise ValueError("BMv2Switch name must be non-empty")
        if log_level not in _VALID_LOG_LEVELS:
            raise ValueError(
                f"invalid log_level {log_level!r}: must be one of {sorted(_VALID_LOG_LEVELS)}"
            )
        if startup_timeout <= 0:
            raise ValueError("startup_timeout must be positive")
        self._name = name
        self._device_id = device_id
        self._grpc_port = grpc_port
        self._thrift_port = thrift_port
        self._bmv2_json = Path(bmv2_json)
        # Copy so later mutation of the caller's mapping cannot change argv.
        self._port_to_iface: dict[int, str] = dict(port_to_iface)
        self._log_dir = Path(log_dir)
        self._pcap_dir = Path(pcap_dir) if pcap_dir is not None else None
        self._cpu_port = cpu_port
        self._log_level = log_level
        self._grpc_bind_addr = grpc_bind_addr
        self._binary = binary
        self._startup_timeout = startup_timeout

        self._proc: subprocess.Popen[bytes] | None = None
        self._log_file_handle: object | None = None  # an open file, kept alive
        self._started = False
        self._boot_timestamp_us: int | None = None

    # Properties ---------------------------------------------------------

    @property
    def name(self) -> str:
        """The switch name (used as the basename of the log file)."""
        return self._name

    @property
    def pid(self) -> int | None:
        """OS process ID, or ``None`` if the switch has not been started."""
        return None if self._proc is None else self._proc.pid

    @property
    def grpc_address(self) -> str:
        """``host:port`` string the P4Runtime gRPC server binds to."""
        return f"{self._grpc_bind_addr}:{self._grpc_port}"

    @property
    def log_file(self) -> Path:
        """Path to the BMv2 log file inside ``log_dir``."""
        return self._log_dir / f"{self._name}.log"

    @property
    def device_id(self) -> int:
        """P4Runtime device ID this switch reports as."""
        return self._device_id

    @property
    def thrift_port(self) -> int:
        """Thrift bind port for ``simple_switch_CLI`` and register operations."""
        return self._thrift_port

    @property
    def grpc_port(self) -> int:
        """gRPC port the P4Runtime server bound to (host portion is ``grpc_bind_addr``)."""
        return self._grpc_port

    @property
    def boot_timestamp_us(self) -> int | None:
        """Wall-clock microseconds since Unix epoch when this BMv2 process started.

        ``None`` if the switch has not been started yet, or has been stopped
        since its last start. Captured immediately before ``subprocess.Popen``,
        so drift from BMv2's internal clock zero is bounded by Popen overhead
        (sub-millisecond on a typical Linux host).

        Combined with INT shim ``ingress_timestamp_us`` to derive wall-clock
        arrival time across multiple switches::

            wall_clock_us = bmv2.boot_timestamp_us + shim.ingress_timestamp_us
        """
        return self._boot_timestamp_us

    # Argv construction --------------------------------------------------

    def _build_argv(self) -> list[str]:
        """Return the full command line, with ports emitted in ascending order."""
        argv: list[str] = [
            self._binary,
            "--device-id",
            str(self._device_id),
        ]
        for port in sorted(self._port_to_iface):
            argv += ["-i", f"{port}@{self._port_to_iface[port]}"]
        argv += [
            "--thrift-port",
            str(self._thrift_port),
            "--log-console",
            "--log-flush",
            "-L",
            self._log_level,
        ]
        if self._pcap_dir is not None:
            argv += ["--pcap", str(self._pcap_dir)]
        # The JSON config path is a positional argument of the BMv2 core
        # parser, so it must precede the `--` separator. Anything after `--`
        # is consumed by the simple_switch_grpc target parser.
        argv.append(str(self._bmv2_json))
        argv.append("--")
        argv += ["--grpc-server-addr", f"{self._grpc_bind_addr}:{self._grpc_port}"]
        if self._cpu_port is not None:
            argv += ["--cpu-port", str(self._cpu_port)]
        return argv

    # Lifecycle ----------------------------------------------------------

    def start(self) -> None:
        """Spawn the simple_switch_grpc process. Non-blocking.

        Creates ``log_dir`` (and ``pcap_dir`` when set), opens the log file in
        append mode and redirects the child's stdout/stderr into it.

        Raises:
            BMv2StartupError: if the switch was already started, or the
                process could not be spawned (``OSError`` from ``Popen``).
            BMv2NotFoundError: if the binary is not found on ``PATH``.
        """
        if self._started:
            raise BMv2StartupError(f"BMv2 {self._name!r} already started (pid={self.pid})")
        binary_path = shutil.which(self._binary)
        if binary_path is None:
            raise BMv2NotFoundError(f"binary {self._binary!r} not found on PATH")

        self._log_dir.mkdir(parents=True, exist_ok=True)
        if self._pcap_dir is not None:
            self._pcap_dir.mkdir(parents=True, exist_ok=True)

        argv = self._build_argv()
        # Replace argv[0] with the absolute path so PATH lookups don't
        # surprise us at exec time.
        argv[0] = binary_path
        log_path = self.log_file
        log_handle = log_path.open("ab")
        self._log_file_handle = log_handle
        logger.debug("BMv2 %r starting: %s", self._name, argv)
        # Capture wall-clock immediately before Popen — drift from BMv2's
        # internal clock zero is bounded by Popen overhead. Do NOT add any
        # work between this assignment and the Popen call.
        self._boot_timestamp_us = time.time_ns() // 1000
        try:
            self._proc = subprocess.Popen(
                argv,
                stdout=log_handle,
                stderr=subprocess.STDOUT,
                bufsize=0,
                close_fds=True,
            )
        except BaseException as exc:
            # Popen can fail with more than OSError (e.g. ValueError for an
            # argument containing a NUL byte). Whatever the cause, release the
            # log handle and clear the timestamp so no half-started state
            # survives the failure.
            log_handle.close()
            self._log_file_handle = None
            self._boot_timestamp_us = None
            if isinstance(exc, OSError):
                raise BMv2StartupError(f"failed to spawn BMv2 {self._name!r}: {exc}") from exc
            raise
        self._started = True
        logger.debug("BMv2 %r started (pid=%d, log=%s)", self._name, self._proc.pid, log_path)

    def wait_until_ready(self, *, timeout: float | None = None) -> None:
        """Block until the gRPC port accepts a connection, or fail.

        Args:
            timeout: Seconds to wait; ``None`` uses the ``startup_timeout``
                given at construction.

        Raises:
            BMv2StartupError: if ``start()`` was not called, the process exits
                while waiting, or the port does not open before the deadline.
        """
        if not self._started or self._proc is None:
            raise BMv2StartupError(f"BMv2 {self._name!r} has not been started; call start() first")
        effective_timeout = self._startup_timeout if timeout is None else timeout
        deadline = time.monotonic() + effective_timeout
        start_time = time.monotonic()
        while time.monotonic() < deadline:
            # A dead child can never open the port; fail fast instead of
            # polling until the deadline.
            if not self.is_running():
                rc = self.returncode()
                raise BMv2StartupError(
                    f"BMv2 {self._name!r} exited before becoming ready "
                    f"(returncode={rc}); see {self.log_file}"
                )
            try:
                with socket.create_connection(
                    (self._grpc_bind_addr, self._grpc_port),
                    timeout=_READY_POLL_INTERVAL,
                ):
                    elapsed = time.monotonic() - start_time
                    logger.debug("BMv2 %r became ready after %.3fs", self._name, elapsed)
                    return
            except OSError:
                # Port not accepting connections yet; retry after a short pause.
                time.sleep(_READY_POLL_INTERVAL)
        raise BMv2StartupError(
            f"BMv2 {self._name!r} did not open gRPC port within "
            f"{effective_timeout}s; see {self.log_file}"
        )

    def stop(self, *, timeout: float = 5.0) -> None:
        """Send SIGTERM, escalate to SIGKILL on timeout. Idempotent.

        Args:
            timeout: Seconds to wait after SIGTERM, and again after SIGKILL.
        """
        if self._proc is None:
            return
        if self._proc.poll() is not None:
            self._cleanup_log()
            logger.debug("BMv2 %r already exited (rc=%s)", self._name, self._proc.returncode)
            return
        logger.debug("BMv2 %r SIGTERM", self._name)
        self._proc.terminate()
        try:
            self._proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            logger.debug(
                "BMv2 %r did not exit in %.1fs after SIGTERM; sending SIGKILL",
                self._name,
                timeout,
            )
            self._proc.kill()
            try:
                self._proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                # Truly unkillable; bail without raising. The caller will see
                # the process via /proc and can decide what to do.
                logger.debug("BMv2 %r did not exit even after SIGKILL", self._name)
                return
        logger.debug("BMv2 %r exited with rc=%s", self._name, self._proc.returncode)
        self._cleanup_log()

    def kill(self) -> None:
        """Send SIGKILL immediately. Idempotent.

        Waits up to 5 seconds for the process to exit; if it has not exited by
        then the log handle is left open and the method returns silently.
        """
        if self._proc is None:
            return
        if self._proc.poll() is not None:
            self._cleanup_log()
            return
        logger.debug("BMv2 %r SIGKILL (direct)", self._name)
        self._proc.kill()
        try:
            self._proc.wait(timeout=5.0)
        except subprocess.TimeoutExpired:
            return
        self._cleanup_log()

    def is_running(self) -> bool:
        """``True`` while the BMv2 child process is still alive."""
        if self._proc is None:
            return False
        return self._proc.poll() is None

    def returncode(self) -> int | None:
        """Process exit code, or ``None`` if it has not exited yet."""
        if self._proc is None:
            return None
        return self._proc.poll()

    def _cleanup_log(self) -> None:
        """Close the log file handle (if any) and reset the boot timestamp."""
        handle = self._log_file_handle
        # Reset the boot timestamp alongside log cleanup so it becomes None
        # on every shutdown path (stop / kill / already-exited).
        self._boot_timestamp_us = None
        if handle is None:
            return
        try:
            close = getattr(handle, "close", None)
            if callable(close):
                close()
        finally:
            # Drop the reference even if close() raised, so it is not retried.
            self._log_file_handle = None

    # Context manager ----------------------------------------------------

    def __enter__(self) -> BMv2Switch:
        """Start the switch and wait for readiness; stop it again on failure."""
        self.start()
        try:
            self.wait_until_ready()
        except BaseException:
            # Best-effort teardown: a failure while stopping must not replace
            # the readiness error, which is what the caller needs to see.
            try:
                self.stop()
            except Exception:
                logger.debug(
                    "BMv2 %r cleanup after failed startup raised",
                    self._name,
                    exc_info=True,
                )
            raise
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Stop the switch; exceptions from the ``with`` body are not suppressed."""
        self.stop()

    def __repr__(self) -> str:
        state = "running" if self.is_running() else "stopped"
        return (
            f"BMv2Switch(name={self._name!r}, grpc={self.grpc_address}, "
            f"pid={self.pid}, state={state})"
        )

name property

name: str

The switch name (used as the basename of the log file).

pid property

pid: int | None

OS process ID, or None if the switch has not been started.

grpc_address property

grpc_address: str

host:port string the P4Runtime gRPC server binds to.

log_file property

log_file: Path

Path to the BMv2 log file inside log_dir.

device_id property

device_id: int

P4Runtime device ID this switch reports as.

thrift_port property

thrift_port: int

Thrift bind port for simple_switch_CLI and register operations.

grpc_port property

grpc_port: int

gRPC port the P4Runtime server bound to (host portion is grpc_bind_addr).

boot_timestamp_us property

boot_timestamp_us: int | None

Wall-clock microseconds since Unix epoch when this BMv2 process started.

None if the switch has not been started yet, or has been stopped since its last start. Captured immediately before subprocess.Popen, so drift from BMv2's internal clock zero is bounded by Popen overhead (sub-millisecond on a typical Linux host).

Combine this with the INT shim's ingress_timestamp_us to derive a wall-clock arrival time that is comparable across multiple switches:

wall_clock_us = bmv2.boot_timestamp_us + shim.ingress_timestamp_us

start

start() -> None

Spawn the simple_switch_grpc process. Non-blocking.

Source code in src/p4net/runtime/bmv2.py
def start(self) -> None:
    """Spawn the simple_switch_grpc process. Non-blocking.

    Creates ``log_dir`` (and ``pcap_dir`` when set), opens the log file in
    append mode and redirects the child's stdout/stderr into it.

    Raises:
        BMv2StartupError: if the switch was already started, or the process
            could not be spawned (``OSError`` from ``Popen``).
        BMv2NotFoundError: if the binary is not found on ``PATH``.
    """
    if self._started:
        raise BMv2StartupError(f"BMv2 {self._name!r} already started (pid={self.pid})")
    binary_path = shutil.which(self._binary)
    if binary_path is None:
        raise BMv2NotFoundError(f"binary {self._binary!r} not found on PATH")

    self._log_dir.mkdir(parents=True, exist_ok=True)
    if self._pcap_dir is not None:
        self._pcap_dir.mkdir(parents=True, exist_ok=True)

    argv = self._build_argv()
    # Replace argv[0] with the absolute path so PATH lookups don't
    # surprise us at exec time.
    argv[0] = binary_path
    log_path = self.log_file
    log_handle = log_path.open("ab")
    self._log_file_handle = log_handle
    logger.debug("BMv2 %r starting: %s", self._name, argv)
    # Capture wall-clock immediately before Popen — drift from BMv2's
    # internal clock zero is bounded by Popen overhead. Do NOT add any
    # work between this assignment and the Popen call.
    self._boot_timestamp_us = time.time_ns() // 1000
    try:
        self._proc = subprocess.Popen(
            argv,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            bufsize=0,
            close_fds=True,
        )
    except BaseException as exc:
        # Popen can fail with more than OSError (e.g. ValueError for an
        # argument containing a NUL byte). Whatever the cause, release the
        # log handle and clear the timestamp so no half-started state
        # survives the failure.
        log_handle.close()
        self._log_file_handle = None
        self._boot_timestamp_us = None
        if isinstance(exc, OSError):
            raise BMv2StartupError(f"failed to spawn BMv2 {self._name!r}: {exc}") from exc
        raise
    self._started = True
    logger.debug("BMv2 %r started (pid=%d, log=%s)", self._name, self._proc.pid, log_path)

wait_until_ready

wait_until_ready(*, timeout: float | None = None) -> None

Block until the gRPC port accepts a connection, or fail.

Source code in src/p4net/runtime/bmv2.py
def wait_until_ready(self, *, timeout: float | None = None) -> None:
    """Poll the gRPC endpoint until a TCP connection succeeds.

    Args:
        timeout: Seconds to wait; ``None`` falls back to the instance's
            configured startup timeout.

    Raises:
        BMv2StartupError: if the switch was never started, the child exits
            while we are waiting, or the deadline passes without a
            successful connection.
    """
    if self._proc is None or not self._started:
        raise BMv2StartupError(f"BMv2 {self._name!r} has not been started; call start() first")
    budget = timeout if timeout is not None else self._startup_timeout
    deadline = time.monotonic() + budget
    began = time.monotonic()
    endpoint = (self._grpc_bind_addr, self._grpc_port)
    while time.monotonic() < deadline:
        # A dead child will never open the port, so stop polling right away.
        if not self.is_running():
            exit_code = self.returncode()
            raise BMv2StartupError(
                f"BMv2 {self._name!r} exited before becoming ready "
                f"(returncode={exit_code}); see {self.log_file}"
            )
        try:
            with socket.create_connection(endpoint, timeout=_READY_POLL_INTERVAL):
                waited = time.monotonic() - began
                logger.debug("BMv2 %r became ready after %.3fs", self._name, waited)
                return
        except OSError:
            # Not listening yet; pause briefly before the next attempt.
            time.sleep(_READY_POLL_INTERVAL)
    raise BMv2StartupError(
        f"BMv2 {self._name!r} did not open gRPC port within "
        f"{budget}s; see {self.log_file}"
    )

stop

stop(*, timeout: float = 5.0) -> None

Send SIGTERM, escalate to SIGKILL on timeout. Idempotent.

Source code in src/p4net/runtime/bmv2.py
def stop(self, *, timeout: float = 5.0) -> None:
    """Terminate the child with SIGTERM, falling back to SIGKILL.

    Safe to call repeatedly and before ``start()``. ``timeout`` bounds the
    wait after SIGTERM and, separately, the wait after SIGKILL.
    """
    child = self._proc
    if child is None:
        return
    if child.poll() is not None:
        self._cleanup_log()
        logger.debug("BMv2 %r already exited (rc=%s)", self._name, child.returncode)
        return
    logger.debug("BMv2 %r SIGTERM", self._name)
    child.terminate()
    try:
        child.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        logger.debug(
            "BMv2 %r did not exit in %.1fs after SIGTERM; sending SIGKILL",
            self._name,
            timeout,
        )
        child.kill()
        try:
            child.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Still alive after SIGKILL: give up quietly and leave the log
            # handle open; the caller can inspect the process itself.
            logger.debug("BMv2 %r did not exit even after SIGKILL", self._name)
            return
    logger.debug("BMv2 %r exited with rc=%s", self._name, child.returncode)
    self._cleanup_log()

kill

kill() -> None

Send SIGKILL immediately. Idempotent.

Source code in src/p4net/runtime/bmv2.py
def kill(self) -> None:
    """Force-kill the child with SIGKILL; a no-op when nothing is running.

    Waits up to 5 seconds for the exit. Log cleanup happens only once the
    process is known to have exited.
    """
    child = self._proc
    if child is None:
        return
    if child.poll() is None:
        logger.debug("BMv2 %r SIGKILL (direct)", self._name)
        child.kill()
        try:
            child.wait(timeout=5.0)
        except subprocess.TimeoutExpired:
            # Still not reaped; skip cleanup, as the log may still be in use.
            return
    self._cleanup_log()

is_running

is_running() -> bool

True while the BMv2 child process is still alive.

Source code in src/p4net/runtime/bmv2.py
def is_running(self) -> bool:
    """Report whether a child process exists and has not exited yet."""
    return self._proc is not None and self._proc.poll() is None

returncode

returncode() -> int | None

Process exit code, or None if it has not exited yet.

Source code in src/p4net/runtime/bmv2.py
def returncode(self) -> int | None:
    """Return the child's exit code; ``None`` if never started or still alive."""
    return None if self._proc is None else self._proc.poll()

BMv2NotFoundError

Bases: P4NetError

Raised when the configured simple_switch_grpc binary is not on PATH.

Source code in src/p4net/runtime/exceptions.py
class BMv2NotFoundError(P4NetError):
    """The configured ``simple_switch_grpc`` binary could not be found on PATH.

    Raised by ``BMv2Switch.start()`` before any process is spawned.
    """

BMv2StartupError

Bases: P4NetError

Raised when a BMv2 process fails to start, fails to become ready, or exits unexpectedly during startup.

Source code in src/p4net/runtime/exceptions.py
class BMv2StartupError(P4NetError):
    """A BMv2 process failed to start or to become ready.

    Covers a failed spawn, a process that exits unexpectedly during startup,
    and a process that never becomes ready within the allowed time.
    """

LinkError

Bases: P4NetError

Failure creating, configuring, or destroying a network link.

Source code in src/p4net/runtime/exceptions.py
class LinkError(P4NetError):
    """Failure creating, configuring, or destroying a network link.

    Raised by ``VethPair`` operations, for example when an interface already
    exists or cannot be found in the expected namespace.
    """

NamespaceError

Bases: P4NetError

Failure creating, destroying, or operating in a network namespace.

Source code in src/p4net/runtime/exceptions.py
class NamespaceError(P4NetError):
    """Failure creating, destroying, or operating inside a network namespace."""

P4NetError

Bases: Exception

Base class for all p4net-specific errors.

Source code in src/p4net/runtime/exceptions.py
class P4NetError(Exception):
    """Base class for all p4net-specific errors.

    Catch this type to handle any error raised by p4net itself without also
    catching unrelated exceptions.
    """

PrivilegeError

Bases: P4NetError

Raised when the operation requires root or CAP_NET_ADMIN and the caller has neither.

Source code in src/p4net/runtime/exceptions.py
class PrivilegeError(P4NetError):
    """The operation needs root or CAP_NET_ADMIN and the caller has neither."""

TcError

Bases: P4NetError

Failure configuring traffic control / netem.

Source code in src/p4net/runtime/exceptions.py
class TcError(P4NetError):
    """Failure configuring traffic control (tc) or netem on an interface."""

VethPair

A veth pair with explicit lifecycle and per-side namespace tracking.

Source code in src/p4net/runtime/link.py
class VethPair:
    """A veth pair with explicit lifecycle and per-side namespace tracking.

    The two ends are addressed as sides ``"a"`` and ``"b"``. The object
    records which namespace each side was last moved into, so that later
    per-side operations open their netlink socket in that namespace.
    """

    def __init__(self, name_a: str, name_b: str) -> None:
        """Record the two interface names; no kernel state is touched here.

        Raises:
            ValueError: if both sides are given the same name. Each name is
                also passed through ``_validate_ifname``, which is defined
                elsewhere and may reject it.
        """
        _validate_ifname(name_a)
        _validate_ifname(name_b)
        if name_a == name_b:
            raise ValueError("the two veth sides must have distinct names")
        self._names: dict[Side, str] = {"a": name_a, "b": name_b}
        # None means "root namespace" for that side.
        self._ns: dict[Side, NetworkNamespace | None] = {"a": None, "b": None}
        self._created = False

    @property
    def name_a(self) -> str:
        """Interface name on the ``a`` side of the pair."""
        return self._names["a"]

    @property
    def name_b(self) -> str:
        """Interface name on the ``b`` side of the pair."""
        return self._names["b"]

    def name_of(self, side: Side) -> str:
        """Return the interface name on ``side`` (``"a"`` or ``"b"``)."""
        _validate_side(side)
        return self._names[side]

    def namespace_of(self, side: Side) -> NetworkNamespace | None:
        """Return the namespace ``side`` currently lives in (``None`` for root)."""
        _validate_side(side)
        return self._ns[side]

    @contextmanager
    def _netlink_for(self, side: Side) -> Iterator[Any]:
        """Yield a netlink handle for the namespace ``side`` is tracked in.

        Uses ``IPRoute`` for the root namespace and ``NetNS`` otherwise; the
        handle is always closed on exit.
        """
        ns = self._ns[side]
        ipr: Any = IPRoute() if ns is None else NetNS(ns.name)
        try:
            yield ipr
        finally:
            ipr.close()

    @staticmethod
    def _index(ipr: Any, ifname: str) -> int:
        """Resolve ``ifname`` to its interface index via ``ipr``.

        Raises:
            LinkError: if the lookup returns no match.
        """
        results = ipr.link_lookup(ifname=ifname)
        if not results:
            raise LinkError(f"interface {ifname!r} not found in this namespace")
        return int(results[0])

    def create(self) -> None:
        """Create the kernel veth pair via netlink. Both ends start in root netns.

        Raises:
            LinkError: if either interface already exists or the pair has
                already been created.
        """
        if self._created:
            raise LinkError("veth pair already created")
        with IPRoute() as ipr:
            if ipr.link_lookup(ifname=self._names["a"]):
                raise LinkError(f"interface {self._names['a']!r} already exists")
            if ipr.link_lookup(ifname=self._names["b"]):
                raise LinkError(f"interface {self._names['b']!r} already exists")
            ipr.link(
                "add",
                ifname=self._names["a"],
                kind="veth",
                peer=self._names["b"],
            )
        self._created = True
        logger.debug("created veth pair %r <-> %r", self._names["a"], self._names["b"])

    def destroy(self) -> None:
        """Delete the veth pair (both ends, regardless of namespace).

        Raises:
            LinkError: if the pair was never created (or was already
                destroyed), side ``a`` cannot be found, or deletion fails.
        """
        if not self._created:
            raise LinkError("veth pair has not been created (or already destroyed)")
        # Deleting either side deletes both ends. We delete from side 'a'.
        side: Side = "a"
        try:
            with self._netlink_for(side) as ipr:
                idx = self._index(ipr, self._names[side])
                ipr.link("del", index=idx)
        except LinkError:
            raise
        except Exception as exc:
            raise LinkError(f"failed to destroy veth pair: {exc}") from exc
        self._created = False
        logger.debug("destroyed veth pair %r <-> %r", self._names["a"], self._names["b"])

    def move_to_namespace(self, side: Side, ns: NetworkNamespace | None) -> None:
        """Move ``side`` of the pair into ``ns`` (or back to root if ``None``).

        The tracked namespace for ``side`` is updated only after the netlink
        request has completed without raising.
        """
        _validate_side(side)
        ifname = self._names[side]
        with self._netlink_for(side) as ipr:
            idx = self._index(ipr, ifname)
            if ns is None:
                # Move back to the namespace of pid 1 (the host root netns).
                ipr.link("set", index=idx, net_ns_pid=1)
            else:
                ns_path = f"/var/run/netns/{ns.name}"
                fd = os.open(ns_path, os.O_RDONLY)
                try:
                    ipr.link("set", index=idx, net_ns_fd=fd)
                finally:
                    os.close(fd)
        self._ns[side] = ns
        logger.debug("moved %r to namespace %r", ifname, ns.name if ns is not None else "<root>")

    def set_up(self, side: Side) -> None:
        """Bring ``side`` of the pair administratively up."""
        self._set_state(side, "up")

    def set_down(self, side: Side) -> None:
        """Bring ``side`` of the pair administratively down."""
        self._set_state(side, "down")

    def _set_state(self, side: Side, state: str) -> None:
        """Set the administrative link state of ``side`` to ``state``."""
        _validate_side(side)
        ifname = self._names[side]
        with self._netlink_for(side) as ipr:
            idx = self._index(ipr, ifname)
            ipr.link("set", index=idx, state=state)
        logger.debug("set %r %s", ifname, state)

    def set_address(self, side: Side, cidr: str) -> None:
        """Assign an IPv4 CIDR (e.g. ``"10.0.0.1/24"``) to ``side``.

        Raises:
            ValueError: if ``cidr`` is not a valid IPv4 interface address.
            LinkError: if the interface cannot be found.
        """
        _validate_side(side)
        try:
            iface_addr = ipaddress.IPv4Interface(cidr)
        except (ValueError, ipaddress.AddressValueError, ipaddress.NetmaskValueError) as exc:
            raise ValueError(f"invalid IPv4 CIDR: {cidr!r}") from exc
        ifname = self._names[side]
        with self._netlink_for(side) as ipr:
            idx = self._index(ipr, ifname)
            ipr.addr(
                "add",
                index=idx,
                address=str(iface_addr.ip),
                prefixlen=iface_addr.network.prefixlen,
            )
        logger.debug("assigned %s to %r", cidr, ifname)

    def set_address6(self, side: Side, cidr: str) -> None:
        """Assign an IPv6 CIDR (e.g. ``"fd00::1/64"``) to one side of the pair.

        Same netlink path as ``set_address`` but with
        ``family=socket.AF_INET6``. The input is validated with
        ``ipaddress.IPv6Interface``, so mixed notation such as
        ``"::ffff:192.0.2.1/128"`` is accepted.

        Raises:
            ValueError: if ``cidr`` is not a valid IPv6 interface address.
                Dotted input that fails to parse gets a dedicated
                "IPv4-shaped" message.
            LinkError: if the interface cannot be found or the assignment
                fails.
        """
        _validate_side(side)
        try:
            iface_addr = ipaddress.IPv6Interface(cidr)
        except (ValueError, ipaddress.AddressValueError, ipaddress.NetmaskValueError) as exc:
            # Decide on the message only after parsing has failed: a dot alone
            # does not make the input IPv4, because IPv6 allows a dotted-quad
            # tail (e.g. IPv4-mapped addresses).
            if isinstance(cidr, str) and "." in cidr:
                raise ValueError(f"expected IPv6 CIDR, got IPv4-shaped {cidr!r}") from exc
            raise ValueError(f"invalid IPv6 CIDR: {cidr!r}") from exc
        ifname = self._names[side]
        try:
            with self._netlink_for(side) as ipr:
                idx = self._index(ipr, ifname)
                ipr.addr(
                    "add",
                    index=idx,
                    address=str(iface_addr.ip),
                    prefixlen=iface_addr.network.prefixlen,
                    family=socket.AF_INET6,
                )
        except LinkError:
            raise
        except Exception as exc:
            raise LinkError(f"failed to assign IPv6 {cidr!r} to {ifname!r}: {exc}") from exc
        logger.debug("assigned %s to %r", cidr, ifname)

    def set_mtu(self, side: Side, mtu: int) -> None:
        """Set the MTU on ``side``.

        Out-of-range values are rejected, not clamped. The accepted range is
        ``[_MTU_MIN, _MTU_MAX]`` (previously documented as ``[68, 65535]``;
        the constants are defined elsewhere).

        Raises:
            ValueError: if ``mtu`` is not an ``int`` (``bool`` is refused
                too) or lies outside the accepted range.
            LinkError: if the interface cannot be found.
        """
        _validate_side(side)
        if not isinstance(mtu, int) or isinstance(mtu, bool):
            raise ValueError(f"MTU must be an int, got {type(mtu).__name__}")
        if not _MTU_MIN <= mtu <= _MTU_MAX:
            raise ValueError(f"MTU must be in [{_MTU_MIN}, {_MTU_MAX}], got {mtu}")
        ifname = self._names[side]
        with self._netlink_for(side) as ipr:
            idx = self._index(ipr, ifname)
            ipr.link("set", index=idx, mtu=mtu)
        logger.debug("set MTU of %r to %d", ifname, mtu)

    def set_mac(self, side: Side, mac: str) -> None:
        """Override the MAC on ``side`` (canonical ``XX:XX:XX:XX:XX:XX``).

        The address is lower-cased before being sent to the kernel.

        Raises:
            ValueError: if ``mac`` is not a string matching ``_MAC_RE``.
            LinkError: if the interface cannot be found.
        """
        _validate_side(side)
        if not isinstance(mac, str) or not _MAC_RE.match(mac):
            raise ValueError(f"invalid MAC address: {mac!r}")
        ifname = self._names[side]
        with self._netlink_for(side) as ipr:
            idx = self._index(ipr, ifname)
            ipr.link("set", index=idx, address=mac.lower())
        logger.debug("set MAC of %r to %s", ifname, mac)

    def __repr__(self) -> str:
        return f"VethPair({self._names['a']!r}, {self._names['b']!r})"

name_a property

name_a: str

Interface name on the a side of the pair.

name_b property

name_b: str

Interface name on the b side of the pair.

name_of

name_of(side: Side) -> str

Return the interface name on side ("a" or "b").

Source code in src/p4net/runtime/link.py
def name_of(self, side: Side) -> str:
    """Look up the interface name for one side (``"a"`` or ``"b"``) of the pair."""
    _validate_side(side)
    names = self._names
    return names[side]

namespace_of

namespace_of(side: Side) -> NetworkNamespace | None

Return the namespace side currently lives in (None for root).

Source code in src/p4net/runtime/link.py
def namespace_of(self, side: Side) -> NetworkNamespace | None:
    """Look up the namespace tracked for ``side``; ``None`` means root."""
    _validate_side(side)
    tracked = self._ns
    return tracked[side]

create

create() -> None

Create the kernel veth pair via netlink. Both ends start in root netns.

Raises:

Type | Description
LinkError | If either interface already exists or the pair has already been created.

Source code in src/p4net/runtime/link.py
def create(self) -> None:
    """Add the veth pair to the kernel through netlink, in the root netns.

    Raises:
        LinkError: when this object has already created its pair, or when
            an interface with either name is already present.
    """
    if self._created:
        raise LinkError("veth pair already created")
    first = self._names["a"]
    second = self._names["b"]
    with IPRoute() as nl:
        # Check side a, then side b, so the error names the first clash found.
        if nl.link_lookup(ifname=first):
            raise LinkError(f"interface {first!r} already exists")
        if nl.link_lookup(ifname=second):
            raise LinkError(f"interface {second!r} already exists")
        nl.link("add", ifname=first, kind="veth", peer=second)
    self._created = True
    logger.debug("created veth pair %r <-> %r", first, second)

destroy

destroy() -> None

Delete the veth pair (both ends, regardless of namespace).

Source code in src/p4net/runtime/link.py
def destroy(self) -> None:
    """Remove the veth pair from the kernel.

    Raises:
        LinkError: when the pair is not currently created, side ``a``
            cannot be found, or the netlink delete fails for any reason.
    """
    if not self._created:
        raise LinkError("veth pair has not been created (or already destroyed)")
    # Removing one end of a veth pair removes its peer too; side 'a' is
    # used as the handle.
    anchor: Side = "a"
    try:
        with self._netlink_for(anchor) as nl:
            link_index = self._index(nl, self._names[anchor])
            nl.link("del", index=link_index)
    except LinkError:
        # Already the right type (raised by _index); pass through unchanged.
        raise
    except Exception as exc:
        raise LinkError(f"failed to destroy veth pair: {exc}") from exc
    self._created = False
    logger.debug("destroyed veth pair %r <-> %r", self._names["a"], self._names["b"])

move_to_namespace

move_to_namespace(side: Side, ns: NetworkNamespace | None) -> None

Move side of the pair into ns (or back to root if None).

Source code in src/p4net/runtime/link.py
def move_to_namespace(self, side: Side, ns: NetworkNamespace | None) -> None:
    """Relocate one end of the pair into ``ns``; ``None`` means the root netns.

    After the kernel move succeeds, the new namespace is recorded for
    ``side`` so later operations on that end target the right namespace.
    """
    _validate_side(side)
    ifname = self._names[side]
    with self._netlink_for(side) as ipr:
        link_index = self._index(ipr, ifname)
        if ns is not None:
            # Named namespaces are addressed through an fd opened on their
            # entry under /var/run/netns; the fd is closed either way.
            ns_fd = os.open(f"/var/run/netns/{ns.name}", os.O_RDONLY)
            try:
                ipr.link("set", index=link_index, net_ns_fd=ns_fd)
            finally:
                os.close(ns_fd)
        else:
            # pid 1 lives in the host root netns, so target its namespace.
            ipr.link("set", index=link_index, net_ns_pid=1)
    self._ns[side] = ns
    destination = "<root>" if ns is None else ns.name
    logger.debug("moved %r to namespace %r", ifname, destination)

set_up

set_up(side: Side) -> None

Bring side of the pair administratively up.

Source code in src/p4net/runtime/link.py
def set_up(self, side: Side) -> None:
    """Mark the chosen end of the pair administratively up.

    Delegates to ``_set_state`` with the ``"up"`` state.
    """
    desired_state = "up"
    self._set_state(side, desired_state)

set_down

set_down(side: Side) -> None

Bring side of the pair administratively down.

Source code in src/p4net/runtime/link.py
def set_down(self, side: Side) -> None:
    """Mark the chosen end of the pair administratively down.

    Delegates to ``_set_state`` with the ``"down"`` state.
    """
    desired_state = "down"
    self._set_state(side, desired_state)

set_address

set_address(side: Side, cidr: str) -> None

Assign an IPv4 CIDR (e.g. "10.0.0.1/24") to side.

Source code in src/p4net/runtime/link.py
def set_address(self, side: Side, cidr: str) -> None:
    """Add an IPv4 address with prefix length (e.g. ``"10.0.0.1/24"``) to ``side``.

    Raises:
        ValueError: if ``cidr`` cannot be parsed as an IPv4 interface.

    Errors raised by the netlink call itself are not caught here and
    propagate to the caller unchanged.
    """
    _validate_side(side)
    try:
        parsed = ipaddress.IPv4Interface(cidr)
    except ValueError as exc:
        # AddressValueError and NetmaskValueError are ValueError subclasses,
        # so this single clause covers every parse failure.
        raise ValueError(f"invalid IPv4 CIDR: {cidr!r}") from exc
    ifname = self._names[side]
    host_address = str(parsed.ip)
    prefix_length = parsed.network.prefixlen
    with self._netlink_for(side) as ipr:
        ipr.addr(
            "add",
            index=self._index(ipr, ifname),
            address=host_address,
            prefixlen=prefix_length,
        )
    logger.debug("assigned %s to %r", cidr, ifname)

set_address6

set_address6(side: Side, cidr: str) -> None

Assign an IPv6 CIDR (e.g. "fd00::1/64") to one side of the pair.

Implementation: same pyroute2 IPRoute path as set_address, but with family=socket.AF_INET6. Validates the input via ipaddress.IPv6Interface and rejects IPv4 strings (their /0 through /32 masks would parse as IPv6Interface only by accident). Raises ValueError for malformed input and LinkError on assignment failure.

Source code in src/p4net/runtime/link.py
def set_address6(self, side: Side, cidr: str) -> None:
    """Add an IPv6 address with prefix length (e.g. ``"fd00::1/64"``) to ``side``.

    Follows the same netlink route as `set_address`, with the address
    family set to ``socket.AF_INET6``. Any string containing a ``"."`` is
    refused up front as IPv4-shaped; everything else must parse as an
    ``ipaddress.IPv6Interface``.

    Raises:
        ValueError: if ``cidr`` looks like IPv4 or is not a valid IPv6 CIDR.
        LinkError: if the address cannot be assigned (the underlying error
            is chained as the cause).
    """
    _validate_side(side)
    if isinstance(cidr, str) and "." in cidr:
        raise ValueError(f"expected IPv6 CIDR, got IPv4-shaped {cidr!r}")
    try:
        parsed = ipaddress.IPv6Interface(cidr)
    except ValueError as exc:
        # Covers AddressValueError / NetmaskValueError, both ValueError subclasses.
        raise ValueError(f"invalid IPv6 CIDR: {cidr!r}") from exc
    ifname = self._names[side]
    request = {
        "address": str(parsed.ip),
        "prefixlen": parsed.network.prefixlen,
        "family": socket.AF_INET6,
    }
    try:
        with self._netlink_for(side) as ipr:
            ipr.addr("add", index=self._index(ipr, ifname), **request)
    except LinkError:
        # Already the right exception type: let it through untouched.
        raise
    except Exception as exc:
        raise LinkError(f"failed to assign IPv6 {cidr!r} to {ifname!r}: {exc}") from exc
    logger.debug("assigned %s to %r", cidr, ifname)

set_mtu

set_mtu(side: Side, mtu: int) -> None

Set the MTU on side. The value must lie within [68, 65535]; out-of-range values are rejected with ValueError rather than clamped.

Source code in src/p4net/runtime/link.py
def set_mtu(self, side: Side, mtu: int) -> None:
    """Set the MTU on ``side``.

    The value is validated, not clamped: anything outside
    ``[_MTU_MIN, _MTU_MAX]`` (documented as ``[68, 65535]``) is rejected
    instead of being silently adjusted.

    Raises:
        ValueError: if ``mtu`` is not an ``int`` (``bool`` is rejected
            explicitly even though it subclasses ``int``) or falls outside
            the allowed range.
    """
    _validate_side(side)
    # bool is an int subclass; refuse it so True/False never become MTU 1/0.
    if not isinstance(mtu, int) or isinstance(mtu, bool):
        raise ValueError(f"MTU must be an int, got {type(mtu).__name__}")
    if not _MTU_MIN <= mtu <= _MTU_MAX:
        raise ValueError(f"MTU must be in [{_MTU_MIN}, {_MTU_MAX}], got {mtu}")
    ifname = self._names[side]
    with self._netlink_for(side) as ipr:
        idx = self._index(ipr, ifname)
        ipr.link("set", index=idx, mtu=mtu)
    logger.debug("set MTU of %r to %d", ifname, mtu)

set_mac

set_mac(side: Side, mac: str) -> None

Override the MAC on side (canonical XX:XX:XX:XX:XX:XX).

Source code in src/p4net/runtime/link.py
def set_mac(self, side: Side, mac: str) -> None:
    """Replace the MAC address of ``side`` (format ``XX:XX:XX:XX:XX:XX``).

    The address is lower-cased before it is handed to the kernel; the
    debug log shows it as the caller supplied it.

    Raises:
        ValueError: if ``mac`` is not a string matching ``_MAC_RE``.
    """
    _validate_side(side)
    well_formed = isinstance(mac, str) and _MAC_RE.match(mac)
    if not well_formed:
        raise ValueError(f"invalid MAC address: {mac!r}")
    ifname = self._names[side]
    normalized = mac.lower()
    with self._netlink_for(side) as ipr:
        ipr.link("set", index=self._index(ipr, ifname), address=normalized)
    logger.debug("set MAC of %r to %s", ifname, mac)

NetworkNamespace

A named Linux network namespace with explicit lifecycle.

Use create() / destroy() directly, or use the instance as a context manager. exec() runs a command inside the namespace and waits for it; popen() returns a long-running process handle.

Source code in src/p4net/runtime/netns.py
class NetworkNamespace:
    """Handle on a named Linux network namespace with an explicit lifetime.

    Call `create()` and `destroy()` yourself, or enter the object as a
    context manager to get both. `exec()` runs a command inside the
    namespace and blocks until it finishes; `popen()` starts a command and
    hands back a handle suited to long-running processes.
    """

    def __init__(self, name: str) -> None:
        # The name is checked before it is stored.
        _validate_name(name)
        self._name = name

    @property
    def name(self) -> str:
        """Name under which the kernel knows this namespace."""
        return self._name

    @property
    def exists(self) -> bool:
        """``True`` as long as the name shows up in the namespace listing."""
        known = _netns.listnetns()
        return self._name in known

    def _wrap(self, argv: Sequence[str]) -> list[str]:
        """Prefix ``argv`` with ``ip netns exec <name>``."""
        return ["ip", "netns", "exec", self._name, *argv]

    def create(self) -> None:
        """Bring the namespace into existence.

        Raises:
            NamespaceError: if the name is already taken by a namespace.
        """
        if not self.exists:
            _netns.create(self._name)
            logger.debug("created network namespace %r", self._name)
            return
        raise NamespaceError(f"namespace {self._name!r} already exists")

    def destroy(self) -> None:
        """Delete the namespace.

        Raises:
            NamespaceError: if no namespace with this name is present.
        """
        if self.exists:
            _netns.remove(self._name)
            logger.debug("destroyed network namespace %r", self._name)
            return
        raise NamespaceError(f"namespace {self._name!r} does not exist")

    def exec(
        self,
        argv: Sequence[str],
        *,
        timeout: float | None = None,
        check: bool = True,
        capture_output: bool = False,
        env: Mapping[str, str] | None = None,
    ) -> subprocess.CompletedProcess[bytes]:
        """Execute `argv` in this namespace and block until it exits.

        Behaves like `subprocess.run`: the result is a `CompletedProcess`;
        a non-zero exit with `check` set raises
        `subprocess.CalledProcessError`; exceeding `timeout` raises
        `subprocess.TimeoutExpired`.
        """
        # Copy the mapping so subprocess receives a plain dict.
        child_env = None if env is None else dict(env)
        return subprocess.run(
            self._wrap(argv),
            timeout=timeout,
            check=check,
            capture_output=capture_output,
            env=child_env,
        )

    def popen(
        self,
        argv: Sequence[str],
        *,
        env: Mapping[str, str] | None = None,
        stdout: int | IO[Any] | None = None,
        stderr: int | IO[Any] | None = None,
        stdin: int | IO[Any] | None = None,
    ) -> NSProcess:
        """Start a long-running process in this namespace without waiting.

        The child is an ordinary `subprocess.Popen` of
        `ip netns exec <name> <argv>`, returned wrapped in an `NSProcess`
        that offers a `subprocess.Popen`-like surface (`pid`, `poll`,
        `wait`, `terminate`, `kill`, `close`).
        """
        candidates: dict[str, Any] = {
            "env": None if env is None else dict(env),
            "stdout": stdout,
            "stderr": stderr,
            "stdin": stdin,
        }
        # Forward only what the caller supplied; Popen defaults cover the rest.
        kwargs = {key: value for key, value in candidates.items() if value is not None}
        child = subprocess.Popen(self._wrap(argv), **kwargs)
        logger.debug(
            "spawned process %r in namespace %r (pid=%d)",
            list(argv),
            self._name,
            child.pid,
        )
        return NSProcess(child)

    def __enter__(self) -> NetworkNamespace:
        self.create()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Skip destroy() when the namespace is already gone.
        if not self.exists:
            return
        self.destroy()

    def __repr__(self) -> str:
        return f"NetworkNamespace({self._name!r})"

name property

name: str

The kernel-visible namespace name.

exists property

exists: bool

True while the namespace is present in ip netns list.

create

create() -> None

Create the namespace.

Raises:

Type Description
NamespaceError

if a namespace with the same name already exists.

Source code in src/p4net/runtime/netns.py
def create(self) -> None:
    """Bring the namespace into existence.

    Raises:
        NamespaceError: if the name is already taken by a namespace.
    """
    if not self.exists:
        _netns.create(self._name)
        logger.debug("created network namespace %r", self._name)
        return
    raise NamespaceError(f"namespace {self._name!r} already exists")

destroy

destroy() -> None

Remove the namespace.

Raises:

Type Description
NamespaceError

if the namespace does not exist.

Source code in src/p4net/runtime/netns.py
def destroy(self) -> None:
    """Delete the namespace.

    Raises:
        NamespaceError: if no namespace with this name is present.
    """
    if self.exists:
        _netns.remove(self._name)
        logger.debug("destroyed network namespace %r", self._name)
        return
    raise NamespaceError(f"namespace {self._name!r} does not exist")

exec

exec(argv: Sequence[str], *, timeout: float | None = None, check: bool = True, capture_output: bool = False, env: Mapping[str, str] | None = None) -> subprocess.CompletedProcess[bytes]

Run argv inside this namespace and wait for completion.

Mirrors subprocess.run semantics: returns a CompletedProcess, raises subprocess.CalledProcessError if check and the exit status is non-zero, raises subprocess.TimeoutExpired on timeout.

Source code in src/p4net/runtime/netns.py
def exec(
    self,
    argv: Sequence[str],
    *,
    timeout: float | None = None,
    check: bool = True,
    capture_output: bool = False,
    env: Mapping[str, str] | None = None,
) -> subprocess.CompletedProcess[bytes]:
    """Execute `argv` in this namespace and block until it exits.

    Behaves like `subprocess.run`: the result is a `CompletedProcess`;
    a non-zero exit with `check` set raises
    `subprocess.CalledProcessError`; exceeding `timeout` raises
    `subprocess.TimeoutExpired`.
    """
    # The command is run through `ip netns exec <name>`.
    command = ["ip", "netns", "exec", self._name]
    command.extend(argv)
    # Copy the mapping so subprocess receives a plain dict.
    child_env = None if env is None else dict(env)
    return subprocess.run(
        command,
        timeout=timeout,
        check=check,
        capture_output=capture_output,
        env=child_env,
    )

popen

popen(argv: Sequence[str], *, env: Mapping[str, str] | None = None, stdout: int | IO[Any] | None = None, stderr: int | IO[Any] | None = None, stdin: int | IO[Any] | None = None) -> NSProcess

Spawn a long-running process inside this namespace.

Returns an NSProcess wrapping a regular subprocess.Popen of ip netns exec <name> <argv>. The wrapper exposes a subprocess.Popen-compatible surface (pid, poll, wait, terminate, kill, close).

Source code in src/p4net/runtime/netns.py
def popen(
    self,
    argv: Sequence[str],
    *,
    env: Mapping[str, str] | None = None,
    stdout: int | IO[Any] | None = None,
    stderr: int | IO[Any] | None = None,
    stdin: int | IO[Any] | None = None,
) -> NSProcess:
    """Start a long-running process in this namespace without waiting.

    The child is an ordinary `subprocess.Popen` of
    `ip netns exec <name> <argv>`, returned wrapped in an `NSProcess`
    that offers a `subprocess.Popen`-like surface (`pid`, `poll`,
    `wait`, `terminate`, `kill`, `close`).
    """
    command = ["ip", "netns", "exec", self._name]
    command.extend(argv)
    candidates: dict[str, Any] = {
        "env": None if env is None else dict(env),
        "stdout": stdout,
        "stderr": stderr,
        "stdin": stdin,
    }
    # Forward only what the caller supplied; Popen defaults cover the rest.
    kwargs = {key: value for key, value in candidates.items() if value is not None}
    child = subprocess.Popen(command, **kwargs)
    logger.debug(
        "spawned process %r in namespace %r (pid=%d)",
        list(argv),
        self._name,
        child.pid,
    )
    return NSProcess(child)

NSProcess

A process running inside a NetworkNamespace.

Wraps a regular subprocess.Popen returned by NetworkNamespace.popen. The forwarded API (pid, poll, wait, terminate, kill) mirrors subprocess.Popen. close() is preserved as a no-op so callers from earlier phases keep working unchanged: there is no namespace-side handle to release because the wrapped process is a regular child of ip netns exec.

Source code in src/p4net/runtime/netns.py
class NSProcess:
    """Handle on a process that runs inside a `NetworkNamespace`.

    Thin wrapper around the `subprocess.Popen` that
    `NetworkNamespace.popen` creates. `pid`, `poll`, `wait`, `terminate`
    and `kill` forward to that object. `close()` releases nothing (the
    child is a regular `ip netns exec` subprocess, so there is no
    namespace-side handle); it only remains so that callers written
    against earlier phases continue to work.
    """

    def __init__(self, popen: Any) -> None:
        self._popen = popen
        # Flipped by close(); only affects what __repr__ reports.
        self._closed = False

    @property
    def pid(self) -> int:
        """Process ID of the wrapped child, as an ``int``."""
        return int(self._popen.pid)

    def poll(self) -> int | None:
        """Check the child without blocking: exit code, or ``None`` if still running."""
        status = self._popen.poll()
        if status is None:
            return None
        return int(status)

    def wait(self, timeout: float | None = None) -> int:
        """Wait for the child to exit and return its exit code."""
        exit_code = self._popen.wait(timeout=timeout)
        return int(exit_code)

    def terminate(self) -> None:
        """Forward ``terminate()`` (``SIGTERM``) to the child."""
        child = self._popen
        child.terminate()

    def kill(self) -> None:
        """Forward ``kill()`` (``SIGKILL``) to the child."""
        child = self._popen
        child.kill()

    def close(self) -> None:
        """Mark the handle closed; kept only for API stability.

        An earlier implementation had a `pyroute2.NSPopen` helper to
        release here. With a plain `subprocess.Popen` nothing needs
        releasing, so this just records the closed state. Repeated calls
        are safe.
        """
        self._closed = True

    def __enter__(self) -> NSProcess:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Escalate politely: SIGTERM with a 5 s grace period, then SIGKILL.
        # close() runs no matter how the shutdown attempt ends.
        try:
            if self.poll() is not None:
                return
            try:
                self.terminate()
                self.wait(timeout=5)
            except Exception as terr:
                logger.debug("NSProcess.__exit__: terminate/wait failed: %r", terr)
            if self.poll() is not None:
                return
            try:
                self.kill()
                self.wait()
            except Exception as kerr:
                logger.debug("NSProcess.__exit__: kill/wait failed: %r", kerr)
        finally:
            self.close()

    def __del__(self) -> None:
        # Finalizers must stay silent, so any error from close() is dropped.
        with contextlib.suppress(Exception):
            self.close()

    def __repr__(self) -> str:
        state = "closed" if self._closed else "open"
        pid_text = "?" if self._closed else self.pid
        return f"NSProcess(pid={pid_text}, {state})"

pid property

pid: int

OS process ID of the wrapped child.

poll

poll() -> int | None

Non-blocking liveness check; returns the exit code or None.

Source code in src/p4net/runtime/netns.py
def poll(self) -> int | None:
    """Check the child without blocking: exit code, or ``None`` if still running."""
    status = self._popen.poll()
    if status is None:
        return None
    return int(status)

wait

wait(timeout: float | None = None) -> int

Block until the child exits and return its exit code.

Source code in src/p4net/runtime/netns.py
def wait(self, timeout: float | None = None) -> int:
    """Wait for the child to exit and return its exit code as an ``int``."""
    exit_code = self._popen.wait(timeout=timeout)
    return int(exit_code)

terminate

terminate() -> None

Send SIGTERM to the child.

Source code in src/p4net/runtime/netns.py
def terminate(self) -> None:
    """Forward ``terminate()`` (``SIGTERM``) to the wrapped child."""
    child = self._popen
    child.terminate()

kill

kill() -> None

Send SIGKILL to the child.

Source code in src/p4net/runtime/netns.py
def kill(self) -> None:
    """Forward ``kill()`` (``SIGKILL``) to the wrapped child."""
    child = self._popen
    child.kill()

close

close() -> None

Idempotent no-op preserved for API stability.

Earlier phases needed this method to release a pyroute2.NSPopen helper; with a regular subprocess.Popen there is nothing to release, but the method is kept so callers compiled against the old API don't break. Calling close more than once is safe.

Source code in src/p4net/runtime/netns.py
def close(self) -> None:
    """Mark the handle closed; kept only for API stability.

    An earlier implementation had a `pyroute2.NSPopen` helper to release
    here. With a plain `subprocess.Popen` nothing needs releasing, so
    this just records the closed state. Repeated calls are safe.
    """
    # Only the flag changes; the wrapped process is left untouched.
    self._closed = True

disable_ipv6

disable_ipv6(ns: NetworkNamespace | None, iface: str) -> None

Set net.ipv6.conf.<iface>.disable_ipv6=1 inside ns.

Idempotent — sysctl -w to an already-set value is harmless. Pass ns=None to target the root namespace.

Source code in src/p4net/runtime/sysctl.py
def disable_ipv6(ns: NetworkNamespace | None, iface: str) -> None:
    """Write ``net.ipv6.conf.<iface>.disable_ipv6=1`` in ``ns``.

    ``ns=None`` addresses the root namespace. Calling this repeatedly is
    safe: it only writes the same value again.
    """
    sysctl_key = f"net.ipv6.conf.{iface}.disable_ipv6"
    _set_one(ns, sysctl_key, "1")
    where = ns.name if ns else "<root>"
    logger.debug("disabled IPv6 on %r in ns=%r", iface, where)

enable_ipv6

enable_ipv6(ns: NetworkNamespace | None, iface: str, *, accept_ra: bool = False, autoconf: bool = False) -> None

Set disable_ipv6=0 plus accept_ra and autoconf on iface.

Defaults turn off Router Advertisement handling and SLAAC autoconfiguration so the host only carries addresses we explicitly assign. Set accept_ra=True or autoconf=True if you want kernel-level auto-addressing on this interface.

Source code in src/p4net/runtime/sysctl.py
def enable_ipv6(
    ns: NetworkNamespace | None,
    iface: str,
    *,
    accept_ra: bool = False,
    autoconf: bool = False,
) -> None:
    """Write ``disable_ipv6=0`` together with ``accept_ra`` and ``autoconf`` for ``iface``.

    With the defaults, Router Advertisement processing and SLAAC
    autoconfiguration are switched off, leaving only explicitly assigned
    addresses on the interface. Pass ``accept_ra=True`` and/or
    ``autoconf=True`` to let the kernel auto-address it.
    """
    prefix = f"net.ipv6.conf.{iface}"
    # Applied in this order: enable IPv6 first, then the two RA/SLAAC knobs.
    settings = (
        ("disable_ipv6", "0"),
        ("accept_ra", "1" if accept_ra else "0"),
        ("autoconf", "1" if autoconf else "0"),
    )
    for knob, value in settings:
        _set_one(ns, f"{prefix}.{knob}", value)
    where = ns.name if ns else "<root>"
    logger.debug(
        "enabled IPv6 on %r in ns=%r (accept_ra=%s, autoconf=%s)",
        iface,
        where,
        accept_ra,
        autoconf,
    )

apply_netem

apply_netem(ns: NetworkNamespace | None, iface: str, *, rate: str | None = None, delay: str | None = None, jitter: str | None = None, loss_pct: float | None = None) -> None

Apply a netem root qdisc to iface inside ns (or root if ns is None).

Uses tc qdisc replace so calling this function repeatedly is safe. At least one of rate, delay, jitter, loss_pct must be set. jitter requires delay.

Source code in src/p4net/runtime/tc.py
def apply_netem(
    ns: NetworkNamespace | None,
    iface: str,
    *,
    rate: str | None = None,
    delay: str | None = None,
    jitter: str | None = None,
    loss_pct: float | None = None,
) -> None:
    """Install a netem root qdisc on `iface` in `ns` (root namespace when ns is None).

    The qdisc is set with `tc qdisc replace`, so repeated calls simply
    overwrite the previous settings. At least one of `rate`, `delay`,
    `jitter`, `loss_pct` has to be given, and `jitter` is only valid
    together with `delay`.

    Raises:
        TcError: if no option is given, `jitter` comes without `delay`,
            `loss_pct` is outside ``[0.0, 100.0]``, or `tc` exits non-zero.
            `_validate_rate` / `_validate_time` check `rate`, `delay` and
            `jitter`; they presumably raise TcError too, but that is not
            visible from this function.
    """
    if all(option is None for option in (rate, delay, jitter, loss_pct)):
        raise TcError("apply_netem requires at least one of rate, delay, jitter, loss_pct")
    if delay is None and jitter is not None:
        raise TcError("jitter requires delay")
    if rate is not None:
        _validate_rate(rate)
    if delay is not None:
        _validate_time(delay, "delay")
    if jitter is not None:
        _validate_time(jitter, "jitter")
    if loss_pct is not None and not (0.0 <= loss_pct <= 100.0):
        raise TcError(f"loss_pct must be in [0.0, 100.0], got {loss_pct}")

    # netem arguments in the order: delay [jitter], loss, rate.
    netem_opts: list[str] = []
    if delay is not None:
        netem_opts.append("delay")
        netem_opts.append(delay)
        if jitter is not None:
            # Jitter is positional: it directly follows the delay value.
            netem_opts.append(jitter)
    if loss_pct is not None:
        netem_opts.extend(("loss", f"{loss_pct}%"))
    if rate is not None:
        netem_opts.extend(("rate", rate))
    argv = ["tc", "qdisc", "replace", "dev", iface, "root", "netem", *netem_opts]

    result = _run(ns, argv)
    if result.returncode != 0:
        stderr = result.stderr.decode(errors="replace").strip()
        raise TcError(f"tc qdisc replace failed (rc={result.returncode}): {stderr}")
    ns_label = None if ns is None else ns.name
    logger.debug(
        "applied netem on %r in ns=%r: %s",
        iface,
        ns_label,
        " ".join(("netem", *netem_opts)),
    )

clear_qdisc

clear_qdisc(ns: NetworkNamespace | None, iface: str) -> None

Remove the root qdisc on iface. Idempotent: no-op if none is set.

Source code in src/p4net/runtime/tc.py
def clear_qdisc(ns: NetworkNamespace | None, iface: str) -> None:
    """Delete the root qdisc of `iface`; succeeds quietly when there is none.

    Raises:
        TcError: if `tc qdisc del` fails for a reason other than "nothing
            to delete".
    """
    result = _run(ns, ["tc", "qdisc", "del", "dev", iface, "root"])
    if result.returncode == 0:
        logger.debug("cleared qdisc on %r in ns=%r", iface, ns.name if ns is not None else None)
        return
    stderr_text = result.stderr.decode(errors="replace")
    lowered = stderr_text.lower()
    # Messages matched (case-insensitively) as "no user-installed root qdisc
    # to delete"; any of the four turns the failure into a successful no-op.
    benign_markers = ("no such", "no qdisc", "cannot find", "handle of zero")
    if any(marker in lowered for marker in benign_markers):
        logger.debug(
            "no qdisc to clear on %r in ns=%r (already clean)",
            iface,
            ns.name if ns is not None else None,
        )
        return
    raise TcError(f"tc qdisc del failed (rc={result.returncode}): {stderr_text.strip()}")