Redission分布式工具的使用

前言

记得上一次使用Redis的时候,还是在自己做的毕业设计项目上,当时抱着学习的目的,简单地做了查询的缓存处理。当然,在现在看来,小项目的简单查询根本用不到Redis来作为缓存中间件。而在实际的工作中,Rediss的使用,一般都用来缓存一些加载或者查询比较费时的、实时性要求比较低的数据,以提高web应用接口响应速度,进而提升用户体验。最近了解到Redission分布式工具,于是便有了这篇文章,记录下自己的使用过程。

什么是Redisson

以下是 Redisson官方 的一段描述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

SpringBoot项目集成Redission分布式工具

集成Redission,一般集成SpringBoot项目模块,我都会在 maven官方仓库 搜索对应的依赖包。目前仓库上使用的最多的依赖包有两个。

依赖的选择

一个是Redisson原生依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.7</version>
</dependency>

一个是SpringBoot官方提供的Starter包

<!-- https://mvnrepository.com/artifact/org.redisson/redisson-spring-boot-starter -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.7</version>
</dependency>

其实两个依赖包使用起来没多大区别,只是SpringBoot官方提供的Start包对spring应用有更好的支持。具体选择哪个,就看自己喜欢了。

版本的选择

Redisson的版本需要与SpringBoot的版本相对应,不然容易出现一些兼容性的问题。如果使用starter包引入的方式,我们可以参考 redisson-spring-boot-starter官方 给出的对应版本关系,选择对应的版本就好了。以下来自官方给出的版本对应关系。

redisson-spring-data
module name
Spring Boot
version
redisson-spring-data-161.3.y
redisson-spring-data-171.4.y
redisson-spring-data-181.5.y
redisson-spring-data-2x2.x.y

查看自己引入的redisson-spring-boot-starter版本号是否与SpringBoot对应。 查看引入的版本号

配置Redisson

配置Redisson可以有多种方式,如通过Config对象读取文件yaml文件等。而实际使用的最多的应该是集成在SpringBoot中,在分布式场景下,就可以统一在配置中心进行修改,根据不同环境的Redis部署方式,配置不同的Redisson配置。

使用通用的SpringBoot配置

spring:
  redis:
    database: 
    host:
    port:
    password:
    ssl: 
    timeout:
    cluster:
      nodes:
    sentinel:
      master:
      nodes:

使用Redisson独立的配置

集群模式
spring:
  redis:
   redisson: 
      file: classpath:redisson.yaml #这里可以使用外部文件,也可以直接在config中配置
      config: |
        clusterServersConfig:
            idleConnectionTimeout: 10000
            connectTimeout: 10000
            timeout: 3000
            retryAttempts: 3
            retryInterval: 1500
            password: null
            subscriptionsPerConnection: 5
            clientName: null
            loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
            slaveSubscriptionConnectionMinimumIdleSize: 1
            slaveSubscriptionConnectionPoolSize: 50
            slaveConnectionMinimumIdleSize: 32
            slaveConnectionPoolSize: 64
            masterConnectionMinimumIdleSize: 32
            masterConnectionPoolSize: 64
            readMode: "SLAVE"
            nodeAddresses:
            - "redis://127.0.0.1:7004"
            - "redis://127.0.0.1:7001"
            - "redis://127.0.0.1:7000"
            scanInterval: 1000
        threads: 0
        nettyThreads: 0
        codec: !<org.redisson.codec.JsonJacksonCodec> {}
        transportMode: "NIO"        
单Redis节点模式
spring:
  redis:
   redisson: 
      config: |
        singleServerConfig:
            idleConnectionTimeout: 10000
            connectTimeout: 10000
            timeout: 3000
            retryAttempts: 3
            retryInterval: 1500
            password: null
            subscriptionsPerConnection: 5
            clientName: null
            address: "redis://127.0.0.1:6379"
            subscriptionConnectionMinimumIdleSize: 1
            subscriptionConnectionPoolSize: 50
            connectionMinimumIdleSize: 32
            connectionPoolSize: 64
            database: 0
            dnsMonitoringInterval: 5000
        threads: 0
        nettyThreads: 0
        codec: !<org.redisson.codec.JsonJacksonCodec> {}
        transportMode: "NIO"        
