利用 Redis 实现延迟队列(点赞场景)

news/2024/9/16 21:22:51 标签: java, 后端

🌈点赞场景在前段时间有很多人都在争论,我也看了一些视频和文档,最后觉得b站技术的这篇写得很好

【点个赞吧】 - B站千亿级点赞系统服务架构设计 - 哔哩哔哩

🌈所以我也尝试用 Redis 的延迟队列来写一个点赞处理的 demo(这都是基于高并发情况下),完全没有落地。后续有时间会专门写一个点赞的技术方案

🌈至于为什么选择 Redis 延迟队列而不是用常见的 MQ,因为我还在学习阶段,想更好地学习 Redis 的延迟队列

🌈由于只是个菜只因,接触到和学到的技术并不多,可能还有很多情况没有考虑到

目录

1. Redis 实现延迟队列的方案

2. Redis 过期事件监听实现延时任务

3. Redis 过期事件监听实现延时任务功能有什么缺陷

3.1. 时效性差

3.2. 丢消息

3.3. 多服务实例下存在消息重复消息的问题

4. 为什么选用Redission作为延迟队列

5. 使用 Redis 实现延时任务有什么注意的地方?

6. 如何使用 Redisson 实现延迟队列(点赞场景)

6.1. 基础配置

6.2. 延迟队列实战


1. Redis 实现延迟队列的方案

基于 Redis 实现延时任务的功能无非就下面两种方案:

  1. Redis 过期事件监听
  2. Redisson 内置的延时队列

这里选用的是用 Redission 内置的延迟队列,所以实现的着重点放在 Redission

2. Redis 过期事件监听实现延时任务

Redis 2.0 引入了 发布订阅(Pub/Sub) 功能。在 Pub/Sub 模型中,引入了一个名为 channel(频道) 的概念,类似于消息队列中的 topic(主题)

Pub/Sub 涉及两个主要角色:发布者(Publisher)订阅者(Subscriber,也称为消费者)

  • 发布者 通过 PUBLISH 命令将消息发送到指定的 channel
  • 订阅者 通过 SUBSCRIBE 命令订阅感兴趣的 channel,并且可以同时订阅一个或多个 channel

Pub/Sub 模式 中,生产者需要指定将消息发送到哪个 channel,而消费者通过订阅对应的 channel 来获取消息。Redis 内部也存在一些默认的 channel,这些通道用于 Redis 自身发送消息,而非用户代码生成。只需监听这些 channel,即可获取与 过期 key 相关的通知,从而实现延时任务的功能。

这一特性被 Redis 官方称为 Keyspace Notifications,其主要作用是 实时监控 Redis 中键和值的变化。通过它,开发者能够及时捕捉键的变化(如过期、删除等事件),从而执行相应的处理逻辑

3. Redis 过期事件监听实现延时任务功能有什么缺陷

3.1. 时效性差

官方文档的一段介绍解释了时效性差的原因,地址:

Redis keyspace notifications | Docs

Redis 中的 过期事件消息 只有在 Redis 服务器真正删除 key 时才会发布,而不是在 key 到达过期时间后立即发布。

常见的过期数据删除策略有两种:

  1. 惰性删除:仅当访问 key 时,才会检查其是否过期。这种方式对 CPU 友好,但可能导致大量过期 key 未及时删除,继续占用内存。
  2. 定期删除:Redis 会定期抽取一部分 key,检查并删除过期的 key。为了减少删除操作对 CPU 的影响,Redis 会限制删除操作的执行时长和频率。虽然定期删除更有利于释放内存,但也可能增加 CPU 负载。

Redis 结合了这两种策略,采用 定期删除惰性删除 的方式。定期删除保证了内存的回收,而惰性删除则在取用时保证 CPU 性能。

因此,可能会出现这样一种情况:虽然设置了 key 的过期时间,但当该时间到达时,key 可能尚未被删除,导致 过期事件 未及时发布。

其他的文章测试

请勿过度依赖Redis的过期监听-阿里云开发者社区

3.2. 丢消息

Redis 的 pub/sub 模式中的消息并不支持持久化,这与消息队列不同。在 Redis 的 pub/sub 模式中,发布者将消息发送给指定的频道,订阅者监听相应的频道以接收消息。当没有订阅者时,消息会被直接丢弃,在 Redis 中不会存储该消息。

