Java Recipe for Realtime Graphing with JS and Bayeux

The data comes from my employer, but this was my pet project.
Note: Don't try a non-Sun/Oracle JVM with Jetty 7.

Everyone in retail finance needs to generate realtime graphs for the web. They give users a sense of what the price is doing, and they're pretty.

Technology

To do this you will need a data source, a price server and a graphing client. The data source is going to depend on your application, but I would expect it to be a webpage. If so, HttpClient and XPath are your friends. We then have a choice of price servers and protocols to communicate with our clients.
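As a rough illustration of the scraping step, here is a sketch that pulls a price out of a page using the XPath support that ships with the JDK. It assumes the page has already been fetched (with HttpClient or otherwise) and is well-formed XHTML, which real pages often are not. The markup, the `last-price` id and the `PricePageScraper` class are all made up for the example.

```java
import java.io.StringReader;
import java.math.BigDecimal;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

class PricePageScraper {

    /** Reads the text of the span with id "last-price" (a hypothetical id). */
    static BigDecimal extractPrice(String xhtml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            // Evaluating to a String yields the text of the first matching node
            String text = xpath.evaluate(
                "//span[@id='last-price']",
                new InputSource(new StringReader(xhtml)));
            return new BigDecimal(text.trim());
        } catch (XPathExpressionException e) {
            throw new IllegalStateException("Could not read price from page", e);
        }
    }

    public static void main(String[] args) {
        String page =
            "<html><body><span id=\"last-price\"> 34.53 </span></body></html>";
        System.out.println(extractPrice(page));
    }
}
```

Using BigDecimal rather than double avoids rounding surprises when the price is later formatted for display.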

Fast Polling HTTP: Simply hit the server for new data every 2-5 seconds. This isn't as dumb as it sounds. Placing a caching reverse proxy between the application server and client reduces the CPU overhead of servicing these requests to almost nothing. (On a regular server Varnish can handle 7000 requests/sec!)

Binary Protocol: Hold a TCP/IP connection open between server and client, then push individual prices down it. This is the most bandwidth-efficient option. It also gives you the option of writing clever code to make the server faster. On the downside, you will need a custom client (probably a Java applet) and it is likely that firewalls will regard the traffic as suspicious.

Comet: Comet means having a server stream an infinitely long HTTP response. There are multiple techniques for doing this. The upside is that you can have realtime networking within the browser; the downside is that you need a web server able to handle an open connection per client.

Fortunately, Jetty is able to handle very large numbers of simultaneous HTTP connections. Rather than tying down a thread for each request, Jetty uses continuations to suspend processing a request and resume processing it when new information is available. The CometD implementation of the Bayeux comet protocol was built around Jetty continuations.

Before we think about charts, we should go over how Bayeux works.

  • Every Bayeux message is a single JSON encoded object. (JSON is essentially javascript's own object literal syntax.)
  • All messages are sent over channels defined by URL paths. (Globbing subscriptions is permitted.)
  • Messages can be sent to every subscriber of a channel, or delivered to a specific client.
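To make the globbing rule concrete, here is a small sketch of how a subscription pattern could be matched against a channel name. It follows my reading of the Bayeux spec, where a trailing `*` matches exactly one segment and a trailing `**` matches any number of segments. The `ChannelGlob` class is hypothetical; a real CometD server does this matching for you.

```java
class ChannelGlob {

    /** Returns true if the channel name matches the subscription pattern. */
    static boolean matches(String pattern, String channel) {
        if (pattern.endsWith("/**")) {
            // "/price/**" matches anything below "/price/", at any depth
            String prefix = pattern.substring(0, pattern.length() - 2);
            return channel.startsWith(prefix)
                && channel.length() > prefix.length();
        }
        if (pattern.endsWith("/*")) {
            // "/price/*" matches exactly one further segment
            String prefix = pattern.substring(0, pattern.length() - 1);
            return channel.startsWith(prefix)
                && channel.length() > prefix.length()
                && channel.indexOf('/', prefix.length()) < 0;
        }
        // No wildcard: the names must be identical
        return pattern.equals(channel);
    }

    public static void main(String[] args) {
        System.out.println(matches("/price/GOOG/*", "/price/GOOG/USD"));
        System.out.println(matches("/price/*", "/price/GOOG/USD"));
        System.out.println(matches("/price/**", "/price/GOOG/USD"));
    }
}
```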

And that's pretty much it. The entire specification is only a few pages long. And Bayeux clients are available for javascript, which means we won't need to fiddle with Flash or Java applets.

Provided we can draw the graphs through the browser DOM… Firefox and Safari allow this through the <canvas> tag. Until recently, people held off using this tag because IE could not support it. But there is now a script that emulates <canvas> in IE6+. (It does work in IE6, I've tested it!) There are now several javascript graphing libraries - I like flot the best. It's elegant and looks great. (Flotr has candlestick graphs though.)

For the sake of simplicity, let's build all of this inside a Spring container [Spring provides a "plug-board" for wiring together Java beans. Learn more here.]

Getting started

To begin with we will need to grab the base Maven archetype for CometD with Jetty7 and jQuery. [If you don't understand Maven, start here and come back in a month when it makes sense.]

Then follow the CometD Spring integration guide, using the Late Spring Initialisation strategy. It is actually important to use this strategy, as lazy initialisation of Spring components would interfere with the timed events we want to use later.

This should give us a Spring context like...

<beans>
    <bean id="priceService"
        class="com.example.PriceService">
        <constructor-arg>
            <ref bean="bayeux" />
        </constructor-arg>
    </bean>
</beans>

Then within our PriceService we should have somewhere to store our prices. I'm going to use a synchronised SortedMap.

// Assume a Price interface.  Longs represent timestamps
private SortedMap<Long, Price> store =
	Collections.synchronizedSortedMap( new TreeMap<Long, Price>() );

It is important to use a synchronised collection, as we will be accessing it in response to user requests while other threads add data to it.
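One catch worth spelling out: the synchronised wrapper only protects individual calls. Iterating over the map, or over a view such as `tailMap()`, still has to be done while holding the map's own lock. A minimal sketch, using a hypothetical `PriceStore` class with `Double` standing in for the Price type:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class PriceStore {
    // Timestamp in milliseconds -> price
    private final SortedMap<Long, Double> store =
        Collections.synchronizedSortedMap(new TreeMap<Long, Double>());

    void add(long time, double price) {
        store.put(time, price);      // a single call is already thread-safe
    }

    /** Copies every price at or after timeFrom, oldest first. */
    List<Double> since(long timeFrom) {
        synchronized (store) {       // iteration needs the map's own lock
            return new ArrayList<Double>(store.tailMap(timeFrom).values());
        }
    }

    public static void main(String[] args) {
        PriceStore prices = new PriceStore();
        prices.add(2000L, 34.55);
        prices.add(1000L, 34.53);
        System.out.println(prices.since(1500L));
    }
}
```

Returning a copy means the caller can take its time with the result without blocking the thread that adds prices.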

Our data needs to come from somewhere, so let's have a method 'tick()' that's called frequently to harvest fresh data. Spring comes with a timer mechanism to do this.
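What `tick()` does will depend on your data source, but the shape is usually the same: fetch the latest quote, ignore it if we already have it, otherwise store it and push it out. Here is one possible sketch. The `Quote` class and the `Supplier` used as a data source are stand-ins I have invented for the example.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;

class TickingPriceService {

    /** Minimal stand-in for a Price type. */
    static final class Quote {
        final long time;
        final double amount;
        Quote(long time, double amount) { this.time = time; this.amount = amount; }
    }

    private final SortedMap<Long, Quote> store =
        Collections.synchronizedSortedMap(new TreeMap<Long, Quote>());
    private final Supplier<Quote> source;    // hypothetical data source

    TickingPriceService(Supplier<Quote> source) { this.source = source; }

    /** Polls the source once; returns true only if the quote was new. */
    boolean tick() {
        Quote latest = source.get();
        if (latest == null) return false;    // source had nothing for us
        synchronized (store) {
            if (store.containsKey(latest.time)) return false;   // already seen
            store.put(latest.time, latest);
        }
        // This is the point where a fresh quote would be pushed to subscribers.
        return true;
    }

    int size() { return store.size(); }

    public static void main(String[] args) {
        Quote fixed = new Quote(1000L, 34.53);
        TickingPriceService service = new TickingPriceService(() -> fixed);
        System.out.println(service.tick());  // new quote
        System.out.println(service.tick());  // same quote again
    }
}
```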

After we've written our 'tick()' method, we can extend our Spring context with this.

<!-- Define the schedule -->
<bean id="scheduledTask"
	class="org.springframework.scheduling.timer.ScheduledTimerTask">
	<!-- run every 0.4 seconds -->
	<property name="period" value="400" />
	<property name="timerTask" ref="doIt" />
</bean>
 
<!-- Define the task to be performed -->
<bean id="doIt" class="org.springframework.scheduling.timer.MethodInvokingTimerTaskFactoryBean">
	<property name="targetObject" ref="priceService" />
	<property name="targetMethod" value="tick" />
</bean>
 
<!-- Factory that spawns the runners -->
<bean id="timerFactory"
	class="org.springframework.scheduling.timer.TimerFactoryBean">
	<property name="scheduledTimerTasks">
		<list>
			<ref bean="scheduledTask" />
		</list>
	</property>
</bean>
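If you are not using Spring, much the same schedule can be set up directly with `java.util.Timer`, which is what these Spring timer beans wrap. A minimal sketch, with a hypothetical `TickScheduler` helper:

```java
import java.util.Timer;
import java.util.TimerTask;

class TickScheduler {

    /** Runs the given tick every periodMillis on a daemon timer thread. */
    static Timer schedule(final Runnable tick, long periodMillis) {
        Timer timer = new Timer("price-tick", true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    tick.run();
                } catch (RuntimeException e) {
                    // An exception escaping run() would kill the timer thread
                    e.printStackTrace();
                }
            }
        }, 0L, periodMillis);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        Timer timer = schedule(() -> System.out.println("tick"), 400L);
        Thread.sleep(1000L);   // let a few ticks happen
        timer.cancel();
    }
}
```

Catching exceptions inside the task matters with a plain Timer: one uncaught exception cancels every later run.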

Now we have to think about communicating with the client. In my case, I am choosing to give each price series a Bayeux channel like so.

/price/[Security]/[Quoting Currency]/[Bid or Ask]

Then my JSON objects can be as simple as:

{ time:109345863445, price:3453 }

Your code to push prices to your client will then look something like this.

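// Publishes to the price's channel, or to one client only when 'client' is not null
private void pushPrice( Price price, Client client, Long time ) {
	Map<String, Object> output = new HashMap<String, Object>();
	output.put("price", price.getPrice());
	output.put("time", time );
 
	String channel = "/price/" + price.getSecurity() + "/" +
		price.getConsiderationCurrency();
 
	if( client == null ) {
		// Broadcast to every subscriber of the channel
		this.getBayeux().getChannel(channel, true).publish(getClient(), output, null );
	} else {
		// Private delivery (assuming the CometD 1.x Client.deliver signature)
		client.deliver(getClient(), channel, output, null );
	}
}

This will need to be invoked, with a null client, whenever our tick() method finds fresh prices.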

If you are representing your prices as objects (and you should), then a little bit of code needs to register them for conversion to JSON.

JSON.registerConvertor( Price.class, new JSONObjectConvertor() );

Back to the browser

Meanwhile on the client there needs to be some logic to subscribe to this data. CometD's javascript client example comes with a default method called _connectionSucceeded that sets up the client's subscriptions. We can add some code to that for our prices.

// Subscribe, using _doSomething as the callback handler
cometd.subscribe( '/price/GOOG/USD',
	function( message ) { _doSomething( message ); } );

_doSomething will now be called every time we push a message on /price/GOOG/USD . The message object will contain the data from our server as the attribute 'data'.

You will need to sort the data, as comet does not guarantee message order, and to discard data that's too old to be interesting. So if we had an array of time series data we could write a function like this.

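// 'duration' is the width of the chart in milliseconds, defined elsewhere.
// Returns the trimmed array, so call it as: data = _addPrice( message, data );
function _addPrice( message, data ) {
	data.push( [ message.data.time, message.data.price ] );
	// Array.sort works in place: oldest point first
	data.sort( function(a, b) {
			return a[0] - b[0]; });
	// Keep only the points newer than the cutoff
	var cutoff = new Date().getTime() - (duration * 2);
	return data.filter( function(a) {
			return a[0] >= cutoff; });
}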

Historical Data

Even if you are charting short time periods you will still need to fetch past data to populate your graph.

So the client needs to ask for all the price points since a certain timestamp.

cometd.publish('/datasince', {
	since: new Date().getTime() - 10 * 60000 // 10mins
});

And our server needs to respond to the client.

public void sendDataSince(Client client, Map<String, Object> data ) {
	_sendDataSince( Long.valueOf( data.get("since").toString()), client );
}
 
void _sendDataSince( Long timeFrom, Client client ) {
	client.startBatch();
	try {
		// Iterating a view of a synchronised map needs the map's own lock
		synchronized( this.store ) {
			for( Entry<Long, Price> entry : this.store.tailMap( timeFrom ).entrySet() ) {
				pushPrice( entry.getValue(), client, entry.getKey() );
			}
		}
	} finally {
		// Always close the batch, even if a push fails
		client.endBatch();
	}
}

And in the PriceService constructor, we will need this line to tie this method to the channel.

subscribe("/datasince", "sendDataSince");

Notice that this code delivers the prices to a specific client. All Bayeux clients have a unique id which is sent with their messages, and we can use this to respond to them individually. The start/end batch methods will merge the price messages into a single HTTP transfer, and a single AJAX event on the browser.

(In a serious application you would want to store your historical data in an SQL database or a round-robin database.)
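Short of a proper database, the in-memory store should at least be pruned, or it will grow for as long as the server runs. This is not a round-robin database, just a rolling window over a TreeMap. The `RollingPriceWindow` class and its use of `Double` for prices are assumptions made for the sketch.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class RollingPriceWindow {
    private final SortedMap<Long, Double> store = new TreeMap<Long, Double>();
    private final long maxAgeMillis;

    RollingPriceWindow(long maxAgeMillis) { this.maxAgeMillis = maxAgeMillis; }

    /** Stores a price, then drops entries more than maxAgeMillis older than the newest. */
    synchronized void add(long time, double price) {
        store.put(time, price);
        // headMap is a live view, so clearing it removes entries from the store
        store.headMap(store.lastKey() - maxAgeMillis).clear();
    }

    synchronized int size() { return store.size(); }

    synchronized long oldest() { return store.firstKey(); }

    public static void main(String[] args) {
        RollingPriceWindow window = new RollingPriceWindow(5000L);
        window.add(1000L, 34.53);
        window.add(2000L, 34.55);
        window.add(7000L, 34.60);   // pushes the first entry out of the window
        System.out.println(window.size() + " prices, oldest at " + window.oldest());
    }
}
```

Measuring age against the newest stored timestamp, rather than the wall clock, keeps the window stable if the data source stalls for a while.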

Graphing

Flot already has time series functionality, so we just need to get our data into that.

For realtime data the important thing is to control chart refreshes. If we refresh with every new price, we will crash the browser. So, using jQuery Timers, chart refreshes can be made to happen every 0.15s.

// upToDate flag stops us when there's no new data
$(document).everyTime( 150, function() {
	if( !upToDate ) { _plotChart(); }
});