Event sourcing
What is Event Sourcing?
Before diving into code, let's understand what event sourcing is and why it matters.
Traditional approach: Store only the current state.
Account: { id: "123", balance: 500 }When the balance changes, you overwrite the old value.
Event sourcing approach: Store the sequence of events that led to the current state.
Events for Account "123":
1. AccountOpened
2. Deposited(1000)
3. Withdrawn(300)
4. Deposited(100)
5. Withdrawn(300)
Current balance: 1000 - 300 + 100 - 300 = 500
Why event sourcing?
- Complete audit trail: You know exactly what happened and when
- Time travel: Rebuild state at any point in history
- Debugging: See the exact sequence of events that led to a bug
- Analytics: Derive new insights from historical events
- Flexibility: Build multiple views (projections) from the same events
Imports
import edomata.core.*
import edomata.syntax.all.* // for convenient extension methods
Domain layer
Decision
It all starts with decisions!
What is a Decision? Think of it like a committee making a choice. When you ask them to approve something, they can:
- Accept it (and record what they decided, e.g., "approved the budget")
- Reject it (and say why, e.g., "insufficient funds")
- Stay undecided (need more information)
Decision models programs that decide in an event-driven context, these programs are pure and can:
- accept events
- reject with errors
- stay indecisive!
Examples:
val d1 = Decision(1)
// d1: Decision[Nothing, Nothing, Int] = InDecisive(1)
val d2 = Decision.accept("Missile Launched!")
// d2: Decision[Nothing, String, Unit] = Accepted(Chain(Missile Launched!),())
val d3 = Decision.reject("No remained missiles to launch!")
// d3: Decision[String, Nothing, Nothing] = Rejected(Chain(No remained missiles to launch!))
Reading the code above:
Decision(1)creates an indecisive decision that just returns the value1Decision.accept("...")accepts with an event (the string is the event here)Decision.reject("...")rejects with an error message
You can also use the following syntax if you import syntax modules
import edomata.syntax.all.*
1.asDecision
// res0: Decision[Nothing, Nothing, Int] = InDecisive(1)
"Missile Launched!".accept
// res1: Decision[Nothing, String, Unit] = Accepted(Chain(Missile Launched!),())
"No remained missiles to launch!".reject
// res2: Decision[String, Nothing, Nothing] = Rejected(Chain(No remained missiles to launch!))
Scala syntax note:
1.asDecisionis an extension method - it adds the.asDecisionmethod to any value. The imports enable this convenient syntax.
Decisions are composable - you can chain them together:
val d4 = d1.map(_ * 2)
// d4: Decision[Nothing, Nothing, Int] = InDecisive(2)
val d5 = d2 >> d1
// d5: Decision[Nothing, String, Int] = Accepted(Chain(Missile Launched!),1)
val d6 = d5 >> d3
// d6: Decision[String, String, Nothing] = Rejected(Chain(No remained missiles to launch!))
What does
>>mean? It's a sequencing operator - do the left side, then do the right side. If either fails (rejects), the whole thing fails.
You can also use for-comprehension:
val d7 = for {
i <- Decision.pure(1)
_ <- Decision.accept("A") // accepting one event
_ <- Decision.accept("B", "C") // accepting several events
j <- Decision.acceptReturn(i * 2)("D", "E") // accepting several events and returning a value
} yield i + j
// d7: Decision[Nothing, String, Int] = Accepted(Chain(A, B, C, D, E),3)
Scala syntax: for-comprehension This is Scala's way of chaining operations that might fail. It's similar to:
Decision.pure(1).flatMap { i =>
Decision.accept("A").flatMap { _ =>
Decision.accept("B", "C").flatMap { _ =>
// ... and so on
}
}
}If any step rejects, the whole chain stops and returns the rejection.
or in presence of cats syntax:
import cats.implicits.*
val d8 = (d1, d7).mapN(_ + _)
// d8: Decision[Nothing, String, Int] = Accepted(Chain(A, B, C, D, E),4)
val d9 = List.range(1, 5).traverse(Decision.accept(_))
// d9: Decision[Nothing, Int, List[Unit]] = Accepted(Chain(1, 2, 3, 4),List((), (), (), ()))
val d10 = Decision(List.range(1, 5)).sequence[List, Int]
// d10: List[Decision[Nothing, Nothing, Int]] = List(InDecisive(1), InDecisive(2), InDecisive(3), InDecisive(4))
What is
cats? Cats is a library for functional programming in Scala. It provides useful abstractions likemapN(combine multiple values) andtraverse(apply an operation to each element in a collection).
Modeling
Let's use what we've learned so far to create an overly simplified model of a bank account. Assume we have the following business requirements:
- We must be able to open an account, if we did not have used that account name before
- We can deposit/withdraw some amount of assets to an open account
- We must be able to close an account, if it is settled (meaning its balance is zero)
We'll start by modeling domain events first, that we have from event storming or other kinds of design practices:
What is Event Storming? A workshop technique where you use sticky notes to discover domain events by asking "what happens in the system?" It's a great way to understand your domain before coding.
Scala syntax: enum In Scala 3,
enumdefines a set of possible values (like a sealed trait hierarchy but more concise):enum Color { case Red, Green, Blue }
// Color can only be Red, Green, or Blue
enum Event {
case Opened
case Deposited(amount: BigDecimal)
case Withdrawn(amount: BigDecimal)
case Closed
}
Why past tense? Events represent facts that already happened. "Deposited" not "Deposit" - because by the time we record it, the deposit has occurred.
We'll continue with modeling rejection scenarios that also came from event storming:
enum Rejection {
case ExistingAccount
case NoSuchAccount
case InsufficientBalance
case NotSettled
case AlreadyClosed
case BadRequest
}
What are Rejections? The reasons why a command might fail. These aren't exceptions - they're expected business outcomes. "Sorry, you can't withdraw $500 when you only have $100" is a valid rejection, not an error.
Now we must model an aggregate root:
enum Account {
case New
case Open(balance: BigDecimal)
case Close
}
Why three states? An account has a lifecycle:
New: Not yet opened (the "initial state" every aggregate starts in)Open(balance): Active account with a balanceClose: Account has been closedThis is a state machine: the account can only transition between states in specific ways.
Now we can use Decision to write our domain logic:
import edomata.core.*
import edomata.syntax.all.*
import cats.implicits.*
import cats.data.ValidatedNec
enum Account {
case New
case Open(balance: BigDecimal)
case Close
def open : Decision[Rejection, Event, Open] = this.decide { // 1
case New => Decision.accept(Event.Opened)
case _ => Decision.reject(Rejection.ExistingAccount)
}.validate(_.mustBeOpen) // 2
def close : Decision[Rejection, Event, Account] = this.perform(mustBeOpen.toDecision.flatMap { account => // 3, 4
if account.balance == 0 then Event.Closed.accept
else Decision.reject(Rejection.NotSettled)
})
def withdraw(amount: BigDecimal): Decision[Rejection, Event, Open] = this.perform(mustBeOpen.toDecision.flatMap { account =>
if account.balance >= amount && amount > 0
then Decision.accept(Event.Withdrawn(amount))
else Decision.reject(Rejection.InsufficientBalance)
// We can model rejections to have values, which helps a lot for showing error messages, but it's out of scope for this document
}).validate(_.mustBeOpen)
def deposit(amount: BigDecimal): Decision[Rejection, Event, Open] = this.perform(mustBeOpen.toDecision.flatMap { account =>
if amount > 0 then Decision.accept(Event.Deposited(amount))
else Decision.reject(Rejection.BadRequest)
}).validate(_.mustBeOpen)
private def mustBeOpen : ValidatedNec[Rejection, Open] = this match { // 5
case o@Open(_) => o.validNec
case New => Rejection.NoSuchAccount.invalidNec
case Close => Rejection.AlreadyClosed.invalidNec
}
}
Reading the type signature:
Decision[Rejection, Event, Open]means:
- Can reject with a
Rejection- Can accept with
Event(s)- Returns an
Openaccount state on success
What is
ValidatedNec? "Validated Non-Empty Chain" - a way to collect multiple errors instead of stopping at the first one. If three things are wrong, you get all three error messages, not just the first.
.decideis an extension method from Edomata syntax that helps with deciding and transitioning to a new state which I'll describe below.validateensures that after applying decision events, we will end up in anOpenstate, and will returnOpeninstead of simplyAccount.performlike.decidetakes a decision as an argument, and runs the decision on the current state to return new state, it is basically a helper for folding to let you reusetransitionand not repeating yourselfDecisioncan be converted.toValidated,.toOptionor.toEither.- as functional data structures are composable, you can extract common use cases like validations.
But you might say, domain logic is not just deciding, you must perform the actions you decided; and you are right, let's go to that part:
DomainModel
In order to complete modeling we must also define transitions (the famous event sourcing fold!) and our starting point. For doing so we use DomainModel, which is a helper class that creates the required stuff for the next steps:
What is a "fold"? In event sourcing, rebuilding state from events is like folding a list:
initial state + event1 -> state1
state1 + event2 -> state2
state2 + event3 -> current stateEach event transforms the previous state into the next state.
object Account extends DomainModel[Account, Event, Rejection] {
def initial = New // 1
def transition = { // 2
case Event.Opened => _ => Open(0).validNec
case Event.Withdrawn(b) => _.mustBeOpen.map(s => s.copy(balance = s.balance - b)) // 3
case Event.Deposited(b) => _.mustBeOpen.map(s => s.copy(balance = s.balance + b))
case Event.Closed => _=> Close.validNec
}
}
Understanding
DomainModel: You're defining:
- What type is your state (
Account)- What type are your events (
Event)- What type are your rejections (
Rejection)Then you implement:
initial: The starting state for new aggregatestransition: How each event type transforms the state
initialis the initial state of your domain model which is the first start in timeline changes, you can assume that it's likeNoneinOption[T]. beside it being required for our tutorial purposes, defining an initial state has the following advantages:
- Model consistency; you are always working with your domain model, not with
Option[YourModel] - It enables to add new default values in the future, where your model evolves, and reduces the number of times when an upcasting or migration is required.
transitionis the fold function that given an event, tells how to go to the next state. it's a function inE => S => ValidatedNec[R, S]. if an event is not possible to apply, we call it a conflict; and it might be due to programming errors, storage manipulation, or changing your transition to a conflicting logic with a history of already created timelines.
Reading the transition type:
E => S => ValidatedNec[R, S]means:
- Given an event
E- And a current state
S- Return either errors
Ror a new stateS
Tip
Writing transition without using a library for optics and lenses would be cumbersome and not expressive enough for domain modeling; I suggest you to use the great monocle library which provides neat macros for lenses, and you don't even need to think about it twice after using it for the first time.
What are lenses? A way to update deeply nested data structures without boilerplate. Instead of
s.copy(address = s.address.copy(city = "Paris")), you can writeAddress.city.replace("Paris")(s).
Thinking further
Writing event-driven (and specifically event-sourced) domain models deal with explicitly modeling a timeline of facts, and one thing that you will face as soon you start modeling your first domain in such a context, is that not all timelines are valid as you are not always in the control of what you'll read from a journal; for example, in the example above, we might receive a Withdrawn event on
NeworClosedstate (of course that does not happen unless there is a programming error or manipulated data, but we are speaking about possibility here), and you can't even find a balance to decrement! in non-functional settings where states are not modeled as ADTs, this problem is somewhat implicit, as it is hidden from eyes, but in modeling with ADTs, compiler slaps you in the face! just kidding :D, explicit modeling allows you to be more expressive and find logical problems very easily.
Info
If you have a programming error in your folding which can cause conflicts, Edomata has your back! it won't let your program to reach a result where it can be persisted and corrupting data; we'll see this in the next chapters.
As simple as that!
Testing domain model
As everything is pure and all the logic is implemented as programs that are values, you can easily test everything:
Account.New.open
// res4: Decision[Rejection, Event, Open] = Accepted(Chain(Opened),Open(0))
Account.Open(10).deposit(2)
// res5: Decision[Rejection, Event, Open] = Accepted(Chain(Deposited(2)),Open(12))
Account.Open(5).close
// res6: Decision[Rejection, Event, Account] = Rejected(Chain(NotSettled))
Account.New.open.flatMap(_.close)
// res7: Decision[Rejection, Event, Account] = Accepted(Chain(Opened, Closed),Close)
Why is testing so easy? Because the domain logic is pure:
- No database connections to mock
- No external services to stub
- Just call the function and check the result
This is one of the biggest benefits of separating domain logic from side effects.
Service layer
Domain models are pure state machines, in isolation, in order to create a complete service you need a way to:
- store and load models
- interact with them
- possibly perform some side effects
You can do all that without Edomata, as everything in Edomata is just pure data and you can use it however you like; but here we'll focus on what Edomata has to offer.
Edomata is designed around the idea of event-driven state machines, and it's not surprising that the tools that it provides for building services are also event-driven state machines! These state machines are like actors that respond to incoming messages which are domain commands, may possibly change state through emitting some events as we've seen in domain modeling above, and possibly emit some other type of events for communication and integration with other services; while doing so, they can also perform any side effects that are idempotent, as these machines may be run several times in case of failure. That takes us to the next building block:
Edomaton
An Edomaton is an event-driven automata (pl. Edomata) that can do the following:
- read the current state
- ask what is requested (command message)
- perform side effects
- decide (as described in the decision above)
- notify (emit notifications, integration events, ...)
- output a value
Edomatons are composable, so you can assemble them together, change them or treat them like normal data structures.
Info
An
Edomatonis like an employee, while aDecisionis like a business rule.Employee (Edomaton): "I received a request to withdraw money. Let me check the account state, apply our business rules, record the withdrawal event, and notify the accounting department."
Business rule (Decision): "You can only withdraw if balance >= amount and amount > 0."
For creating our first Edomaton, we need to model 2 more ADTs; commands, and notifications.
Commands vs Events:
- Command: A request to do something (imperative mood): "Deposit $100"
- Event: A fact that happened (past tense): "Deposited $100"
Commands can be rejected; events cannot (they've already happened).
What are Notifications? Messages sent to other systems after something happens. Also called "integration events." Unlike domain events (which rebuild state), notifications tell the outside world what happened.
enum Command {
case Open
case Deposit(amount: BigDecimal)
case Withdraw(amount: BigDecimal)
case Close
}
enum Notification {
case AccountOpened(accountId: String)
case BalanceUpdated(accountId: String, balance: BigDecimal)
case AccountClosed(accountId: String)
}
And we can create our first service:
object AccountService extends Account.Service[Command, Notification] {
import cats.Monad
def apply[F[_] : Monad] : App[F, Unit] = App.router {
case Command.Open => for {
ns <- App.state.decide(_.open)
acc <- App.aggregateId
_ <- App.publish(Notification.AccountOpened(acc))
} yield ()
case Command.Deposit(amount) => for {
deposited <- App.state.decide(_.deposit(amount))
accId <- App.aggregateId
_ <- App.publish(Notification.BalanceUpdated(accId, deposited.balance))
} yield ()
case Command.Withdraw(amount) => for {
withdrawn <- App.state.decide(_.withdraw(amount))
accId <- App.aggregateId
_ <- App.publish(Notification.BalanceUpdated(accId, withdrawn.balance))
} yield ()
case Command.Close =>
App.state.decide(_.close).void
}
}
Reading the service code:
App.router { ... }matches on the incoming commandApp.state.decide(_.open)gets the current state and calls theopendecision on itApp.aggregateIdgets the ID of the aggregate we're working onApp.publish(...)sends a notification to be delivered to other systemsWhat is
F[_]? A type parameter representing "some effect type" (likeIO,Future, etc.). This is called "tagless final" style - your code works with any effect system.
That's it! We've just written our first Edomaton.
Testing an Edomaton
As said earlier, everything in Edomata is just a normal value, and you can treat them like normal data.
Edomatons take an environment, and work in that context to produce a result;
using default DSL like what we've done in this tutorial creates Edomatons that require a data type called RequestContext which is pretty standard modeling of a request context in an event-driven setup.
import java.time.Instant
val scenario1 = RequestContext(
command = CommandMessage(
id = "some random id for request",
time = Instant.MIN,
address = "our account id",
payload = Command.Open
),
state = Account.New
)
// scenario1: RequestContext[Command, Account] = RequestContext(
// command = CommandMessage(
// id = "some random id for request",
// time = -1000000000-01-01T00:00:00Z,
// address = "our account id",
// payload = Open,
// metadata = MessageMetadata(
// correlation = Some("some random id for request"),
// causation = Some("some random id for request")
// )
// ),
// state = New
// )
What's in a RequestContext?
command: The command message being processed
id: Unique identifier for idempotency (so we don't process the same command twice)time: When the command was issuedaddress: Which aggregate this command targetspayload: The actual command datastate: The current state of the aggregate
There are 2 ways for running an Edomaton to get its result:
- Recommended way is to use
.executewhich takes input and returns processed results, which is used in real backends too.
// as we've written our service definition in a tagless style,
// we are free to provide any type param that satisfies required type-classes
import cats.Id
val obtained = AccountService[Id].execute(scenario1)
// obtained: EdomatonResult[Account, Event, Rejection, Notification] = Accepted(Open(0),Chain(Opened),Chain(AccountOpened(our account id)))
// or even use a real IO monad if needed
import cats.effect.IO
AccountService[IO].execute(scenario1)
// res8: IO[EdomatonResult[Account, Event, Rejection, Notification]] = IO(...)
What is
cats.Id? The simplest "effect" - it does nothing special, just returns the value directly. Useful for testing when you don't need real I/O.
- or you can use
.run, which takes input and returns rawResponsemodel, which you can interpret however you like.
AccountService[Id].run(scenario1)
// res9: ResponseT[[_$1 >: Nothing <: Any] => Decision[Rejection, Event, _$1], Rejection, Notification, Unit] = ResponseT(
// result = Accepted(events = Singleton(Opened), result = ()),
// notifications = Singleton(AccountOpened("our account id"))
// )
Now we can easily assert our expectations using our favorite test framework
assertEquals(
obtained,
EdomatonResult.Accepted(
newState = Account.Open(0),
events = ...
notifications = ...
)
)
What's next?
So far we've created our program definitions, in order to run them as a real application in production, we need to compile them using a backend; which I'll discuss in the next chapter