PacketIn observation from a secondary controller
I want to: run a read-only monitor that subscribes to PacketIn without holding primary, while the existing primary controller manages the device's forwarding state.
The pattern
import io.github.zhh2001.jp4.P4Switch;
import io.github.zhh2001.jp4.ElectionId;
import io.github.zhh2001.jp4.entity.PacketIn;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
try (P4Switch monitor = P4Switch.connect("127.0.0.1:50051")
.electionId(ElectionId.of(1)) // low id — primary holds something higher
.asSecondary()) {
// Secondaries can't bindPipeline (it's a write), but loadPipeline is a
// read-only RPC that fetches the device's currently-installed P4Info
// into the local switch. PacketIn parsing requires this schema.
monitor.loadPipeline();
AtomicInteger counter = new AtomicInteger();
monitor.packetInStream().subscribe(new Flow.Subscriber<PacketIn>() {
@Override public void onSubscribe(Flow.Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override public void onNext(PacketIn p) {
int port = p.metadataInt("ingress_port");
int bytes = p.payload().toByteArray().length;
System.out.printf("[#%d] port=%d bytes=%d%n",
counter.incrementAndGet(), port, bytes);
}
@Override public void onError(Throwable t) { /* log + reconnect logic */ }
@Override public void onComplete() { /* device closed stream */ }
});
// ... block until shutdown signal ...
}Real usage: network-monitor.
Walkthrough
- Connect with
asSecondary()and a low election id. The id has to be lower than the existing primary's. If the device replies withAcquired(you became primary), check operations team coordination on election ids —asSecondary()is a request, not a guarantee. loadPipeline()is the read-side complement ofbindPipeline. It issuesGetForwardingPipelineConfig, populates the localP4Info, and returns. Without it,packetInStream()throwsP4PipelineException("no pipeline bound; ...")because the inbound parser has no metadata schema to decode against.packetInStream()returns aFlow.Publisher<PacketIn>(JDK 9+, no extra dependency). Each subscriber sees every packet. For backpressure, request fewer items thanLong.MAX_VALUE; for fanout to a reactive stack, adapt withJdkFlowAdapter.flowPublisherToFlux(...)(Reactor) orFlowAdapters.toPublisher(...)(Reactive Streams).- The
onNextcallback runs on jp4's callback executor. Same FIFO contract asonPacketIn— slow consumers delay subsequent dispatches but never block the gRPC inbound thread.
Important: BMv2 PacketIn delivery caveat
P4Runtime spec §16.1 says PacketIn MUST be delivered to the primary client and SHOULD also be delivered to secondaries. BMv2 implements only the MUST. A secondary connection against BMv2 will register and load the pipeline successfully, but no PacketIn will arrive through packetInStream() because the device only fans them out to the primary.
On a target that does broadcast PacketIn to secondaries (some Tofino / Stratum builds), the same subscriber code attaches unchanged. The Java code is target-agnostic.
If you're testing this recipe against BMv2 and seeing no packets, that's the spec MUST/SHOULD gap, not a jp4 bug — see Troubleshooting: PacketIn handler never fires.
See also
- Connection and arbitration — primary vs secondary, election ids, mastership semantics.
- Packet I/O —
packetInStream()mechanics and the three consumption styles. - Threading model — which executor runs
Flow.Subscriber.onNext. network-monitorexample — the full runnable program this recipe was extracted from.