当前位置: 首页 > news >正文

网络编程:使用UDP实现数据帧的接收

目录

1、需求

2、逻辑处理

3、代码实现

4、总结


1、需求

        使用java代码实现数据帧的接收需求,完成数据到数据库的存储。

2、逻辑处理

        由于udp传输不保证数据的有序性、可靠性,所以在做业务开发的时候,要程序员自己考虑需求完善udp的缺陷。因此我们定义几个字段保证数据的有序,对于可靠性未进行考虑。

3、代码实现

        监听端口55000,等待数据的发送:

@Service
public class UDPReceiverSuper {private static final int BUFFER_SIZE = 1044;private static final int HEAD_SIZE = 20;private static final int DATA_SIZE = 1024;private static final int MAX_BUFFER_SIZE = 1 * 1024 * 1024; // 缓冲器大小设置为1MBprivate static final double MAX_BUFFER_THRESHOLD = 0.8; // 缓冲区阈值private static final int MAX_BUFFER_INDEX = (int) (MAX_BUFFER_SIZE * MAX_BUFFER_THRESHOLD / DATA_SIZE); //缓冲区元素数量阈值//timestampToBufferMap存储的是:时间戳,TreeMap,TreeMap里面存储的是:当前包序号,接受数据的对象private Map<Long, ConcurrentHashMap<Long, DatagramPacket>> timestampToBufferMap = new HashMap();private long timeStamp;private boolean isClosed = false;// 使用阻塞队列作为缓冲区private long errorPackageSum = 0;private int frameNum;        //用于帧计数Thread udpReceiverThread;@Value("${GK.GKOriginalDataFilePath}")private String GKOriginalDataFilePath; // 管控原始数据文件存储路径@Value("${HP.storagePath}")private String storagePath;    //高性能数据接收路径@Autowiredprivate INetworkConfigService networkConfigService;@Autowiredprivate DealGkDataServiceSuperWithNewThread dealGkDataServiceSuperWithNewThread;@Autowiredprivate DealGkDataServiceSuperWithThreadPoolAndBuffer dealGkDataServiceSuperWithThreadPoolAndBuffer;@Autowiredprivate DealGkDataServiceSuperWithThreadPool dealGkDataServiceSuperWithThreadPool;@Autowiredprivate SaveGKOriginalDataService saveGKOriginalDataService;@Autowiredprivate SaveGKOriginalDataServiceWithBuffer saveGKOriginalDataServiceWithBuffer;public UDPReceiverSuper() {}public void start() {//创建父文件夹Path path = Paths.get(storagePath);if (Files.notExists(path)) {try {Files.createDirectories(path);System.out.println("Directories created successfully: " + storagePath);} catch (IOException e) {System.err.println("Failed to create directories: " + e.getMessage());}} else {System.out.println("Directories already exist: " + storagePath);}// 启动接收数据的线程if (udpReceiverThread == null) {udpReceiverThread = new Thread(new Receiver());udpReceiverThread.start();}}//数据帧头定义private class PackageHeader {public long id = 0;public long timestamp = 0;public long totalPackageNum = 0;public long currentPackageNum = 0;public long dataLength = 0;}// 接收数据的线程private class Receiver implements Runnable {@Overridepublic void run() {NetworkConfig networkConfig = networkConfigService.selectNetworkConfigById(1L);String port = networkConfig.getPort();String ip = networkConfig.getIp();System.out.println("实际未绑定ip");System.out.println("ip: " + ip + "  port: " + port);try {DatagramSocket ds = new DatagramSocket(Integer.parseInt(port));if (ds != null) {isClosed = false;}System.out.println("udpReceiver_ds: " + ds + "   等待接收数据......");while (true) {if (isClosed) {break;}byte[] receiveData = new byte[BUFFER_SIZE];   //接收数据缓存区,大小为1044DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);ds.receive(receivePacket);     //接收数据byte[] data1 = receivePacket.getData();frameNum++;
//                    System.out.println("当前帧数为: " + frameNum);   //todo 用于打印输出当前接收到的帧数ByteBuffer byteBuffer1 = ByteBuffer.allocate(data1.length);byteBuffer1.put(data1);byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0
//                    ByteBuffer byteBuffer1 = ByteBuffer.allocate(receiveData.length);
//                    byteBuffer1.put(receiveData);
//                    byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0/*两种情况:1、接收管控  2、接收高性能*/byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);  //转化为小端int headerType = byteBuffer1.getInt();       //得到设备标识符if (headerType == 1) {/*解决方法一: 这个是采用多线程的方式进行写入数据到txt文件*/saveGKOriginalDataService.saveGKOriginalData(receivePacket, GKOriginalDataFilePath);/*解决方法二:直接处理管控的函数*/
//                        dealGkDataServiceSuperWithNewThread.dealGKApi(byteBuffer1);/*解决方法三:在UDPReceiverSuperSuper类里面,并且要在NetworkConfigController中进行函数*//*解决方法四:使用线程池的方式解决,每接收一帧,就开始处理*/
//                        dealGkDataServiceSuperWithThreadPoolAndBuffer.dealGKApi(byteBuffer1);/*解决方法五:直接开启线程进行处理数据,这个方法是对的*/dealGkDataServiceSuperWithThreadPool.dealGKApi(byteBuffer1);/*解决方法六:将接收到的数据存储到缓冲区中,然后使用多线程从缓冲区中取出,方法实现写在method3中*/} 

