Redis实现轻量级消息队列

对于小型项目想使用消息队列进行消息分发时,如果单独加一个中间件,显然有点太浪费,毕竟还得考虑消息中间件本身高可用,所以Redis变得更加适合

1.引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
redis:
host: localhost
# 连接超时时间(记得添加单位,Duration)
timeout: 10000ms
# Redis默认情况下有16个分片,这里配置具体使用的分片
# database: 0
lettuce:
pool:
# 连接池最大连接数(使用负值表示没有限制) 默认 8
max-active: 8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
max-wait: -1ms
# 连接池中的最大空闲连接 默认 8
max-idle: 8
# 连接池中的最小空闲连接 默认 0
min-idle: 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Redis订阅频道属性类
* @author haopeng
* @date 2020-03-01 20:46
*/
@Component
public class RedisListenerBean {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);


/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);

// 监听msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic("REDIS_TOPIC"));
LOGGER.info("Subscribed Redis channel: " + "REDIS_TOPIC");
return container;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

/**
* Redis订阅频道处理类
* @author haopeng
* @date 2020-03-01 20:44
*/
@Component
public class RedisListenerHandle extends MessageListenerAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);

@Autowired
private RedisTemplate<String, String> redisTemplate;


/**
* 收到监听消息
* @param message
* @param bytes
*/
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String rawMsg;
String topic;
try {
rawMsg = redisTemplate.getStringSerializer().deserialize(body);
topic = redisTemplate.getStringSerializer().deserialize(channel);
LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}

3.编写controller

1
2
3
4
@GetMapping("/redis")
public void redisTest() {
redisTemplate.convertAndSend("REDIS_TOPIC", "发送redis消息" + UUID.fastUUID());
}

4.测试

1
2
控制台输出:
Received raw message from topic:REDIS_TOPIC, raw message content:发送redis消息b0b129aa-9669-44d8-9fb3-7b41281fadb7