PYTHON-4633 Speed up TestCollectionChangeStream.test_uuid_representations (#1775)

This commit is contained in:
Shane Harvey 2024-08-07 16:17:48 -07:00 committed by GitHub
parent dcaa42bfa4
commit 13cf110f01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 21 deletions

View File

@ -1020,21 +1020,32 @@ class TestCollectionChangeStream(TestChangeStreamBase, APITestsMixin, ProseSpecT
self.assertEqual(change["ns"]["coll"], self.watched_collection().name)
self.assertEqual(change["fullDocument"], raw_doc)
@client_context.require_version_min(4, 0) # Needed for start_at_operation_time.
def test_uuid_representations(self):
"""Test with uuid document _ids and different uuid_representation."""
optime = self.db.command("ping")["operationTime"]
self.watched_collection().insert_many(
[
{"_id": Binary(uuid.uuid4().bytes, id_subtype)}
for id_subtype in (STANDARD, PYTHON_LEGACY)
]
)
for uuid_representation in ALL_UUID_REPRESENTATIONS:
for id_subtype in (STANDARD, PYTHON_LEGACY):
options = self.watched_collection().codec_options.with_options(
uuid_representation=uuid_representation
)
coll = self.watched_collection(codec_options=options)
with coll.watch() as change_stream:
coll.insert_one({"_id": Binary(uuid.uuid4().bytes, id_subtype)})
_ = change_stream.next()
resume_token = change_stream.resume_token
options = self.watched_collection().codec_options.with_options(
uuid_representation=uuid_representation
)
coll = self.watched_collection(codec_options=options)
with coll.watch(start_at_operation_time=optime, max_await_time_ms=1) as change_stream:
_ = change_stream.next()
resume_token_1 = change_stream.resume_token
_ = change_stream.next()
resume_token_2 = change_stream.resume_token
# Should not error.
coll.watch(resume_after=resume_token)
# Should not error.
with coll.watch(resume_after=resume_token_1):
pass
with coll.watch(resume_after=resume_token_2):
pass
def test_document_id_order(self):
"""Test with document _ids that need their order preserved."""
@ -1053,7 +1064,8 @@ class TestCollectionChangeStream(TestChangeStreamBase, APITestsMixin, ProseSpecT
# The resume token is always a document.
self.assertIsInstance(resume_token, document_class)
# Should not error.
coll.watch(resume_after=resume_token)
with coll.watch(resume_after=resume_token):
pass
coll.delete_many({})
def test_read_concern(self):

View File

@ -764,9 +764,7 @@ class TestGridFileCustomType(IntegrationTest):
db.fs,
_id=5,
filename="my_file",
contentType="text/html",
chunkSize=1000,
aliases=["foo"],
metadata={"foo": "red", "bar": "blue"},
bar=3,
baz="hello",
@ -780,13 +778,10 @@ class TestGridFileCustomType(IntegrationTest):
self.assertEqual("my_file", two.filename)
self.assertEqual(5, two._id)
self.assertEqual(11, two.length)
self.assertEqual("text/html", two.content_type)
self.assertEqual(1000, two.chunk_size)
self.assertTrue(isinstance(two.upload_date, datetime.datetime))
self.assertEqual(["foo"], two.aliases)
self.assertEqual({"foo": "red", "bar": "blue"}, two.metadata)
self.assertEqual(3, two.bar)
self.assertEqual(None, two.md5)
for attr in [
"_id",
@ -805,7 +800,9 @@ class TestGridFileCustomType(IntegrationTest):
class ChangeStreamsWCustomTypesTestMixin:
@no_type_check
def change_stream(self, *args, **kwargs):
return self.watched_target.watch(*args, **kwargs)
stream = self.watched_target.watch(*args, max_await_time_ms=1, **kwargs)
self.addCleanup(stream.close)
return stream
@no_type_check
def insert_and_check(self, change_stream, insert_doc, expected_doc):

View File

@ -747,6 +747,7 @@ class TestSampleShellCommands(IntegrationTest):
done = False
def insert_docs():
nonlocal done
while not done:
db.inventory.insert_one({"username": "alice"})
db.inventory.delete_one({"username": "alice"})
@ -760,17 +761,20 @@ class TestSampleShellCommands(IntegrationTest):
cursor = db.inventory.watch()
next(cursor)
# End Changestream Example 1
cursor.close()
# Start Changestream Example 2
cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)
# End Changestream Example 2
cursor.close()
# Start Changestream Example 3
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)
# End Changestream Example 3
cursor.close()
# Start Changestream Example 4
pipeline = [
@ -780,6 +784,7 @@ class TestSampleShellCommands(IntegrationTest):
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)
# End Changestream Example 4
cursor.close()
finally:
done = True
t.join()

View File

@ -179,7 +179,7 @@ class TestAllScenarios(IntegrationTest):
def create_test(scenario_def):
def run_scenario(self):
with client_knobs(events_queue_frequency=0.1):
with client_knobs(events_queue_frequency=0.05, min_heartbeat_interval=0.05):
_run_scenario(self)
def _run_scenario(self):
@ -216,7 +216,7 @@ def create_test(scenario_def):
)
# Wait some time to catch possible lagging extra events.
time.sleep(0.5)
wait_until(lambda: topology._events.empty(), "publish lagging events")
i = 0
while i < expected_len:
@ -273,7 +273,9 @@ class TestSdamMonitoring(IntegrationTest):
def setUpClass(cls):
super().setUpClass()
# Speed up the tests by decreasing the event publish frequency.
cls.knobs = client_knobs(events_queue_frequency=0.1)
cls.knobs = client_knobs(
events_queue_frequency=0.1, heartbeat_frequency=0.1, min_heartbeat_interval=0.1
)
cls.knobs.enable()
cls.listener = ServerAndTopologyEventListener()
retry_writes = client_context.supports_transactions()