Netty优化-rpc
Netty优化-rpc
- 1.3 RPC 框架
- 1)准备工作
1.3 RPC 框架
1)准备工作
这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}
请求消息
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名,服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class<?> returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}
响应消息
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}
服务器架子
@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 请求消息处理器,待实现RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
服务器 handler
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {// 获取真正的实现对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));// 获取要调用的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());// 调用方法Object invoke = method.invoke(service, message.getParameterValue());// 调用成功response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();// 调用异常response.setExceptionValue(e);}// 返回结果ctx.writeAndFlush(response);}public static void main(String[] args) throws Exception {RpcRequestMessage message = new RpcRequestMessage(1,"cn.itcast.rpc.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"});// 获取真正的实现对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}
客户端架子
@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.itcast.rpc.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
客户端 handler
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}
服务器端的 service 获取
/**
 * Registry of service implementations, populated once at class-initialization
 * time from {@code /application.properties}.
 *
 * <p>Every property whose key ends with {@code "Service"} is read as
 * {@code interfaceClassName=implementationClassName}; the implementation is
 * instantiated through its no-argument constructor and stored under the
 * interface class.
 */
public class ServicesFactory {

    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = ServicesFactory.class.getResourceAsStream("/application.properties")) {
            if (in == null) {
                // getResourceAsStream returns null for a missing resource; fail with a
                // clear message instead of an NPE from Properties.load(null).
                throw new IOException("/application.properties not found on the classpath");
            }
            properties = new Properties();
            properties.load(in);
            for (String name : properties.stringPropertyNames()) {
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    // Class.newInstance() is deprecated; go through the constructor.
                    map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
                }
            }
        } catch (IOException | ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Returns the implementation registered for the given interface.
     *
     * @param interfaceClass the service interface to look up
     * @param <T>            the interface type
     * @return the registered instance, or {@code null} if none is registered
     * @throws ClassCastException if the registered instance does not implement
     *         {@code interfaceClass}
     */
    public static <T> T getService(Class<T> interfaceClass) {
        // Class.cast gives a checked cast in place of the unchecked (T) cast.
        return interfaceClass.cast(map.get(interfaceClass));
    }
}
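相关配置 application.properties(注意:以 Service 结尾的键必须是服务接口的全限定名,并且要与客户端请求中的 interfaceName 完全一致;下面示例中的包名 cn.itcast.server.service 与前文代码中使用的 cn.itcast.rpc.service 不一致,请按项目实际包名统一)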
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
业务类
public interface HelloService {String sayHello(String name);
}public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String msg) {//int i = 1 / 0;return "你好, " + msg;}
}
计数器
/** Hands out sequence ids from a single shared atomic counter. */
public abstract class SequenceIdGenerator {

    /** Shared counter; starts at 0, so the first id handed out is 1. */
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /**
     * Atomically advances the counter by one.
     *
     * @return the counter value after the increment
     */
    public static int nextId() {
        return COUNTER.addAndGet(1);
    }
}
客户端最终代码
@Slf4j
public class RpcClientManager {public static void main(String[] args) throws IOException {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("zhangsan"));System.out.println(service.sayHello("lisi"));}public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader classLoader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {// 1. 将方法调用转换为 消息对象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);log.debug("主线程开始等待...");// 4. 阻塞等待 promise 结果promise.await();log.debug("主线程放行...");if (promise.isSuccess()) {return promise.getNow();} else {throw new RuntimeException(promise.cause());}});return (T) o;}private static Channel channel = null;private static final Object LOCK = new Object();public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}//初始化channelprivate static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new 
ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
}
服务端启动类不变,服务端 handler 修改如下(调用出错时只回传异常原因的消息,而不是整个异常对象)
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}log.error(response.toString());ctx.writeAndFlush(response);}
}
执行结果
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 1, 228
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, zhangsan, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, zhangsan
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 2, 224
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=2, messageType=102), returnValue=你好, lisi, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, lisi