当前位置: 首页 > news >正文

初识 beanstalkd

简述

Beanstalkd 是一个轻量级的内存型队列,利用了和 Memcache 类似的协议。依赖 libevent 单线程事件分发机制, 可以部署多个实例,但是高并发支持还是不太友好;

管道

即有名称的任务队列,一个服务器有一个或者多个管道,用来储存统一类型的 job。每个管道由一个就绪队列与延迟队列组成。每个job所有的状态迁移在一个管道中完成。消费者可以监控感兴趣的管道,通过发送 watch 指令。消费者也可以取消监控 tube,通过发送 ignore 命令。通过 list 命令返回所有监控的管道,当客户端预订一个job,此 job 可能来自任何一个它监控的管道。

当一个客户端连接上服务器时,客户端监控的tube 默认为 default,如果客户端提交 job 时,没有使用 use 命令,那么这些 job 就存于名为 defaulttube 中。

管道按需求创建,无论他们在地方被引用到。如果一个管道变为空和没有任何客户端引用,它将会被自动删除。

Job

任务在队里之中被称作 Job. 一个 JobBeanstalkd 中有以下的生命周期:

  • put 将一个任务放置进 tube
  • deayed 这个任务现在再等待中,需要若干秒才能准备完毕【延迟队列】
  • ready 这个任务已经准备好了,可以消费了。所有的消费都是要从取 ready 状态的 job
  • reserved 这个任务已经被消费者消费
  • release 这个 job 执行失败了,把它放进 ready 状态队列中。让其他队列执行
  • bury 这个 job 执行失败了,但不希望其他队列执行,先把它埋起来

此处输入图片的描述

安装

Centos7 上通过命令 yum -y install beanstalkd --enablerepo=epel;

其他系统的安装在 官网 上查看

查看当前版本号

beanstalkd -v

启动

常见启动如下:

beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs

启动后对 beanstalkd 的操作可以使用 telnet,比如 telnet 127.0.0.1 11300。然后便可以执行 beanstalkd 的各命令,如 stats 查看信息,use, put, watch 等等。

使用场景

  • 用作延时队列:比如可以用于如果用户30分钟内不操作,任务关闭。
  • 用作定时任务:比如可以用于专门的后台任务。
  • 用作异步操作:这是所有消息队列都最常用的,先将任务仍进去,顺序执行。
  • 用作循环队列:用release命令可以循环执行任务,比如可以做负载均衡任务分发。
  • 用作兜底机制:比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止。

操作

以下示例代码是基于 pda/pheanstalk 这个第三方扩展写的;

查看统计信息

整个 beanstalkd 信息

<?php

require './vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1');

# 查看 beanstalkd 当前的状态信息
var_dump($pheanstalk->stats());

输出的信息为以下 :

   'current-jobs-urgent' => '0', // 优先级小于1024状态为ready的job数量
   'current-jobs-ready' => '0', // 状态为ready的job数量
   'current-jobs-reserved' => '0', // 状态为reserved的job数量
   'current-jobs-delayed' => '0', // 状态为delayed的job数量
   'current-jobs-buried' => '0', // 状态为buried的job数量
   'cmd-put' => '0', // 总共执行put指令的次数
   'cmd-peek' => '0', // 总共执行peek指令的次数
   'cmd-peek-ready' => '0', // 总共执行peek-ready指令的次数
   'cmd-peek-delayed' => '0', // 总共执行peek-delayed指令的次数
   'cmd-peek-buried' => '0', // 总共执行peek-buried指令的次数
   'cmd-reserve' => '0', // 总共执行reserve指令的次数
   'cmd-reserve-with-timeout' => '0',
   'cmd-delete' => '0',
   'cmd-release' => '0',
   'cmd-use' => '0', // 总共执行use指令的次数
   'cmd-watch' => '0', // 总共执行watch指令的次数
   'cmd-ignore' => '0',
   'cmd-bury' => '0',
   'cmd-kick' => '0',
   'cmd-touch' => '0',
   'cmd-stats' => '2',
   'cmd-stats-job' => '0',
   'cmd-stats-tube' => '0',
   'cmd-list-tubes' => '0',
   'cmd-list-tube-used' => '0',
   'cmd-list-tubes-watched' => '0',
   'cmd-pause-tube' => '0',
   'job-timeouts' => '0', // 所有超时的job的总共数量
   'total-jobs' => '0', // 创建的所有job数量
   'max-job-size' => '65535', // job的数据部分最大长度
   'current-tubes' => '1', // 当前存在的tube数量
   'current-connections' => '1', // 当前打开的连接数
   'current-producers' => '0', // 当前所有的打开的连接中至少执行一次put指令的连接数量
   'current-workers' => '0', // 当前所有的打开的连接中至少执行一次reserve指令的连接数量
   'current-waiting' => '0', // 当前所有的打开的连接中执行reserve指令但是未响应的连接数量
   'total-connections' => '2', // 总共处理的连接数
   'pid' => '3609', // 服务器进程的id
   'version' => '1.10', // 服务器版本号
   'rusage-utime' => '0.000000', // 进程总共占用的用户CPU时间
   'rusage-stime' => '0.001478', // 进程总共占用的系统CPU时间
   'uptime' => '12031', // 服务器进程运行的秒数
   'binlog-oldest-index' => '2', // 开始储存jobs的binlog索引号
   'binlog-current-index' => '2', // 当前储存jobs的binlog索引号
   'binlog-records-migrated' => '0',
   'binlog-records-written' => '0', // 累积写入的记录数
   'binlog-max-size' => '10485760', // binlog的最大容量
   'id' => '37604ac4305d3b16', // 一个随机字符串,在beanstalkd进程启动时产生
   'hostname' => 'localhost.localdomain',

