前面我们尝试介绍了pulsar,同时也安装了一个简单的pulsar,在这里我们编写个pulsar 的javademo。供大家参考。
首先我们引入pulsar的maven依赖
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.6.1</version> </dependency>
生产者
package com.test.pulsar; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.shade.io.netty.util.HashedWheelTimer; import org.apache.pulsar.shade.io.netty.util.Timer; import org.junit.After; import org.junit.Before; import org.junit.Test; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; @Slf4j public class PulsarProduceMessage { private String brokerServiceurl = "pulsar://192.168.31.30:6650"; private String topicName = "persistent://public/default/test"; private Timer timer = new HashedWheelTimer(); private PulsarClient client; private Producer<byte[]> producer; @Before public void init() throws Exception { // 构造Pulsar client client = PulsarClient.builder().serviceUrl(brokerServiceurl).ioThreads(4) // netty的ioThreads负责网络IO操作,如果业务流量较大,可以调高ioThreads个数 .listenerThreads(10) // 负责调用以listener模式启动的消费者的回调函数,建议配置大于该client负责的partition数目 .operationTimeout(5, TimeUnit.SECONDS) // 一些元数据操作的超时时间,Pulsar默认为30s,有些保守,可以根据自己的网络情况、处理性能来适当调低 .connectionTimeout(15, TimeUnit.SECONDS) // 连接Pulsar的超时时间,配置原则同上。 .build(); // 创建producer producer = client.newProducer().topic(topicName).enableBatching(true)// 是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送 .compressionType(CompressionType.LZ4)// 消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // 设置将对发送的消息进行批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。 .sendTimeout(0, TimeUnit.SECONDS)// 设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0 .batchingMaxMessages(1000)// 批处理中允许的最大消息数。默认1000 .maxPendingMessages(1000)// 设置等待接受来自broker确认消息的队列的最大大小,默认1000 .blockIfQueueFull(true)// 设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync // 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true .roundRobinRouterBatchingPartitionSwitchFrequency(10)// 向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整 .batcherBuilder(BatcherBuilder.DEFAULT)// key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里 .create(); } /** * 同步发送 * @throws Exception */ @Test public void syncSend() throws Exception { final String content = "this is a testsyncSend message"; MessageId messageId = producer.send((content).getBytes());// 同步发送 log.info("message id : {}", JSON.toJSONString(messageId)); } /** * 异步发送 */ @Test public void asyncSend() { String content = "this is a asyncSend message"; CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());// 异步发送 future.handle((v, ex) -> { if (ex == null) { log.info("消息发送成功::{}", content); } else { log.info("消息发送失败:{}", content + ex); } return null; }); future.join(); log.info("发送完毕: {}", content); } @After public void close() throws Exception { producer.close();// 关闭producer client.close();// 关闭client } }
消费者
package com.test.pulsar; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.junit.After; import org.junit.Before; import org.junit.Test; import lombok.extern.slf4j.Slf4j; @Slf4j public class PulsarConsumerMessage { private String brokerServiceurl = "pulsar://192.168.31.30:6650"; private PulsarClient client; private Consumer<String> consumer; @Before public void init() throws Exception { client = PulsarClient.builder() .serviceUrl(brokerServiceurl) .ioThreads(4) //netty的ioThreads负责网络IO操作,如果业务流量较大,可以调高ioThreads个数 .listenerThreads(10) //负责调用以listener模式启动的消费者的回调函数,建议配置大于该client负责的partition数目 .operationTimeout(5, TimeUnit.SECONDS) //一些元数据操作的超时时间,Pulsar默认为30s,有些保守,可以根据自己的网络情况、处理性能来适当调低 .connectionTimeout(15, TimeUnit.SECONDS) //连接Pulsar的超时时间,配置原则同上。 .build(); consumer = client.newConsumer(Schema.STRING) .topic("public/default/test") .subscriptionName("my-subscription") .ackTimeout(10, TimeUnit.SECONDS) //当服务端推送消息,但消费者未及时回复ack,经过ackTimeout后,会重新推送给消费者处理 .subscriptionType(SubscriptionType.Exclusive)//订阅类型,根据业务需求决定。 .subscribe(); } @Test public void consumer() throws Exception { // 死循环接收 while (true) { Message<String> message = consumer.receive(); String msgContent = message.getValue(); log.info("接收到消息: {}", msgContent); consumer.acknowledge(message); } } @After public void close() throws Exception { client.close(); consumer.close(); } }
所有的注释都写在代码里面了,暂时不做过多的解释,直接看代码注释即可。
还没有评论,来说两句吧...