Saturday, December 21, 2013

Big Data the 'reactive' way

This has been posted (by me ofc) on the Java Advent Calendar originally. Check it  out for more interesting articles:

A metatrend going on in the IT industry is a shift from query-based, batch oriented systems to (soft) realtime updated systems. While this is associated with financial trading only, there are many other examples such as "Just-In-Time"-logistic systems, flight companies doing realtime pricing of passenger seats based on demand and load, C2C auction system like EBay, real time traffic control and many more.

It is likely this trend will continue, as the (commercial) value of information is time dependent, value decreases with age of information.

Automated trading in the finance sector is just a forerunner in this area, because some microseconds time advantage can be worth millions of dollars. Its natural real time processing systems evolve in this domain faster.

However big parts of traditional IT infrastructure is not designed for reactive, event based systems. From query based databases to request-response based Http protcol, the common paradigm is to store and query data "when needed".

Current Databases are static and query-oriented

Current approaches to data management such as SQL and NOSQL databases focus on data transactions and static query of data. Databases provide convenience in slicing and dicing data but they do not support update of complex queries in real time. Uprising NOSQL databases still focus on computing a static result.
Databases are clearly not "reactive".

Current Messaging Products provide poor query/filtering options

Current messaging products are weak at filtering. Messages are separated into different streams (or topics), so clients can do a raw preselection on the data received. However this frequently means a client application receives like 10 times more data than needed, doing fine grained filtering 'on-top'.
A big disadvantage is, that the topic approach builts filter capabilities "into" the system's data design.
E.g. if a stock exchange system splits streams on a per-stock base, a client application still needs to subscribe to all streams in order to provide a dynamically updated list of "most active" stocks. Querying usually means "replay+search the complete message history".

A scalable, "continuous query" distributed Datagrid. 

I had the enjoyment to do conceptional & technical design for a large scale realtime system, so I'd like to share a generic scalable solution for continuous query processing at high volume and large scale.

It is common, that real-time processing systems are designed "event sourced". This means, persistence is replaced by journaling transactions. System state is kept in memory, the transaction journal is required for historic analysis and crash recovery only.
Client applications do not query, but listen to event streams instead. A common issue with event sourced systems is the problem of "late joining client". A late client would have to replay the whole system event journal in order to get an up-to-date snapshot of the system state.
In order to support late joining clients, a kind of "Last Value Cache" (LVC) component is required. The LVC holds current system state and allows late joiners to bootstrap by querying.
In a high performance, large data system, the LVC component becomes a bottleneck as the number of clients rises.

Generalizing the Last Value Cache: Continuous Queries

In a continuous query data cache, a query result is kept up to date automatically. Queries are replaced by subscriptions.
subscribe * from Orders where
   symbol in ['ALV', 'BMW'] and
   volume > 1000 and
creates a message stream, which initially performs a query operation, after that updates the result set whenever a data change affecting the query result happened (transparent to the client application). The system ensures each subscriber receives exactly the change notifications necessary to keep its "live" query results up-to-date.

A distributed continous query system: The LVC Nodes hold data. Transactions are sent to them on a message bus (red). The LVC nodes compute the actual difference caused by a transaction and send change notifications on a message bus (blue).  This enables "processing nodes" to keep a mirror of their relevant data partition up-to-date. External clients connected via TCP/Http do not listen to the message bus (because multicast is not an option in WAN). "Subscription processors" keep the client's continuous queries up-to-date by listening to the (blue) message bus and dispatching required change notifications only to client's point2point connection.


Difference of data access patterns compared to static data management:

  • High write volume
    Real time systems create a high volume of write access/change in data.
  • Fewer full table scans.
    Only late-joining clients or changes of a query's condition require a full data scan. Because continuous queries make "refreshing" a query result obsolete, Read/Write ratio is ~ 1:1 (if one counts the change notification resulting from a transaction as "Read Access").
  • The majority of load is generated, when evaluating queries of active continuous subscriptions with each change of data. Consider a transaction load of 100.000 changes per second with 10.000 active continuous queries: this requires 100.000*10.000 = 1 Billion evaluations of query conditions per second. That's still an underestimation: When a record gets updated, it must be tested whether the record has matched a query condition before the update and whether it matches after the update. A record's update may result in an add (because it matches after the change) or a remove transaction (because the record does not match anymore after a change) to a query subscription.