哨兵模式
spring:
  redis:
   redisson: 
      config: |
        sentinelServersConfig:
            idleConnectionTimeout: 10000
            connectTimeout: 10000
            timeout: 3000
            retryAttempts: 3
            retryInterval: 1500
            password: null
            subscriptionsPerConnection: 5
            clientName: null
            loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
            slaveSubscriptionConnectionMinimumIdleSize: 1
            slaveSubscriptionConnectionPoolSize: 50
            slaveConnectionMinimumIdleSize: 32
            slaveConnectionPoolSize: 64
            masterConnectionMinimumIdleSize: 32
            masterConnectionPoolSize: 64
            readMode: "SLAVE"
            sentinelAddresses:
            - "redis://127.0.0.1:26379"
            - "redis://127.0.0.1:26389"
            masterName: "mymaster"
            database: 0
        threads: 0
        nettyThreads: 0
        codec: !<org.redisson.codec.JsonJacksonCodec> {}
        transportMode: "NIO"        
主从模式
spring:
  redis:
   redisson: 
      config: |
        masterSlaveServersConfig:
            idleConnectionTimeout: 10000
            connectTimeout: 10000
            timeout: 3000
            retryAttempts: 3
            retryInterval: 1500
            failedAttempts: 3
            password: null
            subscriptionsPerConnection: 5
            clientName: null
            loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
            slaveSubscriptionConnectionMinimumIdleSize: 1
            slaveSubscriptionConnectionPoolSize: 50
            slaveConnectionMinimumIdleSize: 32
            slaveConnectionPoolSize: 64
            masterConnectionMinimumIdleSize: 32
            masterConnectionPoolSize: 64
            readMode: "SLAVE"
            slaveAddresses:
            - "redis://127.0.0.1:6381"
            - "redis://127.0.0.1:6380"
            masterAddress: "redis://127.0.0.1:6379"
            database: 0
        threads: 0
        nettyThreads: 0
        codec: !<org.redisson.codec.JsonJacksonCodec> {}
        transportMode: "NIO"        

主流的配置就是上面的几种了, 各参数的含义可以参考具体的 官方说明

验证配置是否成功

编写测试类验证Redisson是否可用

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;

    @Test
    public void TestRedissonClient() throws IOException {
        RSet<Object> set = redissonClient.getSet("teoan");
        set.add("test");
    }
}

查看Redis中的数据

可以看到redis中已经存在对应的Set集合数据,Redisson配置到这里就完了.

使用Redisson

分布式对象

Redisson提供的分布式对象有多个,如通用对象桶(Object Bucket)、二进制流(Binary Stream)、地理空间对象桶(Geospatial Bucket)、BitSet 原子整长形(AtomicLong)、原子双精度浮点(AtomicDouble)、话题(订阅分发)、布隆过滤器(Bloom Filter)等,每个Redisson对象实例都会有一个与之对应的Redis数据实例,可以通过调用getName方法来取得Redis数据实例的名称(key)。所有与Redis key相关的操作都归纳在RKeys这个接口,他们的使用方式具体可以 参考官方wiki ,其中我比较感兴趣的是这个话题对象,它可以实现类似于订阅分发的功能,和消息队列的思想差不多,我就以话题对象为例子,玩一下这个分布式对象。

话题(订阅分发)

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;
    @Resource
    Executor execute;

    @Test
    public void TestRedissonClient() {
        RTopic topic = redissonClient.getTopic("Teoan");
        topic.addListener(String.class,(channel, message)->{
           log.info("鸡汤来咯,看看鸡汤里面是什么[{}]",message);
        });

        // 在其他线程或JVM节点
        execute.execute(()->{
            long clientsReceivedMessage = topic.publish("毒药");
            log.info("clientsReceivedMessage:{}",clientsReceivedMessage);
        });

    }
}

