宅男在线永久免费观看网直播,亚洲欧洲日产国码无码久久99,野花社区在线观看视频,亚洲人交乣女bbw,一本一本久久a久久精品综合不卡

全部
常見問題
產(chǎn)品動(dòng)態(tài)
精選推薦

Redis隊(duì)列原理解析:讓你的應(yīng)用程序運(yùn)行更加穩(wěn)定!

管理 管理 編輯 刪除

一、消息隊(duì)列簡介

消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。最簡單的消息隊(duì)列模型包括 3 個(gè)角色:

  • 消息隊(duì)列:存儲(chǔ)和管理消息,也被稱為消息代理(Message Broker)
  • 生產(chǎn)者:發(fā)送消息到消息隊(duì)列
  • 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息

f3db520231213175325232.png

消息隊(duì)列和阻塞隊(duì)列的區(qū)別:

① 消息隊(duì)列是在 JVM 以外的獨(dú)立服務(wù),所以不受 JVM 內(nèi)存的限制

② 消息隊(duì)列不僅僅做數(shù)據(jù)存儲(chǔ),還需要確保數(shù)據(jù)安全,存入到消息隊(duì)列中的所有消息都需要做持久化,這樣不管是服務(wù)宕機(jī)還是重啟,數(shù)據(jù)都不會(huì)丟失。而且消息隊(duì)列還會(huì)在消息投遞給消費(fèi)者后,要求消費(fèi)者做消息確認(rèn),如果消費(fèi)者沒有確認(rèn),那么這條消息就會(huì)一直存在于消息隊(duì)列中,下一次會(huì)繼續(xù)投遞給消費(fèi)者,讓消費(fèi)者繼續(xù)處理,直到消息被成功處理。

二、Redis 提供的消息隊(duì)列

Redis 提供了三種不同的方式來實(shí)現(xiàn)消息隊(duì)列:

  • list 結(jié)構(gòu):基于 List 結(jié)構(gòu)模擬消息隊(duì)列
  • PubSub:基本的點(diǎn)對(duì)點(diǎn)消息隊(duì)列
  • Stream:比較完善的消息隊(duì)列模型

2.1 基于 List 結(jié)構(gòu)模擬消息隊(duì)列

消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而 Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。

隊(duì)列(先進(jìn)先出)是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP 來實(shí)現(xiàn)。

不過要注意的是,當(dāng)隊(duì)列中沒有消息時(shí) RPOP 或 LPOP 操作會(huì)返回 null,并不像 JVM 的阻塞隊(duì)列那樣會(huì)阻塞并等待下消息。因此這里應(yīng)該使用 BRPOP 或者 BLPOP 來實(shí)現(xiàn)阻塞效果。

基于 List 的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?

優(yōu)點(diǎn):

① 利用 Redis 存儲(chǔ),不受限于 JVM 內(nèi)存上限

② 基于 Redis 的持久化機(jī)制,數(shù)據(jù)安全性有保證

③ 可以滿足消息有序性

缺點(diǎn):

① 無法避免消息丟失。假設(shè)某個(gè)消費(fèi)者從消息隊(duì)列(List 結(jié)構(gòu))中獲取到一條消息,但還未來得及處理,該消費(fèi)者出現(xiàn)故障,那么這條消息就會(huì)丟失,這是因?yàn)?POP 命令是 remove and get,會(huì)將消息直接從消息隊(duì)列中直接移除,這樣其他消費(fèi)者就獲取不到。

② 只支持單消費(fèi)者。消息隊(duì)列(List 結(jié)構(gòu))中的消息,一旦被某個(gè)消費(fèi)者取走,就會(huì)從隊(duì)列中移除,其他消費(fèi)者就獲取不到了,無法實(shí)現(xiàn)一條消息被很多消費(fèi)者消費(fèi)的需求。

2.2 基于 PubSub 的消息隊(duì)列

PubSub(發(fā)布訂閱)是 Redis2.0 版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè) channel,生產(chǎn)者向?qū)?yīng) channel 發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

相關(guān)命令如下:

  • SUBSCRIBE channel [channel]:訂閱一個(gè)或多個(gè)頻道
  • PUBLISH channel msg:向一個(gè)頻道發(fā)送消息
  • PSUBSCRIBE pattern [pattern]:訂閱與 pattern 格式匹配的所有頻道

關(guān)于 PubSub 的具體命令使用方法可以參看官網(wǎng):?https://redis.io/commands/?group=pubsub