Data Cluster Nodes ("LastValueCache Nodes")

Data is organized in tables, column oriented. Each table's data is evenly partitioned amongst all data grid nodes (=last value cache node="LVC node"). By adding data nodes to the cluster, capacity is increased and snapshot queries (initializing a subscription) are sped up by increased concurrency.

There are three basic transactions/messages processed by the data grid nodes:

  • AddRow(table,newRow), 
  • RemoveRow(table,rowId), 
  • UpdateRow(table, rowId, diff). 

The data grid nodes provide a lambda-alike (row iterator) interface supporting the iteration of a table's rows  using plain java code. This can be used to perform map-reduce jobs and as a specialization, the initial query required by newly subscribing clients. Since ongoing computation of continuous queries is done in the "Gateway" nodes, the load of data nodes and the number of clients correlate weakly only.

All transactions processed by a data grid node are (re-)broadcasted using multicast "Change Notification" messages.

Gateway Nodes

Gateway nodes track subscriptions/connections to client applications. They listen to the global stream of change notifications and check whether a change influences the result of a continuous query (=subscription). This is very CPU intensive.

Two things make this work:

  1. by using plain java to define a query, query conditions profit from JIT compilation, no need to parse and interpret a query language. HotSpot is one of the best optimizing JIT compilers on the planet.
  2. Since multicast is used for the stream of global changes, one can add additional Gateway nodes with ~no impact on throughput of the cluster.

Processor (or Mutator) Nodes

These nodes implement logic on-top of the cluster data. E.g. a statistics processor does a continuous query for each table, incrementally counts the number of rows of each table and writes the results back to a "statistics" table, so a monitoring client application can subscribe to realtime data of current table sizes. Another example would be a "Matcher processor" in a stock exchange, listening to orders for a stock, if orders match, it removes them and adds a Trade to the "trades" table.
If one sees the whole cluster as kind of a "giant spreadsheet", processors implement the formulas of this spreadsheet.

Scaling Out

  • with data size:
    increase number of LVC nodes
  • Number of Clients
    increase subscription processor nodes.
  • TP/S
    scale up processor nodes and LVC nodes
Of cause the system relies heavily on availability of a "real" multicast messaging bus system. Any point to point oriented or broker-oriented networking/messaging will be a massive bottleneck.


Building real time processing software backed by a continuous query system simplifies application development a lot.
  • Its model-view-controller at large scale.
    Astonishing: patterns used in GUI applications for decades have not been extended regulary to the backing data storage systems.
  • Any server side processing can be partitioned in a natural way. A processor node creates an in-memory mirror of its data partition using continuous queries. Processing results are streamed back to the data grid. Computing intensive jobs, e.g. risk computation of derivatives can be scaled by adding processor instances subscribing to distinct partitions of the data ("sharding").
  • The size of the Code Base reduces significantly (both business logic and Front-End).
    A lot of code in handcrafted systems deals with keeping data up to date.

About me

I am a technical architect/senior developer consultant at an european company involved heavily in stock & derivative trading systems.

This post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!

Sunday, December 15, 2013

Dart - a possible solution to the high.cost@lowquality.webapp syndrome

I am involved in development of high quality real time middle ware/GUI front-end applications professionally. Up to now, we were forced to use traditional Fat Client/Server designs, as Web Applications fail to fulfil basic usability requirements (e.g. Key Shortcuts) and performance requirement (high frequency real time updates). Basic UI features often require nasty and unmaintainable JavaScript hacks (the awkward, unfunful kind of hack) which blow up development+testing cost.

Regarding user experience/performance, checkout this example of a (good!) server centric framework:

