跳转至

异步并发多交换机

三交换机全网格拓扑,每台交换机的 IPv4 LPM 表通过 v1.6 AsyncP4RuntimeClient 并发下发。可见的收益是耗时:三台交换机并行下发时,各次写入的 gRPC 往返相互重叠,因此比逐条串行下发节省了大约若干个 gRPC RTT 的时间(再减去事件循环开销)。

拓扑

examples/async_concurrent/topology.py

"""Three-switch full-mesh topology demonstrating the v1.6 async client.

Topology::

    h1 (10.0.1.1/24)            h2 (10.0.2.1/24)            h3 (10.0.3.1/24)
        |                           |                           |
        | port 1                    | port 1                    | port 1
       s1 -------- port 2 ---------- s2 -------- port 3 -------- s3
        |                                                       |
        |                  port 3                       port 2  |
        +-------------------------------------------------------+

Three hosts each on their own /24, on different switches. Switches are
fully meshed (s1-s2, s2-s3, s3-s1). Each switch loads the same P4 binary
(``concurrent.p4``) and gets a per-switch ipv4_lpm table populated from
Python via the v1.6 ``AsyncP4RuntimeClient``.

``setup(net)`` drives all six concurrent inserts (2 cross-subnet routes
per switch) via ``asyncio.gather`` against three switches' async clients
in parallel — visibly faster than the sequential equivalent on a
multi-switch topology.

Run as root:

    sudo p4net examples/async_concurrent/topology.py

Then in the shell::

    pingall
"""

from __future__ import annotations

import asyncio
import time
from pathlib import Path
from typing import Any

from p4net import Network
from p4net.topo import Topology

HERE = Path(__file__).resolve().parent
_P4_SRC = HERE / "concurrent.p4"

topology = Topology()

# Hosts: one per /24; each host's default route is .254 on its own subnet.
h1 = topology.add_host(
    "h1", ip="10.0.1.1/24", mac="00:00:00:00:00:01", default_route="10.0.1.254"
)
h2 = topology.add_host(
    "h2", ip="10.0.2.1/24", mac="00:00:00:00:00:02", default_route="10.0.2.254"
)
h3 = topology.add_host(
    "h3", ip="10.0.3.1/24", mac="00:00:00:00:00:03", default_route="10.0.3.254"
)

# Switches: all three load the same P4 program.
s1, s2, s3 = (topology.add_switch(name, p4_src=_P4_SRC) for name in ("s1", "s2", "s3"))

# Access links: every host attaches to port 1 of its own switch.
for _host, _switch in ((h1, s1), (h2, s2), (h3, s3)):
    topology.add_link(_host, _switch, port_b=1)

# Mesh links, listed as (switch_a, port_a, switch_b, port_b):
#   s1.port2 <-> s2.port2, s2.port3 <-> s3.port2, s3.port3 <-> s1.port3.
for _sw_a, _port_a, _sw_b, _port_b in ((s1, 2, s2, 2), (s2, 3, s3, 2), (s3, 3, s1, 3)):
    topology.add_link(_sw_a, _sw_b, port_a=_port_a, port_b=_port_b)


# Static ipv4_lpm contents, keyed by switch name. Every entry is a
# (destination prefix, egress port) pair. Port 1 always faces the switch's
# own host; the two remote subnets leave through the mesh ports, which are
# wired as s1.2<->s2.2, s2.3<->s3.2 and s3.3<->s1.3.
_ROUTES: dict[str, list[tuple[str, int]]] = {
    "s1": [
        ("10.0.1.0/24", 1),  # local host h1
        ("10.0.2.0/24", 2),  # towards s2
        ("10.0.3.0/24", 3),  # towards s3
    ],
    "s2": [
        ("10.0.2.0/24", 1),  # local host h2
        ("10.0.1.0/24", 2),  # towards s1
        ("10.0.3.0/24", 3),  # towards s3
    ],
    "s3": [
        ("10.0.3.0/24", 1),  # local host h3
        ("10.0.1.0/24", 3),  # towards s1
        ("10.0.2.0/24", 2),  # towards s2
    ],
}