基于 PubSub 的消息隊(duì)列有哪些優(yōu)缺點(diǎn):

優(yōu)點(diǎn):

采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi)。一條消息可以發(fā)給多個(gè)消費(fèi)者,也可以發(fā)給一個(gè)消費(fèi)者,而且也支持不同生產(chǎn)者往相同頻道發(fā)。

缺點(diǎn):

① 不支持?jǐn)?shù)據(jù)持久化。本身不像 List 結(jié)構(gòu)那樣支持?jǐn)?shù)據(jù)持久化,List 結(jié)構(gòu)本身就是用來存儲(chǔ)數(shù)據(jù)的,而 PubSub 則是用來做消息發(fā)送的。因此,當(dāng)發(fā)送一條消息時(shí),但卻沒有任何消費(fèi)者訂閱,那么該條消息就直接消失了。

② 無法避免消息丟失

③ 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失。當(dāng)發(fā)送一條消息時(shí),如果有消費(fèi)者監(jiān)聽,消費(fèi)者會(huì)將發(fā)送過來的消息緩存至消息緩存區(qū),由消費(fèi)者進(jìn)行處理。而消費(fèi)者的緩存空間是有上限的,如果超出了就會(huì)丟失。

2.3 基于 Stream 的消息隊(duì)列

Stream 是 Redis5.0 引入的一種新的數(shù)據(jù)類型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列。

發(fā)送消息的命令:

1b937202312131756428041.png

最簡用法如下:

c6f6320231213175759232.png

讀取消息的方式之一:XREAD

159f920231213175856456.png

使用 XREAD 讀取消息

50c47202312131801176933.png

XREAD 阻塞方式,讀取最新的消息:

6211c202312131802599576.png

在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用 XREAD 阻塞方式來查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽隊(duì)列的效果,偽代碼如下:

while(true) {
	// 嘗試讀取隊(duì)列中的消息,最多阻塞 2 秒
	Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
	if(msg == null) {
		continue;
	}
	// 處理消息
	handleMessage(msg);
}

注意:當(dāng)我們指定起始 ID 為 $ 時(shí),代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過 1 條以上的消息到達(dá)隊(duì)列,則下次獲取時(shí)也只能獲取到最新的一條,會(huì)出現(xiàn)漏讀消息的問題。

STREAM 類型消息隊(duì)列的 XREAD 命令特點(diǎn):

  • 消息可回溯。消息讀完后不消失,永久保存在隊(duì)列中。
  • 一個(gè)消息可以被多個(gè)消費(fèi)者讀取
  • 可以阻塞讀取
  • 有消息漏讀的風(fēng)險(xiǎn)

2.4 基于 Stream 的消息隊(duì)列-消費(fèi)者組

消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽同一個(gè)隊(duì)列。具備下列特點(diǎn):

① 消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度。

處于一個(gè)組內(nèi)的多個(gè)消費(fèi)者實(shí)際上是競爭關(guān)系,凡是進(jìn)入到這個(gè)組的消息,組內(nèi)的消費(fèi)者就會(huì)競爭該消息的處理權(quán)。這種方式可以大大提高消息的處理速度,避免消息堆積。如果想要一條消息被多個(gè)消費(fèi)者處理,可以添加多個(gè)消費(fèi)者組。

② 消息標(biāo)識(shí):消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)識(shí),記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)識(shí)之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi)。

③ 消息確認(rèn):消費(fèi)者獲取消息后,消息處于 pending 狀態(tài),并存入一個(gè) pending-list。當(dāng)處理完成后需要通過 XACK 來確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從 pending-list 移除。

創(chuàng)建消費(fèi)者組:

8ec60202312131804259918.png

  • key:隊(duì)列名稱
  • groupName:消費(fèi)者組名稱
  • ID:起始 ID 標(biāo)識(shí),$ 代表隊(duì)列中最后一個(gè)消息,0 代表隊(duì)列中第一個(gè)消息。
  • MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列

其他常見命令:

bc5e4202312131805209027.png

從消費(fèi)者組讀取消息:

378fd202312131805486853.png

  • group:消費(fèi)者組名稱
  • consumer:消費(fèi)者名稱,如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
  • count:本次查詢的最大數(shù)量
  • BLOCK milliseconds:當(dāng)沒有消息時(shí)最長等待時(shí)間
  • NOACK:無需手動(dòng) ACK,獲取到消息后自動(dòng)確認(rèn)
  • STREAMS key:指定隊(duì)列名稱
  • ID:獲取消息的起始 ID:

“>”:從下一個(gè)未消費(fèi)的消息開始

其他:根據(jù)指定 id 從 pending-list 中獲取已消費(fèi)但未確認(rèn)的消息,例如 0,是從 pending-list 中的第一個(gè)消息開始。

使用 Java 代碼處理消費(fèi)者監(jiān)聽消息的基本思路:

whilt(true){
	// 嘗試監(jiān)聽隊(duì)列,使用阻塞模式,最長等待 2000 毫秒
	// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
	// 含義:消費(fèi)者組 g1 中的消費(fèi)者 c1 使用阻塞式嘗試從消息隊(duì)列 s1 中讀取下一個(gè)未被消費(fèi)的消息,阻塞時(shí)長為 2000 毫秒
	Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
	if(msg == null){
		continue;
	}
	try{
		// 處理消息,完成后一定要 ACK
		handleMessage(msg);
	} catch(Exception e){
		while(true){
			// XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0
			// 含義:消費(fèi)者組 g1 中的消費(fèi)者 c1 從消息隊(duì)列 s1 的pending-list 中讀取第一個(gè)消息
			Obeject msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
			if(msg == null){ // null 說明沒有異常消息,所有消息都已確認(rèn),結(jié)束循環(huán)
				break;
			}
			try{
				// 說明有異常消息,再次處理
				handleMessage(msg);
			}catch(Exception e){
				// 再次出現(xiàn)異常,記錄日志,繼續(xù)循環(huán)
				continue;
			}
		}
	}

}

Stream 類型消息隊(duì)列的 XREADGROUP 命令特點(diǎn):

  • 消息可回溯
  • 可以多消費(fèi)者爭搶消息,加快消費(fèi)速度
  • 可以阻塞讀取
  • 沒有消息漏讀的風(fēng)險(xiǎn)
  • 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次

三、Redis 消息隊(duì)列比對(duì)

41c11202312131807378772.png

四、基于 Stream 消息隊(duì)列實(shí)現(xiàn)異步秒殺

需求:

① 創(chuàng)建一個(gè) Stream 類型的消息隊(duì)列,名為 stream.orders

② 修改之前的秒殺下單 Lua 腳本,在認(rèn)定有搶購資格后,直接向 stream.orders 中添加消息,內(nèi)容包含 voucherId、userId、orderId

③ 項(xiàng)目啟動(dòng)時(shí),開啟一個(gè)線程任務(wù),嘗試獲取 stream.orders 中的消息,完成下單

4.1 通過命令行的方式創(chuàng)建消息隊(duì)列以及消費(fèi)者組

創(chuàng)建隊(duì)列名為 stream.orders 且組名為 g1 的消費(fèi)者組,消息 ID 從 0 開始

75b0e202312131808304484.png

4.2 Lua 腳本

-- 優(yōu)惠券id
local voucherId = ARGV[1]
-- 用戶id
local userId = ARGV[2]
-- 訂單id
local orderId = ARGV[3]

-- 庫存key
local stockKey = "seckill:stock:"..voucherId
-- 訂單key
local orderKey = "seckill:order:"..voucherId

-- 判斷庫存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
end

-- 判斷用戶是否已經(jīng)下過單
if(redis.call('sismember', orderKey, userId) == 1) then
    return 2
end

-- 扣減庫存
redis.call('incrby', stockKey, -1)

-- 將 userId 存入當(dāng)前優(yōu)惠券的 set 集合
redis.call('sadd', orderKey, userId)

-- 將訂單信息存入到消息隊(duì)列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

