My current employer wanted a new application to replace a collection of Excel spreadsheets and Access databases. We made the decision early on to use event sourcing, as we were not sure of all the features it would need, and wanted to be able to add additional features easily. This article describes the process I went through searching for solutions, and then designing an event-sourcing service in Node.js and Redis.

What is event-sourcing?

Event-sourcing is the concept of storing all of an application’s data as a time-ordered sequence of events. Once recorded, these events may never change, be removed, or be re-ordered. You can then construct a summary of some or all of the application’s state (usually called a “projection”) from these events.

Storing our application’s data in this manner had several advantages for us:

  • We were not yet sure (and still aren’t) exactly what data will be needed in the final project. With event-sourcing, the application records everything that happens as it happens. If at any point we need some data, I can simply add another projection.
  • We can maintain a history of certain data without any extra work. This is highly valuable for auditing or business purposes.
  • If we ever need some data to be structured differently, or the algorithm for calculating something changes, I can simply change the code for that projection and re-run it from the beginning. Data migrations are never an issue.
  • Event-sourcing allows me to clearly separate code responsible for querying data with code responsible for recording events. This makes the code simpler to understand.

An event-sourcing library or service would be responsible for recording events as they happen, maintaining the history of events in a database, and sending the events to projections as needed.

Desired features & existing implementations

We had a few requirements:

  • Lightweight: No giant Java services that require multiple gigabytes of memory to run, as our servers are low-powered virtual machines.
  • Simple: It must be relatively easy to learn how it works and what it is doing with the data.
  • No vendor lock-in: It must be possible to move to a different solution, or build our own, at any time.
  • Full control: Our application must be in full control, and external services should not be able to do anything without our application telling it to. Although there is nothing wrong with this, it it is harder to understand when multiple applications/services are running code.

I looked at the following possible solutions:

  • Event Store: I found the design of this Javascript service somewhat non-intuitive. Events are saved by simply sending an HTTP request to the service which we liked. The event log is read as an Atom feed which I did not like--it does work but it requires additional application code to read the correct feed. We did not use this primarily because it was still very new at the time, and I was not confident it would remain supported long enough for us. However, it has been improving greatly and we would most likely strongly consider it if we were making this decision today.
  • Apache Samza: This Java service is probably the most well-known solution for event sourcing. It is robust, well documented, and has a substantial feature set. However, it requires your projections to be written in Java and run directly by Samza, which was unacceptable for us. I also did not see a clear way to move to a different event sourcing solution without rewriting every projection and possibly running a migration on the event log itself.
  • Apache Kafka: Although not strictly an event sourcing service, this service is extremely robust and well-known. The Java requirement was a slight turn-off as it generally requires substantial system resources, but that was not enough to prevent us from using it. Similar to Samza, I did not see a clear way to move to another service without running a migration on the event log, but projections would not necessarily need rewritten. If we were making this decision today, we would probably chose Kafka, but at the time I was unable to find a Node.js client for it, and was generally not happy with the state of its ecosystem.

I also found a few other solutions that were either unfinished, undocumented, or appeared dead, and thus were not considered seriously. Eventually I decided to build an event-sourcing service using Node.js to do exactly what we wanted. It could have been written in any language though--I only chose Node.js as the rest of the application was already using it.

The rest of this article describes how I went about designing and building this service.

Redis event log

The first design decision I made was to store the event log as a Redis list, as they are extremely fast, simple, and with the proper configuration they can be reliable. Each event is a single element in the list, saved as a JSON object with three keys:

  • time is the Unix timestamp at which the event was created.
  • event is a string identifying the name of the event, such as “blogPostEdited” or “saleShipped”.
  • data is a JSON object with any data associated with the event.

A numerical index or ID is not saved for each event as Redis provides those as part of it being a list.

The event log is still saved in this exact format in eventduck.

Saving Events

The next step was to write a module which would save an event to the event log with a given event name and data. This was simple enough to implement:

'use strict';

var Promise = require('promise');
var log = require('logule').init(module, 'saveEvent');

var redisClient = require('./redisClient');

function saveEvent(event, data) {
    return new Promise((resolve, reject) => {
        var message = {
            time: Date.now(),
            event: event,
            data: data
        };
        redisClient.rpush('events', JSON.stringify(message), (err, length) => {
            if (err) {
                log.error(err);
                return reject(err);
            }
            message.index = length - 1;
            log.info('Saved event ' + message.index + ' (' + message.event + ')');
            resolve(message);
        });
    });
}

Projections

Now that we can save events, we need a way to get them to our projections. To do this, I built an EventStreamer module. Unlike with the Redis list or the saveEvent module, the design of this one changed quite a few times over the course of development.

In this first attempt, I poll the Redis list, starting at the first event, and waiting for all projections to finish processing it before moving on to the next. This was not a good idea, as some projections might take a long time to run, wait for another service, or not need the event at all. Thus, this is not how eventduck handles projections, but it worked for an initial prototype.

Initially, I dealt with a single event at a time, but this became slow with even a moderate number of events, so I started fetching batches of events and put them into a queue using the async library. This implementation worked quite well, and was used for about two months.

