假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。
举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。
两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer
end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。
@Log4j
class CalcActor extends UntypedActor {
@Override
void onReceive(Object message) {
log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}"
if (message instanceof String) {
String[] args = message.split(",")
int start = Integer.parseInt(args[0])
int end = Integer.parseInt(args[1])
int result = 0
println("start calc:" + start + " upto " + end)
start.upto(end) {
result += it
}
sleep(5000) //模拟还要额外耗时5秒
println("result:" + result)
getSender().tell(result)
} else {
unhandled(message)
}
}
}
两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。
AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。
主要的代码在客户端:
public static void main(String[] args) throws Exception {
final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置
ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上
ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上
final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中
//tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。
// remoteCalcA.tell("1,10000", app.getServerActor());
// remoteCalcB.tell("10001,20000", app.getServerActor());
Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒
Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒
frs.add(f1);
frs.add(f2);
Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>>
Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {
@Override
public Double apply(Iterable<Double> parameter) {
Double result = 0d;
for (Double s : parameter) {//计算两个服务器返回的结果
result += s;
}
return result;
}
});
fr.onSuccess(new OnSuccess<Double>() {
@Override
public void onSuccess(Double result) {
System.out.println("云计算返回结果-----" + result);
}
});
}
还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。
如果按上面的方法仅仅是发布多个remote actor:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new
RemoteScope(new Address("akka", "sc",
"10.68.3.122", 8888)))), "clientCalcAn");
是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。
使其能够并行处理很简单,创建remoteActor时加上withRoute即可:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn"); //RoundRobinRouter的参数5可以理解为分配5个线程并行处理
代码跟上面基本相同
public static void main(String[] args) throws Exception {
final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");
ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");
ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1");
final List<Future<Double>> frs = new ArrayList<Future<Double>>();
Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000);
Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000);
Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000);
Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000);
Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000);
Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000);
Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000);
Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000);
frs.add(f1);
frs.add(f2);
frs.add(f3);
frs.add(f4);
frs.add(f5);
frs.add(f6);
frs.add(f7);
frs.add(f8);
Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());
Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {
@Override
public Double apply(Iterable<Double> parameter) {
Double result = 0d;
for (Double s : parameter) {
result += s;
}
return result;
}
});
fr.onSuccess(new OnSuccess<Double>() {
@Override
public void onSuccess(Double result) {
System.out.println("云计算返回从1加到80000的结果-----" + result);
}
});
}
分享到:
相关推荐
用Scala写的akka actor简单demo,已经打包成SBT程序,因为上传大小限制依赖包没上传,用户安装了sbt后只需要执行update命令即可
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。 通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发...
akka 官方网站actor部分文档翻译
赠送jar包:akka-actor_2.11-2.5.19.jar; 赠送原API文档:akka-actor_2.11-2.5.19-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.19-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.19.pom; 包含...
赠送jar包:akka-actor_2.11-2.5.19.jar; 赠送原API文档:akka-actor_2.11-2.5.19-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.19-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.19.pom; 包含...
响应式架构 消息模式Actor实现与Scala.Akka应用集成 响应式架构 消息模式Actor实现与Scala.Akka应用集成
使用Actor处理高并发。易于测试。服务的插件管理。高性能,可伸缩的Java Tcp服务器架构,1.Avalon基于Akka构建的服务器核心。天生分布式基因便于横向拓展;2.网络部分使用Netty;3.服务器分为单服务器和多节点分布式...
akka-http-rest, 在 akka http上使用灵活REST服务编写示例 Akka平滑REST服务模板 例如展示如何使用Akka和Slick在Lightbend堆栈上创建反应性REST服务。示例包含实体交互的完整REST服务。插件功能:CRUD操作实体部分...
Java 中 Akka 的简单生产者消费者示例 此存储库包含 3 个简单网络爬虫的示例: 一个连续的例子 将逻辑拆分为 3 个 Actor 的示例 页面的检索由多个 Actor 并行处理的示例。 检索失败且应用程序挂起的示例 重新发送...
scalafx-akka-demo 使用Akka actor的ScalaFX应用程序的简单示例如何构建和运行安装更新40或更高版本。 安装 运行示例:转到包含此示例的目录,并使用SBT生成并运行示例: %> sbt run它将下载所需的依赖项,包括Scala...
akka-actor_2.11 jar包
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。 Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息...
akka-actor_2.12 jar包
赠送jar包:akka-actor_2.11-2.5.21.jar; 赠送原API文档:akka-actor_2.11-2.5.21-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.21-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.21.pom; 包含...
第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...
通过Actor模型使用响应式消息传输模式,可编写出具有高性能、高响应性、高可伸缩性和高韧性的并发应用程序。《响应式架构:消息模式Actor实现与Scala、Akka应用集成》由10章构成,详细介绍了使用Actor模型中的响应式...
赠送jar包:akka-actor_2.11-2.4.20.jar; 赠送原API文档:akka-actor_2.11-2.4.20-javadoc.jar; 赠送源代码:akka-actor_2.11-2.4.20-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.4.20.pom; 包含...
第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...
使用 Akka Actors 的简单聊天系统(Akka Remoting) 聊天框演员 管理客户端参与者和客户端参与者注册到聊天框参与者。 ChatBox 演员的 IP 可供客户端演员使用。 ChatBox actor 监视每个向其注册的 Client actor,并...