单个 job 任务的统计信息

var_export($pheanstalk->statsJob($job_4));

执行结果如下:

   'id' => '1', // job id
   'tube' => 'test', // job 所在的管道
   'state' => 'reserved', // job 当前的状态
   'pri' => '1024', // job 的优先级
   'age' => '5222', // 自 job 创建时间为止 单位:秒
   'delay' => '0',
   'ttr' => '60', // time to run
   'time-left' => '58', // 仅在job状态为reserved或者delayed时有意义,当job状态为reserved时表示剩余的超时时间
   'file' => '2', // 表示包含此job的binlog序号,如果没有开启它将为0
   'reserves' => '10', // 表示job被reserved的次数
   'timeouts' => '0', // 表示job处理的超时时间
   'releases' => '1', // 表示job被released的次数
   'buries' => '0', // 表示job被buried的次数
   'kicks' => '0', // 表示job被kiced的次数

管道相关的命令

// 查看有多少个tube
//var_export($pheanstalk->listTubes());

// 在 put 之前预申明要使用的管道,如果管道不存在,即创建
//$pheanstalk->useTube('test');

//设置要监听的tube
$pheanstalk->watch('test');

//取消对默认tube的监听,可以省略
$pheanstalk->ignore('default');

//查看监听的tube列表
var_export($pheanstalk->listTubesWatched());

//查看test的tube当前的状态
var_export($pheanstalk->statsTube('test'));

生产者调用的方法

// put 任务 方式一; 返回新 job 的任务标识,整型值;
$pheanstalk->useTube('test')->put(
    'hello, beanstalk, i am job 1', // 任务内容
    23, // 任务的优先级, 默认为 1024
    0, // 不等待直接放到ready队列中.
    60 // 处理任务的时间(单位为秒)
);

// put 任务 方式二; 返回新 job 的任务标识,整型值;
$pheanstalk->putInTube(
    'test', // 管道名称
    'hello, beanstalk, i am job 2', // 任务内容
    23, // 任务的优先级, 默认为 1024
    0, // 不等待直接放到ready队列中. 如值为 60 表示 60秒;
    60 // 处理任务的时间(单位为秒)
);

// 给管道里所有新任务设置延迟
$pheanstalk->pauseTube('test', 30);

// 取消管道延迟
$pheanstalk->resumeTube('test');

此处介绍几个概念:

  • 任务优先级
    任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).
  • ttr(time to run, 预设的执行时间)
    消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release / bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,状态改为 ready,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 重置该任务的 time-left 剩余执行时间.

消费者方法

正常的获取和执行 Job 流程

// 获取 test 管道的 job
$job = $pheanstalk->watch('test')->ignore('default')->reserve();
$job_2 = $pheanstalk->reserveFromTube('test');
$job_3 = $pheanstalk->peekReady('test');

