当前位置: 首页 > news >正文

Kafka 单机和集群环境部署教程

目录

    • 一、Kafka 单机环境部署
      • 1. 环境准备
      • 2. 安装 Java
      • 3. 安装 ZooKeeper
        • 3.1 下载并解压 ZooKeeper
        • 3.2 配置 ZooKeeper
        • 3.3 启动 ZooKeeper
        • 3.4 验证 ZooKeeper 是否正常运行
      • 4. 安装 Kafka
        • 4.1 下载并解压 Kafka
        • 4.2 配置 Kafka
        • 4.3 创建日志目录
        • 4.4 启动 Kafka Broker
        • 4.5 验证 Kafka 是否正常运行
      • 5. Kafka 单机部署的注意事项
    • 二、Kafka 集群环境部署
      • 1. 环境准备
      • 2. 安装 ZooKeeper 集群
        • 2.1 配置 ZooKeeper 节点 ID
        • 2.2 启动 ZooKeeper 集群
      • 3. 安装 Kafka 集群
        • 3.1 配置 Kafka Broker
        • 3.2 启动 Kafka Broker
      • 4. 验证 Kafka 集群状态
        • 4.1 创建 Topic
        • 4.2 验证 Topic
      • 5. Kafka 集群部署的注意事项
    • 三、Kafka 使用案例:生产者和消费者
      • 1. 使用 Java 实现 Kafka 生产者和消费者
        • 1.1 添加依赖
        • 1.2 编写 Kafka 生产者
        • 1.3 编写 Kafka 消费者
        • 1.4 运行 Java 程序
      • 2. 使用 Python 实现 Kafka 生产者和消费者
        • 2.1 安装 Kafka 库
        • 2.2 编写 Kafka 生产者
        • 2.3 编写 Kafka 消费者
        • 2.4 运行 Python 程序
      • 3. 注意事项
    • 总结
      • 部署过程中的注意事项

下面是 Apache Kafka 单机和集群环境部署的详细教程,包括部署过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处理平台,广泛用于实时数据处理、日志收集、消息队列等场景。


一、Kafka 单机环境部署

1. 环境准备

  • 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
  • Java:Kafka 需要 Java 环境,推荐使用 OpenJDK 8 或 11。
  • ZooKeeper:Kafka 依赖 ZooKeeper 进行分布式协调。

2. 安装 Java

在 Ubuntu 中:

sudo apt update
sudo apt install openjdk-11-jdk

在 CentOS 中:

sudo yum install java-11-openjdk

验证 Java 安装:

java -version

3. 安装 ZooKeeper

Kafka 使用 ZooKeeper 进行节点管理和协调,需要先安装并启动 ZooKeeper。

3.1 下载并解压 ZooKeeper
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper
3.2 配置 ZooKeeper
  1. 创建数据目录:

    mkdir -p /var/lib/zookeeper
    
  2. 复制配置文件:

    cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
    
  3. 编辑配置文件 /usr/local/zookeeper/conf/zoo.cfg

    dataDir=/var/lib/zookeeper
    clientPort=2181
    
3.3 启动 ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start
3.4 验证 ZooKeeper 是否正常运行
/usr/local/zookeeper/bin/zkCli.sh -server localhost:2181

在连接成功后输入 ls /,若返回空列表([]),则说明连接成功。

4. 安装 Kafka

4.1 下载并解压 Kafka

访问 Kafka 官网 下载最新版本的 Kafka。

wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar -xzvf kafka_2.12-3.5.0.tgz
mv kafka_2.12-3.5.0 /usr/local/kafka
4.2 配置 Kafka

编辑 Kafka 的配置文件 /usr/local/kafka/config/server.properties

# Kafka Broker ID,唯一标识符
broker.id=0# 监听的接口和端口
listeners=PLAINTEXT://:9092# 日志文件存储路径
log.dirs=/var/lib/kafka-logs# Zookeeper 连接地址
zookeeper.connect=localhost:2181
4.3 创建日志目录
mkdir -p /var/lib/kafka-logs
4.4 启动 Kafka Broker
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
4.5 验证 Kafka 是否正常运行

创建一个测试 Topic:

/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

列出 Topic:

/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

你应该看到 test-topic 在列出的 Topic 中。

