当前位置: 首页 > news >正文

Aeron:两个代理之间的单向IPC(One-way IPC between two agents)

一、概述

本例展示了如何通过 IPC 在调度于不同线程的两个代理之间传输缓冲区。在继续学习本示例之前,最好先复习一下Simplest Full Example ,因为该示例展示的是 IPC 通信,没有增加代理的复杂性。读者还应熟悉Media Driver

流程构建如下:

  • 以默认模式运行的嵌入式Media Driver(发送器、接收器和指挥器的代理(an agent for Sender, Receiver, Conductor))(an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor))
  • 通过publication发送 IPC 数据的代理(SendAgent)(an agent to send the IPC data over a publication (SendAgent))
  • 通过subscription接收 IPC 数据的代理(ReceiveAgent)(an agent to receive the IPC data over a subscription (ReceiveAgent))

Code Sample overview

代码示例包含在 ipc-core 项目 com.aeroncookbook.ipc.agents 命名空间中的三个文件中。它们是:

  • StartHere.java - the class responsible for setting up Aeron and scheduling the agents;(负责设置 Aeron 和调度代理的类)
  • SendAgent.java - the class holding the Agent responsible for sending data;(负责发送数据的代理的类)
  • ReceiveAgent.java - the class holding the Agent responsible for receiving data.(负责接收数据的代理的类)

下文将对每个部分进行细分和讨论。

 Execution Output

15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000

二、StartHere.java

public static void main(String[] args)
{final String channel = "aeron:ipc";final int stream = 10;final int sendCount = 1_000_000;final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();//construct Media Driver, cleaning up media driver folder on start/stopfinal MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.SHARED).sharedIdleStrategy(new BusySpinIdleStrategy()).dirDeleteOnShutdown(true);final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);//construct Aeron, pointing at the media driver's folderfinal Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName());final Aeron aeron = Aeron.connect(aeronCtx);//construct the subs and pubsfinal Subscription subscription = aeron.addSubscription(channel, stream);final Publication publication = aeron.addPublication(channel, stream);//construct the agentsfinal SendAgent sendAgent = new SendAgent(publication, sendCount);final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,sendCount);//construct agent runnersfinal AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,Throwable::printStackTrace, null, sendAgent);final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,Throwable::printStackTrace, null, receiveAgent);LOGGER.info("starting");//start the runnersAgentRunner.startOnThread(sendAgentRunner);AgentRunner.startOnThread(receiveAgentRunner);//wait for the final item to be received before closingbarrier.await();//close the resourcesreceiveAgentRunner.close();sendAgentRunner.close();aeron.close();mediaDriver.close();
}

 Constructing support objects

final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

这部分代码构建了一些支持对象。(This section of the code constructs a few support objects.)

  • Line 1 holds the channel definition, in this case aeron:ipc
  • Line 2 holds the stream ID to use, in this case 10
  • Line 3 is the number of integers to send over IPC
  • 第 4、5 行构建了代理使用的空闲策略(IdleStrategy)。在这种情况下,只要 doWork 工作周期返回 0,空闲策略就会忙于旋转。(Line 4,5 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.)
  • 第 6 行是一个屏障,用于协调样本的关闭。一旦 ReceiveAgent 总共接收到一个发送计数整数,它就会向屏障发出信号,触发关闭。(Line 6 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.)

Constructing the Media Driver 

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.SHARED).sharedIdleStrategy(new BusySpinIdleStrategy()).dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

本节代码使用定义的上下文构建Media Driver。上下文是一个对象,其中包含Media Driver的所有可选配置参数。在本例中,有两项配置被重写,以确保Media Driver在启动和关闭时整理Media Driver目录。一旦上下文准备就绪,Media Driver就会作为嵌入式代理启动。

See also: Media Driver

 Constructing Aeron, the Publication and the Subscription

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

这部分代码再次使用 Context 构建 Aeron 对象。有了这个上下文,我们就能让 Aeron 知道Media Driver的 Aeron 目录在哪里。一旦上下文准备就绪,Aeron 对象就会连接到Media Driver。接下来,我们将使用之前定义的通道和流 id 创建 IPC 发布和订阅(IPC publication and subscription)。

//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);

Constructing and scheduling the agents

//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,Throwable::printStackTrace, null, receiveAgent);//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);

 

