当前位置: 首页 > news >正文

大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • Kafka介绍
  • ZK的基本环境
  • Kafka下载解压配置
  • Kafka启动配置
  • Kafka启动服务

在这里插入图片描述

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:
在这里插入图片描述

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

在这里插入图片描述

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:
在这里插入图片描述

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
在这里插入图片描述

Java API

架构图

在这里插入图片描述

POM

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version>
</dependency>

生产者1测试


public class TestProducer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java!");Future<RecordMetadata> future = producer.send(record);future.get(3_000, TimeUnit.SECONDS);producer.close();}}

生产者1运行

  2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [h121.wzk.icu:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []internal.auto.downgrade.txn.commit = falsekey.serializer = class org.apache.kafka.common.serialization.IntegerSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576

运行结果如下图:
在这里插入图片描述

生产者2测试

public class TestProducer02 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java! CallBack test!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println("主题: " + recordMetadata.topic() + ", " +"分区: " + recordMetadata.partition() + ", " +"时间戳: " + recordMetadata.timestamp());} else {System.out.println("生产消息异常!!!");}}});producer.close();}}

运行之后,控制台输出:

2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
主题: wzk_topic_test, 分区: 0, 时间戳: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered

运行的之后的控制台如下:
在这里插入图片描述

消费者01运行


public class TestConsumer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "wzk-test");KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);final List<String> topics = Arrays.asList("wzk_topic_test");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("剥夺的分区: " + item.partition());});}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("接收的分区: " + item.partition());});}});final ConsumerRecords<Integer, String> records = consumer.poll(3_000);final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");topic1Iterable.forEach(record -> {System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的偏移量:" + record.offset());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的序列化key字节数:" + record.serializedKeySize());System.out.println("消息的序列化value字节数:" + record.serializedValueSize());System.out.println("消息的时间戳:" + record.timestamp());System.out.println("消息的时间戳类型:" + record.timestampType());System.out.println("消息的主题:" + record.topic());System.out.println("消息的值:" + record.value());});consumer.close();}}

消费者01测试

2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
接收的分区: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
消息头字段:[]
消息的key:0
消息的偏移量:12
消息的分区号:0
消息的序列化key字节数:4
消息的序列化value字节数:20
消息的时间戳:1720760404260
消息的时间戳类型:CreateTime
消息的主题:wzk_topic_test
消息的值:hello world by java!

控制台运行截图如下:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 昇思25天学习打卡营第XX天|基于MindSpore的红酒分类实验
  • flink standakone提交任务参数
  • 现在的Java面试都这么扯淡了吗?
  • 2024下《网络工程师》案例简答题,刷这些就够了!
  • linux timestamp
  • 【吊打面试官系列-Dubbo面试题】Dubbo SPI 和 Java SPI 区别?
  • Hive——UDF函数:高德地图API逆地理编码,实现离线解析经纬度转换省市区(离线地址库,非调用高德API)
  • http协议深度解析——网络时代的安全与效率(1)
  • React-Native 宝藏库大揭秘:精选开源项目与实战代码解析
  • SpinalHDL之仿真(一)
  • Java学习Day19:基础篇9
  • 数学基础 -- 隐函数解题思路之微分运算满足线性性
  • Linux初学基本命令
  • Java面试——Tomcat
  • 兴业严选|朝阳优质好房合集 低至6.3折起~
  • 时间复杂度分析经典问题——最大子序列和
  • [译]如何构建服务器端web组件,为何要构建?
  • 【css3】浏览器内核及其兼容性
  • ES6 ...操作符
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • Magento 1.x 中文订单打印乱码
  • mysql 数据库四种事务隔离级别
  • php面试题 汇集2
  • rc-form之最单纯情况
  • Stream流与Lambda表达式(三) 静态工厂类Collectors
  • vuex 学习笔记 01
  • zookeeper系列(七)实战分布式命名服务
  • 产品三维模型在线预览
  • 好的网址,关于.net 4.0 ,vs 2010
  • 回顾2016
  • 机器人定位导航技术 激光SLAM与视觉SLAM谁更胜一筹?
  • 如何编写一个可升级的智能合约
  • 使用 5W1H 写出高可读的 Git Commit Message
  • -- 数据结构 顺序表 --Java
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • 微信小程序上拉加载:onReachBottom详解+设置触发距离
  • 想晋级高级工程师只知道表面是不够的!Git内部原理介绍
  • 异常机制详解
  • 看到一个关于网页设计的文章分享过来!大家看看!
  • 【云吞铺子】性能抖动剖析(二)
  • mysql面试题分组并合并列
  • 阿里云重庆大学大数据训练营落地分享
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • 蚂蚁金服CTO程立:真正的技术革命才刚刚开始
  • 通过调用文摘列表API获取文摘
  • ​二进制运算符:(与运算)、|(或运算)、~(取反运算)、^(异或运算)、位移运算符​
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • # centos7下FFmpeg环境部署记录
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • (003)SlickEdit Unity的补全
  • (13)Hive调优——动态分区导致的小文件问题
  • (51单片机)第五章-A/D和D/A工作原理-A/D
  • (arch)linux 转换文件编码格式
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)