Sunday, October 12, 2014

Alternatives to Executors when scheduling Tasks/Actors



Executors work well when fed short units of stateless work, e.g. dividing a computation across many CPUs. However, they are sub-optimal for scheduling jobs that are part of an ongoing, larger unit of work, e.g. scheduling the messages of an actor or lightweight process.

Many actor frameworks (or similar message-passing concurrency frameworks) schedule batches of messages using an Executor service. As the Executor service is not context-aware, this spreads the processing of a single Actor/Task's messages across several threads/CPUs.

This can lead to many cache misses when accessing the state of an Actor/Process/Task, as the processing thread changes frequently.
Even worse, CPU caches cannot stabilize this way, as each new "Runnable" evicts the cached memory of previously processed tasks.
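To make the spreading visible, here is a minimal sketch (not taken from any actor framework) that submits all "messages" of one logical actor to a shared fixed thread pool and records which threads process them. The class and method names are made up for illustration, and the sketch ignores the per-actor ordering a real framework would have to enforce.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
final class ExecutorSpreadDemo {

    // Submits every "message" of one logical actor to a shared pool and
    // returns the names of the threads that ended up processing them.
    static Set<String> threadsTouchingOneActor(int poolSize, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> seen = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < messages; i++) {
            // The pool picks any free worker; it knows nothing about the actor.
            pool.execute(() -> seen.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

On a multi-core machine, `ExecutorSpreadDemo.threadsTouchingOneActor(4, 10_000)` will typically return more than one thread name, which means the actor's state would have travelled between cores.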

A second issue arises when using busy-spin polling. If a framework reads its queues using busy-spin, it generates 100% CPU load for each processing thread. One therefore wants to add a second thread/core only if absolutely required, which makes a one-thread-per-actor policy infeasible.

With Kontraktor 2.0 I implemented a different scheduling mechanism. It scales horizontally using a very simple metric to measure the application's actual CPU requirements, without randomly spreading the processing of an actor across different cores.


An actor has a fixed assignment to a worker thread ("DispatcherThread"). The scheduler periodically reschedules Actors by moving them to another worker if necessary.
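The idea of a fixed actor-to-worker assignment can be sketched as follows. This is not Kontraktor's actual DispatcherThread; `PinnedActor`, `PinnedDispatcher` and `PinnedDemo` are hypothetical names, and the loop yields when idle instead of busy-spinning to keep the example polite.

```java
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical actor: just a mailbox of messages to run.
final class PinnedActor {
    final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    void tell(Runnable msg) { mailbox.add(msg); }
}

// Hypothetical worker: polls only the mailboxes of the actors assigned to it.
final class PinnedDispatcher extends Thread {
    final List<PinnedActor> actors = new CopyOnWriteArrayList<>();
    volatile boolean running = true;

    @Override public void run() {
        while (running) {
            boolean idle = true;
            for (PinnedActor a : actors) {
                Runnable msg = a.mailbox.poll();
                if (msg != null) { msg.run(); idle = false; }
            }
            if (idle) Thread.yield(); // assumption: yield instead of a pure busy-spin
        }
    }
}

final class PinnedDemo {
    // Sends messages to one actor and returns the names of the threads that ran them.
    static Set<String> threadsUsedFor(int messages) {
        PinnedDispatcher dispatcher = new PinnedDispatcher();
        PinnedActor actor = new PinnedActor();
        dispatcher.actors.add(actor);
        dispatcher.setDaemon(true);
        dispatcher.start();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            actor.tell(() -> { seen.add(Thread.currentThread().getName()); done.countDown(); });
        }
        try {
            done.await();
            dispatcher.running = false;
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Because the actor is only ever polled by its dispatcher, all of its messages run on the same thread until a scheduler explicitly moves it to another worker's actor list.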

Since overly sophisticated algorithms tend to introduce high runtime costs, actual scheduling is done in a very simple manner:

1) A Thread is considered overloaded if the consume loop (pulling messages from the actor queue) has not been idle for N messages (currently N=1000).
2) If a Thread has been marked "overloaded", the Actors with the largest mailbox sizes are migrated to a newly started Thread (until #Threads == ThreadMax) as long as SUM_QUEUED_MSG(Actors scheduled on Thread A) > SUM_QUEUED_MSG(Actors scheduled on newly created Thread B).
3) Once #Threads == ThreadMax, Actors are rebalanced based on the sum of queued messages and "overload" observations.
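Steps 1 and 2 can be sketched roughly as below. This is a simplified model and not Kontraktor's scheduler code: `SketchActor`, `SketchWorker` and `SketchScheduler` are hypothetical names, mailbox sizes are plain integers, and the guard that keeps at least one actor on the source worker is my own assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for an actor: only its mailbox size matters here.
final class SketchActor {
    final String name;
    final int queuedMessages;
    SketchActor(String name, int queuedMessages) {
        this.name = name;
        this.queuedMessages = queuedMessages;
    }
}

// Hypothetical stand-in for a worker thread and its bookkeeping.
final class SketchWorker {
    final List<SketchActor> actors = new ArrayList<>();
    private int messagesSinceIdle = 0;

    void onMessageProcessed() { messagesSinceIdle++; }   // consume loop found work
    void onIdle() { messagesSinceIdle = 0; }             // consume loop found nothing

    // Step 1: overloaded if the loop has not been idle for n messages.
    boolean isOverloaded(int n) { return messagesSinceIdle >= n; }

    // SUM_QUEUED_MSG over all actors scheduled on this worker.
    int load() {
        int sum = 0;
        for (SketchActor a : actors) sum += a.queuedMessages;
        return sum;
    }
}

final class SketchScheduler {
    static final int N = 1000; // threshold quoted in the text

    // Step 2: move the actors with the largest mailboxes to a fresh worker
    // while the source still carries more queued messages than the target.
    static SketchWorker splitOverloaded(SketchWorker source) {
        SketchWorker target = new SketchWorker();
        List<SketchActor> byMailbox = new ArrayList<>(source.actors);
        byMailbox.sort(Comparator.comparingInt((SketchActor a) -> a.queuedMessages).reversed());
        for (SketchActor a : byMailbox) {
            if (source.load() <= target.load()) break;
            if (source.actors.size() <= 1) break; // assumption: never empty the source
            source.actors.remove(a);
            target.actors.add(a);
        }
        return target;
    }
}
```

With mailbox sizes 500, 300, 200 and 100 on the overloaded worker, the loop moves the 500 and 300 actors and then stops, leaving 300 queued messages on the old worker and 800 on the new one. Taking the stop condition literally can overshoot like this, which is one reason a later rebalancing pass (step 3) is useful.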

Problem areas: 
  • If the processing time of messages varies a lot, the sum of queued messages is misleading. An improvement would be to assign a weight to each message type by profiling periodically. A simpler option would be to give each Actor an additional weight that is multiplied by its queue size.
  • For load bursts, there is a latency until all of the available CPUs are actually used.
  • The delay until the JIT kicks in produces bad profiling data and leads to false scale-ups (this heals over time, so it is not that bad).
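The simpler option from the first bullet, a per-actor weight multiplied by the queue size, could look like the sketch below. `WeightedLoad` is a hypothetical helper, and how the weights would be obtained (manually or by profiling) is left open.

```java
// Hypothetical helper: weighted replacement for the plain sum of queued messages.
final class WeightedLoad {

    // queueSizes[i] and weights[i] describe the same actor.
    static double of(int[] queueSizes, double[] weights) {
        if (queueSizes.length != weights.length) {
            throw new IllegalArgumentException("one weight per actor required");
        }
        double sum = 0;
        for (int i = 0; i < queueSizes.length; i++) {
            sum += queueSizes[i] * weights[i];
        }
        return sum;
    }
}
```

For example, an actor with 100 cheap messages (weight 1.0) contributes 100, while an actor with only 10 expensive messages (weight 20.0) contributes 200, so the scheduler would treat the second one as the heavier load.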

An experiment

Update: I woke up this morning and realized this experiment has a flaw: the jobs per work item are scheduled in parallel in the executor tests, so I am probably measuring the effects of false sharing. The results have been removed; check the follow-up post.



Performance cost of adaptive Scaling


To measure the cost of auto-scheduling vs. explicit Actor=>Thread assignment, I ran the Computing-Pi test (see previous posts).
These numbers do not show effects of locality, as they compare explicit scheduling with automatic scheduling.

Test 1 manually assigns a Thread to each Pi-computing Actor.
Test 2 always starts with one worker and needs to scale up automatically once it detects actual load.


(Note: the example requires kontraktor2.0-beta-2 or later. If the 'parkNanos' code is uncommented, it scales up to only 2-3 threads.)

The test is run 8 times, increasing thread_max by one with each run.

results:

Kontraktor Autoscale (always start with 1 thread, then scale up to N threads)

1 thread : 1527
2 threads : 1273
3 threads : 718
4 threads : 630
5 threads : 521
6 threads : 576
7 threads : 619
8 threads : 668

Kontraktor with dedicated assignment of threads to Actors (see commented line in source above)

1 thread : 1520
2 threads : 804
3 threads : 571
4 threads : 459
5 threads : 457
6 threads : 534
7 threads : 615
8 threads : 659
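To compare the two series directly, the runtimes can be divided row by row. The sketch below only reuses the numbers listed above; `ScaleUpOverhead` is a hypothetical helper, and a ratio above 1.0 means autoscaling was slower than the dedicated assignment for that thread count.

```java
import java.util.Locale;

// Hypothetical helper that compares the two result series from this post.
final class ScaleUpOverhead {
    static final int[] AUTOSCALE = {1527, 1273, 718, 630, 521, 576, 619, 668};
    static final int[] DEDICATED = {1520, 804, 571, 459, 457, 534, 615, 659};

    // Autoscale runtime divided by dedicated runtime for the given thread count (1..8).
    static double ratio(int threads) {
        return (double) AUTOSCALE[threads - 1] / DEDICATED[threads - 1];
    }

    // One line per thread count, e.g. for printing to the console.
    static String report() {
        StringBuilder sb = new StringBuilder();
        for (int t = 1; t <= AUTOSCALE.length; t++) {
            sb.append(String.format(Locale.ROOT, "%d threads: %.2f%n", t, ratio(t)));
        }
        return sb.toString();
    }
}
```

The gap is largest at 2 threads (1273 vs. 804, a ratio of about 1.58) and nearly disappears at 7 and 8 threads.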


Conclusion #2

Differences in runtimes can be attributed mostly to the delay in scaling up. For deterministic load problems, prearranged Actor scheduling is of course more efficient. However, for a server receiving request loads that vary over time, automatic scaling is a valid option.
