C++的线程安全队列模板类封装
目录
1 线程安全队列封装一
2 线程安全队列封装二
3 线程安全队列封装三
1 线程安全队列封装一
/*** ============================================================================** Copyright (c) Huawei Technologies Co., Ltd. 2020-2022. All rights reserved.** Redistribution and use in source and binary forms, with or without* modification, are permitted provided that the following conditions are met:** 1 Redistributions of source code must retain the above copyright notice,* this list of conditions and the following disclaimer.** 2 Redistributions in binary form must reproduce the above copyright notice,* this list of conditions and the following disclaimer in the documentation* and/or other materials provided with the distribution.** 3 Neither the names of the copyright holders nor the names of the* contributors may be used to endorse or promote products derived from this* software without specific prior written permission.** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE* POSSIBILITY OF SUCH DAMAGE.* ============================================================================*/#ifndef THREAD_SAFE_QUEUE_H
#define THREAD_SAFE_QUEUE_H#include <mutex>
#include <queue>namespace acllite {template<typename T>class ThreadSafeQueue {public:/*** @brief ThreadSafeQueue constructor* @param [in] capacity: the queue capacity*/ThreadSafeQueue(uint32_t capacity) {// check the input value: capacity is validif (capacity >= kMinQueueCapacity && capacity <= kMaxQueueCapacity) {queueCapacity = capacity;}else { // the input value: capacity is invalid, set the default valuequeueCapacity = kDefaultQueueCapacity;}}/*** @brief ThreadSafeQueue constructor*/ThreadSafeQueue() {queueCapacity = kDefaultQueueCapacity;}/*** @brief ThreadSafeQueue destructor*/~ThreadSafeQueue() = default;/*** @brief push data to queue* @param [in] input_value: the value will push to the queue* @return true: success to push data; false: fail to push data*/bool Push(T input_value) {std::lock_guard<std::mutex> lock(mutex_);// check current size is less than capacityif (queue_.size() < queueCapacity) {queue_.push(input_value);return true;}return false;}/*** @brief pop data from queue* @return true: success to pop data; false: fail to pop data*/T Pop() {std::lock_guard<std::mutex> lock(mutex_);if (queue_.empty()) { // check the queue is emptyreturn nullptr;}T tmp_ptr = queue_.front();queue_.pop();return tmp_ptr;}/*** @brief check the queue is empty* @return true: the queue is empty; false: the queue is not empty*/bool Empty() {std::lock_guard<std::mutex> lock(mutex_);return queue_.empty();}/*** @brief get the queue size* @return the queue size*/uint32_t Size() {std::lock_guard<std::mutex> lock(mutex_);return queue_.size();}void ExtendCapacity(uint32_t newSize) {queueCapacity = newSize;kMaxQueueCapacity = newSize > kMaxQueueCapacity ? newSize : kMaxQueueCapacity;}private:std::queue<T> queue_; // the queueuint32_t queueCapacity; // queue capacitymutable std::mutex mutex_; // the mutex valueconst uint32_t kMinQueueCapacity = 1; // the minimum queue capacityconst uint32_t kMaxQueueCapacity = 10000; // the maximum queue capacityconst uint32_t kDefaultQueueCapacity = 10; // default queue capacity};
}
#endif /* THREAD_SAFE_QUEUE_H */
注意使用的时候,队列中要保存的是指针形式,例如
acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_;
假如你这样写代码,那么会报错
acllite::ThreadSafeQueue<frame_info_user_data *> videoFrameQueue_; //error
编译报下面的错误
src/ThreadSafeQueue.h: In instantiation of ‘T acllite::ThreadSafeQueue<T>::Pop() [with T = acllite::frame_info_user_data]’:
src/VdecHelperV2.cpp:213:51: required from here
src/ThreadSafeQueue.h:96:24: error: could not convert ‘nullptr’ from ‘std::nullptr_t’ to ‘acllite::frame_info_user_data’96 | return nullptr;| ^~~~~~~| || std::nullptr_t
2 线程安全队列封装二
/************************************************************************** Copyright (C) [2019] by Cambricon, Inc. All rights reserved** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN* THE SOFTWARE.*************************************************************************/#ifndef CNSTREAM_THREADSAFE_QUEUE_HPP_
#define CNSTREAM_THREADSAFE_QUEUE_HPP_#include <condition_variable>
#include <mutex>
#include <queue>namespace cnstream {template <typename T>
class ThreadSafeQueue {public:ThreadSafeQueue() = default;ThreadSafeQueue(const ThreadSafeQueue& other) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;bool TryPop(T& value); // NOLINTvoid WaitAndPop(T& value); // NOLINTbool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time); // NOLINTvoid Push(const T& new_value); // NOLINTbool Empty() {std::lock_guard<std::mutex> lk(data_m_);return q_.empty();}uint32_t Size() {std::lock_guard<std::mutex> lk(data_m_);return q_.size();}private:std::mutex data_m_;std::queue<T> q_;std::condition_variable notempty_cond_;
};template <typename T>
bool ThreadSafeQueue<T>::TryPop(T& value) { // NOLINTstd::lock_guard<std::mutex> lk(data_m_);if (q_.empty()) {return false;} else {value = q_.front();q_.pop();return true;}
}template <typename T>
void ThreadSafeQueue<T>::WaitAndPop(T& value) { // NOLINTstd::unique_lock<std::mutex> lk(data_m_);notempty_cond_.wait(lk, [&] { return !q_.empty(); });value = q_.front();q_.pop();
}template <typename T>
bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) { // NOLINTstd::unique_lock<std::mutex> lk(data_m_);if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {value = q_.front();q_.pop();return true;} else {return false;}
}template <typename T>
void ThreadSafeQueue<T>::Push(const T& new_value) {std::unique_lock<std::mutex> lk(data_m_);q_.push(new_value);lk.unlock();notempty_cond_.notify_one();
}} // namespace cnstream#endif // CNSTREAM_THREADSAFE_QUEUE_HPP_
3 线程安全队列封装三
/************************************************************************** Copyright (C) [2020] by Cambricon, Inc. All rights reserved** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN* THE SOFTWARE.*************************************************************************/#ifndef INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_
#define INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>namespace infer_server {/*** @brief Thread-safe queue** @tparam T Type of stored elements* @tparam Q Type of underlying container to store the elements, which acts as queue,* `std::queue` and `std::priority_queue` satisfy the requirements*/
template <typename T, typename Container = std::queue<T>>
class ThreadSafeQueue {public:/// type of containerusing queue_type = typename std::enable_if<std::is_same<typename Container::value_type, T>::value, Container>::type;/// type of elementsusing value_type = T;/// Container::size_typeusing size_type = typename Container::size_type;/*** @brief Construct a new Thread Safe Queue object*/ThreadSafeQueue() = default;/*** @brief Try to pop an element from queue** @param value An element* @retval true Succeed* @retval false Fail, no element stored in queue*/bool TryPop(T& value); // NOLINT/*** @brief Try to pop an element from queue, wait for `rel_time` if queue is empty** @param value An element* @param rel_time Maximum duration to block for* @retval true Succeed* @retval false Timeout*/bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time); // NOLINT/*** @brief Pushes the given element value to the end of the queue** @param new_value the value of the element to push*/void Push(const T& new_value) {std::lock_guard<std::mutex> lk(data_m_);q_.push(new_value);notempty_cond_.notify_one();}/*** @brief Pushes the given element value to the end of the queue** @param new_value the value of the element to push*/void Push(T&& new_value) {std::lock_guard<std::mutex> lk(data_m_);q_.push(std::move(new_value));notempty_cond_.notify_one();}/*** @brief Pushes a new element to the end of the queue. The element is constructed in-place.** @tparam Arguments Type of arguments to forward to the constructor of the element* @param args Arguments to forward to the constructor of the element*/template <typename... Arguments>void Emplace(Arguments&&... args) {std::lock_guard<std::mutex> lk(data_m_);q_.emplace(std::forward<Arguments>(args)...);notempty_cond_.notify_one();}/*** @brief Checks if the underlying container has no elements** @retval true If the underlying container is empty* @retval false Otherwise*/bool Empty() {std::lock_guard<std::mutex> lk(data_m_);return q_.empty();}/*** @brief Returns the number of elements in the underlying container** @return size_type The number of elements in the container*/size_type Size() {std::lock_guard<std::mutex> lk(data_m_);return q_.size();}private:ThreadSafeQueue(const ThreadSafeQueue& other) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;std::mutex data_m_;queue_type q_;std::condition_variable notempty_cond_;
}; // class ThreadSafeQueuenamespace detail {
template <typename T, typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {*value = q_->front();q_->pop();
}template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>,typename = typename std::enable_if<!std::is_move_assignable<T>::value>::type>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {*value = q_->top();q_->pop();
}template <typename T>
inline void GetFrontAndPop(std::queue<T>* q_, T* value) {*value = std::move(q_->front());q_->pop();
}template <typename T, typename Container = std::vector<T>, typename Compare = std::less<T>>
inline void GetFrontAndPop(std::priority_queue<T, Container, Compare>* q_, T* value) {// cut off const to enable move*value = std::move(const_cast<T&>(q_->top()));q_->pop();
}
} // namespace detailtemplate <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::TryPop(T& value) { // NOLINTstd::lock_guard<std::mutex> lk(data_m_);if (q_.empty()) {return false;}detail::GetFrontAndPop<T>(&q_, &value);return true;
}template <typename T, typename Q>
bool ThreadSafeQueue<T, Q>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) { // NOLINTstd::unique_lock<std::mutex> lk(data_m_);if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {detail::GetFrontAndPop<T>(&q_, &value);return true;} else {return false;}
}/*** @brief Alias of ThreadSafeQueue<T, std::queue<T>>** @tparam T Type of stored elements*/
template <typename T>
using TSQueue = ThreadSafeQueue<T, std::queue<T>>;/*** @brief Alias of ThreadSafeQueue<T, std::priority_queue<T>>** @tparam T Type of stored elements*/
template <typename T>
using TSPriorityQueue = ThreadSafeQueue<T, std::priority_queue<T>>;} // namespace infer_server#endif // INFER_SERVER_UTIL_THREADSAFE_QUEUE_H_