万隆的笔记 万隆的笔记
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
  • Redis

  • 集群架构

    • Redis主从模式
    • Redis Sentinel 集群部署
    • Redis集群分区
    • 缓存架构设计
    • 分布式缓存问题
    • Redis分布式锁
      • 分布式锁特性
      • Redis乐观锁
      • Redis分布式锁
      • Redission分布式锁的使用
      • 分布式锁的实际应用
      • 分布式锁的对比
  • Redis
  • 集群架构
2022-04-01
目录

Redis分布式锁

# Redis分布式锁

Redis可以利用watch实现乐观锁,利用setnx实现分布式锁。

# 分布式锁特性

  • 互斥性:任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁。
  • 同一性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。
  • 可重入性:持有某个锁的客户端可继续对该锁加锁,即实现锁的续租。
  • 容错性:锁失效后(超过生命周期)自动释放锁(key失效),其他客户端可以继续获得该锁,防止死锁。

# Redis乐观锁

原理:Redis利用watch实现Redis乐观锁。

乐观锁基于CAS(Compare And Swap,比较并替换)思想,不具有互斥性,不会产生锁等待而消耗资源,但是需要反复的重试,但也是因为重试的机制,能比较快的响应。乐观锁还有的一个概念就是“带版本更新”,因此我们可以利用redis以及提供的watch指令来实现乐观锁。具体思路如下:

  1. 利用redis的watch功能,监控这个redisKey的状态值
  2. 获取redisKey的值
  3. 创建redis事务
  4. 给这个key的值+1
  5. 然后去执行这个事务,如果key的值被修改过则回滚,key不会加1。

可以使用Redis乐观锁,实现秒杀,DEMO如下:

public class Second { 
  public static void main(String[] arg) { 
    String redisKey = "lock"; 
    ExecutorService executorService = Executors.newFixedThreadPool(20); 
    try {
      Jedis jedis = new Jedis("127.0.0.1", 6379); 
      // 初始值 
      jedis.set(redisKey, "0"); 
      jedis.close(); 
    } catch (Exception e) { 
      e.printStackTrace(); 
    }
    for (int i = 0; i < 1000; i++) { 
      executorService.execute(() -> { 
        Jedis jedis1 = new Jedis("127.0.0.1", 6379); 
        try {
          jedis1.watch(redisKey);
          String redisValue = jedis1.get(redisKey); 
          int valInteger = Integer.valueOf(redisValue); 
          String userInfo = UUID.randomUUID().toString();
          // 没有秒完 
          if (valInteger < 20) { 
            Transaction tx = jedis1.multi(); 
            tx.incr(redisKey); 
            List list = tx.exec(); 
            // 秒成功,失败返回空list而不是空 
            if (list != null && list.size() > 0) { 
              System.out.println("用户:" + userInfo + ",秒杀成功! 当前成功人数:" + (valInteger + 1)); 
            }
            // 版本变化,被别人抢了。 
            else {
              System.out.println("用户:" + userInfo + ",秒杀失败"); 
            } 
          }
          // 秒完了 
          else {
            System.out.println("已经有20人秒杀成功,秒杀结束"); 
          } 
        } catch (Exception e) { 
          e.printStackTrace(); 
        } finally { 
          jedis1.close(); 
        } 
      }); 
    }
    executorService.shutdown(); 
  } 
}

# Redis分布式锁

原理:Redis使用set nx实现分布式锁。nx,当键不存在的时候才可以设置成功,保证了互斥性。

锁实现与分布式锁的关键

  • 共享资源互斥、使用串行化
  • 单应用中使用锁:单进程多线程抢占锁,有synchronized、ReentrantLock。
  • 分布式应用中使用锁:多进程多线程,控制分布式系统之间同步访问共享资源的一种方式。利用Redis的单进程单线程特性对共享资源进行串行化处理。

# 使用set命令实现(推荐)

set命令除了可以使用nx,还可以设置过期时间。

/** 
* 使用redis的set命令实现获取分布式锁 
* @param lockKey 可以就是锁 
* @param requestId 请求ID,保证同一性 uuid+threadID 
* @param expireTime 过期时间,避免死锁 
* @return 
*/ 
public boolean getLock(String lockKey, String requestId, int expireTime) { 
  // NX:保证互斥性,hset 原子性操作 只要lockKey有效并且能setnx成功,则说明没有进程在使用分布式锁 
  String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime); 
  if("OK".equals(result)) { 
    return true; 
  }
  return false; 
}

# 使用setnx命令实现(并发会有问题)

由于setnx之后还要设置过期时间,整个操作在并发的时候,可能会有问题。

