Tuesday, January 28, 2014

Comparison of different concurrency models: Actors, CSP, Disruptor and Threads

To make use of modern multi-core, multi-socket hardware, Java offers threads and locks for concurrent programming. It is well known that this model suffers from a number of problems:
  • Deadlocks
  • Bad scaling due to "over-synchronization" or contention on common resources
  • Errors caused by invalid synchronization are timing-dependent and hard to reproduce. They commonly appear only under special conditions (load, hardware) in production.
  • As a software project grows, it gets hard to modify the application safely. A programmer needs global awareness of an application's control flow and data access patterns in order to change the software without introducing errors.
  • The subtleties of the Java Memory Model (JMM) are not well known to most Java programmers. Even if the JMM is known, incorrect code isn't obvious and fails only rarely. Spotting such failures in the production logs of large applications can be a nightmare.
So I am looking for alternative models to express concurrency. To that end, this post runs a simple benchmark of the most popular "alternative" concurrency models: Actors, CSP and Disruptor.



Actors


Real Life Analogy: People cooperating by sending mails to each other.

An Actor is like an object instance executed by a single thread. Instead of calling methods directly, senders put messages into the Actor's "mailbox" (~queue). The Actor reads and processes the messages from the queue sequentially on a single thread (with exceptions).
Internal state is exposed/shared by passing messages (with copied state) to other Actors.
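To make the mailbox idea concrete, here is a minimal sketch using only the JDK. It is not how Akka or Abstraktor implement actors; the names SummingActor, tell, stop and awaitSum are made up for this illustration, and the unbounded LinkedBlockingQueue as mailbox is an assumption. Two sender threads post messages, and only the actor thread ever touches the internal sum.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxActorSketch {

    /** Hypothetical actor that sums the integers it receives. */
    static final class SummingActor implements Runnable {
        private static final Object STOP = new Object();      // stop marker (assumption)
        private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private long sum;                                      // touched by the actor thread only

        void tell(int message) { mailbox.add(message); }      // asynchronous send, never blocks
        void stop() { mailbox.add(STOP); }

        long awaitSum() throws InterruptedException {
            finished.await();                                  // countDown() happens-before this returns
            return sum;
        }

        @Override
        public void run() {
            try {
                // process messages strictly one after another
                for (Object msg = mailbox.take(); msg != STOP; msg = mailbox.take()) {
                    sum += (Integer) msg;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        }
    }

    /** Two senders post 1..5 and 6..10 concurrently; returns the actor's sum. */
    static long runDemo() {
        SummingActor actor = new SummingActor();
        new Thread(actor, "actor").start();
        Thread a = new Thread(() -> { for (int i = 1; i <= 5; i++) actor.tell(i); });
        Thread b = new Thread(() -> { for (int i = 6; i <= 10; i++) actor.tell(i); });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
            actor.stop();                                      // enqueued after all messages
            return actor.awaitSum();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sum = " + runDemo());
    }
}
```

No lock guards sum: the mailbox is the only shared structure, which is the point of the model.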







CSP (Communicating Sequential Processes)


Real Life Analogy: People phoning each other.

Though a different terminology is used, CSP systems can be seen as a special actor system having bounded mailboxes (queues) of size 0.

So if one process (~Actor) wants to pass a message to another process, the sender is blocked until the receiver accepts the message. As an alternative to being blocked, a CSP process can choose to do other things, e.g. check for incoming messages (this introduces nondeterminism into the order of outgoing messages). A receiver cannot accept incoming messages while it is occupied with processing a prior one.
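The "mailbox of size 0" can be approximated in plain Java with java.util.concurrent.SynchronousQueue. This is only a sketch of the rendezvous behaviour, not a CSP library; the class and method names are invented for the example. put() blocks until a receiver is in take(), while offer() returns immediately, which corresponds to the "do other things instead of blocking" option.

```java
import java.util.concurrent.SynchronousQueue;

public class RendezvousSketch {

    /** offer() with no receiver waiting in take(): the hand-off fails immediately. */
    static boolean offerWithoutReceiver() {
        SynchronousQueue<Integer> channel = new SynchronousQueue<>();
        return channel.offer(1);
    }

    /** put() blocks the sender until the receiver thread accepts the value. */
    static int rendezvous(int value) {
        SynchronousQueue<Integer> channel = new SynchronousQueue<>();
        int[] received = new int[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = channel.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            channel.put(value);   // returns only after the receiver has taken the value
            receiver.join();      // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer without receiver accepted: " + offerWithoutReceiver());
        System.out.println("received via rendezvous: " + rendezvous(42));
    }
}
```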





Disruptor


The disruptor data structure came up some years ago and was invented and pioneered by Martin Thompson, Mike Barker, and Dave Farley at LMAX exchange.

Real Life Analogy: Assembly line




The disruptor is a bounded queue (implemented by a ring buffer) where producers add to the head of the queue (if slots are available, else the producer is blocked).
Consumers access the queue at different points, so each consumer has its own read position (cursor) inside the queue. Sequencing is used to manage queue access and processing order.
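The following is a heavily simplified sketch of that idea, assuming a single producer and consumers that each see every slot. It is not the LMAX implementation (no cache-line padding, no wait strategies, busy-waiting via Thread.yield), and all names are invented for the illustration. The producer may only overwrite a slot once the slowest consumer cursor has passed it.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RingBufferSketch {
    private final long[] slots;                               // preallocated, reused in place
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1);  // last sequence visible to consumers
    private final AtomicLong[] cursors;                       // one read position per consumer

    RingBufferSketch(int sizePowerOfTwo, int consumers) {
        slots = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
        cursors = new AtomicLong[consumers];
        for (int i = 0; i < consumers; i++) cursors[i] = new AtomicLong(-1);
    }

    private long slowestCursor() {
        long min = Long.MAX_VALUE;
        for (AtomicLong c : cursors) min = Math.min(min, c.get());
        return min;
    }

    /** Called by the single producer thread only. */
    void publish(long value) {
        long next = published.get() + 1;
        // buffer full: wait until the slowest consumer has released the slot
        while (next - slowestCursor() > slots.length) Thread.yield();
        slots[(int) (next & mask)] = value;
        published.set(next);                                  // volatile write publishes the slot
    }

    /** Called by consumer thread number 'consumer'; sums the first 'count' values. */
    long consumeSum(int consumer, long count) {
        long sum = 0;
        long next = 0;
        while (next < count) {
            long available = published.get();
            if (next > available) { Thread.yield(); continue; }
            while (next <= available) {                       // process everything available as a batch
                sum += slots[(int) (next & mask)];
                next++;
            }
            cursors[consumer].set(next - 1);                  // release the slots to the producer
        }
        return sum;
    }

    /** One producer publishes 1..1000 into 8 slots; two consumers each sum all values. */
    static long[] runDemo() {
        RingBufferSketch ring = new RingBufferSketch(8, 2);
        long[] sums = new long[2];
        Thread[] threads = new Thread[2];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> sums[id] = ring.consumeSum(id, 1000));
            threads[i].start();
        }
        for (long v = 1; v <= 1000; v++) ring.publish(v);
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] sums = runDemo();
        System.out.println(sums[0] + " " + sums[1]);
    }
}
```

Note that nothing is allocated per message: the 8 slots are written and read in place, and the producer is throttled by the slowest consumer.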



Thread + locking of shared data


OK, everybody knows this one. It's Java's default way to model concurrent execution. In fact, threads and locks (+CAS) are the primitives used to build higher-level concurrency models like those described above.
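For completeness, a small sketch of the two primitives side by side: a lock-protected field and a CAS retry loop on an AtomicLong. The class and method names are made up for this example; it only shows the mechanics and says nothing about which variant is faster.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterSketch {
    private final Object lock = new Object();
    private long lockedTotal;                                 // guarded by 'lock'
    private final AtomicLong casTotal = new AtomicLong();

    /** Mutual exclusion: only one thread at a time runs the update. */
    void addLocked(long delta) {
        synchronized (lock) {
            lockedTotal += delta;
        }
    }

    /** Optimistic update: retry until no other thread interfered. */
    void addCas(long delta) {
        long current;
        do {
            current = casTotal.get();
        } while (!casTotal.compareAndSet(current, current + delta));
    }

    /** Several threads hammer both counters; returns {lockedTotal, casTotal}. */
    static long[] runDemo(int threads, int addsPerThread) {
        SharedCounterSketch counter = new SharedCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < addsPerThread; i++) {
                    counter.addLocked(1);
                    counter.addCas(1);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();                // join() makes lockedTotal visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { counter.lockedTotal, counter.casTotal.get() };
    }

    public static void main(String[] args) {
        long[] totals = runDemo(4, 10_000);
        System.out.println("locked=" + totals[0] + " cas=" + totals[1]);
    }
}
```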











Conclusion


While the CSP/Actor model realizes inter-thread communication by passing/copying thread-local data to some queue or process, the Disruptor keeps data in place and ensures that only one thread (consumer or producer) owns a data item (= queue slot) at a time.
This model seems to fit existing CPU, memory and VM architectures better, resulting in high throughput and superior performance. It avoids high allocation rates and reduces the probability of cache misses. Additionally, it implicitly balances the processing speed of interdependent consumers without growing queues somewhere.
There is no silver bullet for concurrent programming; one still has to plan carefully and needs to know what's going on.

While writing this, I realize each of these patterns has its own set of best-fit problem domains, so my initial intention of using a single benchmark to compare the performance of different concurrency models might be somewhat off :-).


 

The Benchmark Application


 

Update: Jason Koch implemented a custom executor performing significantly better than the stock JDK executors. See his blog post.

The benchmark approximates the value of PI concurrently. To do so, N jobs are created, and the results of all jobs must be added up to get the final approximation of PI. For maximum performance one would create one large job per CPU core. With that setup all solutions would perform roughly the same, as there is no significant concurrent access or scheduling overhead.

However, if we compute PI by creating 1 million jobs, each computing a 100-iteration slice of PI, we get an impression of:
  • cost of accessing shared data when the result of each job is added to one single result value
  • overhead created by the concurrency control mechanics (e.g. sending messages/scheduling Runnables to a thread pool, CAS, Locks, volatile r/w ..).
The test is run with 2 configurations:
  • 100 iterations per pi-slice-job, 1,000,000 jobs
  • 1,000 iterations per pi-slice-job, 100,000 jobs
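Both configurations sum the same 100,000,000 terms of the Leibniz series pi = 4 * (1 - 1/3 + 1/5 - 1/7 + ...); only the slicing differs. As a sequential reference, here is a sketch using the same per-slice loop as the benchmark sources at the end of this post. The class name PiSliceSketch and the method sequentialPi are made up for the illustration, and long indices are used instead of int purely as a precaution.

```java
public class PiSliceSketch {

    /** Sum of the Leibniz terms with indices slice*iterations .. (slice+1)*iterations - 1. */
    static double piSlice(long slice, long iterations) {
        double acc = 0.0;
        for (long i = slice * iterations; i <= (slice + 1) * iterations - 1; i++) {
            // (1 - (i % 2) * 2) is +1 for even i and -1 for odd i
            acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
        }
        return acc;
    }

    /** Single-threaded baseline: run all jobs one after another and add the results. */
    static double sequentialPi(long jobs, long iterationsPerJob) {
        double pi = 0.0;
        for (long job = 0; job < jobs; job++) {
            pi += piSlice(job, iterationsPerJob);
        }
        return pi;
    }

    public static void main(String[] args) {
        // the two benchmark configurations: same number of terms, different job granularity
        System.out.println(sequentialPi(1_000_000, 100));
        System.out.println(sequentialPi(100_000, 1_000));
    }
}
```

Since the arithmetic per term is identical, any difference between the two configurations in the concurrent versions comes from handing out jobs and collecting results.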
As hardware I use
  • AMD Opteron 6274, dual socket, 2.2 GHz. Each socket has 8 cores + 8 hardware threads (so overall 16 cores + 16 HW threads)
  • Intel Xeon @ 3 GHz, dual socket, six cores per socket (12 cores, Hyperthreading turned off), 2011 release
I never use more threads than there are "real" cores.



Stop the blabber, gimme results ! 


See bottom of this post for the benchmark source.
  • Threads - a somewhat naive thread-based implementation of the Pi computation. With enough effort invested it is of course possible to match any of the other results. At the core of the VM, threads, locks (+atomic, volatile r/w + CAS) are the only concurrency primitives. However, there is no point in creating an ad-hoc copy of the Disruptor or an Actor system in order to compare concurrency approaches.
  • Akka - a popular Actor implementation on the VM. The benchmark has been reviewed and revised (especially the ActorSystem configuration can make a big difference) by the Akka crew. Threads are scheduled using Java 7's fork join pool. Actually the Pi computation is one of Akka's tutorial examples. 
  • Abstraktor - my experimental Actor/CSP implementation. It's using short bounded queues (so leans more to the CSP side) and avoids deadlocks by maintaining 2 queues per Actor (in and out). If the out-queue is blocked, it just reads from the in-queue.
    I am using Nitsan Wakart's excellent MPSC queue implementation (check his blog or github jaq-in-a-box), and that's the major reason it shows kind of competitive performance + scaling.
    I use this to get a rough baseline for comparison and to experiment with different flavours of Actors/CSP. Probably the only thing one can do with it is to run the Pi bench ;-).
    Update: The experimental version benchmarked here has been consolidated + improved. You can find it at https://github.com/RuedigerMoeller/kontraktor
  • Disruptor - my naive implementation of the benchmark based on Disruptor 3.2. It turned out that I used a not-yet-polished utility class; however, I keep this benchmark just to illustrate how smallish differences in implementation may have big consequences.
  • Disruptor2 - As Michael Barker would have implemented it (Thanks :-) ). It's actually more than twice as fast in the 1 million job test as the closest runner-up.
Intel (Xeon 2 socket each 6 cores, Hyperthreading off)

Well, it seems with 100k jobs of 1000 iterations, the benchmark is dominated by computation, not concurrency control. Therefore I retry with 1 million jobs each computing a 100 iteration slice.

Ok, we probably can see differences here :-)



AMD Opteron 6274 Dual Socket 2.2 GHz (=16 real cores, 16 HW Threads)


Again the 1 million tiny-job variant spreads the difference amongst the approaches (and their implementation):


Note there is about 5% run-to-run jitter (GC and so on); however, that does not change the big picture.

Last but not least: Best results per CPU architecture per lib:




Discussion of Results


We see that scaling behaviour can be significantly different depending on the hardware platform used.

Akka loses a lot of CPU time doing GC and allocation. If I modify Abstraktor to use an unbounded ConcurrentLinkedQueue (Akka uses those), it performs similarly to Akka and builds up temporary queues of more than 100k elements. This is an inherent issue of the Actor model. Leaning towards CSP (using very short bounded blocking queues of size <100) also hurts performance, as threads are too often blocked or spinning while waiting for input. However, with a queue size of 5000 elements things work out pretty well (while introducing other problems, such as deadlocks in case of cyclic Actor graphs).
The Disruptor library is very well implemented, so a good part of the performance advantage could be attributed to the excellent quality of implementation.

Quote from the Disruptor documentation regarding queuing:
3.5 The Problems of Queues
[...]  If an in-memory queue is allowed to be unbounded then for many classes of problem it can grow unchecked until it reaches the point of catastrophic failure by exhausting memory. This happens when producers outpace the consumers. Unbounded queues can be useful in systems where the producers are guaranteed not to outpace the consumers and memory is a precious resource, but there is always a risk if this assumption doesn’t hold and queue grows without limit. [...]
When in use, queues are typically always close to full or close to empty due to the differences in pace between consumers and producers. They very rarely operate in a balanced middle ground where the rate of production and consumption is evenly matched. [...]
Couldn't have said it better myself =).
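The effect is easy to reproduce in miniature. The sketch below is single-threaded on purpose and the names are invented for the example: a "producer" offers 100,000 elements while no consumer is running. The unbounded queue swallows everything, while the bounded one caps memory use and pushes the problem back to the producer.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueGrowthSketch {

    /** Offers 'attempts' elements with no consumer running; returns how many were rejected. */
    static int offerAll(Queue<Integer> queue, int attempts) {
        int rejected = 0;
        for (int i = 0; i < attempts; i++) {
            if (!queue.offer(i)) {
                rejected++;       // a real producer would block, spin or do other work here
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        Queue<Integer> unbounded = new ConcurrentLinkedQueue<>();
        Queue<Integer> bounded = new ArrayBlockingQueue<>(100);

        int rejectedUnbounded = offerAll(unbounded, 100_000);
        int rejectedBounded = offerAll(bounded, 100_000);

        System.out.println("unbounded: size=" + unbounded.size() + " rejected=" + rejectedUnbounded);
        System.out.println("bounded:   size=" + bounded.size() + " rejected=" + rejectedBounded);
    }
}
```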



Conclusion


Although the Disruptor worked best for this example, I think looking for "the concurrency model to go for" is wrong. If we look at the real world, we see all 4 patterns used, depending on the use case.
So a broad concurrency library ideally would integrate the assembly-line pattern (~Disruptor), queued messaging (~Actors) and unqueued communication (~CSP).



Benchmark source


AKKA

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.routing.RoundRobinRouter;
import com.typesafe.config.ConfigFactory;

import java.util.concurrent.CountDownLatch;

public class Pi {

    static volatile CountDownLatch latch;
    static long timSum = 0;

    public static void main(String[] args) throws InterruptedException {
        Pi pi = new Pi();
        int numStepsPerComp = 1000;
        int numJobs = 100000;
        final int MAX_ACT = 16;
        String results[] = new String[MAX_ACT];
        for (int numActors = 1; numActors <= MAX_ACT; numActors++) {
            timSum = 0;
            for (int i = 0; i < 30; i++) {
                latch = new CountDownLatch(1);
                pi.calculate(numActors, numStepsPerComp, numJobs);
                latch.await();
                // discard the first 20 runs as warm-up, take last 10 samples only
                // (was i == 20, which left only 9 samples for the division by 10 below)
                if ( i == 19 ) {
                    timSum = 0;
                }
            }
            results[numActors-1] = "average "+numActors+" threads : "+(timSum/10/1000/1000);
        }
        for (int i = 0; i < results.length; i++) {
            String result = results[i];
            System.out.println(result);
        }
    }

    // message: start the computation
    static class Calculate {
    }

    // message: compute one slice of the series
    static class Work {
        private final int start;
        private final int nrOfElements;

        public Work(int start, int nrOfElements) {
            this.start = start;
            this.nrOfElements = nrOfElements;
        }

        public int getStart() {
            return start;
        }

        public int getNrOfElements() {
            return nrOfElements;
        }
    }

    // message: result of one slice
    static class Result {
        private final double value;

        public Result(double value) {
            this.value = value;
        }

        public double getValue() {
            return value;
        }
    }

    // message: final result plus elapsed time in nanoseconds
    static class PiApproximation {
        private final double pi;
        private final long duration;

        public PiApproximation(double pi, long duration) {
            this.pi = pi;
            this.duration = duration;
        }

        public double getPi() {
            return pi;
        }

        public long getDuration() {
            return duration;
        }
    }

    public static class Worker extends UntypedActor {

        private double calculatePiFor(int start, int nrOfElements) {
            double acc = 0.0;
            for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
                acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
            }
            return acc;
        }

        public void onReceive(Object message) {
            if (message instanceof Work) {
                Work work = (Work) message;
                double result = calculatePiFor(work.getStart(), work.getNrOfElements());
                getSender().tell(new Result(result), getSelf());
            } else {
                unhandled(message);
            }
        }
    }

    public static class Master extends UntypedActor {
        private final int nrOfMessages;
        private final int nrOfElements;
        private double pi;
        private int nrOfResults;
        private final long start = System.nanoTime();
        private final ActorRef listener;
        private final ActorRef workerRouter;

        public Master(
                final int nrOfWorkers,
                int nrOfMessages,
                int nrOfElements,
                ActorRef listener) {
            this.nrOfMessages = nrOfMessages;
            this.nrOfElements = nrOfElements;
            this.listener = listener;
            workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(
                    new RoundRobinRouter(nrOfWorkers)), "workerRouter");
        }

        public void onReceive(Object message) {
            if (message instanceof Calculate) {
                for (int start = 0; start < nrOfMessages; start++) {
                    workerRouter.tell(new Work(start, nrOfElements), getSelf());
                }
            } else if (message instanceof Result) {
                Result result = (Result) message;
                pi += result.getValue();
                nrOfResults += 1;
                if (nrOfResults == nrOfMessages) {
                    // Send the result to the listener
                    long duration = System.nanoTime() - start;
                    listener.tell(new PiApproximation(pi, duration), getSelf());
                    // Stops this actor and all its supervised children
                    getContext().stop(getSelf());
                }
            } else {
                unhandled(message);
            }
        }
    }

    public static class Listener extends UntypedActor {
        public void onReceive(Object message) {
            if (message instanceof PiApproximation) {
                PiApproximation approximation = (PiApproximation) message;
                long duration = approximation.getDuration();
                System.out.println(String.format("Pi approximation: " +
                        "%s Calculation time: \t%s",
                        approximation.getPi(), duration/1000/1000));
                timSum += duration;
                getContext().system().shutdown();
                latch.countDown();
            } else {
                unhandled(message);
            }
        }
    }

    public void calculate(
            final int nrOfWorkers,
            final int nrOfElements,
            final int nrOfMessages) {
        // Create an Akka system
        ActorSystem system = ActorSystem.create("PiSystem", ConfigFactory.parseString(
                "akka {\n" +
                " actor.default-dispatcher {\n" +
                " fork-join-executor {\n" +
                " parallelism-min = 2\n" +
                " parallelism-factor = 0.4\n" +
                " parallelism-max = "+nrOfWorkers+"\n" +
                " }\n" +
                " throughput = 1000\n" +
                " }\n" +
                "\n" +
                " log-dead-letters = off\n" +
                "\n" +
                " actor.default-mailbox {\n" +
                " mailbox-type = \"akka.dispatch.SingleConsumerOnlyUnboundedMailbox\"\n" +
                " }\n" +
                "}"
                )
        );
        // create the result listener, which will print the result and shutdown the system
        final ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
        // create the master
        ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
            public UntypedActor create() {
                return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener);
            }
        }), "master");
        // start the calculation
        master.tell(new Calculate(), master);
    }
}



Multi Threading


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class ThreadPi {

    static double calculatePiFor(int slice, int nrOfIterations) {
        double acc = 0.0;
        for (int i = slice * nrOfIterations; i <= ((slice + 1) * nrOfIterations - 1); i++) {
            acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
        }
        return acc;
    }

    private static long piTest(final int numThreads) throws InterruptedException {
        final int numMessages = 100000;
        final int step = 1000;
        final ExecutorService test = Executors.newFixedThreadPool(numThreads);
        final AtomicInteger latch = new AtomicInteger(numMessages);
        final AtomicReference<Double> result = new AtomicReference<>(0.0);
        final AtomicLong timSum = new AtomicLong(0);
        final long tim = System.currentTimeMillis();
        for ( int i= 0; i< numMessages; i++) {
            final int finalI = i;
            // throttle the producer so the executor queue stays bounded
            while ( ((ThreadPoolExecutor)test).getQueue().size() > 40000 ) {
                LockSupport.parkNanos(100);
            }
            test.execute(new Runnable() {
                public void run() {
                    double res = calculatePiFor(finalI, step);
                    // add the slice to the shared result with a CAS retry loop
                    Double expect;
                    boolean success;
                    do {
                        expect = result.get();
                        success = result.compareAndSet(expect,expect+res);
                    } while( !success );
                    int lc = latch.decrementAndGet();
                    if (lc == 0 ) {
                        // last job finished: record elapsed time
                        long l = System.currentTimeMillis() - tim;
                        timSum.set(timSum.get()+l);
                        System.out.println("pi: " + result.get() + " t:" + l + " finI " + finalI);
                        test.shutdown();
                    }
                }
            });
        }
        while (latch.get() > 0 ) {
            LockSupport.parkNanos(1000*500); // don't care as 0.5 ms are not significant per run
        }
        return timSum.get();
    }

    public static void main( String arg[] ) throws Exception {
        final int MAX_ACT = 16;
        String results[] = new String[MAX_ACT];
        for ( int numActors = 1; numActors <= MAX_ACT; numActors++ ) {
            long sum = 0;
            for ( int ii=0; ii < 30; ii++) {
                long res = piTest(numActors);
                if ( ii >= 20 ) { // first 20 runs are warm-up, average the last 10
                    sum+=res;
                }
            }
            results[numActors-1] = "average "+numActors+" threads : "+sum/10;
        }
        for (int i = 0; i < results.length; i++) {
            String result = results[i];
            System.out.println(result);
        }
    }
}


Abstraktor


public class ActorPiSample {

    public static class PiActor extends Actor {
        public void calculatePiFor(int start, int nrOfElements, ChannelActor result ) {
            double acc = 0.0;
            for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
                acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
            }
            result.receiveResult(acc);
        }
    }

    public static class PiStriper extends Actor {
        PiActor actors[];

        public void run(int numActors, int iterationSize, int numJobs, final ChannelActor resultListener ) {
            final long tim = System.currentTimeMillis();
            actors = new PiActor[numActors];
            for (int i = 0; i < actors.length; i++) {
                actors[i] = SpawnActor(PiActor.class);
            }
            final int iterPerAct = numJobs / numActors;
            final int iterSum = iterPerAct * actors.length;
            final ChannelActor endResult = Actors.QueuedChannel(new ChannelReceiver<Double>() {
                double sum = 0;
                int count = 0;

                @Override
                public void receiveResult(Double result) {
                    count++;
                    sum += result;
                    if (count == iterSum) {
                        resultListener.receiveResult(sum);
                        done();
                        PiStriper.this.getDispatcher().shutDown();
                        for (int i = 0; i < actors.length; i++) {
                            PiActor actor = actors[i];
                            actor.getDispatcher().shutDown();
                        }
                    }
                }
            });
            int iteri = 0;
            for (int i = 0; i < actors.length; i++) {
                for ( int ii = 0; ii < iterPerAct; ii++ ) {
                    actors[iteri%actors.length].calculatePiFor(iteri, iterationSize, endResult /*subRes*/);
                    iteri++;
                }
            }
            System.out.println("POK iteri " + iteri);
        }
    }

    static long calcPi(final int numMessages, int step, final int numActors) throws InterruptedException {
        final long tim = System.currentTimeMillis();
        final CountDownLatch latch = new CountDownLatch(1); // to be able to wait for finish
        final AtomicLong time = new AtomicLong(0);
        ChannelActor resultReceiver = Actors.Channel(
            new ChannelReceiver<Double>() {
                public void receiveResult(Double pi) {
                    long l = System.currentTimeMillis() - tim;
                    System.out.println("T = " + numActors + " pi: " + pi + " " + l + " disp:" + de.ruedigermoeller.abstraktor.impl.Dispatcher.instanceCount.get());
                    time.set(l);
                    done();
                    latch.countDown();
                }
            });
        PiStriper piStriper = Actors.AsActor(PiStriper.class);
        piStriper.run(numActors,step, numMessages, resultReceiver );
        // wait until done
        latch.await();
        return time.get();
    }
}


Disruptor naive


import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class DisruptorTest {

    // ring buffer slot: job description and result live in the same preallocated object
    public static class PiJob {
        public double result;
        public int sliceNr;
        public int numIter;

        public void calculatePi() {
            double acc = 0.0;
            for (int i = sliceNr * numIter; i <= ((sliceNr + 1) * numIter - 1); i++) {
                acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
            }
            result = acc;
        }
    }

    public static class PiEventFac implements EventFactory<PiJob> {
        @Override
        public PiJob newInstance() {
            return new PiJob();
        }
    }

    public static class PiEventProcessor implements WorkHandler<PiJob> {
        @Override
        public void onEvent(PiJob event) throws Exception {
            event.calculatePi();
        }
    }

    public static class PiResultReclaimer implements WorkHandler<PiJob> {
        double result;
        public AtomicLong seq = new AtomicLong(0);

        @Override
        public void onEvent(PiJob event) throws Exception {
            result += event.result;
            seq.incrementAndGet();
        }
    }

    public long run(int numTH, int numSlice, int numIter) throws InterruptedException {
        PiEventFac fac = new PiEventFac();
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<PiJob> disruptor = new Disruptor<>(fac,16384, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        PiEventProcessor procs[] = new PiEventProcessor[numTH];
        PiResultReclaimer res = new PiResultReclaimer();
        for (int i = 0; i < procs.length; i++) {
            procs[i] = new PiEventProcessor();
        }
        // stage 1: pool of workers computes slices, stage 2: single worker sums them up
        disruptor.handleEventsWithWorkerPool(procs).thenHandleEventsWithWorkerPool(res);
        disruptor.start();
        final RingBuffer<PiJob> ringBuffer = disruptor.getRingBuffer();
        long tim = System.currentTimeMillis();
        for (int i= 0; i < numSlice; i++ ) {
            final long seq = ringBuffer.next();
            final PiJob piJob = ringBuffer.get(seq);
            piJob.numIter = numIter;
            piJob.sliceNr = i;
            piJob.result = 0;
            ringBuffer.publish(seq);
        }
        // poll until all slices have been summed up
        while (res.seq.get() < numSlice)
        {
            Thread.sleep(1);
        }
        long timTest = System.currentTimeMillis() - tim;
        System.out.println(numTH+": tim: "+ timTest +" Pi: "+res.result);
        disruptor.shutdown();
        executor.shutdownNow();
        return timTest;
    }

    public static void main(String arg[] ) throws InterruptedException {
        final DisruptorTest disruptorTest = new DisruptorTest();
        int numSlice = 1000000;
        int numIter = 100;
        int NUM_CORE = 16;
        String res[] = new String[NUM_CORE];
        for ( int i = 1; i <= NUM_CORE; i++ ) {
            long sum = 0;
            System.out.println("--------------------------");
            for ( int ii = 0; ii < 20; ii++ ) {
                long t = disruptorTest.run(i, numSlice, numIter);
                if ( ii >= 10 ) // first 10 runs are warm-up, average the last 10
                    sum += t;
            }
            res[i-1] = i+": "+(sum/10);
        }
        for (int i = 0; i < res.length; i++) {
            String re = res[i];
            System.out.println(re);
        }
    }
}


Disruptor optimized


import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorTest2 {

    public static class PiJob {
        public double result;
        public int sliceNr;
        public int numIter;
        public int partionId;

        public void calculatePi() {
            double acc = 0.0;
            for (int i = sliceNr * numIter; i <= ((sliceNr + 1) * numIter - 1); i++) {
                acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
            }
            result = acc;
        }
    }

    public static class PiEventFac implements EventFactory<PiJob> {
        @Override
        public PiJob newInstance() {
            return new PiJob();
        }
    }

    // every processor sees every event, but only works on its own partition
    public static class PiEventProcessor implements EventHandler<PiJob> {
        private int partionId;

        public PiEventProcessor(int partionId) {
            this.partionId = partionId;
        }

        @Override
        public void onEvent(PiJob event, long sequence, boolean isEndOfBatch) throws Exception {
            if (partionId == event.partionId) {
                event.calculatePi();
            }
        }
    }

    public static class PiResultReclaimer implements EventHandler<PiJob> {
        double result;
        public long seq = 0;
        private final int numSlice;
        final CountDownLatch latch;

        public PiResultReclaimer(int numSlice)
        {
            this.numSlice = numSlice;
            latch = new CountDownLatch(1);
        }

        @Override
        public void onEvent(PiJob event, long sequence, boolean isEndOfBatch) throws Exception {
            result += event.result;
            ++seq;
            if (seq >= numSlice) {
                latch.countDown();
            }
        }
    }

    public long run(int numTH, int numSlice, int numIter) throws InterruptedException {
        PiEventFac fac = new PiEventFac();
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<PiJob> disruptor = new Disruptor<>(fac,16384, executor, ProducerType.SINGLE, new SleepingWaitStrategy());
        PiEventProcessor procs[] = new PiEventProcessor[numTH];
        PiResultReclaimer res = new PiResultReclaimer(numSlice);
        for (int i = 0; i < procs.length; i++) {
            procs[i] = new PiEventProcessor(i);
        }
        // all processors run in parallel on the same events, the reclaimer follows behind them
        disruptor.handleEventsWith(procs).then(res);
        disruptor.start();
        final RingBuffer<PiJob> ringBuffer = disruptor.getRingBuffer();
        long tim = System.currentTimeMillis();
        int partionId = 0;
        for (int i= 0; i < numSlice; i++ ) {
            final long seq = ringBuffer.next();
            final PiJob piJob = ringBuffer.get(seq);
            piJob.numIter = numIter;
            piJob.sliceNr = i;
            piJob.result = 0;
            piJob.partionId = partionId;
            ringBuffer.publish(seq);
            // assign partitions round robin
            partionId = (partionId == (numTH - 1)) ? 0 : partionId + 1;
        }
        res.latch.await();
        long timTest = System.currentTimeMillis() - tim;
        System.out.println(numTH+": tim: "+ timTest +" Pi: "+res.result);
        disruptor.shutdown();
        executor.shutdownNow();
        return timTest;
    }

    public static void main(String arg[] ) throws InterruptedException {
        final DisruptorTest2 disruptorTest = new DisruptorTest2();
        int numSlice = 1000000;
        int numIter = 100;
        int NUM_CORE = 16;
        String res[] = new String[NUM_CORE];
        for ( int i = 1; i <= NUM_CORE; i++ ) {
            long sum = 0;
            System.out.println("--------------------------");
            for ( int ii = 0; ii < 20; ii++ ) {
                long t = disruptorTest.run(i, numSlice, numIter);
                if ( ii >= 10 ) // first 10 runs are warm-up, average the last 10
                    sum += t;
            }
            res[i-1] = i+": "+(sum/10);
        }
        for (int i = 0; i < res.length; i++) {
            String re = res[i];
            System.out.println(re);
        }
    }
}