当前位置: 首页 > news >正文

初试Kafka

Kafka 是一个分布式流处理平台,通常用作消息中间件,它可以处理大规模的实时数据流。以下是从零开始使用 Kafka 作为消息中间件的基本教程:

步骤 1: 下载和安装 Kafka

  1. 访问 Apache Kafka 官方网站:Apache Kafka
  2. 下载最新的 Kafka 发行版,并解压缩到本地文件夹。

步骤 2: 启动 ZooKeeper

Kafka 使用 ZooKeeper 来协调分布式节点。在 Kafka 解压缩后的文件夹中,进入 bin 目录,执行以下命令启动 ZooKeeper:

./zookeeper-server-start.sh ../config/zookeeper.properties

步骤 3: 启动 Kafka 服务

继续在 bin 目录中执行以下命令启动 Kafka 服务:

./kafka-server-start.sh ../config/server.properties

步骤 4: 创建一个主题(Topic)

Kafka 使用主题来组织和分类消息。执行以下命令创建一个主题:

./kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

这将创建一个名为 my_topic 的主题,具有一个分区和一个副本。

步骤 5: 发送消息到主题

使用 Kafka 提供的生产者工具向主题发送消息:

./kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092

然后,您可以在控制台中输入消息并按 Enter 发送。

步骤 6: 消费消息

使用 Kafka 提供的消费者工具从主题中消费消息:

./kafka-console-consumer.sh --topic my_topic --bootstrap-server localhost:9092 --from-beginning

这将显示从主题中接收到的消息。

步骤 7: 使用编程语言连接 Kafka

除了命令行工具外,您还可以使用编程语言连接 Kafka。根据您选择的语言,可以使用 Kafka 提供的客户端库。

使用 Java 示例
// 生产者示例
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "Hello, Kafka!");producer.send(record);producer.close();}
}// 消费者示例
import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "my_group");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList("my_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());}}}
}

这是一个简单的 Java 示例,演示了如何使用 Kafka 的生产者和消费者 API。

希望这个简单的教程能帮助您入门 Kafka。请注意,这只是一个基础,Kafka 还有许多高级功能和配置,具体取决于您的使用场景和需求。

相关文章:

  • 【网络奇缘】——奈氏准则和香农定理从理论到实践一站式服务|计算机网络
  • springboot 查询
  • css mask 案例
  • 13章总结
  • python哈希算法实现
  • 智慧工地项目端监管端一体化SaaS云平台源码(微服务架构)
  • uni-app 命令行创建
  • 《软件需求分析报告》
  • [RISCV] 为android14添加一个新的riscv device
  • C语言中switch语句中的case后()
  • React TSX 从一组二维数据中,重新挑选数组组成新数组示例:
  • 实现 Spring Boot 项目热重载,无需重启,省时省力
  • DDOS攻击简介——什么是DDOS
  • PyQt5实现学生管理系统第三天(下)
  • 无法获取前置摄像头的预览图像?【Bug已解决-鸿蒙开发】
  • 2018一半小结一波
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • ES6 ...操作符
  • Flex布局到底解决了什么问题
  • git 常用命令
  • If…else
  • Invalidate和postInvalidate的区别
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • Protobuf3语言指南
  • Python利用正则抓取网页内容保存到本地
  • scala基础语法(二)
  • WebSocket使用
  • 初识MongoDB分片
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 简单数学运算程序(不定期更新)
  • 聊聊flink的TableFactory
  • 码农张的Bug人生 - 初来乍到
  • 因为阿里,他们成了“杭漂”
  • Java数据解析之JSON
  • ​Linux·i2c驱动架构​
  • ​第20课 在Android Native开发中加入新的C++类
  • # centos7下FFmpeg环境部署记录
  • (20)目标检测算法之YOLOv5计算预选框、详解anchor计算
  • (C语言)球球大作战
  • (WSI分类)WSI分类文献小综述 2024
  • (十一)JAVA springboot ssm b2b2c多用户商城系统源码:服务网关Zuul高级篇
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • .gitignore文件_Git:.gitignore
  • .NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务
  • .NET 中小心嵌套等待的 Task,它可能会耗尽你线程池的现有资源,出现类似死锁的情况
  • .NET轻量级ORM组件Dapper葵花宝典
  • @SuppressLint(NewApi)和@TargetApi()的区别
  • @SuppressWarnings注解
  • [@Controller]4 详解@ModelAttribute
  • [20171101]rman to destination.txt
  • [2021 蓝帽杯] One Pointer PHP
  • [AIGC] 开源流程引擎哪个好,如何选型?
  • [BZOJ 1032][JSOI2007]祖码Zuma(区间Dp)
  • [C++][数据结构][算法]单链式结构的深拷贝
  • [C++]C++基础知识概述