当前位置: 首页 > news >正文

java多线程和长连接,三方转换通信的实践(1)——转换端程序

为什么80%的码农都做不了架构师?>>>   hot3.png

        由于工作网络安全关系,我们一个项目的公共服务网站部署在外网,而数据库(并非只供外网访问,内网也要使用数据库)部署在内网,重要的是外网不能直接访问内网,这就打破原来网站的访问顺序了,本来网站页面查询由后台直接访问数据库,现在只能通过多线程+转换平台+数据库端服务程序组成了。

    首先由数据库端服务程序建立socket访问部署在外网的“转换平台”(Server)端建立长连接,并定时检查长连接情况,开启断线重连。网站端每一次查询也是向“转换平台”(Server)端建立短连接,然后由“转换平台”(Server)端转发至数据库端服务程序并接收数据库端服务程序查询结果转发给网站端。

下面首先展示“转换平台”(Server)端代码:(服务端由1/2/3/4/5/6组成)

1、

public class serverSocketForApp {
	 //记录日志
	 Log log = LogFactory.getLog(this .getClass()); 
	 CommonTool commonTool=new CommonTool();
	 private static MyBlockingQueue myBlockingQueue=new MyBlockingQueue(10);
	 public static  Socket socketWZ;
	 
	 
	 public static void main(String[] args) {
		try{
			final int portWZ=8888;
	        final int portYW = 9999; 
	        ServerSocket serverSocketYW= new ServerSocket(portYW);
	        //建立线程一直监听服务端连接
	       // System.out.println("建立线程一直监听服务端连接");
	        AcceptFromYW acceptFromYW =new AcceptFromYW(serverSocketYW,myBlockingQueue);
	        Thread threadYW=new Thread(acceptFromYW);
	        threadYW.setName("监听服务端连接线程");
	        threadYW.start();
	       // System.out.println("监听服务端连接线程开启...");
	        Thread checkSocket=new Thread(new checkSocket(myBlockingQueue));
	        checkSocket.setName("检查线程+++");
	        checkSocket.start();
	        System.out.println("检查线程开启。。。。");
	        ServerSocket serverSocketWZ= new ServerSocket(portWZ);
			while(true){
				 //System.out.println("==================监听网站端连接开始=====================");
				 socketWZ=serverSocketWZ.accept();//监听网站端连接
				 System.out.println("myBlockingQueue.size():"+myBlockingQueue.size()+";ListElement:"+myBlockingQueue.getListElement());
				 while(myBlockingQueue.size()<=0){
					// System.out.println("myBlockingQueue.size():"+myBlockingQueue.size()+";休眠1秒");
					 Thread.sleep(1000);
				 }
			  TranFromWZToYW tranFromWZToYW=new TranFromWZToYW(socketWZ,myBlockingQueue);//转换端转发线程
			  new Thread(tranFromWZToYW).start();
				 
			}
			
		}catch (Exception e) {
			// TODO: handle exception
		}
			
	}

}

上面代码很简单,主要是开启三个线程:1、threadYW(监听线程,用于监听数据库端连接请求),2、checkSocket(检查线程,用于定时检查线程安全队列myBlockingQueue存储的数据库端发起的socket长连接是否正常,并剔除非正常连接),3、TranFromWZToYW(转发线程,用于接收网站端请求——>转发至数据库端——>接收数据库端相应结果

——>转发至网站端)。

2、下面首先介绍的是线程安全队列:myBlockingQueue,这个是我自己封装的一个arraylist。

public class MyBlockingQueue {

	@SuppressWarnings("rawtypes")
	private List list=new ArrayList<String>();
	
	private int MaxSize;
	//自定义对象锁
	private Object lock=new Object();
	
	public MyBlockingQueue(int MaxSize){
		this.MaxSize=MaxSize;
		//System.out.println("线程"+Thread.currentThread().getName()+"已经完成初始化。。。大小是:"+MaxSize);
	}
	
	public int size(){
		return this.list.size();
	}
	
