本文介绍akka的基本使用方法,由于属于基础功能,想不出一个很高大上的名称,此处就以基础模式命名。下文会介绍actor的使用方法,及其优劣点。
class SimpleActor(name:String) extends Actor { private def doWork(message:SayHello):Unit = { println(s"$name 收到 ${message.from.path.name} 的消息 [$message] ,工作进行中... 当前线程号 ${Thread.currentThread().getId}") } override def receive: Receive = { case msg @ SayHello(from,message) => doWork(msg) val returnMsg = HelloSaid(s"嗨 ${from.path.name} ,${self.path.name} 收到了 $message 消息") println(s"$name 工作结束,准备返回消息[${returnMsg.message}]") } } object SimpleBasicPattern { def main(args: Array[String]): Unit = { val system = ActorSystem("BasicPattern",ConfigFactory.load()) val person1 = system.actorOf(Props(new SimpleActor("person1")),"personActor1") println(s"Main thread Id ${Thread.currentThread().getId}") person1 ! SayHello(person1,"Hello World 1") person1 ! SayHello(person1,"Hello World 2") } }
输出: Main thread Id 1 person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 1)] ,工作进行中... 当前线程号 13 person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 1 消息] person1 收到 personActor1 的消息 [SayHello(Actor[akka://BasicPattern/user/personActor1#1662593548],Hello World 2)] ,工作进行中... 当前线程号 13 person1 工作结束,准备返回消息[嗨 personActor1 ,personActor1 收到了 Hello World 2 消息]
如上图,我设计了一个简单的actor:HelloWroldActor。它有两个方法,其中receive是收到消息之后处理消息的入口函数,定义了对消息的处理方式。收到SayHello之后,调用doWork同步处理消息
我们可以跟上一篇博客进行对比,此处给person1发送了一条SayHello消息,在OOP中是直接调用函数,此处使用 ! 函数发送消息;person1收到消息后,同步调用doWork处理消息。这是最基本的actor使用方式:通过 ! 发消息给actor。从输出中可以看到,主线程和doWork所在线程是不同的线程。
这是基础模式的最基本形式,给actor发送消息,actor对消息进行响应,发送和响应是异步的,同一个actor对所有的消息都是按照邮箱队列的顺序,串行调用的。下面是基础模式的另外一种高级形式。
class HelloWorldActor(other:ActorRef,name:String) extends Actor { private def doWork(message:String):HelloSaid = { println(s"$name 收到 ${other.path.name} 的消息 [$message] ,工作进行中...") HelloSaid("这是处理后返回的消息") } override def receive: Receive = { case DoWork(message) => println(s"嗨 ${other.path.name} ,我正在为你工作") val returnMsg = doWork(message) other ! WorkDone(returnMsg.message) case WorkDone(message) => println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]") } } object BasicPattern { def main(args: Array[String]): Unit = { val system = ActorSystem("BasicPattern",ConfigFactory.load()) val person1 = system.actorOf(Props(new HelloWorldActor(null,"person1")),"personActor1") val person2 = system.actorOf(Props(new HelloWorldActor(person1,"person2")),"personActor2") person2 ! DoWork("Hello World") } }
输出: 嗨 personActor1 ,我正在为你工作 person2 收到 personActor1 的消息 [Hello World] ,工作进行中... person1 收到了 personActor2 的回复消息:[嗨 personActor1 工作已完成,这是返回消息 HelloSaid(这是处理后返回的消息)]
在上面的模式中,我们首先给person2发送了开始工作的消息,person2收到消息后,开始为person1工作:调用doWork进行计算。计算结束后把消息发送给了person1,person1收到workDone的消息后,将结果打印了出来。这个例子稍微复杂点,涉及到了两个actor的通信。但这仍然是一种简单的形式,因为person1的actorRef引用是通过构造函数传递给person2的,这样person2就只能为person1工作。这非常不方便,因为actor创建的时候不一定能知道另外一个actor的地址。那么下面又是一种高级形式:
class HelloActor(name:String) extends Actor { private def doWork(message:String,forActor:ActorRef):HelloSaid = { println(s"$name 收到 ${forActor.path.name} 的消息 [$message] ,工作进行中...") HelloSaid("这是处理后返回的消息") } override def receive: Receive = { case DoWorkFor(message,forActor) => println(s"嗨 ${forActor.path.name} ,我正在为你工作") val returnMsg = doWork(message,forActor) forActor ! WorkDone(returnMsg.message) case WorkDone(message) => println(s"$name 收到了 ${sender().path.name} 的回复消息:[$message]") } } object BasicPattern3 { def main(args: Array[String]): Unit = { val system = ActorSystem("BasicPattern2",ConfigFactory.load()) val person1 = system.actorOf(Props(new HelloActor("person1")),"personActor1") val person2 = system.actorOf(Props(new HelloActor("person2")),"personActor2") person2 ! DoWorkFor("Hello World",person1) } }
输出: 嗨 personActor1 ,我正在为你工作 person2 收到 personActor1 的消息 [Hello World] ,工作进行中... person1 收到了 personActor2 的回复消息:[这是处理后返回的消息]
在上面的图中,我们把person1的actorRef通过消息的形式发送给了person2,这样person2就能为不同的person工作了,因为工作的对象是通过消息传递的。
通过上面3个例子,我们可以看到,只能通过给actor发送消息与actor通信,调用其对应的函数,函数的返回结果也只能异步的发送给调用方。而在OOP中调用另一个对象的函数,看起来比这个简单多了,获取函数处理结果也非常简单。但读者要仔细思考这两者的区别,actor的通信全都是异步的。意味着person2给person1发送消息之后,可以立即进行其他的处理,而不需要等待person1的应答,即person1和person2功能做到了完全解耦。
BasicPattern2和BasicPattern3的区别是调用方获取方式的不同,其实还有另外一种形式:
class Master(workerPath:String) extends Actor{ override def receive: Receive = { case DoWork(message) => println(s"master 收到 doWork消息:$message") val worker = context.actorSelection( s"/user/$workerPath") worker ! DoWorkFor(message,self) case WorkDone(message) => println(s"master 收到 ${sender().path.name} 的返回消息 $message") } } class Worker extends Actor{ private def doWork(message:String):String = { println(s"worker 收到了消息 $message") "这里是worker返回消息" } override def receive: Receive = { case DoWorkFor(message,forActor) => val result = doWork(message) forActor ! WorkDone(result) } } object BasicPattern4 { def main(args: Array[String]): Unit = { val system = ActorSystem("BasicPattern2",ConfigFactory.load()) system.actorOf(Props(new Worker()),"workerActor") val master = system.actorOf(Props(new Master("workerActor")),"masterWorker") master ! DoWork("Hello World") } }
输出 master 收到 doWork消息:Hello World worker 收到了消息 Hello World master 收到 workerActor 的返回消息 这里是worker返回消息
在这个形式中,master通过worker的actorPath,用actorSelection查询了worker的地址,然后发送消息给它。与BasicPattern3不同的是,master不需要知道worker的邮箱地址,它只需要知道worker的actorPath就可以发消息了。然后master和worker就可以按照前面的pattern互通消息了。
请注意DoWork、WorkDone、DoWork这三个消息的处理完全是异步的,没有任何直接的关系。
上面的4个例子我都将其认定为基础模式,因为这都是Akka的基础功能,没有涉及太高深的技术,也是在学习Akka的初期最容易理解的模式。虽然简单,还是有很多值得学习的地方的。在下一篇博客中,我们会针对BasicPattern4进行优化,讲解另外MasterWorkerBackend模式,这种模式比较复杂,希望读者深刻理解本博文的4个例子,再阅读后续文章。