执行结果

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.5)

2022-11-13 11:43:16.388  INFO 12056 --- [           main] c.t.l.test.RedissonApplicationTest       : Starting RedissonApplicationTest using Java 13.0.2 on Teoan-Desktop with PID 12056 (started by Teoan in F:\Linux\git_clone\TeoanStudy\Redisson)
2022-11-13 11:43:16.389  INFO 12056 --- [           main] c.t.l.test.RedissonApplicationTest       : No active profile set, falling back to 1 default profile: "default"
2022-11-13 11:43:19.326  INFO 12056 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode
2022-11-13 11:43:19.329  INFO 12056 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Redis repositories in DEFAULT mode.
2022-11-13 11:43:19.461  INFO 12056 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 34 ms. Found 0 Redis repository interfaces.
2022-11-13 11:43:23.770  INFO 12056 --- [           main] org.redisson.Version                     : Redisson 3.18.0
2022-11-13 11:43:26.451  INFO 12056 --- [isson-netty-2-7] o.r.c.pool.MasterPubSubConnectionPool    : 1 connections initialized for 127.0.0.1/127.0.0.1:6379
2022-11-13 11:43:26.559  INFO 12056 --- [isson-netty-2-3] o.r.c.pool.MasterConnectionPool          : 32 connections initialized for 127.0.0.1/127.0.0.1:6379
2022-11-13 11:43:27.529  INFO 12056 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 1 endpoint(s) beneath base path '/actuator'
2022-11-13 11:43:27.600  INFO 12056 --- [           main] c.t.l.test.RedissonApplicationTest       : Started RedissonApplicationTest in 12.609 seconds (JVM running for 18.62)
2022-11-13 11:43:28.821  INFO 12056 --- [         task-1] c.t.l.test.RedissonApplicationTest       : clientsReceivedMessage:1
2022-11-13 11:43:28.822  INFO 12056 --- [   redisson-3-2] c.t.l.test.RedissonApplicationTest       : 鸡汤来咯,看看鸡汤里面是什么[毒药]

进程已结束,退出代码0

限流器(RateLimiter)

简单讲一下限流器的用途,限流器用于限制总并发数,比如数据库连接池、线程池等。Redisson中的RateLimiter和Guava RateLimiter一样,也是使用令牌桶的限流算法,而不同的是,Redisson中的RateLimiter可以用来在分布式环境下现在请求方的调用频率。既适用于不同Redisson实例下的多线程限流,也适用于相同Redisson实例下的多线程限流。。算法逻辑大致为:令牌桶以固定的速率向桶中加入新的令牌,线程的请求会从桶里拿走一定数量的令牌,只要通中存在满足请求数量的令牌,请求就会被处理。比如我设置限流器一秒生成10个令牌,每个请求拿5个令牌,那么结果就是一秒最多处理2个请求。

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;
    @Resource
    Executor execute;

    @Test
    public void TestRedissonRateLimiter() {
        RRateLimiter rateLimiter = redissonClient.getRateLimiter("myRateLimiter");
        // 初始化
        // 最大流速 = 每1秒钟产生10个令牌
        rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);

        // 10个线程并发
        for (int i = 0; i < 10; i++) {
            execute.execute(()->{
                RRateLimiter limiter = redissonClient.getRateLimiter("myRateLimiter");
                // 每次获取5个令牌
                if(limiter.tryAcquire(5)){
                    log.info("get 5 tokens success");
                    // ...
                }else {
                    log.info("get 5 tokens fail");
                }
            });
        }
    }
}
2022-11-13 18:02:49.213  INFO 14872 --- [         task-1] c.t.l.test.RedissonApplicationTest       : get 5 tokens success
2022-11-13 18:02:49.213  INFO 14872 --- [         task-3] c.t.l.test.RedissonApplicationTest       : get 5 tokens success
2022-11-13 18:02:49.214  INFO 14872 --- [         task-2] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.215  INFO 14872 --- [         task-5] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.215  INFO 14872 --- [         task-6] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.216  INFO 14872 --- [         task-4] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.217  INFO 14872 --- [         task-3] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.219  INFO 14872 --- [         task-8] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.223  INFO 14872 --- [         task-7] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail
2022-11-13 18:02:49.225  INFO 14872 --- [         task-1] c.t.l.test.RedissonApplicationTest       : get 5 tokens fail