5. Kafka 单机部署的注意事项

  • ZooKeeper:确保 ZooKeeper 正常运行,并且 zookeeper.connect 地址配置正确。
  • 内存和存储:为 Kafka 分配足够的内存和存储空间,尤其是在高负载场景下。
  • 日志文件:定期检查和清理 Kafka 日志文件,以防止磁盘占满。
  • 监听地址:如果需要远程访问,确保 listeners 配置了正确的监听地址。
  • 防火墙设置:确保防火墙开放了 Kafka 和 ZooKeeper 使用的端口(默认 9092 和 2181)。

二、Kafka 集群环境部署

Kafka 集群由多个 Kafka Broker 组成,能够提供高可用性和水平扩展。

1. 环境准备

  • 多台服务器:至少 3 台(3 个 Kafka Broker 和 3 个 ZooKeeper 实例)
  • 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
  • Java:在所有节点上安装 Java

2. 安装 ZooKeeper 集群

在每台服务器上按照单机部署的步骤安装 ZooKeeper,并进行以下配置:

2.1 配置 ZooKeeper 节点 ID

编辑每个节点的 zoo.cfg 文件,添加如下配置:

server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888

在每台服务器上创建 myid 文件,用于标识节点:

echo "1" > /var/lib/zookeeper/myid  # 在 zookeeper1 上
echo "2" > /var/lib/zookeeper/myid  # 在 zookeeper2 上
echo "3" > /var/lib/zookeeper/myid  # 在 zookeeper3 上
2.2 启动 ZooKeeper 集群

在每台服务器上启动 ZooKeeper:

/usr/local/zookeeper/bin/zkServer.sh start

3. 安装 Kafka 集群

在每台服务器上按照单机部署的步骤安装 Kafka,并进行以下配置:

3.1 配置 Kafka Broker

编辑每个节点的 server.properties 文件,添加如下配置:

broker.id=0  # 每个 Broker 唯一 ID
listeners=PLAINTEXT://:9092
log.dirs=/var/lib/kafka-logs
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
3.2 启动 Kafka Broker

在每台服务器上启动 Kafka Broker:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

4. 验证 Kafka 集群状态

4.1 创建 Topic

在任一 Kafka Broker 上执行以下命令:

/usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 3
4.2 验证 Topic

列出集群中的 Topic:

/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092

查看 Topic 详细信息:

/usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:9092

5. Kafka 集群部署的注意事项

  • ZooKeeper 集群:确保每个节点配置了正确的 myid,并且所有节点可以互相通信。
  • Kafka Broker 配置:每个 Broker 必须有唯一的 broker.id
  • 分区和副本:根据实际需求配置合适的分区数和副本数,以提高数据可靠性和吞吐量。
  • 监控和报警:使用 Kafka Manager 或其他监控工具监控集群状态,及时处理故障。
  • 网络配置:确保各节点之间的网络连接正常,并且防火墙开放了必要端口。
  • 资源规划:为 Kafka 和 ZooKeeper 分配足够的 CPU、内存和磁盘资源。

三、Kafka 使用案例:生产者和消费者

1. 使用 Java 实现 Kafka 生产者和消费者

1.1 添加依赖

在 Maven 项目中添加 Kafka 的依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.0</version>
</dependency>
1.2 编写 Kafka 生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {// Kafka 生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "Message " + i);producer.send(record);}// 关闭生产者producer.close();}
}
1.3 编写 Kafka 消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka 消费者配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 轮询消息while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}}}
}
1.4 运行 Java 程序

编译并运行生产者:

mvn compile
mvn exec:java -Dexec.mainClass="SimpleProducer"

编译并运行消费者:

mvn exec:java -Dexec.mainClass="SimpleConsumer"

2. 使用 Python 实现 Kafka 生产者和消费者

2.1 安装 Kafka 库
pip install kafka-python
2.2 编写 Kafka 生产者
from kafka import KafkaProducer# 创建 Kafka 生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')# 发送消息
for i in range(10):producer.send('test-topic', key=bytes(str(i), encoding='utf-8'), value=bytes(f'Message {i}', encoding='utf-8'))# 关闭生产者
producer.close()
2.3 编写 Kafka 消费者
from kafka import KafkaConsumer# 创建 Kafka 消费者
consumer = KafkaConsumer('test-topic',bootstrap_servers='localhost:9092',group_id='test-group',auto_offset_reset='earliest'
)# 轮询消息
for message in consumer:print(f'Offset = {message.offset}, Key = {message.key.decode()}, Value = {message.value.decode()}')
2.4 运行 Python 程序