3.3. 多服务实例下存在消息重复消息的问题

Redis 的 pub/sub 模式目前只有广播模式,这意味着当生产者向特定频道发布一条消息时,所有订阅相关频道的消费者都能够收到该消息。

这个时候,我们需要注意多个服务实例重复处理消息的问题,这会增加代码开发量和维护难度。

4. 为什么选用Redission作为延迟队列

Redisson 是一个开源的 Java Redis 客户端,提供了许多开箱即用的功能,包括多种分布式锁的实现和延迟队列。Redisson 内置的延迟队列 RDelayedQueue 利用 Redis 的 SortedSet 实现延时任务功能。

SortedSet 是一个有序集合,每个元素都有一个分数,代表其优先级或时间权重。Redisson 通过将需要延迟执行的任务插入到 SortedSet 中,并为它们设置相应的过期时间作为分数来实现延迟队列。

Redisson 在客户端启动一个定时任务,当时间到达时,它使用 zrangebyscore 命令扫描 SortedSet 中已过期的元素(即分数小于或等于当前时间的元素)。这些过期元素会被从 SortedSet 中移除,并加入到就绪消息列表(List 结构)中。

当任务被移到就绪消息列表时,Redisson 通常还会通过 Redis 的发布/订阅机制(Pub/Sub)通知消费者有新任务到达。就绪消息列表是一个阻塞队列,消费者可以使用阻塞操作(如 BLPOP key 0,其中0表示无限等待)来监听。由于 Redis 的 Pub/Sub 机制是事件驱动的,它避免了轮询开销,只有在有新消息时才会触发处理逻辑。

需要注意的是,Redisson 的定时任务调度器并不是以固定时间间隔频繁调用 zrangebyscore 命令进行扫描,而是根据 SortedSet 中最近的到期时间动态调整下一次检查的时间点。

相比于使用 Redis 过期事件监听实现延时任务,Redisson 延迟队列具有以下优势:

  1. 减少丢失消息的可能性RDelayedQueue 中的消息会被持久化,即使 Redis 宕机,根据持久化机制,可能仅丢失少量消息,影响不大。此外,还可以使用数据库扫描作为补偿机制。
  2. 避免消息重复消费:所有客户端从同一个目标队列获取任务,避免了重复消费的问题。

尽管 Redisson 提供了便利的延迟队列功能,但在实际项目中,如果需要更高的吞吐量和可靠性,通常优先选择使用消息队列的延时消息方案。消息队列可以通过保障消息消费的可靠性和控制生产者与消费者数量来实现更好的性能。

5. 使用 Redis 实现延时任务有什么注意的地方?

在任务时间跨度较大且任务数量众多的场景中,需要特别注意内存管理。大量任务可能会导致内存占用过高,而长时间保存任务则会造成资源浪费。为了解决这些问题,可以结合使用 MySQL 和 Redis 来优化任务管理:

  1. 短期任务:对于延迟时间较短的任务(例如几分钟到几个小时内执行的任务),可以继续存储在 Redis 中,以便快速访问和处理。
  2. 长期任务:对于延迟时间较长的任务(例如几天或几周后执行的任务),则可以存储在 MySQL 中。通过这种方式,可以有效减少 Redis 的内存占用。
  3. 定期扫描:使用定时任务(例如 XXL-JOB 或 Spring Task)定期扫描 MySQL 中即将到期的任务(例如未来 2 小时内到期的任务),并将这些任务推送到 Redis 中进行处理。这种做法可以确保任务在适当的时候被加载到内存中。
  4. 优化查询:在定期扫描 MySQL 时,可能需要处理大量数据。为提高查询效率,可以使用索引或进行分库分表等优化措施。

将 Redis 和 MySQL 结合使用的优势

  1. 节省缓存资源:通过将长期任务存储在 MySQL 中,避免了在 Redis 中存储大量长期任务导致的内存浪费。
  2. 可靠性和成本:MySQL 提供的事务机制可以保证任务数据的可靠性,同时存储成本也相对较低。
  3. 避免大 key 问题:如果仅使用一个 RDelayedQueue,任务数量过大会产生大 key 问题。可以通过将任务按某种逻辑(如时间段、任务类型)分片存储到多个 RDelayedQueue 中来避免这一问题。

