Akka UDP Messaging
January 18, 2018
In this tutorial, we will create an Akka UDP system and send messages between two programs on localhost using different ports.
GitHub Project
The complete project can be cloned from here.
Prerequisites
Please make sure both Scala and SBT are installed on your local Linux system. Please follow my tutorial post here to install.
Overview
Sometimes we want to use Akka but also need to exchange messages with another system that does not support Akka natively. A simple solution is to send the messages over UDP from inside Akka. With UDP, we are free to send datagrams to any destination and still remain connectionless.
We will create two programs:
- ScheduledSenderApp
- ListenerApp
The ScheduledSenderApp will send a message to the ListenerApp every second. Once the ListenerApp receives the message, it replies with another message.
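The network setup looks like this: ScheduledSenderApp binds to localhost port 5115 and sends to port 5005, while ListenerApp binds to localhost port 5005 and replies to port 5115.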
Note: You can also use Python or any other UDP program to test either ScheduledSenderApp or ListenerApp individually.
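As an illustration of that note, here is a minimal sketch of a plain UDP test client written with the JDK's java.net classes only, without Akka. The object name UdpProbe and the method sendAndAwaitReply are hypothetical names made up for this example. It sends one datagram from a chosen local port and waits for a single reply until a timeout expires.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper for manual testing; not part of the tutorial project.
object UdpProbe {

  /** Sends `message` to `remote` from `localPort` (0 = any free port) and
    * waits up to `timeoutMs` milliseconds for one reply datagram.
    * Returns None if nothing arrives in time. */
  def sendAndAwaitReply(message: String,
                        localPort: Int,
                        remote: InetSocketAddress,
                        timeoutMs: Int = 2000): Option[String] = {
    val socket = new DatagramSocket(localPort, InetAddress.getLoopbackAddress)
    try {
      socket.setSoTimeout(timeoutMs)

      // UDP is connectionless: we address each datagram individually.
      val out = message.getBytes(UTF_8)
      socket.send(new DatagramPacket(out, out.length, remote))

      // Block until one datagram arrives or the timeout fires.
      val buffer = new Array[Byte](1024)
      val in = new DatagramPacket(buffer, buffer.length)
      socket.receive(in)
      Some(new String(in.getData, in.getOffset, in.getLength, UTF_8))
    } catch {
      case _: SocketTimeoutException => None
    } finally {
      socket.close()
    }
  }
}
```

To exercise ListenerApp on its own, the probe has to bind to port 5115, because ListenerApp replies to that fixed port rather than to the port the datagram came from. With ListenerApp running, a call such as UdpProbe.sendAndAwaitReply("hello", 5115, new InetSocketAddress("localhost", 5005)) should then return the reply text.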
Add Dependencies
We will be using Scala version 2.12.4 and the akka-actor library, which contains the akka.io.Udp API used below. Add the following lines to build.sbt. You can check the latest version from here.
scalaVersion := "2.12.4"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.8"
For SBT, we will be using 0.13.16. Therefore, add the following line to project/build.properties:
sbt.version=0.13.16
Note: These dependencies are for both Apps.
Scheduled Sender App
This is the main entry point that starts the actor. Here, we pass our local and remote socket addresses to the ScheduledSenderActor class, which we will look at next.
import java.net.InetSocketAddress
import akka.actor._

object ScheduledSenderApp extends App {
  val system = ActorSystem("ActorSystem")
  val remote = new InetSocketAddress("localhost", 5005)
  val local = new InetSocketAddress("localhost", 5115)
  val udp: ActorRef = system.actorOf(ScheduledSenderActor(local, remote), name = "Udp")
}
Scheduled Sender Actor
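When the actor starts, IO(Udp) ! Udp.Bind(self, local) asks Akka to bind a UDP socket to the local address, with self registered as the handler. Once the socket is bound, the actor receives Udp.Bound, and context.become switches its behavior to the ready function, keeping sender() as the socket actor that accepts Udp.Send commands. The scheduler sends the string hello to self every second. Note that it starts immediately, so a hello that arrives before Udp.Bound is not handled and is dropped. Since hello is a String, it matches case msg: String in ready, which sends the message to remote using Udp.Send. When data arrives from a remote address, the Udp.Received case logs the payload together with the sender's IP address and port.
The apply method of the companion object at the end is a factory that creates the Props for the ScheduledSenderActor class.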
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import akka.io.{IO, Udp}
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class ScheduledSenderActor(local: InetSocketAddress, remote: InetSocketAddress) extends Actor with ActorLogging {
  import context.system

  // Ask the UDP manager to bind a socket to `local`, with this actor as the handler.
  IO(Udp) ! Udp.Bind(self, local)

  // Send "hello" to ourselves every second, starting immediately.
  val scheduleCancellable: Cancellable = system.scheduler.schedule(0.seconds, 1.second, self, "hello")

  // Initial behavior: wait until the socket is bound.
  def receive = {
    case Udp.Bound(_) ⇒
      // sender() is the socket actor that accepts Udp.Send commands.
      context.become(ready(sender()))
  }

  def ready(send: ActorRef): Receive = {
    case msg: String ⇒
      // Forward the scheduled message to the remote address as a datagram.
      send ! Udp.Send(ByteString(msg), remote)
    case Udp.Received(data, remoteAddress) ⇒
      val ipAddress = remoteAddress.getAddress.getHostAddress
      val port = remoteAddress.getPort
      log.info(s"we received ${data.utf8String} from IP Address: $ipAddress and port number: $port")
  }
}

object ScheduledSenderActor {
  // Factory for the Props used to create the actor.
  def apply(local: InetSocketAddress, remote: InetSocketAddress) = Props(classOf[ScheduledSenderActor], local, remote)
}
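The actor above starts its schedule in the constructor and never cancels it. One possible refinement, shown here only as a sketch, is to start the schedule once Udp.Bound arrives, run it on the actor's own dispatcher instead of the global execution context, and cancel it in postStop. The class name TidySenderActor and the field tick are hypothetical names for this example. It assumes the same Akka 2.5.x dependency as the rest of the tutorial.

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable, Props}
import akka.io.{IO, Udp}
import akka.util.ByteString

import scala.concurrent.duration._

// Hypothetical variant of ScheduledSenderActor; a sketch, not the project's code.
class TidySenderActor(local: InetSocketAddress, remote: InetSocketAddress)
    extends Actor with ActorLogging {
  // Brings the implicit ActorSystem (for IO) and ExecutionContext (for the scheduler) into scope.
  import context.{dispatcher, system}

  // Holds the running schedule so it can be cancelled when the actor stops.
  private var tick: Option[Cancellable] = None

  IO(Udp) ! Udp.Bind(self, local)

  def receive: Receive = {
    case Udp.Bound(_) =>
      // Start ticking only after the socket exists, so no "hello" is dropped.
      tick = Some(system.scheduler.schedule(0.seconds, 1.second, self, "hello"))
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case msg: String =>
      socket ! Udp.Send(ByteString(msg), remote)
    case Udp.Received(data, from) =>
      log.info(s"we received ${data.utf8String} from ${from.getAddress.getHostAddress}:${from.getPort}")
  }

  // Stop the timer so it does not keep sending to a dead actor.
  override def postStop(): Unit = tick.foreach(_.cancel())
}

object TidySenderActor {
  def props(local: InetSocketAddress, remote: InetSocketAddress): Props =
    Props(new TidySenderActor(local, remote))
}
```

It can be started the same way as the original, for example with system.actorOf(TidySenderActor.props(local, remote), name = "Udp").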
Listener App
This is similar to the ScheduledSenderApp class, except that the local and remote port numbers are swapped.
import java.net.InetSocketAddress
import akka.actor.{ActorRef, ActorSystem}

object ListenerApp extends App {
  val system = ActorSystem("RemoteActorSystem")
  val local = new InetSocketAddress("localhost", 5005)
  val remote = new InetSocketAddress("localhost", 5115)
  val udp: ActorRef = system.actorOf(ListenerActor(local, remote), name = "Udp")
}
Listener Actor
It is also similar to the ScheduledSenderActor class. The difference is that it has no scheduler and, after logging a received datagram, it sends the message Hello back to remote using Udp.Send.
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.{IO, Udp}
import akka.util.ByteString

class ListenerActor(local: InetSocketAddress, remote: InetSocketAddress) extends Actor with ActorLogging {
  import context.system

  // Ask the UDP manager to bind a socket to `local`, with this actor as the handler.
  IO(Udp) ! Udp.Bind(self, local)

  // Initial behavior: wait until the socket is bound.
  def receive = {
    case Udp.Bound(_) ⇒
      context.become(ready(sender()))
  }

  def ready(send: ActorRef): Receive = {
    case msg: String ⇒
      // Forwards any String sent to this actor to the remote address.
      send ! Udp.Send(ByteString(msg), remote)
    case Udp.Received(data, remoteAddress) ⇒
      val ipAddress = remoteAddress.getAddress.getHostAddress
      val port = remoteAddress.getPort
      log.info(s"we received ${data.utf8String} from IP Address: $ipAddress and port number: $port")
      // Reply to the configured remote address.
      send ! Udp.Send(ByteString("Hello back"), remote)
  }
}

object ListenerActor {
  // Factory for the Props used to create the actor.
  def apply(local: InetSocketAddress, remote: InetSocketAddress) = Props(classOf[ListenerActor], local, remote)
}
Note: When sending the reply, you can use either remote or remoteAddress as the destination. remoteAddress holds the address the datagram was actually received from at that point in time.
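As a minimal sketch of the second option in the note, the actor below replies to whatever address a datagram came from, so it does not need a preconfigured remote at all. The class name ReplyToSourceActor is hypothetical and is not part of the tutorial project. It assumes the same akka-actor dependency as above.

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.{IO, Udp}
import akka.util.ByteString

// Hypothetical listener that answers the datagram's source address.
class ReplyToSourceActor(local: InetSocketAddress) extends Actor with ActorLogging {
  import context.system

  IO(Udp) ! Udp.Bind(self, local)

  def receive: Receive = {
    case Udp.Bound(boundTo) =>
      log.info("listening on {}", boundTo)
      context.become(ready(sender()))
  }

  def ready(socket: ActorRef): Receive = {
    case Udp.Received(data, from) =>
      log.info(s"we received ${data.utf8String} from ${from.getAddress.getHostAddress}:${from.getPort}")
      // `from` is the address this particular datagram was sent from.
      socket ! Udp.Send(ByteString("Hello back"), from)
  }
}

object ReplyToSourceActor {
  def props(local: InetSocketAddress): Props = Props(new ReplyToSourceActor(local))
}
```

The trade-off is that a reply now goes to any peer that sends a datagram, whereas the ListenerActor above always answers the single configured remote.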
Result
Once you click Run or execute sbt run for both apps, you will see these messages.
At Scheduled Sender App,
... we received Hello back from IP Address: 127.0.0.1 and port number: 5005
At Listener App,
... we received hello from IP Address: 127.0.0.1 and port number: 5115