This implementation ended up in three pieces, which are described below.

registerProjection

The registerProjection function, as its name suggests, is used to register a callback for a specific event. It takes three arguments:

  • event - the name of the event this projection wants to receive. This should match the event argument given to the saveEvent method.
  • name - an identifier for this projection for logging purposes.
  • handler - the function to call for this event, which should either throw, return, or return a Promise.

An individual projection can be registered multiple times so it can handle multiple events, and one type of event can be registered to multiple projections. This is done by simply calling registerProjection for each event and projection combination that we care about.

All this function does is add the handler to an array for that event, with some error checks.

'use strict';

var Promise = require('promise');
var log = require('logule').init(module, 'eventStreamer');

var redisClient = require('./redisClient');

/*
    List of listeners
    structure: {
        eventName: [
            {index: 4, handler: function}
        ]
    }
*/
var listeners = {};

function registerProjection(event, name, handler) {
    var alreadyRegistered = false;
    if (typeof listeners[event] === 'undefined') {
        listeners[event] = [];
    }
    listeners[event].forEach(listener => {
        if (listener.handler === handler) {
            alreadyRegistered = true;
        }
    });
    if (alreadyRegistered) {
        log.warn('You are registering projection ' + name + ' for event ' + ' ' + event + ' multiple times; ignoring all but the first one');
        return;
    }

    listeners[event].push({
        name: name,
        index: -1,
        handler: handler
    });

    log.info('Registered projection ' + name + ' for event ' + event);
}

projectionQueue

The projectionQueue is an async queue which calls each projection handler registered to an event for each event in the event log. After all projection handlers have completed successfully, it then moves on to the next event.

(...snip...)

var async = require('async');

var projectionQueue = async.queue(function(message, next) {
    if (typeof listeners[message.event] === 'undefined') {
        log.warn('No projections for event ' + message.index + ' (' + message.event + ')');
        next();
        return;
    }
    var handlers = [];
    var promises = [];
    listeners[message.event].forEach(function(listener) {
        let handlerPromise = Promise.resolve(listener.handler(message)).then(function() {
            log.info('Ran projection ' + listener.name + ' for event ' + message.index + ' (' + message.event + ')');
        });
        promises.push(handlerPromise);
    });
    Promise.all(promises).then(function() {
        log.info('Ran projections for event ' + message.index + ' (' + message.event + ')');
        next();
    }).done();
});

processNew

The processNew function checks for new events in the Redis list, and adds them to the queue. It then calls itself after a delay using setTimeout.

(...snip...)

var INTERVAL = 500;
var latestQueriedIndex = -1;

function processNew() {
    redisClient.llen('events', function(err, eventCount) {
        if (err) {
            throw err;
        }
        var newCount = eventCount - latestQueriedIndex - 1;
        if (newCount > 100) {
            // Only fetch 100 at a time for rate limiting
            newCount = 100;
            eventCount = latestQueriedIndex + 100;
        }

        // If there are new events, fetch them and add them to the queue
        // TODO: Should also check to see if the queue isn't very long as well, so we don't
        // take up a large amount of memory.
        if (newCount) {
            var first = 1 * latestQueriedIndex + 1;
            redisClient.lrange('events', first, eventCount, function(err1, events) {
                if (err1) {
                    throw err;
                }

                events.forEach(function(message) {
                    latestQueriedIndex++;
                    message = JSON.parse(message);
                    message.index = latestQueriedIndex;
                    projectionQueue.push(message);
                });

                log.info('Processed ' + newCount + ' events (' + first + ' - ' + latestQueriedIndex + ')');
            });
        } else {
            // No new events
        }
        setTimeout(processNew, INTERVAL);

    });
}

Problems with this implementation

At this point, I had a working implementation of event sourcing. However, it had some problems:

  • High memory usage. Every event that had to be sent to a projection was kept in memory, leading to unnecessarily high memory usage. However, this didn’t bother me at first, since there weren’t so many events in our database yet that the server actually ran out of memory.
  • Slow. Only one event is processed at a time, and it waits for all projections to finish before moving onto the next. This was only a problem during the initial application startup.
  • Un-modular. These were just two files in our application that solve a very generic problem. It would be better to have this as either a separate npm module, or even a standalone service.

Despite these problems, it did work. Mission accomplished… for a while.

Standalone event-sourcing service

A few months later, I decided it was time to revisit this, and turn it into a standalone service.

I knew I wanted it to communicate with applications using an HTTP API, so the first thing I did was write an API reference. Then, I made a new project, set up a basic Express server, and started building.

I implemented it one API endpoint at a time, starting with some tests that describe the API reference, copying the existing code into an Express route, and then modifying it until it passed the test. I also added some additional functionality to it, such as support for multiple applications and a configuration file.

The result: eventduck, an easy-to-use event-sourcing service with an HTTP API backed by Redis. It will be published soon on npm and GitHub, then you can use it for your own projects, look at the full source code on GitHub, or read its documentation.