BPMUX (back-pressure multiplexing)
@aljoscha very keen for back pressure rpc though!
I guess teasing about an alternative message format and then not giving any information about it would be somewhat mean. So here are some notes on the protocol.
At its core, there's an abstract specification of different communication patterns, single unidirectional messages, request/response exchanges, sinks, streams, duplexes (combined sink + stream). Everything involves backpressure, data can only be transmitted once the receiver has signaled that it wants to receive data. This abstract specification determines the API exposed to the programmer.
There can be different concrete implementations for these APIs, depending on the physical (or logical) network over which the multiplexing happens. So far I've only specced out bpmux/rel
, which implements the abstract specification on a bidirectional, reliable, ordered communication channel (i.e. what you get with unix domain sockets or tcp). But there could be implementations for infrastructure other than reliable channels, e.g. bpmux/udp
, bpmux/[sctp](https://en.wikipedia.org/wiki/Stream_Control_Transmission_Protocol)
or bpmux/[dccp](https://en.wikipedia.org/wiki/Datagram_Congestion_Control_Protocol)
.
Here are some old notes of mine on the abstract model and the concrete bpmux/rel specification. I've iterated on the protocol design a few times since then, so they are not up to date. Key things I'd handle differently than described beloware unordered streams of messages, and the details of how bpmux/rel encodes packet types. The core ideas (credit-based backpressure à la reactive-streams, packets à la packet-stream) are the same though.
The whole thing is pretty similiar to packet-stream, just with mandatory backpressure on all levels, and a slightly different abstract communication model.
Here are the notes I jotted down a few months ago, I hope the above introduction helps you to make some sense of them.
Infodump
A protocol for multiplexing messages, requests and responses, and sinks, streams and duplexes over a bidirectional, reliable, order preserving channel. Provides backpressure for top-level packets as well as per-stream backpressure.
TODO provide a description of how things work.
Top-level packets are guaranteed to be delivered in the order in which they were sent.
Unless indicated otherwise, sending a message consumes top-level credit (1 credit per byte sent, including metadata).
The VarU64 type is defined and implemented here.
When receiving a packet of unknown first byte, treat the next byte(s) as a VarU64 and drop that many bytes after that VarU64, then continue the stream. If any new packets are added to the BPMux protocol, they will include VarU64 indicating the remaining packet length directly after the first type. This will allow backwards-compatible protocol additions.
TODO:
- close top-level with error
- reject min-credit
- include min-credit in CreateSink and CreateDuplex
- add greedy sending versions of CreateSink and CreateDuplex
- consume top-level credit to attach any number of items to directly send
- peer replies how many of these items it accepted (the dropped ones should be resent using inner credit)
- handshaking (give initial top-level credit, specify min-credit)
Top Level Packets
/// Allow the peer to send `credit` more bytes of top-level packets (does not consume credit)
MainCredit | credit: VarU64
/// Indicate that no more Message, Request, OpenStream, OpenSink, OpenDuplex packets will be sent (does not consume credit)
Close
/// Indicate the lowest credit at which you can still fully function.
Minimum | min_credit: VarU64
/// The same as sending RequestCancel and InnerCancel messages for all requests, streams and duplexes (does not consume credit)
CancelAll
/// Send a message to the peer (one-shot, unidirectional)
Message | length: VarU64 | payload: [u8; length]
/// The same as sending multiple messages. `total_length` is the remaining length of the packet (including the bytes for the length VarU64s)
Messages | total_length: VarU64 | (length: VarU64 | payload: [u8; length])*
/// Send a request to the peer (one-shot, receive a single response)
Request | req_id: VarU64 | length: VarU64 | payload: [u8; length]
/// The same as sending multiple requests. The reqests are assigned consecutive ids, starting from `initial_req_id`. `total_length` is the remaining length of the packet (including the bytes for the length VarU64s)
Requests | initial_req_id: VarU64 | total_length: VarU64 | (length: VarU64 | payload: [u8; length])*
/// Indicate that you are no longer interested in the response to a request of a certain id (does not consume credit)
RequestCancel | req_id: VarU64
/// Send the response to a previously received request (does not consume credit)
Response | req_id: VarU64 | length: VarU64 | payload: [u8; length]
/// Answer a previously received request with an error (does not consume credit)
ResponseError | req_id: VarU64 | length: VarU64 | payload: [u8; length]
/// Create a stream from the peer, with initial credit
CreateStream | inner_id: VarU64 | credit: VarU64 | length: VarU64 | payload: [u8; length]
/// Create a sink to the peer
CreateSink | inner_id: VarU64 | length: VarU64 | payload: [u8; length]
/// Create a duplex with the peer, with initial credit for the stream half
CreateDuplex | inner_id: VarU64 | credit: VarU64 | length: VarU64 | payload: [u8; length]
Inner Packets
These consume inner credit, not top-level credit.
/// Allow the peer to send `credit` more bytes of stream-level packets (does not consume credit)
InnerCredit | inner_id: VarU64 | credit: VarU64
/// Indicate that you are no longer interested in more items over a stream (does not consume credit)
InnerCancel | inner_id: VarU64
/// Send an item over a stream
InnerItem | stream_id: VarU64 | length: VarU64 | payload: [u8; length]
/// Send multiple items over a stream. `total_length` is the remaining length of the packet (including the bytes for the length VarU64s)
InnerItems | inner_id: VarU64 | total_length: VarU64 | (length: VarU64 | payload: [u8; length])*
/// Signal that no more items will be sent over a stream
InnerEnd | stream_id: VarU64
/// Signal an error over a stream (same as InnerEnd, but with a payload)
InnerError | stream_id: VarU64 | length: VarU64 | payload: [u8; length]