diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 1f4db90bb..4d24eee84 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -135,39 +135,36 @@ class TestChangeStream(IntegrationTest): with self.assertRaises(StopIteration): next(change_stream) + def _test_next_blocks(self, change_stream): + inserted_doc = {'_id': ObjectId()} + changes = [] + t = threading.Thread( + target=lambda: changes.append(change_stream.next())) + t.start() + # Sleep for a bit to prove that the call to next() blocks. + time.sleep(1) + self.assertTrue(t.is_alive()) + self.assertFalse(changes) + self.coll.insert_one(inserted_doc) + # Join with large timeout to give the server time to return the change, + # in particular for shard clusters. + t.join(30) + self.assertFalse(t.is_alive()) + self.assertEqual(1, len(changes)) + self.assertEqual(changes[0]['operationType'], 'insert') + self.assertEqual(changes[0]['fullDocument'], inserted_doc) + def test_next_blocks(self): """Test that next blocks until a change is readable""" - inserted_doc = {'_id': ObjectId()} # Use a short await time to speed up the test. with self.coll.watch(max_await_time_ms=250) as change_stream: - changes = [] - t = threading.Thread( - target=lambda: changes.append(change_stream.next())) - t.start() - self.coll.insert_one(inserted_doc) - time.sleep(1) - t.join(3) - self.assertFalse(t.is_alive()) - self.assertEqual(1, len(changes)) - self.assertEqual(changes[0]['operationType'], 'insert') - self.assertEqual(changes[0]['fullDocument'], inserted_doc) + self._test_next_blocks(change_stream) def test_aggregate_cursor_blocks(self): """Test that an aggregate cursor blocks until a change is readable.""" - inserted_doc = {'_id': ObjectId()} with self.coll.aggregate([{'$changeStream': {}}], maxAwaitTimeMS=250) as change_stream: - changes = [] - t = threading.Thread( - target=lambda: changes.append(change_stream.next())) - t.start() - self.coll.insert_one(inserted_doc) - time.sleep(1) - t.join(3) - self.assertFalse(t.is_alive()) - self.assertEqual(1, len(changes)) - self.assertEqual(changes[0]['operationType'], 'insert') - self.assertEqual(changes[0]['fullDocument'], inserted_doc) + self._test_next_blocks(change_stream) def test_concurrent_close(self): """Ensure a ChangeStream can be closed from another thread.""" @@ -188,7 +185,7 @@ class TestChangeStream(IntegrationTest): """ChangeStream must continuously track the last seen resumeToken.""" with self.coll.watch() as change_stream: self.assertIsNone(change_stream._resume_token) - for i in range(10): + for i in range(3): self.coll.insert_one({}) change = next(change_stream) self.assertEqual(change['_id'], change_stream._resume_token)