当前位置: 首页 > news >正文

Docker Compose部署Kafka集群并在宿主机Windows连接开发

文章目录

  • 1. 常用参数
  • 2. 理解参数和原理
  • 3. Docker Compose
  • 4. 验证

Docker for Windows4.23.0
windows11
Java17

1. 常用参数

kafka容器常用参数如下

  • -e KAFKA_BROKER_ID=1:设置 Kafka broker 的 ID 为 1。每个 Kafka broker 都需要一个唯一的 ID。

  • -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181:指定 Kafka 连接到 Zookeeper 的地址,这里假设 Zookeeper 容器的名称为 zookeeper,并且它在 2181 端口监听。

  • -e ALLOW_PLAINTEXT_LISTENER=yes:允许 Kafka 使用纯文本监听器。即允许非加密的通信。

  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:Kafka broker 实际监听在容器内的 0.0.0.0:9092 上。这意味着 Kafka 接受来自任何网络接口的连接。

  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092:指定 Kafka 广播其监听器地址,客户端将使用该地址连接到 broker。在这个例子中,Kafka 广播它在 localhost:9092 上监听。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:指定 Kafka 使用的监听器协议映射。例如:PLAINTEXT:PLAINTEXT,SSL:SSL

  • KAFKA_INTER_BROKER_LISTENER_NAME:指定 broker 间通信使用的监听器名称。例如:PLAINTEXT

2. 理解参数和原理

KAFKA_LISTENERS是broker实际监听的地址。

KAFKA_ADVERTISED_LISTENERS是broker注册在zookeeper或者controller broker里面的元数据,当消费者或者生产者使用Bootstrap-Server去连接kafka集群时,集群会返回元数据等信息到客户端,客户端会根据每个broker提供的KAFKA_ADVERTISED_LISTENERS去连接对应的broker。

所以首先,集群之间,broker之间需要通信,所以每个kafka容器需要设置一个KAFKA_ADVERTISED_LISTENERS用于告诉别的容器如何连接到自己,如果容器都是处于同一bridge网络,那么直接使用容器名即可。

其次,我们想要在宿主机比如windows的idea开发,我们一般只能通过docker容器-p暴露的端口去连接kafka,所以每个kafka容器还需要设置一个KAFKA_ADVERTISED_LISTENERS来告诉宿主机的客户端,如何连接到自己,这里需要使用localhost+暴露在宿主机的端口。

那么如果KAFKA_ADVERTISED_LISTENERS里面有2个地址,如何保证broker之间的连接使用的是容器名,而宿主机客户端使用的是localhost呢?

这需要KAFKA_INTER_BROKER_LISTENER_NAME来指定前者。

并且由于KAFKA_ADVERTISED_LISTENERS里面有2个地址,所以我们还需要KAFKA_LISTENER_SECURITY_PROTOCOL_MAP来映射监听器名字。

3. Docker Compose

version: '3.8'services:zookeeper:image: bitnami/zookeeper:3.8.2container_name: zookeeperenvironment:- ALLOW_ANONYMOUS_LOGIN=yesnetworks:- kafkakafka1:image: bitnami/kafka:3.6.1container_name: kafka1depends_on:- zookeeperports:- "19092:9092"environment:- KAFKA_BROKER_ID=1- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9093,EXTERNAL://localhost:19092- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNALnetworks:- kafkakafka2:image: bitnami/kafka:3.6.1container_name: kafka2depends_on:- zookeeperports:- "29092:9092"environment:- KAFKA_BROKER_ID=2- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka2:9093,EXTERNAL://localhost:29092- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNALnetworks:- kafkakafka3:image: bitnami/kafka:3.6.1container_name: kafka3depends_on:- zookeeperports:- "39092:9092"environment:- KAFKA_BROKER_ID=3- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka3:9093,EXTERNAL://localhost:39092- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNALnetworks:- kafkanetworks:kafka:driver: bridge

可以看到每个容器都设置了INTERNAL,因为指定了KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL,所以这是用于broker之间的连接,其监听在本地的0.0.0.0:9093,广播给其它broker的通信地址是<容器名>:9093,使用PLAINTEXT(明文)方式通信。

除此之外还设置了EXTERNAL,监听在本地的0.0.0.0:9092,广播给客户端的地址是localhost:19092、localhost:29092、localhost:39092,也就是windows上的客户端通过localhost:19092访问broker,这会被docker的-p映射到对应容器的9092,被0.0.0.0:9092对接。

