最近对项目进行并发测试,结果发现之前测试好的东西,这并发测试支持不到10个
幸亏这个项目还没上线,如果上线了,这绝对是一个必须卷铺盖滚蛋的超级重大bug。没办法,只能挨个排查,首先排查报错,一看数据库执行报错,大量的mysql中断错误,然后去看配置文件,纳尼,竟然没有使用数据库连接池。
果断的给数据库连接配上了数据库连接池,然后再测试,此时sql执行不再报错了,然而出现了大量的jedis连接中断。然后看配置文件,这里用了redis连接池的,为什么并发不起来还报错呢,果断的调大redis连接池的信息:
然后测试,发现还是大量的jedis连接中断异常。没办法了,只能看代码了,然后看代码,发现里面使用了redisTemplate,同时还又配置了一个redis客户端,也就是代码里面使用了两套redis客户端,第二套redis客户端用于分布式锁,怪不得在配置文件中修改连接池没有任何效果,然后注释掉分布式锁,项目可以实现大并发了。果然,此时断定肯定是第二套分布式锁的redis客户端有问题,然后看了配置,发现这里的redis又单独的配置了redis连接池,然后使用完redis之后,没有吧redis放回到pool里面。因此各种错误。
反思
1、一套代码里面为什么要用两套redis客户端?
果断的进行代码整改,必须只使用一套客户端,并且一套配置。所以这里要求统一使用第一套的redisTemplate。本文的话,介绍下redisTemplate框架下的分布式锁实现。下面直接开始。
1)首先创建一个maven项目,引入redis的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
记住:
使用redis连接池,必须要搭配这个commons-pool2的依赖,不然的话实现不了这个redis的连接池。
2)接着我们在application.yml里面配置下redis的信息
spring: redis: host: 192.168.31.30 port: 6379 database: 3 timeout: 10000 password: 3c0bef8420ca0961a74ff1fbe8c66ef4 lettuce: pool: # 连接池中的最小空闲连接 min-idle: 0 # 连接池中的最大空闲连接 max-idle: 30 # 连接池的最大数据库连接数 max-active: 30 #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1
这里我们主要是使用这个redis的lettuce连接池。
3)初始化redisTemplate
这里的话我们必须要初始化这个redistemplate,这里主要是实现序列化,因此我们创建一个redisconfig的类,完整代码如下:
package com.redistemplet.lock.config; import java.net.UnknownHostException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { // 创建模板 RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); // 设置连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); // 设置序列化工具 GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); // key和 hashKey采用 string序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); // value和 hashValue采用 JSON序列化 redisTemplate.setValueSerializer(jsonRedisSerializer); redisTemplate.setHashValueSerializer(jsonRedisSerializer); return redisTemplate; } }
此时我们就可以在项目里面就可以使用redisTemplate对象了。
4)使用redistemplate编写分布式锁
这里的话,我们使用redistemplate来实现分布式锁,所以这里的话,主要涉及的函数是setnx和expire两个方法,完整代码如下:
package com.redistemplet.lock.helper;
import java.util.Objects;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class RedisLockHelper {
public static final String DEFAULT_LOCK_PREFIX = "redis_lock";
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
*
* @param lockPrefix
* @param key
* @param expire_time 分布式锁过期时间防止死锁,单位是ms
* @return
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public boolean lock(String lockPrefix, String key, final Integer expire_time) {
// 设置分布式锁的lock key
String lock = new StringBuilder(StringUtils.isNotBlank(lockPrefix) ? lockPrefix : DEFAULT_LOCK_PREFIX)
.append("_").append(key).toString();
// 这里的值是准备设置的,并且是在后面进行比对的值,防止把当前线程的锁误认为是自己的锁了。
String thread_id = UUID.randomUUID().toString().replace("-", "");
System.out.println("当前的线程id是:" + thread_id);
// 利用lambda表达式
return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(thread_id).getBytes());
System.out.println("acquire 的值是:" + acquire);
if (acquire) {
System.out.println("setnx true");
connection.expire(lock.getBytes(), expire_time/1000);
return true;
} else {
System.out.println("setnx false");
byte[] value = connection.get(lock.getBytes());
if (Objects.nonNull(value) && value.length > 0) {
String redisThreadId = new String(value);
System.out.println("当前线程准备重入,当前线程id是:" + thread_id + ":" + redisThreadId + ",结果是:"
+ thread_id.equals(redisThreadId));
// 如果当前redis获取到的value值和当前线程的id不一样,则不是当前对象获取的值。不能重入
return thread_id.equals(redisThreadId);
}
}
return false;
});
}
/**
* 删除锁
*
* @param key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
}此分布式锁的完整逻辑都在上面代码的注释里面。
5)接着我们编写方法测试下分布式锁
这里我们业务以同一个用户为redissetvalue为案例,也就是拿到锁,我就向redis里面加1,没有拿到锁就不加1,这样一个演示场景,示例代码如下:
package com.redistemplet.lock.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.redistemplet.lock.helper.RedisLockHelper;
@RestController
public class HelloController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
RedisLockHelper redisLockHelper;
/**
* 这里我们模拟一个场景,就是请求一次获取一个分布式锁,然后使用redis做自增加1,然后并发测试下看是否可以成功完成分布式锁的使用
*
* @return
*/
@RequestMapping("/hello")
public String hello() {
this.redisIncrement();
return "hello";
}
/**
* 这里模拟可能出现数据错误的情况来使用分布式锁
*/
private void redisIncrement() {
String key = "userid:1";
//这里给的超时时间一定要尽量大于整个程序执行的时间
boolean lock = redisLockHelper.lock("wallet", key,3000);
if (lock) {
System.out.println("获取到了锁,准备执行");
//获取到了锁,模拟业务操作
this.setRedisValue();
// 最后执行成功要手动的释放锁
redisLockHelper.delete(key);
} else {
System.out.println("没获取到锁,准备重试");
// 设置失败次数计数器, 当到达5次时, 返回失败
int failCount = 1;
while(failCount <= 5){
// 等待100ms重试
try {
Thread.sleep(100l);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (redisLockHelper.lock("wallet", key, 300)){
// 执行逻辑操作
redisLockHelper.delete(key);
}else{
failCount ++;
}
}
throw new RuntimeException("现在使用的人太多了, 请稍等再试");
}
}
private void setRedisValue() {
Integer value = (Integer) redisTemplate.opsForValue().get("testfoo");
if(null == value) {
value = 0;
}
value += 1;
redisTemplate.opsForValue().set("testfoo", value);
}
}这里主要的逻辑就是拿到锁就加1,没有拿到锁就重试5次进行尝试。
6)测试
这里我们使用apipost进行测试,测试的场景如下:
并发数100个,同时压测10次:
点击压测之后,我们看看测试结果:
这里测试请求成功4次,失败了996次,那也就是我们redis里面的预期值应该是4,下面看看是不是4:
可以看到和预期的值是一模一样的。
备注:
1、在真实的业务中一定要注意下,不管是mysql。还是redis,还是其他什么,连接池是标配。这应该是每一个开发人员的基础意识。 2、在同一套业务代码里面,千万不要你做你的,我做我的,你搞一套redis,我搞一套redis,这玩意儿终究必然会出问题。 3、在redishelper里面,有一个可冲入的逻辑,本案例只是展示这个可冲入逻辑,真实的情况,这里的threadid应该是在业务重试之外。 4、使用分布式锁的时候切记要做最小粒度的锁,不要做大锁,做了大锁,整个系统会各种慢,各种失败,严重影响业务。
以上就是关于redistTemplate实现分布式锁的演示,最后按照惯例,附上本案例的源码,登录后即可下载。







还没有评论,来说两句吧...