
Active Objects Design Pattern

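   "Active" means taking the initiative, so an Active Object is an object that owns its own independent thread. In the Active Object pattern, the object not only runs on its own thread but also accepts asynchronous messages and can return the result of processing them. Starting from the standard Active Objects design, we turn the method calls of an interface into an active object that accepts asynchronous messages. In other words, a method is invoked on one thread and executed on another. The method's arguments and the concrete implementation are wrapped in a dedicated Message and handed to the execution thread. If an interface method needs to return a value, it must return it as a Future.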
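The idea in the paragraph above can be sketched in a few lines before looking at the full listings. This is only a minimal illustration, not the article's implementation: it assumes the standard `java.util.concurrent` classes (`LinkedBlockingQueue`, `CompletableFuture`) in place of the hand-written queue and Future types used later, and the names `GreeterActiveObject`, `greet` and `ActiveObjectSketch` are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; an assumption-based sketch, not the article's code.
class GreeterActiveObject {
    // Each queued Runnable plays the role of a "Message": it captures the
    // method arguments together with the code that should run.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    GreeterActiveObject() {
        Thread worker = new Thread(() -> {
            try {
                for (;;) {
                    queue.take().run(); // executed on the worker thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "active-object-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Returns immediately; the result arrives later through the Future.
    Future<String> greet(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        queue.add(() -> future.complete(
                "hello " + name + " from " + Thread.currentThread().getName()));
        return future;
    }
}

class ActiveObjectSketch {
    public static void main(String[] args) throws Exception {
        GreeterActiveObject greeter = new GreeterActiveObject();
        Future<String> f = greeter.greet("order-50");
        // The thread name in the result shows that execution happened on the worker.
        System.out.println(f.get());
    }
}
```

The caller only ever touches the queue and the returned Future, which is the same division of labour the proxy, message and daemon-thread classes implement by hand in the listings that follow.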

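   Approach 1: when a thread calls the findOrderDetails method of the OrderService interface, the call actually sends a Message, containing the findOrderDetails arguments and the concrete OrderService implementation, to a message queue. The execution thread takes Messages from the queue and invokes the concrete implementation. Because the call and the execution happen on different threads, the interface is called an Active Object (an active object that can accept asynchronous messages). The sample code is as follows: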

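// Future, FutureService and FutureTask here are the author's own types
// (see the org.multithread.future import later on), not java.util.concurrent.
public interface OrderService {
    Future<String> findOrderDetails(long orderId);

    void order(String account, long orderId);
}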

import java.util.concurrent.TimeUnit;

public class OrderServiceImpl implements OrderService {

    @Override
    public Future<String> findOrderDetails(long orderId) {
        // Run the lookup asynchronously and hand back a Future for the result.
        return FutureService.<Long, String>newService().submit(input -> {
            try {
                System.out.println("process the orderId->" + orderId);
                TimeUnit.SECONDS.sleep(10); // simulate a slow lookup
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "The order details Information";
        }, orderId);
    }

    @Override
    public void order(String account, long orderId) {
        try {
            System.out.println("process the orderId->" + orderId + " , account->" + account);
            TimeUnit.SECONDS.sleep(10); // simulate slow processing
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import java.util.HashMap;
import java.util.Map;

public class OrderServiceProxy implements OrderService {
    private final OrderService orderService;
    private final ActiveMessageQueue activeMessageQueue;

    public OrderServiceProxy(OrderService orderService, ActiveMessageQueue activeMessageQueue) {
        this.orderService = orderService;
        this.activeMessageQueue = activeMessageQueue;
    }

    @Override
    public Future<String> findOrderDetails(long orderId) {
        // The caller receives this future immediately; the worker thread completes it later.
        final ActiveFuture<String> activeFuture = new ActiveFuture<>();
        Map<String, Object> params = new HashMap<>();
        params.put("orderId", orderId);
        params.put("activeFuture", activeFuture);
        MethodMessage message = new FindOrderDetailsMessage(params, orderService);
        activeMessageQueue.offer(message);
        return activeFuture;
    }

    @Override
    public void order(String account, long orderId) {
        // Package the arguments into a message instead of executing the call here.
        Map<String, Object> params = new HashMap<>();
        params.put("account", account);
        params.put("orderId", orderId);
        MethodMessage message = new OrderMessage(params, orderService);
        System.out.println("processing in OrderServiceProxy.order method");
        activeMessageQueue.offer(message);
    }
}

public class ActiveFuture<T> extends FutureTask<T> {

    // Exposes finish() so that the message classes can complete the future.
    @Override
    public void finish(T result) {
        super.finish(result);
    }
}

import java.util.Map;

public abstract class MethodMessage {
    // Arguments of the intercepted call, keyed by name.
    protected final Map<String, Object> params;
    // The real implementation that the worker thread will invoke.
    protected final OrderService orderService;

    public MethodMessage(Map<String, Object> params, OrderService orderService) {
        this.params = params;
        this.orderService = orderService;
    }

    public abstract void execute();
}

import java.util.Map;

public class FindOrderDetailsMessage extends MethodMessage {

    public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
        super(params, orderService);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void execute() {
        // Runs on the worker thread: call the real service, then publish its result.
        Future<String> realFuture = orderService.findOrderDetails((Long) params.get("orderId"));
        ActiveFuture<String> activeFuture = (ActiveFuture<String>) params.get("activeFuture");
        try {
            String result = realFuture.get();
            activeFuture.finish(result);
        } catch (InterruptedException e) {
            // Complete with null so the caller is not left blocked forever.
            activeFuture.finish(null);
            e.printStackTrace();
        }
    }
}

import java.util.Map;

public class OrderMessage extends MethodMessage {

    public OrderMessage(Map<String, Object> params, OrderService orderService) {
        super(params, orderService);
    }

    @Override
    public void execute() {
        String account = (String) params.get("account");
        long orderId = (long) params.get("orderId");
        orderService.order(account, orderId);
    }
}

import java.util.LinkedList;

public class ActiveMessageQueue {
    private final LinkedList<MethodMessage> message = new LinkedList<>();

    public ActiveMessageQueue() {
        System.out.println("active Object Thread is build");
        // The worker thread starts as soon as the queue is created.
        new ActiveDaemonThread(this).start();
    }

    public void offer(MethodMessage methodMessage) {
        synchronized (this) {
            message.add(methodMessage);
            System.out.println("processing in ActiveMessageQueue.offer method");
            this.notify(); // wake the worker if it is waiting in take()
        }
    }

    protected MethodMessage take() {
        synchronized (this) {
            // Loop guards against spurious wake-ups.
            while (message.isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("processing in ActiveMessageQueue.take method");
            return message.removeFirst();
        }
    }
}

public class ActiveDaemonThread extends Thread {
    private final ActiveMessageQueue queue;

    public ActiveDaemonThread(ActiveMessageQueue queue) {
        super("ActiveDaemonThread");
        this.queue = queue;
        this.setDaemon(true);
    }

    @Override
    public void run() {
        // Take messages one at a time and execute them on this thread.
        for (;;) {
            System.out.println(" active daemon thread is running");
            MethodMessage methodMessage = this.queue.take();
            methodMessage.execute();
        }
    }
}

public class OrderServiceFactory {
    // One shared queue (and therefore one worker thread) for all proxies.
    private final static ActiveMessageQueue activeMessageQueue = new ActiveMessageQueue();

    private OrderServiceFactory() {}

    public static OrderService toActiveObject(OrderService orderService) {
        return new OrderServiceProxy(orderService, activeMessageQueue);
    }
}

public class AOtest {

    public static void main(String[] args) {
        OrderService orderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
        orderService.order("aACC", 5);
        Future<String> f = orderService.findOrderDetails(50);
        try {
            System.out.println("future result is " + f.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Return immediately");
        try {
            // Joining the current thread blocks forever, which keeps the JVM
            // (and the daemon worker thread) alive.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

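    Approach 2: with the first approach, an interface that has many methods needs a correspondingly large number of Message classes. Using JDK dynamic proxies, a more general-purpose Active Objects implementation is possible. With it, the methods of any interface can be turned into Active Objects. A method that returns a value must return a Future; otherwise an IllegalActivedException is thrown. The sample code is as follows: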

public class IllegalActivedException extends Exception {
    public IllegalActivedException(String message) {
        super(message);
    }
}

public interface OrderService {
    Future<String> findOrderDetails(long orderId);

    void order(String account, long orderId);
}

import java.util.concurrent.TimeUnit;

public class OrderServiceImpl implements OrderService {

    @ActiveMethod
    @Override
    public Future<String> findOrderDetails(long orderId) {
        return FutureService.<Long, String>newService().submit(input -> {
            try {
                System.out.println("process the orderId->" + orderId);
                TimeUnit.SECONDS.sleep(10); // simulate a slow lookup
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "The order details Information";
        }, orderId);
    }

    @ActiveMethod
    @Override
    public void order(String account, long orderId) {
        try {
            System.out.println("process the orderId->" + orderId + " , account->" + account);
            TimeUnit.SECONDS.sleep(10); // simulate slow processing
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

// Marks a method whose calls should be queued and run on the active object's thread.
@Retention(RUNTIME)
@Target(METHOD)
public @interface ActiveMethod {}

import java.lang.reflect.Method;

public class ActiveMessage {
    private final Object[] objects;            // arguments of the intercepted call
    private final Method method;               // method to invoke
    private final ActiveFuture<Object> future; // null for void methods
    private final Object service;              // real implementation

    private ActiveMessage(Builder builder) {
        this.objects = builder.objects;
        this.method = builder.method;
        this.future = builder.future;
        this.service = builder.service;
    }

    public void execute() {
        Object result;
        try {
            // Invoke the real method reflectively on the worker thread.
            result = method.invoke(service, objects);
            if (future != null) {
                Future<?> realFuture = (Future<?>) result;
                Object realResult = realFuture.get();
                future.finish(realResult);
            }
        } catch (Exception e) {
            // Complete with null so the caller does not block forever.
            if (future != null) {
                future.finish(null);
            }
            e.printStackTrace();
        }
    }

    static class Builder {
        private Object[] objects;
        private Method method;
        private ActiveFuture<Object> future;
        private Object service;

        public Builder useMethod(Method method) {
            this.method = method;
            return this;
        }

        public Builder returnFuture(ActiveFuture<Object> future) {
            this.future = future;
            return this;
        }

        public Builder withObjects(Object[] objects) {
            this.objects = objects;
            return this;
        }

        public Builder forService(Object service) {
            this.service = service;
            return this;
        }

        public ActiveMessage build() {
            return new ActiveMessage(this);
        }
    }
}

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.multithread.future.Future;

public class ActiveServiceFactory {
    private final static ActiveMessageQueue queue = new ActiveMessageQueue();

    // Wraps the instance in a JDK dynamic proxy that implements the same interfaces.
    @SuppressWarnings("unchecked")
    public static <T> T active(T instance) {
        Object proxy = Proxy.newProxyInstance(instance.getClass().getClassLoader(),
                instance.getClass().getInterfaces(),
                new ActiveInvocationHandler<>(instance));
        return (T) proxy;
    }

    private static class ActiveInvocationHandler<T> implements InvocationHandler {
        private final T instance;

        ActiveInvocationHandler(T instance) {
            this.instance = instance;
        }

        private void checkMethod(Method method) throws IllegalActivedException {
            if (!isReturnVoidType(method) && !isReturnFutureType(method)) {
                throw new IllegalActivedException(
                        "the method [" + method.getName() + "] return type must be void/Future");
            }
        }

        private boolean isReturnVoidType(Method method) {
            return method.getReturnType().equals(Void.TYPE);
        }

        private boolean isReturnFutureType(Method method) {
            // True when the return type is Future or one of its subtypes.
            return Future.class.isAssignableFrom(method.getReturnType());
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // The proxy passes in the interface method, but @ActiveMethod is declared on
            // the implementation class, so the implementation's method is checked as well.
            Method target = instance.getClass().getMethod(method.getName(), method.getParameterTypes());
            if (method.isAnnotationPresent(ActiveMethod.class)
                    || target.isAnnotationPresent(ActiveMethod.class)) {
                this.checkMethod(method);
                ActiveMessage.Builder builder = new ActiveMessage.Builder();
                builder.useMethod(method).withObjects(args).forService(instance);
                Object result = null;
                if (this.isReturnFutureType(method)) {
                    ActiveFuture<Object> future = new ActiveFuture<>();
                    builder.returnFuture(future);
                    result = future;
                }
                queue.offer(builder.build());
                return result;
            } else {
                // Methods without @ActiveMethod run synchronously on the calling thread.
                return method.invoke(instance, args);
            }
        }
    }
}

import java.util.LinkedList;

public class ActiveMessageQueue {
    private final LinkedList<ActiveMessage> activeMessages = new LinkedList<>();

    public ActiveMessageQueue() {
        System.out.println("active Object Thread is build");
        new ActiveDaemonThread(this).start();
    }

    public void offer(ActiveMessage activeMessage) {
        synchronized (this) {
            this.activeMessages.add(activeMessage);
            System.out.println("processing in ActiveMessageQueue.offer method");
            this.notify(); // wake the worker if it is waiting in takeActive()
        }
    }

    public ActiveMessage takeActive() {
        synchronized (this) {
            // Loop guards against spurious wake-ups.
            while (this.activeMessages.isEmpty()) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return this.activeMessages.removeFirst();
        }
    }
}

public class ActiveDaemonThread extends Thread {
    private final ActiveMessageQueue queue;

    public ActiveDaemonThread(ActiveMessageQueue queue) {
        super("ActiveDaemonThread");
        this.queue = queue;
        this.setDaemon(true);
    }

    @Override
    public void run() {
        for (;;) {
            System.out.println(" active daemon thread is running");
            ActiveMessage activeMessage = this.queue.takeActive();
            activeMessage.execute();
        }
    }
}

public class AOtest {

    public static void main(String[] args) {
        // active() is static, so it is called on the class rather than on an instance.
        OrderService orderService = ActiveServiceFactory.active(new OrderServiceImpl());
        orderService.order("aACC", 5);
        Future<String> f = orderService.findOrderDetails(150);
        try {
            System.out.println("future result is " + f.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Return immediately");
        try {
            // Joining the current thread blocks forever, which keeps the JVM
            // (and the daemon worker thread) alive.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
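
The "void or Future" rule of the second approach can be tried out in isolation. The sketch below is an assumption-laden illustration rather than the article's code: it uses `java.util.concurrent.Future` instead of the article's own Future type, and `Sample`, `isActiveCompatible` and `ReturnTypeCheckSketch` are hypothetical names. Note the direction of the check: `Future.class.isAssignableFrom(returnType)` asks whether the return type is Future or a subtype of it.

```java
import java.lang.reflect.Method;
import java.util.concurrent.Future;

// Hypothetical sketch; uses java.util.concurrent.Future, not the article's Future type.
class ReturnTypeCheckSketch {
    interface Sample {
        void fireAndForget();
        Future<String> query();
        String blocking();
    }

    // Accepts void and any return type that is Future or a subtype of Future.
    static boolean isActiveCompatible(Method method) {
        Class<?> returnType = method.getReturnType();
        return returnType == Void.TYPE || Future.class.isAssignableFrom(returnType);
    }

    public static void main(String[] args) throws Exception {
        for (String name : new String[] {"fireAndForget", "query", "blocking"}) {
            Method m = Sample.class.getMethod(name);
            System.out.println(name + " -> " + isActiveCompatible(m));
        }
        // prints:
        // fireAndForget -> true
        // query -> true
        // blocking -> false
    }
}
```

A method such as `blocking()` is the kind that should be rejected with IllegalActivedException, because its caller expects a value before the worker thread has produced one.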
