当前位置: 首页 > news >正文

C++的线程安全队列模板类封装

目录

1  线程安全队列封装一

2  线程安全队列封装二

3  线程安全队列封装三


1  线程安全队列封装一

/*** ============================================================================** Copyright (c) Huawei Technologies Co., Ltd. 2020-2022. All rights reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are met:**   1 Redistributions of source code must retain the above copyright notice,*     this list of conditions and the following disclaimer.**   2 Redistributions in binary form must reproduce the above copyright notice,*     this list of conditions and the following disclaimer in the documentation*     and/or other materials provided with the distribution.**   3 Neither the names of the copyright holders nor the names of the*   contributors may be used to endorse or promote products derived from this*   software without specific prior written permission.** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE* POSSIBILITY OF SUCH DAMAGE.* ============================================================================*/#ifndef THREAD_SAFE_QUEUE_H
#define THREAD_SAFE_QUEUE_H#include <mutex>
#include <queue>namespace acllite {template<typename T>class ThreadSafeQueue {public:/*** @brief ThreadSafeQueue constructor* @param [in] capacity: the queue capacity*/ThreadSafeQueue(uint32_t capacity) {// check the input value: capacity is validif (capacity >= kMinQueueCapacity && capacity <= kMaxQueueCapacity) {queueCapacity = capacity;}else { // the input value: capacity is invalid, set the default valuequeueCapacity = kDefaultQueueCapacity;}}/*** @brief ThreadSafeQueue constructor*/ThreadSafeQueue() {queueCapacity = kDefaultQueueCapacity;}/*** @brief ThreadSafeQueue destructor*/~ThreadSafeQueue() = default;/*** @brief push data to queue* @param [in] input_value: the value will push to the queue* @return true: success to push data; false: fail to push data*/bool Push(T input_value) {std::lock_guard<std::mutex> lock(mutex_);// check current size is less than capacityif (queue_.size() < queueCapacity) {queue_.push(input_value);return true;}return false;}/*** @brief pop data from queue* @return true: success to pop data; false: fail to pop data*/T Pop() {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) { // check the queue is emptyreturn nullptr;}T tmp_ptr = queue_.front();queue_.pop();return tmp_ptr;}/*** @brief check the queue is empty* @return true: the queue is empty; false: the queue is not empty*/bool Empty() {std::lock_guard<std::mutex> lock(mutex_);return queue_.empty();}/*** @brief get the queue size* @return the queue size*/uint32_t Size() {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}void ExtendCapacity(uint32_t newSize) {queueCapacity = newSize;kMaxQueueCapacity = newSize > kMaxQueueCapacity ? newSize : kMaxQueueCapacity;}private:std::queue<T> queue_; // the queueuint32_t queueCapacity; // queue capacitymutable std::mutex mutex_; // the mutex valueconst uint32_t kMinQueueCapacity = 1; // the minimum queue capacityconst uint32_t kMaxQueueCapacity = 10000; // the maximum queue capacityconst uint32_t kDefaultQueueCapacity = 10; // default queue capacity};
}
#endif /* THREAD_SAFE_QUEUE_H */

注意使用的时候,队列中要保存的是指针形式,例如

acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;

假如你这样写代码,那么会报错

acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;  //error

 编译报下面的错误

 src/ThreadSafeQueue.h: In instantiation of ‘T acllite::ThreadSafeQueue<T>::Pop() [with T = acllite::frame_info_user_data]’:
src/VdecHelperV2.cpp:213:51:   required from here
src/ThreadSafeQueue.h:96:24: error: could not convert ‘nullptr’ from ‘std::nullptr_t’ to ‘acllite::frame_info_user_data’96 |                 return nullptr;|                        ^~~~~~~|                        ||                        std::nullptr_t

2  线程安全队列封装二

