在秒杀的文章系列里面我们介绍了使用redis做分布式锁在大型的高并发场景里面是不合适的,主要是由于redis的集群同步的问题。所以当时给大家建议的是使用etcd来做分布式锁。etcd的分布式锁在单机的环境下,每秒QPS可以达到10W+。所以是一个非常适合的分布式锁方案。本篇文章主要演示etcd做分布式锁的java演示。后续文件会陆续介绍etcd。
1)创建一个maven项目,并且在pom里面引入如下依赖
<!-- https://mvnrepository.com/artifact/io.etcd/jetcd-core --> <dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.5.3</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>26.0-jre</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
2)创建一个lockResult的java类
package com.test.etcd; import java.io.Serializable; import java.util.concurrent.ScheduledExecutorService; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.experimental.Builder; import lombok.ToString; @Data @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode @ToString @Builder public class LockResult implements Serializable{ /** * */ private static final long serialVersionUID = 1L; private boolean isLockSuccess; private long leaseId; private ScheduledExecutorService service; }
3)编写一个etcd工具类
package com.test.etcd; import java.nio.charset.Charset; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.Lease; import io.etcd.jetcd.Lock; import lombok.extern.slf4j.Slf4j; @Slf4j public class EtcdDistributedLockUtils { private static EtcdDistributedLockUtils lock = null; private static Object mutex = new Object(); private Client client; // etcd客户端 private Lock lockClient; // etcd分布式锁客户端 private Lease leaseClient; // etcd租约客户端 private EtcdDistributedLockUtils() { super(); // 创建Etcd客户端,本例中Etcd集群只有一个节点 this.client = Client.builder().endpoints("http://192.168.31.30:32379").build(); this.lockClient = client.getLockClient(); this.leaseClient = client.getLeaseClient(); } /** * 单例 */ public static EtcdDistributedLockUtils getInstance() { synchronized (mutex) { // 互斥锁 if (null == lock) { lock = new EtcdDistributedLockUtils(); } } return lock; } /** * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。 * * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁 * @return LockResult */ public LockResult lock(String lockName, long TTL) { LockResult lockResult = new LockResult(); /* 1.准备阶段 */ // 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效; // 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); // 初始化返回值lockResult lockResult.setLockSuccess(false); lockResult.setService(service); // 记录租约ID,初始值设为 0L Long leaseId = 0L; /* 2.创建租约 */ try { // 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定 leaseId = leaseClient.grant(TTL).get().getID(); lockResult.setLeaseId(leaseId); // 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定 long period = TTL - TTL / 5; service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS); } catch (Exception e) { log.error(e.getMessage(), e); return lockResult; } /* 3.加锁操作 */ // 执行加锁操作,并为锁对应的key绑定租约 try { lockClient.lock(ByteSequence.from(lockName, Charset.forName("UTF-8")), leaseId).get(); } catch (Exception e) { log.error(e.getMessage(), e); return lockResult; } lockResult.setLockSuccess(true); return lockResult; } /** * 解锁操作,释放锁、关闭定时任务、解除租约 * * @param lockName:锁名 * @param lockResult:加锁操作返回的结果 */ public void unLock(String lockName, LockResult lockResult) { System.err.println( System.currentTimeMillis() + "|[unlock]: " + Thread.currentThread().getName() + " start to unlock."); try { // 释放锁 lockClient.unlock(ByteSequence.from(lockName, Charset.forName("UTF-8"))).get(); // 关闭定时任务 lockResult.getService().shutdown(); // 删除租约 if (lockResult.getLeaseId() != 0L) { leaseClient.revoke(lockResult.getLeaseId()); } } catch (Exception e) { log.error(e.getMessage(), e); } } }
4)创建一个续租的子线程
package com.test.etcd; import io.etcd.jetcd.Lease; import lombok.extern.slf4j.Slf4j; @Slf4j public class KeepAliveTask implements Runnable { private Lease leaseClient; private long leaseId; KeepAliveTask(Lease leaseClient, long leaseId) { this.leaseClient = leaseClient; this.leaseId = leaseId; } @Override public void run() { // 续约一次 log.info("{} 续约了一次", leaseId); leaseClient.keepAliveOnce(leaseId); } }
5)模拟真实环境,创建一个使用etcd的task类
package com.test.etcd; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; @Slf4j public class EtcdDistributeLockTask implements Runnable { private String lockName ; public EtcdDistributeLockTask(String lockName) { this.lockName = lockName; } @Override public void run() { // 1. 加锁 LockResult lockResult = EtcdDistributedLockUtils.getInstance().lock(lockName, 1); if (lockResult.isLockSuccess()) { // 获得了锁 try { log.info("当前线程:{} 获取到了锁,锁信息是:{}", Thread.currentThread().getName(), JSON.toJSONString(lockResult)); Thread.sleep(10000); // sleep 10秒,模拟执行相关业务 } catch (Exception e) { log.error(e.getMessage(), e); } } log.info("当前线程:{} 准备释放锁:{}", Thread.currentThread().getName(), JSON.toJSONString(lockResult)); // 2. 解锁 EtcdDistributedLockUtils.getInstance().unLock(lockName, lockResult); log.info("当前线程:{} 准备释放锁完成", Thread.currentThread().getName()); } }
6)然后我们使用多线程来演示一下
package com.test.etcd; import lombok.extern.slf4j.Slf4j; @Slf4j public class EtcdDemo { public static void main(String[] args) { for (int i = 0; i < 2; i++) { new Thread(new EtcdDistributeLockTask("/etcd/good/1")).start(); } } }
7)运行结果
可以看到第一个线程获取了锁,然后执行代码,等到第一个线程释放了锁之后,第二个锁才能获取到锁并对应进行执行。
备注:
1、本个demo主要是做etcd的演示,我们会在后续的文章介绍里面,根据实际情况进行完善使用etcd的代码。
还没有评论,来说两句吧...