当前位置: 首页 > news >正文

Kafka使用案例

1、Kafka 生产者(Producer)示例

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;}
};int main() {std::string brokers = "localhost:9092"; // Kafka brokersstd::string errstr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);conf->set("bootstrap.servers", brokers, errstr);ExampleDeliveryReportCb ex_dr_cb;conf->set("dr_cb", &ex_dr_cb, errstr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;delete tconf;delete conf;return 1;}std::string topic_str = "test_topic";RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;delete producer;delete tconf;delete conf;return 1;}std::string key;std::string payload = "Hello, Kafka!";RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY /* Copy payload */,const_cast<char *>(payload.c_str()), payload.size(),&key, NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to produce message: " <<RdKafka::err2str(resp) << std::endl;} else {std::cout << "Produced message (" << payload.size() << " bytes)" <<std::endl;producer->poll(0); // Non-blocking poll}producer->flush(10000); // Wait for up to 10 seconds to flush messagesdelete topic;delete producer;delete tconf;delete conf;return 0;
}

2、 Kafka 消费者(Consumer)示例

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:void consume_cb (RdKafka::Message &message, void *opaque) {if (message.err() == RdKafka::ERR_NO_ERROR) {std::cout << "Message received (" << message.len() << " bytes)" << std::endl;if (message.key()) {std::cout << "Key: " << *message.key() << std::endl;}std::cout << "Payload: " << std::string(static_cast<char *>(message.payload()), message.len()) << std::endl;} else {std::cerr << "Error while consuming message: " << message.errstr() << std::endl;}}
};int main() {std::string brokers = "localhost:9092"; // Kafka brokersstd::string errstr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);conf->set("bootstrap.servers", brokers, errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;delete tconf;delete conf;return 1;}std::string topic_str = "test_topic";RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str,tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;delete consumer;delete tconf;delete conf;return 1;}ExampleConsumeCb ex_consume_cb;RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_BEGINNING);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: " <<RdKafka::err2str(resp) << std::endl;delete topic;delete consumer;delete tconf;delete conf;return 1;}while (true) {RdKafka::Message *msg = consumer->consume(topic, 0, 1000);ex_consume_cb.consume_cb(*msg, NULL);delete msg;}consumer->stop(topic, 0);consumer->poll(1000); // Final cleanupdelete topic;delete consumer;delete tconf;delete conf;return 0;
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 测量温湿度通过蓝牙和串口发送数据显示在LCD1602屏上
  • 紫辉创投开启Destiny of Gods首轮投资,伯乐与千里马的故事仍在继续
  • 2024杭电多校01——1003树
  • SpringBoot Mysql->达梦8 activiti6.0.0 项目迁移
  • JLink烧录失败
  • 免费发送邮件两种接口方式:SMTP和邮件API
  • “链动+消费增值:用户留存复购新引擎“
  • CSS3 scale 适配
  • zeppline 连接flink 1.17报错
  • WordPress 后台开发技巧:向文章发布页右侧添加自定义菜单项
  • react中的useState和Hook、副作用
  • 小白也能轻松学的计算机网络零基础入门(附学习路线 + 计算机网络教程)
  • CSS实现图片边框酷炫效果
  • PHP时间相关函数
  • 过滤和筛选树形结构数据
  • 深入了解以太坊
  • 0x05 Python数据分析,Anaconda八斩刀
  • Git同步原始仓库到Fork仓库中
  • IP路由与转发
  • Markdown 语法简单说明
  • maven工程打包jar以及java jar命令的classpath使用
  • Netty 框架总结「ChannelHandler 及 EventLoop」
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • tab.js分享及浏览器兼容性问题汇总
  • use Google search engine
  • 老板让我十分钟上手nx-admin
  • 前端存储 - localStorage
  • 使用Envoy 作Sidecar Proxy的微服务模式-4.Prometheus的指标收集
  • postgresql行列转换函数
  • UI设计初学者应该如何入门?
  • # 移动硬盘误操作制作为启动盘数据恢复问题
  • (10)工业界推荐系统-小红书推荐场景及内部实践【排序模型的特征】
  • (1综述)从零开始的嵌入式图像图像处理(PI+QT+OpenCV)实战演练
  • (70min)字节暑假实习二面(已挂)
  • (LeetCode 49)Anagrams
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (二)斐波那契Fabonacci函数
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (免费领源码)python#django#mysql公交线路查询系统85021- 计算机毕业设计项目选题推荐
  • (十三)Maven插件解析运行机制
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .htaccess配置重写url引擎
  • .Net Core中Quartz的使用方法
  • .net 打包工具_pyinstaller打包的exe太大?你需要站在巨人的肩膀上-VC++才是王道
  • .Net 垃圾回收机制原理(二)
  • .NET6 命令行启动及发布单个Exe文件
  • .Net高阶异常处理第二篇~~ dump进阶之MiniDumpWriter
  • .net网站发布-允许更新此预编译站点
  • [ C++ ] STL---string类的使用指南
  • [ CTF ] WriteUp- 2022年第三届“网鼎杯”网络安全大赛(朱雀组)
  • [ 攻防演练演示篇 ] 利用通达OA 文件上传漏洞上传webshell获取主机权限
  • [ 英语 ] 马斯克抱水槽“入主”推特总部中那句 Let that sink in 到底是什么梗?
  • [145] 二叉树的后序遍历 js