        业务处理逻辑:

package com.ruoyi.system.service.customService.dealGKService_ThreadPool;import com.ruoyi.system.domain.*;
import com.ruoyi.system.mapper.*;
import com.ruoyi.system.utlis.CSVFileUtil;
import com.ruoyi.system.utlis.ConvertUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;/*** @Author 不要有情绪的  ljy* @Date 2024/3/22 15:54* @Description:*/
@Service
public class DealGkDataServiceSuperWithThreadPool {@Value("${GK.statusLogPath}")private String stateLogFilePath;  //状态日志存储路径@Autowiredprivate OperateLogInfoMapper operateLogInfoMapper;@Autowiredprivate InstructLogInfoMapper instructLogInfoMapper;@Autowiredprivate Instruct1553bLogInfoMapper instruct1553bLogInfoMapper;@Autowiredprivate InstructRs422LogInfoMapper instructRs422LogInfoMapper;@Autowiredprivate StateLogInfoMapper stateLogInfoMapper;@Autowiredprivate ErrorLogInfoMapper errorLogInfoMapper;int frontTimeFlag = -1;private int currentReceivedFrameNum = 0;  //用于计算管控接收帧数private Map<Integer, BlockingQueue<byte[]>> currTimeFlagToQueue = new HashMap<>();int threadNum = 1;private ExecutorService threadPool;private Counter counter = new Counter();public DealGkDataServiceSuperWithThreadPool() {int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;System.out.println("核心线程数:   " + corePoolSize);int maximumPoolSize = corePoolSize * 2;long keepAliveTime = 60L;threadPool = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.SECONDS,new LinkedBlockingQueue<>());}public void dealGKApi(ByteBuffer byteBuffer) {currentReceivedFrameNum++;if (currentReceivedFrameNum % 1000 == 0) {System.out.println("管控当前接收的数据帧数(每隔1000打印一次): " + currentReceivedFrameNum);}int currTimeFlag = byteBuffer.getInt();       //当前时间标识,用于区分是否丢包int packagesTotalNum = byteBuffer.getInt();   //表示当前发送的包总数int currPackageNum = byteBuffer.getInt();     //表示当前包序号int messageLength = byteBuffer.getInt();     //消息长度int remainingBytes = byteBuffer.remaining();byte[] remainingData = new byte[messageLength];   //用于获取日志长度 1024if (remainingBytes > 0) {byteBuffer.get(remainingData); // 获取剩余的字节threadPool.submit(new GKRunnable(remainingData));}}class GKRunnable implements Runnable {private byte[] bytes;public GKRunnable(byte[] remainingData) {this.bytes = remainingData;}@Overridepublic void run() {System.out.println("新启动一个线程用于处理管控日志,当前线程名:  " + Thread.currentThread().getName());ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);   //将不用byte数组放到ByteBuffer中byteBuffer.put(bytes);dealWithAssembledFrame(byteBuffer);}private void dealWithAssembledFrame(ByteBuffer byteBuffer) {byteBuffer.flip();byteBuffer.order(ByteOrder.LITTLE_ENDIAN);  //转化为小端dealLogPart(byteBuffer);}/*todo 要写的内容为:出现错误就要将异常抛出,并且找到帧头的位置*/private void dealLogPart(ByteBuffer byteBuffer) {
//            /*找到帧头的位置*/
//            searchFrameHeaderAndSetPosition(byteBuffer);while (byteBuffer.position() != byteBuffer.capacity() && (byteBuffer.position() + 4) < byteBuffer.capacity()) {try {/*找到帧头的位置*/searchFrameHeaderAndSetPosition(byteBuffer);int startPosition = byteBuffer.position();//获取开始的长度//每个日志都包含的部分byte[] bytes2 = new byte[2];byteBuffer.get(bytes2);String frameHeaderInfo = ConvertUtil.byteArrayToHexString(bytes2);   //日志帧头字段short logLength = byteBuffer.getShort();                             //日志长度int logNumber = byteBuffer.getShort();                             //日志编号
//                    System.out.println(logNumber + logLength);byte logType = byteBuffer.get();//byte转化为字符串                     //日志类型String logTypeStr = String.format("%02X", logType);int time = byteBuffer.getInt();                //日志时间//根据日志类型,选择处理日志剩余方式if ("01".equals(logTypeStr) || "02".equals(logTypeStr) || "03".equals(logTypeStr)) {byte sendingAndReceivingBit = byteBuffer.get();byte sourceDeviceId = byteBuffer.get();byte targetDeviceId = byteBuffer.get();//得到日志内容长度    先将日志长度转化为十进制,然后减掉帧头信息,减掉日志长度,减掉日志编号,减掉日志类型、减掉时间,减掉校验码,减掉发送接收位,减掉源原设备ID,减掉目标设备IDint logContentLength = logLength - 2 - 2 - 2 - 1 - 4 - 1 - 1 - 1 - 1;String instructDataContent = null;if ("01".equals(logTypeStr)) {  //子地址+数据内容,子地址占1个字节byte[] bytes = new byte[1];byteBuffer.get(bytes);String subAddress = ConvertUtil.byteArrayToHexString(bytes);int dataContentLength = logContentLength - 1;byte[] bytes1 = new byte[dataContentLength];// 输出剩余字节的十六进制表示,即指令数据内容byteBuffer.get(bytes1);instructDataContent = ConvertUtil.byteArrayToHexString(bytes1);dealInstruct1553bLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, subAddress, instructDataContent);} else if ("02".equals(logTypeStr)) { //can指令类型(1个字节)  + ID(子地址)+数据内容,ID占4个字节byte[] bytes = new byte[1];byteBuffer.get(bytes);String canInstructType = ConvertUtil.byteArrayToHexString(bytes);byte[] bytes1 = new byte[4];byteBuffer.get(bytes1);String subAddress = ConvertUtil.byteArrayToHexString(bytes1);int dataContentLength = logContentLength - 1 - 4;byte[] ID = new byte[dataContentLength];// 输出剩余字节的十六进制表示,即指令数据内容byteBuffer.get(ID);instructDataContent = ConvertUtil.byteArrayToHexString(ID);dealInstructCANLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, canInstructType, subAddress, instructDataContent);} else if ("03".equals(logTypeStr)) { //数据内容byte[] bytes1 = new byte[logContentLength];byteBuffer.get(bytes1);instructDataContent = ConvertUtil.byteArrayToHexString(bytes1);dealInstructRs422Log(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, instructDataContent);}} else if ("04".equals(logTypeStr)) {//存储到excel表中
//                        dealStateLog(byteBuffer, startPosition, time);//存储到数据库中dealStateLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);} else if ("05".equals(logTypeStr)) {dealOperateLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);} else if ("06".equals(logTypeStr)) {dealErrorLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);} else {System.out.println("as;dasd");}} catch (Exception e) {if (e.getCause().toString().contains("SQLIntegrityConstraintViolationException")) {counter.increment();}else {e.printStackTrace();//                        System.err.println(e.getMessage());}//在处理过程报错了,那么就认为日志内容错误,就要重新寻找帧头的位置searchFrameHeaderAndSetPosition(byteBuffer);}}System.out.println("线程名称:" + Thread.currentThread().getName() + "  线程id:" + Thread.currentThread().getId() + "  处理完成!");System.out.println("========"+counter.getCount());}private void dealStateLog(ByteBuffer byteBuffer, int startPosition, short logLength, int logNumber, int time) {byte[] bytes92 = new byte[92];  //将92个长度传递到byte数组中byteBuffer.get(bytes92);int endPosition = byteBuffer.position();byte checkCodeByte = calculateCheckCode(byteBuffer, startPosition, endPosition);byte checkCode = byteBuffer.get();  //用于校验日志的正确和完整性boolean logIsIntegrity = logIsIntegrity(checkCode, checkCodeByte);if (logIsIntegrity) {StateLogInfo stateLogInfo = new StateLogInfo();stateLogInfo.setLogLength(Short.toString(logLength));stateLogInfo.setLogNumber(Long.valueOf(logNumber));stateLogInfo.setTime(Long.valueOf(time));
//                Integer size = stateLogInfoMapper.searchDataIsDuplicate(stateLogInfo);
//                if (size > 0) { //判断数据库中是否已经存在
//                    return;
//                }//将参数设置到stateLogInfo实例中setParameter(bytes92, stateLogInfo);stateLogInfoMapper.insertStateLogInfo(stateLogInfo);} else {logIsNotIntegrity(byteBuffer, startPosition, endPosition);}}private void searchFrameHeaderAndSetPosition(ByteBuffer byteBuffer) {/*找到帧头的位置*/int frameHeaderPosition = findFrameHeaderPosition(byteBuffer, hexStringToByteArray("eafc"));if (frameHeaderPosition != -1) {byteBuffer.position(frameHeaderPosition);} else {System.out.println("未找到帧头为 eafc 的位置");return;   //说明从头查到尾都没有查到,就直接退出}}/*** 判断日志是否完整的函数,如果不完整,那么就要找下一个eafc帧头的位置** @param byteBuffer*/private void logIsNotIntegrity(ByteBuffer byteBuffer, int startPosition, int endPosition) {System.out.println("日志不完整!丢掉");//如果日志不完整,那么就要找到下一帧的头位置/*找到帧头的位置*/searchFrameHeaderAndSetPosition(byteBuffer);}/*** 判断是否为完整日志的函数isIntegrity(日志长度 + 帧头 - 校验码)** @param a* @param b* @return*/private boolean logIsIntegrity(byte a, byte b) {return a == b;}/*** 计算校验码的函数** @param byteBuffer* @param startPosition* @param endPosition* @return*/private byte calculateCheckCode(ByteBuffer byteBuffer, int startPosition, int endPosition) {int length = endPosition - startPosition;byteBuffer.position(startPosition);byte res = 0;for (int i = 0; i < length; i++) {byte b = byteBuffer.get();res += b;}return res;}/*** 通过遍历的方式得到帧头的位置** @param byteBuffer* @param frameHeader* @return*/public int findFrameHeaderPosition(ByteBuffer byteBuffer, byte[] frameHeader) {// 记录当前位置int startPosition = byteBuffer.position();// 遍历 ByteBuffer 从当前位置开始搜索帧头for (int i = startPosition; i < byteBuffer.limit() - frameHeader.length + 1; i++) {// 标记当前位置byteBuffer.position(i);boolean found = true;for (int j = 0; j < frameHeader.length; j++) {if (byteBuffer.get() != frameHeader[j]) {found = false;break;}}if (found) {// 恢复 ByteBuffer 的当前位置
//                byteBuffer.position(startPosition);return i; // 返回帧头 'e' 的位置}}// 恢复 ByteBuffer 的当前位置
//        byteBuffer.position(startPosition);// 如果没有找到,返回 -1 表示未找到return -1;}/*** 将十六进制字符串转换为字节数组的方法** @param hexString* @return*/public byte[] hexStringToByteArray(String hexString) {int len = hexString.length();byte[] byteArray = new byte[len / 2]; // 每两个十六进制字符表示一个字节for (int i = 0; i < len; i += 2) {byteArray[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)+ Character.digit(hexString.charAt(i + 1), 16));}return byteArray;}}}class Counter {private LongAdder count = new LongAdder();public void increment() {count.increment();}public int getCount() {return count.intValue();}
}

解释:以上代码是采用多线程,将接收到的数据帧解析为日志,并将日志存储到数据库中,根据日志类型不同,存储到不同的数据库表中。

4、总结

