当前位置: 首页 > news >正文

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic  = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic:                  topic,                       //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer:               &kafka.Hash{},               //把message的key进行hash,确定partitionWriteTimeout:           1 * time.Second,             //设定写超时RequiredAcks:           kafka.RequireNone,           //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true,                        //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic:          topic,CommitInterval: 1 * time.Second,   //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID:        "recommend_biz",   //一个Group内消费到的消息不会重复StartOffset:    kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c                                        //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
} 

相关文章:

  • 计算机丢失mfc100.dll如何恢复,详细解析mfc100.dll文件丢失解决方法
  • uniapp的几种跳转方式
  • oled显示器程序(IIC)从stm32f103移植到stm32f429出现bug不显示-解决移植失败问题
  • react路由组件的lazyLoad
  • Go 语言初探:从基础到实战
  • 【gltf-pipeline】安装gltf-pipeline 进行文件格式转换
  • 汽车电子中的深力科推荐一款汽车用功率MOSFET NVTFS6H888NLTAG N沟道
  • C语言 每日一题 11.9 day15
  • 酷开科技持续推动智能投影行业创新发展
  • 修正TiKnob的指示箭头显示问题
  • 搭建嵌入式GDB调试环境以及VSCode+gdbserver 图形化调试
  • IDEA中如何移除未使用的import
  • chrome 一些详细信息查找的地方
  • linux安装配置MongoDB并设置开机启动
  • React高阶组件(Higher-Order Components, HOCs)
  • 「前端」从UglifyJSPlugin强制开启css压缩探究webpack插件运行机制
  • Consul Config 使用Git做版本控制的实现
  • HashMap ConcurrentHashMap
  • JavaScript 一些 DOM 的知识点
  • jquery ajax学习笔记
  • Nacos系列:Nacos的Java SDK使用
  • Python爬虫--- 1.3 BS4库的解析器
  • Python中eval与exec的使用及区别
  • React16时代,该用什么姿势写 React ?
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • Vultr 教程目录
  • 区块链技术特点之去中心化特性
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • No resource identifier found for attribute,RxJava之zip操作符
  • 阿里云服务器购买完整流程
  • #LLM入门|Prompt#3.3_存储_Memory
  • #快捷键# 大学四年我常用的软件快捷键大全,教你成为电脑高手!!
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (02)vite环境变量配置
  • (8)STL算法之替换
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY)讲解
  • (八)c52学习之旅-中断实验
  • (附源码)springboot炼糖厂地磅全自动控制系统 毕业设计 341357
  • (转) 深度模型优化性能 调参
  • (转)树状数组
  • .halo勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .NET “底层”异步编程模式——异步编程模型(Asynchronous Programming Model,APM)...
  • .net core控制台应用程序初识
  • .NET Micro Framework 4.2 beta 源码探析
  • .net mvc 获取url中controller和action
  • .NET Standard、.NET Framework 、.NET Core三者的关系与区别?
  • .net wcf memory gates checking failed
  • .net 中viewstate的原理和使用
  • .NetCore项目nginx发布
  • .Net程序猿乐Android发展---(10)框架布局FrameLayout
  • .net连接oracle数据库
  • /etc/fstab和/etc/mtab的区别
  • /etc/X11/xorg.conf 文件被误改后进不了图形化界面
  • @staticmethod和@classmethod的作用与区别