package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic = "user_click"reader *kafka.Reader
)
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: topic, Balancer: &kafka.Hash{}, WriteTimeout: 1 * time.Second, RequiredAcks: kafka.RequireNone, AllowAutoTopicCreation: true, }defer writer.Close() for i := 0; i < 3; i++ { if err := writer.WriteMessages(ctx, kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, kafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break }}
}
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"}, Topic: topic,CommitInterval: 1 * time.Second, GroupID: "recommend_biz", StartOffset: kafka.FirstOffset, })for { if message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) sig := <-c fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0)
}func main() {ctx := context.Background()go listenSignal()readKafka(ctx)
}