当前位置: 首页 > news >正文

kafka搭建单机开发教程

下载安装

下载apache kafka2.11及以上版本Apache Kafka

下载后解压成这样的形式

改配置(单机非常的简单)

对于/conf/目录下新建一个properties文件叫server.properties文件即可。集群我后面单独会讲,那块有点篇幅的但不适合刚入手的开发,我们先把Kafka转起来再说。

server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

 

关键参数解释:

  • broker.id=0,只有一个kafka broker
  • delete.topic.enable=true,这个值如果不设,你运行kafka 里的删除topic命令是没用的,还需要自己跑到zk里去把topic删了;
  • listeners=PLAINTEXT://127.0.0.1:9092,kafka的监听地址,这边是本机,如果是连接kafka和kafka不在同一台服务器上那么你需要在此处把127.0.0.1换成这台机器的局域网ip;
  • port=9092,向远程暴露kafka服务的地址;
  • log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs,不仅仅是kafka的log日志,kafka的topic里的暂存信息也是用的这个目录,它实际是一个log日志+kafka的data文件所在路径;
  • num.partitions=3,不管是单机还是多机生产集群,这个最小数目是3.我们在后期说多机生产集群环境时,会详细述说kafka里的partition、broker、consumer几者间的关系,这三个概念是很重要的;
  • zookeeper.connect=localhost:2181,localhost:2182,localhost:2183,必须指定。kafka的节点间的信息是依靠zk去做协调的;
  • zookeeper.connection.timeout.ms=6000,ZK连接超时,一般为2-6s间即可;

zk集群本机搭建

这个太简单了,在此篇博文中就不作展开,网上有太多例子

我这边直接给出搭建所用的全配置。

先设3个zoo.cfg

分别命名为zoo1.cfg、zoo2.cfg、zoo3.cfg

zoo1.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk1
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk1
clientPort=2181
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo2.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk2
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk2
clientPort=2182
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo3.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk3
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk3
clientPort=2183
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

开始搭建ZK集群

我们用的是zookeeper3.4.3

输入如下命令

touch data/zk1/myid
touch data/zk2/myid
touch data/zk3/myid

这样在每个/data/zk1这样的目录下会有一个空的文件叫myid

接下来再输入以下命令

echo 1 > data/zk1/myid
echo 2 > data/zk2/myid
echo 3 > data/zk3/myid

启动zk群

sh zkServer.sh start ../conf/zoo1.cfg
sh zkServer.sh start ../conf/zoo2.cfg
sh zkServer.sh start ../conf/zoo3.cfg

下面给出一条命令启动、一条命令关闭整个zk群的命令。

zk-start.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

zk-stop.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

运行kafka

./kafka-server-start.sh -daemon ../config/server.properties

如何创建一个kafka的topic和消费呢?

kafka创建producer

## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

命令结束后,你会得到一个command窗口。

在这个command窗口直接输入任意内容,即相当于在mq 里发了一条send("你输入的内容“)的作用。如果有相应的consumer连着kafka你就可以看到消费产生了。

kafka创建消息者

例:

producer

consumer

 

附、kafka新版本(2.两位数版本后)在kafka命令上发生的一些变化大全

后面在精讲kafka时我会把这些命令逐一做精讲。

启动
./kafka-server-start.sh -daemon ../config/server.properties
管理
## 创建主题(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
复制
查询
## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper
 
## topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
 
## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
 
## 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
 
## 新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
 
## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
 
## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
复制
发送和消费
## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 
## 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
 
## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
 
## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
 
## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10
复制
平衡leader
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
复制
kafka自带压测命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
复制
分区扩容
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
复制
迁移分区
1. 创建规则json
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
复制
2. 执行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
复制
3. 验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
复制

相关文章:

  • 如果你需要用Python搞个二维码,那应该收藏这篇博客
  • Linux权限的认识
  • 【JavaScript】五个常用功能/案例:判断特定结尾字符串 | 获取指定字符串 | 颜色字符串转换 | 字符串转驼峰格式 | 简易购物车
  • Kotlin 字符串与空值判断
  • 【NLP开发】Python实现聊天机器人(ALICE)
  • 【题解】同济线代习题二 8.1
  • 【C语言】五分钟彻底搞定字符串
  • 【极客日常】PyQt5的QListView兼容左键双击事件和右键上下文菜单的方法
  • 基于遗传优化算法的小车障碍物避障路线规划matlab仿真(包括matlab仿真录像)
  • vue后台系统管理项目-openlayers地图定位、港口数据标记功能
  • 通讯录的文件版本(产品经理又来加需求了!)
  • 基于Java+SpringBoot+Thymeleaf+Mysql医院预约挂号系统设计与实现
  • 计算机网络-网络层篇-IP协议
  • ASP.NET MVC--过滤器
  • 【C语言】手把手带你写第一个C语言程序
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • AWS实战 - 利用IAM对S3做访问控制
  • AzureCon上微软宣布了哪些容器相关的重磅消息
  • CentOS6 编译安装 redis-3.2.3
  • centos安装java运行环境jdk+tomcat
  • golang中接口赋值与方法集
  • LintCode 31. partitionArray 数组划分
  • node入门
  • Vue2.0 实现互斥
  • vue的全局变量和全局拦截请求器
  • 大主子表关联的性能优化方法
  • 多线程事务回滚
  • 汉诺塔算法
  • 深度学习在携程攻略社区的应用
  • 什么软件可以提取视频中的音频制作成手机铃声
  • 双管齐下,VMware的容器新战略
  • 小试R空间处理新库sf
  • 职业生涯 一个六年开发经验的女程序员的心声。
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • 阿里云服务器如何修改远程端口?
  • 扩展资源服务器解决oauth2 性能瓶颈
  • 蚂蚁金服CTO程立:真正的技术革命才刚刚开始
  • #HarmonyOS:Web组件的使用
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (20050108)又读《平凡的世界》
  • (二)斐波那契Fabonacci函数
  • (强烈推荐)移动端音视频从零到上手(下)
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (转)关于pipe()的详细解析
  • (转)详解PHP处理密码的几种方式
  • (转)原始图像数据和PDF中的图像数据
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .NET Framework 服务实现监控可观测性最佳实践
  • .net framwork4.6操作MySQL报错Character set ‘utf8mb3‘ is not supported 解决方法
  • .NET 编写一个可以异步等待循环中任何一个部分的 Awaiter
  • .Net 高效开发之不可错过的实用工具
  • .net 生成二级域名
  • .Net8 Blazor 尝鲜
  • .netcore 如何获取系统中所有session_如何把百度推广中获取的线索(基木鱼,电话,百度商桥等)同步到企业微信或者企业CRM等企业营销系统中...