        采用DatagramPacket实现数据帧接收准备,将接收到的每一帧数据解析为日志,每一帧都交给一个线程去处理,为节省线程频繁创建和销毁的资源,采用多线程。

学习之所以会想睡觉,是因为那是梦开始的地方。
ଘ(੭ˊᵕˋ)੭ (开心) ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)
                                                                                                        ------不写代码不会凸的小刘

相关文章:

  • AI安全研究滞后?清华专家团来支招
  • VMamba: Visual State Space Model论文笔记
  • [PyTorch]:加速Pytorch 模型训练的几种方法(几行代码),最快提升八倍(附实验记录)
  • 入门PHP就来我这(纯干货)05
  • 【CSS】什么是CSS的columns属性
  • 全面了解机器学习
  • 数据结构 - C/C++ - 队列
  • 《刺客信条:英灵殿》找不到emp.dll文件的多种解决方法,亲测有效
  • java 代码块
  • 【C++】main函数及返回值深度解析
  • antd中Select大数据分页触底刷新处理优化
  • 虚拟纪念展馆建设的重大意义:重新定义纪念活动的未来
  • C++——探索智能指针的设计原理
  • 深入Ruby缓存:掌握Memcached的使用艺术
  • 【ARM系列】GIC600AE功能安全
  • 【5+】跨webview多页面 触发事件(二)
  • Go 语言编译器的 //go: 详解
  • IDEA 插件开发入门教程
  • Java 网络编程(2):UDP 的使用
  • JavaScript 一些 DOM 的知识点
  • javascript 总结(常用工具类的封装)
  • java多线程
  • Js基础——数据类型之Null和Undefined
  • python学习笔记-类对象的信息
  • VirtualBox 安装过程中出现 Running VMs found 错误的解决过程
  • 从@property说起(二)当我们写下@property (nonatomic, weak) id obj时,我们究竟写了什么...
  • 记录:CentOS7.2配置LNMP环境记录
  • 简单数学运算程序(不定期更新)
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 聊一聊前端的监控
  • 七牛云 DV OV EV SSL 证书上线,限时折扣低至 6.75 折!
  • 如何邀请好友注册您的网站(模拟百度网盘)
  • 一起来学SpringBoot | 第十篇:使用Spring Cache集成Redis
  • 因为阿里,他们成了“杭漂”
  • ionic入门之数据绑定显示-1
  • puppet连载22:define用法
  • Redis4.x新特性 -- 萌萌的MEMORY DOCTOR
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • ​埃文科技受邀出席2024 “数据要素×”生态大会​
  • ​一文看懂数据清洗:缺失值、异常值和重复值的处理
  • # 数据结构
  • #1014 : Trie树
  • #includecmath
  • #NOIP 2014# day.1 T2 联合权值
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (2)STM32单片机上位机
  • (动手学习深度学习)第13章 计算机视觉---图像增广与微调
  • (附源码)小程序儿童艺术培训机构教育管理小程序 毕业设计 201740
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • (转)树状数组
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • ***linux下安装xampp,XAMPP目录结构(阿里云安装xampp)
  • .net 4.0发布后不能正常显示图片问题
  • .net core 使用js,.net core 使用javascript,在.net core项目中怎么使用javascript
  • .net core开源商城系统源码,支持可视化布局小程序