PYTHON-1582 Fix TestChangeStream.test_next_blocks
This commit is contained in:
parent
a15c8283dc
commit
adbcc2d6d4
@ -135,39 +135,36 @@ class TestChangeStream(IntegrationTest):
|
||||
with self.assertRaises(StopIteration):
|
||||
next(change_stream)
|
||||
|
||||
def _test_next_blocks(self, change_stream):
|
||||
inserted_doc = {'_id': ObjectId()}
|
||||
changes = []
|
||||
t = threading.Thread(
|
||||
target=lambda: changes.append(change_stream.next()))
|
||||
t.start()
|
||||
# Sleep for a bit to prove that the call to next() blocks.
|
||||
time.sleep(1)
|
||||
self.assertTrue(t.is_alive())
|
||||
self.assertFalse(changes)
|
||||
self.coll.insert_one(inserted_doc)
|
||||
# Join with large timeout to give the server time to return the change,
|
||||
# in particular for shard clusters.
|
||||
t.join(30)
|
||||
self.assertFalse(t.is_alive())
|
||||
self.assertEqual(1, len(changes))
|
||||
self.assertEqual(changes[0]['operationType'], 'insert')
|
||||
self.assertEqual(changes[0]['fullDocument'], inserted_doc)
|
||||
|
||||
def test_next_blocks(self):
|
||||
"""Test that next blocks until a change is readable"""
|
||||
inserted_doc = {'_id': ObjectId()}
|
||||
# Use a short await time to speed up the test.
|
||||
with self.coll.watch(max_await_time_ms=250) as change_stream:
|
||||
changes = []
|
||||
t = threading.Thread(
|
||||
target=lambda: changes.append(change_stream.next()))
|
||||
t.start()
|
||||
self.coll.insert_one(inserted_doc)
|
||||
time.sleep(1)
|
||||
t.join(3)
|
||||
self.assertFalse(t.is_alive())
|
||||
self.assertEqual(1, len(changes))
|
||||
self.assertEqual(changes[0]['operationType'], 'insert')
|
||||
self.assertEqual(changes[0]['fullDocument'], inserted_doc)
|
||||
self._test_next_blocks(change_stream)
|
||||
|
||||
def test_aggregate_cursor_blocks(self):
|
||||
"""Test that an aggregate cursor blocks until a change is readable."""
|
||||
inserted_doc = {'_id': ObjectId()}
|
||||
with self.coll.aggregate([{'$changeStream': {}}],
|
||||
maxAwaitTimeMS=250) as change_stream:
|
||||
changes = []
|
||||
t = threading.Thread(
|
||||
target=lambda: changes.append(change_stream.next()))
|
||||
t.start()
|
||||
self.coll.insert_one(inserted_doc)
|
||||
time.sleep(1)
|
||||
t.join(3)
|
||||
self.assertFalse(t.is_alive())
|
||||
self.assertEqual(1, len(changes))
|
||||
self.assertEqual(changes[0]['operationType'], 'insert')
|
||||
self.assertEqual(changes[0]['fullDocument'], inserted_doc)
|
||||
self._test_next_blocks(change_stream)
|
||||
|
||||
def test_concurrent_close(self):
|
||||
"""Ensure a ChangeStream can be closed from another thread."""
|
||||
@ -188,7 +185,7 @@ class TestChangeStream(IntegrationTest):
|
||||
"""ChangeStream must continuously track the last seen resumeToken."""
|
||||
with self.coll.watch() as change_stream:
|
||||
self.assertIsNone(change_stream._resume_token)
|
||||
for i in range(10):
|
||||
for i in range(3):
|
||||
self.coll.insert_one({})
|
||||
change = next(change_stream)
|
||||
self.assertEqual(change['_id'], change_stream._resume_token)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user