这部分代码构建发送代理(SendAgent)和接收代理(ReceiveAgent),创建代理运行程序来管理它们,然后在特定线程上启动它们。关键行如下:

  • 第 6-7 行和第 8-9 行:这两行分别构建了发送和接收的代理运行程序。请注意,每行都给出了空闲策略,用于控制线程在 doWork 工作周期后如何使用资源。
  • 第 12 和 13 行:这两行为每个代理创建新线程,并开始工作周期。

Shutting down cleanly

//wait for the final item to be received before closing
barrier.await();//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();

代码的最后部分负责等待 ReceiveAgent 触发屏障,然后正确清理资源。首先关闭代理,然后关闭 aeron 对象,最后关闭Media Driver。如果不注意关闭过程中的执行顺序,可能会出现核心转储或其他看似严重的故障。 

三、SendAgent.java

public class SendAgent implements Agent
{private final Publication publication;private final int sendCount;private final UnsafeBuffer unsafeBuffer;private int currentCountItem = 1;private final Logger logger = LoggerFactory.getLogger(SendAgent.class);public SendAgent(final Publication publication, int sendCount){this.publication = publication;this.sendCount = sendCount;this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));unsafeBuffer.putInt(0, currentCountItem);}@Overridepublic int doWork(){if (currentCountItem > sendCount){return 0;}if (publication.isConnected()){if (publication.offer(unsafeBuffer) > 0){currentCountItem += 1;unsafeBuffer.putInt(0, currentCountItem);}}return 0;}@Overridepublic String roleName(){return "sender";}
}

 

send 对象负责通过提供的 Aeron Publication 发送 sendCount 整数。doWork 方法用于保持代理的工作周期,该方法会被持续调用,直至代理关闭。一旦达到 sendCount 限制,它就会停止向publication发送更多信息,并开始闲置。

这段代码中最有趣的部分是:

  • Line 18 to 34: the doWork method holding the duty cycle for this agent
  • 第 22 行和第 34 行:这两条返回语句都返回 0,这将导致选定的空闲策略 BusySpinIdleStrategy 调用 ThreadHints.onSpinWait()
  • 第 25 行:只有当publication已连接时,才会返回 true。一旦连接,就可以安全地向publication提供信息。
  • 第 27 行:这将为publication提供缓冲数据。
  • Line 30: this logs the last sent integer, for example 15:13:42.818 [sender] sent: 123
  • Line 41: this sets the thread name to sender, as is visible in the log output.

四、ReceiveAgent.java

public class ReceiveAgent implements Agent
{private final Subscription subscription;private final ShutdownSignalBarrier barrier;private final int sendCount;private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);public ReceiveAgent(final Subscription subscription,ShutdownSignalBarrier barrier, int sendCount){this.subscription = subscription;this.barrier = barrier;this.sendCount = sendCount;}@Overridepublic int doWork() throws Exception{subscription.poll(this::handler, 1000);return 0;}private void handler(DirectBuffer buffer, int offset, int length,Header header){final int lastValue = buffer.getInt(offset);if (lastValue >= sendCount){logger.info("received: {}", lastValue);barrier.signal();}}@Overridepublic String roleName(){return "receiver";}
}

接收代理负责轮询所提供的订阅并记录接收到的值。一旦达到 sendCount 值,接收代理就会发出屏障信号。该对象中最有趣的部分是:

  • 第 17-21 行 - doWork 方法保持着该代理的duty cycle 。duty cycle由两部分组成,一部分是轮询订阅,将事件传递给提供的处理程序,另一部分是返回 0。通过配置的 IdleStrategy,返回 0 将导致线程停顿一微秒。
  • Line 26 - this logs the integer value received, for example: 15:13:42.814 [receiver] received: 5
  • Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
  • Line 38 - this sets the role name to receiver, as visible in log output.

 五、Performance

在英特尔笔记本电脑上,本示例每秒可传输约 1 千万条 4 字节信息。如果使用的是 Linux 系统,且有可用的 /dev/shm,代码会自动使用。通过交换 NoOpIdleStrategy,并将media driver线程移至 DEDICATED,每秒可传输超过 2000 万条信息。主要更改见下文。请注意,您需要确保硬件上至少有 8 个物理内核。

final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context().dirDeleteOnStart(true).threadingMode(ThreadingMode.DEDICATED).conductorIdleStrategy(new BusySpinIdleStrategy()).senderIdleStrategy(new NoOpIdleStrategy()).receiverIdleStrategy(new NoOpIdleStrategy()).dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context().idleStrategy(new NoOpIdleStrategy()).aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

 

