@@ -73,9 +73,6 @@ public function testNextResumesAfterCursorNotFound()
7373 $ this ->assertSameDocument ($ expectedResult , $ changeStream ->current ());
7474 }
7575
76- /**
77- * @todo test that rewind() also resumes once PHPLIB-322 is implemented
78- */
7976 public function testNextResumesAfterConnectionException ()
8077 {
8178 /* In order to trigger a dropped connection, we'll use a new client with
@@ -129,6 +126,56 @@ function(stdClass $command) use (&$commands) {
129126 $ this ->assertSame ($ expectedCommands , $ commands );
130127 }
131128
129+ public function testRewindResumesAfterConnectionException ()
130+ {
131+ /* In order to trigger a dropped connection, we'll use a new client with
132+ * a socket timeout that is less than the change stream's maxAwaitTimeMS
133+ * option. */
134+ $ manager = new Manager ($ this ->getUri (), ['socketTimeoutMS ' => 50 ]);
135+ $ primaryServer = $ manager ->selectServer (new ReadPreference (ReadPreference::RP_PRIMARY ));
136+
137+ $ operation = new Watch ($ manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => 100 ]);
138+ $ changeStream = $ operation ->execute ($ primaryServer );
139+
140+ $ commands = [];
141+
142+ try {
143+ (new CommandObserver )->observe (
144+ function () use ($ changeStream ) {
145+ $ changeStream ->rewind ();
146+ },
147+ function (stdClass $ command ) use (&$ commands ) {
148+ $ commands [] = key ((array ) $ command );
149+ }
150+ );
151+ $ this ->fail ('ConnectionTimeoutException was not thrown ' );
152+ } catch (ConnectionTimeoutException $ e ) {}
153+
154+ $ expectedCommands = [
155+ /* The initial aggregate command for change streams returns a cursor
156+ * envelope with an empty initial batch, since there are no changes
157+ * to report at the moment the change stream is created. Therefore,
158+ * we expect a getMore to be issued when we first advance the change
159+ * stream (with either rewind() or next()). */
160+ 'getMore ' ,
161+ /* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
162+ * getMore command encounters a client socket timeout and leaves the
163+ * cursor open on the server. ChangeStream should catch this error
164+ * and resume by issuing a new aggregate command. */
165+ 'aggregate ' ,
166+ /* When ChangeStream resumes, it overwrites its original cursor with
167+ * the new cursor resulting from the last aggregate command. This
168+ * removes the last reference to the old cursor, which causes the
169+ * driver to kill it (via mongoc_cursor_destroy()). */
170+ 'killCursors ' ,
171+ /* Finally, ChangeStream will rewind the new cursor as the last step
172+ * of the resume process. This results in one last getMore. */
173+ 'getMore ' ,
174+ ];
175+
176+ $ this ->assertSame ($ expectedCommands , $ commands );
177+ }
178+
132179 public function testNoChangeAfterResumeBeforeInsert ()
133180 {
134181 $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
@@ -260,7 +307,6 @@ public function testInitialCursorIsNotClosed()
260307
261308 /**
262309 * @expectedException MongoDB\Exception\ResumeTokenException
263- * @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
264310 */
265311 public function testNextCannotExtractResumeToken ()
266312 {
@@ -269,13 +315,26 @@ public function testNextCannotExtractResumeToken()
269315 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , ['maxAwaitTimeMS ' => 100 ]);
270316 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
271317
272- $ changeStream ->rewind ();
273-
274318 $ this ->insertDocument (['x ' => 1 ]);
275319
276320 $ changeStream ->next ();
277321 }
278322
323+ /**
324+ * @expectedException MongoDB\Exception\ResumeTokenException
325+ */
326+ public function testRewindCannotExtractResumeToken ()
327+ {
328+ $ pipeline = [['$project ' => ['_id ' => 0 ]]];
329+
330+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , ['maxAwaitTimeMS ' => 100 ]);
331+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
332+
333+ $ this ->insertDocument (['x ' => 1 ]);
334+
335+ $ changeStream ->rewind ();
336+ }
337+
279338 public function testMaxAwaitTimeMS ()
280339 {
281340 /* On average, an acknowledged write takes about 20 ms to appear in a
@@ -320,6 +379,54 @@ public function testMaxAwaitTimeMS()
320379 $ this ->assertTrue ($ changeStream ->valid ());
321380 }
322381
382+ public function testResumeAfterKillThenNoOperations ()
383+ {
384+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => 100 ]);
385+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
386+
387+ $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
388+
389+ $ this ->killChangeStreamCursor ($ changeStream );
390+
391+ $ changeStream ->rewind ();
392+ $ this ->assertFalse ($ changeStream ->valid ());
393+ $ this ->assertNull ($ changeStream ->current ());
394+ }
395+
396+ public function testResumeAfterKillThenOperation ()
397+ {
398+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], ['maxAwaitTimeMS ' => 100 ]);
399+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
400+
401+ $ this ->insertDocument (['_id ' => 1 , 'x ' => 'foo ' ]);
402+ $ this ->insertDocument (['_id ' => 2 , 'x ' => 'bar ' ]);
403+
404+ $ changeStream ->rewind ();
405+ $ this ->assertTrue ($ changeStream ->valid ());
406+ $ expectedResult = [
407+ '_id ' => $ changeStream ->current ()->_id ,
408+ 'operationType ' => 'insert ' ,
409+ 'fullDocument ' => ['_id ' => 1 , 'x ' => 'foo ' ],
410+ 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
411+ 'documentKey ' => ['_id ' => 1 ],
412+ ];
413+ $ this ->assertSameDocument ($ expectedResult , $ changeStream ->current ());
414+
415+ $ this ->killChangeStreamCursor ($ changeStream );
416+
417+ $ changeStream ->next ();
418+ $ this ->assertTrue ($ changeStream ->valid ());
419+
420+ $ expectedResult = [
421+ '_id ' => $ changeStream ->current ()->_id ,
422+ 'operationType ' => 'insert ' ,
423+ 'fullDocument ' => ['_id ' => 2 , 'x ' => 'bar ' ],
424+ 'ns ' => ['db ' => $ this ->getDatabaseName (), 'coll ' => $ this ->getCollectionName ()],
425+ 'documentKey ' => ['_id ' => 2 ],
426+ ];
427+ $ this ->assertSameDocument ($ expectedResult , $ changeStream ->current ());
428+ }
429+
323430 private function insertDocument ($ document )
324431 {
325432 $ insertOne = new InsertOne ($ this ->getDatabaseName (), $ this ->getCollectionName (), $ document );
0 commit comments