@@ -465,7 +465,8 @@ describe("VisibilityManager", () => {
465465 const queueId = "tenant:t1:queue:release-batch" ;
466466 const queueKey = keys . queueKey ( queueId ) ;
467467 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
468- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
468+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
469+ const dispatchKey = keys . dispatchKey ( 0 ) ;
469470
470471 // Add messages to queue and claim them
471472 for ( let i = 1 ; i <= 5 ; i ++ ) {
@@ -501,7 +502,9 @@ describe("VisibilityManager", () => {
501502 queueId ,
502503 queueKey ,
503504 queueItemsKey ,
504- masterQueueKey
505+ tenantQueueIndexKey ,
506+ dispatchKey ,
507+ "t1"
505508 ) ;
506509
507510 // Verify 2 messages still in-flight
@@ -539,17 +542,18 @@ describe("VisibilityManager", () => {
539542 const queueId = "tenant:t1:queue:empty-release" ;
540543 const queueKey = keys . queueKey ( queueId ) ;
541544 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
542- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
545+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
546+ const dispatchKey = keys . dispatchKey ( 0 ) ;
543547
544548 // Should not throw when releasing empty array
545- await manager . releaseBatch ( [ ] , queueId , queueKey , queueItemsKey , masterQueueKey ) ;
549+ await manager . releaseBatch ( [ ] , queueId , queueKey , queueItemsKey , tenantQueueIndexKey , dispatchKey , "t1" ) ;
546550
547551 await manager . close ( ) ;
548552 }
549553 ) ;
550554
551555 redisTest (
552- "should update master queue with oldest message timestamp" ,
556+ "should update dispatch indexes with oldest message timestamp" ,
553557 { timeout : 10000 } ,
554558 async ( { redisOptions } ) => {
555559 keys = new DefaultFairQueueKeyProducer ( { prefix : "test" } ) ;
@@ -562,10 +566,11 @@ describe("VisibilityManager", () => {
562566 } ) ;
563567
564568 const redis = createRedisClient ( redisOptions ) ;
565- const queueId = "tenant:t1:queue:master -update" ;
569+ const queueId = "tenant:t1:queue:dispatch -update" ;
566570 const queueKey = keys . queueKey ( queueId ) ;
567571 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
568- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
572+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
573+ const dispatchKey = keys . dispatchKey ( 0 ) ;
569574
570575 // Add and claim messages
571576 const baseTime = Date . now ( ) ;
@@ -586,11 +591,15 @@ describe("VisibilityManager", () => {
586591 const claimed = await manager . claimBatch ( queueId , queueKey , queueItemsKey , "consumer-1" , 3 ) ;
587592
588593 // Release all messages back
589- await manager . releaseBatch ( claimed , queueId , queueKey , queueItemsKey , masterQueueKey ) ;
594+ await manager . releaseBatch ( claimed , queueId , queueKey , queueItemsKey , tenantQueueIndexKey , dispatchKey , "t1" ) ;
590595
591- // Master queue should have been updated
592- const masterScore = await redis . zscore ( masterQueueKey , queueId ) ;
593- expect ( masterScore ) . not . toBeNull ( ) ;
596+ // Tenant queue index should have the queue with correct score
597+ const tenantQueueScore = await redis . zscore ( tenantQueueIndexKey , queueId ) ;
598+ expect ( tenantQueueScore ) . not . toBeNull ( ) ;
599+
600+ // Dispatch index should have the tenant
601+ const dispatchScore = await redis . zscore ( dispatchKey , "t1" ) ;
602+ expect ( dispatchScore ) . not . toBeNull ( ) ;
594603
595604 await manager . close ( ) ;
596605 await redis . quit ( ) ;
@@ -616,7 +625,8 @@ describe("VisibilityManager", () => {
616625 const queueId = "tenant:t1:queue:reclaim-test" ;
617626 const queueKey = keys . queueKey ( queueId ) ;
618627 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
619- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
628+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
629+ const dispatchKey = keys . dispatchKey ( 0 ) ;
620630
621631 // Add and claim a message
622632 const messageId = "reclaim-msg" ;
@@ -644,7 +654,9 @@ describe("VisibilityManager", () => {
644654 const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
645655 queueKey : keys . queueKey ( qId ) ,
646656 queueItemsKey : keys . queueItemsKey ( qId ) ,
647- masterQueueKey,
657+ tenantQueueIndexKey : keys . tenantQueueIndexKey ( keys . extractTenantId ( qId ) ) ,
658+ dispatchKey,
659+ tenantId : keys . extractTenantId ( qId ) ,
648660 } ) ) ;
649661
650662 expect ( reclaimedMessages ) . toHaveLength ( 1 ) ;
@@ -690,7 +702,8 @@ describe("VisibilityManager", () => {
690702 const queueId = "tenant:t1:queue:no-timeout" ;
691703 const queueKey = keys . queueKey ( queueId ) ;
692704 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
693- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
705+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
706+ const dispatchKey = keys . dispatchKey ( 0 ) ;
694707
695708 // Add and claim a message with long timeout
696709 const messageId = "long-timeout-msg" ;
@@ -712,7 +725,9 @@ describe("VisibilityManager", () => {
712725 const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
713726 queueKey : keys . queueKey ( qId ) ,
714727 queueItemsKey : keys . queueItemsKey ( qId ) ,
715- masterQueueKey,
728+ tenantQueueIndexKey : keys . tenantQueueIndexKey ( keys . extractTenantId ( qId ) ) ,
729+ dispatchKey,
730+ tenantId : keys . extractTenantId ( qId ) ,
716731 } ) ) ;
717732
718733 expect ( reclaimedMessages ) . toHaveLength ( 0 ) ;
@@ -736,7 +751,8 @@ describe("VisibilityManager", () => {
736751 } ) ;
737752
738753 const redis = createRedisClient ( redisOptions ) ;
739- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
754+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
755+ const dispatchKey = keys . dispatchKey ( 0 ) ;
740756
741757 // Add and claim messages for two different tenants
742758 for ( const tenant of [ "t1" , "t2" ] ) {
@@ -767,7 +783,9 @@ describe("VisibilityManager", () => {
767783 const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
768784 queueKey : keys . queueKey ( qId ) ,
769785 queueItemsKey : keys . queueItemsKey ( qId ) ,
770- masterQueueKey,
786+ tenantQueueIndexKey : keys . tenantQueueIndexKey ( keys . extractTenantId ( qId ) ) ,
787+ dispatchKey,
788+ tenantId : keys . extractTenantId ( qId ) ,
771789 } ) ) ;
772790
773791 expect ( reclaimedMessages ) . toHaveLength ( 2 ) ;
@@ -798,7 +816,8 @@ describe("VisibilityManager", () => {
798816 const queueId = "tenant:t1:queue:fallback-test" ;
799817 const queueKey = keys . queueKey ( queueId ) ;
800818 const queueItemsKey = keys . queueItemsKey ( queueId ) ;
801- const masterQueueKey = keys . masterQueueKey ( 0 ) ;
819+ const tenantQueueIndexKey = keys . tenantQueueIndexKey ( "t1" ) ;
820+ const dispatchKey = keys . dispatchKey ( 0 ) ;
802821 const inflightDataKey = keys . inflightDataKey ( 0 ) ;
803822
804823 // Add and claim a message
@@ -830,7 +849,9 @@ describe("VisibilityManager", () => {
830849 const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
831850 queueKey : keys . queueKey ( qId ) ,
832851 queueItemsKey : keys . queueItemsKey ( qId ) ,
833- masterQueueKey,
852+ tenantQueueIndexKey : keys . tenantQueueIndexKey ( keys . extractTenantId ( qId ) ) ,
853+ dispatchKey,
854+ tenantId : keys . extractTenantId ( qId ) ,
834855 } ) ) ;
835856
836857 expect ( reclaimedMessages ) . toHaveLength ( 1 ) ;
0 commit comments