// 如果知道 job 的 id, 也可以
$job_4 = $pheanstalk->peek($id);
// var_export($pheanstalk->statsJob($job_4));

// 获取下一个延迟时间最短 的 job
$job_5 = $pheanstalk->peekDelayed('test');

// do job .... 这里省略异常的考虑

// 释放任务 让别人执行
$pheanstalk->release($job);
// 或成功执行完,则删除任务
//$pheanstalk->delete($job);
// 将任务埋起来,预留
//$pheanstalk->bury($job);

处理 buried 状态的 Job

// 获取下一个被埋藏的 job
$job = $pheanstalk->peekBuried('test');
// 将任务状态从 buried 改为 ready
//$pheanstalk->kickJob($job);
// 批量将指定数目的任务从 buried 改为 ready
$pheanstalk->kick(10);

总结

如果是有优先级/延时任务的需求的话, beanstalkd 是个不错选择。如果作为常规的先进先出队列来说,以性能和稳定来说 kafka/redis 会是更好的选择,redis 本身也是全内存,队列操作 O(1), 而 benastalkdlog(n)。redis 也更加成熟和稳定,同时支持本地持久化和主从。

另外有一个加分项是 beanstalkd 作者本身比较活跃,之前提了一个 pr`, 当天就得到回馈,这也是作为开源项目选择一个很重要的因素。

附录

  • beanstalkd 协议

相关文章:

  • Oracle Apps AutoConfig
  • 用以太坊开发框架Truffle开发智能合约实践攻略(代码详解)
  • javascript解汉诺塔问题
  • RIP 漏打 no auto-summary 造成环路故障
  • oracle Like模糊查询与带有关键字Reverse的索引应用
  • JQuery EasyUI 动态隐藏
  • 安装OpenStack Queens版本的教程推荐
  • [leetcode]Flatten Binary Tree to Linked List
  • 深度学习中常见问题
  • 华为云:如何解除数据库高并发场景下的达摩克利斯之剑?
  • Tomcat的参数配置及一般问题的解决
  • node.js来爬取智联全国的竞争最激烈的前十岗位
  • css颜色代码大全:(网页设计师和平面设计师常用)
  • python3.6+scrapy+mysql 爬虫实战
  • 数据库分库分表思路
  • 【347天】每日项目总结系列085(2018.01.18)
  • Angular 响应式表单之下拉框
  • Apache Pulsar 2.1 重磅发布
  • gulp 教程
  • input的行数自动增减
  • MySQL几个简单SQL的优化
  • Node + FFmpeg 实现Canvas动画导出视频
  • python 学习笔记 - Queue Pipes,进程间通讯
  • Python3爬取英雄联盟英雄皮肤大图
  • React-生命周期杂记
  • SpiderData 2019年2月16日 DApp数据排行榜
  • Theano - 导数
  • Webpack 4x 之路 ( 四 )
  • Yii源码解读-服务定位器(Service Locator)
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 基于Vue2全家桶的移动端AppDEMO实现
  • 记一次删除Git记录中的大文件的过程
  • 漂亮刷新控件-iOS
  • 想使用 MongoDB ,你应该了解这8个方面!
  • 协程
  • 中文输入法与React文本输入框的问题与解决方案
  • scrapy中间件源码分析及常用中间件大全
  • #pragma once
  • #考研#计算机文化知识1(局域网及网络互联)
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (04)odoo视图操作
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (Redis使用系列) Springboot 使用redis实现接口幂等性拦截 十一
  • (SpringBoot)第二章:Spring创建和使用
  • (待修改)PyG安装步骤
  • (附源码)spring boot建达集团公司平台 毕业设计 141538
  • (简单有案例)前端实现主题切换、动态换肤的两种简单方式
  • (九)One-Wire总线-DS18B20
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (亲测成功)在centos7.5上安装kvm,通过VNC远程连接并创建多台ubuntu虚拟机(ubuntu server版本)...
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (轉貼) 2008 Altera 亞洲創新大賽 台灣學生成果傲視全球 [照片花絮] (SOC) (News)
  • .[hudsonL@cock.li].mkp勒索加密数据库完美恢复---惜分飞
  • .NET 2.0中新增的一些TryGet,TryParse等方法
  • .net core 6 redis操作类