Skip to content

Commit bac5b9b

Browse files
committed
Merge pull request #468
2 parents 76ef271 + bff5e2e commit bac5b9b

File tree

6 files changed

+95
-8
lines changed

6 files changed

+95
-8
lines changed

docs/includes/apiargs-MongoDBCollection-method-watch-option.yaml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@ interface: phpmethod
2222
operation: ~
2323
optional: true
2424
---
25-
source:
26-
file: apiargs-MongoDBCollection-method-find-option.yaml
27-
ref: maxAwaitTimeMS
25+
arg_name: option
26+
name: maxAwaitTimeMS
27+
type: integer
28+
description: |
29+
Positive integer denoting the time limit in milliseconds for the server to
30+
block a getMore operation if no data is available.
31+
interface: phpmethod
32+
operation: ~
33+
optional: true
2834
---
2935
source:
3036
file: apiargs-MongoDBCollection-common-option.yaml

docs/tutorial/tailable-cursor.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ using :php:`IteratorIterator <iteratoriterator>`.
177177

178178
$iterator->next();
179179
}
180-
}
181180

182181
Much like the ``foreach`` example, this version on the consumer script will
183182
start by quickly printing all results in the capped collection; however, it will

src/Operation/Aggregate.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
163163
throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
164164
}
165165

166+
if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
167+
throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
168+
}
169+
166170
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
167171
throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
168172
}
@@ -277,6 +281,7 @@ private function createCommand(Server $server, $isCursorSupported)
277281
'aggregate' => $this->collectionName,
278282
'pipeline' => $this->pipeline,
279283
];
284+
$cmdOptions = [];
280285

281286
// Servers < 2.6 do not support any command options
282287
if ( ! $isCursorSupported) {
@@ -303,13 +308,17 @@ private function createCommand(Server $server, $isCursorSupported)
303308
$cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
304309
}
305310

311+
if (isset($this->options['maxAwaitTimeMS'])) {
312+
$cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
313+
}
314+
306315
if ($this->options['useCursor']) {
307316
$cmd['cursor'] = isset($this->options["batchSize"])
308317
? ['batchSize' => $this->options["batchSize"]]
309318
: new stdClass;
310319
}
311320

312-
return new Command($cmd);
321+
return new Command($cmd, $cmdOptions);
313322
}
314323

315324
/**

src/Operation/ChangeStream.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public function execute(Server $server)
147147

148148
private function createAggregateOptions()
149149
{
150-
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1]);
150+
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1]);
151151
if ( ! $aggOptions) {
152152
return [];
153153
}
@@ -174,6 +174,7 @@ private function createCommand()
174174
array_unshift($this->pipeline, $changeStreamArray);
175175

176176
$cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions());
177+
177178
return $cmd;
178179
}
179180

tests/Operation/ChangeStreamFunctionalTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,49 @@ public function testConnectionException()
237237
]);
238238
$this->assertEquals($changeStreamResult->current(), $expectedResult);
239239
}
240+
241+
public function testMaxAwaitTimeMS()
242+
{
243+
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
244+
/* On average, an acknowledged write takes about 20 ms to appear in a
245+
* change stream on the server so we'll use a higher maxAwaitTimeMS to
246+
* ensure we see the write. */
247+
$maxAwaitTimeMS = 100;
248+
$changeStreamResult = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
249+
250+
/* The initial change stream is empty so we should expect a delay when
251+
* we call rewind, since it issues a getMore. Expect to wait at least
252+
* maxAwaitTimeMS, since no new documents should be inserted to wake up
253+
* the server's query thread. Also ensure we don't wait too long (server
254+
* default is one second). */
255+
$startTime = microtime(true);
256+
$changeStreamResult->rewind();
257+
$duration = microtime(true) - $startTime;
258+
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
259+
$this->assertLessThan(0.5, $duration);
260+
261+
$this->assertFalse($changeStreamResult->valid());
262+
263+
/* Advancing again on a change stream will issue a getMore, so we should
264+
* expect a delay again. */
265+
$startTime = microtime(true);
266+
$changeStreamResult->next();
267+
$duration = microtime(true) - $startTime;
268+
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
269+
$this->assertLessThan(0.5, $duration);
270+
271+
$this->assertFalse($changeStreamResult->valid());
272+
273+
/* After inserting a document, the change stream will not issue a
274+
* getMore so we should not expect a delay. */
275+
$result = $this->collection->insertOne(['_id' => 1]);
276+
$this->assertInstanceOf('MongoDB\InsertOneResult', $result);
277+
$this->assertSame(1, $result->getInsertedCount());
278+
279+
$startTime = microtime(true);
280+
$changeStreamResult->next();
281+
$duration = microtime(true) - $startTime;
282+
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
283+
$this->assertTrue($changeStreamResult->valid());
284+
}
240285
}

tests/Operation/FindFunctionalTest.php

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,45 @@ public function testMaxAwaitTimeMS()
148148
// Insert documents into the capped collection.
149149
$bulkWrite = new BulkWrite(['ordered' => true]);
150150
$bulkWrite->insert(['_id' => 1]);
151+
$bulkWrite->insert(['_id' => 2]);
151152
$result = $this->manager->executeBulkWrite($this->getNamespace(), $bulkWrite);
152153

153154
$operation = new Find($databaseName, $cappedCollectionName, [], ['cursorType' => Find::TAILABLE_AWAIT, 'maxAwaitTimeMS' => $maxAwaitTimeMS]);
154155
$cursor = $operation->execute($this->getPrimaryServer());
155156
$it = new \IteratorIterator($cursor);
156157

157-
// Make sure we await results for no more than the maxAwaitTimeMS.
158+
/* The initial query includes the one and only document in its result
159+
* batch, so we should not expect a delay. */
160+
$startTime = microtime(true);
158161
$it->rewind();
162+
$duration = microtime(true) - $startTime;
163+
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
164+
165+
$this->assertTrue($it->valid());
166+
$this->assertSameDocument(['_id' => 1], $it->current());
167+
168+
/* Advancing again takes us to the last document of the result batch,
169+
* but still should not issue a getMore */
170+
$startTime = microtime(true);
159171
$it->next();
172+
$duration = microtime(true) - $startTime;
173+
$this->assertLessThan($maxAwaitTimeMS * 0.001, $duration);
174+
175+
$this->assertTrue($it->valid());
176+
$this->assertSameDocument(['_id' => 2], $it->current());
177+
178+
/* Now that we've reached the end of the initial result batch, advancing
179+
* again will issue a getMore. Expect to wait at least maxAwaitTimeMS,
180+
* since no new documents should be inserted to wake up the server's
181+
* query thread. Also ensure we don't wait too long (server default is
182+
* one second). */
160183
$startTime = microtime(true);
161184
$it->next();
162-
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, microtime(true) - $startTime);
185+
$duration = microtime(true) - $startTime;
186+
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, $duration);
187+
$this->assertLessThan(0.5, $duration);
188+
189+
$this->assertFalse($it->valid());
163190
}
164191

165192
/**

0 commit comments

Comments
 (0)