Java 使用redis实现分布式锁 Java SpringCloud实现分布式锁 (jedis、jedis+lua、redisson)
实现分布式锁常见的三种方式:database、redis、zookeeper。
本文介绍使用redis实现分布式锁。
在单体架构中,我们常用的锁是 synchronized 和 ReentrantLock,本地锁在单体架构中是完全够用的,因为所有线程都共享一个内存空间 (所有进程都在一个jvm里面),但是在分布式架构中,线程可能是来自不同的jvm,这个时候内存空间是不共享的,对象是不可见的,就会保证不了数据的一致性,这明显不是我们加锁的初衷。
下面我们通过下单的例子(demo代码放到了码云,文章完结附码云地址)来演示一下分布式锁 (下单–扣减商品数量–扣减余额):
首先是没任何锁的情况下:
代码:
原数据:通过接口查出来数据,取任意一条测试,注意number为商品的剩余数量
通过接口查看当前余额为1000
测试开始:
- 使用apifox进行压测,并发设置10,每个进程执行30次,一共300次,注意库存变化
- 点击开始压测,等待压测完成,调用接口查看库存
发现库存超了,出现了超卖的问题,这就是没加任何锁的情况。
加synchronized锁:
加锁后重复压测步骤,结论还是超卖
加ReentrantLock锁:
重复压测步骤,结论是依然会出现超卖
结论
以上是几种在分布式系统中,使用本地锁的测试情况,结论是在分布式系统中,无论是使用哪种本地锁,都会造成超卖等现象,正好验证了我们刚开始说的那样。本地锁只能对单个进程进行限制,如果是多个进程,就麻瓜了。
使用redis实现分布式锁:
进入本文正题,如何使用redis实现分布式锁?其实不论是使用redis,数据库还是其他的方式,大致的思路都是不会变的,首先应该想的是实现分布式锁应该考虑哪些问题?我总结了以下几点:
- 单线程访问(同一时刻只能有一个线程持有锁,即加锁、解锁)。
- 自己加的锁必须得自己释放。
- 设置超时时间、实现超时续时。
- 高可用:主从节点或集群,主要为了防止节点实现,锁一直持有不释放造成死锁。 综合考虑到以上几点问题,分布式锁大致就可以实现了。
方式一:使用jedis
我们第一个想到的可能就是jedis中有一个setnx和expire两个方法,直接调用这两个方法去设置一下不就行了?但是这里需要注意的是可能会出现一种情况,setnx设置键值成功了,但是设置expire超时时间失败了,那就会一直持有锁,当然在设置超时时间之后调用jedis.ttl的方法来查询超时时间是否设置成功,如果没成功的话再次设置,这样的话也不是不行,只是需要考虑,万一再次设置也失败了,那就尬住了。所以就只能两个一个设置,要么都成功,要么失败,两种实现方式:1、使用jedis的set 。2、使用lua脚本来保证原子性。在这里我们采用第一种方式,脚本的方式在下面。
先贴代码:
redis.clients
jedis
4.2.3
//初始化jedisPool,host、port是从配置文件获取的
@Configuration
public class InitJedisPool {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int post;
@Value("${spring.redis.password}")
private String password;
@Bean("jedisPool")
public JedisPool initJedisPool() {
JedisPoolConfig config = new JedisPoolConfig();
//资源池中最大连接数
config.setMaxTotal(100);
//资源池中最大空闲连接数
config.setMaxIdle(100);
//资源池确保的最少空闲连接数
config.setMinIdle(10);
//是否开启空闲检测
config.setTestWhileIdle(true);
//向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。
config.setTestOnBorrow(true);
//向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。
config.setTestOnReturn(true);
//做空闲资源检测时,每次检测资源的个数。可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。
config.setNumTestsPerEvictionRun(-1);
//不加此句会报错pool2 的bean已经注册
config.setJmxEnabled(false);
if (StringUtils.isNotBlank(password)) {
return new JedisPool(config, host, post, 2000, password);
} else {
return new JedisPool(config, host, post, 2000);
}
}
}
//使用jedis实现redis分布式锁,思路是将线程ID和uuid组合成val放进redis,如果设置成功了,就开启一个守护去给key续期(可能会存在业务还没处理完,但是key过期了,就得给key续期)
//业务处理完后,停止守护线程,判断是否是自己加的锁,如果是就删除key
@Component
public class RedisLock {
@Autowired
@Qualifier("jedisPool")
private JedisPool jedisPool;
private final int DB = 0;
private final String LOCK_KEY = "REDIS:LOCK:%s";
private volatile boolean running = true;
private Jedis getJedis(int db) {
Jedis jedis = jedisPool.getResource();
jedis.select(db);
return jedis;
}
/**
* @param key 业务key
* @param expireTime 超时时间/ms
* @param waitTimeout 等待时间,超过等待时间就放弃/ms
* @param val uuid
* 键值对的value值,一般为线程ID+uuid,在做删除操作时拿来对比
* SetParams
* ex: 设置键值的过期时间(单位为秒)。
* px: 设置键值的过期时间(单位为毫秒)。
* nx: 仅在键不存在时设置键值。
* xx: 仅在键已经存在时更新键值。
* @return boolean true加锁成功 false加锁失败
*/
public boolean tryLock(String key, String val, long expireTime, long waitTimeout) {
this.check(expireTime, waitTimeout);
key = String.format(LOCK_KEY, key);
val = Thread.currentThread().getId() + "-" + val;
try (Jedis jedis = this.getJedis(DB)) {
//计算等待超时时间,超过时间直接返回获取锁失败
long deadTimeLine = System.currentTimeMillis() + waitTimeout;
//是否继续获取锁
boolean flag = true;
//锁是否获取成功
boolean result = false;
while (flag) {
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(expireTime);
//成功返回OK,否则返回null
String res = jedis.set(key, val, setParams);
if ("OK".equals(res)) {
flag = false;
result = true;
System.out.println(val + "加锁成功,开始守护");
this.guardThread(key, expireTime);
} else {
waitTimeout = deadTimeLine - System.currentTimeMillis();
// 超过等待时间仍然没有成功获取锁的话就放弃,这里判断大于100是因为下面睡眠了100,如果小于100,直接返回获取锁失败
if (waitTimeout > 100L) {
try {
//防止一直获取锁
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
} else {
flag = false;
}
result = false;
}
}
return result;
}
}
/**
* 看门狗机制,开启守护进程,进行key续期
*
* @param key
* @param expireTime
*/
private void guardThread(String key, long expireTime) {
Thread thread = new Thread(() -> {
try {
//先睡眠一下,防止一进来就续期
Thread.sleep(expireTime - 500);
} catch (Exception e) {
e.printStackTrace();
return;
}
while (this.running) {
try (Jedis jedis = this.getJedis(DB)) {
if (jedis.exists(key)) {
//续期
System.out.println(System.currentTimeMillis());
System.out.println(key + ":续期");
jedis.expire(key, expireTime);
//查询key是否设置了过期时间 -2为key不存在 -1为key存在但是没有过期时间 是否以秒为单位返回剩余时间,如果为-1再次设置
if (-1 == jedis.ttl(key)) {
jedis.expire(key, expireTime);
}
//超过两次设置失败就不守护了
if (-1 == jedis.ttl(key)) {
this.running = false;
} else {
try {
Thread.sleep(expireTime - 500);
} catch (InterruptedException e) {
e.printStackTrace();
this.running = false;
}
}
} else {
this.running = false;
}
} catch (Exception e) {
e.printStackTrace();
this.running = false;
}
}
});
//java中线程分为两种类型:用户线程和守护线程。通过Thread.setDaemon(false)设置为用户线程;通过Thread.setDaemon(true)设置为守护线程。如果不设置次属性,默认为用户线程。
thread.setDaemon(true);
thread.start();
}
private void threadStop() {
this.running = false;
}
public void unlock(String key, String val) {
this.threadStop();
key = String.format(LOCK_KEY, key);
val = Thread.currentThread().getId() + "-" + val;
try (Jedis jedis = this.getJedis(DB)) {
String s = jedis.get(key);
//确保有锁存在并且是自己加的锁
if (StringUtils.isNotBlank(s) & s.equals(val)) {
jedis.del(key);
}
}
}
//检查时间是否规范
private void check(long expireTime, long waitTimeout) {
if (0 >= waitTimeout) {
throw new BusinessException("超时时间有误");
}
if (500 >= expireTime) {
throw new BusinessException("时间应大于500ms");
}
}
业务代码:在业务代码中引入RedisLock即可
@Autowired
RedisLock redisLock;
@Autowired
ArticleService articleService;
@Autowired
OrdersMapper ordersMapper;
@Autowired
UserAccountService userAccountService;
@Override
@Transactional(rollbackFor = Exception.class)
public void order(String userId, String artId, int number) {
String uuid = UUID.randomUUID().toString().replace("-", "");
try {
if (redisLock.tryLock(RedisKey.ORDER.getRemarks(), uuid, 3000, 10000)) {
System.out.println("进来的UUID:" + uuid);
Map articleMap = articleService.selectArticleForUpdate(artId);
if (1 != (int) articleMap.get("status")) {
throw new BusinessException("商品已下架");
}
if (0 > ((int) articleMap.get("number") - number)) {
throw new BusinessException("库存不足");
}
BigDecimal price = (BigDecimal) articleMap.get("price");
try {
if (1 != ordersMapper.addOrders(userId, Serial.getOrderId(userId), artId, price.toPlainString(), number, "下单", TimeTools.getTime())) {
throw new BusinessException("下单失败");
}
articleService.updateOrderNumber(artId);
} catch (Exception e) {
e.printStackTrace();
throw new BusinessException("下单失败");
}
//远程调用扣款
JSONObject body = new JSONObject();
body.put("userId", userId);
body.put("balance", new BigDecimal(number).multiply(price).toPlainString());
body.put("payType", 2);
body.put("type", 1);
body.put("remarks", "用户下单");
body.put("explains", "用户下单说明");
ApiParse.parse(userAccountService.operBal(body.toJSONString()));
} else {
throw new BusinessException("请求超时");
}
} catch (Exception e) {
e.printStackTrace();
if (e.getMessage().contains("库存不足")){
throw new BusinessException("库存不足");
}
throw new BusinessException("未知错误");
} finally {
redisLock.unlock(RedisKey.ORDER.getRemarks(), uuid);
}
}
测试:使用apifox开启压测功能,并发10,轮次30,一共300次调用。
结论:测试多次,结果均为理想结果。
使用jedis实现分布式锁结束,可以拿来直接用。
方式二:使用 jedis+lua脚本
这种方式和使用jedis逻辑上其实是一样的,只是将jedis.set,jedis.expire 两个执行步骤并成了一条,保证了原子性(要么都成功,要么都失败)
此处只贴代码,测试结果就不贴了,我已经测试过了多次,是预期结果。
import com.example.commons.exception.BusinessException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Configuration
public class RedisLuaLock {
@Autowired
@Qualifier("jedisPool")
private JedisPool jedisPool;
private final int DB = 0;
private final String LOCK_KEY = "REDIS:LUA:LOCK:%s";
private volatile boolean running = true;
//加锁
private static final String LOCK_SCRIPT =
"if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
//超时续期
private static final String RENEW_LOCK_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
//解锁
private static final String RELEASE_LOCK_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
private Jedis getJedis(int db) {
Jedis jedis = jedisPool.getResource();
jedis.select(db);
return jedis;
}
/**
* @param key 业务key
* @param expireTime 超时时间/ms
* @param waitTimeout 等待时间,超过等待时间就放弃/ms
* @param val uuid
* jedis.eval(final String script, final List keys, final List args)
* 入参说明:
* script:执行脚本
* keys:key
* args:参数,与脚本对应
* @return
*/
public boolean tryLock(String key, String val, long expireTime, long waitTimeout) {
this.check(expireTime, waitTimeout);
key = String.format(LOCK_KEY, key);
val = Thread.currentThread().getId() + "-" + val;
List args = new ArrayList();
args.add(val);
args.add(String.valueOf(expireTime));
try (Jedis jedis = this.getJedis(DB)) {
//计算等待超时时间,超过时间直接返回获取锁失败
long deadTimeLine = System.currentTimeMillis() + waitTimeout;
//是否继续获取锁
boolean flag = true;
//锁是否获取成功
boolean result = false;
while (flag) {
if (1 == (long) jedis.eval(LOCK_SCRIPT, Collections.singletonList(key), args)) {
flag = false;
result = true;
//todo 守护
System.out.println(val + "加锁成功,开始守护");
this.guardThread(key, expireTime, val);
} else {
waitTimeout = deadTimeLine - System.currentTimeMillis();
// 超过等待时间仍然没有成功获取锁的话就放弃,这里判断大于100是因为下面睡眠了100,如果小于100,直接返回获取锁失败
if (waitTimeout > 100L) {
try {
//防止一直获取锁
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
} else {
flag = false;
}
result = false;
}
}
return result;
}
}
/**
* 看门狗机制,开启守护进程,进行key续期
*
* @param key
* @param expireTime
* @param val
*/
private void guardThread(String key, long expireTime, String val) {
this.running = true;
Thread thread = new Thread(() -> {
System.out.println("进来");
try {
//先睡眠一下,防止一进来就续期
Thread.sleep(expireTime - 500);
} catch (Exception e) {
e.printStackTrace();
return;
}
List args = new ArrayList();
args.add(val);
args.add(String.valueOf(expireTime));
while (this.running) {
try (Jedis jedis = this.getJedis(DB)) {
if (jedis.exists(key)) {
//续期
if (1 != (long) jedis.eval(RENEW_LOCK_SCRIPT, Collections.singletonList(key), args)) {
this.running = false;
} else {
try {
Thread.sleep(expireTime - 500);
} catch (InterruptedException e) {
e.printStackTrace();
this.running = false;
}
}
} else {
this.running = false;
}
} catch (Exception e) {
e.printStackTrace();
this.running = false;
}
}
});
//java中线程分为两种类型:用户线程和守护线程。通过Thread.setDaemon(false)设置为用户线程;通过Thread.setDaemon(true)设置为守护线程。如果不设置次属性,默认为用户线程。
thread.setDaemon(true);
thread.start();
}
private void threadStop() {
this.running = false;
}
public void unlock(String key, String val) {
this.threadStop();
key = String.format(LOCK_KEY, key);
val = Thread.currentThread().getId() + "-" + val;
try (Jedis jedis = this.getJedis(DB)) {
jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(val));
}
}
//检查时间是否规范
private void check(long expireTime, long waitTimeout) {
if (0 >= waitTimeout) {
throw new BusinessException("超时时间有误");
}
if (500 >= expireTime) {
throw new BusinessException("时间应大于500ms");
}
}
}
方式三:使用redisson
org.redisson
redisson-spring-boot-starter
3.36.0
@Autowired
RedissonClient redissonClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void orderRedisson(String userId, String artId, int number) {
RLock lock = redissonClient.getLock(RedisKey.ORDER.getRemarks());
try {
// lock.tryLock(30, 15, TimeUnit.SECONDS):尝试加锁,最多等待 30 秒,上锁后 15 秒自动解锁
//如果未指定锁的持有时间(即leaseTime参数为-1或未设置),则Redisson会自动开启看门狗机制。
if (lock.tryLock(30, TimeUnit.SECONDS)) {
//业务
this.business(artId, number, userId);
} else {
throw new BusinessException("请求超时");
}
} catch (Exception e) {
e.printStackTrace();
if (e.getMessage().contains("库存不足")) {
throw new BusinessException("库存不足");
}
throw new BusinessException("未知错误");
} finally {
lock.unlock();
}
}
redisson就比较简单了,不用考虑锁的加锁和解锁以及续期,拿来直接就行,需要注意的是,不管是lock.trylock还是lock.lock,如果不传锁的过期时间,都会有看门狗机制的,都默认会leaseTime赋值了-1,即启动看门狗。
本文结束,项目地址:https://gitee.com/sanmaoalt/cloud-demo-k