当前位置: 首页 > news >正文

httpslb.php_Kafka生产者的客户端(PHP)开发

一、准备工作

虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、PHP、Python、Go 等语言。

PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。

1.安装 librdkafka 库

git clone https://github.com/edenhill/librdkafka.git

./configure

make

sudo make install复制代码

2.安装 php-kafka 扩展

$ git clone https://github.com/arnaud-lb/php-rdkafka.git

$ cd librdkafka/

$ phpize

$ ./configure

$ make

$ sudo make install

#在php.ini 文件中配置 rdkafka扩展

extension=rdkafka.so

#查看扩展是否生效

php -m | grep kafka复制代码

二、代码实现

正常的生产逻辑如下:

1.配置生产者客户端参数及创建相应的生产者实例;

/**

* Create a producer

*/

$conf = new RdKafka\Conf();

$conf->set('log_level', LOG_DEBUG);

//$conf->set('debug', 'all');

$rk = new RdKafka\Producer($conf);

$rk->addBrokers("127.0.0.1");复制代码

2.构建主题;

/**

* Create a topic instance from the producer

*/

$topic = $rk->newTopic("test");复制代码

3.发送消息;

/**

* Producing messages

* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.

* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。

* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.

* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。

* The message payload can be anything.

* 消息可以是任何内容。

*/

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");复制代码

4.关闭生产者实例。

/**

* Proper shutdown

* This should be done prior to destroying a producer instance

* to make sure all queued and in-flight produce requests are completed before terminating.

* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。

* Not calling flush can lead to message loss!

* 不调用flush会导致消息丢失!

*/

$timeout_ms = 60000; // 1 minute

$rk->flush($timeout_ms);复制代码

检验消息是否发送成功

终端开启一个消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test复制代码

在另一个窗口执行 php producer.php,

可看到消费者终端接收到消息。

完整代码如下:

/**

* Created by PhpStorm.

* User: liulu

* Date: 2020/1/1

* Time: 18:38

*/

/**

* Create a producer

*/

$conf = new RdKafka\Conf();

$conf->set('log_level', LOG_DEBUG);

//$conf->set('debug', 'all');

$rk = new RdKafka\Producer($conf);

$rk->addBrokers("127.0.0.1");

/**

* Create a topic instance from the producer

*/

$topic = $rk->newTopic("test");

/**

* Producing messages

* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.

* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。

* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.

* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。

* The message payload can be anything.

* 消息可以是任何内容。

*/

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

/**

* Proper shutdown

* This should be done prior to destroying a producer instance

* to make sure all queued and in-flight produce requests are completed before terminating.

* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。

* Not calling flush can lead to message loss!

* 不调用flush会导致消息丢失!

*/

$timeout_ms = 60000; // 1 minute

$rk->flush($timeout_ms);

echo 'finished';exit;复制代码

以上内容希望帮助到大家,很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家

相关文章:

  • php 实现二叉树的最大深度_PHP实现二叉树的深度优先遍历(前序、中序、后序)和广度优先遍历(层次)...
  • sap客户信贷_企业客户信用控制管理机制实践(2/2)
  • 晶闸管有几个pn结_【半导体物理】PN结
  • 模板文字修改_ppt模板适用于室内装修设计的图片展示国片混排等演示PPT模版
  • 机器学习预测时间序列 特征值怎么确定_AWS发布时间序列预测云服务,无机器学习基础也能上手...
  • mybatis 二级缓存失效_Mybatis 一二级缓存实现原理与使用指南
  • pycharm导入mysql_Pycharm创建Django项目讲解 python django
  • fanuc机器人控制柜接线_FANUC机器人系统镜像还原步骤
  • zabbix监控磁盘_zabbix监控cpu、内存、磁盘使用情况
  • 一直跳动的按钮插件_职场表格插件 Power Click功能介绍03:工作便签
  • 手机屏幕常见故障_华强北二手苹果手机面容损坏可修复原理(重磅,大家务必小心,莫贪小便宜)...
  • springcloud 创建子父项目_SpringCloud(四)- 父子项目
  • redis常用命令getex_详解Redis基本命令
  • 需要显卡还是cpu_组装电脑装机预算不足的情况下,选择高U低显还是高显低U?...
  • 云计算体系结构中soa构建层_云计算体系结构
  • JS 中的深拷贝与浅拷贝
  • 【140天】尚学堂高淇Java300集视频精华笔记(86-87)
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • Angular2开发踩坑系列-生产环境编译
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • Docker 笔记(2):Dockerfile
  • Docker入门(二) - Dockerfile
  • ES6, React, Redux, Webpack写的一个爬 GitHub 的网页
  • JavaScript函数式编程(一)
  • Python_网络编程
  • SwizzleMethod 黑魔法
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 表单中readonly的input等标签,禁止光标进入(focus)的几种方式
  • 讲清楚之javascript作用域
  • 力扣(LeetCode)357
  • 两列自适应布局方案整理
  • 罗辑思维在全链路压测方面的实践和工作笔记
  • 腾讯视频格式如何转换成mp4 将下载的qlv文件转换成mp4的方法
  • 通过几道题目学习二叉搜索树
  • 在Docker Swarm上部署Apache Storm:第1部分
  • 做一名精致的JavaScripter 01:JavaScript简介
  • 阿里云ACE认证之理解CDN技术
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • #《AI中文版》V3 第 1 章 概述
  • #1015 : KMP算法
  • #LLM入门|Prompt#1.8_聊天机器人_Chatbot
  • $.proxy和$.extend
  • (13)[Xamarin.Android] 不同分辨率下的图片使用概论
  • (echarts)echarts使用时重新加载数据之前的数据存留在图上的问题
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (SpringBoot)第二章:Spring创建和使用
  • (附源码)ssm高校实验室 毕业设计 800008
  • (算法)Game
  • (算法)N皇后问题
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (万字长文)Spring的核心知识尽揽其中
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • (转)利用PHP的debug_backtrace函数,实现PHP文件权限管理、动态加载 【反射】...
  • .net core IResultFilter 的 OnResultExecuted和OnResultExecuting的区别
  • .NET Core 版本不支持的问题