Parallel Programming in Practice: Applications of the TBB Framework (1), SUPRA Basics

I. Applying TBB

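The previous articles covered the fundamentals of the TBB framework and its basic applications. Those basics are fairly easy to pick up from the documentation or from sample code. To show the strengths of TBB more clearly, this article examines an open-source application and the higher-level abstractions it builds on top of TBB, so that developers can understand and master the framework in more depth. It also points to a design direction: moving TBB from a low-level technique to a building block of the overall architecture.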

II. Introduction to the SUPRA Project

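SUPRA: Open Source Software Defined Ultrasound Processing for Real-Time Applications. It is open-source software for real-time ultrasound, used mainly in medicine to reconstruct and output ultrasound images, and it allows the data to be refined and modified while they are being processed. It supports both CPU and GPU modes, and both 2D and 3D images.
SUPRA's overall framework is built on TBB. It wraps TBB's abstractions, such as tasks and nodes, in a further layer of encapsulation that is closer to how people think about the application.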

III. Overall Architecture

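This analysis skips SUPRA's upper application layer, because it is tightly coupled to the UI and the actual business logic and has little to do with TBB, the topic here. It concentrates on the design of the library itself. SUPRA divides its design into several layers:
1. Abstraction layer over TBB nodes
On top of TBB's existing nodes, SUPRA adds another abstraction that implements input, output and algorithm nodes. This abstraction is entirely different from TBB's own notion of inputs and outputs: SUPRA's nodes carry purely logical, application-level meaning. Here is how they are defined: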

class AbstractNode {
protected:
    typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, std::shared_ptr<RecordObject>, tbb::flow::rejecting>
        NodeTypeDiscarding;
    typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, std::shared_ptr<RecordObject>, tbb::flow::queueing>
        NodeTypeQueueing;
    typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, tbb::flow::continue_msg, tbb::flow::rejecting>
        NodeTypeOneSidedDiscarding;
    typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, tbb::flow::continue_msg, tbb::flow::queueing>
        NodeTypeOneSidedQueueing;

public:
    /// Base constructor for all nodes
    AbstractNode(const std::string &nodeID, bool queueing) : m_nodeID(nodeID), m_queueing(queueing) {
        m_configurationDictionary.setValueRangeDictionary(&m_valueRangeDictionary);
    }
    virtual ~AbstractNode() {}

    virtual size_t getNumInputs() = 0;
    virtual size_t getNumOutputs() = 0;
    /// Returns a pointer to the input port with the given number
    virtual tbb::flow::graph_node *getInput(size_t index) { return nullptr; }
    /// Returns a pointer to the output port with the given number
    virtual tbb::flow::graph_node *getOutput(size_t index) { return nullptr; }

    const ValueRangeDictionary *getValueRangeDictionary() { return &m_valueRangeDictionary; }
    const ConfigurationDictionary *getConfigurationDictionary() { return &m_configurationDictionary; }
    const std::string &getNodeID() { return m_nodeID; }

    template <typename ValueType>
    bool changeConfig(const std::string &configKey, const ValueType &newValue) {
        if (m_valueRangeDictionary.hasKey(configKey) && m_valueRangeDictionary.isInRange(configKey, newValue)) {
            LOG(INFO) << "Parameter: " << m_nodeID << "." << configKey << " = " << newValue;
            m_configurationDictionary.set(configKey, newValue);
            configurationEntryChanged(configKey);
            return true;
        }
        return false;
    }

    void changeConfig(const ConfigurationDictionary &newConfig) {
        configurationDictionaryChanged(newConfig);
        // validate the configuration entries
        ConfigurationDictionary validConfig = newConfig;
        validConfig.setValueRangeDictionary(&m_valueRangeDictionary);
        validConfig.checkEntriesAndLog(m_nodeID);
        // store all valid entries
        m_configurationDictionary = validConfig;
        configurationChanged();
    }

    std::string getTimingInfo() { return m_callFrequency.getTimingInfo(); }

protected:
    /// The collection of node parameters
    ConfigurationDictionary m_configurationDictionary;
    /// The definition of parameters and their respective ranges
    ValueRangeDictionary m_valueRangeDictionary;
    CallFrequency m_callFrequency;
    bool m_queueing;

protected:
    virtual void configurationEntryChanged(const std::string &configKey) {}
    virtual void configurationChanged() {}
    virtual void configurationDictionaryChanged(const ConfigurationDictionary &newConfig) {}

private:
    std::string m_nodeID;
};

class AbstractInput : public AbstractNode {
public:
    /// Base constructor for the input node. Initializes its output ports.
    AbstractInput(tbb::flow::graph &graph, const std::string &nodeID, size_t numPorts)
        : AbstractNode(nodeID, false), m_numOutputs(numPorts) {
        m_pOutputNodes.resize(m_numOutputs);
        for (size_t i = 0; i < m_numOutputs; i++) {
            m_pOutputNodes[i] = std::unique_ptr<tbb::flow::broadcast_node<std::shared_ptr<RecordObject>>>(
                new tbb::flow::broadcast_node<std::shared_ptr<RecordObject>>(graph));
        }
    }
    ~AbstractInput() { waitForFinish(); }

    void waitForFinish() {
        if (m_pInputDeviceThread && m_pInputDeviceThread->joinable()) {
            m_pInputDeviceThread->join();
        }
    }
    void detachThread() { this->m_pInputDeviceThread->detach(); }

    void start() {
        setRunning(true);
        m_pInputDeviceThread = std::make_shared<std::thread>(std::thread([this]() { this->startAcquisition(); }));
    }

    /// Set the state of the input node, if newState is false, the node is stopped
    virtual bool setRunning(bool newState) {
        bool oldState = m_running;
        m_running = newState;
        if (!m_running) {
            stopAcquisition();
        }
        // true iff the running state actually changed
        return (oldState || newState) && !(oldState && newState);
    }
    /// Returns whether the node is running
    bool getRunning() { return m_running; }

    /// returns the output port with the given index
    template <size_t index>
    tbb::flow::broadcast_node<std::shared_ptr<RecordObject>> &getOutputNode() {
        return *m_pOutputNodes[index];
    }

    virtual size_t getNumInputs() { return 0; }
    /// returns the number of output ports of this node
    virtual size_t getNumOutputs() { return m_pOutputNodes.size(); }
    /// returns a pointer to the output port with the given index
    virtual tbb::flow::graph_node *getOutput(size_t index) {
        if (index < m_pOutputNodes.size()) {
            return m_pOutputNodes[index].get();
        }
        return nullptr;
    }

protected:
    /// The nodes output. An implementing node calls this method when it has a
    /// dataset to send into the graph.
    template <size_t index>
    bool addData(std::shared_ptr<RecordObject> data) {
        return m_pOutputNodes[index]->try_put(data);
    }

    double getTimerFrequency() { return m_timer.getFrequency(); }
    void setUpTimer(double frequency) { m_timer.setFrequency(frequency); }
    void timerLoop() {
        bool shouldContinue = true;
        while (shouldContinue) {
            shouldContinue = timerCallback();
            if (shouldContinue) {
                m_timer.sleepUntilNextSlot();
            }
        }
    }

private:
    std::vector<std::unique_ptr<tbb::flow::broadcast_node<std::shared_ptr<RecordObject>>>> m_pOutputNodes;
    SingleThreadTimer m_timer;
    std::shared_ptr<std::thread> m_pInputDeviceThread;
    std::atomic_bool m_running{false};

protected:
    std::mutex m_mutex;
    const size_t m_numOutputs;

    // Functions to be overridden
public:
    virtual void initializeDevice() {}
    virtual bool ready() { return false; }
    virtual std::vector<size_t> getImageOutputPorts() = 0;
    virtual std::vector<size_t> getTrackingOutputPorts() = 0;
    virtual void freeze() = 0;
    virtual void unfreeze() = 0;

protected:
    /// The entry point for the implementing input node
    /// This method is called in a separate thread once the node is started.
    virtual void startAcquisition() = 0;
    virtual void stopAcquisition() {}
    virtual bool timerCallback() { return false; }
};

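The code of the other nodes is not reproduced here. As the code above shows, the node design is almost a direct expression of the application logic; in other words, the level of abstraction is higher.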
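
The templated changeConfig() above accepts a value only if its key is declared in the ValueRangeDictionary and the value lies in the declared range, and only then notifies the node through configurationEntryChanged(). The following is a minimal standard-C++ sketch of that validate-then-notify pattern, assuming a purely numeric [min, max] range per key. RangeTable and ConfigurableNode are hypothetical names, not SUPRA classes; SUPRA's real dictionaries handle arbitrary value types and log every change.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a value-range dictionary: every declared key
// has a closed numeric range [lo, hi].
class RangeTable {
public:
    void define(const std::string& key, double lo, double hi) { m_ranges[key] = {lo, hi}; }
    bool hasKey(const std::string& key) const { return m_ranges.count(key) != 0; }
    bool isInRange(const std::string& key, double value) const {
        auto it = m_ranges.find(key);
        return it != m_ranges.end() && value >= it->second.first && value <= it->second.second;
    }

private:
    std::map<std::string, std::pair<double, double>> m_ranges;
};

// Hypothetical node mirroring the changeConfig() pattern: a value is stored
// only if its key is declared and the value is in range, and only then is
// the change hook called.
class ConfigurableNode {
public:
    ConfigurableNode() { m_ranges.define("gain", 0.0, 10.0); }
    virtual ~ConfigurableNode() = default;

    bool changeConfig(const std::string& key, double value) {
        if (m_ranges.hasKey(key) && m_ranges.isInRange(key, value)) {
            m_config[key] = value;
            configurationEntryChanged(key);
            return true;
        }
        return false;  // unknown key or out of range: configuration unchanged
    }
    double get(const std::string& key) const { return m_config.at(key); }
    int notifications() const { return m_notifications; }

protected:
    // Hook for derived nodes; this base version only counts notifications.
    virtual void configurationEntryChanged(const std::string& /*key*/) { ++m_notifications; }

private:
    RangeTable m_ranges;
    std::map<std::string, double> m_config;
    int m_notifications = 0;
};
```

Because an invalid value never reaches the stored configuration, a derived node can react inside its hook without validating the value again.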

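2. Dynamic management layer over the abstracted nodes
The dynamic management layer has several parts. First, node properties, parameters and connections are described in an XML configuration file, although they can also be adjusted fully dynamically in code. Second, node registration is handled by a dedicated factory class, which creates and connects the nodes. Finally, key node parameters can be changed at run time through SUPRA's own automatic XML mapping mechanism.
For now, we look only at the factory class: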

std::shared_ptr<AbstractOutput> InterfaceFactory::createOutputDevice(shared_ptr<tbb::flow::graph> pG,
                                                                     const std::string &nodeID,
                                                                     std::string deviceType,
                                                                     bool queueing) {
    std::shared_ptr<AbstractOutput> retVal = std::shared_ptr<AbstractOutput>(nullptr);
    if (deviceType == "OpenIGTLinkClientOutputDevice") {
        retVal = std::make_shared<OpenIGTLinkClientOutputDevice>(*pG, nodeID, queueing);
    }
    if (deviceType == "DatasCacheOutputDevice") {
        retVal = std::make_shared<DatasCacheOutputDevice>(*pG, nodeID, queueing);
    }
    LOG_IF(ERROR, !((bool)retVal)) << "Error creating output device. Requested type '" << deviceType
                                   << "' is unknown. Did you activate the corresponding "
                                   << "module in the build of the library?";
    LOG_IF(INFO, (bool)retVal) << "Created output device '" << deviceType << "' with ID '" << nodeID << "'";
    return retVal;
}

This part will be analyzed in more detail later.
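
createOutputDevice() picks the concrete class through a chain of string comparisons, so every new device type means editing the factory. A common alternative, shown here as a different technique rather than what SUPRA does, is a registry that maps type names to creator functions. The sketch uses only the standard library; OutputNode, ConsoleOutput, FileOutput and OutputFactory are hypothetical, and the TBB graph and queueing arguments are left out for brevity.

```cpp
#include <cassert>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical minimal output interface (not SUPRA's AbstractOutput).
struct OutputNode {
    explicit OutputNode(std::string nodeID) : id(std::move(nodeID)) {}
    virtual ~OutputNode() = default;
    virtual std::string type() const = 0;
    std::string id;
};

// Two hypothetical concrete outputs.
struct ConsoleOutput : OutputNode {
    using OutputNode::OutputNode;
    std::string type() const override { return "ConsoleOutput"; }
};
struct FileOutput : OutputNode {
    using OutputNode::OutputNode;
    std::string type() const override { return "FileOutput"; }
};

// Registry-based factory: type name -> creator function.
class OutputFactory {
public:
    using Creator = std::function<std::shared_ptr<OutputNode>(const std::string&)>;

    template <typename T>
    void registerType(const std::string& typeName) {
        m_creators[typeName] = [](const std::string& nodeID) { return std::make_shared<T>(nodeID); };
    }

    std::shared_ptr<OutputNode> create(const std::string& typeName, const std::string& nodeID) const {
        auto it = m_creators.find(typeName);
        if (it == m_creators.end()) {
            std::cerr << "Error creating output device. Requested type '" << typeName << "' is unknown.\n";
            return nullptr;
        }
        return it->second(nodeID);
    }

private:
    std::map<std::string, Creator> m_creators;
};
```

With a registry, each optional module could register its own types when it is compiled in, which fits the intent behind the factory's "Did you activate the corresponding module" error message; whether that suits SUPRA's build setup is an assumption.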
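3. Data processing layer
In each functional node, a data hook such as writeData links the abstract node to the underlying TBB node that carries the data. AbstractOutput, for example, wraps writeData in a TBB function_node in its constructor: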

AbstractOutput(tbb::flow::graph &graph, const std::string &nodeID, bool queueing)
    : AbstractNode(nodeID, queueing) {
    if (queueing) {
        // concurrency 1, queueing: messages arriving while writeData() runs are buffered
        m_inputNode = std::unique_ptr<NodeTypeOneSidedQueueing>(
            new NodeTypeOneSidedQueueing(graph, 1, [this](const std::shared_ptr<RecordObject> &inMessage) {
                if (this->m_running) {
                    writeData(inMessage);
                }
            }));
    } else {
        // concurrency 1, rejecting: messages arriving while writeData() runs are refused
        m_inputNode = std::unique_ptr<NodeTypeOneSidedDiscarding>(
            new NodeTypeOneSidedDiscarding(graph, 1, [this](const std::shared_ptr<RecordObject> &inMessage) {
                if (this->m_running) {
                    writeData(inMessage);
                }
            }));
    }
}

This integrates the two layers very neatly and is well worth borrowing.
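
The only difference between the two branches is the buffer policy of the concurrency-1 function_node: tbb::flow::queueing buffers a message that arrives while writeData() is still running, while tbb::flow::rejecting refuses it. When the predecessor is a non-buffering node, such as the broadcast_node outputs of AbstractInput, a refused message is effectively dropped for that output. The model below is not TBB code; it is a single-threaded, deterministic imitation of the two policies, and SerialNodeModel and Policy are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <string>

enum class Policy { Discarding, Queueing };

// Hypothetical model of a node whose body runs at most once at a time
// (concurrency limit 1). It only imitates what happens to a message that
// arrives while the body is busy.
class SerialNodeModel {
public:
    explicit SerialNodeModel(Policy policy) : m_policy(policy) {}

    // Offer a message; returns false if it was rejected.
    bool tryPut(const std::string& msg) {
        if (!m_current) {  // idle: start "processing" immediately
            m_current = msg;
            return true;
        }
        if (m_policy == Policy::Queueing) {  // busy: buffer it for later
            m_queue.push_back(msg);
            return true;
        }
        return false;  // busy and rejecting: message refused
    }

    // Simulate the body finishing: return the processed message and
    // start the next buffered one, if any.
    std::optional<std::string> finish() {
        std::optional<std::string> done = m_current;
        m_current.reset();
        if (!m_queue.empty()) {
            m_current = m_queue.front();
            m_queue.pop_front();
        }
        return done;
    }

    bool busy() const { return m_current.has_value(); }
    std::size_t queued() const { return m_queue.size(); }

private:
    Policy m_policy;
    std::optional<std::string> m_current;
    std::deque<std::string> m_queue;
};
```

For a live ultrasound stream, discarding keeps latency bounded by skipping frames the output cannot keep up with, while queueing delivers every frame at the cost of a growing backlog whenever the consumer is slower than the producer.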

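4. Algorithm processing layer

The algorithm layer processes the data once a node has received it, for example various kinds of image processing or data compression: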

shared_ptr<RecordObject> BeamformingMVNode::checkTypeAndBeamform(shared_ptr<RecordObject> inObj) {
    unique_lock<mutex> l(m_mutex);
    shared_ptr<USImage> pImageRF = nullptr;
    if (inObj->getType() == TypeUSRawData) {
        shared_ptr<const USRawData> pRawData = dynamic_pointer_cast<const USRawData>(inObj);
        if (pRawData) {
            if (pRawData->getImageProperties()->getImageState() == USImageProperties::RawDelayed) {
                m_callFrequency.measure();
                switch (pRawData->getDataType()) {
                    case TypeInt16:
                        pImageRF = beamformTemplated<int16_t>(pRawData);
                        break;
                    case TypeFloat:
                        pImageRF = beamformTemplated<float>(pRawData);
                        break;
                    default:
                        logging::log_error("BeamformingMVNode: Input rawdata type is not supported.");
                        break;
                }
                m_callFrequency.measureEnd();
                // pImageRF stays null for unsupported input or output types
                if (pImageRF) {
                    if (m_lastSeenImageProperties != pImageRF->getImageProperties()) {
                        updateImageProperties(pImageRF->getImageProperties());
                    }
                    pImageRF->setImageProperties(m_editedImageProperties);
                }
            } else {
                logging::log_error("BeamformingMVNode: Cannot beamform undelayed RawData. Apply RawDelayNode first");
            }
        } else {
            logging::log_error("BeamformingMVNode: could not cast object to USRawData type, is it in supported ElementType?");
        }
    }
    return pImageRF;
}

The function checkTypeAndBeamform (which plays a role similar to writeData) calls the following internally:

template <typename RawDataType>
std::shared_ptr<USImage> BeamformingMVNode::beamformTemplated(shared_ptr<const USRawData> rawData) {
    shared_ptr<USImage> pImageRF = nullptr;
    cudaSafeCall(cudaDeviceSynchronize());
    cublasSafeCall(cublasSetStream(m_cublasH, rawData->getData<RawDataType>()->getStream()));
    switch (m_outputType) {
        case supra::TypeInt16:
            pImageRF = RxBeamformerMV::performRxBeamforming<RawDataType, int16_t>(
                rawData, m_subArraySize, m_temporalSmoothing, m_cublasH, m_subArrayScalingPower, m_computeMeans);
            break;
        case supra::TypeFloat:
            pImageRF = RxBeamformerMV::performRxBeamforming<RawDataType, float>(
                rawData, m_subArraySize, m_temporalSmoothing, m_cublasH, m_subArrayScalingPower, m_computeMeans);
            break;
        default:
            logging::log_error("BeamformingMVNode: Output image type not supported:");
            break;
    }
    cudaSafeCall(cudaDeviceSynchronize());
    return pImageRF;
}

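This is where the actual algorithm begins. The more complex algorithm code that follows is not reproduced here; interested readers can look it up in the source.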
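
Both functions follow the same pattern: a run-time type tag (getDataType() for the input, m_outputType for the output) is turned into template arguments through a switch, so the heavy code is written once as a template. The sketch below reproduces that two-level dispatch in plain C++17, with a trivial gain stage standing in for the CUDA beamformer. DataType, Samples, wrap, scaleTemplated, dispatchOutput and process are hypothetical names, and holding one vector per element type in a single struct is a simplification of SUPRA's data containers.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical run-time element-type tag.
enum class DataType { Int16, Float };

// Hypothetical container whose element type is known only at run time.
struct Samples {
    DataType type;
    std::vector<std::int16_t> i16;  // used when type == DataType::Int16
    std::vector<float> f32;         // used when type == DataType::Float
};

// Wrap a typed result back into the run-time-typed container.
inline Samples wrap(std::vector<std::int16_t> v) { return {DataType::Int16, std::move(v), {}}; }
inline Samples wrap(std::vector<float> v) { return {DataType::Float, {}, std::move(v)}; }

// The typed "algorithm": a trivial gain stage in place of beamforming.
template <typename In, typename Out>
Samples scaleTemplated(const std::vector<In>& in, double gain) {
    std::vector<Out> out;
    out.reserve(in.size());
    for (In v : in) {
        // conversion to an integer type truncates toward zero (no clamping here)
        out.push_back(static_cast<Out>(v * gain));
    }
    return wrap(std::move(out));
}

// Inner dispatch: choose the output type (cf. the switch on m_outputType).
template <typename In>
Samples dispatchOutput(const std::vector<In>& in, DataType outType, double gain) {
    switch (outType) {
        case DataType::Int16: return scaleTemplated<In, std::int16_t>(in, gain);
        case DataType::Float: return scaleTemplated<In, float>(in, gain);
    }
    throw std::invalid_argument("unsupported output type");
}

// Outer dispatch: recover the input type (cf. the switch on getDataType()).
inline Samples process(const Samples& in, DataType outType, double gain) {
    switch (in.type) {
        case DataType::Int16: return dispatchOutput(in.i16, outType, gain);
        case DataType::Float: return dispatchOutput(in.f32, outType, gain);
    }
    throw std::invalid_argument("unsupported input type");
}
```

Each new element type costs one case label per switch, while the compiler instantiates every input/output combination, so the number of instantiations grows with the product of supported input and output types.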
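5. Input/output layer
This layer is also a good model. Input data, and the data produced by the TBB processing pipeline, must be passed on to the relevant consumers, and SUPRA offers a good example of both: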

// Input
bool UltrasoundInterfaceRawDataMock::timerCallback() {
    if (!m_frozen) {
        double timestamp = getCurrentTime();
        m_callFrequency.measure();
        shared_ptr<USRawData> pRawData = std::make_shared<USRawData>(
            m_protoRawData->getNumScanlines(),
            m_protoRawData->getNumElements(),
            m_protoRawData->getElementLayout(),
            m_protoRawData->getNumReceivedChannels(),
            m_protoRawData->getNumSamples(),
            m_protoRawData->getSamplingFrequency(),
            m_pMockData,
            m_protoRawData->getRxBeamformerParameters(),
            m_protoRawData->getImageProperties(),
            timestamp,
            timestamp);
        addData<0>(pRawData);
        if (!m_singleImage) {
            if (m_lastFrame) {
                setRunning(false);
            } else {
                readNextFrame();
            }
        }
        m_callFrequency.measureEnd();
    }
    return getRunning();
}
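
The timerCallback() above emits one frame per tick and, unless m_singleImage is set, stops after the last frame; AbstractInput::timerLoop() keeps calling it until it returns false, sleeping between calls. The sketch below shows that cooperation with the standard library only, under several assumptions: SingleThreadTimer is not shown in the article, so a fixed-grid std::this_thread::sleep_until schedule replaces it; the Sink callback stands in for addData<0>(); MockReplayInput is a hypothetical name; and the loop is called directly rather than on the thread created by start(), so the example stays deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical replay source: each tick emits one frame. In sequence mode it
// stops after the last frame; in single-image mode it repeats frame 0 until
// stop() is called.
class MockReplayInput {
public:
    using Sink = std::function<void(const std::string&)>;  // stands in for addData<0>()

    MockReplayInput(std::vector<std::string> frames, bool singleImage,
                    std::chrono::milliseconds period, Sink sink)
        : m_frames(std::move(frames)), m_singleImage(singleImage),
          m_period(period), m_sink(std::move(sink)) {}

    void stop() { m_running = false; }
    bool running() const { return m_running; }

    // Calls timerCallback() at a fixed rate until it returns false.
    void timerLoop() {
        m_running = true;
        auto next = std::chrono::steady_clock::now();
        while (timerCallback()) {
            next += m_period;                     // next slot on a fixed time grid
            std::this_thread::sleep_until(next);  // callback duration does not cause drift
        }
    }

private:
    bool timerCallback() {
        if (!m_running || m_frames.empty()) return false;
        m_sink(m_frames[m_index]);
        if (!m_singleImage) {
            if (m_index + 1 == m_frames.size()) {
                m_running = false;  // last frame sent: stop
            } else {
                ++m_index;          // advance to the next frame
            }
        }
        return m_running;
    }

    std::vector<std::string> m_frames;
    bool m_singleImage;
    std::chrono::milliseconds m_period;
    Sink m_sink;
    std::size_t m_index = 0;
    std::atomic<bool> m_running{false};
};
```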

The input node reproduces real images and their associated data by replaying mock data read from files. Now the output side:

// Output
void OpenIGTLinkOutputDevice::writeData(std::shared_ptr<RecordObject> data) {
    if (m_isReady && getRunning() && m_isConnected) {
        m_callFrequency.measure();
        sendMessage(data);
        m_callFrequency.measureEnd();
    }
}

// Eventually called for image data
template <typename T>
void OpenIGTLinkOutputDevice::sendImageMessageTemplated(shared_ptr<const USImage> imageData) {
    static_assert(std::is_same<T, uint8_t>::value || std::is_same<T, int16_t>::value ||
                      std::is_same<T, float>::value,
                  "Image only implemented for uchar, short and float at the moment");
    auto properties = imageData->getImageProperties();
    if (properties->getImageType() == USImageProperties::BMode ||
        properties->getImageType() == USImageProperties::Doppler) {
        double resolution = properties->getImageResolution();
        vec3s imageSize = imageData->getSize();

        igtl::ImageMessage::Pointer pImageMsg = igtl::ImageMessage::New();
        pImageMsg->SetDimensions((int)imageSize.x, (int)imageSize.y, (int)imageSize.z);
        pImageMsg->SetSpacing(resolution, resolution, resolution);
        if (is_same<T, uint8_t>::value) {
            pImageMsg->SetScalarTypeToUint8();
        }
        if (is_same<T, int16_t>::value) {
            pImageMsg->SetScalarTypeToInt16();
        }
        if (is_same<T, float>::value) {
            pImageMsg->SetScalarType(igtl::ImageMessage::TYPE_FLOAT32);
        }
        pImageMsg->SetEndian(igtl::ImageMessage::ENDIAN_LITTLE);

        igtl::Matrix4x4 m;
        igtl::IdentityMatrix(m);
        m[0][0] = -1;
        m[1][1] = -1;
        pImageMsg->SetMatrix(m);
        pImageMsg->SetNumComponents(1);
        pImageMsg->SetDeviceName(m_streamName.c_str());
        pImageMsg->AllocateScalars();

        igtl::TimeStamp::Pointer pTimestamp = igtl::TimeStamp::New();
        double timestampSeconds;
        double timestampFrac = modf(imageData->getSyncTimestamp(), &timestampSeconds);
        pTimestamp->SetTime((uint32_t)timestampSeconds, (uint32_t)(timestampFrac * 1e9));
        pImageMsg->SetTimeStamp(pTimestamp);

        auto imageContainer = imageData->getData<T>();
        if (!imageContainer->isHost()) {
            imageContainer = make_shared<Container<T>>(LocationHost, *imageContainer);
        }
        size_t numElements = imageSize.x * imageSize.y * imageSize.z;
        memcpy(pImageMsg->GetScalarPointer(), imageContainer->get(), numElements * sizeof(T));
        pImageMsg->Pack();

        int sendResult = m_clientConnection->Send(pImageMsg->GetPackPointer(), pImageMsg->GetPackSize());
        if (sendResult == 0)  // when it could not be sent
        {
            m_isConnected = false;
            log_info("IGTL: Lost connection. Waiting for next connection.");
            waitAsyncForConnection();
        }
    }
}

This output node uses IGTL (OpenIGTLink), a communication protocol widely used in medical applications, as its final transport.
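
Before sending, sendImageMessageTemplated() splits the floating-point sync timestamp into whole seconds and nanoseconds with modf(), because igtl::TimeStamp::SetTime() is called there with two integers: for t = s + f with 0 ≤ f < 1, the seconds field is s and the nanosecond field is f × 10^9, truncated. A standalone sketch of just that step, assuming a non-negative timestamp; SplitTime and splitTimestamp are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Whole seconds plus nanoseconds, the two integers passed to SetTime().
struct SplitTime {
    std::uint32_t seconds;
    std::uint32_t nanoseconds;
};

// Split a non-negative timestamp in seconds the same way the output node does:
// modf() separates integer and fractional parts, and the fraction is scaled
// by 1e9 and truncated to whole nanoseconds.
inline SplitTime splitTimestamp(double timestamp) {
    double whole = 0.0;
    double frac = std::modf(timestamp, &whole);  // frac in [0, 1) for non-negative input
    return {static_cast<std::uint32_t>(whole), static_cast<std::uint32_t>(frac * 1e9)};
}
```

Truncating rather than rounding keeps the nanosecond field below 10^9; rounding a fraction very close to 1 could produce 1,000,000,000, which would then need a carry into the seconds field.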

IV. Summary

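The main reason for starting with SUPRA is that its design integrates TBB organically: it adds another layer of abstraction that maps the business logic onto TBB's design, so that business logic and the underlying technology can be combined dynamically throughout the processing pipeline. It is a design well worth learning from.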