4. 验证

连接到某个容器。创建test主题。

kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server kafka1:9093

查看分区和副本情况,可以看到在不同的broker上,输出中显示的是Broker ID。

I have no name!@7212060b6e3d:/$ kafka-topics.sh --describe --topic test --bootstrap-server kafka1:9093
Topic: test     TopicId: Lo1eQ6aCQj6WiFcNiVBrcw PartitionCount: 3       ReplicationFactor: 3    Configs: Topic: test     Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1Topic: test     Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

引入pom包

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency><!-- SLF4J API --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.36</version></dependency><!-- Logback classic (SLF4J implementation) --><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.11</version></dependency>

生产者代码,通过localhost:19092连接到集群。

package org.dragon.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;public class KafkaProducerTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}

消费者代码,

package org.dragon.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.HashMap;public class KafkaConsumerTest {public static void main(String[] args) {// 创建消费者对象HashMap<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.GROUP_ID_CONFIG, "mkl");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);// 消费者订阅主题kafkaConsumer.subscribe(Collections.singletonList("test"));try {while (true){// 消费者拉取消息ConsumerRecords<String, String> records = kafkaConsumer.poll(100);records.forEach(System.out::println);}}finally {// 消费者关闭kafkaConsumer.close();}}
}

都启动后,消费者和生产者日志正常。
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 对AAC解码的理解
  • Linux C++ 054-设计模式之外观模式
  • leetcode日记(38)字母异位词分组
  • C++数组
  • 【密码学】消息认证
  • 九、Linux二进制安装ElasticSearch集群
  • 【JavaScript】解决 JavaScript 语言报错:Uncaught SyntaxError: Unexpected token
  • Qt QWebSocket网络编程
  • Nginx -Web服务器/反向代理/负载均衡
  • Selenium WebDriver中的显式等待与隐式等待:深入理解与应用
  • LabVIEW学习-LabVIEW储存Excel表格
  • 新版k8s拉取镜像失败问题
  • Python基础学习笔记——异常
  • python实现openssl的EVP_BytesToKey及AES_256_CBC加解密算法
  • “存算分离“和“湖仓一体“
  • 【附node操作实例】redis简明入门系列—字符串类型
  • Date型的使用
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • ucore操作系统实验笔记 - 重新理解中断
  • Vue 2.3、2.4 知识点小结
  • 阿里云爬虫风险管理产品商业化,为云端流量保驾护航
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 计算机在识别图像时“看到”了什么?
  • 前端相关框架总和
  • 让你的分享飞起来——极光推出社会化分享组件
  • 入门级的git使用指北
  • 使用Gradle第一次构建Java程序
  • 通过来模仿稀土掘金个人页面的布局来学习使用CoordinatorLayout
  • 做一名精致的JavaScripter 01:JavaScript简介
  • 你对linux中grep命令知道多少?
  • ​ ​Redis(五)主从复制:主从模式介绍、配置、拓扑(一主一从结构、一主多从结构、树形主从结构)、原理(复制过程、​​​​​​​数据同步psync)、总结
  • # 计算机视觉入门
  • #Lua:Lua调用C++生成的DLL库
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (06)Hive——正则表达式
  • (done) NLP “bag-of-words“ 方法 (带有二元分类和多元分类两个例子)词袋模型、BoW
  • (STM32笔记)九、RCC时钟树与时钟 第二部分
  • (八)Flink Join 连接
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (十一)手动添加用户和文件的特殊权限
  • (一)硬件制作--从零开始自制linux掌上电脑(F1C200S) <嵌入式项目>
  • ***汇编语言 实验16 编写包含多个功能子程序的中断例程
  • ./configure、make、make install 命令
  • .Net 4.0并行库实用性演练
  • .NET C# 使用 iText 生成PDF
  • .NET Core 控制台程序读 appsettings.json 、注依赖、配日志、设 IOptions
  • .NET Core 项目指定SDK版本
  • .Net Core 中间件与过滤器
  • .NET IoC 容器(三)Autofac
  • .Net 垃圾回收机制原理(二)
  • .Net--CLS,CTS,CLI,BCL,FCL
  • .NET企业级应用架构设计系列之技术选型
  • /bin/bash^M: bad interpreter: No such file or directory