	//使用自定义锁同步代码块
	@SuppressWarnings("unchecked")
	public void put(Socket socket) {//Socket socket
		synchronized (lock) {
			try {
				//判断是否满了
				if(this.list.size()==MaxSize){
					//System.out.println("线程"+Thread.currentThread().getName()+"当前队列已满,put等待中。。。");
					lock.wait();//当队列满时阻塞队列,释放锁
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			this.list.add(socket);
			//System.out.println("线程"+Thread.currentThread().getName()+"向队列中添加了元素:"+socket);
			lock.notifyAll();//当队列未满时唤醒阻塞队列其他线程
		}
	}
	//获取socket
	public Socket get() {
		Socket socket=null;
		synchronized (lock) {
			try {
				if(this.list.size()==0){
					//System.out.println("线程"+Thread.currentThread().getName()+"当前队列已经空了,get等待中。。。");
					lock.wait();
				}else{
					 socket=(Socket)list.get(0);
					//String socket=list.get(0).toString();
					list.remove(0);//移出list
					//System.out.println("线程"+Thread.currentThread().getName()+"取到了元素:"+socket);
					lock.notifyAll();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			return socket;
		}
	}
   //获取所有socket,测试用
	public String getListElement(){
		String strtemp="";
		for(int i=0;i<this.list.size();i++){
			strtemp+=list.get(i)+";";
		}
		return strtemp;
	}
}

3、再次介绍的是CommonTool 类,用于接收、发送、检查的方法都放在里面,由于我这边发送的都是message(封装的XML格式),所以后面转发什么的都是这个类,如果想测试的话,可以用这个类注释掉的代码发送字符串(String)。

public class CommonTool {
	//记录日志
	Log log = LogFactory.getLog(this .getClass()); 
	//PublicAppService publicAppService=new PublicAppService();
	/**
	* 判断是否断开连接,断开返回true,没有返回false
	* @param socket
	* @return
	*/
	public  Boolean isServerClose(Socket socket){
		try{
			socket.sendUrgentData(0);//发送1个字节的紧急数据,默认情况下,服务器端没有开启紧急数据处理,不影响正常通信
			return false;
		}catch(Exception se){
			return true;
		}
	}
	
	/**
	* 发送数据,发送失败返回false,发送成功返回true
	* @param csocket
	* @param message
	* @return
	*/
	public synchronized  Boolean Send(Socket csocket,Message message){
		ObjectOutputStream os=null;
		XmlEntity appxmlentity=new XmlEntity();
		try{
			 if(null!=message){
             	os=new ObjectOutputStream(csocket.getOutputStream());
					//发数据到服务器端    
					 os.writeObject(message);
					 os.flush();
 			}
			 return true;
		}catch(Exception se){
			return false;
		}
	}
	/**
	* 读取数据,返回字符串类型
	* @param csocket
	* @return
	*/
	public  synchronized  Message ReadText(Socket csocket){
		ObjectInputStream ois = null;
		Object obj=null;
		String strObj="";
		boolean flag=true;
		Message  message=new Message();
		try{
			while(flag){
				ois = new ObjectInputStream(new BufferedInputStream(csocket.getInputStream()));
				obj=ois.readObject();
				message=(Message)obj;
    			strObj=obj.toString();
    			if(strObj.indexOf("维持连接包")!=-1 || strObj.isEmpty() || strObj==""){
              		flag=true;
              	}else{
              		break;
              	}
			}
			
			//csocket.setSoTimeout(sotimeout);
			/*InputStream input = csocket.getInputStream();
			BufferedReader in = new BufferedReader(new InputStreamReader(input));
			char[] sn = new char[1000];
			in.read(sn);
			String sc = new String(sn);
			return sc;*/
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}catch(IOException se){
			return null;
		}
		return message;
	} 
}

  4、监听线程:threadYW,很简单的一个实现Runnable接口的类

public class AcceptFromYW implements Runnable {
	int portYW;
	ServerSocket serverSocket;
	Socket socket;
	MyBlockingQueue myBlockingQueue;
	public AcceptFromYW(){
		
	}
	public AcceptFromYW(ServerSocket serverSocket,MyBlockingQueue myBlockingQueue){
		this.serverSocket=serverSocket;
		this.myBlockingQueue=myBlockingQueue;
	}
	public synchronized void run() {
		try {
			while(true){
				 //调用accept()方法开始监听,等待客户端的连接
				 socket=serverSocket.accept();
				 socket.setKeepAlive(true);
				 if(socket!=null){
					 myBlockingQueue.put(socket);
					 try {
						Thread.sleep(50);
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					 //System.out.println("服务端接收到业务端连接请求:"+socket);
				 }
			}
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

 5、检查线程:checkSocket,也是实现Runnable接口的类。

public class checkSocket implements Runnable {
	CommonTool commonTool=new CommonTool();
	Socket socket;
	MyBlockingQueue myBlockingQueue;
	/**
	 * @param socket
	 */
	public checkSocket(MyBlockingQueue myBlockingQueue) {
		super();
		this.myBlockingQueue = myBlockingQueue;
	}

	public void run() {
		while(true){
			try {
				if(myBlockingQueue.size()>0){//线程安全队列存在socket才会去检查
					for(int i=0;i<=myBlockingQueue.size();i++){
						socket=myBlockingQueue.get();
						if(null!=socket && !commonTool.isServerClose(socket)){
							myBlockingQueue.put(socket);
						}else if(null!=socket){
							socket.close();
						}
						Thread.sleep(50);
					}
					System.out.println("myBlockingQueue.size:"+myBlockingQueue.size()+";ListElement:"+myBlockingQueue.getListElement());
				}
				Thread.sleep(2000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}

6、转发线程:TranFromWZToYW也是一个实现Runnable接口的类。

public class TranFromWZToYW implements Runnable {
	//记录日志
	Log log = LogFactory.getLog(this .getClass()); 
	CommonTool commonTool=new CommonTool();
	private boolean close;
	Socket socketYW;
	Socket socketWZ;
	ObjectInputStream ois = null;
	ObjectOutputStream oos = null;
  	Message readMessageWZ=null;
  	Message readMessageYW=null;
  	MyBlockingQueue myBlockingQueue;
  	
  	/**
	 * @param socketYW
	 * @param socketWZ
	 */
	public TranFromWZToYW(Socket socketWZ,MyBlockingQueue myBlockingQueue) {
		super();
		this.socketWZ = socketWZ;
		this.myBlockingQueue=myBlockingQueue;
	}
	
	public synchronized void run() {
		try{
			
			close = commonTool.isServerClose(socketWZ);//判断是否断开
			if(!close){//没有断开,开始读数据
				readMessageWZ = commonTool.ReadText(socketWZ);
				if(null!=readMessageWZ){
					System.out.println("读取WZ数据:"+readMessageWZ);
					//log.info("读取WZ数据:"+readMessageWZ);
				}
			}
			 if(null!=readMessageWZ){
			 	   while(myBlockingQueue.size()<=0){
			 		   System.out.println("服务端连接已用完,等待新连接...");
			 		  // log.info("服务端连接已用完,等待新连接...");
			 		   Thread.sleep(1000);
			 	   }
			 	  System.out.println("转换平台开始写数据至业务端。。。");
			 	  while(null==socketYW){
			 		 socketYW=myBlockingQueue.get();
			 	  }
			 	  boolean a=commonTool.Send(socketYW, readMessageWZ);
			 	  if(a){
			 		 System.out.println("转发成功,转换平台开始从业务端读数据+++");
			 		 int i=0;
			 		 while(null==readMessageYW && i<3){
			 			 readMessageYW=commonTool.ReadText(socketYW);
			 			 Thread.sleep(50);
			 			 i++;
			 			 if(commonTool.isServerClose(socketYW) && null==readMessageYW){//用于判断socketYW是否失效
			 				socketYW=myBlockingQueue.get();
			 			 }
			 		 }
				 	 if(null!=readMessageYW){
				 		 System.out.println("转换平台开始从业务端读到数据,并开始转发数据至客户端网站.......");
				 		 close = commonTool.isServerClose(socketWZ);//判断是否断开
				 		 if(!close){
				 			 boolean b=commonTool.Send(socketWZ, readMessageYW);
					 		 if(b){
					 			 System.out.println("转发至网站端成功+++readMessageYW:"+readMessageYW);
					 			// log.info("转发至网站端成功+++readMessageYW:"+readMessageYW);
					 			 myBlockingQueue.put(socketYW);
					 			 System.out.println("socketYW放入myBlockingQueue成功!");
					 		 }
				 		 }else{
				 			 System.out.println("网站端连接关闭....转发至网站端失败!!!!   readMessageYW:"+readMessageYW);
				 			 //log.info("网站端连接关闭....转发至网站端失败!!!!    readMessageYW:"+readMessageYW);
				 			 myBlockingQueue.put(socketYW);
				 			// System.out.println("socketYW放入myBlockingQueue成功!");
				 		 }
				 		
				 	 }
			 	  }
			 	 
			 	 
			 }
		
		}catch (Exception e) {
			System.out.println("系统出现异常。。。。");
			try{
				if(ois!=null){
					ois.close();
				}
				if(oos!=null){
					oos.close();
				}
				if(socketWZ!=null){
					socketWZ.close();
				}
			}catch (Exception e1) {
				// TODO: handle exception
				System.err.println(e1);
			}
			
		}finally{
			try{
				if(ois!=null){
					ois.close();
				}
				if(oos!=null){
					oos.close();
				}
				if(socketWZ!=null){
					socketWZ.close();
				}
			}catch (Exception e) {
				// TODO: handle exception
				System.err.println(e);
			}
		}
	}
}

上面介绍的是状态服务程序的相应代码,下面一篇将会介绍数据库服务端程序。

转载于:https://my.oschina.net/czpdjx/blog/2249249

相关文章:

  • 【呆鸟译Py】这位老师的70个问题,100个数据分析师都想不全
  • 用Gmail做QQ邮件群发的一种方式
  • 机器人操作系统来到Windows
  • sublime text3
  • Spring Cloud 之 Consul 与 Consul 服务剔除
  • 阿里中间件开源组件:Sentinel 0.2.0正式发布
  • 在ABAP里取得一个数据库表记录数的两种方法
  • 【Unity Shader】Shader基础
  • vue项目使用微信公众号支付
  • 你连这10个工具都不知道,还是程序员?
  • DDL语言
  • BZOJ 1568: [JSOI2008]Blue Mary开公司
  • Ubuntu 16.04 下 安装go
  • PHP CLI应用的调试原理
  • springboot入门_email
  • php的引用
  • Date型的使用
  • flutter的key在widget list的作用以及必要性
  • hadoop集群管理系统搭建规划说明
  • Redis在Web项目中的应用与实践
  • use Google search engine
  • 关于字符编码你应该知道的事情
  • 回流、重绘及其优化
  • 聚簇索引和非聚簇索引
  • 看域名解析域名安全对SEO的影响
  • 批量截取pdf文件
  • 数据库写操作弃用“SELECT ... FOR UPDATE”解决方案
  • 问:在指定的JSON数据中(最外层是数组)根据指定条件拿到匹配到的结果
  • 物联网链路协议
  • Spring第一个helloWorld
  • 新年再起“裁员潮”,“钢铁侠”马斯克要一举裁掉SpaceX 600余名员工 ...
  • #Spring-boot高级
  • #stm32整理(一)flash读写
  • #WEB前端(HTML属性)
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • (23)Linux的软硬连接
  • (day6) 319. 灯泡开关
  • (done) 两个矩阵 “相似” 是什么意思?
  • (层次遍历)104. 二叉树的最大深度
  • (三) diretfbrc详解
  • (转)EXC_BREAKPOINT僵尸错误
  • (转)linux下的时间函数使用
  • **PHP二维数组遍历时同时赋值
  • .NET Core 2.1路线图
  • .NET Framework 的 bug?try-catch-when 中如果 when 语句抛出异常,程序将彻底崩溃
  • .NET 发展历程
  • .Net 知识杂记
  • .Net 转战 Android 4.4 日常笔记(4)--按钮事件和国际化
  • .NET/C# 使窗口永不获得焦点
  • .NET设计模式(7):创建型模式专题总结(Creational Pattern)
  • .sys文件乱码_python vscode输出乱码
  • @ConditionalOnProperty注解使用说明
  • @Documented注解的作用
  • @Mapper作用
  • [ 2222 ]http://e.eqxiu.com/s/wJMf15Ku