当前位置: 首页 > news >正文

使用缓冲区提高并发

使用缓冲区提高并发

我们知道在计算机处理的处理过程中,CPU运算和内存访问是很快的,而涉及到硬盘访问的数据库或文件操作相对就很慢了。所以在处理较高并发的请求过程中,数据库或文件操作往往会成为处理流程中的性能瓶颈。本文介绍使用缓冲区配合多线程的方法来实现高并发处理能力。

基本测试环境说明

下面我们以springboot作为基础,实现一个简单的http接口,需求是按请求到达服务器端的先后顺序对请求进行编号,并且将请求的编号按序打印在控制台(打印时加入sleep延时以模拟慢处理)。

  • DAO层:

打印控制台的实现类如下:(writeString是打印单行字符串,writeStringInBatch是批量打印多行字符串,用于模拟批量数据库写入,在写入大量数据的时候批量写入数据库往往比循环多次写入单条数据的效率高很多)

import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Random;

@Component
public class DataWriter {
    public void writeString(String line) {
        try {
            Thread.sleep(new Random().nextInt(500));
            // Thread.sleep(500);
            System.out.println(Thread.currentThread().getName()+"->"+line);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void writeStringInBatch(List<String> lines) {
        try {
            Thread.sleep(new Random().nextInt(500));
            // Thread.sleep(500);
            for(String l: lines) {
                System.out.println(Thread.currentThread().getName() + "->" + l);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • controller层:收到/queue请求直接调用service层接口

    @RestController
    public class HelloController {
    
        @Autowired
        private HelloService helloService;
    
        @GetMapping("/queue")
        public String queue() {
            return helloService.writeNormal();
        }
    }
    
  • service层:使用seqNo来记录每个请求的编号,收到请求时,将seqNo加1后交给ISeqHandler处理,因为springboot是多线程处理,因此seqNo加1时需要使用synchronized加锁来保证编号不重复。

    @Service
    public class HelloService {
        private long seqNo = 0;
      
        @Qualifier("NormalSeqHandlerImpl")
        @Autowired
        private ISeqHandler normalHandler;
    
        public synchronized String writeNormal() {
            seqNo++;
            normalHandler.handleData(seqNo+"");
            return "OK";
        }
    }
    
    @Component("NormalSeqHandlerImpl")
    public class NormalSeqHandlerImpl implements ISeqHandler {
    
        @Autowired
        private DataWriter dataWriter;
    
        @Override
        public void handleData(String line) {
            dataWriter.writeString(line);
        }
    
        @Override
        public void handleDataInBatch(List<String> lines) {
            handleDataInBatch(lines);
        }
    }
    
  • 接口测试:使用jmeter对接口做性能测试,创建测试项,步骤如下:

    • 启动jmeter后默认有一个TestPlan,右键点击TestPlan添加一个线程组

      在这里插入图片描述

      Number od Threads表示一次测试并发的总线程数,Ramp-up period表示这些线程在几秒内创建完:

      在这里插入图片描述

    • 右键点击建好的线程组,添加http请求:

      在这里插入图片描述

    • 右键点击线程组,添加聚合结果查看器:

      在这里插入图片描述

    • 如果要查看每个http请求返回的结果可以,右键点击线程组,点击Add->Listener->View Results Tree添加结果树查看器

    • 一个TestPlan下还可以添加多个线程组进行多组不同的测试

    • 创建后点击保存,右键点击某个线程组,点击Start即可开始测试,测试跑完后可以在聚合报告中查看测试情况,我们上面的接口没做任何优化,50个并发请求,平均响应时间达到了6489毫秒:

      在这里插入图片描述

      注意:springboot后台启动后的第一次测试可能需要做一些线程创建操作会比较慢可以多测试几次。

使用缓冲区优化

上面的service层所有的处理过程(包括慢的打印操作)都放在synchronized中,所以多线程一点没发挥出来,完全是串行执行。

为了提高性能,引入缓冲区,seqNo编号加1后按顺序放入缓冲区后直接返回(只放在内存中所以很快)。慢操作不能加锁,但是为了保证按编号顺序打印输出,我们引入一个独立的线程作为消费者专门从缓冲区中获取数据进行控制台打印,为了提高打印效率,采用批量打印方式。

@Service
public class HelloService {
    private long seqNo = 0;

    @Qualifier("NormalSeqHandlerImpl") // 低效处理实现类
    @Autowired
    private ISeqHandler normalHandler;

    @Qualifier("BetterSeqHandlerImpl") // 优化的实现类
    @Autowired
    private ISeqHandler betterHandler;

    public synchronized String writeNormal() {
        seqNo++;
        normalHandler.handleData(seqNo+"");
        return "OK";
    }

    public synchronized String writeBetter() {
        seqNo++;
        betterHandler.handleData(seqNo+"");
        return "OK";
    }
}

@Component("BetterSeqHandlerImpl")
public class BetterSeqHandlerImpl implements ISeqHandler, Runnable {
    @Autowired
    private DataWriter dataWriter;

    private ConcurrentLinkedDeque<String> buffer = new ConcurrentLinkedDeque();
    private Thread flushThread;
    private boolean isRunning = true;

    @PostConstruct // 实例化后启动线程
    public void initThread() {
        flushThread = new Thread(this);
        flushThread.start();
    }

    @PreDestroy
    public void exitThread() {
        isRunning = false;
    }

    @Override
    public void handleData(String line) {
        buffer.add(line);
    }

    @Override
    public void handleDataInBatch(List<String> lines) {
        buffer.addAll(lines);
    }

    @Override
    public void run() {
        System.out.println("flushThread is running...");
        List<String> lines = new ArrayList<>();
        while (isRunning) { // 线程一直运行,从缓冲区拿数据进行处理
            if(buffer.isEmpty()) {
                try {
                    Thread.sleep(10); // 这里要加个sleep,不然CPU会100%
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                while(!buffer.isEmpty()) {
                    lines.add(buffer.poll());
                }
                dataWriter.writeStringInBatch(lines); // 批量处理
                lines.clear();
            }
        }
        System.out.println("flushThread exit.");
    }
}

优化后50个并发很快(平均2毫秒):

在这里插入图片描述

并发加到2000,依然很快(平均6毫秒):

在这里插入图片描述

使用springboot内置线程池

上面的缓冲区消费使用了一个独立线程来消费缓冲的队列,直接使用线程编程比较简单,但对这个线程的可靠性要求比较高,万一线程崩溃了缓冲区就没有消费者了,springboot处理http请求使用的是线程池,我们也可以利用这个线程池来提高一下可靠性,在处理http请求的线程中,由一个线程来消费缓冲区,注意这里不是固定一个线程,而是设置一个标志,每个处理请求线程将数据放入缓冲后判断一下标识,如果没有消费线程就自己来做一次队列消费批量处理,为了避免在有线程处于消费刷写的情况下,其他线程新加的数据无人处理,我们需要设置一个等待线程,这样能确保队列中的数据不会遗漏,下面用一个新的service来完成这个功能:

@Service
public class DoubleBufferService {
    @Autowired
    private DataWriter dataWriter;
    private DoubleBuffer doubleBuffer = new DoubleBuffer();

    private long seqNo = 0;
    // isWaiting表示是否有线程处于等待中, isFlushing表示是否有线程在进行消费
    private boolean isWaiting = false;
    private boolean isFlushing = false;


    public String writeNo() {
        synchronized(this) {
            seqNo++;
            doubleBuffer.addData(seqNo + "");
            if(isWaiting) {
                return "OK";
            }
            isWaiting = true;
            while(isFlushing) {
                try {
                    this.wait(100); // wait期间会释放锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            isWaiting = false;
            // flush if there is no thread is flushing
            if(doubleBuffer.getCurrentSize() == 0) {
                return "OK";
            }
            doubleBuffer.exchangeBuf();
            isFlushing = true;
        }
        doubleBuffer.flushBuf();
        synchronized (this) {
            isFlushing = false;
            this.notifyAll(); // 唤醒wait的线程
        }
        return "OK";
    }

    private class DoubleBuffer {
        private LinkedList<String> inBuf = new LinkedList();
        private LinkedList<String> outBuf = new LinkedList();

        public void addData(String data) {
            inBuf.add(data);
        }

        public void exchangeBuf() {
            LinkedList<String> tmp = inBuf;
            inBuf = outBuf;
            outBuf = tmp;
        }

        public void flushBuf() {
            dataWriter.writeStringInBatch(outBuf);
            /*for(String l: outBuf) {
                dataWriter.writeString(l);
            }*/
            outBuf.clear();
        }
        public int getCurrentSize() {
            return inBuf.size();
        }
    }
}

测试结果:

在这里插入图片描述

相关文章:

  • Windows10环境下Python 开发环境搭建
  • JavaEE TCP协议
  • 51单片机DS18B20温度报警器proteus仿真设计_可调上下限
  • SSRF漏洞
  • 猿创征文|平凡的应届生四年学习之路
  • mysql8忘记密码如何重置(禅道的mysqlzt服务和mysql服务冲突)
  • Nginx 配置 SSL(HTTPS)
  • 用css实现简单的动画——“奔跑的小子”(有知识梳理和图片)
  • macbook m1芯片 实现vscode下debug(解决无法读入的问题)
  • 前端:下载文件(多种方法)
  • 猿创征文|【JavaSE】 Collection集合全家桶
  • 【Coppeliasim+Add-on】附加组件-喷涂路径自动生成及喷涂仿真
  • 简易下载并使用Jupyter(Anaconda)
  • 北京大学肖臻老师《区块链技术与应用》公开课笔记:以太坊(四):The DAO、反思、美链、总结
  • 算法与数据结构(2)--- 绪论(下)
  • android图片蒙层
  • Android组件 - 收藏集 - 掘金
  • conda常用的命令
  • ES2017异步函数现已正式可用
  • ES6系统学习----从Apollo Client看解构赋值
  • JS实现简单的MVC模式开发小游戏
  • MD5加密原理解析及OC版原理实现
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • PHP那些事儿
  • select2 取值 遍历 设置默认值
  • ViewService——一种保证客户端与服务端同步的方法
  • vue的全局变量和全局拦截请求器
  • Vue--数据传输
  • Yii源码解读-服务定位器(Service Locator)
  • 从地狱到天堂,Node 回调向 async/await 转变
  • 从零开始学习部署
  • 第13期 DApp 榜单 :来,吃我这波安利
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 聊聊flink的TableFactory
  • 如何设计一个比特币钱包服务
  • 使用Envoy 作Sidecar Proxy的微服务模式-4.Prometheus的指标收集
  • 我的面试准备过程--容器(更新中)
  • #LLM入门|Prompt#1.8_聊天机器人_Chatbot
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • ()、[]、{}、(())、[[]]命令替换
  • (编译到47%失败)to be deleted
  • (附源码)springboot教学评价 毕业设计 641310
  • (附源码)springboot优课在线教学系统 毕业设计 081251
  • (三) diretfbrc详解
  • (一)WLAN定义和基本架构转
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • ***通过什么方式***网吧
  • ./和../以及/和~之间的区别
  • .net MySql
  • .NET 常见的偏门问题
  • .net对接阿里云CSB服务
  • .net使用excel的cells对象没有value方法——学习.net的Excel工作表问题
  • .net专家(张羿专栏)
  • @ModelAttribute注解使用
  • @param注解什么意思_9000字,通俗易懂的讲解下Java注解