运行生产者:

python kafka_producer.py

运行消费者:

python kafka_consumer.py

3. 注意事项

  • 生产者和消费者配置:合理配置 bootstrap.serverskey.serializervalue.serializergroup.id 等参数。
  • 分区策略:在生产者中使用自定义分区策略,可以提高吞吐量和负载均衡。
  • 消费组:多个消费者实例可以组成一个消费组,以提高处理能力。
  • 容错机制:在实际应用中,需要考虑重试、错误处理和幂等性等问题。

总结

通过以上步骤,我们成功部署了 Kafka 单机和集群环境,并实现了一个简单的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,适合用于实时流处理和数据管道。

部署过程中的注意事项

  • Java 版本:确保安装了正确版本的 Java。
  • ZooKeeper 集群:确保 ZooKeeper 集群稳定运行,并配置正确。
  • 网络配置:各节点之间的网络连接需要稳定,端口要开放。
  • 资源配置:根据业务需求配置合适的内存、CPU 和磁盘资源。
  • 数据安全:启用 Kafka 的 SSL/TLS 和 SASL 认证机制,确保数据安全传输。
  • 监控和管理:使用 Kafka Manager、Prometheus 等工具监控集群状态,及时处理异常。
  • 日志管理:定期检查和清理 Kafka 的日志,以防止磁盘空间不足。

通过合理的配置和优化,Kafka 可以为应用程序提供可靠的消息传递和流处理服务,是构建实时数据管道和事件驱动架构的重要组件。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 解决oracel锁表问题;SQL 错误 [54] [61000]: ORA-00054: 资源正忙
  • qt quick实现的水波纹特效:横向波纹、纵向波纹效果
  • 真题解析 | CCF CSP-J 2019 入门级 C++语言真题及答案
  • 安装开源软件ChatALL(齐叨)来聚合各大人工智能工具
  • Golang | Leetcode Golang题解之第332题重新安排行程
  • 使用 `@JsonTypeInfo` 和 `@JsonSubTypes` 注解实现多态序列化
  • django电商易购系统-计算机毕业设计源码61059
  • Element Plus的el-carousel走马灯平铺多张图片
  • 直播App遭受抓包后的DDoS与CC攻击防御策略
  • Haproxy的配置详解与使用
  • npm使用教程:从入门到精通
  • NextJS 使用 Docker 发布
  • echarts学习:绘制地图
  • PSO 算法实例(手动推导过程)
  • Windows下搭建Telegraf+Influxdb+Grafana(详解一)
  • 【159天】尚学堂高琪Java300集视频精华笔记(128)
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • JavaScript函数式编程(一)
  • javascript面向对象之创建对象
  • Redis学习笔记 - pipline(流水线、管道)
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 对话:中国为什么有前途/ 写给中国的经济学
  • 分享一份非常强势的Android面试题
  • 警报:线上事故之CountDownLatch的威力
  • 如何在 Tornado 中实现 Middleware
  • 数据结构java版之冒泡排序及优化
  • 算法-插入排序
  • 智能合约开发环境搭建及Hello World合约
  • ​Spring Boot 分片上传文件
  • #pragma data_seg 共享数据区(转)
  • #每日一题合集#牛客JZ23-JZ33
  • #我与Java虚拟机的故事#连载06:收获颇多的经典之作
  • $.ajax()
  • (5)STL算法之复制
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (DFS + 剪枝)【洛谷P1731】 [NOI1999] 生日蛋糕
  • (done) 两个矩阵 “相似” 是什么意思?
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (二)Kafka离线安装 - Zookeeper下载及安装
  • (附源码)ssm高校实验室 毕业设计 800008
  • (推荐)叮当——中文语音对话机器人
  • .CSS-hover 的解释
  • .libPaths()设置包加载目录
  • .NET 6 Mysql Canal (CDC 增量同步,捕获变更数据) 案例版
  • .NET Standard 的管理策略
  • .net 调用php,php 调用.net com组件 --
  • .net 调用海康SDK以及常见的坑解释
  • .net 发送邮件
  • .net 生成二级域名
  • .Net6使用WebSocket与前端进行通信
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .net下的富文本编辑器FCKeditor的配置方法
  • @RunWith注解作用
  • @Service注解让spring找到你的Service bean
  • @SpringBootApplication 包含的三个注解及其含义