4.3 代碼改進(jìn)

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }


    /***
     * 創(chuàng)建線程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    /***
     * 容器啟動(dòng)時(shí),便開始創(chuàng)建獨(dú)立線程,從隊(duì)列中讀取數(shù)據(jù),創(chuàng)建訂單
     */
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {

        @Override
        public void run() {
            while(true){
                try {
                    // 獲取消息隊(duì)列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判斷訂單信息是否為空
                    if(list == null || list.isEmpty()){
                        // 如果為 null,說明沒有消息,繼續(xù)下一次循環(huán)
                        continue;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 創(chuàng)建訂單
                    createVoucherOrder(voucherOrder);
                    // 確認(rèn)消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常!", e);
                    handlePendingList();
                }
            }

        }

        private void handlePendingList() {
            while(true){
                try {
                    // 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 判斷訂單信息是否為空
                    if(list == null || list.isEmpty()){
                        break;
                    }
                    // 解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 創(chuàng)建訂單
                    createVoucherOrder(voucherOrder);
                    // 確認(rèn)消息 xack s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常!", e);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }

        }
    }

    private void createVoucherOrder(VoucherOrder voucherOrder) {
        // 判斷當(dāng)前優(yōu)惠券用戶是否已經(jīng)下過單
        // 用戶 id
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();

        RLock lock = redissonClient.getLock("lock:order:" + userId);
        // 獲取互斥鎖
        // 使用空參意味著不會(huì)進(jìn)行重復(fù)嘗試獲取鎖
        boolean isLock = lock.tryLock();
        if (!isLock) {
            // 獲取鎖失敗,直接返回失敗或者重試
            log.error("不允許重復(fù)下單!");
            return;
        }


        try {
            // 查詢訂單
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            if (count > 0) {
                log.error("不允許重復(fù)下單!");
                return;
            }

            // 扣減庫存
            boolean success = seckillVoucherService.update().
                    setSql("stock = stock - 1").
                    eq("voucher_id", voucherId).
                    gt("stock", 0).
                    update();

            // 扣減失敗
            if (!success) {
                log.error("庫存不足!");
                return;
            }

            // 創(chuàng)建訂單
            save(voucherOrder);
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        UserDTO user = UserHolder.getUser();
        // 生成訂單 id
        Long orderId = redisIdWorker.nextId("order");
        // 執(zhí)行 lua 腳本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), user.getId().toString(), orderId.toString());
        int r = result.intValue();

        // 判斷結(jié)果是否為 0
        if(r != 0){
            // 不為 0 ,代表沒有購買資格
            Result.fail(r == 1 ? "庫存不足!" : "不能重復(fù)下單!");
        }

        // 返回訂單 id
        return Result.ok(orderId);
    }
}


請(qǐng)登錄后查看

CRMEB-慕白寒窗雪 最后編輯于2023-12-13 18:09:50

快捷回復(fù)
回復(fù)
回復(fù)
回復(fù)({{post_count}}) {{!is_user ? '我的回復(fù)' :'全部回復(fù)'}}
排序 默認(rèn)正序 回復(fù)倒序 點(diǎn)贊倒序

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}} LV.{{ item.user_info.bbs_level || item.bbs_level }}

作者 管理員 企業(yè)

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest == 1? '取消推薦': '推薦'}}
{{item.is_suggest == 1? '取消推薦': '推薦'}}
沙發(fā) 板凳 地板 {{item.floor}}#
{{item.user_info.title || '暫無簡介'}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
打賞
已打賞¥{{item.reward_price}}
{{item.like_count}}
{{item.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)

{{itemc.user_info.nickname}}

{{itemc.user_name}}

回復(fù) {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}
打賞
已打賞¥{{itemc.reward_price}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回復(fù)' : '回復(fù)'}}
刪除
回復(fù)
回復(fù)
查看更多
打賞
已打賞¥{{reward_price}}
3150
{{like_count}}
{{collect_count}}
添加回復(fù) ({{post_count}})

相關(guān)推薦

快速安全登錄

使用微信掃碼登錄
{{item.label}} 加精
{{item.label}} {{item.label}} 板塊推薦 常見問題 產(chǎn)品動(dòng)態(tài) 精選推薦 首頁頭條 首頁動(dòng)態(tài) 首頁推薦
取 消 確 定
回復(fù)
回復(fù)
問題:
問題自動(dòng)獲取的帖子內(nèi)容,不準(zhǔn)確時(shí)需要手動(dòng)修改. [獲取答案]
答案:
提交
bug 需求 取 消 確 定
打賞金額
當(dāng)前余額:¥{{rewardUserInfo.reward_price}}
{{item.price}}元
請(qǐng)輸入 0.1-{{reward_max_price}} 范圍內(nèi)的數(shù)值
打賞成功
¥{{price}}
完成 確認(rèn)打賞

微信登錄/注冊(cè)

切換手機(jī)號(hào)登錄

{{ bind_phone ? '綁定手機(jī)' : '手機(jī)登錄'}}

{{codeText}}
切換微信登錄/注冊(cè)
暫不綁定
CRMEB客服

CRMEB咨詢熱線 咨詢熱線

400-8888-794

微信掃碼咨詢

CRMEB開源商城下載 源碼下載 CRMEB幫助文檔 幫助文檔
返回頂部 返回頂部
CRMEB客服