Java 使用redis实现分布式锁 Java SpringCloud实现分布式锁 (jedis、jedis+lua、redisson)

实现分布式锁常见的三种方式:database、redis、zookeeper。

本文介绍使用redis实现分布式锁。
在单体架构中,我们常用的锁是 synchronized 和 ReentrantLock,本地锁在单体架构中是完全够用的,因为所有线程都共享一个内存空间 (所有进程都在一个jvm里面),但是在分布式架构中,线程可能是来自不同的jvm,这个时候内存空间是不共享的,对象是不可见的,就会保证不了数据的一致性,这明显不是我们加锁的初衷。
下面我们通过下单的例子(demo代码放到了码云,文章完结附码云地址)来演示一下分布式锁 (下单–扣减商品数量–扣减余额):

首先是没任何锁的情况下:

代码:

原数据:通过接口查出来数据,取任意一条测试,注意number为商品的剩余数量


通过接口查看当前余额为1000

测试开始:
  1. 使用apifox进行压测,并发设置10,每个进程执行30次,一共300次,注意库存变化

  1. 点击开始压测,等待压测完成,调用接口查看库存


发现库存超了,出现了超卖的问题,这就是没加任何锁的情况。

加synchronized锁:

加锁后重复压测步骤,结论还是超卖

加ReentrantLock锁:

重复压测步骤,结论是依然会出现超卖

结论

以上是几种在分布式系统中,使用本地锁的测试情况,结论是在分布式系统中,无论是使用哪种本地锁,都会造成超卖等现象,正好验证了我们刚开始说的那样。本地锁只能对单个进程进行限制,如果是多个进程,就麻瓜了。

使用redis实现分布式锁:

进入本文正题,如何使用redis实现分布式锁?其实不论是使用redis,数据库还是其他的方式,大致的思路都是不会变的,首先应该想的是实现分布式锁应该考虑哪些问题?我总结了以下几点:

  1. 单线程访问(同一时刻只能有一个线程持有锁,即加锁、解锁)。
  2. 自己加的锁必须得自己释放。
  3. 设置超时时间、实现超时续时。
  4. 高可用:主从节点或集群,主要为了防止节点实现,锁一直持有不释放造成死锁。 综合考虑到以上几点问题,分布式锁大致就可以实现了。
方式一:使用jedis

我们第一个想到的可能就是jedis中有一个setnx和expire两个方法,直接调用这两个方法去设置一下不就行了?但是这里需要注意的是可能会出现一种情况,setnx设置键值成功了,但是设置expire超时时间失败了,那就会一直持有锁,当然在设置超时时间之后调用jedis.ttl的方法来查询超时时间是否设置成功,如果没成功的话再次设置,这样的话也不是不行,只是需要考虑,万一再次设置也失败了,那就尬住了。所以就只能两个一个设置,要么都成功,要么失败,两种实现方式:1、使用jedis的set 。2、使用lua脚本来保证原子性。在这里我们采用第一种方式,脚本的方式在下面。

先贴代码


 redis.clients
 jedis
 4.2.3
