分布式锁实现原理与最佳实践
阿里妹导读
一、超卖问题复现
1.1 现象
商品表
订单表
订单item表
(rollbackFor = Exception.class)
public Long createOrder() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
// ... 忽略校验逻辑
//商品当前库存
Integer currentCount = product.getCount();
//校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
}
// 计算剩余库存
Integer leftCount = currentCount - purchaseProductNum;
// 更新库存
product.setCount(leftCount);
product.setGmtModified(new Date());
productMapper.updateByPrimaryKeySelective(product);
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
// ... 省略 Set
return order.getId();
}
(rollbackFor = Exception.class)
public Long createOrder() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
// ... 忽略校验逻辑
//商品当前库存
Integer currentCount = product.getCount();
//校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
}
// 使用 set count = count - #{purchaseProductNum,jdbcType=INTEGER}, 更新库存
productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
// ... 省略 Set
return order.getId();
}
(rollbackFor = Exception.class)
public synchronized Long createOrder() throws Exception {
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
// ... 忽略校验逻辑
//商品当前库存
Integer currentCount = product.getCount();
//校验库存
if (purchaseProductNum > currentCount) {
throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
}
// 使用 set count = count - #{purchaseProductNum,jdbcType=INTEGER}, 更新库存
productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
// ... 省略 Set
return order.getId();
}
1.2 解决办法
单体应用:使用本地锁 + 数据库中的行锁解决
分布式应用:
使用数据库中的乐观锁,加一个 version 字段,利用CAS来实现,会导致大量的 update 失败
使用数据库维护一张锁的表 + 悲观锁 select,使用 select for update 实现
使用Redis 的 setNX实现分布式锁
使用zookeeper的watcher + 有序临时节点来实现可阻塞的分布式锁
使用Redisson框架内的分布式锁来实现
使用curator 框架内的分布式锁来实现
二、单体应用解决超卖的问题
正确示例:将事务包含在锁的控制范围内
//@Transactional(rollbackFor = Exception.class)
public synchronized Long createOrder() throws Exception {
TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
Product product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
platformTransactionManager.rollback(transaction1);
throw new Exception("购买商品:" + purchaseProductId + "不存在");
}
//商品当前库存
Integer currentCount = product.getCount();
//校验库存
if (purchaseProductNum > currentCount) {
platformTransactionManager.rollback(transaction1);
throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
}
productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
// ... 省略 Set
return order.getId();
platformTransactionManager.commit(transaction1);
}
正确示例:使用synchronized的代码块
public Long createOrder() throws Exception {
Product product = null;
//synchronized (this) {
//synchronized (object) {
synchronized (DBOrderService2.class) {
TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product == null) {
platformTransactionManager.rollback(transaction1);
throw new Exception("购买商品:" + purchaseProductId + "不存在");
}
//商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName() + "库存数:" + currentCount);
//校验库存
if (purchaseProductNum > currentCount) {
platformTransactionManager.rollback(transaction1);
throw new Exception("商品" + purchaseProductId + "仅剩" + currentCount + "件,无法购买");
}
productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());
platformTransactionManager.commit(transaction1);
}
TransactionStatus transaction2 = platformTransactionManager.getTransaction(transactionDefinition);
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
// ... 省略 Set
orderItemMapper.insertSelective(orderItem);
platformTransactionManager.commit(transaction2);
return order.getId();
正确示例:使用Lock
private Lock lock = new ReentrantLock();
public Long createOrder() throws Exception{
Product product = null;
lock.lock();
TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
try {
product = productMapper.selectByPrimaryKey(purchaseProductId);
if (product==null){
throw new Exception("购买商品:"+purchaseProductId+"不存在");
}
//商品当前库存
Integer currentCount = product.getCount();
System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
//校验库存
if (purchaseProductNum > currentCount){
throw new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
}
productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());
platformTransactionManager.commit(transaction1);
} catch (Exception e) {
platformTransactionManager.rollback(transaction1);
} finally {
// 注意抛异常的时候锁释放不掉,分布式锁也一样,都要在这里删掉
lock.unlock();
}
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
Order order = new Order();
// ... 省略 Set
orderMapper.insertSelective(order);
OrderItem orderItem = new OrderItem();
// ... 省略 Set
orderItemMapper.insertSelective(orderItem);
platformTransactionManager.commit(transaction);
return order.getId();
}
三、常见分布式锁的使用
3.1 数据库乐观锁
3.2 数据库分布式锁
// 加上事务就是为了 for update 的锁可以一直生效到事务执行结束
// 默认回滚的是 RunTimeException
(rollbackFor = Exception.class)
public String singleLock() throws Exception {
log.info("我进入了方法!");
DistributeLock distributeLock = distributeLockMapper.
selectDistributeLock("demo");
if (distributeLock==null) {
throw new Exception("分布式锁找不到");
}
log.info("我进入了锁!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "我已经执行完成!";
}
<select id="selectDistributeLock" resultType="com.deltaqin.distribute.model.DistributeLock">
select * from distribute_lock
where businessCode =
for update
</select>
private MethodlockMapper methodlockMapper;
public boolean tryLock() {
try {
//插入一条数据 insert into
methodlockMapper.insert(new Methodlock("lock"));
}catch (Exception e){
//插入失败
return false;
}
return true;
}
public void waitLock() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unlock() {
//删除数据 delete
methodlockMapper.deleteByMethodlock("lock");
System.out.println("-------释放锁------");
}
3.3 Redis setNx
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring.redis.host=localhost
public class RedisLock implements AutoCloseable {
private RedisTemplate redisTemplate;
private String key;
private String value;
//单位:秒
private int expireTime;
/**
* 没有传递 value,因为直接使用的是随机值
*/
public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.expireTime=expireTime;
this.value = UUID.randomUUID().toString();
}
/**
* JDK 1.7 之后的自动关闭的功能
*/
public void close() throws Exception {
unLock();
}
/**
* 获取分布式锁
* SET resource_name my_random_value NX PX 30000
* 每一个线程对应的随机值 my_random_value 不一样,用于释放锁的时候校验
* NX 表示 key 不存在的时候成功,key 存在的时候设置不成功,Redis 自己是单线程,串行执行的,第一个执行的才可以设置成功
* PX 表示过期时间,没有设置的话,忘记删除,就会永远不过期
*/
public boolean getLock(){
RedisCallback<Boolean> redisCallback = connection -> {
//设置NX
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
//设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
//序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
//序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
//执行setnx操作
Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
return result;
};
//获取分布式锁
Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
return lock;
}
/**
* 释放锁的时候随机数相同的时候才可以释放,避免释放了别人设置的锁(自己的已经过期了所以别人才可以设置成功)
* 释放的时候采用 LUA 脚本,因为 delete 没有原生支持删除的时候校验值,证明是当前线程设置进去的值
* 脚本是在官方文档里面有的
*/
public boolean unLock() {
// key 是自己才可以释放,不是就不能释放别人的锁
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
List<String> keys = Arrays.asList(key);
// 执行脚本的时候传递的 value 就是对应的值
Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
log.info("释放锁的结果:"+result);
return result;
}
}
public String redisLock(){
log.info("我进入了方法!");
try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
if (redisLock.getLock()) {
log.info("我进入了锁!!");
Thread.sleep(15000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成");
return "方法执行完成";
}
3.4 zookeeper 瞬时znode节点 + watcher监听机制
多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;
其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;
下一个序号的线程得到通知,继续执行;
以此类推,创建节点的时候,就确认了线程执行的顺序。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:
/**
* 自己本身就是一个 watcher,可以得到通知
* AutoCloseable 实现自动关闭,资源不使用的时候
*/
4j
public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
/**
* 记录当前锁的名字
*/
private String znode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("localhost:2181",
10000,this);
}
public boolean getLock(String businessCode) {
try {
//创建业务 根节点
Stat stat = zooKeeper.exists("/" + businessCode, false);
if (stat==null){
zooKeeper.create("/" + businessCode,businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
//创建瞬时有序节点 /order/order_00000001
znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//获取业务节点下 所有的子节点
List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
//获取序号最小的(第一个)子节点
Collections.sort(childrenNodes);
String firstNode = childrenNodes.get(0);
//如果创建的节点是第一个子节点,则获得锁
if (znode.endsWith(firstNode)){
return true;
}
//如果不是第一个子节点,则监听前一个节点
String lastNode = firstNode;
for (String node:childrenNodes){
if (znode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public void close() throws Exception {
zooKeeper.delete(znode,-1);
zooKeeper.close();
log.info("我已经释放了锁!");
}
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
3.5 zookeeper curator
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
"start",destroyMethod = "close") (initMethod=
public CuratorFramework getCuratorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.
newClient("localhost:2181", retryPolicy);
return client;
}
框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。
@Autowired
private CuratorFramework client;
@Test
public void testCuratorLock(){
InterProcessMutex lock = new InterProcessMutex(client, "/order");
try {
if ( lock.acquire(30, TimeUnit.SECONDS) ) {
try {
log.info("我获得了锁!!!");
}
finally {
lock.release();
}
}
} catch (Exception e) {
e.printStackTrace();
}
client.close();
}
3.6 Redission
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
编写相应的redisson.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:redisson="http://redisson.org/schema/redisson"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://redisson.org/schema/redisson
http://redisson.org/schema/redisson/redisson.xsd
">
<redisson:client>
<redisson:single-server address="redis://127.0.0.1:6379"/>
</redisson:client>
</beans>
配置对应@ImportResource("classpath*:redisson.xml")资源文件。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.19.1</version>
</dependency>
public RedissonClient getRedissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
@Test
public void testRedissonLock() {
RLock rLock = redisson.getLock("order");
try {
rLock.lock(30, TimeUnit.SECONDS);
log.info("我获得了锁!!!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("我释放了锁!!");
rLock.unlock();
}
}
3.7 Etcd
四、常见分布式锁的原理
4.1 Redisson
//lua脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10
jedis.set("product_stock_10016", "15");
//初始化商品10016的库存
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
" return 1 " +
" end " +
" return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"),
Arrays.asList("10"));
System.out.println(obj);
4.2 RedLock解决非单体项目的Redis主从架构的锁失效
获取锁
获取当前时间T1,作为后续的计时依据;
按顺序地,依次向5个独立的节点来尝试获取锁 SET resource_name my_random_value NX PX 30000;
计算获取锁总共花了多少时间,判断获取锁成功与否;
时间:T2-T1;
多数节点的锁(N/2+1);
当获取锁成功后的有效时间,要从初始的时间减去第三步算出来的消耗时间;
如果没能获取锁成功,尽快释放掉锁。
释放锁
向所有节点发起释放锁的操作,不管这些节点有没有成功设置过。
public String redlock() {
String lockKey = "product_001";
//这里需要自己实例化不同redis实例的redisson客户端连接,这里只是伪代码用一个redisson客户端简化了
RLock lock1 = redisson.getLock(lockKey);
RLock lock2 = redisson.getLock(lockKey);
RLock lock3 = redisson.getLock(lockKey);
/**
* 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("lock fail");
} finally {
//无论如何, 最后都要解锁
redLock.unlock();
}
return "end";
}
但是,它的实现建立在一个不安全的系统模型上的,它依赖系统时间,当时钟发生跳跃时,也可能会出现安全性问题。分布式存储专家Martin对RedLock的分析文章,Redis作者的也专门写了一篇文章进行了反驳。
Martin Kleppmann:How to do distributed locking
Antirez:Is Redlock safe?
4.3 Curator
五、业务中使用分布式锁的注意点
5.1 自己实现分布式锁的坑
public String deductStock() {
String lockKey = "lock:product_101";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deltaqin");
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
public String deductStock() {
String lockKey = "lock:product_101";
String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
// 卡在这里,锁过期了,其他线程又可以加锁,此时又把其他线程新加的锁删掉了
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
5.2 锁优化:分段加锁逻辑
product_10111_stock = 100
product_10111_stock1 = 20
product_10111_stock2 = 20
product_10111_stock3 = 20
product_10111_stock4 = 20
product_10111_stock5 = 20
六、分布式锁的真相与选择
6.1 分布式锁的真相
互斥:不同线程、进程互斥。
超时机制:临界区代码耗时导致,网络原因导致。可以使用额外的线程续命保证。
完备的锁接口:阻塞的和非阻塞的接口都要有,lock和tryLock。
可重入性:当前请求的节点+ 线程唯一标识。
公平性:锁唤醒时候,按照顺序唤醒。
正确性:进程内的锁不会因为报错死锁,因为崩溃的时候整个进程都会结束。但是多实例部署时死锁就很容易发生,如果粗暴使用超时机制解决死锁问题,就默认了下面这个假设:
锁的超时时间 >> 获取锁的时延 + 执行临界区代码的时间 + 各种进程的暂停(比如 GC)
但上述假设其实无法保证的。
6.2 分布式锁的选择
数据库:db操作性能较差,并且有锁表的风险,一般不考虑。
优点:实现简单、易于理解
缺点:对数据库压力大
Redis:适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
优点:易于理解
缺点:自己实现、不支持阻塞
Redisson:相对于Jedis其实更多用在分布式的场景。
优点:提供锁的方法,可阻塞
Zookeeper:适用于高可靠(高可用),而并发量不是太高的场景。
优点:支持阻塞
缺点:需理解Zookeeper、程序复杂
Curator
优点:提供锁的方法
缺点:Zookeeper,强一致,慢
Etcd:安全和可靠性上有保证,但是比较重。
欢迎加入【阿里云开发者公众号】读者群
微信扫码关注该文公众号作者