LOADING

加载过慢请开启缓存 浏览器默认开启

rabbitmq消费者限流

rabbitmq结合Guava对消费者进行消费者限流

1.配置RabbitMQ消息监听器容器工厂


@Configuration
public class RabbitConfig {
    public static final String MESSAGE_QUEUE = "message_queue";

    /**
     * 配置RabbitMQ消息监听器容器工厂
     * 该工厂用于创建处理单个消息的监听器容器
     *
     * @param connectionFactory RabbitMQ连接工厂,用于建立与RabbitMQ的连接
     * @return SimpleRabbitListenerContainerFactory实例,配置用于手动确认消息的监听器容器
     */
    @Bean
    public SimpleRabbitListenerContainerFactory singleMessageListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置手动确认模式,确保消息处理完成后才从队列中移除
        factory.setPrefetchCount(1); // 每次只拉取一条消息,确保消息不会在消费者中积压
        return factory;
    }
        
    @Bean
    public Queue sendMessageQueue() {
        return new Queue(MESSAGE_QUEUE, true);
    }
}

2.消费端使用guava的RateLimiter进行限流

@Component
@Slf4j
public class SendMessageConsumer {
    // 限流器,每秒允许消费1个消息
    private final RateLimiter rateLimiter = RateLimiter.create(1);

    @RabbitListener(queues = RabbitConfig.MESSAGE_QUEUE, containerFactory = "singleMessageListenerContainerFactory")
    public void onMessage(Channel channel, String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            if (rateLimiter.tryAcquire()) {
                // 处理消息
                log.info("消费成功 message: {}, 消费时间: {}", message, LocalDateTime.now());
                // 确认消息
                channel.basicAck(deliveryTag, false);
            } else {
                // 限流,拒绝消息并重新入队
                channel.basicNack(deliveryTag, false, true);
            }
        } catch (Exception e) {
            log.error("消息处理异常 message: {}, 错误信息: {}", message, e.getMessage(), e);
            try {
                // 处理失败,拒绝消息并重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ex) {
                log.error("消息拒绝失败 deliveryTag: {},  错误信息: {}", deliveryTag, ex.getMessage(), ex);
            }
        }
    }
}

3.效果展示