通过这种结合使用的方式,既能利用 Redis 的快速访问能力,又能依靠 MySQL 的持久化存储和事务支持,有效地管理大时间跨度和大量的延时任务。

6. 如何使用 Redisson 实现延迟队列

6.1. 基础配置

maven 依赖:

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.2</version>
</dependency>

基础 Redission 配置文件

java">@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}

6.2. 延迟队列实战

这里就随便写一个需要频繁修改数据的场景,就例如一个点赞的场景

点赞功能虽然看起来简单,但如果系统流量大,用户频繁点赞,尤其是针对热门内容时,后台需要处理的大量并发请求就会成为性能瓶颈

点赞计数并发处理

  • 并发场景:在点赞的场景中,可能会有很多用户同时对同一个帖子进行点赞。为了避免计数错误(比如两个人同时点赞,只增加了一次的情况),就需要使用分布式锁或原子操作来确保计数的正确性。
  • 解决方案:使用 RAtomicLong 是一个比较好的选择,它提供原子操作,可以确保多个线程或多个分布式节点对同一个点赞计数进行安全的增加或覆盖操作。

缓存

  • 高频访问优化:在点赞场景中,某个内容可能会在短时间内被大量用户点赞。将点赞数保存在 Redis 这样的缓存系统中,可以极大地减轻数据库的压力,提高系统的响应速度。

延迟持久化

  • 持久化问题:每次点赞都立即写入数据库会对数据库产生巨大的压力,尤其是在高并发情况下。因此,通常的策略是将点赞数暂时缓存在 Redis 中,等待合适的时间再批量持久化到数据库。
  • 解决方案:通过 RDelayedQueue 实现延迟处理的功能。在点赞操作发生时,不立即持久化,而是将操作推迟 15 分钟再处理。这一做法既能确保点赞数不丢失,又减少了频繁持久化操作的开销。

使用 RBlockingQueue 代替 RQueue 的好处

  • 避免频繁轮询:在原本的代码中使用了 RQueue,这种队列会需要不断地去轮询,判断是否有新的任务需要处理,这对资源是一种浪费。
  • 阻塞队列的优化:RBlockingQueue 提供了阻塞机制,只有在有新元素到来时才会唤醒队列进行处理,节省了系统资源的消耗,减少不必要的 CPU 轮询开销。

java">public Long likeIncrementCount(String postId, Long directLikeNum, int countStrategy) {
    String key = LikeCacheKey.Like_COUNT.getKey(postId);
    RAtomicLong rAtomicLong = redissonClient.getAtomicLong(key);

    // 初始化操作数,如果 Redis 数据不存
    if (!rAtomicLong.isExists()) {
        getLikeNum(postId);
    }

    long likeCount;
    // 根据策略计数
    switch (countStrategy) {
        case ACCUMULATION.getType(): // 累加
            if (directLikeNum == null) {
                likeCount = rAtomicLong.incrementAndGet();
            } else {
                likeCount = rAtomicLong.addAndGet(directLikeNum);
            }
            break;
        case COVER.getType(): // 覆盖
            if (directLikeNum == null) {
                throw new IllegalArgumentException("Direct like number cannot be null when using override strategy");
            }
            rAtomicLong.set(directLikeNum);
            likeCount = directLikeNum;
            break;
        default: // 默认返回当前值
            likeCount = rAtomicLong.get();
            break;
    }

    // 设置过期时间
    rAtomicLong.expire(60, TimeUnit.MINUTES);

    // 使用RBlockingQueue避免频繁轮询
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(LikeCacheKey.Like_DYNAMIC.getKey());
    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

    // 如果队列中不包含当前key,则添加到延迟队列中
    if (!delayedQueue.contains(key)) {
        // 延迟 10 分钟统计
        delayedQueue.offerAsync(key, 10, TimeUnit.MINUTES);
    }

    return likeCount;
}

接下来就是处理延迟队列

java">@Slf4j
@Component
@RequiredArgsConstructor
public class LikePersistenceTask implements ApplicationRunner {

    private final ScheduledExecutorService executorService = Executors.newSingleThreadExecutor();
    private final LikeService likeService;  
    private final RedissonClient redissonClient;

    @Override
    public void run(ApplicationArguments args) {
        executorService.submit(this::processLikeData);
        log.info("启动一个后台线程,用于处理 Redis 点赞统计数据持久化。");
    }

