diff --git a/test/test_change_stream.py b/test/test_change_stream.py index e00aaa640..b71f5613d 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -1020,21 +1020,32 @@ class TestCollectionChangeStream(TestChangeStreamBase, APITestsMixin, ProseSpecT self.assertEqual(change["ns"]["coll"], self.watched_collection().name) self.assertEqual(change["fullDocument"], raw_doc) + @client_context.require_version_min(4, 0) # Needed for start_at_operation_time. def test_uuid_representations(self): """Test with uuid document _ids and different uuid_representation.""" + optime = self.db.command("ping")["operationTime"] + self.watched_collection().insert_many( + [ + {"_id": Binary(uuid.uuid4().bytes, id_subtype)} + for id_subtype in (STANDARD, PYTHON_LEGACY) + ] + ) for uuid_representation in ALL_UUID_REPRESENTATIONS: - for id_subtype in (STANDARD, PYTHON_LEGACY): - options = self.watched_collection().codec_options.with_options( - uuid_representation=uuid_representation - ) - coll = self.watched_collection(codec_options=options) - with coll.watch() as change_stream: - coll.insert_one({"_id": Binary(uuid.uuid4().bytes, id_subtype)}) - _ = change_stream.next() - resume_token = change_stream.resume_token + options = self.watched_collection().codec_options.with_options( + uuid_representation=uuid_representation + ) + coll = self.watched_collection(codec_options=options) + with coll.watch(start_at_operation_time=optime, max_await_time_ms=1) as change_stream: + _ = change_stream.next() + resume_token_1 = change_stream.resume_token + _ = change_stream.next() + resume_token_2 = change_stream.resume_token - # Should not error. - coll.watch(resume_after=resume_token) + # Should not error. + with coll.watch(resume_after=resume_token_1): + pass + with coll.watch(resume_after=resume_token_2): + pass def test_document_id_order(self): """Test with document _ids that need their order preserved.""" @@ -1053,7 +1064,8 @@ class TestCollectionChangeStream(TestChangeStreamBase, APITestsMixin, ProseSpecT # The resume token is always a document. self.assertIsInstance(resume_token, document_class) # Should not error. - coll.watch(resume_after=resume_token) + with coll.watch(resume_after=resume_token): + pass coll.delete_many({}) def test_read_concern(self): diff --git a/test/test_custom_types.py b/test/test_custom_types.py index 7daf83244..c30c62b1b 100644 --- a/test/test_custom_types.py +++ b/test/test_custom_types.py @@ -764,9 +764,7 @@ class TestGridFileCustomType(IntegrationTest): db.fs, _id=5, filename="my_file", - contentType="text/html", chunkSize=1000, - aliases=["foo"], metadata={"foo": "red", "bar": "blue"}, bar=3, baz="hello", @@ -780,13 +778,10 @@ class TestGridFileCustomType(IntegrationTest): self.assertEqual("my_file", two.filename) self.assertEqual(5, two._id) self.assertEqual(11, two.length) - self.assertEqual("text/html", two.content_type) self.assertEqual(1000, two.chunk_size) self.assertTrue(isinstance(two.upload_date, datetime.datetime)) - self.assertEqual(["foo"], two.aliases) self.assertEqual({"foo": "red", "bar": "blue"}, two.metadata) self.assertEqual(3, two.bar) - self.assertEqual(None, two.md5) for attr in [ "_id", @@ -805,7 +800,9 @@ class TestGridFileCustomType(IntegrationTest): class ChangeStreamsWCustomTypesTestMixin: @no_type_check def change_stream(self, *args, **kwargs): - return self.watched_target.watch(*args, **kwargs) + stream = self.watched_target.watch(*args, max_await_time_ms=1, **kwargs) + self.addCleanup(stream.close) + return stream @no_type_check def insert_and_check(self, change_stream, insert_doc, expected_doc): diff --git a/test/test_examples.py b/test/test_examples.py index e003d8459..02b178586 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -747,6 +747,7 @@ class TestSampleShellCommands(IntegrationTest): done = False def insert_docs(): + nonlocal done while not done: db.inventory.insert_one({"username": "alice"}) db.inventory.delete_one({"username": "alice"}) @@ -760,17 +761,20 @@ class TestSampleShellCommands(IntegrationTest): cursor = db.inventory.watch() next(cursor) # End Changestream Example 1 + cursor.close() # Start Changestream Example 2 cursor = db.inventory.watch(full_document="updateLookup") next(cursor) # End Changestream Example 2 + cursor.close() # Start Changestream Example 3 resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor) # End Changestream Example 3 + cursor.close() # Start Changestream Example 4 pipeline = [ @@ -780,6 +784,7 @@ class TestSampleShellCommands(IntegrationTest): cursor = db.inventory.watch(pipeline=pipeline) next(cursor) # End Changestream Example 4 + cursor.close() finally: done = True t.join() diff --git a/test/test_sdam_monitoring_spec.py b/test/test_sdam_monitoring_spec.py index 63281c987..8e0a3cbbb 100644 --- a/test/test_sdam_monitoring_spec.py +++ b/test/test_sdam_monitoring_spec.py @@ -179,7 +179,7 @@ class TestAllScenarios(IntegrationTest): def create_test(scenario_def): def run_scenario(self): - with client_knobs(events_queue_frequency=0.1): + with client_knobs(events_queue_frequency=0.05, min_heartbeat_interval=0.05): _run_scenario(self) def _run_scenario(self): @@ -216,7 +216,7 @@ def create_test(scenario_def): ) # Wait some time to catch possible lagging extra events. - time.sleep(0.5) + wait_until(lambda: topology._events.empty(), "publish lagging events") i = 0 while i < expected_len: @@ -273,7 +273,9 @@ class TestSdamMonitoring(IntegrationTest): def setUpClass(cls): super().setUpClass() # Speed up the tests by decreasing the event publish frequency. - cls.knobs = client_knobs(events_queue_frequency=0.1) + cls.knobs = client_knobs( + events_queue_frequency=0.1, heartbeat_frequency=0.1, min_heartbeat_interval=0.1 + ) cls.knobs.enable() cls.listener = ServerAndTopologyEventListener() retry_writes = client_context.supports_transactions()