Since the launch of the LGO platform earlier this year, we’ve had to make a number of difficult design decisions. This article explains them.

Most of these follow from one principle: LGO aims to achieve the same throughput as a traditional institutional exchange, that is, millions of orders per second.

To reach these kinds of numbers, we expect a large number of orders from a limited number of institutional customers connected via an HTTP API or, better yet, a socket, which is similar to how a traditional exchange is set up.

So, enough intro talk, let’s get started.


Why Speed Matters

Millions of orders per second. Do we really expect to deal with that kind of volume? Not always, of course, but this speed is about absorbing activity spikes with a safe margin.

For instance, let’s say the average activity is around 30k orders/s. In a sudden rush, the exchange may need to absorb 400k orders/s, and it must process this activity without slowing down and exposing clients to greater market risk. So why one million per second? Simply because it leaves a safe margin for the exchange (2.5 times the 400k peak in this example).

The goal is for LGO to be the «last exchange standing» in the case of a large spike in volume.

The problem

An exchange is a strange beast, really. You may have read elsewhere that with the developments in current technology, such as cloud computing, distributed databases, whatever buzzword, all you have to do to scale is to click on a magic button, to add more machines to your cluster, and off you go. Of course, you need some sort of magic, auto-sharding, auto-everything database, to be the real host of your application state. There is some truth to this narrative, especially if the application is mostly concerned with one user updating or creating a state without concurrency concerns, and it just has to display it back. Of course, you can add some fancy business rules, a shiny user interface, but the real problem is still subject to the «add more horsepower» paradigm, aka horizontal scalability.

An exchange falls into a different category: each order execution interacts with the order book and therefore produces a modified order book. In other words, it’s impossible to execute orders in parallel, because the execution of one order affects the execution of the next.
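A minimal sketch of this dependency, assuming a hypothetical `TinyBook` with a single price level (nothing like a real order book): the quantity filled by the second buy order depends entirely on what the first one left behind.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, heavily simplified order book: a queue of resting sell quantities
// at one price level. It only illustrates that each execution changes the book.
final class TinyBook {
    private final Deque<Long> restingSellQty = new ArrayDeque<>();

    void addSell(long qty) {
        restingSellQty.addLast(qty);
    }

    // Executes a buy against the resting sells and returns the filled quantity.
    long buy(long qty) {
        long filled = 0;
        while (qty > 0 && !restingSellQty.isEmpty()) {
            long head = restingSellQty.pollFirst();
            long take = Math.min(head, qty);
            filled += take;
            qty -= take;
            if (head > take) {
                restingSellQty.addFirst(head - take); // put back the unfilled remainder
            }
        }
        return filled;
    }

    public static void main(String[] args) {
        TinyBook book = new TinyBook();
        book.addSell(5);
        long first = book.buy(4);  // fills 4, leaves 1 resting
        long second = book.buy(4); // only 1 is left to fill
        System.out.println(first + " then " + second);
    }
}
```

Running the two buys on separate copies of the book in parallel would report 4 filled for both, which is more than the 5 units that were ever on offer.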

Single-threaded, in-memory business logic

So, given the speed and performance we want to deliver, and given the concurrency issue, we had to rule out the usual solutions. For instance, hitting a database for each and every order is not viable. No database is fast enough to achieve these numbers. Even an in-memory one, like Redis, won’t suffice. Redis’ documentation explains the problem quite clearly: most of the CPU time is spent doing I/O work instead of business logic. These problems have to be solved differently.

But what would we use a database for then? It’s just a way to share some state with other systems, and hopefully survive a crash of its application. Do we really need it on an exchange, or can we find other ways to solve these problems? As a matter of fact, we can, but more about that later.

So, the first «rule» of the LGO design is that every single order must be sent to a single thread that holds the whole state in memory (RAM/CPU cache).

It may sound strange, single-threaded business logic. Processors nowadays have a lot of cores. Still, the power of a single thread is often underestimated, as is the cost of thread synchronisation. For instance, here are some numbers for a simple counter on a 4-CPU Windows machine:

Locked counter with 1 thread:                     46,895,000 ops/second
Locked counter with 4 threads:                    30,949,000 ops/second
CAS counter with 4 threads:                       34,595,000 ops/second
CAS counter with 1 thread:                       158,395,000 ops/second
Single threaded counter (with contrived boxing):  86,820,000 ops/second
Single threaded counter:                         529,416,000 ops/second

(courtesy of Adaptive Financial Consulting)
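To get a feel for where such numbers come from, here is a naive sketch of three counter variants. It is not the benchmark behind the figures above: the `CounterSketch` class and its timing loop are assumptions made for illustration, it only runs on one thread, and a serious measurement would use a proper harness such as JMH.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: three ways to increment a counter.
final class CounterSketch {
    private long plain;                              // no synchronisation at all
    private long locked;                             // guarded by the object's monitor
    private final AtomicLong atomic = new AtomicLong(); // lock-free atomic, stands in for the CAS counter

    void incrementPlain() { plain++; }
    synchronized void incrementLocked() { locked++; }
    void incrementAtomic() { atomic.incrementAndGet(); }

    long plain() { return plain; }
    synchronized long locked() { return locked; }
    long atomic() { return atomic.get(); }

    // Naive timing loop: runs the operation and converts elapsed time to ops/second.
    static double opsPerSecond(Runnable op, long iterations) {
        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            op.run();
        }
        long elapsedNanos = Math.max(1, System.nanoTime() - start);
        return iterations * 1e9 / elapsedNanos;
    }

    public static void main(String[] args) {
        CounterSketch c = new CounterSketch();
        long n = 50_000_000L;
        System.out.printf("plain:  %,.0f ops/s%n", opsPerSecond(c::incrementPlain, n));
        System.out.printf("locked: %,.0f ops/s%n", opsPerSecond(c::incrementLocked, n));
        System.out.printf("atomic: %,.0f ops/s%n", opsPerSecond(c::incrementAtomic, n));
    }
}
```

The absolute values depend heavily on the machine and the JVM, so only the relative order of the three variants is worth looking at.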

This single-threaded business logic must be fast. It must never be blocked by an I/O operation, so that every CPU tick allocated to the thread can be used for business processing. Of course, we must also pay attention to the complexity of our business logic, and its memory consumption, but really we must first worry about our (lack of) I/O.

Replayability, aka determinism

Servers may fail, reboot, or just be updated, so we need a mechanism to ensure we are able to rebuild our in-memory state.

As a reminder, each order changes the state, and this new state is then used to process the next order. Order processing therefore behaves like a pure function of the current state and the incoming order, and one advantage of pure functions is that feeding them the same inputs always yields the same outputs.

The inputs used here are the orders from LGO’s customers, but there are other kinds of inputs, such as raising or lowering limits, fee level changes, etc. Saving all these inputs to disk and replaying them on startup allows the state to be rebuilt, but writing to disk is not allowed in the business thread. A separate thread must therefore log each input before the business thread is allowed to process it.
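The replay idea can be sketched in a few lines. Everything here is a hypothetical simplification: the "state" is a single balance, a "command" is a signed amount, and the log is an in-memory list rather than a file. The rule that overdrawing withdrawals are rejected is only there to make the order of commands matter.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of deterministic replay from a command log.
final class ReplaySketch {
    // Pure transition: same balance and same command always give the same result.
    // A withdrawal that would overdraw the balance is rejected (state unchanged).
    static long apply(long balance, long delta) {
        long next = balance + delta;
        return next < 0 ? balance : next;
    }

    // Rebuilds the state by applying every logged command in its original order.
    static long replay(List<Long> commandLog) {
        long balance = 0;
        for (long delta : commandLog) {
            balance = apply(balance, delta);
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Long> log = Arrays.asList(10L, -7L, 5L, -7L);
        System.out.println(replay(log));                              // prints 1
        System.out.println(replay(Arrays.asList(10L, -7L, -7L, 5L))); // prints 8: order matters
    }
}
```

Because `apply` uses no clock, no randomness and no I/O, replaying the same log always rebuilds the same state, which is the property the whole design relies on.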

Given enough time and inputs, it may become unrealistic to replay everything from the ground up, so a snapshot mechanism is needed. A snapshot is basically a picture of the memory at a given time. Most strategies can be applied here. For now, it’s a visitor and a builder that will walk the object graph, and build a binary representation accordingly. This is the only part that violates the persistence ignorance principle, but we have some means to avoid this in the future.
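As a rough illustration of combining a snapshot with a partial replay, here is a sketch under strong assumptions: the state is just a running total plus a count of applied commands, and the binary layout (two longs) is made up for this example. It is not the visitor/builder mechanism mentioned above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: snapshot the state, then restore it and replay only the log tail.
final class SnapshotSketch {
    long total;        // the "business state"
    long appliedCount; // how many commands of the log are reflected in the state

    void apply(long command) {
        total += command;
        appliedCount++;
    }

    // Binary picture of the state at this point in the log.
    byte[] snapshot() {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(total).putLong(appliedCount).array();
    }

    // Loads the snapshot, then replays only the commands logged after it was taken.
    static SnapshotSketch restore(byte[] snapshot, List<Long> fullLog) {
        ByteBuffer buf = ByteBuffer.wrap(snapshot);
        SnapshotSketch state = new SnapshotSketch();
        state.total = buf.getLong();
        state.appliedCount = buf.getLong();
        for (long command : fullLog.subList((int) state.appliedCount, fullLog.size())) {
            state.apply(command);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Long> log = Arrays.asList(1L, 2L, 3L, 4L);
        SnapshotSketch live = new SnapshotSketch();
        live.apply(log.get(0));
        live.apply(log.get(1));
        byte[] snap = live.snapshot();          // taken after two commands
        SnapshotSketch restored = restore(snap, log);
        System.out.println(restored.total);     // prints 10
    }
}
```

Storing the log position inside the snapshot is what lets the restart skip everything that the snapshot already covers.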

If you are familiar with terms like event sourcing and CQRS, you could say that I’m describing a command-sourced architecture. We write commands to an append-only log, which ensures a total order of inputs, and replay it when necessary.

Multi-threading

Given what I said earlier about multi-threading, it may sound like a waste of time to use different threads to log commands and to process them. In fact, pipelining and smart batching not only reduce the synchronisation cost but can also yield impressive performance improvements.

Pipelining explained

As you can see in this graph, having two steps to process a single command doesn’t mean waiting for both to finish before starting on the next command. If a step accumulates some lag, it can even apply batching to catch up more quickly than by treating elements one by one.

Once communicating threads are introduced, the traditional questions arise: what kind of data structure and synchronization mechanism should be used to pass messages? How is a flow control mechanism applied?

The trick is to realize that both threads work on the same data, so instead of copying data from one queue to another, they can work on the very same queue. To apply some flow control and to avoid the unbounded queue issues, a ring buffer can be effective.

To synchronize their work, each thread only needs to look at the other thread’s position in the ring buffer. A memory barrier can achieve that without any lock.

These are the principles behind the Disruptor.
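The following is a minimal sketch of such a ring buffer for one producer thread and one consumer thread. It is a hypothetical illustration of the principle, not the Disruptor's actual API: class and method names are made up, and the payload is a plain `long`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical single-producer/single-consumer ring buffer: no locks, the two
// threads coordinate only by reading each other's position.
final class SpscRingSketch {
    private final long[] slots;
    private final int mask;
    private final AtomicLong producerPos = new AtomicLong(); // next slot to write
    private final AtomicLong consumerPos = new AtomicLong(); // next slot to read

    SpscRingSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new long[capacity];
        mask = capacity - 1;
    }

    // Producer thread only. Returns false when the ring is full (back pressure).
    boolean offer(long value) {
        long p = producerPos.get();
        if (p - consumerPos.get() == slots.length) {
            return false;
        }
        slots[(int) (p & mask)] = value;
        producerPos.lazySet(p + 1); // ordered store: publishes the slot written above
        return true;
    }

    // Consumer thread only. Handles everything currently available as one batch.
    int drain(LongConsumer handler) {
        long c = consumerPos.get();
        long available = producerPos.get(); // volatile read: sees published slots
        for (long i = c; i < available; i++) {
            handler.accept(slots[(int) (i & mask)]);
        }
        consumerPos.lazySet(available); // frees the slots for the producer
        return (int) (available - c);
    }
}
```

A consumer that has fallen behind simply finds more entries in its next `drain` call, which is the batching effect described earlier, and a full ring makes `offer` return `false`, which is a crude form of flow control.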

High Availability

Single-threaded business logic on one server only gets us so far: multiple servers are necessary to mitigate the impact of the main server going down.

In other words, instead of having just one server and waiting for it to come back up when it goes down, another server must already be ready to take over.

As previously stated, the main server writes each command to an append-only log before processing it. What we need is for the other servers to write the very same command to their own append-only logs before processing it. To ensure consistency, a command should be processed if, and only if, enough servers were able to append it. Network consensus on log writing is a very common problem in computer science. Fortunately for LGO, a new protocol named Raft has recently been getting some traction, thanks to its relative simplicity compared to alternatives.

In a nutshell, one of LGO’s execution-engine servers becomes the leader after an election. Every time it receives a command, the server writes it to its log, sends it to the other consensus members, and waits for enough acknowledgements before processing it. A small amount of latency is inevitable, but this is faster than relying on a database or a broker.
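The "enough acknowledgements" rule can be sketched as follows, assuming "enough" means a strict majority of the cluster, as in Raft. The `QuorumSketch` class and the `matchIndex` array are illustrative names, and the sketch leaves out the rest of the protocol (elections, terms, and Raft's restriction on committing entries from earlier terms).

```java
import java.util.Arrays;

// Hypothetical sketch of the majority rule used to decide what may be processed.
final class QuorumSketch {
    // Smallest number of members that forms a strict majority.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // matchIndex[i] is the highest log index known to be appended on member i
    // (leader included). Returns the highest index stored on a majority of members.
    static long commitIndex(long[] matchIndex) {
        long[] sorted = matchIndex.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length - majority(sorted.length)];
    }

    public static void main(String[] args) {
        // Leader has appended up to 5, one follower up to 4, another up to 3.
        System.out.println(commitIndex(new long[] {5, 3, 4})); // prints 4
    }
}
```

In the three-node example, entry 5 exists only on the leader, so it has to wait, while entries up to 4 are on two of the three members and can be handed to the business logic.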

Network efficiency

In order to process millions of commands per second on a single thread, LGO obviously needs a way to send, and consume, at this pace.

A traditional brokered architecture may achieve this throughput, but only with parallelisation (several producers, several consumers). For instance, this article from Spotify shows how they reached 2M messages per second with 29 producers.

On the other hand, LGO has one consumer and only a very limited number of producers (LGO gateways, but they will be the subject of another article). A smart network library is needed rather than a smart broker.

Being smart includes, but is not limited to:

  • Using UDP instead of TCP
  • Using NACK for loss detection
  • Using multicast to dispatch messages, if available on your network, with a fallback mechanism otherwise
  • Having a flow-control mechanism to apply back pressure to producers
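To illustrate the NACK item in the list above: the receiver stays silent while packets arrive in order and only speaks up when it notices a gap, instead of acknowledging every packet. The sketch below assumes packets carry a simple sequence number and that out-of-order packets are dropped rather than buffered. Both are simplifications chosen for brevity, and `NackReceiverSketch` is a made-up name, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver that detects loss from gaps in sequence numbers.
final class NackReceiverSketch {
    private long nextExpected = 0;
    private final List<long[]> nacks = new ArrayList<>(); // {firstMissing, lastMissing}

    // Called for each received packet. Returns true if it was delivered in order.
    boolean onPacket(long seq) {
        if (seq == nextExpected) {
            nextExpected++;
            return true; // in order: nothing is sent back to the producer
        }
        if (seq > nextExpected) {
            // Gap detected: ask for a retransmission of everything up to this packet,
            // since this sketch drops the out-of-order packet instead of buffering it.
            nacks.add(new long[] {nextExpected, seq});
        }
        return false; // duplicate or out-of-order packet is ignored
    }

    List<long[]> pendingNacks() {
        return nacks;
    }
}
```

On a healthy network the return traffic is therefore close to zero, which is part of what makes this approach attractive on top of UDP.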

Message encoding is also essential to get the most out of this kind of library. Binary encoding is the way to go in order to reduce message size and to speed up the serialization/deserialization process. Text-based encoding may look easier at first glance, but in the end, you pay a heavy price parsing and producing it, for a readability gain that is not that obvious compared to a good binary encoder.
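As a rough illustration of a fixed-layout binary message, here is a sketch using a plain `ByteBuffer`. The field list, the offsets and the `OrderCodecSketch` name are assumptions made for this example; it does not reflect LGO's actual message format or the API of any particular encoding library.

```java
import java.nio.ByteBuffer;

// Hypothetical fixed-layout order message: every field lives at a known offset,
// so reading a field is a direct memory access with no parsing.
final class OrderCodecSketch {
    static final int ORDER_ID_OFFSET = 0;  // long, 8 bytes
    static final int SIDE_OFFSET = 8;      // byte, 1 byte ('B' or 'S')
    static final int PRICE_OFFSET = 9;     // long, 8 bytes (price in ticks)
    static final int QUANTITY_OFFSET = 17; // long, 8 bytes
    static final int ENCODED_LENGTH = 25;

    static void encode(ByteBuffer buf, int offset, long orderId, byte side, long price, long quantity) {
        buf.putLong(offset + ORDER_ID_OFFSET, orderId);
        buf.put(offset + SIDE_OFFSET, side);
        buf.putLong(offset + PRICE_OFFSET, price);
        buf.putLong(offset + QUANTITY_OFFSET, quantity);
    }

    static long orderId(ByteBuffer buf, int offset) { return buf.getLong(offset + ORDER_ID_OFFSET); }
    static byte side(ByteBuffer buf, int offset) { return buf.get(offset + SIDE_OFFSET); }
    static long price(ByteBuffer buf, int offset) { return buf.getLong(offset + PRICE_OFFSET); }
    static long quantity(ByteBuffer buf, int offset) { return buf.getLong(offset + QUANTITY_OFFSET); }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(ENCODED_LENGTH);
        encode(buf, 0, 42L, (byte) 'B', 10050L, 3L);
        System.out.println(orderId(buf, 0) + " " + (char) side(buf, 0) + " " + price(buf, 0) + " " + quantity(buf, 0));
    }
}
```

The message always takes 25 bytes regardless of the values, and the consumer can read only the fields it needs, whereas a text format has to be scanned and converted character by character.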

Wrapping it up

So, how do we put everything together? To sum up, LGO needs:

  • Single-threaded business logic
  • An efficient way to log messages to disk
  • An efficient way to pass messages between threads, without locking
  • An efficient implementation of Raft, working on this common append-only binary log
  • A way to snapshot our business state once in a while
  • An efficient network library
  • An efficient binary format

We experimented with several implementations last year. Inspired by the LMAX architecture, we began our work with the Disruptor, but in the end, the network part was clearly the main issue.

Luckily, Martin Thompson and Todd Montgomery are working on an open source solution called Aeron which does everything required.

Aeron at its core is a smart and (very) efficient network library. It can operate in different scenarios: IPC, UDP, UDP multicast, and simulated multicast (aka multi-destination cast).

It also comes with Aeron-archive, which can spy on, record, and replay data streams, and Aeron-cluster, which is a kind of Raft consensus on steroids that can be used to implement custom business logic following all the principles described.

This presentation describes some Aeron design principles in more detail.

We got some support from the guys at Adaptive who are experts in Aeron and Aeron-cluster and have built many of these kinds of systems before.

Regarding binary encoding, we naturally chose SBE (Simple Binary Encoding), another library from Martin Thompson.

The next question is how to extract information from the cluster and send it to our customers or other systems. That’s a subject I’ll cover in another article.