public boolean getLock(String lockKey, String requestId, int expireTime) { 
  Long result = jedis.setnx(lockKey, requestId); 
  if(result == 1) { 
    // 成功设置,如果进程在这down,那么永久有效,别的进程就无法获得锁 
    jedis.expire(lockKey, expireTime); 
    return true; 
  }
  return false; 
}

# del命令释放锁(并发有问题)

del命令释放锁的问题在于,如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。比如客户端A加锁,一段时间之后客户端A解锁,在执行 jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。

/*** 
* 释放分布式锁 
* @param lockKey 
* @param requestId 
*/ public static void releaseLock(String lockKey,String requestId) { 
  if (requestId.equals(jedis.get(lockKey))) { 
    jedis.del(lockKey); 
  } 
}

# redis+lua脚本释放锁(推荐)

public static boolean releaseLock(String lockKey, String requestId) { 
  String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; 
  Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 
  if (result.equals(1L)) { 
    return true; 
  }
  return false; 
}

# redis分布式锁存在的问题

单机Redis,无法保证高可用。

主从Redis集群无法保证数据的强一致性,而且在主机宕机时会可能会造成锁的重复获得以及无法续租。

  • 重复获得:指主从之间复制存在延迟,假如客户端A在主节点获取锁Lock成功,但是不巧的是,这个时候主节点宕机了而且数据并没有同步到从节点,当从节点晋升为主节点,客户端B获取锁Lock是可能成功的,这就是锁重复获得。
  • 无法续租:指锁过期后,不能继续使用。

既然Redis实现分布式锁有这些问题,为什么还用Redis实现???

与业务有关。Redis分布式锁是AP模型(高可用模型),只要最终一致性就好。当业务不需要数据强一致性时,比如:社交场景,就可以使用Redis实现分布式锁,当业务必须要数据的强一致性,即不允许重复获得锁,比如金融场景(重复下单,重复转账)就不要使 用可以使用CP模型的实现方案,比如:zookeeper和etcd。

# Redission分布式锁的使用

Redisson (opens new window)框架是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。

Redisson基于NIO的Netty框架上的分布式锁。

# pom.xml

加入jar包的依赖

<dependency> 
  <groupId>org.redisson</groupId> 
  <artifactId>redisson</artifactId> 
  <version>2.7.0</version> 
</dependency>

# 配置Redisson

public class RedissonManager { 
  private static Config config = new Config(); 
  // 声明redisso对象 
  private static Redisson redisson = null; 
  // 实例化 
  redisson static{ 
    config.useClusterServers() 
    // 集群状态扫描间隔时间,单位是毫秒 
    .setScanInterval(2000) 
    // cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用) 
    .addNodeAddress("redis://127.0.0.1:7001" ) 
    .addNodeAddress("redis://127.0.0.1:7002") 
    .addNodeAddress("redis://127.0.0.1:7003") 
    .addNodeAddress("redis://127.0.0.1:7004") 
    .addNodeAddress("redis://127.0.0.1:7005") 
    .addNodeAddress("redis://127.0.0.1:7006"); 
    // 得到redisson对象 
    redisson = (Redisson) Redisson.create(config); 
  }
  
  /** 
  * 获取redisson对象的方法
  */
  public static Redisson getRedisson(){
    return redisson; 
  } 
}

# 锁的获取和释放

public class DistributedRedisLock { 
  /** 
   * 从配置类中获取redisson对象
   */
  private static Redisson redisson = RedissonManager.getRedisson(); 
  
  private static final String LOCK_TITLE = "redisLock_"; 
  
  /** 
   * 加锁
   */ 
  public static boolean acquire(String lockName){ 
    // 声明key对象 
    String key = LOCK_TITLE + lockName; 
    // 获取锁对象 
    RLock mylock = redisson.getLock(key); 
    // 加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId 
    mylock.lock(2, 3, TimeUtil.SECOND); 
    // 加锁成功 
    return true; 
  } 
  
  /** 
   * 锁的释放
   */ 
  public static void release(String lockName){ 
    // 必须是和加锁时的同一个key 
    String key = LOCK_TITLE + lockName; 
    // 获取所对象 
    RLock mylock = redisson.getLock(key); 
    // 释放锁(解锁) 
    mylock.unlock(); 
  } 
}

# 业务逻辑中使用分布式锁

public String discount() throws IOException{ 
  String key = "lock001"; 
  // 加锁 
  DistributedRedisLock.acquire(key); 
  // 执行具体业务逻辑 
  dosoming 
  // 释放锁 
  DistributedRedisLock.release(key); 
  return "reslut"; 
}

# Redisson分布式锁的实现原理

redission1

# 加锁机制

如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。发送lua脚本到redis服务器上,脚本如下:

-- 第一个判断,看有没有锁,无锁就加锁、设置过期时间,加锁成功
"if (redis.call('exists',KEYS[1])==0) then "+ 
"redis.call('hset',KEYS[1],ARGV[2],1) ; "+ 
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+ 
"return nil; end ;" + 
-- 第二个判断 判断是不是当前客户端(我)加的锁,是的话 重入锁 
"if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ 
"redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ 
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+ 
"return nil; end ;" + 
-- 如果都满足一、二条件,则表示不能加锁,返回锁的时间
"return redis.call('pttl',KEYS[1]) ;"

lua脚本能保证这段复杂业务逻辑执行的原子性。lua参数解释:

  • KEYS[1]) : 加锁的key
  • ARGV[1] : key的生存时间,默认为30秒
  • ARGV[2] : 加锁的客户端ID (UUID.randomUUID()) + “:” + threadId)

第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。如何加锁呢?很简单,用下面的命令:

hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

myLock: {"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1 }

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

# 锁互斥机制

那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会怎么样呢?

很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获得到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间(TTL)。比如还剩15000毫秒的生存时间。

此时客户端2会进入一个while循环,不停的尝试加锁。

# 自动延时机制

只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

# 可重入锁机制

假如客户端,又继续加锁。第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”,此时就会执行可重入加锁的逻辑

incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1

通过这个命令,对客户端1的加锁次数,累加1。数据结构会变成:

myLock :{"8743c9c0-0795-4907-87fd-6c719a6b4586:1":2 }

# 释放锁机制

执行lua脚本如下:

-- 如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息 
"if (redis.call('exists', KEYS[1]) == 0) then " + 
"redis.call('publish', KEYS[2], ARGV[1]); " + 
"return 1; " + "end;" + 
-- key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 即不是我加的锁 不能解锁 
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + 
"return nil;" + "end; " + 
-- 将value减1 
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + 
-- 如果 counter> 0 说明锁在重入,不能删除key 
"if (counter > 0) then " + 
"redis.call('pexpire', KEYS[1], ARGV[2]); " + 
"return 0; " + 
-- 否则,删除key并且publish 解锁消息 
"else " + 
"redis.call('del', KEYS[1]); " + 
-- 删除锁 
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+ "end; " + 
"return nil;",
  • KEYS[1] :需要加锁的key,这里需要是字符串类型。

  • KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:

    “redisson_lockchannel{” + getName() + “}”
    
  • ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

  • ARGV[2] :锁的超时时间,防止死锁

  • ARGV[3] :锁的唯一标识,也就是刚才介绍的 id

    UUID.randomUUID() + “:” + threadId
    

如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

简单来说,释放锁就是每次都对myLock数据结构中的那个加锁次数减1。如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key。然后呢,另外的客户端2就可以尝试完成加锁了。

# 分布式锁的实际应用

# 数据并发竞争

利用分布式锁可以将处理串行化,前面已经讲过了。

# 防止库存超卖

例如,下面的例子,并发的时候,订单1和订单2同时下单:

  • 订单1下单前会先查看库存,库存为10,所以下单5本可以成功;
  • 订单2下单前会先查看库存,库存为10,所以下单8本可以成功;

订单1和订单2 同时操作,共下单13本,但库存只有10本,显然库存不够了,这种情况称为库存超卖。

oversold

可以采用分布式锁解决这个问题。订单1和订单2都从Redis中获得分布式锁(setnx),谁能获得锁谁进行下单操作,这样就把订单系统下单的顺序串行化了,就不会出现超卖的情况了。

伪码如下:

// 加锁并设置有效期 
if(redis.lock("RDL", 200)){ 
  // 判断库存 
  if (orderNum < getCount()){ 
    // 加锁成功, 可以下单 
    order(5); 
    // 释放锁 
    redis,unlock("RDL"); 
  } 
}

注意,此种方法会降低处理效率,这样不适合秒杀的场景,秒杀可以使用CAS和Redis队列的方式。

oversold1

# 分布式锁的对比

除了Redis分布式锁,其他方案还有:

  • 基于zookeeper临时节点的分布式锁
  • 基于etcd实现

zookeeper分布式锁流程如下

zookeeper_lock.png

三者的对比,如下表

Redis zookeeper etcd
一致性算法 无 paxos(ZAB) raft
CAP AP CP CP
高可用 主从集群 n+1 (n至少为2) n+1
接口类型 客户端 客户端 http/grpc
实现 setNX createEphemeral restful API
上次更新: 5/30/2023, 11:42:20 PM
最近更新
01
2025
01-15
02
Elasticsearch面试题
07-17
03
Elasticsearch进阶
07-16
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式