1. 概述
当前主流大型互联网的业务系统的业务都是高度复杂,技术架构都是分布式,这样提高可用性和和稳定性的同时,也带来多节点在同时段执行的过程中相互干扰,这在对同一资源的处理会产生数据的不一致。
那如何能保证在同一时刻该资源只能被一个应用处理,而不能并发处理。答案就是引入一种分布式协调机制来调度,而这种分布式协调机制的核心我们称之为分布式锁。

- 成员变量 V 存在 JVM_1、JVM_2、JVM_3 三个 JVM 内存中
- 成员变量 V 同时都会在JVM分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的
- 不是同时发过来,三个请求分别操作三个不同 JVM 内存区域的数据,变量 V 之间不存在共享,也不具有可见性,处理的结果也是不对的
注:在类中,该成员变量V是一个有状态的对象。
2. 特点
- 在分布式系统环境下,同一个方法在同一时间只能被一个机器的同一个线程执行;
- 高可用的获取锁与释放锁;
- 高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁的自我失效机制,防止死锁;
- 满足非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
3. 实现方式
根据分布式的CAP理论我们了解“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”
所以我们在系统设计之初就分析调研,根据分析调研结果对这三者做取舍。
借鉴在主流互联网的经验,通常靠牺牲一致性 来换取系统的高可用性,系统的“一致性”保障只能用“最终一致性”来保证,这个最终时间需要在可控可接受的范围内。很多场景下,我们为了保证最终一致性,都会做很多技术方案来支持,比如分布式事务、分布式锁。
在分布式锁的技术实现上,主流认可有三种实现方式,从复杂度来看,由难至易依次增加:
基于数据库实现分布式锁;
基于缓存(Redis/Memcached等)实现分布式锁;
基于Zookeeper实现分布式锁;
无论哪一种方式,都不可能完美,需要根据实际业务场景做出选择。
4. 数据库实现
4.1. 前提
在了解数据库实现分布式锁的之前,我们首了解乐观锁(Optimistic Lock)和悲观锁(Pessimistic Lock)。
4.1.1. 乐观锁
每次去取数据的时候都会认为不会有其他线程对数据进行修改,因此不会上锁,但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用版本号机制或CAS操作实现;乐观锁适用于只“读” 的应用场景,这样做可以提高吞吐量,像数据库使用的write_condition的机制,本身就是乐观锁。
4.1.2. 悲观锁
每次取数据时都认为其他线程会修改,所以都会加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,都需要阻塞(block)挂起。可以依靠数据库实现,如行锁、读锁和写锁等,都是在操作之前加锁,它对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。在 Java中,synchronized的思想也是悲观锁。
乐观锁、悲观锁两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行retry,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。本质上,数据库的乐观锁做法和悲观锁做法主要就是避免丢失更新问题:
4.2. 乐观锁实现
从上面的前提概要我们知道,乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。
当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行修改写回数据库时,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。