/************************************************************************** Copyright (C) [2019] by Cambricon, Inc. All rights reserved**  Licensed under the Apache License, Version 2.0 (the "License");*  you may not use this file except in compliance with the License.*  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN* THE SOFTWARE.*************************************************************************/#ifndef CNSTREAM_THREADSAFE_QUEUE_HPP_
#define CNSTREAM_THREADSAFE_QUEUE_HPP_#include <condition_variable>
#include <mutex>
#include <queue>namespace cnstream {template <typename T>
class ThreadSafeQueue {public:ThreadSafeQueue() = default;ThreadSafeQueue(const ThreadSafeQueue& other) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;bool TryPop(T& value);  // NOLINTvoid WaitAndPop(T& value);  // NOLINTbool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINTvoid Push(const T& new_value);  // NOLINTbool Empty() {std::lock_guard<std::mutex> lk(data_m_);return q_.empty();}uint32_t Size() {std::lock_guard<std::mutex> lk(data_m_);return q_.size();}private:std::mutex data_m_;std::queue<T> q_;std::condition_variable notempty_cond_;
};template <typename T>
bool ThreadSafeQueue<T>::TryPop(T& value) {  // NOLINTstd::lock_guard<std::mutex> lk(data_m_);if (q_.empty()) {return false;} else {value = q_.front();q_.pop();return true;}
}template <typename T>
void ThreadSafeQueue<T>::WaitAndPop(T& value) {  // NOLINTstd::unique_lock<std::mutex> lk(data_m_);notempty_cond_.wait(lk, [&] { return !q_.empty(); });value = q_.front();q_.pop();
}template <typename T>
bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINTstd::unique_lock<std::mutex> lk(data_m_);if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {value = q_.front();q_.pop();return true;} else {return false;}
}template <typename T>
void ThreadSafeQueue<T>::Push(const T& new_value) {std::unique_lock<std::mutex> lk(data_m_);q_.push(new_value);lk.unlock();notempty_cond_.notify_one();
}}  // namespace cnstream#endif  // CNSTREAM_THREADSAFE_QUEUE_HPP_

3  线程安全队列封装三

/************************************************************************** Copyright (C) [2020] by Cambricon, Inc. All rights reserved**  Licensed under the Apache License, Version 2.0 (the "License");*  you may not use this file except in compliance with the License.*  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN* THE SOFTWARE.*************************************************************************/#ifndef INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_
#define INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>namespace infer_server {/*** @brief Thread-safe queue** @tparam T Type of stored elements* @tparam Q Type of underlying container to store the elements, which acts as queue,*           `std::queue` and `std::priority_queue` satisfy the requirements*/
template <typename T, typename Container = std::queue<T>>
class ThreadSafeQueue {public:/// type of containerusing queue_type = typename std::enable_if<std::is_same<typename Container::value_type, T>::value, Container>::type;/// type of elementsusing value_type = T;/// Container::size_typeusing size_type = typename Container::size_type;/*** @brief Construct a new Thread Safe Queue object*/ThreadSafeQueue() = default;/*** @brief Try to pop an element from queue** @param value An element* @retval true Succeed* @retval false Fail, no element stored in queue*/bool TryPop(T& value);  // NOLINT/*** @brief Try to pop an element from queue, wait for `rel_time` if queue is empty** @param value An element* @param rel_time Maximum duration to block for* @retval true Succeed* @retval false Timeout*/bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);  // NOLINT/*** @brief Pushes the given element value to the end of the queue** @param new_value the value of the element to push*/void Push(const T& new_value) {std::lock_guard<std::mutex> lk(data_m_);q_.push(new_value);notempty_cond_.notify_one();}/*** @brief Pushes the given element value to the end of the queue** @param new_value the value of the element to push*/void Push(T&& new_value) {std::lock_guard<std::mutex> lk(data_m_);q_.push(std::move(new_value));notempty_cond_.notify_one();}/*** @brief Pushes a new element to the end of the queue. The element is constructed in-place.** @tparam Arguments Type of arguments to forward to the constructor of the element* @param args Arguments to forward to the constructor of the element*/template <typename... Arguments>void Emplace(Arguments&&... args) {std::lock_guard<std::mutex> lk(data_m_);q_.emplace(std::forward<Arguments>(args)...);notempty_cond_.notify_one();}/*** @brief Checks if the underlying container has no elements** @retval true If the underlying container is empty* @retval false Otherwise*/bool Empty() {std::lock_guard<std::mutex> lk(data_m_);return q_.empty();}/*** @brief Returns the number of elements in the underlying container** @return size_type The number of elements in the container*/size_type Size() {std::lock_guard<std::mutex> lk(data_m_);return q_.size();}private:ThreadSafeQueue(const ThreadSafeQueue& other) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;std::mutex data_m_;queue_type q_;std::condition_variable notempty_cond_;
};  // class ThreadSafeQueuenamespace detail {
template <typename T, typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {*value = q_->front();q_->pop();
}template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>,typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {*value = q_->top();q_->pop();
}template <typename T>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {*value = std::move(q_->front());q_->pop();
}template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {// cut off const to enable move*value = std::move(const_cast<T&>(q_->top()));q_->pop();
}
}  // namespace detailtemplate <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::TryPop(T& value) {  // NOLINTstd::lock_guard<std::mutex> lk(data_m_);if (q_.empty()) {return false;}detail::GetFrontAndPop<T>(&q_, &value);return true;
}template <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) {  // NOLINTstd::unique_lock<std::mutex> lk(data_m_);if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {detail::GetFrontAndPop<T>(&q_, &value);return true;} else {return false;}
}/*** @brief Alias of ThreadSafeQueue<T, std::queue<T>>** @tparam T Type of stored elements*/
template <typename T>
using TSQueue = ThreadSafeQueue<T, std::queue<T>>;/*** @brief Alias of ThreadSafeQueue<T, std::priority_queue<T>>** @tparam T Type of stored elements*/
template <typename T>
using TSPriorityQueue = ThreadSafeQueue<T, std::priority_queue<T>>;}  // namespace infer_server#endif  // INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_