    private void processLikeData() {
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(LikeCacheKey.LIKE_DYNAMIC.getKey());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = blockingQueue.take();  // 阻塞直到有元素
                if (StringUtils.isNotBlank(key)) {
                    processKey(key);
                }
            } catch (InterruptedException e) {
                log.error("处理 Redis 点赞统计数据持久化线程被中断", e);
                Thread.currentThread().interrupt();  // 恢复中断状态
            } catch (Exception e) {
                log.error("处理 Redis 点赞统计数据持久化时发生错误", e);
            }
        }
    }

    private void processKey(String key) {
        String[] objs = LikeCacheKey.LIKE_COUNT.parseKeyArg(key);
        String postId = objs[0];
        Integer actionType = Integer.valueOf(objs[1]);
        likeService.persistenceLikeData(postId, actionType);
    }

    @PreDestroy
    public void shutdown() {
        log.info("正在关闭 LikePersistenceTask 线程池...");
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池未能在指定时间内终止");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}


http://www.niftyadmin.cn/n/5645908.html

相关文章

AutoGen Function Call 函数调用解析(一)

目录 一、AutoGen Function Call 1.1 register_for_llm 注册调用 1.2 register_for_execution 注册执行 1.3 三种注册方法 1.3.1 函数定义和注册分开 1.3.2 定义函数时注册 1.3.3 register_function 函数注册 二、实例 本文主要对 AutoGen Function Call 进行解析&…

vue3 自定义指令 directive

1、官方说明&#xff1a;https://cn.vuejs.org/guide/reusability/custom-directives 除了 Vue 内置的一系列指令 (比如 v-model 或 v-show) 之外&#xff0c;Vue 还允许你注册自定义的指令 (Custom Directives)。 我们已经介绍了两种在 Vue 中重用代码的方式&#xff1a;组件和…

MySQL中常见的存储引擎有什么?

MySQL中常见的存储引擎有什么&#xff1f; MySQL中有三种常见的引擎&#xff1a;InnoDB&#xff08;默认&#xff09;&#xff0c;MyISAM&#xff0c;Memory。 InnoDB存储引擎作为MySQL的默认存储引擎有很多特点&#xff1a; B树作为索引结构&#xff0c;叶子节点上存放表中…

Docker 清理和查看镜像与容器占用情况

查看容器占用磁盘大小 docker system df 查看单个image、container大小&#xff1a; docker system df -v 清理所有废弃镜像与Build Cache docker system prune -a

快速失败 (fail-fast) 和安全失败 (fail-safe)

1. 定义与工作原理 1.1 快速失败&#xff08;Fail-Fast&#xff09; 定义&#xff1a; 快速失败是一种系统设计原则&#xff0c;当系统遇到异常情况或错误时&#xff0c;立即停止执行并返回错误&#xff0c;而不是试图继续执行或处理潜在的问题。快速失败系统会主动检测系统中…

电脑驱动作用详解

电脑驱动的主要作用是充当操作系统与硬件设备之间的桥梁&#xff0c;使操作系统能够正确地与硬件设备进行通信和控制。具体来说&#xff0c;电脑驱动的作用包括以下几个方面&#xff1a; 1. 硬件与操作系统的接口 翻译指令&#xff1a;驱动程序将操作系统的指令翻译成硬件设备…

【生日视频制作】劳斯莱斯库里南中控改名软件AE模板修改文字软件生成器教程特效素材【AE模板】

生日视频制作教程豪车劳斯莱斯库里南中控改名软件AE模板修改文字特效广告生成神器素材祝福玩法AE模板工程 怎么如何做的【生日视频制作】劳斯莱斯库里南中控改名软件AE模板修改文字软件生成器教程特效素材【AE模板】 生日视频制作步骤&#xff1a; 下载AE模板 安装AE软件 把A…

GraphPad Prism 10 for Mac/Win:高效统计分析与精美绘图的科学利器

GraphPad Prism 10 是一款专为科研工作者设计的强大统计分析与绘图软件&#xff0c;无论是Mac还是Windows用户&#xff0c;都能享受到其带来的便捷与高效。该软件广泛应用于生物医学研究、实验设计和数据分析领域&#xff0c;以其直观的操作界面、丰富的统计方法和多样化的图表…