在开发中,消息队列是我们经常会使用到的,所以这里的话我们来演示下使用go语言来实现kafka的生产者,发送消息到kafka队列总。
一、安装kakfa环境
安装kafka的环境,可以参考《使用docker-compose快速部署单机版本zookeeper+kafka+kafka map》。根据此教程安装非常的方便。
二、安装依赖
在go语言中,使用kafka的话,我们安装的依赖是:github.com/IBM/sarama,所以在使用前记得安装此依赖
go get -u github.com/IBM/sarama
安装完成之后就可以开始编码了。
三、kafka生产者编码
接下来就是编写kafka生产者的代码了,主要分为4大块,分别是:
1、创建kafka相关的config 2、准备要发送的msg 3、创建到kafka 4、发送消息
根据上面的4个步骤,最后的编码完整代码如下:
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
//创建config
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认,确保数据不丢失
config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机partition发送
config.Producer.Return.Successes = true // 发送成功返回true
// 构造一个消息,包含topic和msg bod
msg := &sarama.ProducerMessage{}
msg.Topic = "test" //发送的topic是test
msg.Value = sarama.StringEncoder("测试消息")
// 连接到kafka
client, err := sarama.NewSyncProducer([]string{"192.168.1.249:9092"}, config)
if err != nil {
fmt.Println("连接kafka错误:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("发送消息失败, err:", err)
return
}
fmt.Printf("msg pid:%v msg offset:%v\n", pid, offset)
}测试一下
可以看到成功的把消息发送到了kafka中,我们再登录kafka map看看:
kafka map中能观测到对应的消息,说明我们的生产者发送到kafka中的消息是成功的。
以上就是在go语言中编写kafka生产者的案例。




还没有评论,来说两句吧...