async def _async_setup(net: Network) -> dict[str, Any]:
    """Program the ``ipv4_lpm`` table of s1, s2 and s3 concurrently.

    All three async clients are connected in parallel. One
    ``insert_table_entry`` coroutine is then built per route in
    ``_ROUTES``, and the whole batch is awaited with a single
    ``asyncio.gather``. The clients are always disconnected afterwards.
    If connecting or inserting fails, the disconnect is best-effort and
    the original error is re-raised.

    Args:
        net: Running network; each switch must expose ``async_client``.

    Returns:
        ``{"concurrent_ms": float, "inserts": int}``: the wall-clock
        duration of the gathered inserts in milliseconds and the number of
        entries issued. This lets the example be instrumented from the
        shell or a test harness.
    """
    switches = [net.switch(name) for name in ("s1", "s2", "s3")]
    clients = [sw.async_client for sw in switches]

    try:
        # Connect all clients concurrently.
        await asyncio.gather(*(c.connect() for c in clients))

        # Build the insert coroutines only after connecting, so a failed
        # connect leaves no never-awaited coroutines behind.
        inserts = [
            client.insert_table_entry(
                "MyIngress.ipv4_lpm",
                {"hdr.ipv4.dstAddr": cidr},
                "MyIngress.set_egress_port",
                {"port": port},
            )
            for sw, client in zip(switches, clients, strict=True)
            for cidr, port in _ROUTES[sw.name]
        ]

        started = time.perf_counter()
        await asyncio.gather(*inserts)
        concurrent_ms = (time.perf_counter() - started) * 1000.0
    except BaseException:
        # Best-effort cleanup on failure: disconnect errors are collected
        # rather than raised so they cannot mask the exception being
        # propagated.
        await asyncio.gather(
            *(c.disconnect() for c in clients), return_exceptions=True
        )
        raise

    # Disconnect (Network.stop would do this anyway).
    await asyncio.gather(*(c.disconnect() for c in clients))
    return {"concurrent_ms": concurrent_ms, "inserts": len(inserts)}


def setup(net: Network) -> None:
    """Install static neighbor entries on every host, then program switches.

    Each host gets a permanent ``ip neigh`` entry for each of the other
    two hosts. The three switches' LPM tables are then populated
    concurrently, and the insert timing is printed.
    """
    # Address book used for the neighbor entries: host -> (IPv4, MAC).
    # The P4 pipeline only picks an egress port and never rewrites the
    # Ethernet header, so a frame must leave the sender already addressed
    # to the destination host's real MAC.
    # NOTE(review): Linux resolves the neighbor of the *next hop*. With a
    # /24 prefix plus a default gateway, traffic to another subnet would
    # look up the gateway (10.0.x.254) rather than these addresses; confirm
    # how p4net installs ``default_route``.
    address_book = {
        "h1": ("10.0.1.1", "00:00:00:00:00:01"),
        "h2": ("10.0.2.1", "00:00:00:00:00:02"),
        "h3": ("10.0.3.1", "00:00:00:00:00:03"),
    }
    neighbor_entries = [
        (src, peer_ip, peer_mac)
        for src in address_book
        for peer, (peer_ip, peer_mac) in address_book.items()
        if peer != src
    ]
    for src, peer_ip, peer_mac in neighbor_entries:
        command = [
            "ip", "neigh", "replace", peer_ip,
            "lladdr", peer_mac,
            "dev", f"{src}-eth0",
            "nud", "permanent",
        ]
        net.host(src).exec(command)

    stats = asyncio.run(_async_setup(net))
    inserted = stats["inserts"]
    elapsed_ms = stats["concurrent_ms"]
    print(
        f"async setup: {inserted} table entries installed "
        f"concurrently in {elapsed_ms:.2f} ms",
        flush=True,
    )


if __name__ == "__main__":
    # Delegate to the p4net CLI, passing this topology file as its only
    # argument, and exit with whatever status it returns.
    from p4net.cli.main import main as _p4net_cli

    _exit_status = _p4net_cli([__file__])
    raise SystemExit(_exit_status)

P4 程序

examples/async_concurrent/concurrent.p4

