Our project is a Continuous Delivery Orchestrator. It coordinates, defines, and observes pipelines that take code through pull requests, builds, tests, and deployments across many different internal eBay systems.
There are three major components involved, and we have picked appropriate technologies for each:
- UI—Our UI is built with AngularJS (moving to React with Redux).
- Pipeline Definition Service (PDS)—This service defines and sets up the pipelines that need to be run. It is written mostly in Java with Spring Boot, though we are moving it over to Kotlin as we go.
- Pipeline Execution Service (PES)—This service is responsible for running the pipelines, tracking their progress, and reporting their state to GitHub, via email, and through our UI. It is written in Scala using the Akka actor framework and Akka HTTP, with MongoDB for persistence (via the ReactiveMongo library). This is where we are using Event Sourcing.
While we alluded to many reasons for using Event Sourcing in our first article, there are three primary reasons we decided to use it for PES:
- Concurrency—Before Event Sourcing, we had several race conditions where different parts of a pipeline would send us events at nearly the same time.
- Debugging/Traceability—We are a small team supporting a large number of pipelines, and being able to quickly figure out what happened when a pipeline doesn’t behave as expected is extremely important.
- Clarity and Correctness—This kind of orchestrator can get complicated very quickly. Because it is a critical tool for the company, and because we have no dedicated QA, Ops, or support teams, our code needs to be of the highest quality and as easy to understand as possible.
We would like to expand a bit on this last point, as we consider it to be our strongest reason for using Event Sourcing.
The importance of code you can understand
One of the authors of this article has four kids. No, seriously, four of them. While of course they are amazing and beautiful and all that, they are also, lovingly, little demons who will take everything you think you need. Private time, personal space, and, of course, sleep are all things of the past. What does this have to do with programming? It has made one thing very clear: you will sometimes need to work on your code when you are sleepy or distracted or distraught or hungover. And when that inevitably happens (such as during the entire first year of a child’s life), if your code is hard to think about, reason through, and test, you and your company are in trouble. The availability of your website, the security of your customer data, and the stability of your app should NOT hinge on whether or not your 4-year-old wet the bed last night.
This is why, for any reasonably complex system that involves time and state, Event Sourcing is the way to go. When you try to think about how to process an event, how it changes your state model, how other bits of code might also be changing that state, and what needs to happen because of that change, all in the same block of code, you are going to screw it up. And that’s on a good day. The day after the baby cried all night and your 4-year-old had nightmares? All bets are off.
In a well-designed event-sourced model, each of those things is dealt with separately, or not at all. There is code that processes things coming from outside your service, makes sense of those facts, and records them accurately in your event stream. There is code that processes those recorded events and decides how each (in the order they were inserted) changes your eventual read model. And then there is code that decides what to do if those events change the state of your read model in particular ways. Each of these components is easy to imagine, easy to talk about, easy to change, and easy to test.
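To make that separation concrete, here is a minimal sketch of the three responsibilities as isolated pieces. Every name in it is hypothetical and for illustration only; the real PES types appear later in this article.

sealed trait Event                                   // a recorded fact
final case class State(paused: Boolean = false)      // the read model

// 1. Make sense of the outside world and record what happened as events.
trait Recorder { def record(rawInput: String): List[Event] }

// 2. Fold the recorded events, in insertion order, into the read model.
trait Projector { def project(events: List[Event]): State }

// 3. Decide what to do when the read model changes in a particular way.
trait Reactor { def react(before: State, after: State): Unit }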
Now let’s dig in a bit on each of these components and the patterns we have used to implement them.
Handling incoming events
ECD works with systems like Jenkins and GitHub to trigger pipelines. For instance, we use GitHub Webhooks to notify us when a team opens up a Pull Request or pushes changes to a branch.
When these events show up, PES needs to figure out what, if anything, they mean. For example, when someone pushes to a repository, GitHub sends us an event, and PES needs to query PDS to see if this specific repository and branch have a pipeline specified. If there is indeed a pipeline for this push, we create our first event:
{ "_id" : ObjectId("5b1f0c7b2e8e37072e9480c0"), "runId" : "17428f65-19ac-4066-b762-582a834bab1b", "data" : { "pipeline" : { "pipelineId" : NumberInt(2194), ... }, "pushData" : { "ref" : "refs/heads/master", "before" : "7f7b328256a1690125614b66de2c79742f9ba0ce", ... }, "runId" : "17428f65-19ac-4066-b762-582a834bab1b", "type" : "PushRunCreatedData" }, "timestamp" : ISODate("2018-06-11T16:58:20.937-0700"), "processed" : true, "eventId" : NumberInt(0), "description" : "Delivery Pipeline 2194 created by commit 7f7b328256a1690125614b66de2c79742f9ba0ce" }
From a testing and “thinking about it” perspective, this makes writing code to process incoming events quite easy. The code must simply take certain inputs and use them to produce the correct events. In essence, we are asking the question “Given that this external event happens, what does it MEAN to our system?” The event is our answer.
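As a sketch of that shape, the handler is essentially a decision function from inputs to events. All of the names below (Push, PushHandler, findPipeline) are hypothetical stand-ins, not the real PES code:

import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical types standing in for the real webhook payload and PDS answer.
case class Push(repo: String, branch: String, before: String, after: String)
case class Pipeline(pipelineId: Int)
case class PushRunCreated(runId: String, pipeline: Pipeline, push: Push)

class PushHandler(findPipeline: (String, String) => Future[Option[Pipeline]])
                 (implicit ec: ExecutionContext) {

  /** Ask PDS whether this repo/branch has a pipeline; if so, this push MEANS
    * a new run, and the answer is a PushRunCreated event. Otherwise, nothing.
    */
  def handle(push: Push): Future[Option[PushRunCreated]] =
    findPipeline(push.repo, push.branch).map(
      _.map(pipeline => PushRunCreated(UUID.randomUUID().toString, pipeline, push)))
}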
Let’s break this event down a bit.
Our RunEvent class looks like this:
case class RunEvent(
  runId: String,
  data: RunEventData,
  eventId: Option[Int] = None,
  timestamp: DateTime = DateTime.now(DateTimeZone.UTC),
  processed: Boolean = false,
  var description: Option[String] = None
)
runId is a GUID that we create for this new run. Every subsequent event for this run will need to have the same ID.
data is the specific data for a RunEvent. All of our different events have different implementations of RunEventData.
timestamp is usually the time that the event was created.
processed is a flag indicating that the event has been used to trigger actions. We will cover this later.
description is for informational purposes only, making it easy to read through our events and understand them.
eventId is the trickiest field here. It is a sequential ID for the events in this run: it starts at 0 and goes up by one for each event added.
However, this is not a great fit for MongoDB, which has no built-in auto-increment field. If we were greatly concerned with write performance, we might consider a database better optimized for this, such as Cassandra. But while there are thousands of projects going through ECD every day, the scale is still relatively small, so we picked MongoDB for the flexibility it gives our event definitions.
To create something like auto-increment for MongoDB, we settled on the “optimistic loop” solution. In our Scala code, that logic looks something like this:
case class InsertRunEvent(runEvent: RunEvent, id: String = "insertRunEvent")
  extends RunEventPersistence[WriteResult] with RunEventHandlers {

  val duplicateKeyErrorCode = 11000

  override def execute[_](collections: Collections): Future[WriteResult] = {
    collections.runEvent.find(BSONDocument("runId" -> runEvent.runId))
      .sort(BSONDocument(EventIdField -> -1))
      .one[RunEvent].flatMap { lastEvent =>
        val highestId = lastEvent.flatMap(_.eventId).map(_ + 1).getOrElse(0)
        insert(collections, highestId)
      }
  }

  private def insert[_](collections: Collections, count: Int): Future[WriteResult] = {
    collections.runEvent.insert[RunEvent](runEvent.copy(eventId = Some(count))) recoverWith {
      case e: DatabaseException if e.code.contains(duplicateKeyErrorCode) =>
        insert(collections, count + 1)
    }
  }
}
In summary, we query for the highest eventId already in our DB for this run, then add 1. Since our DB has a unique key on runId and eventId, if we somehow try to insert multiple events with the same eventId, we will get a duplicateKeyError from MongoDB, at which point we add 1 again and retry the insert.
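This optimistic loop relies on a unique compound index on runId and eventId; without it, MongoDB would happily accept the duplicate. With ReactiveMongo, such an index can be ensured at startup with something like the following sketch (the exact Index signature varies between ReactiveMongo versions):

import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.indexes.{Index, IndexType}
import scala.concurrent.{ExecutionContext, Future}

// Sketch: ensure the unique (runId, eventId) index that makes a duplicate
// insert fail with error code 11000, triggering the retry above.
def ensureEventIndex(runEvent: BSONCollection)
                    (implicit ec: ExecutionContext): Future[Boolean] =
  runEvent.indexesManager.ensure(
    Index(Seq("runId" -> IndexType.Ascending, "eventId" -> IndexType.Ascending),
      unique = true))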
Calculating the View Model
You may have noticed that the “processed” flag defaults to false. This allows us to use a FindAndModify query against our DB to pull out events that have not yet been processed, flipping the processed field as we do so:
db.runEvent.findAndModify({
  query: { processed: false },
  sort: { timestamp: 1 },
  update: { $set: { processed: true } }
});
We run this query every second across our instances. This way each event will be processed exactly once, and we try to process events from oldest to newest. Note that we are not guaranteed to act on each event in the order it was created, but as we will show, it does not matter if we process events out of order.
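That polling can be as simple as an actor ticking once per second. Here is a sketch using the Akka scheduler; EventPoller and claimNextEvent are hypothetical names, and the real PES wiring differs:

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

case object Poll

// Hypothetical sketch of the once-per-second poll; each tick would issue the
// findAndModify query shown above to claim one unprocessed event.
class EventPoller extends Actor {
  import context.dispatcher

  val tick: Cancellable =
    context.system.scheduler.schedule(1.second, 1.second, self, Poll)

  override def postStop(): Unit = tick.cancel()

  override def receive: Receive = {
    case Poll => claimNextEvent()
  }

  def claimNextEvent(): Unit = () // placeholder for the actual query
}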
When we get a new unprocessed event, we query for all the events before it and deserialize everything into actual classes in memory. For instance, the event we created when our run started is defined in Scala like this:
/** A GitHub Push has triggered a Pipeline Run
  * @param pipeline The pipeline that will be run
  * @param pushData The Push event that started the run
  * @param runId The id of the new Run.
  */
case class PushRunCreatedData(pipeline: Pipeline, pushData: GithubPushEvent, runId: String)
  extends RunEventData {

  /** Creates a default run based on the Pipeline, sets "activeRun" to false, and copies
    * the data from the PushEvent into referenceData
    * @param view The PipelineRun immediately before this event
    * @param events The events up to and including this one
    * @return The PipelineRun, modified to reflect changes from this event
    */
  override def process(view: PipelineRun, events: List[RunEvent]): PipelineRun = {
    pipeline.getRun(runId, events.last.timestamp).copy(
      activeRun = false,
      referenceData = Some(ReferenceData(
        gitRepoUrl = Some(pushData.repository.html_url),
        gitCommitId = Some(pushData.after),
        gitBranch = Some(pushData.branch),
        previousCommitId = Some(pushData.before)))
    )
  }

  override def getDescription(): String =
    s"Delivery Pipeline ${pipeline.pipelineId} created by commit ${pushData.before}"
}
Notice the process method. It takes in the PipelineRun that exists before this event is processed (the initial state is defined as PipelineRun.empty), and creates a modified copy based on the data for the event. For this event, that means calculating an initial state based on the Pipeline definition and then adding information about the GitHub Push that kicked us off.
So we take this initial state and then pass it through the process call of each event in our list, then take that result and pass it on to the NEXT event, and so on.
In an imperative, non-functional style, the code would look like this:
def process(events: List[RunEvent]): PipelineRun = {
  var run = PipelineRun.empty
  for (event <- events) {
    run = event.process(run, events)
  }
  run
}
Scala, however, is a functional language, and it would be far better to use foldLeft to produce the same result:
def process(events: List[RunEvent]): PipelineRun =
  events.foldLeft(PipelineRun.empty) { (view, event) => event.process(view, events) }
Here again we have a contained problem. Each event impacts our view model in a specific way, and when we write the code we simply need to write tests asserting that the view is modified correctly, given various values we feed to the event. Easy to test; easy to think about.
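For example, a test for the push event above can build an event, feed it an empty view, and assert on the result. This is a ScalaTest-style sketch; samplePipeline, samplePush, and sampleEvent are hypothetical fixtures, not part of the real PES test suite:

import org.scalatest.{FlatSpec, Matchers}

// Sketch of the kind of focused test this separation enables; the fixture
// helpers are hypothetical stand-ins for real test data.
class PushRunCreatedDataSpec extends FlatSpec with Matchers {

  "PushRunCreatedData.process" should "start the run inactive and copy the push data" in {
    val data = PushRunCreatedData(samplePipeline, samplePush, runId = "run-1")
    val view = data.process(PipelineRun.empty, List(sampleEvent(data)))

    view.activeRun shouldBe false
    view.referenceData.flatMap(_.gitCommitId) shouldBe Some(samplePush.after)
  }
}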
When we get the calculated PipelineRun for a specific RunEvent, we do two things:
- Save a copy of the view in our runView collection.
- Act on the state changes that this latest event caused.
Persisting the runView
Here we are using a pattern called CQRS, or Command Query Responsibility Segregation. Since our views are calculated from our events, querying against them directly would mean loading and processing every event for a run each time we ran a query, which quickly becomes expensive.
Instead, we save the result of calculating the runView into our database. This does mean that our read data (runViews) is only eventually consistent with our write data (our events). In practice, the delay is less than a second.
We mentioned that we may process our events out of order. Obviously, this means that an older version of our view (calculated from an older event) could overwrite a newer version in our runView database. Since that is unacceptable, we have a field in our runView called “eventVersion,” which is the eventId of the event used to create the view. Our code for inserting a runView will only write if the new runView has an eventId higher than or equal to the one already in the database:
case class UpsertRunView(run: PipelineRun, id: String = "upsertRunView")
  extends RunViewPersistence[UpdateWriteResult] {

  override def execute[_](collections: Collections) = {
    val selector = BSONDocument(
      idField -> run.id,
      eventIdField -> BSONDocument(lessThanOrEquals -> run.eventId))
    val modifier = PipelineRunHandler.write(run)
    collections.runView.update(selector, modifier, upsert = true)
  }
}
Acting on state changes
At the same time that we persist our new view, we also execute code based on how the latest event changed the view.
We do this by calculating the view for the event right before the current one and saving it as “previous,” then calculating the view including the current event and packaging the two up as a RunViewChange. We pass this RunViewChange on to our RunEventRouter, which checks for any change subscriptions that are looking for this change and launches a new Actor to receive and act on it:
override def receive: Receive = {
  case change: RunViewChange =>
    changeSubscriptions.filter(_.matchesChangeType(change)).foreach { subscription =>
      createActorOfClass(subscription.actorClass) ! change
    }
}
This filters the entire list of change subscriptions down to the ones that care about this specific change, then creates an actor of each matching subscription’s class and sends it the change to act on.
We define our change subscriptions like this:
type ChangeType = RunViewChange => Boolean

case class Subscription(matchesChangeType: ChangeType, actorClass: Class[_], description: String)

val runPaused: ChangeType =
  runViewChange => runViewChange.changedTo(_.status == PipelineRunStatus.PAUSED)

val changeSubscriptions: List[Subscription] = List(
  Subscription(runPaused, classOf[PauseNotifier], "Run Paused"))
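The changedTo helper on RunViewChange is not shown in this article; inferred from how it is used, a minimal version might look like this (our guess at the shape, not the actual PES class):

// Inferred shape of RunViewChange; the real class may differ.
case class RunViewChange(previous: PipelineRun, current: PipelineRun) {

  /** True when the predicate was false for the previous view but is true for
    * the current one, i.e. the view has just changed TO that state. */
  def changedTo(predicate: PipelineRun => Boolean): Boolean =
    !predicate(previous) && predicate(current)
}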
The “runPaused” value is a function that takes a RunViewChange and returns a Boolean: true if the run view has just changed to a Paused status. When it matches, the router creates a PauseNotifier actor, which sends out emails to the team to let them know their Pipeline is paused and awaiting their action, something like this:
class PauseNotifier extends ConfigSetActor {
  override def receive: Receive = {
    case RunViewChange(_, run) =>
      createEmailActor() ! RunPaused(run)
      stopWhenFinished()
  }
}
The email is sent by yet another actor (important, since sending email is synchronous and must be carefully managed), which the PauseNotifier creates and then waits on to finish.
Once again, this is easy to think about. We define a Subscription that describes which changes in the runView we care about. We create an actor to act on that change, whether by creating more actors, calling external APIs, or creating more events. At no point do we need to rack our overburdened brains thinking about concurrency, interactions, or scale. Event Sourcing and our implementation take care of those details.
We haven’t touched much on how we use the Akka Actor framework, which is an excellent library that we have used to great effect in this project. Perhaps another article is in order.
Building a system that scales up and out and executes work in parallel in a loosely coupled, non-blocking way has let us handle over 2.2 million Run Events, resulting in almost 200,000 Run Views. This architecture was crucial for building a solution that is both performant and intuitive.
If you’re using Event Sourcing, CQRS, or the Akka Actor framework in your architecture, we would love to hear from you. And if you use this article as a guide to building out your own Event Sourced system, we’re excited to see what you build!