Rewriting Pipes and Filters with Typed Actors
Today I found some Scala code I wrote with Akka while following along with Vaughn Vernon's excellent book, Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka.
Seeing as I recently started playing with Akka Typed, I decided to rewrite the Pipes and Filters example code with types to see if there would be any advantages to doing so.
The pattern is described in Chapter 4: Pipes and Filters of that book. To save time I’m not going to recap what is already in the book. I will only show the code, and the intent should not be very hard to grasp. You can find the full source code for this post at my repository.
The first thing I did was to convert each actor from a class to a behavior. In certain cases, it didn’t even need to be a function, but just a val.
Let’s start with the three filters. All of them take the same type of message as input and forward a possibly modified message of the same type to the next filter. With regular Akka we have no way of enforcing that the types have to be the same, and you’ll agree with me that this fact is not immediately obvious from the code below:
case class ProcessIncomingOrder(orderInfo: Array[Byte])

class Authenticator(nextFilter: ActorRef) extends Actor {
  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Authenticator authenticating: $text")
      val orderText = text.replace("(certificate)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}

class Decrypter(nextFilter: ActorRef) extends Actor {
  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Decrypter decrypting: $text")
      val orderText = text.replace("(encryption)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}

class Deduplicator(nextFilter: ActorRef) extends Actor {
  var ids: Set[String] = Set.empty

  def orderIdFrom(orderText: String): String = {
    val orderIdIndex = orderText.indexOf("id='") + 4
    val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
    orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      val orderId = orderIdFrom(text)
      if (ids.contains(orderId)) {
        println(s"Deduplicator excluding duplicate: $orderId")
      }
      else {
        ids = ids + orderId
        nextFilter ! message
      }
  }
}
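As a side note, the orderIdFrom helper above assumes every order carries a terminated id='...' attribute. With an input such as "<order>xbc</order>", indexOf returns -1 and the substring call throws. A minimal sketch of a more defensive variant is shown below; the OrderIdSketch object and the Option return type are my own illustration, not part of the book's example.

```scala
object OrderIdSketch {
  // Hypothetical variant of orderIdFrom: returns None instead of throwing
  // when the id attribute is missing or its closing quote is absent.
  def orderIdFrom(orderText: String): Option[String] = {
    val marker = "id='"
    val start = orderText.indexOf(marker)
    if (start < 0) None
    else {
      val from = start + marker.length
      val end = orderText.indexOf("'", from)
      if (end < 0) None else Some(orderText.substring(from, end))
    }
  }
}
```

A deduplicator built on this variant would have to decide what to do with orders that have no id (drop them, forward them, or report them), which the original example leaves open.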
Now let us see the same code with typed actors:
def authenticator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] =
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"Authenticator authenticating: $text")
      val orderText = text.replace("(certificate)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }

def decrypter(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] =
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"Decrypter decrypting $text")
      val orderText = text.replace("(encryption)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }

def deduplicator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] = {
  var ids: Set[String] = Set.empty

  def orderIdFrom(orderText: String): String = {
    val orderIdIndex = orderText.indexOf("id='") + 4
    val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
    orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  Static {
    case message =>
      val text = new String(message.orderInfo)
      val orderId = orderIdFrom(text)
      if (ids.contains(orderId)) {
        println(s"Deduplicator excluding duplicate: $orderId")
      }
      else {
        ids = ids + orderId
        nextFilter ! message
      }
  }
}
Here we can see that each behavior takes the same message type as the next actor will receive. The contract of each filter is the same!
Notice also how we no longer have to write:
case message: ProcessIncomingOrder =>
but can simply say:
case message =>
This works because we declared the behavior as Behavior[ProcessIncomingOrder], so the message type is known at compile time. Awesome!
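To make the difference concrete without pulling in Akka at all, here is a toy model of the two kinds of references. UntypedRef and TypedRef are hypothetical stand-ins I made up for illustration; they are not Akka classes and ignore everything an actor reference really does (mailboxes, threading, supervision). The only point is where a wrong message gets caught.

```scala
object TypedRefSketch {
  final case class ProcessIncomingOrder(orderInfo: Array[Byte])

  // Toy stand-in for an untyped reference: `!` accepts Any,
  // so a wrong message is only discovered at runtime.
  final class UntypedRef(handler: PartialFunction[Any, Unit]) {
    var unhandled: List[Any] = Nil
    def !(msg: Any): Unit =
      if (handler.isDefinedAt(msg)) handler(msg)
      else unhandled = msg :: unhandled
  }

  // Toy stand-in for a typed reference: `!` accepts only T,
  // so a wrong message is rejected by the compiler.
  final class TypedRef[T](handler: T => Unit) {
    def !(msg: T): Unit = handler(msg)
  }

  // Returns the messages the untyped ref could not handle
  // and the total number of orders that were handled.
  def demo(): (List[Any], Int) = {
    var handled = 0
    val untyped = new UntypedRef({ case _: ProcessIncomingOrder => handled += 1 })
    untyped ! ProcessIncomingOrder("a".getBytes("UTF-8"))
    untyped ! "oops" // compiles fine, ends up unhandled at runtime

    val typed = new TypedRef[ProcessIncomingOrder](_ => handled += 1)
    typed ! ProcessIncomingOrder("b".getBytes("UTF-8"))
    // typed ! "oops" // would not compile: String is not a ProcessIncomingOrder
    (untyped.unhandled, handled)
  }
}
```

Calling TypedRefSketch.demo() leaves "oops" in the untyped ref's unhandled list, while the equivalent mistake on the typed ref never makes it past compilation.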
Then we have two other actors. The first, OrderAcceptanceEndpoint, receives a bunch of bytes and converts them to a ProcessIncomingOrder message that is suitable to be fed into the first filter; this actor initiates the flow. The actor at the end, which receives the final message and takes action on it, is called OrderManagementSystem. Here they both are:
class OrderAcceptanceEndpoint(nextFilter: ActorRef) extends Actor {
  def receive = {
    case rawOrder: Array[Byte] =>
      val text = new String(rawOrder)
      println(s"OrderAcceptanceEndpoint processing: $text.")
      nextFilter ! ProcessIncomingOrder(rawOrder)
  }
}

class OrderManagementSystem extends Actor {
  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"OrderManagementSystem processing: $text")
  }
}
Here are the same two actors in Akka Typed:
def orderAcceptanceEndpoint(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[Array[Byte]] =
  Static {
    case rawOrder =>
      val text = new String(rawOrder)
      println(s"OrderAcceptanceEndpoint processing: $text.")
      nextFilter ! ProcessIncomingOrder(rawOrder)
  }

val orderManagementSystem: Behavior[ProcessIncomingOrder] = {
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"OrderManagementSystem processing: $text")
  }
}
You can view the complete source code for both examples here.
Now, I’ve been lying to you a little here, as clearly the behaviors are not actors. To create an actual actor we ask the system to give us one. I feel like this distinction is very important, but I’ve yet to learn its true power.
Regardless, here is how we would instantiate actors and piece this whole system together with regular Akka:
object PipesAndFiltersDriver extends App {
  val orderText = "(encryption)(certificate)<order id='123'>xbc</order>"
  val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>"
  val rawOrderBytes = orderText.toCharArray.map(_.toByte)
  val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte)

  val system = ActorSystem("system")

  val filter5 = system.actorOf(Props[OrderManagementSystem], "orderMgmntSystem")
  val filter4 = system.actorOf(Props(classOf[Deduplicator], filter5), "deduplicator")
  val filter3 = system.actorOf(Props(classOf[Authenticator], filter4), "authenticator")
  val filter2 = system.actorOf(Props(classOf[Decrypter], filter3), "decrypter")
  val filter1 = system.actorOf(Props(classOf[OrderAcceptanceEndpoint], filter2), "orderAccEndpoint")

  val input = filter1
  input ! rawOrderBytes
  input ! rawOrderBytes
  input ! rawOrderBytes2
  // ctrl-c to shutdown the system
}
With Akka Typed we cannot just create actors without a parent. So we wrap all our actor creation in a single guardian behavior, and then hand that behavior to the actor system:
object PipesAndFiltersTypedDriver extends App {
  val pipesAndFiltersTypedDriver: Behavior[Array[Byte]] =
    ContextAware[Array[Byte]] {
      context =>
        val filter5 = context.spawn(Props(Filters.orderManagementSystem), "orderMgmntSystem")
        val filter4 = context.spawn(Props(Filters.deduplicator(filter5)), "deduplicator")
        val filter3 = context.spawn(Props(Filters.authenticator(filter4)), "authenticator")
        val filter2 = context.spawn(Props(Filters.decrypter(filter3)), "decrypter")
        val filter1 = context.spawn(Props(Filters.orderAcceptanceEndpoint(filter2)), "orderAccEndpoint")
        Static {
          case message => filter1 ! message
        }
    }

  val orderText = "(encryption)(certificate)<order id='123'>xbc</order>"
  val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>"
  val rawOrderBytes = orderText.toCharArray.map(_.toByte)
  val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte)

  val system: ActorSystem[Array[Byte]] = ActorSystem("system", Props(pipesAndFiltersTypedDriver))
  system ! rawOrderBytes
  system ! rawOrderBytes
  system ! rawOrderBytes2
  // ctrl-c to shutdown the system
}
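One detail in both drivers is worth a second look: the raw bytes are built with toCharArray.map(_.toByte), and the filters decode them with new String(bytes), which uses the platform default charset. That is fine for the ASCII test orders here, but it silently mangles anything outside that range. A small sketch of an explicit alternative follows; the OrderBytes object and its method names are hypothetical helpers of mine, not something from the book or the repository.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object OrderBytes {
  // Encode and decode with an explicit charset so both ends agree.
  def encode(orderText: String): Array[Byte] = orderText.getBytes(UTF_8)
  def decode(orderInfo: Array[Byte]): String = new String(orderInfo, UTF_8)

  // The conversion used in the drivers, kept here only for comparison:
  // each Char is truncated to its low 8 bits.
  def naiveEncode(orderText: String): Array[Byte] =
    orderText.toCharArray.map(_.toByte)
}
```

For plain ASCII input both encodings produce the same bytes, so the example orders in this post behave identically either way.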
Observations
Rewriting the code with typed actors gives a few immediate benefits:
- There is immediately less code. Most of the time I no longer needed to write full-fledged classes.
- Message flow that is easier to follow. Since all the behaviors are typed, I could immediately see which messages could go in and out of each one.
- Pleasure to write. There is something about having to define explicit contracts between components and then seeing them enforced at compile time. It makes my brain tingle.
There are a few things missing, and that is mostly in the area of tooling:
- Testing infrastructure is not here yet. No Test Kit here to hold my hand.
- Lack of more in-depth guides. While it’s true that the code base for Akka Typed is pretty small, and you can probably skim through most of it, a more extensive guide would have been useful. Besides, who doesn’t love reading about actors?
- No guidance on the idiomatic way of doing things. I am sure I am doing things the “non-typed” way, and the behaviors can get pretty complex, but I don’t have the confidence yet to make the call on which way to go. More examples would be an amazing resource.
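On the testing and idiom points, one direction I can imagine (and this is only a sketch, not a claim about how Akka Typed is meant to be used) is to treat a behavior as a plain value that handles one message and returns the next behavior. The Behavior case class and run function below are a toy model written for this post, not Akka's API. With state passed as a parameter instead of a captured var, the deduplicator can be exercised with ordinary assertions and no actor system.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object BehaviorSketch {
  final case class ProcessIncomingOrder(orderInfo: Array[Byte])

  // Toy model (not Akka's API): handle one message, return the next behavior.
  final case class Behavior[T](receive: T => Behavior[T])

  // Feed messages through a behavior in order, with no actor system involved.
  def run[T](initial: Behavior[T], messages: Seq[T]): Behavior[T] =
    messages.foldLeft(initial)((behavior, message) => behavior.receive(message))

  // Same parsing as in the post; assumes the id attribute is present.
  def orderIdFrom(orderText: String): String = {
    val orderIdIndex = orderText.indexOf("id='") + 4
    val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
    orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  // The set of seen ids is a parameter, so every "state change"
  // is simply a new behavior value rather than a mutated var.
  def deduplicator(
      forward: ProcessIncomingOrder => Unit,
      seen: Set[String] = Set.empty
  ): Behavior[ProcessIncomingOrder] =
    Behavior[ProcessIncomingOrder] { message =>
      val orderId = orderIdFrom(new String(message.orderInfo, UTF_8))
      if (seen.contains(orderId)) deduplicator(forward, seen)
      else {
        forward(message)
        deduplicator(forward, seen + orderId)
      }
    }

  // Test helper: returns the ids that made it past the deduplicator, in order.
  def forwardedIds(orders: Seq[String]): Vector[String] = {
    val forwarded = Vector.newBuilder[String]
    val record: ProcessIncomingOrder => Unit = { m =>
      forwarded += orderIdFrom(new String(m.orderInfo, UTF_8))
      ()
    }
    run(deduplicator(record), orders.map(o => ProcessIncomingOrder(o.getBytes(UTF_8))))
    forwarded.result()
  }
}
```

Feeding the three orders from the driver (id 123 twice, then id 234) through forwardedIds yields only the two distinct ids, which is the behavior the println in the deduplicator hints at.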
Conclusion
Akka Typed is an amazing undertaking. In my opinion, it is more important than the streams effort that Typesafe (the company that created Akka) is focusing on right now. Especially with newcomers like Pony, it seems clear that the actor model is here to stay.
It is only a matter of time before the actor model is the standard, taught in universities and used in enterprise systems. One of the biggest downsides to Akka has always been its lack of compile-time guarantees about which messages an actor can receive. With the advent of Akka Typed, that guarantee is now here. I suspect we will see rapid growth in the number of complex, concurrent systems in the coming years. Jump on this bandwagon now! Why wait? :-)