Akka 中的有类型 Actor 是 Active Objects 模式的一种实现.
Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。
有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。
对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。
有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息,
它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.
有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。
注意
和普通Akka actor一样,有类型actor也一次处理一个消息。
什么时候使用有类型的Actor
有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。
工具箱
返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension
extension =
TypedActor.get(system); //system is an instance of
ActorSystem
判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or
not
TypedActor.get(system).isTypedActor(someReference);
返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an
external Typed Actor
Proxy
TypedActor.get(system).getActorRefFor(someReference);
返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor
实现的方法中有效 // method only valid within methods of a TypedActor
implementation
ActorContext context = TypedActor.context();
返回当前有类型actor的外部代理//Returns the external proxy of the current Typed
Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a
TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();
返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor
Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if
you create other Typed Actors with this,
//they will become children to the
current Typed Actor.
TypedActor.get(TypedActor.context());
具体例子及说明
package practise.akka.typedactors
import akka.dispatch.Future
import akka.japi.Option
/**
* 这个就是对外的接口,各函数就是Typed Actor的接口方法
*/
public interface Squarer {
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i);//blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
}
package practise.akka.typedactors
import akka.dispatch.Future
import akka.dispatch.Futures
import akka.actor.TypedActor
import akka.japi.Option
import akka.actor.ActorContext
import groovy.util.logging.Log4j
import akka.actor.ActorRef
/**
* 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。)
*/
@Log4j
class SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver {
private String name;
public SquarerImpl() {
this.name = "default";
}
public SquarerImpl(String name) {
this.name = name;
}
public void squareDontCare(int i) {
log.debug("squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----" + i) //可以从线程号看出是异步处理的
int sq = i * i; //Nobody cares :(
//返回当前的ActorContext,
// 此方法仅在一个TypedActor 实现的方法中有效
ActorContext context = TypedActor.context();
println "context ---- " + context
//返回当前有类型actor的外部代理,
// 此方法仅在一个TypedActor 实现的方法中有效
Squarer mysq = TypedActor.<Squarer> self();
println "--self --" + mysq
}
public Future<Integer> square(int i) {
log.debug("square send-request-reply Future----" + i) //可以从线程号看出是异步处理的
return Futures.successful(i * i, TypedActor.dispatcher());
}
public Option<Integer> squareNowPlease(int i) {
log.debug("squareNowPlease send-request-reply Option----" + i) //可以从线程号看出是异步处理的
return Option.some(i * i);
}
public int squareNow(int i) {
log.debug("squareNow send-request-reply result----" + i) //可以从线程号看出是异步处理的
return i * i;
}
@Override
void onReceive(Object o, ActorRef actorRef) {
log.debug("TypedActor收到消息----${o}---from:${actorRef}")
}
}
package practise.akka.typedactors
import akka.actor.ActorSystem
import akka.actor.TypedActor
import akka.actor.TypedProps
import com.typesafe.config.ConfigFactory
import akka.japi.Creator
import groovy.util.logging.Log4j
import akka.actor.ActorContext
/**
* 这里创建Typed Actor.
*/
@Log4j
class TypedActorsFactory {
ActorSystem system
private final String config = """akka {
loglevel = "${log?.debugEnabled ? "DEBUG" : "INFO"}"
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.hostname = "127.0.0.1"
remote.netty.port = 2552
remote.log-received-messages = on
remote.log-sent-messages = on
}"""
TypedActorsFactory(String sysName) {
this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
}
Squarer getTypedActorDefault() {
Squarer mySquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
//这里创建的是代理类型
return mySquarer
}
Squarer getTypedActor(String name) {
Squarer otherSquarer =
TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,
new Creator<SquarerImpl>() {
public SquarerImpl create() { return new SquarerImpl(name); } //这里创建的是具体的实现类型
}),
name); //这个name是actor的name:akka//sys@host:port/user/name
return otherSquarer
}
}
下面用几个测试用例实验一下
package practise.akka.typedactors
import akka.actor.ActorRef
import akka.actor.TypedActor
import akka.actor.UntypedActorContext
import akka.dispatch.Future
import com.baoxian.akka.AkkaClientNoReply
import com.baoxian.akka.AkkaServerApp
class TestTypedActors extends GroovyTestCase {
def testTypeActor() {
println("----")
TypedActorsFactory factory = new TypedActorsFactory("typedServer")
// Squarer squarer = factory?.getTypedActorDefault() //创建代理
Squarer squarer = factory?.getTypedActor("serv") //具体实现
squarer?.squareDontCare(10)
Future future = squarer?.square(10)
AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result") //这是我自己构建的接收器
app.messageProcessor = {msg, UntypedActorContext context ->
log.info("结果为" + msg)
}
app.startup()
akka.pattern.Patterns.pipe(future).to(app.serverActor) //Future的返回结果pipe到接收器中了,在log中能看到结果
println "----" + squarer?.squareNowPlease(10)?.get()
println "----" + squarer?.squareNow(10)
//返回有类型actor扩展
TypedActor.get(factory.system)
//返回一个外部有类型actor代理所代表的Akka actor
ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);
actor.tell("消息") //这个消息将会在SquarerImpl的onReceive方法中接收到
sleep(1000 * 60 * 10)
// TypedActor.get(factory.system).stop(squarer); //这将会尽快地异步终止与指定的代理关联的有类型Actor
TypedActor.get(factory.system).poisonPill(squarer);//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它
}
def testRemoteTypedActor() {
AkkaClientNoReply client = new AkkaClientNoReply("akka://typedServer@127.0.0.1:2552/user/serv")
client.send("远程消息") //这将会在SquarerImpl的onReceive方法中接收到
sleep(1000)
client.shutdown()
}
}
分享到:
相关推荐
typed-actors, 编译时间 TypeChecked akka参与者 注释: 对于 Akka 2.4,你必须向版本号中添加 -a24 。 例如 1.1.0 变成 1.1.0-a24 。 Akka 2.4可用的第一个版本是 1.4.0 。libraryDependencies
标签:akka-typed-actor-1.0-RC2.jar.zip,akka,typed,actor,1.0,RC2,jar.zip包下载,依赖包
Akka Typed的活动来源 该库通过事件源实现演员持久性。 它提供: 演员定义API, 与Akka Typed集成在一起, 静态类型安全, 并支持参与者的持久性(带有事件源和快照); 和此API的实现基于akka-typed和akka-...
akka型团队:与Akka Typed,Akka Cluster Sharding和Cassandra结合在一起的未打磨宠物项目
阿卡型的例子以下博客文章中使用了Akka Typed的快速示例: :
Implementation of AKKA Actors framework in Java.
akka-typed-session:Akka Typed的附加组件,用于跟踪与会话类型一起使用的效果
As well as simplifying development, Akka enables multiple concurrency development patterns with particular support and architecture derived from Erlang’s concept of actors (lightweight concurrent ...
Akka类型的Java群集示例 这是一个Java,Maven,Akka项目,演示了如何设置基本。 该项目是一系列项目中的一个,该项目从一个简单的Akka Cluster项目开始,逐步构建为事件源和命令查询责任隔离的示例。 该项目系列由...
阿卡骆驼乒乓球使用可以使用消息代理和参与者。 的目的是作为您应用程序的集成框架,并且消息代理可以将技术堆栈彼此隔离,克服不兼容性,例如。 在旧版和新版 Akka 之间。 这个非常简单的例子展示了如何设置 ...
akka-typed
You’ll start with the big picture of how Akka works, and then quickly build and deploy a fully functional REST service out of actors. You’ll explore test-driven development and deploying and ...
just neet some points to download
Akka is Open Source and available under the Apache 2 License. Download from http://akka.io/downloads. Please note that all code samples compile, so if you want direct access to the sources, have a ...
akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...
如何使用 Akka 来构建具备高容错性、可以横向扩展的分布式网络应用程序。Akka 是一 个强大的工具集,提供了很多选项,可以对在本地机器上处理或网络远程机器上处理的 某项工作进行抽象封装,使之对开发者不可见。...
使用 Akka Actors 的简单聊天系统(Akka Remoting) 聊天框演员 管理客户端参与者和客户端参与者注册到聊天框参与者。 ChatBox 演员的 IP 可供客户端演员使用。 ChatBox actor 监视每个向其注册的 Client actor,并...
Akka uses Actors-independently executing processes that communicate via message passing—as the foundation for fault-tolerant applications where individual actors can fail without crashing everything...
akka_2.10
Akka Essentials,学习akka很好的一本书