分布式架构中通常都会存在共享资源,在多个服务或一个服务多个实例同时对此共享资源进行读写操作情况下,会存在数据不一致性,就需要分布式锁来解决此问题。
实现分布式锁通常会借助外部组件,主要有四种实现方式,基于 Redis 实现,基于 Zookeeper 实现,直接使用 Redisson 已实现的分布式锁,Google 的 Chubby 服务。
锁
在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在多线程环境中强制对共享资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。
Java JDK 有 Lock,synchronized 提供锁功能。数据库 MySQL 有 for update
悲观锁,基于 CAS(Compare and Swap) 实现的乐观锁。
分布式锁
在分布式环境中,一个服务多个实例部署在不同机器上,多实例以不同的进程对共享资源进操作,为确保数据一致性,必须以互斥的方式执行,就需要借助外部组件实现分布式锁来保证不同进程之间的互斥性。
分布式锁是控制分布式系统之间同步访问共享资源的一种方式,用于协调不同服务对共享资源的写操作,通过互斥的方式来防止不同服务之间的干扰来保证一致性。
分布式锁理解
在同一进程里的多线程若要对共享资源进行操作,就需要加锁来让线程排队处理,最简单粗暴的处理是增加一个公共状态变量,线程一直循环该变量值判断是否执行操作,不同线程对该变量的判断是互斥的。
下面只是简单的伪代码演示:
thread-1
boolean flag = true;
while(flag){
.........
flag = false;
}
thread-2
while(!flag){
.........
flag = true;
}
分布式锁实际也是基于此思路来设计方案的,但因为是分布式环境,就需要把该 公共状态变量 放到外部存储,考虑性能问题,通常把此公共状态变量放到内存级的数据存储系统,如 Redis。通过设置和获取公共状态变量的值是判断是否执行后续的处理,相当于多进程之间是互斥的。
当然,分布锁还有其它特性是需要关注的,在实现上因为要考滤多种情况,实现逻辑会稍显复杂些,如下。
分布式锁特性
分布式锁方案必须考虑以下特性:
- 确保互斥:多线程或多服务实例之间必须是互斥的,锁的基本特性,否则就不叫锁了。
- 避免死锁:如果一个线程或服务获得资源锁,但后面挂了,并没有释放锁,导致其它线程或服务永远无法获得锁而进入无限等待,这就会出现死锁,必须做到避免死锁。
- 保证性能:高并发分布式环境中,线程互斥等待会成为性能瓶颈,在分布式锁实现的中间件和方案上需要保证性能。
- 锁扩展特性:基于分布式环境的复杂场景,分布式锁不能只是加锁,然后一直等待。最好实现 Java JDK Lock 的一些功能,如:锁判断,超时设置,可重入性等。
基于 Redis 实现的分布式锁,官方总结了三个属性,这些属性是有效使用分布式锁所需的最低保证:
- 安全属性:互斥性。在任何给定时刻,只有一个客户端可以持有锁。
- 活力属性A:无死锁。即使锁定资源的客户端崩溃或被分区,也始终可以获取锁定。
- 活力属性B:容错。只要大多数 Redis 节点启动,客户端就能够获取或释放锁。
实现方案
Redisson
Redisson 是一款非常优秀的,功能非常强大的开源的 Redis Java 客户端,基于高性能异步和无锁 Java Redis 客户端和 Netty 框架。
Redisson 实现的分布式是也是 Redis 官方推荐使用的 Java 实现的分布式锁。Redisson - GitHub,Rediss Distributed Lock 文档。
集成 Redisson
目前新建的应用大多都会基于 Spring Boot 来开始,同时 Redisson 为 Spring Boot 提供了 Starter 包(redisson-spring-boot-starter),这里以此为示例。或直接在项目中引入 redisson
依赖,具体参考 Redisson ReadMe > Quick Start。
项目添加
redisson-spring-boot-starter
依赖Maven pom.xml
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.2</version> </dependency>
添加 Redis 属性设置
# common spring boot settings spring.redis.database= spring.redis.host= spring.redis.port= spring.redis.password= spring.redis.ssl= spring.redis.timeout= spring.redis.cluster.nodes= spring.redis.sentinel.master= spring.redis.sentinel.nodes= # Redisson settings #path to redisson.yaml or redisson.json spring.redis.redisson.config=classpath:redisson.yaml
使用实现了 RedissonClient 接口 或 RedisTemplate / ReactiveRedisTemplate 的 Spring Bean。
最简单使用
RLock lock = redisson.getLock("anyLock"); // 最常见的使用方法 lock.lock();
指定加锁时间
如果负责储存分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁定的状态时,这个锁会出现锁死的状态。
为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。
默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。
另外 Redisson 还通过加锁的方法提供了
leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。// 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Redisson 同时还为分布式锁提供了异步执行的相关方法:
RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redisson 分布式锁类型
Redisson 还提供了其它类型的锁,具体可参考官方文档 Rediss Distributed Lock 文档。
公平锁(Fair Lock)
保证当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson 会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock"); // 最常见的使用方法 fairLock.lock();
联锁(MultiLock)
基于 Redis 的 Redisson 分布式 联锁
RedissonMultiLock
对象可以将多个RLock
对象关联为一个联锁,每个RLock
对象实例可以来自于不同的 Redisson 实例。RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 所有的锁都上锁成功才算成功。 lock.lock(); ... lock.unlock();
红锁(RedLock)
基于 Redis 的 Redisson 红锁
RedissonRedLock
对象实现了 Redlock 介绍的加锁算法。该对象也可以用来将多个RLock
对象关联为一个红锁,每个RLock
对象实例可以来自于不同的 Redisson 实例。RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 红锁在大部分节点上加锁成功就算成功。 lock.lock(); ... lock.unlock();
读写锁(ReadWriteLock)
基于 Redis 的 Redisson 分布式可重入读写锁
RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了 RLock 接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常见的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock();
信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象
RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
可过期性信号量(PermitExpirableSemaphore)
基于 Redis 的 Redisson 可过期性信号量(PermitExpirableSemaphore)是在
RSemaphore
对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 获取一个信号,有效期只有2秒钟。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
闭锁(CountDownLatch)
基于 Redisson 的 Redisson 分布式闭锁(CountDownLatch)Java对象
RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他线程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
RedissonLock 类图
RedissonLock 实现了 RLock,RLock 继承了 Java JDK 的 Lock 接口,并提供了锁过期策略处理。
Redis SET NX
基于 Redis 的 SET NX 命令实现分布锁,可以先了解下 setnx, expire, set/get
命令。
基于 Redis 实现的分布式锁是对 Redis SETNX 命令 和 SET key value [EX second] [PX milliseconds] [NX | XX] 命令的充分应用。
分布式锁核心步骤
Redis 实现分布式锁,依赖 SETNX 命令 或 SET 命令结合 NX 属性的使用。
加锁
最简单的方法是使用 setnx 命令。key 是锁的唯一标识,按业务来决定命名,例如给秒杀活动,将参于秒杀产品总数写入到 Redis,对减库存操作加锁, key 命名为
locak_sale_goodsId
,value 简单设置为 1。加锁伪代码如下:setnx(key,1):当一个线程执行 setnx 返回 1 ,说明 key 原本不存在,则该线程成功获得锁(加锁);当返回 0 时,说明 key 已存在,该线程独得锁失败。
解锁
有加锁,就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。锁释放的最简单方式是执行 DEL 指令,伪代码如下:
del(key):释放锁之后,其他线程可以继续执行 setnx 命令来获得锁。
锁超时
如果一个得到锁的线程在执行任务过程中挂掉,来不及释放所持有的锁,则别的线程再也别想获得锁,就会无限等待,进入死锁状态。
所以 SETNX 的 KEY 必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。SETNX 命令不支持超时参数,就需要借助 EXPIRE(key,value) 命令,伪代码 expire(key, 30)。
综合以上操作的伪代码如下:
if(setnx(key,1) == 1){ expire(key,30); try{ do something...... }finally{ del(key) } }
分布式锁步骤完善
上面的基本步骤实现的分布式锁是存在缺陷的,也就不能满足分布式锁的特性了。
加锁由 setnx 和 exipre 两步操作,但是非原子性的。
一个极端情况是,在执行 setnx 时,成功得到了锁,但刚执行成功,正准备执行 exipre 命令,线程挂了,这把锁就没有设置过期时间,变得 长生不老,另的线程再也无法获得锁了。
解决:因 setnx 命令本身不支持传入超时时间,可以改为使用 set 命令,传入过期时间和 NX 参数,伪代码如下:
set(key, 1, 30, NX):即把 setnx 和 expire 两条命令合为一条来执行。
备注:也可以使用 Lua 脚本来执行 setnx 和 expire 两条命令,Lua 脚本是原子性的。
del 导致误删除
另一个极端场景,假如某个线程A成功获得了锁,前且设置了超时时间,但因某些原因而执行的很慢,执行耗时已经达到了超时时间,这时候锁过期自动释放,其他线程B得到了锁。
随后线程 A 执行完任务,接着执行 del 指令来释放锁。但这时候线程B 还在执行任务,线程A 实际上删除的是线程B加的锁。
解决:为了避免这种情况,可以在 del 释放锁之前判断当前锁是不是自己加的锁。具体实现可以把当前的 线程ID 当做 value,在删除之前验证 key 对应的 value 是不是自己线程的ID。伪代码如下:
加锁:
String threadId = Thread.currentThread().getId(); set(key,threadId,30,NX);
解锁:
if(threadId.equals(redisClient.get(key))){ del(key); }
这里又会隐含一个新的问题,判断和释放锁是两个独立操作,不是原子性的,所以这块改为用 Lua 脚本实现,Lua 脚本是原子性的。
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; redisClient.eval(luaScript , Collections.singletonList(key), Collections.singletonList(threadId));
这样修改的话,验证和删除过程就是原子操作了。
出现并发的可能性
还是上面第二个问题场景,如果线程 A 执行需要较长时间,基于锁互斥性,锁应仍被线程A 占用,并发的线程B进来并不能获得锁,但 线程A 执行所需时长超过了过期时间,就需要对快过期的锁续期。
解决:可以让获得锁的线程开启一个守护线程,用于给快要过期的锁 续期。通常会在过期时间过去了 2/3 时间时(默认过期 30秒,2/3 时间即过去了 20秒),让守护线程执行 expire 指令,为这把锁续期 2/3 过期时长(续期 20 秒,等于重新设置过期时间),守护线程从第 2/3 时间开始执行(每次从第 20 秒 开始执行续期)。
另一种情况,如果节点 1 忽略崩溃或断电,由于线程A 和 守护线程在同一个进程,过护线程也会停下。锁到了超时时间,没有续期,也就会自动释放。
Redis分布式锁实现
基于上面描述的思路,基于 Redis 分布式锁就不难实现了。以下示例代码基于 Spring Boot 和 RedisTemplate 操作。
Redis 配置类
@Configuration public class RedisConfig { /** * redis默认使用jdk的二进制数据来序列化 * 以下自定义使用 FastJson 来序列化 * * @param redisConnectionFactory * @return RedisTemplate */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); FastJsonRedisSerializer serializer = new FastJsonRedisSerializer(Object.class); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(serializer); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } }
Redis 分布式锁操作类
@Configuration public class RedisDistributedLock { private static final Boolean LOCK_SUCCESS = true; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 加锁 * * @param lockKey * @param lockValue 通常为 threadId 或 requestId * @param expireTime * @return 是否加锁成功 */ public boolean tryToLock(String lockKey, String lockValue, int expireTime) { //set(key,value,expire_time,nx),如果不存在则设置,成功返回true,表示加锁成功 Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS); boolean result = LOCK_SUCCESS.equals(lockResult); if (result) { RedisRenewalDaemonThread renewalDaemonThread = new RedisRenewalDaemonThread(redisTemplate, lockKey, expireTime); renewalDaemonThread.setDaemon(true); renewalDaemonThread.start(); } return result; } /** * 释放锁 * * @param lockKey * @param lockValue 通常为 threadId 或 requestId * @return 是否释放成功 */ public boolean releaseLockByLua(String lockKey, String lockValue) { //释放锁 lua 脚本, String luaScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end"; //注意要设置脚本执行返回类型 RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class); List<String> keyList = new ArrayList<>(); keyList.add(lockKey); Long releaseResult = redisTemplate.execute(redisScript, keyList, lockValue); return RELEASE_SUCCESS.equals(releaseResult); } /** * 释放锁(问题:判断锁和释放锁是两个操作,是非原子性的) * * @param lockKey * @param lockValue 通常为 threadId 或 requestId * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String lockValue) { String value = (String) redisTemplate.opsForValue().get(lockKey); if (lockValue.equals(value)) { return redisTemplate.delete(lockKey); } return false; } }
Redis 分布式锁 Key 过期续期守护线程类
public class RedisRenewalDaemonThread extends Thread { private String lockKey; private long expireTime; private RedisTemplate redisTemplate; public RedisRenewalDaemonThread() { } public RedisRenewalDaemonThread(RedisTemplate redisTemplate, String lockKey, long expireTime) { this.redisTemplate = redisTemplate; this.lockKey = lockKey; this.expireTime = expireTime; } @Override public void run() { long renew = this.expireTime / 3; //如果过期时间很短,小于3秒 if (renew == 0) { renew = 1L; } System.out.println("rest expire: " + renew); while (true) { try { long expire = redisTemplate.getExpire(lockKey); System.out.println(expire); //如果剩余过期时间小于等于三分之一,则续期。 //注意,这里是小于等于,若只是小于,expire 可能出现负数 if (expire <= renew) { redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS); } //请求间隔,降低无效请求频率 Thread.sleep(renew * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试分布式锁
@RunWith(SpringRunner.class) @SpringBootTest public class RedisLockApplicationTests { @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private RedisDistributedLock redisDistributedLock; @Test public void testToLock() throws InterruptedException { String lockKey = "goods_id_10002"; String lockValue = String.valueOf(System.currentTimeMillis()); try { boolean lockResult = redisDistributedLock.tryToLock(lockKey, lockValue, 2); System.out.println("lock result: " + lockResult); Thread.sleep(3 * 1000); } finally { boolean releaseResult = redisDistributedLock.releaseLockByLua(lockKey, lockValue); System.out.println("release result: " + releaseResult); } } }
执行结果
lock result: true rest expire: 1 2 1 1 release result: true
Redis 解锁通知
在并发情况下,没有拿到锁的线程可能会采用自旋的方式(while(true))循环请求来获取锁,这种方式是会浪费 CPU 资源。
解决:
在释放锁后,同时发送锁释放通知( lpush )到一个 Redis List(队列),线程在加锁之前,先执行队列阻塞命令( brpop )从队列中获取锁释放通知,再去加锁。
使用阻塞命令必须指定超时时间,不能无限等待,超时到了同样去加锁。
Redis 阻塞队列,是谁先阻塞,就谁先执行,并且只被执行一次。
相关参考
注意:本文归作者所有,未经作者允许,不得转载