在秒杀的文章系列里面我们介绍了使用redis做分布式锁在大型的高并发场景里面是不合适的,主要是由于redis的集群同步的问题。所以当时给大家建议的是使用etcd来做分布式锁。etcd的分布式锁在单机的环境下,每秒QPS可以达到10W+。所以是一个非常适合的分布式锁方案。本篇文章主要演示etcd做分布式锁的java演示。后续文件会陆续介绍etcd。
1)创建一个maven项目,并且在pom里面引入如下依赖
<!-- https://mvnrepository.com/artifact/io.etcd/jetcd-core --> <dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.5.3</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>26.0-jre</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
2)创建一个lockResult的java类
package com.test.etcd;
import java.io.Serializable;
import java.util.concurrent.ScheduledExecutorService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Builder;
import lombok.ToString;
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
@Builder
public class LockResult implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private boolean isLockSuccess;
private long leaseId;
private ScheduledExecutorService service;
}3)编写一个etcd工具类
package com.test.etcd;
import java.nio.charset.Charset;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EtcdDistributedLockUtils {
private static EtcdDistributedLockUtils lock = null;
private static Object mutex = new Object();
private Client client; // etcd客户端
private Lock lockClient; // etcd分布式锁客户端
private Lease leaseClient; // etcd租约客户端
private EtcdDistributedLockUtils() {
super();
// 创建Etcd客户端,本例中Etcd集群只有一个节点
this.client = Client.builder().endpoints("http://192.168.31.30:32379").build();
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
}
/**
* 单例
*/
public static EtcdDistributedLockUtils getInstance() {
synchronized (mutex) { // 互斥锁
if (null == lock) {
lock = new EtcdDistributedLockUtils();
}
}
return lock;
}
/**
* 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。
*
* @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
* @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
* @return LockResult
*/
public LockResult lock(String lockName, long TTL) {
LockResult lockResult = new LockResult();
/* 1.准备阶段 */
// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
// 初始化返回值lockResult
lockResult.setLockSuccess(false);
lockResult.setService(service);
// 记录租约ID,初始值设为 0L
Long leaseId = 0L;
/* 2.创建租约 */
try {
// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定
leaseId = leaseClient.grant(TTL).get().getID();
lockResult.setLeaseId(leaseId);
// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定
long period = TTL - TTL / 5;
service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
return lockResult;
}
/* 3.加锁操作 */
// 执行加锁操作,并为锁对应的key绑定租约
try {
lockClient.lock(ByteSequence.from(lockName, Charset.forName("UTF-8")), leaseId).get();
} catch (Exception e) {
log.error(e.getMessage(), e);
return lockResult;
}
lockResult.setLockSuccess(true);
return lockResult;
}
/**
* 解锁操作,释放锁、关闭定时任务、解除租约
*
* @param lockName:锁名
* @param lockResult:加锁操作返回的结果
*/
public void unLock(String lockName, LockResult lockResult) {
System.err.println(
System.currentTimeMillis() + "|[unlock]: " + Thread.currentThread().getName() + " start to unlock.");
try {
// 释放锁
lockClient.unlock(ByteSequence.from(lockName, Charset.forName("UTF-8"))).get();
// 关闭定时任务
lockResult.getService().shutdown();
// 删除租约
if (lockResult.getLeaseId() != 0L) {
leaseClient.revoke(lockResult.getLeaseId());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}4)创建一个续租的子线程
package com.test.etcd;
import io.etcd.jetcd.Lease;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class KeepAliveTask implements Runnable {
private Lease leaseClient;
private long leaseId;
KeepAliveTask(Lease leaseClient, long leaseId) {
this.leaseClient = leaseClient;
this.leaseId = leaseId;
}
@Override
public void run() {
// 续约一次
log.info("{} 续约了一次", leaseId);
leaseClient.keepAliveOnce(leaseId);
}
}5)模拟真实环境,创建一个使用etcd的task类
package com.test.etcd;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EtcdDistributeLockTask implements Runnable {
private String lockName ;
public EtcdDistributeLockTask(String lockName) {
this.lockName = lockName;
}
@Override
public void run() {
// 1. 加锁
LockResult lockResult = EtcdDistributedLockUtils.getInstance().lock(lockName, 1);
if (lockResult.isLockSuccess()) { // 获得了锁
try {
log.info("当前线程:{} 获取到了锁,锁信息是:{}", Thread.currentThread().getName(), JSON.toJSONString(lockResult));
Thread.sleep(10000); // sleep 10秒,模拟执行相关业务
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
log.info("当前线程:{} 准备释放锁:{}", Thread.currentThread().getName(), JSON.toJSONString(lockResult));
// 2. 解锁
EtcdDistributedLockUtils.getInstance().unLock(lockName, lockResult);
log.info("当前线程:{} 准备释放锁完成", Thread.currentThread().getName());
}
}6)然后我们使用多线程来演示一下
package com.test.etcd;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EtcdDemo {
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new EtcdDistributeLockTask("/etcd/good/1")).start();
}
}
}7)运行结果
可以看到第一个线程获取了锁,然后执行代码,等到第一个线程释放了锁之后,第二个锁才能获取到锁并对应进行执行。
备注:
1、本个demo主要是做etcd的演示,我们会在后续的文章介绍里面,根据实际情况进行完善使用etcd的代码。










还没有评论,来说两句吧...