1616use MongoDB \Driver \ReadPreference ;
1717use MongoDB \Driver \WriteConcern ;
1818use MongoDB \Exception \ResumeTokenException ;
19- use MongoDB \Operation \DatabaseCommand ;
2019use MongoDB \Operation \InsertOne ;
2120use MongoDB \Operation \Watch ;
2221use MongoDB \Tests \CommandObserver ;
@@ -224,7 +223,7 @@ function (array $event) use (&$events) {
224223 $ postBatchResumeToken = $ this ->getPostBatchResumeTokenFromReply ($ events [0 ]['succeeded ' ]->getReply ());
225224
226225 $ this ->assertFalse ($ changeStream ->valid ());
227- $ this ->killChangeStreamCursor ( $ changeStream );
226+ $ this ->forceChangeStreamResume ( );
228227
229228 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
230229 $ changeStream ->rewind ();
@@ -303,7 +302,7 @@ function (array $event) use (&$events) {
303302 $ this ->assertInstanceOf (TimestampInterface::class, $ operationTime );
304303
305304 $ this ->assertFalse ($ changeStream ->valid ());
306- $ this ->killChangeStreamCursor ( $ changeStream );
305+ $ this ->forceChangeStreamResume ( );
307306
308307 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
309308 $ changeStream ->rewind ();
@@ -454,7 +453,7 @@ public function testNoChangeAfterResumeBeforeInsert()
454453
455454 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
456455
457- $ this ->killChangeStreamCursor ( $ changeStream );
456+ $ this ->forceChangeStreamResume ( );
458457
459458 $ changeStream ->next ();
460459 $ this ->assertFalse ($ changeStream ->valid ());
@@ -481,10 +480,10 @@ public function testResumeMultipleTimesInSuccession()
481480 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
482481 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
483482
484- /* Killing the cursor when there are no results will test that neither
483+ /* Forcing a resume when there are no results will test that neither
485484 * the initial rewind() nor a resume attempt via next() increment the
486485 * key. */
487- $ this ->killChangeStreamCursor ( $ changeStream );
486+ $ this ->forceChangeStreamResume ( );
488487
489488 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
490489 $ changeStream ->rewind ();
@@ -499,7 +498,7 @@ public function testResumeMultipleTimesInSuccession()
499498 $ this ->assertNull ($ changeStream ->current ());
500499
501500 // A consecutive resume attempt should still not increment the key
502- $ this ->killChangeStreamCursor ( $ changeStream );
501+ $ this ->forceChangeStreamResume ( );
503502
504503 $ changeStream ->next ();
505504 $ this ->assertFalse ($ changeStream ->valid ());
@@ -525,10 +524,10 @@ public function testResumeMultipleTimesInSuccession()
525524
526525 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
527526
528- /* Insert another document and kill the cursor . ChangeStream::next()
527+ /* Insert another document and force a resume . ChangeStream::next()
529528 * should resume and pick up the last insert. */
530529 $ this ->insertDocument (['_id ' => 2 ]);
531- $ this ->killChangeStreamCursor ( $ changeStream );
530+ $ this ->forceChangeStreamResume ( );
532531
533532 $ changeStream ->next ();
534533 $ this ->assertTrue ($ changeStream ->valid ());
@@ -552,7 +551,7 @@ public function testResumeMultipleTimesInSuccession()
552551 *
553552 * Note: PHPLIB-448 may require rewind() to throw an exception here. */
554553 $ this ->insertDocument (['_id ' => 3 ]);
555- $ this ->killChangeStreamCursor ( $ changeStream );
554+ $ this ->forceChangeStreamResume ( );
556555
557556 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
558557 $ changeStream ->rewind ();
@@ -578,7 +577,7 @@ public function testResumeMultipleTimesInSuccession()
578577
579578 // Test one final, consecutive resume via ChangeStream::next()
580579 $ this ->insertDocument (['_id ' => 4 ]);
581- $ this ->killChangeStreamCursor ( $ changeStream );
580+ $ this ->forceChangeStreamResume ( );
582581
583582 $ changeStream ->next ();
584583 $ this ->assertTrue ($ changeStream ->valid ());
@@ -622,7 +621,7 @@ public function testKey()
622621 $ this ->assertFalse ($ changeStream ->valid ());
623622 $ this ->assertNull ($ changeStream ->key ());
624623
625- $ this ->killChangeStreamCursor ( $ changeStream );
624+ $ this ->forceChangeStreamResume ( );
626625
627626 $ changeStream ->next ();
628627 $ this ->assertFalse ($ changeStream ->valid ());
@@ -901,7 +900,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
901900 ];
902901 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
903902
904- $ this ->killChangeStreamCursor ( $ changeStream );
903+ $ this ->forceChangeStreamResume ( );
905904
906905 $ this ->advanceCursorUntilValid ($ changeStream );
907906 $ this ->assertSame (1 , $ changeStream ->key ());
@@ -1140,7 +1139,7 @@ function (array $event) use (&$originalSession) {
11401139 );
11411140
11421141 $ changeStream ->rewind ();
1143- $ this ->killChangeStreamCursor ( $ changeStream );
1142+ $ this ->forceChangeStreamResume ( );
11441143
11451144 (new CommandObserver ())->observe (
11461145 function () use (&$ changeStream ) {
@@ -1324,7 +1323,7 @@ public function testOriginalReadPreferenceIsPreservedOnResume()
13241323
13251324 $ changeStream = $ operation ->execute ($ secondary );
13261325 $ previousCursorId = $ changeStream ->getCursorId ();
1327- $ this ->killChangeStreamCursor ( $ changeStream );
1326+ $ this ->forceChangeStreamResume ( );
13281327
13291328 $ changeStream ->next ();
13301329 $ this ->assertNotSame ($ previousCursorId , $ changeStream ->getCursorId ());
@@ -1465,7 +1464,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter
14651464 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
14661465 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
14671466 $ changeStream ->rewind ();
1468- $ this ->killChangeStreamCursor ( $ changeStream );
1467+ $ this ->forceChangeStreamResume ( );
14691468
14701469 $ aggregateCommand = null ;
14711470
@@ -1516,7 +1515,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp
15161515 $ this ->advanceCursorUntilValid ($ changeStream );
15171516 $ this ->assertTrue ($ changeStream ->valid ());
15181517
1519- $ this ->killChangeStreamCursor ( $ changeStream );
1518+ $ this ->forceChangeStreamResume ( );
15201519
15211520 $ aggregateCommand = null ;
15221521
@@ -1552,6 +1551,15 @@ function (array $event) use (&$commands) {
15521551 $ this ->assertEmpty ($ commands );
15531552 }
15541553
1554+ private function forceChangeStreamResume (array $ commands = ['getMore ' ], int $ errorCode = self ::NOT_MASTER )
1555+ {
1556+ $ this ->configureFailPoint ([
1557+ 'configureFailPoint ' => 'failCommand ' ,
1558+ 'mode ' => ['times ' => 1 ],
1559+ 'data ' => ['failCommands ' => $ commands , 'errorCode ' => $ errorCode ],
1560+ ]);
1561+ }
1562+
15551563 private function getPostBatchResumeTokenFromReply (stdClass $ reply )
15561564 {
15571565 $ this ->assertObjectHasAttribute ('cursor ' , $ reply );
@@ -1584,17 +1592,6 @@ private function isStartAtOperationTimeSupported()
15841592 return server_supports_feature ($ this ->getPrimaryServer (), self ::$ wireVersionForStartAtOperationTime );
15851593 }
15861594
1587- private function killChangeStreamCursor (ChangeStream $ changeStream )
1588- {
1589- $ command = [
1590- 'killCursors ' => $ this ->getCollectionName (),
1591- 'cursors ' => [ $ changeStream ->getCursorId () ],
1592- ];
1593-
1594- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), $ command );
1595- $ operation ->execute ($ this ->getPrimaryServer ());
1596- }
1597-
15981595 private function advanceCursorUntilValid (Iterator $ iterator , $ limitOnShardedClusters = 5 )
15991596 {
16001597 if (! $ this ->isShardedCluster ()) {
0 commit comments