上文《Go语言开发基础系列(四十六)kafka生产者发送消息》我们使用了github.com/IBM/sarama框架完成kafka生产者,本文我们介绍下使用github.com/IBM/sarama进行消费者的开发。
使用java操作过kafka的同学都知道:
1、kafka的消费者有group组的概念,不同group组可以从头到尾的接收到kafka队列的所有消息,且不同消费者组之间互不干涉。 2、消费者一般使用异步多线程的形式来进行数据的接收。
基于上诉两点,用go实现的消费者消费消息的完整代码示例如下:
package main
import (
"context"
"fmt"
"github.com/IBM/sarama"
"log"
)
func main() {
//创建消费者的config
config := sarama.NewConfig()
config.ClientID = "consumer1" //当前consumer的id
config.Version = sarama.V3_4_1_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest //设置为从上一次消费的位置开始消费,可以把消息队列中未消费的数据给拉取出来。
// 创建client
client, err := sarama.NewClient([]string{"192.168.1.249:9092"}, config)
if err != nil {
log.Fatal(err)
}
//从客服端创建不同的group组接收消息
group, err := sarama.NewConsumerGroupFromClient("consumer1", client)
if err != nil {
fmt.Printf("连接kafka错误, err:%v\n", err)
return
}
// 检查错误
go func() {
for err := range group.Errors() {
fmt.Println("group errors : ", err)
}
}()
ctx := context.Background()
fmt.Println("start get msg")
// for 是应对 consumer rebalance
for {
// 需要监听的主题
topics := []string{"test"}
handler := ConsumerGroupHandler{}
// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
err := group.Consume(ctx, topics, handler)
if err != nil {
fmt.Println("消费消息失败; err : ", err)
return
}
}
}
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
return nil
}
func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
return nil
}
// 这个方法用来消费消息的
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 获取消息
for msg := range claim.Messages() {
fmt.Printf("接受到消息的topic是 %s---消息所属分区是:%d, 消息的Offset:%d, 消息的Key:%s, 消息的Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
// 将消息标记为已使用
sess.MarkMessage(msg, "")
}
return nil
}最后运行一下,可看到消费成功
在代码里面有几个需要注意的地方:
1、config中需要设置offsets,即从最新的地方拉取还是把kafka中没有消费过的消息都拉取下来,一般选择后者。 2、使用group的概念,切换不同的group,可以实现从头拉取消息。 3、手动mark标记消息,即消息处理完才标记消息为已使用,如果没消费完或者报错,我们不能标记消息为已使用,这样贴合实际业务,保证消息消费不丢失。
以上就是go语言实现kafka消费者的案例。


还没有评论,来说两句吧...