//初始化jedisPool,host、port是从配置文件获取的
@Configuration
public class InitJedisPool {
 @Value("${spring.redis.host}")
 private String host;
 @Value("${spring.redis.port}")
 private int post;
 @Value("${spring.redis.password}")
 private String password;
 @Bean("jedisPool")
 public JedisPool initJedisPool() {
 JedisPoolConfig config = new JedisPoolConfig();
 //资源池中最大连接数
 config.setMaxTotal(100);
 //资源池中最大空闲连接数
 config.setMaxIdle(100);
 //资源池确保的最少空闲连接数
 config.setMinIdle(10);
 //是否开启空闲检测
 config.setTestWhileIdle(true);
 //向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。
 config.setTestOnBorrow(true);
 //向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。
 config.setTestOnReturn(true);
 //做空闲资源检测时,每次检测资源的个数。可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。
 config.setNumTestsPerEvictionRun(-1);
 //不加此句会报错pool2 的bean已经注册
 config.setJmxEnabled(false);
 if (StringUtils.isNotBlank(password)) {
 return new JedisPool(config, host, post, 2000, password);
 } else {
 return new JedisPool(config, host, post, 2000);
 }
 }
}
//使用jedis实现redis分布式锁,思路是将线程ID和uuid组合成val放进redis,如果设置成功了,就开启一个守护去给key续期(可能会存在业务还没处理完,但是key过期了,就得给key续期)
//业务处理完后,停止守护线程,判断是否是自己加的锁,如果是就删除key
@Component
public class RedisLock {
 @Autowired
 @Qualifier("jedisPool")
 private JedisPool jedisPool;
 private final int DB = 0;
 private final String LOCK_KEY = "REDIS:LOCK:%s";
 private volatile boolean running = true;
 private Jedis getJedis(int db) {
 Jedis jedis = jedisPool.getResource();
 jedis.select(db);
 return jedis;
 }
 /**
 * @param key 业务key
 * @param expireTime 超时时间/ms
 * @param waitTimeout 等待时间,超过等待时间就放弃/ms
 * @param val uuid
 * 键值对的value值,一般为线程ID+uuid,在做删除操作时拿来对比
 * SetParams
 * ex: 设置键值的过期时间(单位为秒)。
 * px: 设置键值的过期时间(单位为毫秒)。
 * nx: 仅在键不存在时设置键值。
 * xx: 仅在键已经存在时更新键值。 
 * @return boolean true加锁成功 false加锁失败
 */
 public boolean tryLock(String key, String val, long expireTime, long waitTimeout) {
 this.check(expireTime, waitTimeout);
 key = String.format(LOCK_KEY, key);
 val = Thread.currentThread().getId() + "-" + val;
 try (Jedis jedis = this.getJedis(DB)) {
 //计算等待超时时间,超过时间直接返回获取锁失败
 long deadTimeLine = System.currentTimeMillis() + waitTimeout;
 //是否继续获取锁
 boolean flag = true;
 //锁是否获取成功
 boolean result = false;
 while (flag) {
 SetParams setParams = new SetParams();
 setParams.nx();
 setParams.ex(expireTime);
 //成功返回OK,否则返回null
 String res = jedis.set(key, val, setParams);
 if ("OK".equals(res)) {
 flag = false;
 result = true;
 System.out.println(val + "加锁成功,开始守护");
 this.guardThread(key, expireTime);
 } else {
 waitTimeout = deadTimeLine - System.currentTimeMillis();
 // 超过等待时间仍然没有成功获取锁的话就放弃,这里判断大于100是因为下面睡眠了100,如果小于100,直接返回获取锁失败
 if (waitTimeout > 100L) {
 try {
 //防止一直获取锁
 Thread.sleep(100);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 flag = true;
 } else {
 flag = false;
 }
 result = false;
 }
 }
 return result;
 }
 }
 /**
 * 看门狗机制,开启守护进程,进行key续期
 *
 * @param key
 * @param expireTime
 */
 private void guardThread(String key, long expireTime) {
 Thread thread = new Thread(() -> {
 try {
 //先睡眠一下,防止一进来就续期
 Thread.sleep(expireTime - 500);
 } catch (Exception e) {
 e.printStackTrace();
 return;
 }
 while (this.running) {
 try (Jedis jedis = this.getJedis(DB)) {
 if (jedis.exists(key)) {
 //续期
 System.out.println(System.currentTimeMillis());
 System.out.println(key + ":续期");
 jedis.expire(key, expireTime);
 //查询key是否设置了过期时间 -2为key不存在 -1为key存在但是没有过期时间 是否以秒为单位返回剩余时间,如果为-1再次设置
 if (-1 == jedis.ttl(key)) {
 jedis.expire(key, expireTime);
 }
 //超过两次设置失败就不守护了
 if (-1 == jedis.ttl(key)) {
 this.running = false;
 } else {
 try {
 Thread.sleep(expireTime - 500);
 } catch (InterruptedException e) {
 e.printStackTrace();
 this.running = false;
 }
 }
 } else {
 this.running = false;
 }
 } catch (Exception e) {
 e.printStackTrace();
 this.running = false;
 }
 }
 });
 //java中线程分为两种类型:用户线程和守护线程。通过Thread.setDaemon(false)设置为用户线程;通过Thread.setDaemon(true)设置为守护线程。如果不设置次属性,默认为用户线程。
 thread.setDaemon(true);
 thread.start();
 }
 private void threadStop() {
 this.running = false;
 }
 public void unlock(String key, String val) {
 this.threadStop();
 key = String.format(LOCK_KEY, key);
 val = Thread.currentThread().getId() + "-" + val;
 try (Jedis jedis = this.getJedis(DB)) {
 String s = jedis.get(key);
 //确保有锁存在并且是自己加的锁
 if (StringUtils.isNotBlank(s) & s.equals(val)) {
 jedis.del(key);
 }
 }
 }
 //检查时间是否规范
 private void check(long expireTime, long waitTimeout) {
 if (0 >= waitTimeout) {
 throw new BusinessException("超时时间有误");
 }
 if (500 >= expireTime) {
 throw new BusinessException("时间应大于500ms");
 }
 }

业务代码:在业务代码中引入RedisLock即可

@Autowired
 RedisLock redisLock;
 @Autowired
 ArticleService articleService;
 @Autowired
 OrdersMapper ordersMapper;
 @Autowired
 UserAccountService userAccountService;
 @Override
 @Transactional(rollbackFor = Exception.class)
 public void order(String userId, String artId, int number) {
 String uuid = UUID.randomUUID().toString().replace("-", "");
 try {
 if (redisLock.tryLock(RedisKey.ORDER.getRemarks(), uuid, 3000, 10000)) {
 System.out.println("进来的UUID:" + uuid);
 Map articleMap = articleService.selectArticleForUpdate(artId);
 if (1 != (int) articleMap.get("status")) {
 throw new BusinessException("商品已下架");
 }
 if (0 > ((int) articleMap.get("number") - number)) {
 throw new BusinessException("库存不足");
 }
 BigDecimal price = (BigDecimal) articleMap.get("price");
 try {
 if (1 != ordersMapper.addOrders(userId, Serial.getOrderId(userId), artId, price.toPlainString(), number, "下单", TimeTools.getTime())) {
 throw new BusinessException("下单失败");
 }
 articleService.updateOrderNumber(artId);
 } catch (Exception e) {
 e.printStackTrace();
 throw new BusinessException("下单失败");
 }
 //远程调用扣款
 JSONObject body = new JSONObject();
 body.put("userId", userId);
 body.put("balance", new BigDecimal(number).multiply(price).toPlainString());
 body.put("payType", 2);
 body.put("type", 1);
 body.put("remarks", "用户下单");
 body.put("explains", "用户下单说明");
 ApiParse.parse(userAccountService.operBal(body.toJSONString()));
 } else {
 throw new BusinessException("请求超时");
 }
 } catch (Exception e) {
 e.printStackTrace();
 if (e.getMessage().contains("库存不足")){
 throw new BusinessException("库存不足");
 }
 throw new BusinessException("未知错误");
 } finally {
 redisLock.unlock(RedisKey.ORDER.getRemarks(), uuid);
 }
 }

测试:使用apifox开启压测功能,并发10,轮次30,一共300次调用。

结论:测试多次,结果均为理想结果。

使用jedis实现分布式锁结束,可以拿来直接用。

方式二:使用 jedis+lua脚本

这种方式和使用jedis逻辑上其实是一样的,只是将jedis.set,jedis.expire 两个执行步骤并成了一条,保证了原子性(要么都成功,要么都失败)
此处只贴代码,测试结果就不贴了,我已经测试过了多次,是预期结果。

import com.example.commons.exception.BusinessException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Configuration
public class RedisLuaLock {
 @Autowired
 @Qualifier("jedisPool")
 private JedisPool jedisPool;
 private final int DB = 0;
 private final String LOCK_KEY = "REDIS:LUA:LOCK:%s";
 private volatile boolean running = true;
 //加锁
 private static final String LOCK_SCRIPT =
 "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
 " redis.call('expire', KEYS[1], ARGV[2]) " +
 " return 1 " +
 "else " +
 " return 0 " +
 "end";
 //超时续期
 private static final String RENEW_LOCK_SCRIPT =
 "if redis.call('get', KEYS[1]) == ARGV[1] then " +
 " return redis.call('expire', KEYS[1], ARGV[2]) " +
 "else " +
 " return 0 " +
 "end";
 //解锁
 private static final String RELEASE_LOCK_SCRIPT =
 "if redis.call('get', KEYS[1]) == ARGV[1] then " +
 " return redis.call('del', KEYS[1]) " +
 "else " +
 " return 0 " +
 "end";
 private Jedis getJedis(int db) {
 Jedis jedis = jedisPool.getResource();
 jedis.select(db);
 return jedis;
 }
 /**
 * @param key 业务key
 * @param expireTime 超时时间/ms
 * @param waitTimeout 等待时间,超过等待时间就放弃/ms
 * @param val uuid
 * jedis.eval(final String script, final List keys, final List args)
 * 入参说明:
 * script:执行脚本
 * keys:key
 * args:参数,与脚本对应
 * @return
 */
 public boolean tryLock(String key, String val, long expireTime, long waitTimeout) {
 this.check(expireTime, waitTimeout);
 key = String.format(LOCK_KEY, key);
 val = Thread.currentThread().getId() + "-" + val;
 List args = new ArrayList();
 args.add(val);
 args.add(String.valueOf(expireTime));
 try (Jedis jedis = this.getJedis(DB)) {
 //计算等待超时时间,超过时间直接返回获取锁失败
 long deadTimeLine = System.currentTimeMillis() + waitTimeout;
 //是否继续获取锁
 boolean flag = true;
 //锁是否获取成功
 boolean result = false;
 while (flag) {
 if (1 == (long) jedis.eval(LOCK_SCRIPT, Collections.singletonList(key), args)) {
 flag = false;
 result = true;
 //todo 守护
 System.out.println(val + "加锁成功,开始守护");
 this.guardThread(key, expireTime, val);
 } else {
 waitTimeout = deadTimeLine - System.currentTimeMillis();
 // 超过等待时间仍然没有成功获取锁的话就放弃,这里判断大于100是因为下面睡眠了100,如果小于100,直接返回获取锁失败
 if (waitTimeout > 100L) {
 try {
 //防止一直获取锁
 Thread.sleep(100);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 flag = true;
 } else {
 flag = false;
 }
 result = false;
 }
 }
 return result;
 }
 }
 /**
 * 看门狗机制,开启守护进程,进行key续期
 *
 * @param key
 * @param expireTime
 * @param val
 */
 private void guardThread(String key, long expireTime, String val) {
 this.running = true;
 Thread thread = new Thread(() -> {
 System.out.println("进来");
 try {
 //先睡眠一下,防止一进来就续期
 Thread.sleep(expireTime - 500);
 } catch (Exception e) {
 e.printStackTrace();
 return;
 }
 List args = new ArrayList();
 args.add(val);
 args.add(String.valueOf(expireTime));
 while (this.running) {
 try (Jedis jedis = this.getJedis(DB)) {
 if (jedis.exists(key)) {
 //续期
 if (1 != (long) jedis.eval(RENEW_LOCK_SCRIPT, Collections.singletonList(key), args)) {
 this.running = false;
 } else {
 try {
 Thread.sleep(expireTime - 500);
 } catch (InterruptedException e) {
 e.printStackTrace();
 this.running = false;
 }
 }
 } else {
 this.running = false;
 }
 } catch (Exception e) {
 e.printStackTrace();
 this.running = false;
 }
 }
 });
 //java中线程分为两种类型:用户线程和守护线程。通过Thread.setDaemon(false)设置为用户线程;通过Thread.setDaemon(true)设置为守护线程。如果不设置次属性,默认为用户线程。
 thread.setDaemon(true);
 thread.start();
 }
 private void threadStop() {
 this.running = false;
 }
 public void unlock(String key, String val) {
 this.threadStop();
 key = String.format(LOCK_KEY, key);
 val = Thread.currentThread().getId() + "-" + val;
 try (Jedis jedis = this.getJedis(DB)) {
 jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(val));
 }
 }
 //检查时间是否规范
 private void check(long expireTime, long waitTimeout) {
 if (0 >= waitTimeout) {
 throw new BusinessException("超时时间有误");
 }
 if (500 >= expireTime) {
 throw new BusinessException("时间应大于500ms");
 }
 }
}
方式三:使用redisson

 org.redisson
 redisson-spring-boot-starter
 3.36.0
 
@Autowired
 RedissonClient redissonClient;
 @Override
 @Transactional(rollbackFor = Exception.class)
 public void orderRedisson(String userId, String artId, int number) {
 RLock lock = redissonClient.getLock(RedisKey.ORDER.getRemarks());
 try {
 // lock.tryLock(30, 15, TimeUnit.SECONDS):尝试加锁,最多等待 30 秒,上锁后 15 秒自动解锁
 //如果未指定锁的持有时间(即leaseTime参数为-1或未设置),则Redisson会自动开启看门狗机制。
 if (lock.tryLock(30, TimeUnit.SECONDS)) {
 //业务
 this.business(artId, number, userId);
 } else {
 throw new BusinessException("请求超时");
 }
 } catch (Exception e) {
 e.printStackTrace();
 if (e.getMessage().contains("库存不足")) {
 throw new BusinessException("库存不足");
 }
 throw new BusinessException("未知错误");
 } finally {
 lock.unlock();
 }
 }

redisson就比较简单了,不用考虑锁的加锁和解锁以及续期,拿来直接就行,需要注意的是,不管是lock.trylock还是lock.lock,如果不传锁的过期时间,都会有看门狗机制的,都默认会leaseTime赋值了-1,即启动看门狗。

本文结束,项目地址:https://gitee.com/sanmaoalt/cloud-demo-k

作者:00oo00oo00原文地址:https://blog.csdn.net/weixin_45410882/article/details/141824420

%s 个评论

要回复文章请先登录注册