Netty(10)协议设计与解析(IdleStateHandler:空闲检测器、心跳)
为什么需要协议?
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
协议举例
redis 协议
客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestRedis {
/*
向redis发送这样的一条命令:set name zhangsan
redis将上面的一条命令看做数组,依次发送下面的内容
1. 规定*号后面的就是数组元素的个数
*3:元素的个数为3
2. 发送每个命令每个键值的长度,多个命令之间使用回车+换行进行分隔
$3: set命令是三个字节
set
$4: key的长度是四个字节
name
$8: value的长度是八个字节
zhangsan
下面演示的客户端怎么发送:
*/
public static void main(String[] args) {
/*
13: 回车
10: 换行
*/
final byte[] LINE = {13, 10};
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//因为使用的是英文和拼音,所以不考虑编码问题
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);//回车换行
buf.writeBytes("$3".getBytes());//set命令的长度
buf.writeBytes(LINE);//回车换行
buf.writeBytes("set".getBytes());//set命令
buf.writeBytes(LINE);//回车换行
buf.writeBytes("$4".getBytes());//key的长度
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());//value的长度
buf.writeBytes(LINE);
buf.writeBytes("zhangsan".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
//接收redis返回到的结果
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
//连接本地的redis服务器
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
上面的客户端启动后,会向本机的redis发送数据,最终可以在本机的redis读取到set的内容“zhangsan”
http协议
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
@Slf4j
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
/*
http协议的编解码器:
HttpServerCodec: 作为http协议的服务器端
Codec: 即包含解密又包含编码
Decode:解码
EnCode:编码
二者组合在一起便是Codec
当http请求来了过后,便会从上往下经过一个个的handler
到了HttpServerCodec时,便会对请求进行解码,解码的内容将会被分为下面的两部分
1. HttpRequest:包含请求行的请求头
2. HttpContent:代表请求体
*/
//HttpServerCodec: 即是入站处理器,又是出站处理器
ch.pipeline().addLast(new HttpServerCodec());
//解码后,对解码后的内容进行处理
/*
SimpleChannelInboundHandler:入站处理器,只关心某一种类型的消息;
可以对消息进行区分,进行选择处理
SimpleChannelInboundHandler<HttpRequest>():只关心HttpRequest类型的消息;
其他类型消息便会被跳过
*/
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
//msg: 代表请求头和请求行
// 获取请求行
log.debug(msg.uri());
// 给浏览器返回响应
/*
DefaultFullHttpResponse: 响应结果便会结果出站处理器HttpServerCodec;
编码成为ByteBuf,符合http协议的响应
protocolVersion: http协议的版本
HttpResponseStatus: 响应的状态码
*/
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
//加入响应头,设置其响应的消息长度
response.headers().setInt(CONTENT_LENGTH, bytes.length);
//写入一些响应的内容给response对象
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
/*
//不进行消息的区分,使用instanceof判断消息的类型
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg.getClass());
//判断msg是否是HttpRequest类型,是的话便去处理请求行,请求头
if (msg instanceof HttpRequest) { // 请求行,请求头
} else if (msg instanceof HttpContent) { //请求体
//判断msg是否是HttpRequest类型,是的话便去处理请求体
}
}
});
*/
}
});
//监听本机的8080端口,当使用浏览器访问localhost:8080时,当前这个方法便会被触发,进行上面的处理
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
当浏览器访问:localhost:8080
这个链接后,浏览器页面将会显示写入的html内容
在控制台也会进行相应的输出
自定义协议
自定义协议要素
-
魔数
,用来在第一时间判定是否是无效数据包
发送的数据,一般前几个就是魔术值
通过魔术值,来判断这个消息是不是需要的,可以理解为对暗号 -
版本号
,可以支持协议的升级 -
序列化算法
,消息正文到底采用哪种序列化反序列化方式,可以由此扩展
例如:将消息正文格式为(**
常见的序列化方式
**为)json、protobuf(谷歌出品的)、hessian、jdk -
指令类型
,是登录、注册、单聊、群聊… 跟业务相关消息是什么类型
-
请求序号
,为了双工通信,提供异步能力消息发送出去,接收的时候不一定是按照发送出去的顺序接收,所以需要一个序号去判断这次响应对应的是之前的那一次请求
-
正文长度
通过正文长度去判断需要读取的字节数是多少
-
消息正文
实战
自定义消息的类型
所有消息的公共父类(Message)
@Data
public abstract class Message implements Serializable {
/**
* 根据消息类型字节,获得对应的消息 class
* @param messageType 消息类型字节
* @return 消息 class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
//获取消息类型,其他继承这个方法的重新该方法,如此,子类便可以通过调用这个方法知道是什么消息类型
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
/**
* 请求类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
/**
* 响应类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
各种消息类型的类
LoginRequestMessage
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
LoginResponseMessage
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
ChatRequestMessage
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
@Override
public int getMessageType() {
return ChatRequestMessage;
}
}
GroupCreateRequestMessage
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
}
GroupCreateRequestMessage
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
}
GroupMembersRequestMessage
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {
private String groupName;
public GroupMembersRequestMessage(String groupName) {
this.groupName = groupName;
}
@Override
public int getMessageType() {
return GroupMembersRequestMessage;
}
}
GroupJoinRequestMessage
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {
private String groupName;
private String username;
public GroupJoinRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupJoinRequestMessage;
}
}
GroupQuitRequestMessage
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {
private String groupName;
private String username;
public GroupQuitRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupQuitRequestMessage;
}
}
PingMessage
public class PingMessage extends Message {
@Override
public int getMessageType() {
return PingMessage;
}
}
PongMessage
public class PongMessage extends Message {
@Override
public int getMessageType() {
return PongMessage;
}
}
自定义消息的编解码器
@Slf4j
//加了Sharable就说明以后在使用这个类进行实例化时,可以将其抽取出来共享, 但是不能继承ByteToMessageCodec这个方法
//@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 字节的魔数:类似与一种暗号
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式, 约定:jdk: 0 , json: 1
out.writeByte(0);//jdk的序列化方式
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节的请求序号
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充, 满足2的n次方
out.writeByte(0xff);
// 6. 获取内容的字节数组: 序列化为二进制的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容, 在写入内容前面的长度部分是固定的:15+1
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//获取魔数值:四个字节
int magicNum = in.readInt();
//获取版本:一字节
byte version = in.readByte();
//获取序列化算法:一字节
byte serializerType = in.readByte();
//获取消息类型:一字节
byte messageType = in.readByte();
//消息序号:四字节
int sequenceId = in.readInt();
//填充,对齐的一字节
in.readByte();
//长度:四字节
int length = in.readInt();
//分配缓冲区
byte[] bytes = new byte[length];
//通过长度去读取多少个字节
in.readBytes(bytes, 0, length);//内容
// if (messageType == 0){//如果是什么类型,就进行反序列化
// //。。。。
// }
//将内容进行反序列化
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
//将其转换为Message
Message message = (Message) ois.readObject();
//将内容进行打印
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
//将netty解码后的结果存入对应的参数里面去,不存入的话接下来的handler将拿不到结果
out.add(message);
}
}
测试
编码
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
//打印日志
new LoggingHandler(),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
channel.writeInbound(message);
}
}
解码
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
//入站
channel.writeInbound(buf );
}
}
黏包/半包问题解决
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
//解决黏包/半包问题
//最大长度1024,长度字段偏移量12,长度字段长度四字节,长度字段不需要调整0,不需要去除前面的部分0
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
//将buf进行切片为两部分
ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
//writeInbound会在ByteBuf写完后,调用release方法,这时其引用记便变为0,
//这时候ByteBuf占用的内存都被释放掉了
s1.retain(); // 引用计数+1 = 2
//模拟半包
channel.writeInbound(s1); // release = 引用计数 - 1 = 1
channel.writeInbound(s2);
}
}
@Sharable什么情况下才能使用
-
当 handler 不保存状态时,就可以安全地在多线程下被共享
-
但要注意对于编解码器类,
不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
, 如果使用了便会报下面的错
-
如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
正确的编解码代码
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
//out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream ();
oos.wirteObject(msg);
byte[] bytes = bos.toByteArray();
//byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
//序列化算法
byte serializerAlgorithm = in.readByte(); // 0 或 1
byte messageType = in.readByte(); // 0,1,2...
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 确定具体消息类型
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectOutputStream oos = new ObjectOutputStream (new ByteArrayOutputStream(bytes));
Message message = (Message)ois.readObject();
/*
// 找到反序列化算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
// 确定具体消息类型
Class<? extends Message> messageClass = Message.getMessageClass(messageType);
Message message = algorithm.deserialize(messageClass, bytes);
*/
// log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// log.debug("{}", message);
out.add(message);
}
}
config
public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}
application.properties
serializer.algorithm=Json
cn.abc.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
聊天室业务说明
Service
/**
* 用户管理接口
*/
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}
方便起见,这里直接使用map去存储用户信息
public class UserServiceMemoryImpl implements UserService {
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
allUserMap.put("zhangsan", "123");
allUserMap.put("lisi", "123");
allUserMap.put("wangwu", "123");
allUserMap.put("zhaoliu", "123");
allUserMap.put("qianqi", "123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}
session
/**
* 会话管理接口
*/
public interface Session {
/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}
这里使用两个map集合去记录 用户名 跟 channel 的对应关系
public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Object getAttribute(Channel channel, String name) {
return channelAttributesMap.get(channel).get(name);
}
@Override
public void setAttribute(Channel channel, String name, Object value) {
channelAttributesMap.get(channel).put(name, value);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
/**
* 聊天组会话管理接口
*/
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
}
SessionFactory
public abstract class SessionFactory {
private static Session session = new SessionMemoryImpl();
public static Session getSession() {
return session;
}
}
GroupSession
/**
* 聊天组会话管理接口
*/
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
}
GroupSessionMemoryImpl
实现GroupSession里面的方法
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
ProcotolFrameDecoder
处理黏包和半包的代码一般是不会进行变化的,所以将这种处理的方法抽取出来,在这里提供一种无参构造
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
过程代码
连接建立后,就向服务器发送一个登录的请求(发一个登录消息),服务器去验证这个消息是否正确
通过多个客户端,连接同一个服务器端,对二者的客户端进行发送消息,进行通信
客户端代码
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
//使两个线程之间进行通信
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
//登录的状态
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
Scanner scanner = new Scanner(System.in);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
// log.debug("3s 没有写数据了,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 接收响应消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}", msg);
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
// 如果登录成功, 设置登录状态为true
LOGIN.set(true);
}
// 使记数减1:唤醒 system in 线程
WAIT_FOR_LOGIN.countDown();
}
}
// 在连接建立后触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
/*
创建一个新线程,不创建新线程的话,其是在Nio的线程里面执行,会阻塞
*/
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("请输入密码:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 使用ctx对象,发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
//阻塞线程,直到记数减为0(当channelRead里调用WAIT_FOR_LOGIN.countDown()时,该线程将被唤醒)
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败
if (!LOGIN.get()) {
//关闭channel
ctx.channel().close();
return;
}
//如果登录没有失败,则向下运行
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = null;
try {
command = scanner.nextLine();
} catch (Exception e) {
break;
}
if(EXIT.get()){
return;
}
String[] s = command.split(" ");
switch (s[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
//组里面的成员
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
// 在连接断开时触发
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("连接已经断开,按任意键退出..");
EXIT.set(true);
}
// 在出现异常时触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.debug("连接已经断开,按任意键退出..{}", cause.getMessage());
EXIT.set(true);
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
服务器端代码
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
QuitHandler QUIT_HANDLER = new QuitHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了读空闲事件
if (event.state() == IdleState.READER_IDLE) {
log.debug("已经 5s 没有读到数据了");
ctx.channel().close();
}
}
});
ch.pipeline().addLast(LOGIN_HANDLER);
ch.pipeline().addLast(CHAT_HANDLER);
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
ch.pipeline().addLast(GROUP_JOIN_HANDLER);
ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);
ch.pipeline().addLast(GROUP_QUIT_HANDLER);
ch.pipeline().addLast(GROUP_CHAT_HANDLER);
ch.pipeline().addLast(QUIT_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
handler
将服务器端的处理handler抽取处理,避免服务器端里面的代码过多
LoginRequestMessageHandler
登录的信息处理
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
}
ChatRequestMessageHandler
聊天的信息处理
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
}
// 不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));
}
}
}
GroupCreateRequestMessageHandler
创建群
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
//成员
Set<String> members = msg.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
//创建成功
if (group == null) {
// 发送成功消息给群的创建者
ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
// 发送拉群消息:给每一个群员发送消息
//得到该群里面的所有在线人员
List<Channel> channels = groupSession.getMembersChannel(groupName);
//遍历channel集合,对这些channel发送消息
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
} else {
//给创建者发送失败消息
ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
}
}
}
GroupJoinRequestMessageHandler
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
if (group != null) {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
} else {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
}
}
}
GroupMembersRequestMessageHandler
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
Set<String> members = GroupSessionFactory.getGroupSession()
.getMembers(msg.getGroupName());
ctx.writeAndFlush(new GroupMembersResponseMessage(members));
}
}
GroupQuitRequestMessageHandler
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
if (group != null) {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));
} else {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
}
}
}
GroupChatRequestMessageHandler
群组里面发送信息
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
//遍历所有群成员的channel,对这些channel发送消息
for (Channel channel : channels) {
//发消息
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}
QuitHandler
处理退出事件
- 异常退出
- 正常退出
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
// 当连接断开时触发 inactive 事件
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经断开", ctx.channel());
}
// 当出现异常时触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
}
}
空闲检测: 连接假死(IdleStateHandler
)
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
服务器端解决
- 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此
策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
/*
参数1:检查读的空闲时间超过对应的秒数(默认是秒)
参数2:检查写的空闲时间超过对应的秒数
参数3:读写都空闲的时间超过对应的秒数
参数4:时间的单位
*/
/*
下面这个设置的意思是:
5秒钟没有收到客户端发过来的数据(这时候就认为读的空闲时间太长)
不关心其他的空闲
5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
*/
new IdleStateHandler(5, 0, 0);
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件: 如读空闲事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
//获取 IdleState 事件
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了读空闲事件
if (event.state() == IdleState.READER_IDLE) {
log.debug("已经 5s 没有读到数据了");
ctx.channel().close();
}
}
});
心跳包
客户端定时心跳
- 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
// log.debug("3s 没有写数据了,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});