Saturday, August 30, 2014

Backpressure in async communication systems

I recently saw this excellent presentation video of Mr Kuhn ..  and later on found about the community standardization effort of reactive streams.

As I have done many experiments dealing with backpressure issues in a high volume async distributed system, I'd like to point out some possible pitfalls of the "flow control" mechanism proposed in the reactive streams spec and point out how I dealt with the problem (after having tried similar solutions).

The problem of flow control arises in asynchronous communication if the sender sends messages at a higher pace than (one of the) receivers can process them.
This problem is well known from networking and actually TCP implements a similar scheme to the one proposed in the RStreams spec. Another - less known - flow control scheme is NAK. NAK is used typically in low latency brokerless (multicast) messaging products.

The "Reactive Streams" spec proposes an acknowledge based protocol (although its not presented as such).

Receiver sends "requestMore(numberOfMsg)" to Sender
Sender sends "numberOfMsg" messages to Receiver

by emitting "requestMore", a Receiver signals it has received+processed (or will have finished processing soon) previously received messages. Its an acknowledgement message (ACK).

Problematic areas:

  1. it works for point to point communication only. If you have a multicast situation (e.g. one sender multicasts to N receivers), this does not work well.
    In order to make this work, the sender needs to be aware of all receivers, then wait for every receiver to have sent a "requestMore", then compute the minimum of all "requestMore" messages and finally send this amount of messages. Once a receiver dies, no "requestMore" is send/received, so the sender can't sent anything for an extended amount of time (receiver-timeout). If one receiver pauses (e.g. GC's), a "requestMore" is missing, so the sender needs to stall all communication.
    You can see those effects in action with JGroups using a configuration with credit based flow control protocol. Once a cluster node terminates, throughput falls to zero until the node-leave timeout is reached (similar issues are observable with the ACK-based NACKACK reliable UDP transport of JGroups). ACK ftl :-)
  2. The actual throughput depends hard on connection latency. As the ACK signal ("requestMore") takes a longer time then to reach the sender, the receiver buffer must be enlarged depending on latency, additionally the size of chunks requested by "requestMore" must be enlarged. If one does not do this, throughput drops depending on latency. Short said:
    sizeOfBuffer needs to be >= eventsProcessable per roundTripLatency interval. This can be solved by runtime introspection, however in practice this can be tricky especially with bursty traffic.
  3. As long reliable communication channels (in memory, TCP) are used: Those transports already have implemented a flow control / backpressure mechanism, so "Reactive Streams" effectively doubles functionality present at a lower and more efficient level. Its not too hard to make use of the backpressure signals of e.g. TCP (one solution is an "outbound" queue on sender side, then observe its size). One can achieve the same flow control results without any need for explicit application level ACK/Pull messages like "requestMore". For inmemory message passing, one simply can inspect the queue size of the receiver. [you need a queue implementation with a fast size() implementation then ofc]

Alternative: NAK instead of ACK, adaptive rate limits

The idea of NAK (=negative acknowledgement) is raise backpressure signals from receiver side only in case a component detects overload. If receivers are not in an overload situation, no backchannel message traffic is present at all.

The algorithm outlined below worked for me in a high throughput distributed cluster:

Each sender has a send rate limit which is increased stepwise over time. Usually the starting default is a high send rate (so its actually not limiting). Receivers observe the size of their inbound queue. Once the queue size grows to certain limits, receivers emit an NAK( QFillPercentage ) message. E.g. 50%, 75%, 87%, 100%. The sender then increases the send rate limit depending on the "strength" of the NAK message. To avoid permanent degrade, the sender increases the send rate stepwise (the step size may depend on the time no NAK was received).

This works for 1:1 async communication patterns as well as for 1:N. In an 1:N situation the sender reacts to the highest strength NAK message within a timing window (e.g. 50ms). The 0-traffic-on-dying-receiver problem is not present. Additionally the sender does not need to track the number of receivers/subscribers, which is a very important property in 1:N multicast messaging.

Finding the right numbers for NAK-triggers, and sender speed up steps can be tricky as they depend on the volatility of sendrate and latency of transport link. However it should be possible to adaptively find appropriate values at runtime. In practice I used empirical values found by experimenting with the given system on sample loads, which is not a viable solution for a general implementation ofc.

The backpressure signal raised by the NAK messages can be applied to the sender either blocking or nonblocking. In concrete system I just blocked the sending thread which effectively puts a dynamically computed (current rate limit) "cost" for each async message send operation. Though this is blocking , it did not hurt performance, as there was no "other work" to do in case a receiver can't keep up. However backpressure signals can be applied in a nonblocking fashion as well ("if (actualSendRate < sendRateLimit) { send } else { do other stuff }").