1、分布式锁的实现方式

大概有三种:

  1. 基于关系型数据库(基于mysql数据库的分布式锁
  2. 基于缓存(基于redis的redisson实现分布式锁
  3. 基于zookeeper(本文讲解了基于zookeeper的分布式锁)

2、安装zookeeper

安装教程请参考:linux下zookeeper集群安装,有集群和单机安装配置。

3、springboot集成实现

3.1、pom引入

        <!-- zookeeper 客户端 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

3.2、zookeeper的yml配置

zooClientIp: 127.0.0.1:2181

3.3、CuratorFrameworkConfig基本配置类

package com.example.mybatiesplus.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * @DESCRIPTION zookeeper配置
 * @Author lst
 * @Date 2020-06-04 15:00
 */
@Configuration
public class CuratorFrameworkConfig {

    @Autowired
    private Environment env;

    @Bean
    public CuratorFramework curatorFramework(){
        // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
        // 创建client
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(env.getProperty("zooClientIp"), retry);
        // 添加watched 监听器
        curatorFramework.getCuratorListenable().addListener(new MyCuratorListener(){

        });
        curatorFramework.start();
        return curatorFramework;
    }


    public class MyCuratorListener implements CuratorListener {
        @Override
        public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
            CuratorEventType type = event.getType();
            if(type == CuratorEventType.WATCHED){
                WatchedEvent watchedEvent = event.getWatchedEvent();
                String path = watchedEvent.getPath();
                System.out.println(watchedEvent.getType()+" -- "+ path);
                // 重新设置改节点监听
                if(null != path){
                    client.checkExists().watched().forPath(path);
                }
            }
        }
    }

}

3.3、接下来,写一个测试类

package com.example.mybatiesplus.controller;

import com.example.mybatiesplus.result.BaseResponse;
import com.example.mybatiesplus.result.ResultGenerator;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @DESCRIPTION 测试类
 * @Author lst
 * @Date 2020-05-24
 */
@RestController
@RequestMapping("/test")
@Api(value = "TestController", tags = "测试类")
@Slf4j
public class TestController {


    @Autowired
    private CuratorFramework curatorFramework;

    private String lockPath = "/lock/test/";

    /**
      * 通过zookeeper分布式锁高并发测试
      * @author lst
      * @date 2020-6-4 14:53
      * @return com.example.mybatiesplus.result.BaseResponse
     */
    @GetMapping(value = "/zookpTest", produces = "application/json; charset=utf-8")
    @ApiOperation(value = "通过zookeeper分布式锁高并发测试", notes = "通过zookeeper分布式锁高并发测试", code = 200, produces = "application/json")
    public BaseResponse zookpTest() {
        String lockName = lockPath + UUID.randomUUID().toString();
        log.info("============={} 线程访问开始=========lockName:{}",Thread.currentThread().getName(),lockName);
        //TODO 获取分布式锁
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex (curatorFramework, lockName);
        try{
            //获取锁资源
            boolean flag = lock.acquire(10, TimeUnit.HOURS);
            if(flag){
                log.info("线程:{},获取到了锁",Thread.currentThread().getName());
                //TODO 获得锁之后可以进行相应的处理  睡一会
                Thread.sleep(500);
                log.info("======获得锁后进行相应的操作======" + Thread.currentThread().getName());
            }
        }catch (Exception e){
            log.info("错误信息:{}",e.getMessage());
        }finally {
            try {
                lock.release();
                log.info("=========lockName:{}==============={}释放了锁",lockName,Thread.currentThread().getName());
            } catch (Exception e) {
                log.info("错误信息:{}",e.getMessage());
            }
        }
        return ResultGenerator.genSuccessResult();
    }


}

3.4、使用jmeter测试(添加http请求,设置并发数(先测试100))

3.5、测试数据

只要某个抢到锁的线程执行完毕并且释放了锁资源,其他的线程很快就会获取到锁。

3.6、zookeeper-dev-ZooInspector客户端查看

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