最近对项目进行并发测试,结果发现之前测试好的东西,这并发测试支持不到10个
幸亏这个项目还没上线,如果上线了,这绝对是一个必须卷铺盖滚蛋的超级重大bug。没办法,只能挨个排查,首先排查报错,一看数据库执行报错,大量的mysql中断错误,然后去看配置文件,纳尼,竟然没有使用数据库连接池。
果断的给数据库连接配上了数据库连接池,然后再测试,此时sql执行不再报错了,然而出现了大量的jedis连接中断。然后看配置文件,这里用了redis连接池的,为什么并发不起来还报错呢,果断的调大redis连接池的信息:
然后测试,发现还是大量的jedis连接中断异常。没办法了,只能看代码了,然后看代码,发现里面使用了redisTemplate,同时还又配置了一个redis客户端,也就是代码里面使用了两套redis客户端,第二套redis客户端用于分布式锁,怪不得在配置文件中修改连接池没有任何效果,然后注释掉分布式锁,项目可以实现大并发了。果然,此时断定肯定是第二套分布式锁的redis客户端有问题,然后看了配置,发现这里的redis又单独的配置了redis连接池,然后使用完redis之后,没有吧redis放回到pool里面。因此各种错误。
反思
1、一套代码里面为什么要用两套redis客户端?
果断的进行代码整改,必须只使用一套客户端,并且一套配置。所以这里要求统一使用第一套的redisTemplate。本文的话,介绍下redisTemplate框架下的分布式锁实现。下面直接开始。
1)首先创建一个maven项目,引入redis的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
记住:
使用redis连接池,必须要搭配这个commons-pool2的依赖,不然的话实现不了这个redis的连接池。
2)接着我们在application.yml里面配置下redis的信息
spring: redis: host: 192.168.31.30 port: 6379 database: 3 timeout: 10000 password: 3c0bef8420ca0961a74ff1fbe8c66ef4 lettuce: pool: # 连接池中的最小空闲连接 min-idle: 0 # 连接池中的最大空闲连接 max-idle: 30 # 连接池的最大数据库连接数 max-active: 30 #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1
这里我们主要是使用这个redis的lettuce连接池。
3)初始化redisTemplate
这里的话我们必须要初始化这个redistemplate,这里主要是实现序列化,因此我们创建一个redisconfig的类,完整代码如下:
package com.redistemplet.lock.config; import java.net.UnknownHostException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { // 创建模板 RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); // 设置连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); // 设置序列化工具 GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); // key和 hashKey采用 string序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); // value和 hashValue采用 JSON序列化 redisTemplate.setValueSerializer(jsonRedisSerializer); redisTemplate.setHashValueSerializer(jsonRedisSerializer); return redisTemplate; } }
此时我们就可以在项目里面就可以使用redisTemplate对象了。
4)使用redistemplate编写分布式锁
这里的话,我们使用redistemplate来实现分布式锁,所以这里的话,主要涉及的函数是setnx和expire两个方法,完整代码如下:
package com.redistemplet.lock.helper; import java.util.Objects; import java.util.UUID; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @Component public class RedisLockHelper { public static final String DEFAULT_LOCK_PREFIX = "redis_lock"; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * * @param lockPrefix * @param key * @param expire_time 分布式锁过期时间防止死锁,单位是ms * @return */ @SuppressWarnings({ "unchecked", "rawtypes" }) public boolean lock(String lockPrefix, String key, final Integer expire_time) { // 设置分布式锁的lock key String lock = new StringBuilder(StringUtils.isNotBlank(lockPrefix) ? lockPrefix : DEFAULT_LOCK_PREFIX) .append("_").append(key).toString(); // 这里的值是准备设置的,并且是在后面进行比对的值,防止把当前线程的锁误认为是自己的锁了。 String thread_id = UUID.randomUUID().toString().replace("-", ""); System.out.println("当前的线程id是:" + thread_id); // 利用lambda表达式 return (Boolean) redisTemplate.execute((RedisCallback) connection -> { Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(thread_id).getBytes()); System.out.println("acquire 的值是:" + acquire); if (acquire) { System.out.println("setnx true"); connection.expire(lock.getBytes(), expire_time/1000); return true; } else { System.out.println("setnx false"); byte[] value = connection.get(lock.getBytes()); if (Objects.nonNull(value) && value.length > 0) { String redisThreadId = new String(value); System.out.println("当前线程准备重入,当前线程id是:" + thread_id + ":" + redisThreadId + ",结果是:" + thread_id.equals(redisThreadId)); // 如果当前redis获取到的value值和当前线程的id不一样,则不是当前对象获取的值。不能重入 return thread_id.equals(redisThreadId); } } return false; }); } /** * 删除锁 * * @param key */ public void delete(String key) { redisTemplate.delete(key); } }
此分布式锁的完整逻辑都在上面代码的注释里面。
5)接着我们编写方法测试下分布式锁
这里我们业务以同一个用户为redissetvalue为案例,也就是拿到锁,我就向redis里面加1,没有拿到锁就不加1,这样一个演示场景,示例代码如下:
package com.redistemplet.lock.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.redistemplet.lock.helper.RedisLockHelper; @RestController public class HelloController { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired RedisLockHelper redisLockHelper; /** * 这里我们模拟一个场景,就是请求一次获取一个分布式锁,然后使用redis做自增加1,然后并发测试下看是否可以成功完成分布式锁的使用 * * @return */ @RequestMapping("/hello") public String hello() { this.redisIncrement(); return "hello"; } /** * 这里模拟可能出现数据错误的情况来使用分布式锁 */ private void redisIncrement() { String key = "userid:1"; //这里给的超时时间一定要尽量大于整个程序执行的时间 boolean lock = redisLockHelper.lock("wallet", key,3000); if (lock) { System.out.println("获取到了锁,准备执行"); //获取到了锁,模拟业务操作 this.setRedisValue(); // 最后执行成功要手动的释放锁 redisLockHelper.delete(key); } else { System.out.println("没获取到锁,准备重试"); // 设置失败次数计数器, 当到达5次时, 返回失败 int failCount = 1; while(failCount <= 5){ // 等待100ms重试 try { Thread.sleep(100l); } catch (InterruptedException e) { e.printStackTrace(); } if (redisLockHelper.lock("wallet", key, 300)){ // 执行逻辑操作 redisLockHelper.delete(key); }else{ failCount ++; } } throw new RuntimeException("现在使用的人太多了, 请稍等再试"); } } private void setRedisValue() { Integer value = (Integer) redisTemplate.opsForValue().get("testfoo"); if(null == value) { value = 0; } value += 1; redisTemplate.opsForValue().set("testfoo", value); } }
这里主要的逻辑就是拿到锁就加1,没有拿到锁就重试5次进行尝试。
6)测试
这里我们使用apipost进行测试,测试的场景如下:
并发数100个,同时压测10次:
点击压测之后,我们看看测试结果:
这里测试请求成功4次,失败了996次,那也就是我们redis里面的预期值应该是4,下面看看是不是4:
可以看到和预期的值是一模一样的。
备注:
1、在真实的业务中一定要注意下,不管是mysql。还是redis,还是其他什么,连接池是标配。这应该是每一个开发人员的基础意识。 2、在同一套业务代码里面,千万不要你做你的,我做我的,你搞一套redis,我搞一套redis,这玩意儿终究必然会出问题。 3、在redishelper里面,有一个可冲入的逻辑,本案例只是展示这个可冲入逻辑,真实的情况,这里的threadid应该是在业务重试之外。 4、使用分布式锁的时候切记要做最小粒度的锁,不要做大锁,做了大锁,整个系统会各种慢,各种失败,严重影响业务。
以上就是关于redistTemplate实现分布式锁的演示,最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...