分布式集合

在Redisson提供的的集合类中,除了JDK中常见的Map,List,Set,还有一些在此基础上引申出来比较特殊集合对象,如映射缓存(RMapCache)、多值映射(RMultimap)、有序集(SortedSet)、计分排序集(ScoredSortedSet)等,除此之外,还有各种类型的队列对象,如双端队列(Deque)、阻塞队列(Blocking Queue)、有界阻塞队列(Bounded Blocking Queue)等。

映射缓存(RMapCache)

RMapCache和RMap的区别在于多了一个元素淘汰功能,可以自定义Map中元素的有效时间最长闲置时间

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;

    @Test
    public void TestRMapCache(){
        RMapCache<String, Object> map = redissonClient.getMapCache("anyMap");
        // 有效时间 ttl = 10分钟
        map.put("key1", 1, 10, TimeUnit.MINUTES);
        // 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
        map.put("key1", 2, 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);

        // 有效时间 = 3 秒钟
        map.putIfAbsent("key2", 3, 3, TimeUnit.SECONDS);
        // 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
        map.putIfAbsent("key2", 4, 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
    }
}

多值映射(RMultimap)

多值映射,基于Redisson的RMultimap对象允许Map中的一个字段值包含多个元素。

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;
    
    @Resource
    ObjectMapper objectMapper;

    @Test
    public void TestRMultimap() throws JsonProcessingException(){
        RSetMultimap<Object, Object> myMultimap = redissonClient.getSetMultimap("myMultimap");
        myMultimap.put("key1","value1");
        myMultimap.put("key1","value2");
        myMultimap.put("key1","value3");
        myMultimap.put("key2","value1");
        myMultimap.put("key2","value2");

        Set<String> newValue1 = Set.of("newValue1", "newValue2", "newValue3");
        Set<Object> oldValues = myMultimap.replaceValues("key1", newValue1);
        log.info("oldValues:[{}]",objectMapper.writeValueAsString(oldValues));
        Set<Object> removeValues = myMultimap.removeAll("key2");
        log.info("removeValues:[{}]",objectMapper.writeValueAsString(removeValues));
    }
}
2022-11-20 16:59:33.540  INFO 28632 --- [           main] c.t.l.test.RedissonApplicationTest       : Started RedissonApplicationTest in 7.357 seconds (JVM running for 8.443)
2022-11-20 16:59:34.216  INFO 28632 --- [           main] c.t.l.test.RedissonApplicationTest       : oldValues:[["value3","value2","value1"]]
2022-11-20 16:59:34.220  INFO 28632 --- [           main] c.t.l.test.RedissonApplicationTest       : removeValues:[["value2","value1"]]

分布式锁

Redisson还提供了各种分布式锁,如可重入锁、公平锁、联锁、红锁等,这些锁对象也是完全符合Java的Lock规范,而且还提供异步(Async)、反射式(Reactive)和RxJava2标准的接口。

可重入锁

