在开发中,消息队列是我们经常会使用到的,所以这里的话我们来演示下使用go语言来实现kafka的生产者,发送消息到kafka队列总。
一、安装kakfa环境
安装kafka的环境,可以参考《使用docker-compose快速部署单机版本zookeeper+kafka+kafka map》。根据此教程安装非常的方便。
二、安装依赖
在go语言中,使用kafka的话,我们安装的依赖是:github.com/IBM/sarama,所以在使用前记得安装此依赖
go get -u github.com/IBM/sarama
安装完成之后就可以开始编码了。
三、kafka生产者编码
接下来就是编写kafka生产者的代码了,主要分为4大块,分别是:
1、创建kafka相关的config 2、准备要发送的msg 3、创建到kafka 4、发送消息
根据上面的4个步骤,最后的编码完整代码如下:
package main import ( "fmt" "github.com/IBM/sarama" ) func main() { //创建config config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认,确保数据不丢失 config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机partition发送 config.Producer.Return.Successes = true // 发送成功返回true // 构造一个消息,包含topic和msg bod msg := &sarama.ProducerMessage{} msg.Topic = "test" //发送的topic是test msg.Value = sarama.StringEncoder("测试消息") // 连接到kafka client, err := sarama.NewSyncProducer([]string{"192.168.1.249:9092"}, config) if err != nil { fmt.Println("连接kafka错误:", err) return } defer client.Close() // 发送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("发送消息失败, err:", err) return } fmt.Printf("msg pid:%v msg offset:%v\n", pid, offset) }
测试一下
可以看到成功的把消息发送到了kafka中,我们再登录kafka map看看:
kafka map中能观测到对应的消息,说明我们的生产者发送到kafka中的消息是成功的。
以上就是在go语言中编写kafka生产者的案例。
还没有评论,来说两句吧...