Ah, writing chats. So simple yet so complex. Yes, writing chats — as in coding them, not chatting (though that might prove to be problematic too, but that’s a whole different problem).
Let’s dive into the technicalities. To give you some more details, the service will be implemented as a mix of a simple REST API and a WebSocket app. To make this more interesting, I decided to use Akka-related libraries and as many typed actors as possible.
Please note that all the code that’s used in this article is also available in the GitHub repository.
Okay, let’s get to it and start with a simple question:
Why Chat?
Why not? That would be the simplest answer, but as I aspire to be a serious writer, I will add some more context.
Firstly, the inner workings of chats are a very interesting topic for me. Secondly, writing a chat application is probably the simplest use case to get familiar with the WebSocket protocol. What is more, most of the chats out there on the web work in the same way as the one described below. Of course, scale and features are much different, but the base idea remains the same.
Tools Used
Let’s start the technical part of this text with a description of tools that will be further used to implement the whole application.
- Akka: libraries from the Akka toolkit and the Actor model will play a crucial role in the implementation of the whole application. Because type safety is important and compile-time type checking tends to catch lots of problems before the code is even run, I have decided to use as many typed actors as possible. That is why I have added akka-stream-typed and akka-actor-typed as a base instead of their classic versions.
- akka-http: nothing unexpected, as I need to expose a REST API with WebSocket capabilities and use Akka actors. akka-http is the easiest way to achieve this because I do not have to worry about integration and interoperability between different libraries. Additionally, I am using akka-http-circe: I’m going with the circe library for parsing incoming JSON, and I need interoperability between akka-http and circe.
- pureconfig: for loading config files and parsing them into Scala objects without too much boilerplate.
- logback-classic: logging.
And that is all I will need to implement the WebSockets chat app in Akka. Let’s start the implementation!
Implementation
I decided to use Scala 2.13.12 as a base Scala version for the chat application.
1. I’m starting by adding all necessary dependencies to the project build.sbt file.
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor-typed" % "2.6.19",
      "com.typesafe.akka" %% "akka-stream-typed" % "2.6.19",
      "com.typesafe.akka" %% "akka-http" % "10.2.9",
      "de.heikoseeberger" %% "akka-http-circe" % "1.39.2",
      "io.circe" %% "circe-generic" % "0.14.1",
      "com.github.pureconfig" %% "pureconfig" % "0.17.1",
      "ch.qos.logback" % "logback-classic" % "1.2.11"
    )
2. I’m adding a config file named chat-app.conf for the application and two case classes that will later be used to represent these config values.
Example content of .conf file:
    http-config {
      port = 8070
      port = ${?PORT}
      host = "localhost"
      host = ${?HOST}
    }
This notation reads the environment variable named PORT and, if it is not present, falls back to the value 8070. The same applies to HOST, so every value in this file can be overridden with an environment variable.
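The fallback rule can be mimicked in a few lines of plain Scala. This is only a sketch of the observable behaviour, not how HOCON or pureconfig resolve values internally; the `resolve` helper and its `env` parameter are hypothetical names introduced for illustration.

```scala
object EnvOverrideSketch {
  // Hypothetical helper mirroring `port = 8070` followed by `port = ${?PORT}`:
  // the environment variable wins when it is defined, otherwise the default stays.
  def resolve(name: String, default: String, env: Map[String, String] = sys.env): String =
    env.getOrElse(name, default)

  def main(args: Array[String]): Unit = {
    val port = resolve("PORT", "8070").toInt
    val host = resolve("HOST", "localhost")
    println(s"$host:$port")
  }
}
```

Passing the environment as a parameter is a design choice made here only so the rule can be exercised without touching real environment variables.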
Case classes to model config:
    case class AppConfig(httpConfig: HttpConfig)

    case class HttpConfig(port: Int, host: String) {
      val toPath = s"$host:$port"
    }
3. I’m implementing a runner class for the application.
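    import akka.actor.typed.ActorSystem
    import akka.actor.typed.scaladsl.Behaviors
    import pureconfig.ConfigSource
    import pureconfig.generic.auto._

    object ChatAppStarter {
      def main(args: Array[String]): Unit = {
        val appConfig = ConfigSource.resources("chat-app.conf").loadOrThrow[AppConfig]
        val rootBehavior = Behaviors.setup[Nothing] { context =>
          context.setLoggerName("ChatApiLogger")
          // The guardian handles no messages itself; Behaviors.same is not a valid initial behavior.
          Behaviors.empty
        }
        ActorSystem[Nothing](rootBehavior, "ChatApp")
      }
    }
For now, it only reads our configuration file and starts an empty actor system. I will add more code here in the following steps.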
4. Now, I’m implementing an actor representing each particular Chat created via the application.
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors
    import scala.collection.mutable

    object Chat {
      sealed trait ChatCommand
      final case class ProcessMessage(sender: String, content: String) extends ChatCommand
      final case class AddNewUser(ref: ActorRef[String]) extends ChatCommand

      def apply(): Behavior[ChatCommand] = Behaviors.setup { _ =>
        var participants = List.empty[ActorRef[String]]
        val messageQueue = mutable.Queue.empty[String]
        Behaviors.receiveMessage[ChatCommand] {
          case ProcessMessage(sender, content) =>
            val message = s"$sender: $content"
            // Keep the message so it can be replayed to users who join later.
            messageQueue.enqueue(message)
            participants.foreach(ref => ref ! message)
            Behaviors.same
          case AddNewUser(ref) =>
            participants = participants.appended(ref)
            // Replay the chat history to the newly joined user.
            messageQueue.foreach(m => ref ! m)
            Behaviors.same
        }
      }
    }
A chat is a simple actor that handles two types of messages: AddNewUser with actorRef argument and ProcessMessage which represents each message sent between users inside the chat.
- After receiving the AddNewUser message, the chat actor will add the actorRef to the participants list and right away send all the messages already exchanged in the chat to the newly joined user.
- After receiving ProcessMessage, the actor will simply add message content to the message queue and broadcast the content to all the participants present in the chat.
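The two rules above can also be written down as a small actor-free model, which makes them easy to check in isolation. This is a sketch under stated assumptions rather than the article's implementation: `ChatState`, `Delivery`, `join` and `post` are hypothetical names, and participants are identified by plain strings instead of actor references.

```scala
object ChatModelSketch {
  // Hypothetical model of the chat state: who is connected and what was said so far.
  final case class ChatState(participants: Vector[String] = Vector.empty,
                             history: Vector[String] = Vector.empty)

  // A delivery means "send this text to that participant".
  final case class Delivery(to: String, text: String)

  // Joining: register the user and replay the whole history to that user only.
  def join(state: ChatState, user: String): (ChatState, Vector[Delivery]) = {
    val replay = state.history.map(line => Delivery(user, line))
    (state.copy(participants = state.participants :+ user), replay)
  }

  // Posting: store the formatted line, then broadcast it to every participant,
  // the sender's own connection included.
  def post(state: ChatState, sender: String, content: String): (ChatState, Vector[Delivery]) = {
    val line = s"$sender: $content"
    val deliveries = state.participants.map(p => Delivery(p, line))
    (state.copy(history = state.history :+ line), deliveries)
  }
}
```

Each function returns the next state together with the messages to send, which corresponds to what the actor does with its mutable fields and the `!` operator.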
5. I’m implementing a ChatsStore actor responsible for storing all the chats existing in the application.
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.scaladsl.Behaviors
    import scala.collection.mutable
    // Project-local imports, assuming all classes live in the same package.
    import Chat.ChatCommand
    import ChatApi.User

    object ChatsStore {
      sealed trait StoreCommand
      final case class AddNewChat(sender: User, receiver: User, replyTo: ActorRef[Int]) extends StoreCommand
      final case class GetChatMeta(chatId: Int, userId: String, replyTo: ActorRef[Option[GetChatMetaResponse]]) extends StoreCommand
      final case class GetChatMetaResponse(userName: String, ref: ActorRef[ChatCommand])

      private case class ChatMetadata(participants: Map[String, User], ref: ActorRef[ChatCommand]) {
        def containUserId(userId: String): Boolean = participants.contains(userId)
      }

      def apply(): Behavior[StoreCommand] = Behaviors.setup { context =>
        // State is kept inside the behavior so that it is owned by the actor instance.
        var sequence = 0
        val store = mutable.Map.empty[Int, ChatMetadata]
        Behaviors.receiveMessage {
          case AddNewChat(sender, receiver, replyTo) =>
            sequence += 1
            val newChat: ActorRef[ChatCommand] = context.spawn(Chat(), s"Chat$sequence")
            val participants = Map(sender.id.toString -> sender, receiver.id.toString -> receiver)
            val metadata = ChatMetadata(participants, newChat)
            store.put(sequence, metadata)
            replyTo ! sequence
            Behaviors.same
          case GetChatMeta(chatId, userId, replyTo) =>
            val chatRef = store
              .get(chatId)
              .filter(_.containUserId(userId))
              .flatMap(meta =>
                meta.participants
                  .get(userId)
                  .map(user => GetChatMetaResponse(user.name, meta.ref))
              )
            replyTo ! chatRef
            Behaviors.same
        }
      }
    }
It is yet another simple actor that supports only two types of messages: AddNewChat and GetChatMeta. After receiving the AddNewChat message, the store will spawn a new instance of the Chat actor, register the sender and the receiver as its participants (keyed by their ids), and use the next number in the sequence as the chat id.
On the other hand, after receiving GetChatMeta, the store will try to find the chat with the provided id and userId. If a chat exists for this particular combination, the store will return its ActorRef along with the retrieved user name.
As its internal state, the actor holds the store itself, represented here as a simple map from Int to the internal case class ChatMetadata, alongside a sequence implemented as a simple Int that is used to assign ids to newly added chats.
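The lookup rule (the chat must exist and the user must be one of its participants) can be expressed as a single for-comprehension over two Option values. The sketch below uses simplified stand-in types and a hypothetical `lookup` function; the chat reference is a plain String here, whereas the real store returns an actor reference.

```scala
object StoreLookupSketch {
  // Simplified stand-ins for the article's types; no actors are involved.
  final case class User(id: String, name: String)
  final case class ChatMetadata(participants: Map[String, User], chatRef: String)

  // Returns (userName, chatRef) only when the chat exists AND the user belongs to it.
  def lookup(store: Map[Int, ChatMetadata], chatId: Int, userId: String): Option[(String, String)] =
    for {
      meta <- store.get(chatId)
      user <- meta.participants.get(userId)
    } yield (user.name, meta.chatRef)
}
```

An unknown chat id or a user id that is not part of the chat both yield None, which is the case the API later turns into a 404 response.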
Additionally, I will add this line to ChatAppStarter to effectively spawn the ChatsStore actor.
val store = context.spawn(ChatsStore(), "Store")
6. I’m implementing ChatService which will play the role of utils class.
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.PathMatcher
    import scala.annotation.tailrec

    class ChatService(contextPath: List[String], httpConfig: HttpConfig) {
      private val apiPath = contextPath.mkString("/")

      val path: PathMatcher[Unit] = toPath(contextPath, PathMatcher(""))

      // Folds the context path segments into a single PathMatcher.
      @tailrec
      private def toPath(l: List[String], pathMatcher: PathMatcher[Unit]): PathMatcher[Unit] = {
        l match {
          case x :: Nil => toPath(Nil, pathMatcher.append(x))
          case x :: tail => toPath(tail, pathMatcher.append(x / ""))
          case Nil => pathMatcher
        }
      }

      // Builds one WebSocket link per participant of the given chat.
      def generateChatLinks(chatId: Int, senderId: String, receiverId: String): (String, String) = {
        val chatPath = s"ws://${httpConfig.toPath}/$apiPath/chats/$chatId/messages"
        (s"$chatPath/$senderId", s"$chatPath/$receiverId")
      }
    }
Here you can see two methods. The toPath method unfolds the context path from a List[String] into an Akka HTTP compliant PathMatcher. The generateChatLinks method generates the WebSocket links, one per user, that will later be sent in the response after a chat is created for a particular pair of user ids.
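To make the shape of the generated links concrete, here is a standalone sketch of the same string building. The `chatLinks` function and the sample ids in the usage note are hypothetical; the host, port and context path match the values used elsewhere in this article.

```scala
object ChatLinksSketch {
  // Standalone version of the link-building logic; takes host and port directly
  // instead of an HttpConfig so that it has no dependencies.
  def chatLinks(host: String, port: Int, contextPath: List[String],
                chatId: Int, senderId: String, receiverId: String): (String, String) = {
    val apiPath = contextPath.mkString("/")
    val chatPath = s"ws://$host:$port/$apiPath/chats/$chatId/messages"
    (s"$chatPath/$senderId", s"$chatPath/$receiverId")
  }
}
```

For example, `ChatLinksSketch.chatLinks("localhost", 8070, List("api", "v1"), 1, "a", "b")` yields `ws://localhost:8070/api/v1/chats/1/messages/a` for the sender and the same path ending in `/b` for the receiver.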
Additionally, I will add this line to ChatAppStarter to instantiate the ChatService class
val service = new ChatService(List("api", "v1"), appConfig.httpConfig)
7. I’m implementing the ChatApi class responsible for exposing REST API with WebSocket capabilities along with Akka Stream Flow to handle WebSocket messages.
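    import akka.Done
    import akka.actor.typed.{ActorRef, ActorSystem}
    import akka.actor.typed.scaladsl.AskPattern._
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.model.ws.{Message, TextMessage}
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import akka.stream.typed.scaladsl.ActorSource
    import akka.util.Timeout
    import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
    import io.circe.{Decoder, Encoder}
    import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
    import org.slf4j.Logger
    import java.util.UUID
    import scala.concurrent.{ExecutionContextExecutor, Future}
    import scala.concurrent.duration._
    // Project-local imports, assuming all classes live in the same package.
    import Chat.{AddNewUser, ChatCommand, ProcessMessage}
    import ChatsStore.{AddNewChat, GetChatMeta, StoreCommand}

    class ChatApi(service: ChatService, store: ActorRef[StoreCommand], logger: Logger)(implicit val system: ActorSystem[_]) {
      import ChatApi._

      private implicit val timeout: Timeout = Timeout(2.seconds)
      private implicit val ec: ExecutionContextExecutor = system.executionContext

      val routes: Route = {
        pathPrefix(service.path / "chats") {
          concat(
            pathEnd {
              post {
                entity(as[StartChat]) { start =>
                  val senderId = start.sender.id.toString
                  val receiverId = start.receiver.id.toString
                  logger.info(s"Starting new chat sender: $senderId, receiver: $receiverId")
                  val eventualCreated = store
                    .ask(ref => AddNewChat(start.sender, start.receiver, ref))
                    .map(id => {
                      val chatLinks = service.generateChatLinks(id, senderId, receiverId)
                      ChatCreated(id, chatLinks._1, chatLinks._2)
                    })
                  onSuccess(eventualCreated) { c =>
                    complete(StatusCodes.Created, c)
                  }
                }
              }
            },
            path(IntNumber / "messages" / Segment) { (id, userId) =>
              onSuccess(store.ask(ref => GetChatMeta(id, userId, ref))) {
                case Some(meta) => handleWebSocketMessages(websocketFlow(meta.userName, meta.ref))
                case None => complete(StatusCodes.NotFound)
              }
            }
          )
        }
      }

      def websocketFlow(userName: String, chatActor: ActorRef[ChatCommand]): Flow[Message, Message, Any] = {
        // Outgoing side: an actor-backed source; its ActorRef is registered in the chat once materialized.
        val source: Source[TextMessage, Unit] =
          ActorSource.actorRef[String](PartialFunction.empty, PartialFunction.empty, 5, OverflowStrategy.fail)
            .map[TextMessage](TextMessage(_))
            .mapMaterializedValue(sourceRef => chatActor ! AddNewUser(sourceRef))

        // Incoming side: forward every text message to the chat actor.
        val sink: Sink[Message, Future[Done]] = Sink
          .foreach[Message] {
            // getStrictText only works for strict messages; streamed text messages are not handled here.
            case tm: TextMessage => chatActor ! ProcessMessage(userName, tm.getStrictText)
            case _ => logger.warn("User '{}' sent an unsupported message", userName)
          }

        Flow.fromSinkAndSource(sink, source)
      }
    }

    object ChatApi {
      case class StartChat(sender: User, receiver: User)
      case class User(id: UUID, name: String)
      case class ChatCreated(chatId: Int, senderChatLink: String, receiverChatLink: String)

      // The User codecs are defined first because the StartChat codecs depend on them.
      implicit val userDecoder: Decoder[User] = deriveDecoder
      implicit val userEncoder: Encoder[User] = deriveEncoder
      implicit val startChatDecoder: Decoder[StartChat] = deriveDecoder
      implicit val startChatEncoder: Encoder[StartChat] = deriveEncoder
      implicit val chatCreatedDecoder: Decoder[ChatCreated] = deriveDecoder
      implicit val chatCreatedEncoder: Encoder[ChatCreated] = deriveEncoder
    }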
Most of the code is dedicated to exposing our API in the form of two endpoints. The first one is a pure REST endpoint under the path http://{url}/chats that accepts POST requests and is responsible for creating chats. Here, {url} includes the context path passed to ChatService (api/v1 in this article).
The second one is a mixed endpoint under the path http://{url}/chats/{chatId}/messages/{userId}, which opens the WebSocket channel and processes messages for a particular chat.
All of the code in the ChatApi companion object is dedicated to modeling requests and responses, along with some circe semi-auto derivation, which I decided to use over auto derivation.
The most important piece of code in this step is the part responsible for handling WebSocket messages. In particular, the websocketFlow method. It has the classic akka-http signature for handling websockets but the implementation is not so trivial.
Firstly, I am creating an ActorRef based source responsible for receiving messages from actors representing chat users. After each source is materialized, it will send its actor ref to the chat actor for which it was requested. The second thing is the Sink to which all messages from the WebSocket endpoint will go and which will forward all the messages to the interested chat actors. I merged them using Flow.fromSinkAndSource.
Additionally, I will add this line to ChatAppStarter to create a new instance of ChatApi.
val api = new ChatApi(service, store, context.log)(context.system)
8. In this step, I’m adding an object responsible for starting the HTTP Server
    import akka.actor.typed.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Route
    import org.slf4j.Logger
    import scala.util.{Failure, Success}

    object Server {
      def start(routes: Route, config: HttpConfig, logger: Logger)(implicit system: ActorSystem[_]): Unit = {
        import system.executionContext

        val bindingFuture = Http()
          .newServerAt(config.host, config.port)
          .bind(routes)

        bindingFuture.onComplete {
          case Success(binding) =>
            val address = binding.localAddress
            logger.info("Server online at http://{}:{}/", address.getHostString, address.getPort)
          case Failure(ex) =>
            logger.error("Failed to bind HTTP endpoint, terminating system", ex)
            system.terminate()
        }
      }
    }
Nothing complex here, just a simple Akka Http server that starts with a message.
And one last line in ChatAppStarter to merge them all:
    Server.start(api.routes, appConfig.httpConfig, context.log)(context.system)
9. I’m adding one more config file responsible for loading the Akka config. After loading, it will result in an automatic keep-alive ping being sent to our chat’s WebSockets connections.
The file is a simple one-liner:
akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second
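And I’m adding two lines to ChatAppStarter. The first one loads the config file (written with fold because .right.get is deprecated in Scala 2.13):
    val akkaConfig = ConfigSource.resources("akka.conf").config().fold(failures => sys.error(s"Cannot load akka.conf: $failures"), cfg => cfg)
The second one starts the whole actor system with the loaded config:
    ActorSystem[Nothing](rootBehavior, "ChatApp", akkaConfig)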
And here we go, we have the whole chat implemented. Let’s test it!
Testing
For tests, I’m using Postman and Simple Web Socket Client.
1. I’m creating a new chat for two users using Postman.
In the response body, I got a chat ID and two personalized links for users to join and use the chat, in a quasi-hypermedia style.
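The same request can also be scripted instead of being sent from Postman. The sketch below only builds the request with the JDK 11+ HTTP client; the object and method names are hypothetical, the base URL in the usage note assumes the default config from this article, and the JSON body follows the StartChat and User case classes. User names are inserted into the JSON without escaping, so this is suitable for simple test values only.

```scala
import java.net.URI
import java.net.http.HttpRequest
import java.util.UUID

object CreateChatRequestSketch {
  // JSON shape derived from StartChat(sender: User, receiver: User) and User(id: UUID, name: String).
  // Names are not JSON-escaped here; keep them to plain letters and digits.
  def body(senderId: UUID, senderName: String, receiverId: UUID, receiverName: String): String =
    s"""{"sender":{"id":"$senderId","name":"$senderName"},"receiver":{"id":"$receiverId","name":"$receiverName"}}"""

  // Builds (but does not send) the POST request that creates a chat.
  def request(baseUrl: String, json: String): HttpRequest =
    HttpRequest
      .newBuilder(URI.create(s"$baseUrl/chats"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(json))
      .build()
}
```

With the server running locally, the request built from the base URL `http://localhost:8070/api/v1` could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` from `java.net.http`.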
2. Now it’s time to use them and check if users can communicate with one another. Simple Web Socket Client comes into play here.
And here we are, everything is working and users are able to communicate with each other. There is one last thing to do. Let’s spend a moment to look at things that can be done better.
What Can Be Done Better
As what I have just built is the most basic chat app, there are a few (or in fact quite a lot of) things that could be done better. Below, I have listed the things I find worth improving:
- Better support for users leaving and rejoining: right now, it is not implemented in the most optimal way and there is room for significant improvement here.
- Sending attachments: for now, the chat only supports simple text messages. While texting is the basic function of a chat, users enjoy exchanging images and audio files, too.
- Message model: rethink what exactly you need in your message; some changes in the API model may also be needed.
- Persistent message store: to keep messages between application restarts. Also, some level of encryption will be required because of security concerns.
- Group chat support: right now, the application only supports one-to-one chats, so group chats are the logical next feature to add to the application.
Summary
Et voilà! The chat is implemented and the main task is done. In the previous section, I described some ideas for what to do next to develop the app.
Please keep in mind that this chat is very simple and will require lots of changes and development before it can be used in any type of commercial project.
Anyway, I hope that you learned something new while reading this article. Thank you for your time.