1616use MongoDB \Driver \ReadPreference ;
1717use MongoDB \Driver \WriteConcern ;
1818use MongoDB \Exception \ResumeTokenException ;
19- use MongoDB \Operation \DatabaseCommand ;
2019use MongoDB \Operation \InsertOne ;
2120use MongoDB \Operation \Watch ;
2221use MongoDB \Tests \CommandObserver ;
@@ -155,49 +154,6 @@ function (array $event) use (&$lastEvent) {
155154 $ this ->assertSameDocument ($ postBatchResumeToken , $ changeStream ->getResumeToken ());
156155 }
157156
158- /**
159- * Prose test 10: "ChangeStream will resume after a killCursors command is
160- * issued for its child cursor."
161- */
162- public function testNextResumesAfterCursorNotFound ()
163- {
164- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
165- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
166-
167- $ changeStream ->rewind ();
168- $ this ->assertFalse ($ changeStream ->valid ());
169-
170- $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
171-
172- $ this ->advanceCursorUntilValid ($ changeStream );
173-
174- $ expectedResult = [
175- '_id ' => $ changeStream ->current ()->_id ,
176- 'operationType ' => 'insert ' ,
177- 'fullDocument ' => ['_id ' => 1 , 'x ' => 'foo ' ],
178- 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
179- 'documentKey ' => ['_id ' => 1 ],
180- ];
181-
182- $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
183-
184- $ this ->killChangeStreamCursor ($ changeStream );
185-
186- $ this ->insertDocument (['_id ' => 2 , 'x ' => 'bar ' ]);
187-
188- $ this ->advanceCursorUntilValid ($ changeStream );
189-
190- $ expectedResult = [
191- '_id ' => $ changeStream ->current ()->_id ,
192- 'operationType ' => 'insert ' ,
193- 'fullDocument ' => ['_id ' => 2 , 'x ' => 'bar ' ],
194- 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
195- 'documentKey ' => ['_id ' => 2 ],
196- ];
197-
198- $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
199- }
200-
201157 public function testNextResumesAfterConnectionException ()
202158 {
203159 /* In order to trigger a dropped connection, we'll use a new client with
@@ -267,7 +223,7 @@ function (array $event) use (&$events) {
267223 $ postBatchResumeToken = $ this ->getPostBatchResumeTokenFromReply ($ events [0 ]['succeeded ' ]->getReply ());
268224
269225 $ this ->assertFalse ($ changeStream ->valid ());
270- $ this ->killChangeStreamCursor ( $ changeStream );
226+ $ this ->forceChangeStreamResume ( );
271227
272228 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
273229 $ changeStream ->rewind ();
@@ -346,7 +302,7 @@ function (array $event) use (&$events) {
346302 $ this ->assertInstanceOf (TimestampInterface::class, $ operationTime );
347303
348304 $ this ->assertFalse ($ changeStream ->valid ());
349- $ this ->killChangeStreamCursor ( $ changeStream );
305+ $ this ->forceChangeStreamResume ( );
350306
351307 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
352308 $ changeStream ->rewind ();
@@ -497,7 +453,7 @@ public function testNoChangeAfterResumeBeforeInsert()
497453
498454 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
499455
500- $ this ->killChangeStreamCursor ( $ changeStream );
456+ $ this ->forceChangeStreamResume ( );
501457
502458 $ changeStream ->next ();
503459 $ this ->assertFalse ($ changeStream ->valid ());
@@ -524,10 +480,10 @@ public function testResumeMultipleTimesInSuccession()
524480 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
525481 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
526482
527- /* Killing the cursor when there are no results will test that neither
483+ /* Forcing a resume when there are no results will test that neither
528484 * the initial rewind() nor a resume attempt via next() increment the
529485 * key. */
530- $ this ->killChangeStreamCursor ( $ changeStream );
486+ $ this ->forceChangeStreamResume ( );
531487
532488 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
533489 $ changeStream ->rewind ();
@@ -542,7 +498,7 @@ public function testResumeMultipleTimesInSuccession()
542498 $ this ->assertNull ($ changeStream ->current ());
543499
544500 // A consecutive resume attempt should still not increment the key
545- $ this ->killChangeStreamCursor ( $ changeStream );
501+ $ this ->forceChangeStreamResume ( );
546502
547503 $ changeStream ->next ();
548504 $ this ->assertFalse ($ changeStream ->valid ());
@@ -568,10 +524,10 @@ public function testResumeMultipleTimesInSuccession()
568524
569525 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
570526
571- /* Insert another document and kill the cursor . ChangeStream::next()
527+ /* Insert another document and force a resume . ChangeStream::next()
572528 * should resume and pick up the last insert. */
573529 $ this ->insertDocument (['_id ' => 2 ]);
574- $ this ->killChangeStreamCursor ( $ changeStream );
530+ $ this ->forceChangeStreamResume ( );
575531
576532 $ changeStream ->next ();
577533 $ this ->assertTrue ($ changeStream ->valid ());
@@ -595,7 +551,7 @@ public function testResumeMultipleTimesInSuccession()
595551 *
596552 * Note: PHPLIB-448 may require rewind() to throw an exception here. */
597553 $ this ->insertDocument (['_id ' => 3 ]);
598- $ this ->killChangeStreamCursor ( $ changeStream );
554+ $ this ->forceChangeStreamResume ( );
599555
600556 $ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
601557 $ changeStream ->rewind ();
@@ -621,7 +577,7 @@ public function testResumeMultipleTimesInSuccession()
621577
622578 // Test one final, consecutive resume via ChangeStream::next()
623579 $ this ->insertDocument (['_id ' => 4 ]);
624- $ this ->killChangeStreamCursor ( $ changeStream );
580+ $ this ->forceChangeStreamResume ( );
625581
626582 $ changeStream ->next ();
627583 $ this ->assertTrue ($ changeStream ->valid ());
@@ -665,7 +621,7 @@ public function testKey()
665621 $ this ->assertFalse ($ changeStream ->valid ());
666622 $ this ->assertNull ($ changeStream ->key ());
667623
668- $ this ->killChangeStreamCursor ( $ changeStream );
624+ $ this ->forceChangeStreamResume ( );
669625
670626 $ changeStream ->next ();
671627 $ this ->assertFalse ($ changeStream ->valid ());
@@ -731,41 +687,6 @@ public function testInitialCursorIsNotClosed()
731687 $ this ->assertFalse ($ cursor ->isDead ());
732688 }
733689
734- /**
735- * Prose test 5: "ChangeStream will not attempt to resume after encountering
736- * error code 11601 (Interrupted), 136 (CappedPositionLost), or 237
737- * (CursorKilled) while executing a getMore command."
738- *
739- * @dataProvider provideNonResumableErrorCodes
740- */
741- public function testNonResumableErrorCodes ($ errorCode )
742- {
743- $ this ->configureFailPoint ([
744- 'configureFailPoint ' => 'failCommand ' ,
745- 'mode ' => ['times ' => 1 ],
746- 'data ' => ['failCommands ' => ['getMore ' ], 'errorCode ' => $ errorCode ],
747- ]);
748-
749- $ this ->insertDocument (['x ' => 1 ]);
750-
751- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), []);
752- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
753- $ changeStream ->rewind ();
754-
755- $ this ->expectException (ServerException::class);
756- $ this ->expectExceptionCode ($ errorCode );
757- $ changeStream ->next ();
758- }
759-
760- public function provideNonResumableErrorCodes ()
761- {
762- return [
763- 'CappedPositionLost ' => [136 ],
764- 'CursorKilled ' => [237 ],
765- 'Interrupted ' => [11601 ],
766- ];
767- }
768-
769690 /**
770691 * Prose test 2: "ChangeStream will throw an exception if the server
771692 * response is missing the resume token (if wire version is < 8, this is a
@@ -979,7 +900,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
979900 ];
980901 $ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
981902
982- $ this ->killChangeStreamCursor ( $ changeStream );
903+ $ this ->forceChangeStreamResume ( );
983904
984905 $ this ->advanceCursorUntilValid ($ changeStream );
985906 $ this ->assertSame (1 , $ changeStream ->key ());
@@ -1218,7 +1139,7 @@ function (array $event) use (&$originalSession) {
12181139 );
12191140
12201141 $ changeStream ->rewind ();
1221- $ this ->killChangeStreamCursor ( $ changeStream );
1142+ $ this ->forceChangeStreamResume ( );
12221143
12231144 (new CommandObserver ())->observe (
12241145 function () use (&$ changeStream ) {
@@ -1402,7 +1323,7 @@ public function testOriginalReadPreferenceIsPreservedOnResume()
14021323
14031324 $ changeStream = $ operation ->execute ($ secondary );
14041325 $ previousCursorId = $ changeStream ->getCursorId ();
1405- $ this ->killChangeStreamCursor ( $ changeStream );
1326+ $ this ->forceChangeStreamResume ( );
14061327
14071328 $ changeStream ->next ();
14081329 $ this ->assertNotSame ($ previousCursorId , $ changeStream ->getCursorId ());
@@ -1543,7 +1464,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter
15431464 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
15441465 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
15451466 $ changeStream ->rewind ();
1546- $ this ->killChangeStreamCursor ( $ changeStream );
1467+ $ this ->forceChangeStreamResume ( );
15471468
15481469 $ aggregateCommand = null ;
15491470
@@ -1594,7 +1515,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp
15941515 $ this ->advanceCursorUntilValid ($ changeStream );
15951516 $ this ->assertTrue ($ changeStream ->valid ());
15961517
1597- $ this ->killChangeStreamCursor ( $ changeStream );
1518+ $ this ->forceChangeStreamResume ( );
15981519
15991520 $ aggregateCommand = null ;
16001521
@@ -1630,6 +1551,15 @@ function (array $event) use (&$commands) {
16301551 $ this ->assertEmpty ($ commands );
16311552 }
16321553
1554+ private function forceChangeStreamResume (array $ commands = ['getMore ' ], int $ errorCode = self ::NOT_MASTER )
1555+ {
1556+ $ this ->configureFailPoint ([
1557+ 'configureFailPoint ' => 'failCommand ' ,
1558+ 'mode ' => ['times ' => 1 ],
1559+ 'data ' => ['failCommands ' => $ commands , 'errorCode ' => $ errorCode ],
1560+ ]);
1561+ }
1562+
16331563 private function getPostBatchResumeTokenFromReply (stdClass $ reply )
16341564 {
16351565 $ this ->assertObjectHasAttribute ('cursor ' , $ reply );
@@ -1662,17 +1592,6 @@ private function isStartAtOperationTimeSupported()
16621592 return server_supports_feature ($ this ->getPrimaryServer (), self ::$ wireVersionForStartAtOperationTime );
16631593 }
16641594
1665- private function killChangeStreamCursor (ChangeStream $ changeStream )
1666- {
1667- $ command = [
1668- 'killCursors ' => $ this ->getCollectionName (),
1669- 'cursors ' => [ $ changeStream ->getCursorId () ],
1670- ];
1671-
1672- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), $ command );
1673- $ operation ->execute ($ this ->getPrimaryServer ());
1674- }
1675-
16761595 private function advanceCursorUntilValid (Iterator $ iterator , $ limitOnShardedClusters = 5 )
16771596 {
16781597 if (! $ this ->isShardedCluster ()) {
0 commit comments