当前位置: 首页 > news >正文

Netty优化-rpc

Netty优化-rpc

      • 1.3 RPC 框架
        • 1)准备工作

1.3 RPC 框架

1)准备工作

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}

请求消息

@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名,服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class<?> returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}

响应消息

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}

服务器架子

@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 请求消息处理器,待实现RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

服务器 handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {// 获取真正的实现对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));// 获取要调用的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());// 调用方法Object invoke = method.invoke(service, message.getParameterValue());// 调用成功response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();// 调用异常response.setExceptionValue(e);}// 返回结果ctx.writeAndFlush(response);}public static void main(String[] args) throws Exception {RpcRequestMessage message = new RpcRequestMessage(1,"cn.itcast.rpc.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"});// 获取真正的实现对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}

客户端架子

@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.itcast.rpc.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

客户端handler

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}

服务器端的 service 获取

public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}

相关配置 application.properties

serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

业务类

public interface HelloService {String sayHello(String name);
}public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String msg) {//int i = 1 / 0;return "你好, " + msg;}
}

计数器

public abstract class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}

客户端最终代码

@Slf4j
public class RpcClientManager {public static void main(String[] args) throws IOException {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("zhangsan"));System.out.println(service.sayHello("lisi"));}public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader classLoader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {// 1. 将方法调用转换为 消息对象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象,来接收结果             指定 promise 对象异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);log.debug("主线程开始等待...");// 4. 阻塞等待 promise 结果promise.await();log.debug("主线程放行...");if (promise.isSuccess()) {return promise.getNow();} else {throw new RuntimeException(promise.cause());}});return (T) o;}private static Channel channel = null;private static final Object LOCK = new Object();public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}//初始化channelprivate static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
}

服务端不变,服务端handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}log.error(response.toString());ctx.writeAndFlush(response);}
}

执行结果

16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 1, 228
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, zhangsan, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, zhangsan
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 2, 224
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=2, messageType=102), returnValue=你好, lisi, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, lisi

相关文章:

  • idea 提升效率的常用快捷键 汇总
  • Kafka KRaft模式探索
  • 帆软report JS实现填报控件只能填写一次
  • mac电脑怎么永久性彻底删除文件?
  • 二十三种设计模式全面解析-抽象工厂模式:创造无限可能的工厂之道
  • 首次cmake 多目录构建失败
  • 图像无损放大画质修复工具 Topaz Photo AI「Mac」
  • 基于闪电搜索算法的无人机航迹规划-附代码
  • 设计模式之适配器模式
  • 【开源框架】Glide的图片加载流程
  • Spring Kafka生产者实现
  • React-快速搭建开发环境
  • Unity C#中LuaTable、LuaArrayTable、LuaDictTable中数据的增删改查
  • SAP从入门到放弃系列之QM动态修改(Dynamic Modification)
  • Element UI + Vue 新增和编辑共用表单校验无法清除问题(已解决)
  • [微信小程序] 使用ES6特性Class后出现编译异常
  • Javascripit类型转换比较那点事儿,双等号(==)
  • nodejs实现webservice问题总结
  • Objective-C 中关联引用的概念
  • PHP 使用 Swoole - TaskWorker 实现异步操作 Mysql
  • QQ浏览器x5内核的兼容性问题
  • Spring核心 Bean的高级装配
  • 初识MongoDB分片
  • 从setTimeout-setInterval看JS线程
  • 基于axios的vue插件,让http请求更简单
  • 记一次删除Git记录中的大文件的过程
  • 爬虫模拟登陆 SegmentFault
  • 如何使用 JavaScript 解析 URL
  • 探索 JS 中的模块化
  • Java数据解析之JSON
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • ​sqlite3 --- SQLite 数据库 DB-API 2.0 接口模块​
  • # Swust 12th acm 邀请赛# [ A ] A+B problem [题解]
  • $ is not function   和JQUERY 命名 冲突的解说 Jquer问题 (
  • (1)(1.8) MSP(MultiWii 串行协议)(4.1 版)
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (C)一些题4
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (附源码)ssm基于jsp的在线点餐系统 毕业设计 111016
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • (幽默漫画)有个程序员老公,是怎样的体验?
  • (转) RFS+AutoItLibrary测试web对话框
  • .bat文件调用java类的main方法
  • .net Application的目录
  • .NET Core 2.1路线图
  • .Net MVC + EF搭建学生管理系统
  • .net wcf memory gates checking failed
  • .NET 设计模式—适配器模式(Adapter Pattern)
  • .NET/C# 在 64 位进程中读取 32 位进程重定向后的注册表
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)
  • .NET轻量级ORM组件Dapper葵花宝典
  • .NET中统一的存储过程调用方法(收藏)
  • .set 数据导入matlab,设置变量导入选项 - MATLAB setvaropts - MathWorks 中国
  • /etc/sudoers (root权限管理)
  • @cacheable 是否缓存成功_Spring Cache缓存注解