Skip to content

Commit 43e742a

Browse files
committed
PHPLIB-276: Add maxAwaitTimeMS support for change streams
1 parent 76ef271 commit 43e742a

File tree

4 files changed

+33
-3
lines changed

4 files changed

+33
-3
lines changed

src/Operation/Aggregate.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ public function __construct($databaseName, $collectionName, array $pipeline, arr
163163
throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
164164
}
165165

166+
if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
167+
throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
168+
}
169+
166170
if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
167171
throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
168172
}
@@ -277,6 +281,7 @@ private function createCommand(Server $server, $isCursorSupported)
277281
'aggregate' => $this->collectionName,
278282
'pipeline' => $this->pipeline,
279283
];
284+
$cmdOptions = [];
280285

281286
// Servers < 2.6 do not support any command options
282287
if ( ! $isCursorSupported) {
@@ -303,13 +308,17 @@ private function createCommand(Server $server, $isCursorSupported)
303308
$cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
304309
}
305310

311+
if (isset($this->options['maxAwaitTimeMS'])) {
312+
$cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
313+
}
314+
306315
if ($this->options['useCursor']) {
307316
$cmd['cursor'] = isset($this->options["batchSize"])
308317
? ['batchSize' => $this->options["batchSize"]]
309318
: new stdClass;
310319
}
311320

312-
return new Command($cmd);
321+
return new Command($cmd, $cmdOptions);
313322
}
314323

315324
/**

src/Operation/ChangeStream.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public function execute(Server $server)
147147

148148
private function createAggregateOptions()
149149
{
150-
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1]);
150+
$aggOptions = array_intersect_key($this->options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1]);
151151
if ( ! $aggOptions) {
152152
return [];
153153
}
@@ -174,6 +174,7 @@ private function createCommand()
174174
array_unshift($this->pipeline, $changeStreamArray);
175175

176176
$cmd = new Aggregate($this->databaseName, $this->collectionName, $this->pipeline, $this->createAggregateOptions());
177+
177178
return $cmd;
178179
}
179180

tests/Operation/ChangeStreamFunctionalTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,21 @@ public function testConnectionException()
237237
]);
238238
$this->assertEquals($changeStreamResult->current(), $expectedResult);
239239
}
240+
241+
public function testMaxAwaitTimeMS()
242+
{
243+
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
244+
$maxAwaitTimeMS = 10;
245+
$changeStreamResult = $this->collection->watch([], ['maxAwaitTimeMS' => $maxAwaitTimeMS]);
246+
247+
/* Make sure we await results for at least maxAwaitTimeMS, since no new
248+
* documents should be inserted to wake up the server's command thread.
249+
* Also ensure that we don't wait too long (server default is one
250+
* second). */
251+
$startTime = microtime(true);
252+
$changeStreamResult->rewind();
253+
$this->assertGreaterThanOrEqual($maxAwaitTimeMS * 0.001, microtime(true) - $startTime);
254+
$this->assertLessThan(0.5, microtime(true) - $startTime);
255+
}
256+
240257
}

tests/Operation/FindFunctionalTest.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,10 @@ public function testMaxAwaitTimeMS()
154154
$cursor = $operation->execute($this->getPrimaryServer());
155155
$it = new \IteratorIterator($cursor);
156156

157-
// Make sure we await results for no more than the maxAwaitTimeMS.
157+
/* Make sure we await results for at least maxAwaitTimeMS, since no new
158+
* documents should be inserted to wake up the server's query thread.
159+
* Also ensure that we don't wait too long (server default is one
160+
* second). */
158161
$it->rewind();
159162
$it->next();
160163
$startTime = microtime(true);

0 commit comments

Comments
 (0)