第26篇:聊聊如何使用MySQL实现分布式锁

star2017 1年前 ⋅ 525 阅读

Mysql系列的目标是:通过这个系列从入门到全面掌握一个高级开发所需要的全部技能。

欢迎大家加我微信itsoku一起交流java、算法、数据库相关技术。

这是Mysql系列第26篇。

本篇我们使用mysql实现一个分布式锁。

分布式锁的功能

  1. 分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作
  2. 锁具有重入的功能:即一个使用者可以多次获取某个锁
  3. 获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败
  4. 能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁

预备技能:乐观锁

通常我们修改表中一条数据过程如下:

  1. t1select获取记录R1
  2. t2:对R1进行编辑
  3. t3update R1

我们来看一下上面的过程存在的问题:

如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。

我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:

  1. t1:打开事务start transaction
  2. t2select获取记录R1,声明变量v=R1.version
  3. t3:对R1进行编辑
  4. t4:执行更新操作
  5. update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
  6. t5t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
  7. if(count==1){
  8. //提交事务
  9. commit;
  10. }else{
  11. //回滚事务
  12. rollback;
  13. }

上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。

上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。

使用mysql实现分布式锁

建表

我们创建一个分布式锁表,如下

  1. DROP DATABASE IF EXISTS javacode2018;
  2. CREATE DATABASE javacode2018;
  3. USE javacode2018;
  4. DROP TABLE IF EXISTS t_lock;
  5. create table t_lock(
  6. lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
  7. request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
  8. lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
  9. timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
  10. version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
  11. )COMMENT '锁信息表';