/* IPv4 LPM forwarder, identical functional shape to a subset of
 * ``examples/l3_forwarding/ipv4_lpm.p4``. Kept self-contained so the
 * async-concurrent example doesn't take a cross-example dependency.
 *
 * One match table (``ipv4_lpm``) keyed on destination address; one
 * forwarding action (``set_egress_port``) plus drop, which is also the
 * table's default so lookup misses are discarded. Three switches in
 * the topology each load this same binary; their forwarding tables are
 * populated concurrently from Python via the v1.6 async client.
 *
 * The pipeline only chooses an egress port: Ethernet addresses, TTL and
 * the IPv4 checksum are emitted exactly as they were parsed.
 */
#include <core.p4>
#include <v1model.p4>

header ethernet_t {
    bit<48> dstAddr;
    bit<48> srcAddr;
    bit<16> etherType;
}

header ipv4_t {
    bit<4>  version;
    bit<4>  ihl;
    bit<8>  diffserv;
    bit<16> totalLen;
    bit<16> identification;
    bit<3>  flags;
    bit<13> fragOffset;
    bit<8>  ttl;
    bit<8>  protocol;
    bit<16> hdrChecksum;
    bit<32> srcAddr;
    bit<32> dstAddr;
}

const bit<16> ETHERTYPE_IPV4 = 0x0800;

struct headers {
    ethernet_t ethernet;
    ipv4_t     ipv4;
}

struct metadata {}

parser MyParser(packet_in pkt, out headers hdr, inout metadata meta,
                inout standard_metadata_t std) {
    state start {
        pkt.extract(hdr.ethernet);
        // Only IPv4 frames get their L3 header parsed; everything else
        // (e.g. ARP) is accepted with hdr.ipv4 left invalid.
        transition select(hdr.ethernet.etherType) {
            ETHERTYPE_IPV4: parse_ipv4;
            default: accept;
        }
    }
    state parse_ipv4 {
        pkt.extract(hdr.ipv4);
        transition accept;
    }
}

control MyVerifyChecksum(inout headers hdr, inout metadata meta) { apply {} }

control MyIngress(inout headers hdr, inout metadata meta,
                  inout standard_metadata_t std) {
    action drop_packet() {
        mark_to_drop(std);
    }

    // Forward out the given port; headers are left untouched.
    action set_egress_port(bit<9> port) {
        std.egress_spec = port;
    }

    table ipv4_lpm {
        key = {
            hdr.ipv4.dstAddr: lpm;
        }
        actions = {
            drop_packet;
            set_egress_port;
            NoAction;
        }
        // A miss must not fall through with egress_spec at its initial
        // value (v1model zero-initialises standard_metadata), which would
        // send the packet out port 0; drop it explicitly instead.
        default_action = drop_packet();
        size = 1024;
    }

    apply {
        // Non-IPv4 frames skip the table entirely.
        if (hdr.ipv4.isValid()) {
            ipv4_lpm.apply();
        }
    }
}

control MyEgress(inout headers hdr, inout metadata meta,
                 inout standard_metadata_t std) { apply {} }

control MyComputeChecksum(inout headers hdr, inout metadata meta) { apply {} }

control MyDeparser(packet_out pkt, in headers hdr) {
    apply {
        // Invalid headers are skipped by emit, so non-IPv4 frames are
        // re-emitted with just their Ethernet header ahead of the payload.
        pkt.emit(hdr.ethernet);
        pkt.emit(hdr.ipv4);
    }
}

V1Switch(MyParser(), MyVerifyChecksum(), MyIngress(), MyEgress(),
         MyComputeChecksum(), MyDeparser()) main;

标准 ipv4_lpm 表,流水线本身没有什么花活;有趣的部分在 Python 那一侧。

跑起来

sudo p4net examples/async_concurrent/topology.py

setup(net) 通过 asyncio.run(_async_setup(net)) 并发连接三个异步客户端,并发安装九条表项(每台交换机三条:本地子网一条、两个远端子网各一条),然后打印一行耗时。接着在 p4net> shell 中执行:

pingall

三台主机可以跨网格互 ping。

相关阅读

  • 异步客户端——总览、主控权模式、取消语义。
  • API 稳定性——在 1.x 系列中,AsyncP4RuntimeClient 自 1.7.0 起属于稳定(stable)等级。