(no offence, Apache-Wicket is one of the best Web-Frameworks out there IMO).
  • its sluggish
  • can't navigate with arrow keys
  • no standard multi selection (with Ctrl, Shift etc.)
if we go down the JavaScript route, we can do better:

However (having fiddled with JS myself) its suspicious: one can't resize any of the demo tables. Googling results in the following finding:

The Grid widget works with fixed width only which is specified by its ‘width’ property. It is not possible to set its width in percentages in this version. We’ll implement the requested feature for a future version. 
Best Regards, XXXX, DevTeam

So by the end of year 2013, it is still a problem to achieve basic UI functionality with current web standards and browsers. Even worse, it fails to respect basic software engineering principles such as DRY (don't repeat yourself), encapsulation, ..
(check out the "source" tab of the link above to get an impression of the mess required to create a simple table demo component).

This is not the fault of the libraries mentioned above, the root cause of this mess are the underlying half baked W3C "standards" (+ browser implementations). They are apparently defined by people never involved in real world application development.

So we sit there with an intact hype, but without a productive development stack to fulfil the vision. Since management tends to share the hype but does not validate technical feasibility, this led to some prominent mis-decisions such as Steve Jobs claiming WebApps can be used on the iPhone instead of native apps (first iPhone release had no native APIs), another example is facebook relying on html5 too much, now moving back to native phone apps hastily.

Following I'll explain why I think we urgently need the paradigm shift in Web-Applications. Without a fundamental change, industry will keep burning money trying to build WebApplications based on expensive , crappy, half-baked technology.

In the beginning there was REST

and it sucked right away ..

The use of the REST (Representational State Transfer) design pattern results in

  • the absence of state in a server application.
    Any durable client state gets passed with each request of a client. 
  • each user interaction (e.g. click a button) requires a complete html document served, transmitted and rendered. 
  • no concept of a "connection" at HTTP-protocol level,
    so authentication and security related stuff (e.g. SSL handshake) has to be repeated with each request/response (=change of GUI state). 

This allowed WebGUIs to max out on bandwidth inefficiency and latency. It also leads to usability nuggets by losing subtle GUI state (such as scrollbar position or the longish forum post you started to write).

JavaScript, Ajax and "Look ma, a menu in the browser"

Innovation (& browser wars) never sleep. Industry leading companies and consortium's figured out, that the best way to address fundamental design issues is to provide many many many workarounds on top of it.

  • Http Keep alive to reduce latency
  • Standardize on an untyped weirdish scripting language,
    which can be cluttered all across the html page in tiny snippets. Ah .. there is nothing like mixing program, content and styling into a big code soup ..
  • Content Caching (man we had fun doing software updates ..)
    to avoid superfluous reload of static content
  • Define new Html Tags. A lot of them.
    The advantage of having many html tags is, that browsers have more room to implement them differently, allowing for the popular "if ( ie6 ) {...........} else if (mozilla) .. else if (safari) .." coding style. This way bandwidth requirements could be extended even more.
  • CSS was embraced,
    so when done with definition of new tags, they could continue with definition of new css attributes. Also javascript could mess up even better modifying the HTML DOM and CSS definitions at runtime.
  • Async Http Request (AJAX)
    avoids the need to reload whole html-page.
These are kludges (some of them with notable negative side effects) addressing fundamental issues of an overly simplistic foundation technology. So we got, what I call the MESS technology stack:

The MESS principles also infiltrated server side frameworks, as they had to deal with the MESS somehow. I'd speculate some of the frameworks are 90% "mess-handlers', designed to virtualize a solid engineered foundation in order to increase productivity when messing with the MESS.

Google to the rescue !!

One can safely assume apple and microsoft are not *that* interested in providing a fluid web application platform. If all applications would be WebApplications, the importance of the underlying operation system dwindles. All a user needs, is a compliant browser (hello Chrome OS).

Google as a major web application implementor/provider suffer them self from the high.cost@low-quality.webapp syndrome.

Some clever JavaScript hackers (<= positive) started to figure out a simplified, more flexible web application design. They basically implemented a "Fat Client" in JavaScript. Instead of doing page-reloading Http requests, they used Async Http to retrieve data from the server and rendered this data by modifying parts of the displayed html document (via DOM). Whenever you see a responsive 'cool' web application, you can safely assume it relies on this approach.

However JavaScript is not designed to work well at large business applications with many lines of code and fluctuation of development staff as typical for enterprise software development. It becomes a maintenance nightmare for sure (if the project succeeds at all) or a minimalistic solution hated by end-users.

The first widely known step in backing up the hype with a structured technology was the Google Widget Toolkit, which relied on compiling ("transpiling") java to browser compatible JavaScript.
For various reasons, this project has been stalled (but not dropped) by Google in favour of their new Dart language before the technology reached a 'mature' state. It has been reported that turn-around times get unacceptable with larger applications and its unlikely this will change (because resources have been moved to Dart). Remembering the law suit of Oracle against use of Java in Android, this move of Google is not too surprising.

Dart has the potential to reach a level of convenience which has been there for native apps for years:

  • extensive API to the browser. Anything achievable with JavaScript can be done with Dart.
  • a modern structured language with (optional) compile time type checking, a package concept and support for many well-known principles of object oriented programming like Classes, Interfaces (Mixins), Inheritance, Lambdas. The concurrency model of Dart (Isolates, similar to Actors) could be interesting on the server side also.
  • compilable to JavaScript
  • support for fully encapsulated tag-based UI components (via polymer lib and upcoming WebComponents w3c standard)
  • backed by a company providing the worlds leading browser and a believable self-dependency on the technology.


Mankind not only is capable to build chips with 800 million transistors, it also might be capable creating standards defined by 800 million words .. scary ;-) .

