当前位置: 首页 > news >正文

Kafka-Go学习

文章目录

      • 1. **安装 `kafka-go`**
      • 2. **基本概念**
      • 3. **`kafka-go` 基本用法**
        • 3.1 创建 Producer(生产者)
        • 3.2 创建 Consumer(消费者)
        • 3.3 生产者和消费者配置详解
          • 生产者配置 (`kafka.WriterConfig`)
          • 消费者配置 (`kafka.ReaderConfig`)
      • 4. **高级用法**
        • 4.1 消费者偏移量管理
        • 4.2 分区管理
        • 4.3 使用 SASL 认证
      • 5. **Kafka 生产者与消费者优化**
        • 5.1 优化生产者
        • 5.2 优化消费者
      • 6. **错误处理**
      • 7. **总结**
        • 常用资源

kafka-go 是 Go 语言中一个轻量级、高效的 Kafka 客户端库,提供了简单易用的 API 来与 Apache Kafka 进行交互。 kafka-go 支持 Kafka 的生产者和消费者功能,适用于 Go 应用程序中使用 Kafka 进行消息队列的实现。

1. 安装 kafka-go

首先,需要在 Go 项目中安装 kafka-go 库:

go get github.com/segmentio/kafka-go

2. 基本概念

  • Producer (生产者):生产者负责将消息发送到 Kafka 中的某个主题。
  • Consumer (消费者):消费者从 Kafka 中读取消息。
  • Topic (主题):Kafka 将消息按主题进行分类,每个主题可能有多个分区。
  • Partition (分区):每个主题可以被划分为若干个分区,消息在分区之间进行负载均衡。

3. kafka-go 基本用法

3.1 创建 Producer(生产者)

生产者的作用是向 Kafka 的主题中发送消息。kafka-go 提供了一个简单的 API 来实现消息的生产。

package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 配置 Kafka writer(生产者)writer := kafka.NewWriter(kafka.WriterConfig{Brokers:  []string{"localhost:9092"}, // Kafka broker 地址Topic:    "example-topic",            // 发送到的 Kafka 主题Balancer: &kafka.LeastBytes{},        // 负载均衡策略})// 定义上下文ctx := context.Background()// 发送消息err := writer.WriteMessages(ctx,kafka.Message{Key:   []byte("Key-A"),Value: []byte("Hello Kafka!"),},kafka.Message{Key:   []byte("Key-B"),Value: []byte("Another Message"),},)if err != nil {log.Fatal("Failed to write messages:", err)}fmt.Println("Messages successfully sent to Kafka")// 关闭 writerif err := writer.Close(); err != nil {log.Fatal("Failed to close writer:", err)}
}
3.2 创建 Consumer(消费者)

消费者从 Kafka 的主题中读取消息。你可以设置不同的消费者组来实现分布式消费。

package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 配置 Kafka reader(消费者)reader := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{"localhost:9092"},  // Kafka broker 地址Topic:    "example-topic",             // 读取的 Kafka 主题GroupID:  "example-group",             // 消费者组 IDMinBytes: 10e3,                       // 每次 fetch 请求最少读取 10KBMaxBytes: 10e6,                       // 每次 fetch 请求最多读取 10MB})// 读取消息for {// 设置上下文ctx := context.Background()// 读取消息msg, err := reader.ReadMessage(ctx)if err != nil {log.Fatal("Failed to read message:", err)}// 打印消息fmt.Printf("Message at offset %d: key = %s, value = %s\n", msg.Offset, string(msg.Key), string(msg.Value))// 模拟延迟,避免占用过多 CPU 资源time.Sleep(1 * time.Second)}// 关闭 readerif err := reader.Close(); err != nil {log.Fatal("Failed to close reader:", err)}
}
3.3 生产者和消费者配置详解
生产者配置 (kafka.WriterConfig)
  • Brokers:Kafka broker 的地址列表。
  • Topic:指定生产者要发送消息的 Kafka 主题。
  • Balancer:消息负载均衡策略,如 LeastBytes(最小字节数分配)或 Hash(基于消息 key 的哈希分配)。
消费者配置 (kafka.ReaderConfig)
  • Brokers:Kafka broker 的地址列表。
  • Topic:指定消费者要读取的 Kafka 主题。
  • GroupID:消费者组 ID,Kafka 会将同一个组的消费者平衡分配到不同的分区。
  • MinBytesMaxBytes:每次从 Kafka 读取的最小和最大字节数,影响消息的拉取频率和性能。

4. 高级用法

4.1 消费者偏移量管理

Kafka 消费者通过偏移量(Offset)来管理读取进度,kafka-go 自动为你处理偏移量提交,但你也可以手动管理。

package mainimport ("context""fmt""log""github.com/segmentio/kafka-go"
)func main() {reader := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"},Topic:   "example-topic",GroupID: "example-group",})// 读取消息并手动提交偏移量for {msg, err := reader.FetchMessage(context.Background())if err != nil {log.Fatal("Failed to fetch message:", err)}fmt.Printf("Message: %s = %s\n", string(msg.Key), string(msg.Value))// 手动提交消息偏移量if err := reader.CommitMessages(context.Background(), msg); err != nil {log.Fatal("Failed to commit message:", err)}}if err := reader.Close(); err != nil {log.Fatal("Failed to close reader:", err)}
}
4.2 分区管理