分布式锁工具类:

  1. package com.itsoku.sql;
  2. import lombok.Builder;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.junit.Test;
  7. import java.sql.*;
  8. import java.util.Objects;
  9. import java.util.UUID;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
  13. * 喜欢的请关注公众号:路人甲Java
  14. */
  15. @Slf4j
  16. public class LockUtils {
  17. //将requestid保存在该变量中
  18. static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
  19. /**
  20. * 获取当前线程requestid
  21. *
  22. * @return
  23. */
  24. public static String getRequestId() {
  25. String requestId = requestIdTL.get();
  26. if (requestId == null || "".equals(requestId)) {
  27. requestId = UUID.randomUUID().toString();
  28. requestIdTL.set(requestId);
  29. }
  30. log.info("requestId:{}", requestId);
  31. return requestId;
  32. }
  33. /**
  34. * 获取锁
  35. *
  36. * @param lock_key 锁key
  37. * @param locktimeout(毫秒) 持有锁的有效时间,防止死锁
  38. * @param gettimeout(毫秒) 获取锁的超时时间,这个时间内获取不到将重试
  39. * @return
  40. */
  41. public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
  42. log.info("start");
  43. boolean lockResult = false;
  44. String request_id = getRequestId();
  45. long starttime = System.currentTimeMillis();
  46. while (true) {
  47. LockModel lockModel = LockUtils.get(lock_key);
  48. if (Objects.isNull(lockModel)) {
  49. //插入一条记录,重新尝试获取锁
  50. LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
  51. } else {
  52. String reqid = lockModel.getRequest_id();
  53. //如果reqid为空字符,表示锁未被占用
  54. if ("".equals(reqid)) {
  55. lockModel.setRequest_id(request_id);
  56. lockModel.setLock_count(1);
  57. lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
  58. if (LockUtils.update(lockModel) == 1) {
  59. lockResult = true;
  60. break;
  61. }
  62. } else if (request_id.equals(reqid)) {
  63. //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
  64. lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
  65. lockModel.setLock_count(lockModel.getLock_count() + 1);
  66. if (LockUtils.update(lockModel) == 1) {
  67. lockResult = true;
  68. break;
  69. }
  70. } else {
  71. //锁不是自己的,并且已经超时了,则重置锁,继续重试
  72. if (lockModel.getTimeout() < System.currentTimeMillis()) {
  73. LockUtils.resetLock(lockModel);
  74. } else {
  75. //如果未超时,休眠100毫秒,继续重试
  76. if (starttime + gettimeout > System.currentTimeMillis()) {
  77. TimeUnit.MILLISECONDS.sleep(100);
  78. } else {
  79. break;
  80. }
  81. }
  82. }
  83. }
  84. }
  85. log.info("end");
  86. return lockResult;
  87. }
  88. /**
  89. * 释放锁
  90. *
  91. * @param lock_key
  92. * @throws Exception
  93. */
  94. public static void unlock(String lock_key) throws Exception {
  95. //获取当前线程requestId
  96. String requestId = getRequestId();
  97. LockModel lockModel = LockUtils.get(lock_key);
  98. //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
  99. if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
  100. if (lockModel.getLock_count() == 1) {
  101. //重置锁
  102. resetLock(lockModel);
  103. } else {
  104. lockModel.setLock_count(lockModel.getLock_count() - 1);
  105. LockUtils.update(lockModel);
  106. }
  107. }
  108. }
  109. /**
  110. * 重置锁
  111. *
  112. * @param lockModel
  113. * @return
  114. * @throws Exception
  115. */
  116. public static int resetLock(LockModel lockModel) throws Exception {
  117. lockModel.setRequest_id("");
  118. lockModel.setLock_count(0);
  119. lockModel.setTimeout(0L);
  120. return LockUtils.update(lockModel);
  121. }
  122. /**
  123. * 更新lockModel信息,内部采用乐观锁来更新
  124. *
  125. * @param lockModel
  126. * @return
  127. * @throws Exception
  128. */
  129. public static int update(LockModel lockModel) throws Exception {
  130. return exec(conn -> {
  131. String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND version = ?";
  132. PreparedStatement ps = conn.prepareStatement(sql);
  133. int colIndex = 1;
  134. ps.setString(colIndex++, lockModel.getRequest_id());
  135. ps.setInt(colIndex++, lockModel.getLock_count());
  136. ps.setLong(colIndex++, lockModel.getTimeout());
  137. ps.setString(colIndex++, lockModel.getLock_key());
  138. ps.setInt(colIndex++, lockModel.getVersion());
  139. return ps.executeUpdate();
  140. });
  141. }
  142. public static LockModel get(String lock_key) throws Exception {
  143. return exec(conn -> {
  144. String sql = "select * from t_lock t WHERE t.lock_key=?";
  145. PreparedStatement ps = conn.prepareStatement(sql);
  146. int colIndex = 1;
  147. ps.setString(colIndex++, lock_key);
  148. ResultSet rs = ps.executeQuery();
  149. if (rs.next()) {
  150. return LockModel.builder().
  151. lock_key(lock_key).
  152. request_id(rs.getString("request_id")).
  153. lock_count(rs.getInt("lock_count")).
  154. timeout(rs.getLong("timeout")).
  155. version(rs.getInt("version")).build();
  156. }
  157. return null;
  158. });
  159. }
  160. public static int insert(LockModel lockModel) throws Exception {
  161. return exec(conn -> {
  162. String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
  163. PreparedStatement ps = conn.prepareStatement(sql);
  164. int colIndex = 1;
  165. ps.setString(colIndex++, lockModel.getLock_key());
  166. ps.setString(colIndex++, lockModel.getRequest_id());
  167. ps.setInt(colIndex++, lockModel.getLock_count());
  168. ps.setLong(colIndex++, lockModel.getTimeout());
  169. ps.setInt(colIndex++, lockModel.getVersion());
  170. return ps.executeUpdate();
  171. });
  172. }
  173. public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
  174. Connection conn = getConn();
  175. try {
  176. return sqlExec.exec(conn);
  177. } finally {
  178. closeConn(conn);
  179. }
  180. }
  181. @FunctionalInterface
  182. public interface SqlExec<T> {
  183. T exec(Connection conn) throws Exception;
  184. }
  185. @Getter
  186. @Setter
  187. @Builder
  188. public static class LockModel {
  189. private String lock_key;
  190. private String request_id;
  191. private Integer lock_count;
  192. private Long timeout;
  193. private Integer version;
  194. }
  195. private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false"; //数据库地址
  196. private static final String username = "root"; //数据库用户名
  197. private static final String password = "root123"; //数据库密码
  198. private static final String driver = "com.mysql.jdbc.Driver"; //mysql驱动
  199. /**
  200. * 连接数据库
  201. *
  202. * @return
  203. */
  204. public static Connection getConn() {
  205. Connection conn = null;
  206. try {
  207. Class.forName(driver); //加载数据库驱动
  208. try {
  209. conn = DriverManager.getConnection(url, username, password); //连接数据库
  210. } catch (SQLException e) {
  211. e.printStackTrace();
  212. }
  213. } catch (ClassNotFoundException e) {
  214. e.printStackTrace();
  215. }
  216. return conn;
  217. }
  218. /**
  219. * 关闭数据库链接
  220. *
  221. * @return
  222. */
  223. public static void closeConn(Connection conn) {
  224. if (conn != null) {
  225. try {
  226. conn.close(); //关闭数据库链接
  227. } catch (SQLException e) {
  228. e.printStackTrace();
  229. }
  230. }
  231. }
  232. }

上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock

测试用例

  1. package com.itsoku.sql;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.junit.Test;
  4. import static com.itsoku.sql.LockUtils.lock;
  5. import static com.itsoku.sql.LockUtils.unlock;
  6. /**
  7. * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
  8. * 喜欢的请关注公众号:路人甲Java
  9. */
  10. @Slf4j
  11. public class LockUtilsTest {
  12. //测试重复获取和重复释放
  13. @Test
  14. public void test1() throws Exception {
  15. String lock_key = "key1";
  16. for (int i = 0; i < 10; i++) {
  17. lock(lock_key, 10000L, 1000);
  18. }
  19. for (int i = 0; i < 9; i++) {
  20. unlock(lock_key);
  21. }
  22. }
  23. //获取之后不释放,超时之后被thread1获取
  24. @Test
  25. public void test2() throws Exception {
  26. String lock_key = "key2";
  27. lock(lock_key, 5000L, 1000);
  28. Thread thread1 = new Thread(() -> {
  29. try {
  30. try {
  31. lock(lock_key, 5000L, 7000);
  32. } finally {
  33. unlock(lock_key);
  34. }
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. });
  39. thread1.setName("thread1");
  40. thread1.start();
  41. thread1.join();
  42. }
  43. }

test1方法测试了重入锁的效果。

test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了。

留给大家一个问题

上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。

最新资料

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: