当前位置: 首页 > news >正文

Azure设计模式之资源争竞下的消费者模式

竞争的消费者模式


允许多并发处理在同一消息传输通道上所接收的消息。这能够使系统同时处理多个消息以提高吞吐量,提高可伸缩性与可用性,并平衡工作负载。


问题背景
在云环境中运行的应用程序需要异步处理大量的请求。而不是同步处理每个请求,传统的技术是应用程序通过消息系统传递给另一个服务(使用者服务)达到异步处理的目的。此策略有助于确保在处理请求时不会阻塞应用程序中的业务逻辑。而由于多种原因,请求的数量随着时间的推移持续增长。例如来自多个租户的用户活动或聚合请求的突然增加可能会导致不可预知的工作负载。在高峰时段,系统可能需要每秒处理数百个请求,而在其他时间,该数字可能非常小。此外,处理这些请求所执行的工作的性质可能是高度可变的。使用消费者服务的单实例可能导致该实例被请求所淹没,或是消息传递系统被涌入的消息超载。为了处理这种波动工作负载的情形,系统可以运行多个消费者服务实例。但是,必须对消费者进行协调,以确保每条消息只传递给单个用户。工作负载还需要跨消费者进行负载均衡,以防止某实例成为瓶颈。


解决方案
使用消息队列作为应用程序和消费者服务实例之间的通信通道。应用程序以消息的形式向队列发送请求,而消费者服务实例从队列中拉取消息并处理。可使用消费者服务实例池处理来自应用程序的任何实例的消息。下图说明了如何使用消息队列将工作分配给服务的实例。





该解决方案具有以下优点:
它提供了一个负载分配系统,可以对可变的请求数进行处理。队列充当了应用程序实例和消费者服务实例之间的缓冲区。这有助于最小化对应用程序和服务实例的可用性和响应能力的影响,参见队列负载调配模式(https://docs.microsoft.com/en-us/azure/architecture/patterns/queue-based-load-leveling)。处理一些长时间运行的消息而不会阻塞对其他消息的处理。


它提高了可靠性。如果生产者直接与消费者通信而不使用此模式,不对消费者进行监视,那么当调用失败时,消息有很大的可能性会丢失或无法再次处理。而在此模式下,消息不会发送到特定的服务实例。失败的服务实例也不会阻塞生产者, 任何工作服务实例都可以处理消息。


它不需要在消费者之间或在生产者和消费者实例之间进行复杂的协同。消息队列能够确保每条消息至少发送一次。


它是可伸缩的。随着消息量的波动,系统可以动态地增加或减少消费者服务实例数。


如果消息队列读操作支持事务,可以提高恢复能力。如果使用者服务实例读取并处理消息作为事务性操作的一部分,当调用失败时, 此模式能够确保将消息返回到队列, 并由消费者其他服务进行处理。


问题和注意事项
在决定实现此模式时,要考虑以下几点:
消息顺序。服务实例接收消息的顺序并不能保证与消息的创建顺序一致。这就需要消息的处理是幂等操作,因为这样能够解除对消息处理顺序的依赖。有关详细信息,请参阅乔纳森奥利弗的博客上的幂等模式(http://blog.jonathanoliver.com/idempotency-patterns/)。


可以使用Microsoft Azure服务总线队列来保证消息排序。有关详细信息,请参阅消息会话模式(https://msdn.microsoft.com/magazine/jj863132.aspx)。


创建还原服务。如果系统设计中包含检测并重启失败的服务实例,则需要将服务实例所执行的操作设计为幂等操作,以最大限度地减少当消息被多次处理所带来的影响。


检测“有毒”的消息。格式错误的消息或访问了不可用资源的任务可能会导致服务实例错误。系统应防止将此类消息再次返回队列, 而应捕获并存储这些消息的详细信息, 以便在进行分析。


处理结果。处理消息的服务实例应当与生成消息的应用程序逻辑分离,而且它们无法直接通信。如果服务实例生成的结果必须传回应用程序,则信息必须存储在可供两者访问的共同位置。为了防止应用程序逻辑检索到不完整的数据,系统必须返回处理完成所需的时间。


如果使用的是Azure,则工作进程可通过使用专用的消息队列将结果回传到应用程序。应用程序必须能够将这些结果与消息原始数据关联起来。在异步消息传递入门(https://msdn.microsoft.com/library/dn589781.aspx)中有更详细地介绍。


扩展消息系统。在高度可扩展的解决方案中,单一的消息队列可能会被消息的数量所淹没,从而成为系统的瓶颈。在这种情况下,可考虑对消息系统进行分区,将来自特定生产者的消息发送到指定队列,或者使用负载平衡在多个消息队列中分发消息。


确保消息系统的可靠性。需要建立一个可靠的消息系统,以保证在消息排队后不会丢失。这对于确保所有消息至少传递一次是至关重要的。


何时使用这个模式
应用程序的工作负载可分解为可异步运行的任务。
任务间是独立的, 可以并行运行。
工作量是高度可变的,需要一个可伸缩的解决方案。解决方案必须高可用,并且如果任务的处理失败,可以重试。


以下情况下可能不会考虑使用这个模式:
应用程序的工作负载与任务间高度耦合,不易分解。
任务必须同步执行, 并且应用程序逻辑必须等待任务完成才能继续。
任务必须按特定的顺序执行。
某些消息系统支持会话,使生产者能够将消息组合在一起,以确保它们都被同一使用者处理。此机制可与优先级消息 (如果支持) 一起使用, 以实现消息排序,并将消息按顺序从生产者传递到单个使用者。


例子。
Azure提供了存储队列和服务总线队列,可用于实现此模式。应用程序逻辑可将消息放到队列中,可被一个或多个任务从队列中拉取并处理。为了具备可恢复的弹性机制,服务总线队列使用户能够在从队列中检索消息时使用PeekLock模式。该模式实际上不会删除消息,只是对其他使用者隐藏。原使用者可以在处理完后将其删除。如果使用者出现故障, peek 锁将超时, 消息将再次变为可见, 可被其他任务实例来处理。


有关使用 Azure 服务总线队列的详细信息, 请参阅服务总线队列 (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions)中的主题与订阅。有关使用 azure 存储队列的信息, 请参阅使用. NET 进行 azure 队列存储入门。(https://docs.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues)
下面的代码来自竞争消费者模式中的 QueueManager 类, GitHub (https://github.com/mspnp/cloud-design-patterns/tree/master/competing-consumers) 显示了如何通过使用 QueueClient 实例来创建队列。并在 web 或worker role中启动事件处理程序。


private string queueName = ...;
private string connectionString = ...;
...


public async Task Start()
{
  // Check if the queue already exists.
  var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
  if (!manager.QueueExists(this.queueName))
  {
    var queueDescription = new QueueDescription(this.queueName);


    // Set the maximum delivery count for messages in the queue. A message
    // is automatically dead-lettered after this number of deliveries. The
    // default value for dead letter count is 10.
    queueDescription.MaxDeliveryCount = 3;


    await manager.CreateQueueAsync(queueDescription);
  }
  ...


  // Create the queue client. By default the PeekLock method is used.
  this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}




以下代码演示了应用如何创建并发送多条消息到队列。


public async Task SendMessagesAsync()
{
  // Simulate sending a batch of messages to the queue.
  var messages = new List<BrokeredMessage>();


  for (int i = 0; i < 10; i++)
  {
    var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
    messages.Add(message);
  }
  await this.client.SendBatchAsync(messages);
}



下面的代码演示了如何通过事件驱动的方法来接收来自队列的消息。ReceiveMessages方法的processMessageTask 参数是一个委托,它是对接收消息时要运行的代码的引用。这段代码是异步运行的。


private ManualResetEvent pauseProcessingEvent;
...


public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
  // Set up the options for the message pump.
  var options = new OnMessageOptions();


  // When AutoComplete is disabled it's necessary to manually
  // complete or abandon the messages and handle any errors.
  options.AutoComplete = false;
  options.MaxConcurrentCalls = 10;
  options.ExceptionReceived += this.OptionsOnExceptionReceived;


  // Use of the Service Bus OnMessage message pump.
  // The OnMessage method must be called once, otherwise an exception will occur.
  this.client.OnMessageAsync(
    async (msg) =>
    {
      // Will block the current thread if Stop is called.
      this.pauseProcessingEvent.WaitOne();


      // Execute processing task here.
      await processMessageTask(msg);
    },
    options);
}
...


private void OptionsOnExceptionReceived(object sender,
  ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
  ...
}




请注意,诸如azure所提供的自动伸缩功能 也可被用于在队列长度波动时启动和停止worker role实例。有关详细信息, 请参阅(https://docs.microsoft.com/en-us/azure/architecture/best-practices/auto-scaling)此外, 不需要在workerRole实例与工作进程间保持一对一的对应关系-单个workerRole就可以实现多个工作进程。有关详细信息, 请参阅计算资源整合模式。(https://docs.microsoft.com/en-us/azure/architecture/patterns/compute-resource-consolidation)




相关的模式与建议
在实现此模式时, 以下模式可能会有帮助:
异步消息入门。(https://msdn.microsoft.com/library/dn589781.aspx)消息队列是一种异步通信机制。如果使用者服务需要向应用程序发送答复,则可能需要实现响应消息传递机制。异步消息传递入门介绍了有关如何使用消息队列来实现请求/应答消息。


自动伸缩。(https://docs.microsoft.com/en-us/azure/architecture/best-practices/auto-scaling)可以启动或停止消费服务的实例,因为应用程序的队列是高度可变的。自动伸缩可以帮助自动化吞吐量的维护工作。


计算资源整合模式。(https://docs.microsoft.com/en-us/azure/architecture/patterns/compute-resource-consolidation)可将消费者服务的多个实例合并单一进程中,以降低成本和管理的开销。计算资源整合模式中描述了遵循此方法的好处和弊端。


队列负载分级模式。(https://docs.microsoft.com/en-us/azure/architecture/patterns/queue-based-load-leveling)引入消息队列可以增加系统的恢复能力,使服务实例能够处理来自应用程序实例的大量请求。消息队列充当缓冲区, 它对负载进行了分级。队列负载分级模式更详细地描述了此方案。


本模式对应的示例应用程序 放在:https://github.com/mspnp/cloud-design-patterns/tree/master/competing-consumers

相关文章:

  • 搜狗状告腾讯的几个重要看点
  • Azure 设计模式之资源整合
  • Azure 设计模式之事件源模式
  • 新加坡Ruby开发者6月聚会
  • Azure设计模式之外配存储模式
  • [Web 开发] URL 的最大长度
  • Azure设计模式之联邦身份模式
  • 从Oracle到DB2(四)--字符集
  • Azure设计模式之看门人模式
  • 圣淘沙闲逛,傻照两张!
  • Azure设计模式之网关模式
  • Azure设计模式之网关卸载模式
  • Makefile与Shell的问题
  • jQuery:收集一些基于jQuery框架开发的控件/jquery插件。(2)
  • Azure设计模式之网关路由模式
  • python3.6+scrapy+mysql 爬虫实战
  • $translatePartialLoader加载失败及解决方式
  • .pyc 想到的一些问题
  • 78. Subsets
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • create-react-app项目添加less配置
  • CSS盒模型深入
  • HTTP请求重发
  • Idea+maven+scala构建包并在spark on yarn 运行
  • Java Agent 学习笔记
  • Javascript基础之Array数组API
  • Logstash 参考指南(目录)
  • Node.js 新计划:使用 V8 snapshot 将启动速度提升 8 倍
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • seaborn 安装成功 + ImportError: DLL load failed: 找不到指定的模块 问题解决
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • 前端学习笔记之观察者模式
  • 如何合理的规划jvm性能调优
  • 世界上最简单的无等待算法(getAndIncrement)
  • 白色的风信子
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • RDS-Mysql 物理备份恢复到本地数据库上
  • 进程与线程(三)——进程/线程间通信
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • 移动端高清、多屏适配方案
  • #define,static,const,三种常量的区别
  • #每天一道面试题# 什么是MySQL的回表查询
  • (12)Hive调优——count distinct去重优化
  • (14)目标检测_SSD训练代码基于pytorch搭建代码
  • (Mac上)使用Python进行matplotlib 画图时,中文显示不出来
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • *_zh_CN.properties 国际化资源文件 struts 防乱码等
  • .NET/C# 阻止屏幕关闭,阻止系统进入睡眠状态
  • [\u4e00-\u9fa5] //匹配中文字符
  • [AIGC] Kong:一个强大的 API 网关和服务平台
  • [BZOJ5250][九省联考2018]秘密袭击(DP)
  • [CareerCup] 17.8 Contiguous Sequence with Largest Sum 连续子序列之和最大
  • [CC2642R1][VSCODE+Embedded IDE+IAR Build+Cortex-Debug] TI CC2642R1基于VsCode的开发环境
  • [Codeforces] probabilities (R1600) Part.1
  • [C语言]一维数组二维数组的大小