What about Java ?

Update: Back2Brwsr and TeaVM are projects based on Java=>JS transpiling.

While the Scala guys join the party late, Oracle misses the train and seems to favour JavaFX as a GUI platform (my prediction: Will fall asleep prior to reaching maturity).
GWT has been open sourced, so maybe the community will do further development. Given that there are more open source projects than developers willing to contribute, I doubt this will be able to keep pace with Google's money backed Dart. Also I could imagine the (aging) Java community shys away from (partially) deprecating their habitual server side framework landscape.

There is also backed by a bigger german hosting company. I have not tested this, but I doubt that pure community driven development will keep pace. Being compliant to ever changing draft specs and dealing with browser implementation variance is not fun ...

What's required is a byte-code to JavaScript transpiler. 
The browser API (DOM access) could be modeled by a set of interfaces at compile time and replaced by the transpiler with the appropriate javascript calls. 
Using static bytecode analysis or by tracking required classes at runtime, the set of required transpiled classes/methods could be found. Additionally one would need a fall-back transpiler at runtime. If a client side script instantiates an "un-transpiled" java class or method, it could be transpiled on-the-fly at server side and loaded to the client.
Nice thing of such a bytecode level transpiler would be, that all VM-based languages (groovy, scala, jruby, jphyton, ..) could profit from such a system. 
Example of a possible Java2JavaScript transpiling solution: Since Java is a wide spread server platform, a java2js transpiler could work dynamically, which is a major strategic advantage over Dart, which needs to use static analysis & compilation because most server systems are not running native dart.

There needs to be put real money on such a Java solution, with slow moving JCP we probably can expect something like this 2016 or later ...
Would be fun building such a transpiler system .. but no time - gotta mess with the MESS.

Update: The J2EE java 8 project seems to adapt to new 'thin' webapp architecture with project "Avatar". I still haven't figured out the details. On a first glance it looks like they bring JS to the server instead of bringing Java transpilation to the client .. which is a mad idea imho :-).

Update on GWT: A google engineer answered back on GWT telling me the project is alive and is in active development. It turns out Dart and GWT are handled by distinct (competing?) divisions of google. There still is evidence, that Dart receives the real money while GWT is handed over to the community. Google plans to bring native DartVM to chrome within ~1 year (rumours) and plans on delivering the DartVM to Android also.