一、消息隊(duì)列簡介
消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。最簡單的消息隊(duì)列模型包括 3 個(gè)角色:
- 消息隊(duì)列:存儲(chǔ)和管理消息,也被稱為消息代理(Message Broker)
- 生產(chǎn)者:發(fā)送消息到消息隊(duì)列
- 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息
消息隊(duì)列和阻塞隊(duì)列的區(qū)別:
① 消息隊(duì)列是在 JVM 以外的獨(dú)立服務(wù),所以不受 JVM 內(nèi)存的限制
② 消息隊(duì)列不僅僅做數(shù)據(jù)存儲(chǔ),還需要確保數(shù)據(jù)安全,存入到消息隊(duì)列中的所有消息都需要做持久化,這樣不管是服務(wù)宕機(jī)還是重啟,數(shù)據(jù)都不會(huì)丟失。而且消息隊(duì)列還會(huì)在消息投遞給消費(fèi)者后,要求消費(fèi)者做消息確認(rèn),如果消費(fèi)者沒有確認(rèn),那么這條消息就會(huì)一直存在于消息隊(duì)列中,下一次會(huì)繼續(xù)投遞給消費(fèi)者,讓消費(fèi)者繼續(xù)處理,直到消息被成功處理。
二、Redis 提供的消息隊(duì)列
Redis 提供了三種不同的方式來實(shí)現(xiàn)消息隊(duì)列:
- list 結(jié)構(gòu):基于 List 結(jié)構(gòu)模擬消息隊(duì)列
- PubSub:基本的點(diǎn)對(duì)點(diǎn)消息隊(duì)列
- Stream:比較完善的消息隊(duì)列模型
2.1 基于 List 結(jié)構(gòu)模擬消息隊(duì)列
消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而 Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。
隊(duì)列(先進(jìn)先出)是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP 來實(shí)現(xiàn)。
不過要注意的是,當(dāng)隊(duì)列中沒有消息時(shí) RPOP 或 LPOP 操作會(huì)返回 null,并不像 JVM 的阻塞隊(duì)列那樣會(huì)阻塞并等待下消息。因此這里應(yīng)該使用 BRPOP 或者 BLPOP 來實(shí)現(xiàn)阻塞效果。
基于 List 的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?
優(yōu)點(diǎn):
① 利用 Redis 存儲(chǔ),不受限于 JVM 內(nèi)存上限
② 基于 Redis 的持久化機(jī)制,數(shù)據(jù)安全性有保證
③ 可以滿足消息有序性
缺點(diǎn):
① 無法避免消息丟失。假設(shè)某個(gè)消費(fèi)者從消息隊(duì)列(List 結(jié)構(gòu))中獲取到一條消息,但還未來得及處理,該消費(fèi)者出現(xiàn)故障,那么這條消息就會(huì)丟失,這是因?yàn)?POP 命令是 remove and get,會(huì)將消息直接從消息隊(duì)列中直接移除,這樣其他消費(fèi)者就獲取不到。
② 只支持單消費(fèi)者。消息隊(duì)列(List 結(jié)構(gòu))中的消息,一旦被某個(gè)消費(fèi)者取走,就會(huì)從隊(duì)列中移除,其他消費(fèi)者就獲取不到了,無法實(shí)現(xiàn)一條消息被很多消費(fèi)者消費(fèi)的需求。
2.2 基于 PubSub 的消息隊(duì)列
PubSub(發(fā)布訂閱)是 Redis2.0 版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè) channel,生產(chǎn)者向?qū)?yīng) channel 發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
相關(guān)命令如下:
- SUBSCRIBE channel [channel]:訂閱一個(gè)或多個(gè)頻道
- PUBLISH channel msg:向一個(gè)頻道發(fā)送消息
- PSUBSCRIBE pattern [pattern]:訂閱與 pattern 格式匹配的所有頻道
關(guān)于 PubSub 的具體命令使用方法可以參看官網(wǎng):?https://redis.io/commands/?group=pubsub
基于 PubSub 的消息隊(duì)列有哪些優(yōu)缺點(diǎn):
優(yōu)點(diǎn):
采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi)。一條消息可以發(fā)給多個(gè)消費(fèi)者,也可以發(fā)給一個(gè)消費(fèi)者,而且也支持不同生產(chǎn)者往相同頻道發(fā)。
缺點(diǎn):
① 不支持?jǐn)?shù)據(jù)持久化。本身不像 List 結(jié)構(gòu)那樣支持?jǐn)?shù)據(jù)持久化,List 結(jié)構(gòu)本身就是用來存儲(chǔ)數(shù)據(jù)的,而 PubSub 則是用來做消息發(fā)送的。因此,當(dāng)發(fā)送一條消息時(shí),但卻沒有任何消費(fèi)者訂閱,那么該條消息就直接消失了。
② 無法避免消息丟失
③ 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失。當(dāng)發(fā)送一條消息時(shí),如果有消費(fèi)者監(jiān)聽,消費(fèi)者會(huì)將發(fā)送過來的消息緩存至消息緩存區(qū),由消費(fèi)者進(jìn)行處理。而消費(fèi)者的緩存空間是有上限的,如果超出了就會(huì)丟失。
2.3 基于 Stream 的消息隊(duì)列
Stream 是 Redis5.0 引入的一種新的數(shù)據(jù)類型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列。
發(fā)送消息的命令:
最簡用法如下:
讀取消息的方式之一:XREAD
使用 XREAD 讀取消息
XREAD 阻塞方式,讀取最新的消息:
在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用 XREAD 阻塞方式來查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽隊(duì)列的效果,偽代碼如下:
while(true) {
// 嘗試讀取隊(duì)列中的消息,最多阻塞 2 秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
if(msg == null) {
continue;
}
// 處理消息
handleMessage(msg);
}
注意:當(dāng)我們指定起始 ID 為 $ 時(shí),代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過 1 條以上的消息到達(dá)隊(duì)列,則下次獲取時(shí)也只能獲取到最新的一條,會(huì)出現(xiàn)漏讀消息的問題。
STREAM 類型消息隊(duì)列的 XREAD 命令特點(diǎn):
- 消息可回溯。消息讀完后不消失,永久保存在隊(duì)列中。
- 一個(gè)消息可以被多個(gè)消費(fèi)者讀取
- 可以阻塞讀取
- 有消息漏讀的風(fēng)險(xiǎn)
2.4 基于 Stream 的消息隊(duì)列-消費(fèi)者組
消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽同一個(gè)隊(duì)列。具備下列特點(diǎn):
① 消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度。
處于一個(gè)組內(nèi)的多個(gè)消費(fèi)者實(shí)際上是競爭關(guān)系,凡是進(jìn)入到這個(gè)組的消息,組內(nèi)的消費(fèi)者就會(huì)競爭該消息的處理權(quán)。這種方式可以大大提高消息的處理速度,避免消息堆積。如果想要一條消息被多個(gè)消費(fèi)者處理,可以添加多個(gè)消費(fèi)者組。
② 消息標(biāo)識(shí):消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)識(shí),記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)識(shí)之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi)。
③ 消息確認(rèn):消費(fèi)者獲取消息后,消息處于 pending 狀態(tài),并存入一個(gè) pending-list。當(dāng)處理完成后需要通過 XACK 來確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從 pending-list 移除。
創(chuàng)建消費(fèi)者組:
- key:隊(duì)列名稱
- groupName:消費(fèi)者組名稱
- ID:起始 ID 標(biāo)識(shí),$ 代表隊(duì)列中最后一個(gè)消息,0 代表隊(duì)列中第一個(gè)消息。
- MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列
其他常見命令:
從消費(fèi)者組讀取消息:
- group:消費(fèi)者組名稱
- consumer:消費(fèi)者名稱,如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
- count:本次查詢的最大數(shù)量
- BLOCK milliseconds:當(dāng)沒有消息時(shí)最長等待時(shí)間
- NOACK:無需手動(dòng) ACK,獲取到消息后自動(dòng)確認(rèn)
- STREAMS key:指定隊(duì)列名稱
- ID:獲取消息的起始 ID:
“>”:從下一個(gè)未消費(fèi)的消息開始
其他:根據(jù)指定 id 從 pending-list 中獲取已消費(fèi)但未確認(rèn)的消息,例如 0,是從 pending-list 中的第一個(gè)消息開始。
使用 Java 代碼處理消費(fèi)者監(jiān)聽消息的基本思路:
whilt(true){
// 嘗試監(jiān)聽隊(duì)列,使用阻塞模式,最長等待 2000 毫秒
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
// 含義:消費(fèi)者組 g1 中的消費(fèi)者 c1 使用阻塞式嘗試從消息隊(duì)列 s1 中讀取下一個(gè)未被消費(fèi)的消息,阻塞時(shí)長為 2000 毫秒
Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){
continue;
}
try{
// 處理消息,完成后一定要 ACK
handleMessage(msg);
} catch(Exception e){
while(true){
// XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
// 含義:消費(fèi)者組 g1 中的消費(fèi)者 c1 從消息隊(duì)列 s1 的pending-list 中讀取第一個(gè)消息
Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){ // null 說明沒有異常消息,所有消息都已確認(rèn),結(jié)束循環(huán)
break;
}
try{
// 說明有異常消息,再次處理
handleMessage(msg);
}catch(Exception e){
// 再次出現(xiàn)異常,記錄日志,繼續(xù)循環(huán)
continue;
}
}
}
}
Stream 類型消息隊(duì)列的 XREADGROUP 命令特點(diǎn):
- 消息可回溯
- 可以多消費(fèi)者爭搶消息,加快消費(fèi)速度
- 可以阻塞讀取
- 沒有消息漏讀的風(fēng)險(xiǎn)
- 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次
三、Redis 消息隊(duì)列比對(duì)
四、基于 Stream 消息隊(duì)列實(shí)現(xiàn)異步秒殺
需求:
① 創(chuàng)建一個(gè) Stream 類型的消息隊(duì)列,名為 stream.orders
② 修改之前的秒殺下單 Lua 腳本,在認(rèn)定有搶購資格后,直接向 stream.orders 中添加消息,內(nèi)容包含 voucherId、userId、orderId
③ 項(xiàng)目啟動(dòng)時(shí),開啟一個(gè)線程任務(wù),嘗試獲取 stream.orders 中的消息,完成下單
4.1 通過命令行的方式創(chuàng)建消息隊(duì)列以及消費(fèi)者組
創(chuàng)建隊(duì)列名為 stream.orders 且組名為 g1 的消費(fèi)者組,消息 ID 從 0 開始
4.2 Lua 腳本
-- 優(yōu)惠券id
local voucherId = ARGV[1]
-- 用戶id
local userId = ARGV[2]
-- 訂單id
local orderId = ARGV[3]
-- 庫存key
local stockKey = "seckill:stock:"..voucherId
-- 訂單key
local orderKey = "seckill:order:"..voucherId
-- 判斷庫存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
-- 判斷用戶是否已經(jīng)下過單
if(redis.call('sismember', orderKey, userId) == 1) then
return 2
end
-- 扣減庫存
redis.call('incrby', stockKey, -1)
-- 將 userId 存入當(dāng)前優(yōu)惠券的 set 集合
redis.call('sadd', orderKey, userId)
-- 將訂單信息存入到消息隊(duì)列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
4.3 代碼改進(jìn)
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/***
* 創(chuàng)建線程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
/***
* 容器啟動(dòng)時(shí),便開始創(chuàng)建獨(dú)立線程,從隊(duì)列中讀取數(shù)據(jù),創(chuàng)建訂單
*/
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while(true){
try {
// 獲取消息隊(duì)列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判斷訂單信息是否為空
if(list == null || list.isEmpty()){
// 如果為 null,說明沒有消息,繼續(xù)下一次循環(huán)
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 創(chuàng)建訂單
createVoucherOrder(voucherOrder);
// 確認(rèn)消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("處理訂單異常!", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
// 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 判斷訂單信息是否為空
if(list == null || list.isEmpty()){
break;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 創(chuàng)建訂單
createVoucherOrder(voucherOrder);
// 確認(rèn)消息 xack s1 g1 id
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
} catch (Exception e) {
log.error("處理訂單異常!", e);
try {
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
}
private void createVoucherOrder(VoucherOrder voucherOrder) {
// 判斷當(dāng)前優(yōu)惠券用戶是否已經(jīng)下過單
// 用戶 id
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 獲取互斥鎖
// 使用空參意味著不會(huì)進(jìn)行重復(fù)嘗試獲取鎖
boolean isLock = lock.tryLock();
if (!isLock) {
// 獲取鎖失敗,直接返回失敗或者重試
log.error("不允許重復(fù)下單!");
return;
}
try {
// 查詢訂單
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("不允許重復(fù)下單!");
return;
}
// 扣減庫存
boolean success = seckillVoucherService.update().
setSql("stock = stock - 1").
eq("voucher_id", voucherId).
gt("stock", 0).
update();
// 扣減失敗
if (!success) {
log.error("庫存不足!");
return;
}
// 創(chuàng)建訂單
save(voucherOrder);
} finally {
// 釋放鎖
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
UserDTO user = UserHolder.getUser();
// 生成訂單 id
Long orderId = redisIdWorker.nextId("order");
// 執(zhí)行 lua 腳本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), user.getId().toString(), orderId.toString());
int r = result.intValue();
// 判斷結(jié)果是否為 0
if(r != 0){
// 不為 0 ,代表沒有購買資格
Result.fail(r == 1 ? "庫存不足!" : "不能重復(fù)下單!");
}
// 返回訂單 id
return Result.ok(orderId);
}
}