前面我们尝试介绍了pulsar,同时也安装了一个简单的pulsar,在这里我们编写个pulsar 的javademo。供大家参考。
首先我们引入pulsar的maven依赖
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.6.1</version> </dependency>
生产者
package com.test.pulsar;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.shade.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PulsarProduceMessage {
private String brokerServiceurl = "pulsar://192.168.31.30:6650";
private String topicName = "persistent://public/default/test";
private Timer timer = new HashedWheelTimer();
private PulsarClient client;
private Producer<byte[]> producer;
@Before
public void init() throws Exception {
// 构造Pulsar client
client = PulsarClient.builder().serviceUrl(brokerServiceurl).ioThreads(4) // netty的ioThreads负责网络IO操作,如果业务流量较大,可以调高ioThreads个数
.listenerThreads(10) // 负责调用以listener模式启动的消费者的回调函数,建议配置大于该client负责的partition数目
.operationTimeout(5, TimeUnit.SECONDS) // 一些元数据操作的超时时间,Pulsar默认为30s,有些保守,可以根据自己的网络情况、处理性能来适当调低
.connectionTimeout(15, TimeUnit.SECONDS) // 连接Pulsar的超时时间,配置原则同上。
.build();
// 创建producer
producer = client.newProducer().topic(topicName).enableBatching(true)// 是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送
.compressionType(CompressionType.LZ4)// 消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // 设置将对发送的消息进行批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。
.sendTimeout(0, TimeUnit.SECONDS)// 设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0
.batchingMaxMessages(1000)// 批处理中允许的最大消息数。默认1000
.maxPendingMessages(1000)// 设置等待接受来自broker确认消息的队列的最大大小,默认1000
.blockIfQueueFull(true)// 设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync
// 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true
.roundRobinRouterBatchingPartitionSwitchFrequency(10)// 向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
.batcherBuilder(BatcherBuilder.DEFAULT)// key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里
.create();
}
/**
* 同步发送
* @throws Exception
*/
@Test
public void syncSend() throws Exception {
final String content = "this is a testsyncSend message";
MessageId messageId = producer.send((content).getBytes());// 同步发送
log.info("message id : {}", JSON.toJSONString(messageId));
}
/**
* 异步发送
*/
@Test
public void asyncSend() {
String content = "this is a asyncSend message";
CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());// 异步发送
future.handle((v, ex) -> {
if (ex == null) {
log.info("消息发送成功::{}", content);
} else {
log.info("消息发送失败:{}", content + ex);
}
return null;
});
future.join();
log.info("发送完毕: {}", content);
}
@After
public void close() throws Exception {
producer.close();// 关闭producer
client.close();// 关闭client
}
}消费者
package com.test.pulsar;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PulsarConsumerMessage {
private String brokerServiceurl = "pulsar://192.168.31.30:6650";
private PulsarClient client;
private Consumer<String> consumer;
@Before
public void init() throws Exception {
client = PulsarClient.builder()
.serviceUrl(brokerServiceurl)
.ioThreads(4) //netty的ioThreads负责网络IO操作,如果业务流量较大,可以调高ioThreads个数
.listenerThreads(10) //负责调用以listener模式启动的消费者的回调函数,建议配置大于该client负责的partition数目
.operationTimeout(5, TimeUnit.SECONDS) //一些元数据操作的超时时间,Pulsar默认为30s,有些保守,可以根据自己的网络情况、处理性能来适当调低
.connectionTimeout(15, TimeUnit.SECONDS) //连接Pulsar的超时时间,配置原则同上。
.build();
consumer = client.newConsumer(Schema.STRING)
.topic("public/default/test")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS) //当服务端推送消息,但消费者未及时回复ack,经过ackTimeout后,会重新推送给消费者处理
.subscriptionType(SubscriptionType.Exclusive)//订阅类型,根据业务需求决定。
.subscribe();
}
@Test
public void consumer() throws Exception {
// 死循环接收
while (true) {
Message<String> message = consumer.receive();
String msgContent = message.getValue();
log.info("接收到消息: {}", msgContent);
consumer.acknowledge(message);
}
}
@After
public void close() throws Exception {
client.close();
consumer.close();
}
}所有的注释都写在代码里面了,暂时不做过多的解释,直接看代码注释即可。









还没有评论,来说两句吧...