有一个相关的 Two Agent example of OneToOneRingBuffer 非常相似,只不过它使用了 Agrona 的 OneToOneRingBuffer,并通过 BusySpinIdleStrategy 每秒发送大约 1800 万条 4 字节信息,或通过 NoOpIdleStrategy 每秒发送超过 5000 万条信息。

六、Using the C Media Driver

要使用 Aeron 的 C Media Driver测试此示例,您需要执行以下操作:

首先,从源代码中构建 C Media Driver(说明因操作系统而异,此部分参考博客即可,建议翻墙进行操作,贴出cookbook只是帮助借阅):

  • Building the C Media Driver on macOS
  • Building the C Media Driver on CentOS Linux 8
  • Building the C Media Driver on Ubuntu 20.04
  • Building the C Media Driver on Windows 10

Next, start the C Media Driver with default settings

  • ./aeronmd (Linux/macOS)
  • aeronmd (Windows)

Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:

//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//        .dirDeleteOnStart(true)
//        .threadingMode(ThreadingMode.SHARED)
//        .sharedIdleStrategy(new BusySpinIdleStrategy())
//        .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);

Aeron 和Media Driver将默认使用同一目录。

最后,正常运行 StartHere.java。进程应正常运行,输出应包括类似内容:

14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000

相关文章:

  • visual studio下载安装
  • 【MySQL基础随缘更系列】AB复制
  • 你是否感受到AI就在身边?
  • Leetcode - 132双周赛
  • 海康充电桩报文校验TCP校验和
  • 刷题——链表中倒数最后k个结点
  • 什么是隐马尔可夫模型?
  • 【第5章】Stable Diffusion大模型(简介/两种版本/安装/模型推荐/使用方式)ComfyUI基础入门教程
  • 【Vue3】使用v-model实现父子组件通信(常用在组件封装规范中)
  • Part 4.2 背包动态规划
  • 适用于 macOS 的最佳免费数据恢复软件
  • 浏览器必装插件推荐:最新版Simple Allow Copy,解除网页复制限制!
  • Arcgis投影问题
  • 在mysql中GROUP_CONCAT字段的作用
  • vivado PIN
  • 《Javascript高级程序设计 (第三版)》第五章 引用类型
  • 【402天】跃迁之路——程序员高效学习方法论探索系列(实验阶段159-2018.03.14)...
  • 【RocksDB】TransactionDB源码分析
  • 【附node操作实例】redis简明入门系列—字符串类型
  • 5、React组件事件详解
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • extjs4学习之配置
  • github指令
  • iOS帅气加载动画、通知视图、红包助手、引导页、导航栏、朋友圈、小游戏等效果源码...
  • Java-详解HashMap
  • js
  • JS创建对象模式及其对象原型链探究(一):Object模式
  • Linux后台研发超实用命令总结
  • mysql 5.6 原生Online DDL解析
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • 分类模型——Logistics Regression
  • 面试总结JavaScript篇
  • 使用API自动生成工具优化前端工作流
  • gunicorn工作原理
  • scrapy中间件源码分析及常用中间件大全
  • ​【C语言】长篇详解,字符系列篇3-----strstr,strtok,strerror字符串函数的使用【图文详解​】
  • ​【数据结构与算法】冒泡排序:简单易懂的排序算法解析
  • ​Kaggle X光肺炎检测比赛第二名方案解析 | CVPR 2020 Workshop
  • #pragma预处理命令
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • #考研#计算机文化知识1(局域网及网络互联)
  • $.ajax,axios,fetch三种ajax请求的区别
  • (floyd+补集) poj 3275
  • (十七)Flask之大型项目目录结构示例【二扣蓝图】
  • *Django中的Ajax 纯js的书写样式1
  • .htaccess配置常用技巧
  • .NET Core引入性能分析引导优化
  • .NET导入Excel数据
  • @configuration注解_2w字长文给你讲透了配置类为什么要添加 @Configuration注解
  • @EnableAsync和@Async开始异步任务支持
  • @RequestMapping用法详解
  • [AI Google] Ask Photos: 使用Gemini搜索照片的新方法
  • [C#]使用C#部署yolov8-seg的实例分割的tensorrt模型
  • [codeforces]Recover the String
  • [CSS]浮动