INT(带内遥测)¶
单交换机示例:P4 流水线给每个转发的 IPv4 包嵌入一个 14 字节的 INT(带内网络遥测)shim 头。shim 中携带交换机 ID、入端时间戳、出端口、队列深度,以及原始 etherType。接收主机命名空间里的原始套接字 listener 解析 shim,按包打印结构化遥测。
这个示例展示了什么¶
- 线缆级头插入:P4 deparser 在以太网头与 IPv4 头之间发出一个新头。
- EtherType 替换:外层 etherType 改成 `0x88B6`(INT shim 标识),这样内核与抓包过滤器能分清 INT 帧。
- 原始 etherType 保留:shim 的 `next_proto` 字段保留原始 etherType(IPv4 是 `0x0800`),接收端可以恢复内层头链。
- 原始套接字解码:用户态 listener 通过 `AF_PACKET` 收帧,按字节偏移解析 shim,每帧打一行。
拓扑¶
examples/int/topology.py:
"""Two hosts, one switch, IPv4 forwarding with INT shim insertion.
The P4 program (`int.p4`) inserts a 14-byte INT shim header between the
Ethernet and IPv4 headers on every forwarded packet. The shim carries the
switch identifier, ingress timestamp, egress port, queue depth, and the
original etherType (so a receiver can recover the inner IPv4 header).
Run as root:
sudo p4net examples/int/topology.py
Then in a separate terminal on h2 (e.g. ``h2 xterm`` from the CLI):
sudo python3 /path/to/examples/int/listener.py --iface h2-eth0
And from another terminal:
sudo ip netns exec h1 ping -c 3 -W 1 10.0.0.2
The listener prints one structured line per INT-stamped frame.
"""
from __future__ import annotations
from pathlib import Path
from p4net import Network
from p4net.topo import Topology
# Directory containing this example; the P4 source is resolved relative to it.
HERE = Path(__file__).resolve().parent

# Two hosts in 10.0.0.0/24 attached to a single P4 switch.
topology = Topology()
h1 = topology.add_host(
    "h1",
    ip="10.0.0.1/24",
    mac="00:00:00:00:00:01",
)
h2 = topology.add_host(
    "h2",
    ip="10.0.0.2/24",
    mac="00:00:00:00:00:02",
)
s1 = topology.add_switch("s1", p4_src=HERE.joinpath("int.p4"))

# The port numbers match the egress ports that setup() programs into
# ipv4_lpm (10.0.0.1 -> port 1, 10.0.0.2 -> port 2).
topology.add_link(h1, s1, port_b=1)
topology.add_link(h2, s1, port_b=2)
def setup(net: Network) -> None:
"""Static ARP both sides; LPM entries; write the switch_id register."""
h1 = net.host("h1")
h2 = net.host("h2")
h1.exec(
[
"ip",
"neigh",
"replace",
"10.0.0.2",
"lladdr",
"00:00:00:00:00:02",
"dev",
"h1-eth0",
"nud",
"permanent",
]
)
h2.exec(
[
"ip",
"neigh",
"replace",
"10.0.0.1",
"lladdr",
"00:00:00:00:00:01",
"dev",
"h2-eth0",
"nud",
"permanent",
]
)
s1 = net.switch("s1")
s1.client.insert_table_entry(
table="MyIngress.ipv4_lpm",
match={"hdr.ipv4.dstAddr": "10.0.0.1/32"},
action="MyIngress.set_egress_port",
params={"port": 1},
)
s1.client.insert_table_entry(
table="MyIngress.ipv4_lpm",
match={"hdr.ipv4.dstAddr": "10.0.0.2/32"},
action="MyIngress.set_egress_port",
params={"port": 2},
)
# Assign this switch's INT identifier. The INT shim stamps every
# forwarded packet with this value. For multi-switch topologies,
# give each switch a distinct id.
s1.client.write_register("MyIngress.switch_id", index=0, value=1)
if __name__ == "__main__":
    # Imported lazily: the CLI entry point is only needed when this file is
    # executed directly rather than loaded as a topology module.
    from p4net.cli.main import main as _p4net_main

    raise SystemExit(_p4net_main([__file__]))
两主机一交换机,P4Runtime 跑 IPv4 转发,加静态 ARP。
P4 程序¶
examples/int/int.p4:
/* In-band Network Telemetry (INT) — single-switch demo.
*
* For every IPv4 packet that the LPM table forwards, the switch inserts a
* 14-byte INT shim header between the Ethernet header and the IPv4 header.
*
* Wire layout produced by the deparser:
*
* [ Ethernet (14 B, etherType=0x88B6) ]
* [ INT shim (14 B) ]
* [ IPv4 + payload ]
*
* INT shim layout (most-significant bit first, total 14 bytes):
*
*   byte offset   field                   width
*   -----------   ---------------------   -------
*   0             switch_id                8 bits
*   1..6          ingress_timestamp_us    48 bits
*   7..8          egress_port             16 bits
*   9..10         queue_depth             16 bits
*   11..12        next_proto              16 bits
*   13            reserved                 8 bits
*
* The shim's `next_proto` field carries the original etherType (0x0800
* for IPv4) so the receiver can recover the inner IPv4 header. A
* user-space listener on the receiving host parses the shim from a raw
* AF_PACKET socket; see `examples/int/listener.py`.
*
* Pairs with `examples/int/topology.py`, which programs `ipv4_lpm`,
* writes the ``switch_id`` register, and pre-seeds static ARP entries.
*/
#include <core.p4>
#include <v1model.p4>
/* EtherType the parser recognises as IPv4. */
const bit<16> ETHERTYPE_IPV4 = 0x0800;
/* EtherType written into the outer Ethernet header when an INT shim is inserted. */
const bit<16> ETHERTYPE_INT = 0x88B6;
/* Ethernet header: 48 + 48 + 16 bits = 14 bytes on the wire. */
header ethernet_t {
    bit<48> dstAddr;
    bit<48> srcAddr;
    /* Rewritten to ETHERTYPE_INT by MyIngress when a shim is inserted. */
    bit<16> etherType;
}
/* INT shim carried between Ethernet and IPv4.
 * 8 + 48 + 16 + 16 + 16 + 8 bits = 14 bytes; the field order here is the
 * wire order the listener decodes by byte offset. */
header int_shim_t {
    /* Value read from the MyIngress.switch_id register. */
    bit<8> switch_id;
    /* Copied from std.ingress_global_timestamp. */
    bit<48> ingress_timestamp_us;
    /* Egress port chosen by ipv4_lpm, widened from 9 to 16 bits. */
    bit<16> egress_port;
    /* Queue depth taken from standard metadata (deq_qdepth), cast to 16 bits. */
    bit<16> queue_depth;
    /* Original etherType of the frame (0x0800 for IPv4). */
    bit<16> next_proto;
    /* Always written as 0. */
    bit<8> reserved;
}
/* Fixed 20-byte IPv4 header (fields sum to 160 bits); IPv4 options are not
 * modelled, and the parser does not consult ihl. */
header ipv4_t {
    bit<4> version;
    bit<4> ihl;
    bit<8> diffserv;
    bit<16> totalLen;
    bit<16> identification;
    bit<3> flags;
    bit<13> fragOffset;
    bit<8> ttl;
    bit<8> protocol;
    bit<16> hdrChecksum;
    bit<32> srcAddr;
    /* Lookup key of MyIngress.ipv4_lpm. */
    bit<32> dstAddr;
}
/* Header stack seen by the pipeline. int_shim is never extracted by the
 * parser; it only becomes valid when MyIngress calls setValid() on it. */
struct headers {
    ethernet_t ethernet;
    int_shim_t int_shim;
    ipv4_t ipv4;
}
/* No user-defined metadata is needed by this program. */
struct metadata {}
/* Extract Ethernet and, when the etherType is IPv4, the fixed IPv4 header.
 * Any other etherType (including ETHERTYPE_INT on an already-stamped frame)
 * is accepted with only the Ethernet header valid, so MyIngress leaves such
 * frames untouched. */
parser MyParser(packet_in pkt, out headers hdr, inout metadata meta,
inout standard_metadata_t std) {
    state start {
        pkt.extract(hdr.ethernet);
        transition select(hdr.ethernet.etherType) {
            ETHERTYPE_IPV4: parse_ipv4;
            default: accept;
        }
    }
    /* Everything after the 20-byte header stays in the unparsed payload. */
    state parse_ipv4 {
        pkt.extract(hdr.ipv4);
        transition accept;
    }
}
/* Intentionally empty: incoming checksums are not verified. */
control MyVerifyChecksum(inout headers hdr, inout metadata meta) { apply {} }
/* Ingress: forward IPv4 by LPM on the destination address and, for packets
 * that were given an egress port, insert the INT shim and rewrite the outer
 * etherType. queue_depth is zeroed here and filled in by MyEgress, because
 * the queueing metadata does not exist until the packet has been dequeued. */
control MyIngress(inout headers hdr, inout metadata meta,
                  inout standard_metadata_t std) {
    /* One-element register holding the configured switch identifier.
     * The control plane writes this at start via
     * ``client.write_register("MyIngress.switch_id", index=0, value=N)``. */
    register<bit<8>>(1) switch_id;

    action drop() {
        mark_to_drop(std);
    }

    action set_egress_port(bit<9> port) {
        std.egress_spec = port;
    }

    table ipv4_lpm {
        key = {
            hdr.ipv4.dstAddr: lpm;
        }
        actions = {
            drop;
            set_egress_port;
            NoAction;
        }
        default_action = NoAction();
        size = 1024;
    }

    apply {
        if (hdr.ipv4.isValid()) {
            ipv4_lpm.apply();
            /* Only stamp INT shim on packets actually being forwarded.
             * A table miss runs NoAction and leaves egress_spec untouched. */
            if (std.egress_spec != 0) {
                bit<8> sid;
                switch_id.read(sid, 0);
                hdr.int_shim.setValid();
                hdr.int_shim.switch_id = sid;
                hdr.int_shim.ingress_timestamp_us = (bit<48>) std.ingress_global_timestamp;
                hdr.int_shim.egress_port = (bit<16>) std.egress_spec;
                /* Placeholder: std.deq_qdepth is not populated in ingress. */
                hdr.int_shim.queue_depth = 0;
                /* Save the original etherType before overwriting it. */
                hdr.int_shim.next_proto = hdr.ethernet.etherType;
                hdr.int_shim.reserved = 0;
                hdr.ethernet.etherType = ETHERTYPE_INT;
            }
        }
    }
}

/* Egress: complete the shim with the queue depth observed when the packet
 * left the egress queue. The shim is never parsed from the wire, so it is
 * valid here only if MyIngress inserted it for this packet. */
control MyEgress(inout headers hdr, inout metadata meta,
                 inout standard_metadata_t std) {
    apply {
        if (hdr.int_shim.isValid()) {
            /* deq_qdepth is wider than 16 bits; the cast keeps the low 16. */
            hdr.int_shim.queue_depth = (bit<16>) std.deq_qdepth;
        }
    }
}
/* Intentionally empty: this program never modifies the IPv4 header, so the
 * original hdrChecksum is emitted unchanged. */
control MyComputeChecksum(inout headers hdr, inout metadata meta) { apply {} }
/* Serialise headers in wire order: Ethernet, INT shim, IPv4. emit() skips
 * headers that are not valid, so frames that were not stamped leave without
 * a shim. */
control MyDeparser(packet_out pkt, in headers hdr) {
    apply {
        pkt.emit(hdr.ethernet);
        pkt.emit(hdr.int_shim);
        pkt.emit(hdr.ipv4);
    }
}
/* Instantiate the v1model pipeline from the blocks defined above. */
V1Switch(MyParser(), MyVerifyChecksum(), MyIngress(), MyEgress(),
MyComputeChecksum(), MyDeparser()) main;
要点:
- shim 头静态声明,deparser 按 valid 位条件发送。
- ingress 控制在 LPM 表设好 `std.egress_spec` 之后再从 `standard_metadata` 填 shim。
- `switch_id` 现在是寄存器(`register<bit<8>>(1) switch_id;`)。拓扑的 `setup(net)` 用 `s1.client.write_register("MyIngress.switch_id", index=0, value=1)` 写入;多交换机 INT 部署无须重新编译即可为每台交换机分配独立编号。
listener¶
examples/int/listener.py:
"""INT shim listener — runs inside a host namespace, prints per-frame INT data.
Usage (must be run as root because AF_PACKET sockets are privileged):
sudo ip netns exec h2 python3 listener.py --iface h2-eth0
Or from the p4net interactive shell:
h2 xterm
# in the spawned xterm:
sudo python3 examples/int/listener.py --iface h2-eth0
The script opens a raw AF_PACKET socket, filters by EtherType 0x88B6 (the
INT shim), and decodes the 14-byte shim that follows the Ethernet header.
Wire layout (matches the deparser in int.p4):
[ Ethernet (14 B, etherType = 0x88B6) ]
[ INT shim (14 B): ]
switch_id uint8
ingress_timestamp_us uint48 (big-endian, packed in 6 bytes; BMv2 reports microseconds)
egress_port uint16
queue_depth uint16
next_proto uint16 (= 0x0800 for IPv4)
reserved uint8
[ IPv4 + payload ]
"""
from __future__ import annotations
import argparse
import socket
import struct
import sys
ETH_P_ALL = 0x0003
ETHERTYPE_INT = 0x88B6
SHIM_LEN = 14
def _decode_int_shim(buf: bytes) -> dict[str, int]:
"""Decode a 14-byte INT shim into a dict."""
if len(buf) < SHIM_LEN:
raise ValueError(f"INT shim truncated: got {len(buf)} bytes, need {SHIM_LEN}")
switch_id = buf[0]
# 48-bit big-endian timestamp packed in 6 bytes.
ts = int.from_bytes(buf[1:7], "big")
egress_port, queue_depth, next_proto = struct.unpack("!HHH", buf[7:13])
reserved = buf[13]
return {
"switch_id": switch_id,
"ingress_timestamp_us": ts,
"egress_port": egress_port,
"queue_depth": queue_depth,
"next_proto": next_proto,
"reserved": reserved,
}
def _decode_ipv4_addrs(buf: bytes) -> tuple[str, str] | None:
"""Pull src/dst from a buffer beginning at the IPv4 header. Returns None on truncation."""
if len(buf) < 20:
return None
src = socket.inet_ntoa(buf[12:16])
dst = socket.inet_ntoa(buf[16:20])
return src, dst
def _render_int_frame(frame: bytes) -> str | None:
    """Format one raw Ethernet frame as a telemetry line (no trailing newline).

    Returns ``None`` when the frame is too short to hold an Ethernet header
    plus a shim, or when its EtherType is not ``ETHERTYPE_INT``, so the caller
    can skip it.
    """
    if len(frame) < 14 + SHIM_LEN:
        return None
    if int.from_bytes(frame[12:14], "big") != ETHERTYPE_INT:
        return None

    shim = _decode_int_shim(frame[14 : 14 + SHIM_LEN])
    # The inner packet is only interpreted when the shim says it is IPv4.
    addrs = (
        _decode_ipv4_addrs(frame[14 + SHIM_LEN :])
        if shim["next_proto"] == 0x0800
        else None
    )
    flow = f" {addrs[0]} -> {addrs[1]}" if addrs else ""
    return (
        f"[switch={shim['switch_id']} "
        f"ts={shim['ingress_timestamp_us']}us "
        f"egress={shim['egress_port']} "
        f"queue={shim['queue_depth']} "
        f"next_proto=0x{shim['next_proto']:04x}]{flow}"
    )


def main() -> int:
    """Listen on ``--iface`` and print one line per INT-stamped frame.

    Returns:
        0 once ``--count`` INT frames have been printed, or 130 when the
        listener is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Decode INT shim headers from a raw AF_PACKET socket."
    )
    parser.add_argument(
        "--iface",
        required=True,
        help="Interface name to bind to (e.g. h2-eth0).",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=0,
        help="Exit after printing this many INT frames (0 = forever).",
    )
    args = parser.parse_args()

    # The context manager closes the raw socket on every exit path (normal
    # return, Ctrl-C, or an unexpected error) instead of leaking it.
    with socket.socket(
        socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL)
    ) as sock:
        sock.bind((args.iface, 0))
        # Flush after every line so output appears promptly when piped.
        print(f"[listener] bound on {args.iface}, waiting for INT frames", flush=True)

        seen = 0
        try:
            while True:
                frame, _addr = sock.recvfrom(65535)
                line = _render_int_frame(frame)
                if line is None:
                    continue
                print(line, flush=True)
                seen += 1
                if args.count and seen >= args.count:
                    return 0
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the "forever" mode: exit
            # quietly with the conventional 128 + SIGINT status rather than
            # printing a traceback.
            return 130
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
listener 打开原始 AF_PACKET 套接字,按 etherType == 0x88B6
过滤,按字节偏移解 14 字节 shim,打印结构化结果。
跑起来¶
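一个终端启动拓扑:

    sudo p4net examples/int/topology.py

setup(net) 装 LPM 条目,写入 switch_id 寄存器,预置静态 ARP,然后落到 p4net> shell。
另一个终端(或者从 shell 用 h2 xterm 起)运行 listener:

    sudo ip netns exec h2 python3 examples/int/listener.py --iface h2-eth0

再一个终端发流量:

    sudo ip netns exec h1 ping -c 3 -W 1 10.0.0.2

每过交换机一个包,listener 打一行: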
[listener] bound on h2-eth0, waiting for INT frames
[switch=1 ts=745907us egress=2 queue=0 next_proto=0x0800] 10.0.0.1 -> 10.0.0.2
[switch=1 ts=1750021us egress=2 queue=0 next_proto=0x0800] 10.0.0.1 -> 10.0.0.2
[switch=1 ts=2754336us egress=2 queue=0 next_proto=0x0800] 10.0.0.1 -> 10.0.0.2
注意事项¶
- `queue_depth` 几乎总是 0:BMv2 默认队列下,除非出端队列确实积压(本示例的流量根本不会),该字段就停在 0;它只说明这条管线已经接通,仅此而已。
- 只演示单跳:真实 INT 每过一跳叠一层 shim;多跳留作扩展练习。
- 交换机 ID 由寄存器配置:改拓扑 `setup(net)` 中 `write_register("MyIngress.switch_id", index=0, value=N)` 的 `N` 即可换标。多交换机部署给每台分配不同 `N`,无须重编译。
可以试试¶
- 加第二个交换机,把它的 `switch_id` 寄存器写为 `2`,串成 h1 → s1 → s2 → h2,扩展 listener(或 P4 流水线)处理 shim 栈。
- 把 listener 输出重定向到文件,离线算出基于 `ingress_timestamp_us` 的逐流延迟差。
- 给某条 h↔s 链路加 `delay="50ms"` 或 `loss_pct=2.0`,看时间戳与包数是否如预期变化。