Implementing Event Store in C#

What is this all about?

When looking for examples of an Event Store implementation for Event Sourcing, you will probably find many that are very detailed in theory but lack a practical implementation. In this article my goal is to explain how to implement a simple, yet robust, event store, how to test it, and which pitfalls to expect, based on my own experience.

The event store presented here is implemented in .NET Core and C#, with MS SQL LocalDB as the database.

The full project, containing a working solution, is available on my GitHub.

Short introduction to Event Sourcing

I will not go into much detail about what Event Sourcing is since this topic is covered in many books and articles around the web.

Instead, I will focus on what is important when implementing an actual event store. Nevertheless, I will quickly summarize the main idea behind Event Sourcing and its main challenges.

Aggregates are considered as a whole represented by the Aggregate Root. Conceptually an Aggregate is loaded and saved in its entirety. (Evans, 2001).

Event Sourcing is a software architecture pattern which states that the state of the application should be persisted as a sequence of events. The main benefits of Event Sourcing are:

  • Auditability — since state is constructed from a sequence of events, it is possible to extract a detailed log from the very beginning up to the current date
  • Projections/queries — since we have all events from the beginning, it is possible to recreate the state at any point in time. For example, it would be possible to check what a bank account's state was one year ago. This also makes it possible to generate queries/reports we never thought of when starting the system, since all the data is in the events.
  • Performance when inserting data — since the event store is an append-only store with optimistic locking, we would expect far fewer deadlocks (or concurrency exceptions). There are also no long-running transactions that insert a graph of data into multiple tables.
  • Flat database structure — we would usually use one (or at most two) tables as the event store. In both cases it would be a de-normalized form with some kind of weakly typed serialization field, such as JSON, to store the event payload. This means that adding a new field does not require changing any table schema: simply adjusting the event and adding the required field will save it into the JSON. This allows much faster development on the write side.

As with every pattern, we must be aware of its limitations and challenges. If used incorrectly, Event Sourcing will probably cause more harm than good. The main challenges to keep in mind are:

  • Storage growth — since the data store is append-only, the table will grow indefinitely. This can be mitigated using snapshots or retention-policy strategies.
  • Replaying events — if the number of events needed to construct an aggregate is large, reconstructing its current state might cause performance issues. This can be mitigated by using snapshots.
  • Event versioning and event handling — when changing an existing event, or adding/deleting features, the code which projects old events MUST stay in place, since it is used to reconstruct the state up to the present. This means that if a feature is deprecated, its projection code cannot be removed, because it is needed to reconstruct the state at that time. This challenge is harder to overcome, but it can be mitigated.
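One common mitigation for event versioning is upcasting: old serialized event versions are transformed into the current shape at load time, so replay code only ever handles the latest version. A minimal sketch of the idea (the AddressChangedV1/AddressChangedV2 types and the splitting rule are hypothetical, not from the example project):

```csharp
using System;

// Hypothetical old and new shapes of the same domain event.
public record AddressChangedV1(string PersonId, string Address);
public record AddressChangedV2(string PersonId, string Street, string City);

public static class AddressChangedUpcaster
{
    // Transforms a V1 event into the current V2 shape at load time.
    public static AddressChangedV2 Upcast(AddressChangedV1 old)
    {
        // Naive split for illustration; a real upcaster encodes the actual migration rule.
        var parts = old.Address.Split(',', 2);
        var street = parts[0].Trim();
        var city = parts.Length > 1 ? parts[1].Trim() : string.Empty;
        return new AddressChangedV2(old.PersonId, street, city);
    }
}
```

With this in place, a deprecated event class survives only as a plain data holder plus an upcaster, instead of keeping its full handling logic alive forever.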

Event Store considerations

Requirements for the event store are the following:

  • It is append-only, which means there are only inserts and no updates
  • It should store aggregate state and allow fetching the events for a given aggregate in the order they were saved.
  • It should use an optimistic concurrency check: optimistic concurrency does not lock at the database level, therefore reducing the risk of deadlocks. Instead, the check is performed at save time.

Optimistic concurrency check

When multiple clients write to a database, it can happen that two or more of them try to modify the same aggregate. Since we don't use a pessimistic concurrency check, there is no lock and no waiting; instead, the check is applied when trying to persist the actual data.

To make things clear let us consider an example:

Assume there are two requests that want to modify the same aggregate. The concurrency check itself is implemented at the database level.

  1. Both requests fetch the aggregate at its current version, which is 1
  2. The first request saves successfully, and the stored version for the aggregate becomes 2
  3. The second request then tries to save version 2 as well, but that version already exists for this aggregate, which indicates the data has changed since it was read. Saving the second aggregate therefore fails with a concurrency exception.

Example of optimistic check using Aggregate version
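The steps above can be sketched in plain C#, independent of the database (an illustrative in-memory version check, not the repository from the project):

```csharp
using System;
using System.Collections.Generic;

public class ConcurrencyException : Exception { }

// Illustrative in-memory store applying the optimistic version check.
public class InMemoryVersionStore
{
    // AggregateId -> last persisted version.
    private readonly Dictionary<string, int> _versions = new();

    public int GetVersion(string aggregateId) =>
        _versions.TryGetValue(aggregateId, out var v) ? v : 0;

    public void Save(string aggregateId, int originatingVersion)
    {
        // The save succeeds only if nobody else saved since we read.
        if (GetVersion(aggregateId) != originatingVersion)
            throw new ConcurrencyException();

        _versions[aggregateId] = originatingVersion + 1;
    }
}
```

Two clients that both read version 1 will both call Save with originatingVersion 1: the first one wins and bumps the version to 2, the second one throws.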

Database schema

The event store will be a single, append-only table, which tracks the version per aggregate and implements the concurrency check at the database level.

SQL for the event store table, with a JSON field for the data (this is where the event payload is serialized):

CREATE TABLE [dbo].[EventStore](
    [Id] [uniqueidentifier] NOT NULL,
    [CreatedAt] [datetime2] NOT NULL,
    [Sequence] [int] IDENTITY(1,1) NOT NULL,
    [Version] [int] NOT NULL,
    [Name] [nvarchar](250) NOT NULL,
    [AggregateId] [nvarchar](250) NOT NULL,
    [Data] [nvarchar](max) NOT NULL,
    [Aggregate] [nvarchar](250) NOT NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

SQL for EventStore example table

AggregateId and Version are the two fields used for the concurrency check; we create a unique index on these two fields. AggregateId is the id of our aggregate and can be whatever we want (which is why it is defined as a string). Depending on the domain it can be a GUID, an int, or a combination of the two; it doesn't really matter.

Note that AggregateId is defined as nvarchar(250).

CREATE UNIQUE NONCLUSTERED INDEX [ConcurrencyCheckIndex] ON [dbo].[EventStore]
([Version] ASC, [AggregateId] ASC) WITH (
    PAD_INDEX = OFF, 
    STATISTICS_NORECOMPUTE = OFF, 
    SORT_IN_TEMPDB = OFF, 
    IGNORE_DUP_KEY = OFF, 
    DROP_EXISTING = OFF, ONLINE = OFF, 
    ALLOW_ROW_LOCKS = ON, 
    ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
GO

The unique index is enforced on the Version and AggregateId fields at the database level

Using this we ensure that the same AggregateId/Version combination is never saved twice; instead, a unique index violation exception is thrown by the database. This is a transient error, which means that a retry mechanism (see the Retry pattern) can, and should, be implemented on the client side.
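A minimal retry wrapper might look like this (a sketch: the backoff delay, attempt count and the isTransient predicate are arbitrary choices, not code from the project):

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Retries an async operation while `isTransient` classifies the failure as
    // retryable, e.g. a unique index violation under optimistic concurrency.
    public static async Task ExecuteAsync(
        Func<Task> action, Func<Exception, bool> isTransient, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception ex) when (isTransient(ex) && attempt < maxAttempts)
            {
                // Another writer won the race: back off briefly. The caller should
                // re-load the aggregate inside `action` so the next attempt uses
                // the fresh version.
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }
    }
}
```

For SQL Server, isTransient would typically check SqlException.Number against 2601 (duplicate key in a unique index) and 2627 (unique constraint violation).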

Example project short introduction

Project is built using .NET Core 3.1.

Project architecture is layered with inversion of control.

Layers:

  • RestAPI — Web API which contains DTO and REST Controller definitions
  • Infrastructure — Factories, database model and repository implementations are defined here
  • Core — contains business logic as well as the repository interface for the aggregate. This project has no references to any other project or third-party library (except the Tactical DDD NuGet package, which is pure C# code)

Other projects:

  • DbMigration — migrations projects used to initialize database
  • EventStoreTests — testing project, which demonstrates integration tests for event store

For Core business logic, there is only one aggregate named Person and two domain events:

  1. PersonCreated — this event is published when a new person is created
  2. AddressChanged — this event is published when the address of a given person has changed

Instructions on how to set up and run the project can be found in the readme file of the GitHub repository.

EventStore implementation

Let us take a look at the actual code that implements the event store. I will only include code snippets here; the fully functional project can be found on my GitHub.

Interface for the EventStore can be defined as:

public interface IEventStore
{
    Task SaveAsync(IEntityId aggregateId,
        int originatingVersion,
        IReadOnlyCollection<IDomainEvent> events,
        string aggregateName = "Aggregate Name");

    Task<IReadOnlyCollection<IDomainEvent>> LoadAsync(IEntityId aggregateRootId);
}

The interface defines two methods.

  • The SaveAsync method persists an aggregate as a stream of events. The aggregate itself is described as a collection of domain events, with a unique name.
  • The LoadAsync method fetches all events for the given AggregateId from the event store and returns them as a collection. This collection can be used to reconstruct the aggregate.

IEntityId and IDomainEvent are both imported from the Tactical DDD NuGet package, which I strongly recommend for DDD in C#. Both are simple marker interfaces for the EntityId and DomainEvent classes.

Let us analyze the actual implementation of these two methods:

Persisting Events

For persisting an aggregate into the event store we need three parameters:

  1. AggregateId — the id of the aggregate. In our case it is a class implementing the IEntityId interface
  2. OriginatingVersion — the version of the aggregate that is being saved. On each save the version is incremented by one. As explained, this is used in the optimistic concurrency check
  3. IReadOnlyCollection<IDomainEvent> — the actual list of events to be persisted into the database. Each event is persisted as a new row.

Implementing SaveAsync

Full implementation of this method can be found in the EventStoreRepository.cs file.

First, the insert query is created using the provided parameters. For this we use the micro-ORM Dapper, which maps parameters using the @ notation.

var query = $@"INSERT INTO {EventStoreTableName} ({EventStoreListOfColumnsInsert}) VALUES (@Id,@CreatedAt,@Version,@Name,@AggregateId,@Data,@Aggregate);";

// Project each domain event onto an anonymous object whose property names
// match the @parameters in the query above.
var listOfEvents = events.Select(ev => new
{
    Aggregate = aggregateName,
    ev.CreatedAt,
    // complete event serialized into the JSON payload
    Data = JsonConvert.SerializeObject(ev, Formatting.Indented, _jsonSerializerSettings),
    Id = Guid.NewGuid(),
    ev.GetType().Name,                    // event name, e.g. "PersonCreated"
    AggregateId = aggregateId.ToString(),
    Version = ++originatingVersion        // one version increment per event
});

A parameter name (@Name, for example) is matched with the object property of the same name. That is why, in the next line, a list of anonymous objects is created with the same properties as the parameters defined in the query.

Properties are:

  • Aggregate — the string name of the aggregate.
  • CreatedAt — the date/time when the event was created.
  • Data — the event payload: the complete event serialized into a JSON string using the provided serializer settings.
  • Id — this can be any type of id. For this example I used a Guid.
  • Name — the actual event name.
  • AggregateId — the id of the aggregate. Using this field, events for a given aggregate can be filtered.
  • Version — incremented for each event of a given aggregate. Used in the optimistic concurrency check.

The list of events is then passed to the actual query using:

await connection.ExecuteAsync(query, listOfEvents);

This line uses Dapper's ExecuteAsync method, which maps the properties of listOfEvents to the parameters defined in the query string and executes one insert per event.

When persisted this way, each event becomes a new row in the EventStore table, with the Data column holding the payload of the actual event. Here is how it looks:

Each event is saved as a new row in the database. Version changes per aggregate, and Sequence is always incremental

When inspecting the Data column, this is the payload:

{
  "$type": "Core.Person.DomainEvents.PersonCreated, Core",
  "PersonId": "d91f903f-3fb1-4b68-9a59-c1818c94f104",
  "FirstName": "damir6",
  "LastName": "bolic7",
  "CreatedAt": "2020-02-20T07:24:54.0490305Z"
}
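The $type metadata in the payload comes from Json.NET's type-name handling. The exact settings are not shown in the snippets, but they plausibly look like this (a sketch; the project's _jsonSerializerSettings may differ):

```csharp
using Newtonsoft.Json;

// TypeNameHandling.All embeds the CLR type name in the payload as "$type",
// which is what later lets LoadAsync deserialize each row back into its
// concrete domain event class.
var jsonSerializerSettings = new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.All
};

var json = JsonConvert.SerializeObject(
    new System.Collections.Generic.List<int> { 1 }, jsonSerializerSettings);
// json now contains a "$type" property alongside the serialized values.
```

Note that Json.NET's documentation warns against TypeNameHandling values other than None when the JSON can come from an untrusted source; an event store that only reads its own rows is the benign case, but a SerializationBinder can be used to restrict the allowed types.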

The payload of this event is mapped from the PersonCreated event, which is emitted when a new person is created:

public class PersonCreated : DomainEvent
{
    public string PersonId { get; }
    public string FirstName { get; }
    public string LastName { get; }

    public PersonCreated(
        string personId,
        string firstName,
        string lastName)
    {
        PersonId = personId;
        FirstName = firstName;
        LastName = lastName;
    }
}

This domain event is published when new Person aggregate is created

DomainEvent class can be defined as follows:

public class DomainEvent : IDomainEvent
{
    public DomainEvent()
    {
        CreatedAt = DateTime.UtcNow;
    }

    public DateTime CreatedAt { get; set; }
}

Basically, CreatedAt is added on top of the IDomainEvent interface from the Tactical DDD NuGet package.

Loading aggregate

An aggregate is loaded by its AggregateId: all events for that aggregate are loaded, and the aggregate is then reconstructed from them, resulting in a new aggregate object in memory.

The aggregate is loaded as a list of events via an SQL query in the EventStoreRepository.LoadAsync() method. The real magic happens when the events are deserialized from JSON and converted into domain event objects:

var events = (await connection.QueryAsync<EventStoreDao>(
        query.ToString(),
        aggregateRootId != null
            ? new { AggregateId = aggregateRootId.ToString() }
            : null))
    .ToList();

var domainEvents = events.Select(TransformEvent).Where(x => x != null).ToList().AsReadOnly();
return domainEvents;

Selecting all events for the aggregate and transforming them to domain events

As we can see, events are fetched as a list of EventStoreDao objects, which are then converted to domain events using the TransformEvent method:

private IDomainEvent TransformEvent(EventStoreDao eventSelected)
{
    var o = JsonConvert.DeserializeObject(eventSelected.Data, _jsonSerializerSettings);
    var evt = o as IDomainEvent;
    return evt;
}

Here, the actual payload of the event, eventSelected.Data, is deserialized into an object, which is then cast to the IDomainEvent interface. Note that if this conversion fails, the method returns null.

Once the list of domain events is fetched, the Person aggregate can be constructed.
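Reconstruction itself is usually a fold over the event stream: start from an empty aggregate and apply each event in order. A minimal sketch of the idea (the Apply methods here are illustrative; the Person aggregate in the project may be structured differently):

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }
public record PersonCreated(string PersonId, string FirstName, string LastName) : IDomainEvent;
public record AddressChanged(string PersonId, string Address) : IDomainEvent;

public class Person
{
    public string Id { get; private set; } = "";
    public string FirstName { get; private set; } = "";
    public string LastName { get; private set; } = "";
    public string Address { get; private set; } = "";
    public int Version { get; private set; }

    // Rebuilds current state by folding over the event stream, oldest first.
    public static Person FromEvents(IEnumerable<IDomainEvent> events)
    {
        var person = new Person();
        foreach (var e in events) person.Apply(e);
        return person;
    }

    private void Apply(IDomainEvent e)
    {
        switch (e)
        {
            case PersonCreated c:
                Id = c.PersonId; FirstName = c.FirstName; LastName = c.LastName;
                break;
            case AddressChanged a:
                Address = a.Address;
                break;
        }
        Version++; // one version increment per applied event
    }
}
```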

Testing

Testing the event store is not hard.

For unit testing, the IEventStore interface can be mocked.

For integration tests, an in-memory database can be used. In the example project, LocalDB is used both for testing and in the actual application; the tests are located in the EventStoreIntegrationTests.cs file.
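For unit tests, a hand-rolled in-memory fake of IEventStore is often enough. A self-contained sketch (minimal stand-ins for the Tactical DDD interfaces are included so it compiles on its own; it ignores versioning for brevity):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Minimal stand-ins for the interfaces, so the fake compiles standalone.
public interface IEntityId { }
public interface IDomainEvent { }

public interface IEventStore
{
    Task SaveAsync(IEntityId aggregateId, int originatingVersion,
        IReadOnlyCollection<IDomainEvent> events, string aggregateName = "Aggregate Name");
    Task<IReadOnlyCollection<IDomainEvent>> LoadAsync(IEntityId aggregateRootId);
}

// In-memory fake: enough to unit test code that depends on IEventStore.
public class FakeEventStore : IEventStore
{
    private readonly Dictionary<string, List<IDomainEvent>> _streams = new();

    public Task SaveAsync(IEntityId aggregateId, int originatingVersion,
        IReadOnlyCollection<IDomainEvent> events, string aggregateName = "Aggregate Name")
    {
        var key = aggregateId.ToString()!;
        if (!_streams.TryGetValue(key, out var stream))
            _streams[key] = stream = new List<IDomainEvent>();
        stream.AddRange(events); // append only, order preserved
        return Task.CompletedTask;
    }

    public Task<IReadOnlyCollection<IDomainEvent>> LoadAsync(IEntityId aggregateRootId) =>
        Task.FromResult<IReadOnlyCollection<IDomainEvent>>(
            _streams.TryGetValue(aggregateRootId.ToString()!, out var s)
                ? s.AsReadOnly()
                : Array.Empty<IDomainEvent>());
}
```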

Summary

The goal of this blog post was to show, using a concrete example, how to implement a simple event store in C#. Along the way we used some DDD concepts such as Aggregate, Repository, Entity and ValueObject.

The example project included with this post aims to be a simple demonstration of the principles described here.

Author: Damir Bolić