相关文章:

  • torch配置时出现问题
  • Zookeeper 面试题(六)
  • ThreadLocal原理及使用
  • 新书推荐:6.2 else if语句
  • SQL刷题笔记day1
  • 证券公司数据中心异地实时同步,如何能不依赖人工即可进行?
  • 【VsCode】通过tasks.json中的problemMatcher属性的fileLocation子属性设定问题的输出内容
  • 【笔记】软件架构师要点记录(1)
  • LeetCode-102. 二叉树的层序遍历【树 广度优先搜索 二叉树】
  • java.util.Arrays 详解
  • 【八股系列】webpack的构建流程是什么?
  • 如何用电脑批量操作多部手机
  • 二.常见算法--贪心算法
  • 基金基础知识-基金的生命周期
  • 蒙特卡洛+概率潮流!基于蒙特卡洛和新能源出力模拟的概率潮流分布程序代码!
  • HTML中设置input等文本框为不可操作
  • Java|序列化异常StreamCorruptedException的解决方法
  • MYSQL 的 IF 函数
  • oschina
  • PV统计优化设计
  • Python学习之路13-记分
  • SpiderData 2019年2月23日 DApp数据排行榜
  • 关于springcloud Gateway中的限流
  • 基于Dubbo+ZooKeeper的分布式服务的实现
  • 离散点最小(凸)包围边界查找
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 双管齐下,VMware的容器新战略
  • 一些基于React、Vue、Node.js、MongoDB技术栈的实践项目
  • 最近的计划
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • ​ArcGIS Pro 如何批量删除字段
  • ​人工智能书单(数学基础篇)
  • (007)XHTML文档之标题——h1~h6
  • (2024.6.23)最新版MAVEN的安装和配置教程(超详细)
  • (26)4.7 字符函数和字符串函数
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (WSI分类)WSI分类文献小综述 2024
  • (二)pulsar安装在独立的docker中,python测试
  • (四)模仿学习-完成后台管理页面查询
  • (算法)Game
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .NET Core IdentityServer4实战-开篇介绍与规划
  • .net core IResultFilter 的 OnResultExecuted和OnResultExecuting的区别
  • .net 程序发生了一个不可捕获的异常
  • .net中调用windows performance记录性能信息
  • @cacheable 是否缓存成功_Spring Cache缓存注解
  • @require_PUTNameError: name ‘require_PUT‘ is not defined 解决方法
  • [ C++ ] STL---string类的模拟实现
  • [ vulhub漏洞复现篇 ] Grafana任意文件读取漏洞CVE-2021-43798
  • [20161214]如何确定dbid.txt
  • [AIGC] Kong:一个强大的 API 网关和服务平台
  • [Bzoj4722]由乃(线段树好题)(倍增处理模数小快速幂)
  • [C#]winform部署yolov5-onnx模型
  • [GDMEC-无人机遥感研究小组]无人机遥感小组-000-数据集制备
  • [MZ test.16]P1 评测