可重入锁,就是当前线程可以重复进入的锁,其他线程获取不到锁,则会阻塞。

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;
    
    @Resource
    ObjectMapper objectMapper;
    
    @Resource
    Executor execute;

    @Test
    public void TestRLock() throws InterruptedException {
        RLock lock = redissonClient.getLock("lock");
        //另外的线程拿锁
        execute.execute(()->{
            // 与主线程获取同一个锁
            RLock rLock = redissonClient.getLock("lock");
            // 加锁以后10秒钟自动解锁
            rLock.lock(10,TimeUnit.SECONDS);
            log.info("其他线程拿到锁啦");
        });

        // 10秒内拿不到锁 会阻塞
        lock.lock(5, TimeUnit.SECONDS);
        log.info("主线程拿到锁啦");
        lock.unlock();
    }
}
2022-12-20 21:48:09.200  INFO 2587 --- [         task-1] c.t.l.test.RedissonApplicationTest       : 其他线程拿到锁啦
2022-12-20 21:48:19.190  INFO 2587 --- [           main] c.t.l.test.RedissonApplicationTest       : 主线程拿到锁啦

信号量

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。个人理解这个信号量有点像上文提到的限流器,执行 acquire 方法阻塞,直到有对应数量许可证可以获得然后拿走许可证,每个 release 方法增加若干个许可证。这个许可证和限流器的令牌,其实是很类似的。所以感觉也可以用信号量来实现限流器的功能。

@SpringBootTest
@Slf4j
public class RedissonApplicationTest {

    @Resource
    RedissonClient redissonClient;
    
    @Resource
    ObjectMapper objectMapper;
    
    @Resource
    Executor execute;

    @Test
    public void TestSemaphore() throws InterruptedException {
        // 一共设置量5个信号量 子线程先获取3个
        execute.execute(()->{
            try {
            // 需提前在redis中设置key为semaphore 值为对应信号量
            RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
            log.info("子线程可用的信号量:[{}]",semaphore.availablePermits());
            semaphore.acquire(4);
            // 两秒后释放
            Thread.sleep(2000);
            semaphore.release(4);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        Thread.sleep(1000);
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        log.info("主线程可用的信号量:[{}]",semaphore.availablePermits());
        // 主线程获取不到对应数量的信号量 会阻塞
        semaphore.acquire(3);
        log.info("主线程获取到对应数量信号量啦");
        semaphore.release(3);
    }
}
2022-12-25 16:59:41.013  INFO 10324 --- [         task-1] c.t.l.test.RedissonApplicationTest       : 子线程可用的信号量:[5]
2022-12-25 16:59:42.024  INFO 10324 --- [           main] c.t.l.test.RedissonApplicationTest       : 主线程可用的信号量:[1]
2022-12-25 16:59:43.233  INFO 10324 --- [           main] c.t.l.test.RedissonApplicationTest       : 主线程获取到对应数量信号量啦

总结

记录了下自己使用Redission的过程,如果你也正在入门使用Redission,那希望这篇文章对你有所帮助。文章内容大部分参考官方文档,官方文档yyds,嘻嘻。 通过自己手敲几个入门使用的案例,谈谈一下我的个人感受吧,Redis作为中间件应用的存在,也就说明它能够被多个服务应用所共用数据。而Redission正是利用了这一点,它将Java的集合数据结构Map、List、Set等用Redis对应的数据结构Hash、List、Set进行封装。在使用上,这些封装的集合对象和普通的Java集合对象并没有什么区别。在具体的存储实现上,一个是存在于JVM的内存中,另一个则是存到了Redis上对应的数据结构上。由于数据都存在于Redis中,那么对于多个实例的应用来说,这些数据是共用的,在高并发场景下,配合分布式锁,可以很大程度避免高并发场景下的一系列并发问题的发生。只要你了解Redis的基本数据结构,以及Java的基本集合对象,上手Redisson还是很快的。Redission提供了分布式相关的对象,以便于我们处理分布式场景下的相关复杂的业务问题,工具也只是手段,遇到具体的业务问题或场景,分析业务场景使用合适的分布式对象或者集合,或许可以让问题变得简单起来。