假定同一账号下,小明和小明媳妇同时取银行进行取款操作,账户余额200¥,小明取款100¥,小明媳妇取款150¥。
没有锁机制的场景下,可能会出现账户同时被扣款150¥和100¥,导致账户余额出现负数情况,这样的话银行可能混不下去。
如果我们用到锁机制,小明和小明媳妇都在取款时候,除看到余额的操作,还在后台事务中读取版本号version=1,不论小明和小明媳妇先成功执行取款操作,都会将版本号version+1,即version=2,没有成功执行的那一方再去操作时候读取出来的version是2,那么就会触发重新获取余额。
乐观锁关键点:
4.3. 悲观锁实现
4.3.1. 方案1_for update
Oracle、Mysql中是基于for update来实现加锁的。在MYSQL中需要注意的是,在InnoDB中只有字段加了索引的,才会是行级锁,否者是表级锁,所以一定要对where的条件字段加索引。
当这条记录加上排它锁之后,其它线程是无法操作这条记录的。
4.3.2. 方案2_唯一性约束
对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功。
1 2
| INSERT INTO method_lock (method_name, desc) VALUES ('methodName', 'methodName');
|
4.3.3. 方案3_查询并占有
先获取锁的信息:
1
| select id, method_name, state,version from method_lock where state=1 and method_name='methodName';
|
再占有
1 2 3
| update t_resoure set state=2, version=2, update_time=now() where method_name='methodName' and state=1 and version=2;
|
如果没有更新影响到一行数据,则说明这个资源已经被别人占位。
4.3.4. 附 表结构
1 2 3 4 5 6 7 8 9 10 11 12
| DROP TABLE IF EXISTS method_lock;
CREATE TABLE method_lock ( lck_id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键', method_name VARCHAR(64) NOT NULL COMMENT '锁定的方法名', state TINYINT NOT NULL COMMENT '1:未分配;2:已分配', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, ver INT NOT NULL COMMENT '版本号', PRIMARY KEY (lck_id), UNIQUE KEY uidx_method_name (method_name) USING BTREE ) ENGINE=INNODB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
|
4.4. 总结
4.4.1. 缺点
这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
配置实现细节
数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
非阻塞的?搞一个while循环,直到insert成功再返回成功。
非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
5. Redis实现
基于Redis实现的锁机制,主要是依赖redis自身的原子操作,因为redis是单线程。要求redis版本大于2.6.12。
5.1. redis单实例
版本号大家以实际使用中的为准,我这里仅供参考,因为spring-boot的自动注册功能会为我们提供StringRedisTemplate,直接使用即可。
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.3.RELEASE</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| server: port: 9090
spring: datasource: name: mysql type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/springboot?characterEncoding=utf-8&useSSL=true username: root password: 123456 druid: initial-size: 5 min-idle: 5 max-active: 20 max-wait: 30000 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 validation-query: select 1 test-while-idle: true test-on-borrow: false test-on-return: false pool-prepared-statements: false max-pool-prepared-statement-per-connection-size: 20 connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=6000
redis: database: 0 host: 127.0.0.1 port: 6379
mybatis: mapperLocations: classpath:mapper/**/*.xml
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| package xyz.wongs.weathertop.comp;
import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit;
@Component @Slf4j public class RedisLockComponent {
@Autowired private StringRedisTemplate stringRedisTemplate;
/** * @Description 获取锁,默认:失效时间为5,失效时间的单位为秒,重试次数为3,休眠4秒 * @param key * @param value * @param expire redis过期时间 * @return boolean * @throws * @date 2019/11/16 21:37 */ public boolean getLock(String key,String value){ return getLock(key,value,5, TimeUnit.SECONDS,3,5000); }
/** * @Description 获取锁,默认:失效时间的单位为秒,重试次数为3,休眠4秒 * @param key * @param value * @param expire redis过期时间 * @return boolean * @throws * @date 2019/11/16 21:37 */ public boolean getLock(String key,String value,long expire){ return getLock(key,value,expire, TimeUnit.SECONDS,3,5000); }
/** * @Description 获取锁,默认重试次数为3,休眠5秒 * @param key * @param value * @param expire redis过期时间 * @param unit * @return boolean * @throws * @date 2019/11/16 21:37 */ public boolean getLock(String key,String value,long expire, TimeUnit unit){ return getLock(key,value,expire, unit,3,5000); }
/** * @Description * @param key * @param value * @param expire redis过期时间 * @param unit * @param tryCount 重试次数 * @param waitMillis 每次重试要等待时间 * @return boolean * @throws * @date 2019/11/16 21:37 */ public boolean getLock(String key,String value,long expire, TimeUnit unit,int tryCount,int waitMillis){ //setIfAbsent如果键不存在则新增,存在则不改变已经有的值。 boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key,value,expire,unit); if(success) { return true; } //判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁 String val = stringRedisTemplate.opsForValue().get(key); if(!Strings.isNullOrEmpty(val)){ if(System.currentTimeMillis() - Long.parseLong(val) > unit.toMillis(expire)){ // 超时移除 stringRedisTemplate.delete(key); } } // 重试、等待 if (tryCount > 0 && waitMillis > 0) { try { Thread.sleep(waitMillis); } catch (InterruptedException e) { log.error("getLock exception{}",e.getMessage()); } return getLock(key,value,expire,unit,tryCount - 1,waitMillis); } return false; }
/** * @Description 获取等待时间 * @param key * @param expire * @param unit * @return long 秒 * @throws * @date 2019/11/16 21:44 */ public long getWaitSecond(String key,long expire,TimeUnit unit) { long currentTime = System.currentTimeMillis(); long preTime = Long.parseLong(stringRedisTemplate.opsForValue().get(key)); return (preTime + unit.toMillis(expire) - currentTime) / 1000; }
/** * @Description 设置锁的过期时间,默认单位为毫秒 * @param key * @param expTime * @return Boolean * @throws * @date 2019/11/16 21:18 */ public Boolean renewal(String key,int expTime){ return renewal(key, expTime, TimeUnit.MILLISECONDS); }
/** * @Description 设置锁的过期时间 * @param key * @param expTime * @param unit * @return Boolean * @throws * @date 2019/11/16 21:18 */ public Boolean renewal(String key,int expTime,TimeUnit unit){ return stringRedisTemplate.expire(key, expTime, unit); }
/** * @Description 解锁 * @param key * @param val * @return void * @throws * @date 2019/11/16 21:13 */ public void unlock(String key,String val){ try { String value = stringRedisTemplate.opsForValue().get(key); if(!Strings.isNullOrEmpty(value) && val.equals(value) ){ // 删除锁状态 stringRedisTemplate.opsForValue().getOperations().delete(key); } } catch (Exception e) { log.error("unlock exception{}",e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package xyz.wongs.weathertop.web;
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import xyz.wongs.weathertop.base.message.enums.ResponseCode; import xyz.wongs.weathertop.base.message.response.ResponseResult; import xyz.wongs.weathertop.comp.RedisLockComponent; import xyz.wongs.weathertop.deno.entity.RedisLock; import xyz.wongs.weathertop.deno.mapper.RedisLockMapper;
import java.util.concurrent.TimeUnit;
@RestController @Slf4j public class RedisController {
@Autowired private RedisLockMapper redisLockMapper;
@Autowired private RedisLockComponent redisLockComponent;
/** * 超时时间 5s */ private static final int TIMEOUT = 3;
@RequestMapping(value = "/seckilling/{key}") public ResponseResult Seckilling(@PathVariable("key") String key){ ResponseResult responseResult = new ResponseResult(); //1、加锁 String value = System.currentTimeMillis() + ""; if(!redisLockComponent.getLock(key,value,6)){ responseResult.setStatus(false); responseResult.setCode(ResponseCode.DICT_LOCK_FAIL.getCode()); responseResult.setMsg("排队人数太多,请稍后再试."); return responseResult; }
RedisLock redisLock = redisLockMapper.selectByPrimaryKey(1); // 2、查询库存,为0则活动结束 if(redisLock.getCounts()==0){ responseResult.setStatus(false); responseResult.setCode(ResponseCode.RESOURCE_NOT_EXIST.getCode()); responseResult.setMsg("库存不够."); return responseResult; } //3、减库存 redisLock.setCounts(redisLock.getCounts()-1); redisLockMapper.updateByPrimaryKeySelective(redisLock); try{ Thread.sleep(5000);//模拟减库存的处理时间 }catch (InterruptedException e){ e.printStackTrace(); } //4、释放锁 redisLockComponent.unlock(key,value); responseResult.setMsg("恭喜您,秒杀成功。"); return responseResult; } }
|

5.2. redis集群实现
实际上我们为了高可用,降低单机故障概率,肯定将redis集群模式部署,这样情况下我们整合redisson,由于篇幅有限,这里暂时挖个坑,就不探讨这个,下一篇再说
6. Zookeeper实现
在上一节,给大家演示基于Redis的实现,这一章节我们来看另一种分布式锁的实现——Zookeeper。
关于Zookeeper的为什么能实现分布式锁,这里只说明zookeeper核心保存结构是DataTree数据结构,内部实现基于Map<String, DataNode>的数据结构,其他详细内容大家自行取官网查阅,这里也不赘述。
Zookeeper官方描述

Zookeeper中提供了节点类型主要有:
持久节点:节点创建后,将一直存在,直到有删除操作来主动清除。
顺序节点:假如当前一个父节点为/lock,可以在该父节点下面创建子节点;Zookeeper本身提供了可选的有序特性,例如我们可以创建子节点“/lock/test_”并且指明有序,那么Zookeeper在生成子节点时会根据当前子节点数量自动添加整数序号,如果第一个子节点为/lock/test_0000000000,下一个节点则为/lock/test_0000000001,依次类推。
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
Zookeeper从诞生至今,市面上也有基于zk作为分布式锁实现的成熟框架,这里我们就以Curator就是代表 ,它就是Netflix开源的一套ZooKeeper客户端框架,它提供zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator为使用者提供操作锁的API,比如获取锁、释放锁等,在实际开发中都非常方便,最后别忘记代码的finally代码块中,要最终确保锁能正确释放即可。
Curator提供了四种分布式锁:
InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
说了这么多,不上代码,许多老铁恐怕都要犯困,没激情,下来我们用Coding带大家了解Zookeeper如何实现分布式锁,我们还是以Springboot基本框架,写一个案例。
6.1. 引入POM依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency>
|
6.2. application文件
1 2 3 4 5 6
| curator: retryCount: 5 elapsedTimeMs: 5000 connectString: 192.168.147.132:2181 sessionTimeoutMs: 60000 connectionTimeoutMs: 5000
|
6.3. 核心实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| /** * @Description 获取分布式锁 * @param path 提供可供写入的路径 * @return void * @throws * @date 2019/11/4 9:36 */ public boolean acquireDistributedLock(String path) {
boolean lock = true; String keyPath = "/" + ROOT_PATH_LOCK + "/" + path; while (true) { try { curatorFramework .create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(keyPath); log.info("success to acquire lock for path:{}", keyPath); break; } catch (Exception e) { log.info("failed to acquire lock for path:{}", keyPath); log.info("while try again ......."); try { if (countDownLatch.getCount() <= 0) { countDownLatch = new CountDownLatch(1); } countDownLatch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } lock = false; } } return lock; }
/** * @Description * @param path 释放分布式锁 * @return boolean * @throws * @date 2019/11/4 9:36 */ public boolean releaseDistributedLock(String path) { boolean release = true; String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;; try { if (curatorFramework.checkExists().forPath(keyPath) != null) { curatorFramework.delete().forPath(keyPath); } } catch (Exception e) { log.error("failed to release lock"); release = false; } return release; }
|
6.4. 演示结果
我用webcontroller实现一个restfull接口,同时打开两个URL获取同一把锁,获取的为成功,否则失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @GetMapping("/lock10") public ResponseResult getLock1() { ResponseResult responseResult = new ResponseResult(); Boolean acquire = distributedLockByZookeeper.acquireDistributedLock(PATH); try { if(acquire) { log.error("I am lock1,i am updating resource……!!!"); Thread.sleep(2000); } else{ responseResult.setCode(ResponseCode.SYSNC_LOCK.getCode()); responseResult.setMsg(ResponseCode.SYSNC_LOCK.getMsg()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { distributedLockByZookeeper.releaseDistributedLock(PATH); } return responseResult; }
|


7. 三种锁的比较
这三种方式都不是尽善尽美,如同CAP,在复杂性、可靠性、性能等方面皆无法同时满足,所以,根据实际业务情况选择最适合的方案,优雅的解决问题,少带来弊端即可。
实现难易程度自低至高:数据库 、 缓存 、 Zookeeper
实现的复杂性自低至高:Zookeeper 、缓存 、数据库
从性能角度自高到低:缓存 、Zookeeper 、 数据库
从可靠性角度自高到低:Zookeeper 、缓存 、数据库
在技术层面Redis是nosql数据,而Zookeeper是分布式协调工具,都用于分布式解决方案;防死锁的实现上Redis是通过对key设置有效期来解决死锁,而Zookeeper使用会话有效期方式解决死锁现象;数据库的实现上,高并发过程中对库的压力会增大
8. 源码
Github源码地址
Gitee源码地址