Kafka 主题中的消息被分布在多个分区中,kafka-go 允许生产者根据消息的 Key 选择分区,确保相同的 Key 总是发送到同一个分区。

writer := kafka.NewWriter(kafka.WriterConfig{Brokers: []string{"localhost:9092"},Topic:   "example-topic",Balancer: &kafka.Hash{},  // 基于 Key 的 Hash 分配到相同分区
})
4.3 使用 SASL 认证

如果 Kafka 使用了 SASL(Simple Authentication and Security Layer)认证机制,你可以通过 kafka-go 提供的 SASL 支持来进行认证。

import "github.com/segmentio/kafka-go/sasl/plain"dialer := &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: "my-username",Password: "my-password",},
}reader := kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"},Topic:   "example-topic",Dialer:  dialer,  // 配置认证
})

5. Kafka 生产者与消费者优化

5.1 优化生产者
  • Batching:批量发送消息能提升效率,kafka-go 允许配置批量发送。
  • Compression:使用压缩算法如 gzipsnappy,可以减少网络带宽使用。
writer := kafka.NewWriter(kafka.WriterConfig{Brokers:      []string{"localhost:9092"},Topic:        "example-topic",BatchSize:    100,  // 设置批量大小BatchTimeout: time.Millisecond * 10,Compression:  kafka.Gzip,
})
5.2 优化消费者
  • 并发消费:可以启动多个消费者来读取不同的分区,提升消息处理的吞吐量。
  • 流量控制:通过配置 MinBytesMaxBytes 来控制每次 fetch 的大小,从而优化消费者的性能。

6. 错误处理

Kafka 通常是分布式的,可能会遇到网络故障或 broker 不可用等问题。在生产者和消费者中应该使用适当的错误处理和重试机制。

for {err := writer.WriteMessages(ctx, msg)if err != nil {fmt.Println("Error writing message:", err)time.Sleep(1 * time.Second)  // 简单的重试机制}
}

7. 总结

kafka-go 是 Go 语言中用于与 Kafka 进行通信的一个简洁高效的库,提供了生产者、消费者、分区管理、偏移量管理等完整的功能。它的 API 设计简单易用,同时具有较高的性能和扩展性,适合在 Go 应用中集成 Kafka 消息队列。

常用资源
  • Kafka 官方文档:https://kafka.apache.org/documentation/
  • kafka-go 官方文档:https://github.com/segmentio/kafka-go

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 5.内容创作的未来:ChatGPT如何辅助写作(5/10)
  • 算法题之每日温度
  • Vue学习记录之三(ref全家桶)
  • 山东潍坊戴尔存储服务器维修 md3800f raid恢复
  • Spring:项目中的统一异常处理和自定义异常
  • MATLAB入门基础篇
  • 2024数学建模研赛华为杯选题建议详细思路代码文章A题B题C题D题E题F题研究生数模竞赛
  • 我的AI工具箱Tauri版-FasterWhisper音频转文本
  • 【毕业设计】基于 PHP 开发的社区交流系统
  • ubuntu 22.04 ~24.04 如何修改登录背景
  • golang学习笔记2-语法要求,注释与代码风格
  • 周边游小程序开发
  • 双击就可以打开vue项目,而不用npm run dev
  • Redis——redispluspls库通用命令以及String类型相关接口使用
  • 实用好软-----电脑端 全能音视频转换器 转换各种音视频格式
  • 【MySQL经典案例分析】 Waiting for table metadata lock
  • 【笔记】你不知道的JS读书笔记——Promise
  • angular组件开发
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码...
  • JS变量作用域
  • Spark RDD学习: aggregate函数
  • webpack+react项目初体验——记录我的webpack环境配置
  • windows下如何用phpstorm同步测试服务器
  • 爱情 北京女病人
  • 笨办法学C 练习34:动态数组
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 坑!为什么View.startAnimation不起作用?
  • 码农张的Bug人生 - 见面之礼
  • 使用 @font-face
  • 微信小程序填坑清单
  • 详解NodeJs流之一
  • 详解移动APP与web APP的区别
  • 一份游戏开发学习路线
  • 关于Android全面屏虚拟导航栏的适配总结
  • ​HTTP与HTTPS:网络通信的安全卫士
  • ​比特币大跌的 2 个原因
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • $.ajax中的eval及dataType
  • $.each()与$(selector).each()
  • (1)(1.11) SiK Radio v2(一)
  • (1)svelte 教程:hello world
  • (7) cmake 编译C++程序(二)
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (笔记)M1使用hombrew安装qemu
  • (超简单)构建高可用网络应用:使用Nginx进行负载均衡与健康检查
  • (带教程)商业版SEO关键词按天计费系统:关键词排名优化、代理服务、手机自适应及搭建教程
  • (二)pulsar安装在独立的docker中,python测试
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (分布式缓存)Redis哨兵
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (译) 函数式 JS #1:简介
  • (原创)Stanford Machine Learning (by Andrew NG) --- (week 9) Anomaly DetectionRecommender Systems...
  • (转)利用ant在Mac 下自动化打包签名Android程序
  • ./configure,make,make install的作用(转)