PYTHON-5413 Handle flaky tests (#2395)

This commit is contained in:
Steven Silvester 2025-07-01 15:42:58 -05:00 committed by GitHub
parent 578c6c2ad2
commit 0b2900d162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 305 additions and 119 deletions

View File

@ -145,6 +145,7 @@ functions:
- MONGODB_API_VERSION - MONGODB_API_VERSION
- REQUIRE_API_VERSION - REQUIRE_API_VERSION
- DEBUG_LOG - DEBUG_LOG
- DISABLE_FLAKY
- ORCHESTRATION_FILE - ORCHESTRATION_FILE
- OCSP_SERVER_TYPE - OCSP_SERVER_TYPE
- VERSION - VERSION

View File

@ -1084,6 +1084,7 @@ def create_run_tests_func():
"MONGODB_API_VERSION", "MONGODB_API_VERSION",
"REQUIRE_API_VERSION", "REQUIRE_API_VERSION",
"DEBUG_LOG", "DEBUG_LOG",
"DISABLE_FLAKY",
"ORCHESTRATION_FILE", "ORCHESTRATION_FILE",
"OCSP_SERVER_TYPE", "OCSP_SERVER_TYPE",
"VERSION", "VERSION",

View File

@ -162,10 +162,6 @@ def handle_test_env() -> None:
write_env("PIP_PREFER_BINARY") # Prefer binary dists by default. write_env("PIP_PREFER_BINARY") # Prefer binary dists by default.
write_env("UV_FROZEN") # Do not modify lock files. write_env("UV_FROZEN") # Do not modify lock files.
# Skip CSOT tests on non-linux platforms.
if PLATFORM != "linux":
write_env("SKIP_CSOT_TESTS")
# Set an environment variable for the test name and sub test name. # Set an environment variable for the test name and sub test name.
write_env(f"TEST_{test_name.upper()}") write_env(f"TEST_{test_name.upper()}")
write_env("TEST_NAME", test_name) write_env("TEST_NAME", test_name)

View File

@ -404,6 +404,15 @@ If you are running one of the `no-responder` tests, omit the `run-server` step.
- Regenerate the test variants and tasks using `pre-commit run --all-files generate-config`. - Regenerate the test variants and tasks using `pre-commit run --all-files generate-config`.
- Make sure to add instructions for running the test suite to `CONTRIBUTING.md`. - Make sure to add instructions for running the test suite to `CONTRIBUTING.md`.
## Handling flaky tests
We have a custom `flaky` decorator in [test/asynchronous/utils.py](test/asynchronous/utils.py) that can be used for
tests that are `flaky`. By default the decorator only applies when not running on CPython on Linux, since other
runtimes tend to have more variation. When using the `flaky` decorator, open a corresponding ticket and
use the ticket number as the "reason" parameter to the decorator, e.g. `@flaky(reason="PYTHON-1234")`.
When running tests locally (not in CI), the `flaky` decorator will be disabled unless `ENABLE_FLAKY` is set.
To disable the `flaky` decorator in CI, you can use `evergreen patch --param DISABLE_FLAKY=1`.
## Specification Tests ## Specification Tests
The MongoDB [specifications repository](https://github.com/mongodb/specifications) The MongoDB [specifications repository](https://github.com/mongodb/specifications)

View File

@ -32,6 +32,7 @@ import unittest
import warnings import warnings
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from pymongo.errors import AutoReconnect
from pymongo.synchronous.uri_parser import parse_uri from pymongo.synchronous.uri_parser import parse_uri
try: try:
@ -1219,12 +1220,17 @@ def teardown():
c = client_context.client c = client_context.client
if c: if c:
if not client_context.is_data_lake: if not client_context.is_data_lake:
c.drop_database("pymongo-pooling-tests") try:
c.drop_database("pymongo_test") c.drop_database("pymongo-pooling-tests")
c.drop_database("pymongo_test1") c.drop_database("pymongo_test")
c.drop_database("pymongo_test2") c.drop_database("pymongo_test1")
c.drop_database("pymongo_test_mike") c.drop_database("pymongo_test2")
c.drop_database("pymongo_test_bernie") c.drop_database("pymongo_test_mike")
c.drop_database("pymongo_test_bernie")
except AutoReconnect:
# PYTHON-4982
if sys.implementation.name.lower() != "pypy":
raise
c.close() c.close()
print_running_clients() print_running_clients()

View File

@ -33,6 +33,7 @@ import warnings
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from pymongo.asynchronous.uri_parser import parse_uri from pymongo.asynchronous.uri_parser import parse_uri
from pymongo.errors import AutoReconnect
try: try:
import ipaddress import ipaddress
@ -1235,12 +1236,17 @@ async def async_teardown():
c = async_client_context.client c = async_client_context.client
if c: if c:
if not async_client_context.is_data_lake: if not async_client_context.is_data_lake:
await c.drop_database("pymongo-pooling-tests") try:
await c.drop_database("pymongo_test") await c.drop_database("pymongo-pooling-tests")
await c.drop_database("pymongo_test1") await c.drop_database("pymongo_test")
await c.drop_database("pymongo_test2") await c.drop_database("pymongo_test1")
await c.drop_database("pymongo_test_mike") await c.drop_database("pymongo_test2")
await c.drop_database("pymongo_test_bernie") await c.drop_database("pymongo_test_mike")
await c.drop_database("pymongo_test_bernie")
except AutoReconnect:
# PYTHON-4982
if sys.implementation.name.lower() != "pypy":
raise
await c.close() await c.close()
print_running_clients() print_running_clients()

View File

@ -25,6 +25,7 @@ from test.asynchronous import (
async_client_context, async_client_context,
unittest, unittest,
) )
from test.asynchronous.utils import flaky
from test.utils_shared import ( from test.utils_shared import (
OvertCommandListener, OvertCommandListener,
) )
@ -619,8 +620,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(AsyncIntegrationTest): class TestClientBulkWriteCSOT(AsyncIntegrationTest):
async def asyncSetUp(self): async def asyncSetUp(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
await super().asyncSetUp() await super().asyncSetUp()
self.max_write_batch_size = await async_client_context.max_write_batch_size self.max_write_batch_size = await async_client_context.max_write_batch_size
self.max_bson_object_size = await async_client_context.max_bson_size self.max_bson_object_size = await async_client_context.max_bson_size
@ -628,7 +627,10 @@ class TestClientBulkWriteCSOT(AsyncIntegrationTest):
@async_client_context.require_version_min(8, 0, 0, -24) @async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_failCommand_fail_point @async_client_context.require_failCommand_fail_point
@flaky(reason="PYTHON-5290", max_runs=3, affects_cpython_linux=True)
async def test_timeout_in_multi_batch_bulk_write(self): async def test_timeout_in_multi_batch_bulk_write(self):
if sys.platform != "linux" and "CI" in os.environ:
self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows and MacOS")
_OVERHEAD = 500 _OVERHEAD = 500
internal_client = await self.async_rs_or_single_client(timeoutMS=None) internal_client = await self.async_rs_or_single_client(timeoutMS=None)

View File

@ -23,6 +23,7 @@ sys.path[0:0] = [""]
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.asynchronous.unified_format import generate_test_classes from test.asynchronous.unified_format import generate_test_classes
from test.asynchronous.utils import flaky
import pymongo import pymongo
from pymongo import _csot from pymongo import _csot
@ -43,9 +44,8 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestCSOT(AsyncIntegrationTest): class TestCSOT(AsyncIntegrationTest):
RUN_ON_LOAD_BALANCER = True RUN_ON_LOAD_BALANCER = True
@flaky(reason="PYTHON-3522")
async def test_timeout_nested(self): async def test_timeout_nested(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
coll = self.db.coll coll = self.db.coll
self.assertEqual(_csot.get_timeout(), None) self.assertEqual(_csot.get_timeout(), None)
self.assertEqual(_csot.get_deadline(), float("inf")) self.assertEqual(_csot.get_deadline(), float("inf"))
@ -82,9 +82,8 @@ class TestCSOT(AsyncIntegrationTest):
self.assertEqual(_csot.get_rtt(), 0.0) self.assertEqual(_csot.get_rtt(), 0.0)
@async_client_context.require_change_streams @async_client_context.require_change_streams
@flaky(reason="PYTHON-3522")
async def test_change_stream_can_resume_after_timeouts(self): async def test_change_stream_can_resume_after_timeouts(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
coll = self.db.test coll = self.db.test
await coll.insert_one({}) await coll.insert_one({})
async with await coll.watch() as stream: async with await coll.watch() as stream:

View File

@ -31,6 +31,7 @@ import pymongo
sys.path[0:0] = [""] sys.path[0:0] = [""]
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.asynchronous.utils import flaky
from test.utils_shared import ( from test.utils_shared import (
AllowListEventListener, AllowListEventListener,
EventListener, EventListener,
@ -1406,9 +1407,8 @@ class TestCursor(AsyncIntegrationTest):
docs = await c.to_list(3) docs = await c.to_list(3)
self.assertEqual(len(docs), 2) self.assertEqual(len(docs), 2)
@flaky(reason="PYTHON-3522")
async def test_to_list_csot_applied(self): async def test_to_list_csot_applied(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
client = await self.async_single_client(timeoutMS=500, w=1) client = await self.async_single_client(timeoutMS=500, w=1)
coll = client.pymongo.test coll = client.pymongo.test
# Initialize the client with a larger timeout to help make test less flakey # Initialize the client with a larger timeout to help make test less flakey
@ -1449,9 +1449,8 @@ class TestCursor(AsyncIntegrationTest):
self.assertEqual(len(await result.to_list(1)), 1) self.assertEqual(len(await result.to_list(1)), 1)
@async_client_context.require_failCommand_blockConnection @async_client_context.require_failCommand_blockConnection
@flaky(reason="PYTHON-3522")
async def test_command_cursor_to_list_csot_applied(self): async def test_command_cursor_to_list_csot_applied(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
client = await self.async_single_client(timeoutMS=500, w=1) client = await self.async_single_client(timeoutMS=500, w=1)
coll = client.pymongo.test coll = client.pymongo.test
# Initialize the client with a larger timeout to help make test less flakey # Initialize the client with a larger timeout to help make test less flakey

View File

@ -32,6 +32,7 @@ import uuid
import warnings import warnings
from test.asynchronous import AsyncIntegrationTest, AsyncPyMongoTestCase, async_client_context from test.asynchronous import AsyncIntegrationTest, AsyncPyMongoTestCase, async_client_context
from test.asynchronous.test_bulk import AsyncBulkTestBase from test.asynchronous.test_bulk import AsyncBulkTestBase
from test.asynchronous.utils import flaky
from test.asynchronous.utils_spec_runner import AsyncSpecRunner, AsyncSpecTestCreator from test.asynchronous.utils_spec_runner import AsyncSpecRunner, AsyncSpecTestCreator
from threading import Thread from threading import Thread
from typing import Any, Dict, Mapping, Optional from typing import Any, Dict, Mapping, Optional
@ -3247,6 +3248,7 @@ class TestKmsRetryProse(AsyncEncryptionIntegrationTest):
class TestAutomaticDecryptionKeys(AsyncEncryptionIntegrationTest): class TestAutomaticDecryptionKeys(AsyncEncryptionIntegrationTest):
@async_client_context.require_no_standalone @async_client_context.require_no_standalone
@async_client_context.require_version_min(7, 0, -1) @async_client_context.require_version_min(7, 0, -1)
@flaky(reason="PYTHON-4982")
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp() await super().asyncSetUp()
self.key1_document = json_data("etc", "data", "keys", "key1-document.json") self.key1_document = json_data("etc", "data", "keys", "key1-document.json")
@ -3489,6 +3491,8 @@ class TestNoSessionsSupport(AsyncEncryptionIntegrationTest):
self.assertNotIn("lsid", self.listener.started_events[1].command) self.assertNotIn("lsid", self.listener.started_events[1].command)
await self.mongocryptd_client.close()
async def test_explicit_session_errors_when_unsupported(self): async def test_explicit_session_errors_when_unsupported(self):
self.listener.reset() self.listener.reset()
async with self.mongocryptd_client.start_session() as s: async with self.mongocryptd_client.start_session() as s:
@ -3501,6 +3505,8 @@ class TestNoSessionsSupport(AsyncEncryptionIntegrationTest):
): ):
await self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s) await self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s)
await self.mongocryptd_client.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -20,7 +20,7 @@ import copy
import pprint import pprint
import sys import sys
import threading import threading
from test.asynchronous.utils import async_set_fail_point from test.asynchronous.utils import async_set_fail_point, flaky
sys.path[0:0] = [""] sys.path[0:0] = [""]
@ -466,6 +466,7 @@ class TestPoolPausedError(AsyncIntegrationTest):
@async_client_context.require_failCommand_blockConnection @async_client_context.require_failCommand_blockConnection
@async_client_context.require_retryable_writes @async_client_context.require_retryable_writes
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
@flaky(reason="PYTHON-5291")
async def test_pool_paused_error_is_retryable(self): async def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener() cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener() cmd_listener = OvertCommandListener()

View File

@ -21,6 +21,7 @@ import threading
from pathlib import Path from pathlib import Path
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.asynchronous.helpers import ConcurrentRunner from test.asynchronous.helpers import ConcurrentRunner
from test.asynchronous.utils import flaky
from test.asynchronous.utils_selection_tests import create_topology from test.asynchronous.utils_selection_tests import create_topology
from test.asynchronous.utils_spec_runner import AsyncSpecTestCreator from test.asynchronous.utils_spec_runner import AsyncSpecTestCreator
from test.utils_shared import ( from test.utils_shared import (
@ -137,6 +138,7 @@ class TestProse(AsyncIntegrationTest):
@async_client_context.require_failCommand_appName @async_client_context.require_failCommand_appName
@async_client_context.require_multiple_mongoses @async_client_context.require_multiple_mongoses
@flaky(reason="PYTHON-3689")
async def test_load_balancing(self): async def test_load_balancing(self):
listener = OvertCommandListener() listener = OvertCommandListener()
cmap_listener = CMAPListener() cmap_listener = CMAPListener()

View File

@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio import asyncio
import sys import sys
import time import time
from test.asynchronous.utils import flaky
from test.utils_shared import FunctionCallRecorder from test.utils_shared import FunctionCallRecorder
from typing import Any from typing import Any
@ -254,6 +255,7 @@ class TestSrvPolling(AsyncPyMongoTestCase):
# Nodelist should reflect new valid DNS resolver response. # Nodelist should reflect new valid DNS resolver response.
await self.assert_nodelist_change(response_final, client) await self.assert_nodelist_change(response_final, client)
@flaky(reason="PYTHON-5315")
async def test_recover_from_initially_empty_seedlist(self): async def test_recover_from_initially_empty_seedlist(self):
def empty_seedlist(): def empty_seedlist():
return [] return []

View File

@ -35,12 +35,11 @@ from test.asynchronous import (
client_knobs, client_knobs,
unittest, unittest,
) )
from test.asynchronous.utils import async_get_pool from test.asynchronous.utils import async_get_pool, flaky
from test.asynchronous.utils_spec_runner import SpecRunnerTask from test.asynchronous.utils_spec_runner import SpecRunnerTask
from test.unified_format_shared import ( from test.unified_format_shared import (
KMS_TLS_OPTS, KMS_TLS_OPTS,
PLACEHOLDER_MAP, PLACEHOLDER_MAP,
SKIP_CSOT_TESTS,
EventListenerUtil, EventListenerUtil,
MatchEvaluatorUtil, MatchEvaluatorUtil,
coerce_result, coerce_result,
@ -519,20 +518,38 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
self.skipTest("Implement PYTHON-1894") self.skipTest("Implement PYTHON-1894")
if "timeoutMS applied to entire download" in spec["description"]: if "timeoutMS applied to entire download" in spec["description"]:
self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime") self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime")
if (
"Error returned from connection pool clear with interruptInUseConnections=true is retryable"
in spec["description"]
and not _IS_SYNC
):
self.skipTest("PYTHON-5170 tests are flakey")
if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC:
self.skipTest("PYTHON-5174 tests are flakey")
class_name = self.__class__.__name__.lower() class_name = self.__class__.__name__.lower()
description = spec["description"].lower() description = spec["description"].lower()
if "csot" in class_name: if "csot" in class_name:
if "gridfs" in class_name and sys.platform == "win32": # Skip tests that are too slow to run on a given platform.
self.skipTest("PYTHON-3522 CSOT GridFS tests are flaky on Windows") slow_macos = [
"operation fails after two consecutive socket timeouts.*",
"operation succeeds after one socket timeout.*",
"Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset",
]
slow_win32 = [
*slow_macos,
"maxTimeMS value in the command is less than timeoutMS",
"timeoutMS applies to whole operation.*",
]
slow_pypy = [
"timeoutMS applies to whole operation.*",
]
if "CI" in os.environ and sys.platform == "win32" and "gridfs" in class_name:
self.skipTest("PYTHON-3522 CSOT GridFS test runs too slow on Windows")
if "CI" in os.environ and sys.platform == "win32":
for pat in slow_win32:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows")
if "CI" in os.environ and sys.platform == "darwin":
for pat in slow_macos:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on MacOS")
if "CI" in os.environ and sys.implementation.name.lower() == "pypy":
for pat in slow_pypy:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on PyPy")
if "change" in description or "change" in class_name: if "change" in description or "change" in class_name:
self.skipTest("CSOT not implemented for watch()") self.skipTest("CSOT not implemented for watch()")
if "cursors" in class_name: if "cursors" in class_name:
@ -1353,38 +1370,31 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
self.assertListEqual(sorted_expected_documents, actual_documents) self.assertListEqual(sorted_expected_documents, actual_documents)
async def run_scenario(self, spec, uri=None): async def run_scenario(self, spec, uri=None):
if "csot" in self.id().lower() and SKIP_CSOT_TESTS:
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
# Kill all sessions before and after each test to prevent an open # Kill all sessions before and after each test to prevent an open
# transaction (from a test failure) from blocking collection/database # transaction (from a test failure) from blocking collection/database
# operations during test set up and tear down. # operations during test set up and tear down.
await self.kill_all_sessions() await self.kill_all_sessions()
if "csot" in self.id().lower(): # Handle flaky tests.
# Retry CSOT tests up to 2 times to deal with flakey tests. flaky_tests = [
attempts = 3 ("PYTHON-5170", ".*test_discovery_and_monitoring.*"),
for i in range(attempts): ("PYTHON-5174", ".*Driver_extends_timeout_while_streaming"),
try: ("PYTHON-5315", ".*TestSrvPolling.test_recover_from_initially_.*"),
return await self._run_scenario(spec, uri) ("PYTHON-4987", ".*UnknownTransactionCommitResult_labels_to_connection_errors"),
except (AssertionError, OperationFailure) as exc: ("PYTHON-3689", ".*TestProse.test_load_balancing"),
if isinstance(exc, OperationFailure) and ( ("PYTHON-3522", ".*csot.*"),
_IS_SYNC or "failpoint" not in exc._message ]
): for reason, flaky_test in flaky_tests:
raise if re.match(flaky_test.lower(), self.id().lower()) is not None:
if i < attempts - 1: func_name = self.id()
print( options = dict(reason=reason, reset_func=self.asyncSetUp, func_name=func_name)
f"Retrying after attempt {i+1} of {self.id()} failed with:\n" if "csot" in func_name.lower():
f"{traceback.format_exc()}", options["max_runs"] = 3
file=sys.stderr, options["affects_cpython_linux"] = True
) decorator = flaky(**options)
await self.asyncSetUp() await decorator(self._run_scenario)(spec, uri)
continue return
raise await self._run_scenario(spec, uri)
return None
else:
await self._run_scenario(spec, uri)
return None
async def _run_scenario(self, spec, uri=None): async def _run_scenario(self, spec, uri=None):
# maybe skip test manually # maybe skip test manually

View File

@ -17,10 +17,14 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import os
import random import random
import sys
import threading # Used in the synchronized version of this file import threading # Used in the synchronized version of this file
import time import time
import traceback
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from functools import wraps
from bson.son import SON from bson.son import SON
from pymongo import AsyncMongoClient from pymongo import AsyncMongoClient
@ -154,6 +158,65 @@ async def async_joinall(tasks):
await asyncio.wait([t.task for t in tasks if t is not None], timeout=300) await asyncio.wait([t.task for t in tasks if t is not None], timeout=300)
def flaky(
    *,
    reason=None,
    max_runs=2,
    min_passes=1,
    delay=1,
    affects_cpython_linux=False,
    func_name=None,
    reset_func=None,
):
    """Decorate an async test as flaky, retrying it when it fails.

    Retries are disabled (the test runs exactly once) when ``DISABLE_FLAKY``
    is set, when neither ``CI`` nor ``ENABLE_FLAKY`` is set, or when running
    on CPython on Linux and ``affects_cpython_linux`` is false.

    :param reason: the reason why the test is flaky (required)
    :param max_runs: the maximum number of runs before raising an error
    :param min_passes: the minimum number of passing runs
    :param delay: the delay in seconds between retries
    :param affects_cpython_linux: whether the test is flaky on CPython on Linux
    :param func_name: the name of the function, used for the retry message
    :param reset_func: an async function to call before retrying
    :raises ValueError: if ``reason`` is not provided
    """
    # Imported locally so this helper does not depend on a module-level import.
    import unittest

    if reason is None:
        raise ValueError("flaky requires a reason input")

    is_cpython_linux = sys.platform == "linux" and sys.implementation.name == "cpython"
    disable_flaky = "DISABLE_FLAKY" in os.environ
    if "CI" not in os.environ and "ENABLE_FLAKY" not in os.environ:
        disable_flaky = True

    if disable_flaky or (is_cpython_linux and not affects_cpython_linux):
        max_runs = 1
        min_passes = 1

    def decorator(target_func):
        @wraps(target_func)
        async def wrapper(*args, **kwargs):
            passes = 0
            last_error = None
            for i in range(max_runs):
                try:
                    result = await target_func(*args, **kwargs)
                except unittest.SkipTest:
                    # A skip is a deliberate outcome, not a flaky failure:
                    # report it immediately instead of sleeping and retrying.
                    raise
                except Exception as e:
                    if i == max_runs - 1:
                        # Bare raise keeps the original traceback intact.
                        raise
                    last_error = e
                    print(
                        f"Retrying after attempt {i+1} of {func_name or target_func.__name__} failed with ({reason})):\n"
                        f"{traceback.format_exc()}",
                        file=sys.stderr,
                    )
                    await asyncio.sleep(delay)
                    if reset_func:
                        await reset_func()
                else:
                    passes += 1
                    if passes >= min_passes:
                        return result
            # All runs were used without reaching min_passes. Fail loudly
            # rather than returning None, which would report a pass.
            name = func_name or target_func.__name__
            raise AssertionError(
                f"{name} passed {passes} of {max_runs} runs, "
                f"needed {min_passes} ({reason})"
            ) from last_error

        return wrapper

    return decorator
class AsyncMockConnection: class AsyncMockConnection:
def __init__(self): def __init__(self):
self.cancel_context = _CancellationContext() self.cancel_context = _CancellationContext()

View File

@ -25,6 +25,7 @@ from test import (
client_context, client_context,
unittest, unittest,
) )
from test.utils import flaky
from test.utils_shared import ( from test.utils_shared import (
OvertCommandListener, OvertCommandListener,
) )
@ -615,8 +616,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(IntegrationTest): class TestClientBulkWriteCSOT(IntegrationTest):
def setUp(self): def setUp(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
super().setUp() super().setUp()
self.max_write_batch_size = client_context.max_write_batch_size self.max_write_batch_size = client_context.max_write_batch_size
self.max_bson_object_size = client_context.max_bson_size self.max_bson_object_size = client_context.max_bson_size
@ -624,7 +623,10 @@ class TestClientBulkWriteCSOT(IntegrationTest):
@client_context.require_version_min(8, 0, 0, -24) @client_context.require_version_min(8, 0, 0, -24)
@client_context.require_failCommand_fail_point @client_context.require_failCommand_fail_point
@flaky(reason="PYTHON-5290", max_runs=3, affects_cpython_linux=True)
def test_timeout_in_multi_batch_bulk_write(self): def test_timeout_in_multi_batch_bulk_write(self):
if sys.platform != "linux" and "CI" in os.environ:
self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows and MacOS")
_OVERHEAD = 500 _OVERHEAD = 500
internal_client = self.rs_or_single_client(timeoutMS=None) internal_client = self.rs_or_single_client(timeoutMS=None)

View File

@ -23,6 +23,7 @@ sys.path[0:0] = [""]
from test import IntegrationTest, client_context, unittest from test import IntegrationTest, client_context, unittest
from test.unified_format import generate_test_classes from test.unified_format import generate_test_classes
from test.utils import flaky
import pymongo import pymongo
from pymongo import _csot from pymongo import _csot
@ -43,9 +44,8 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestCSOT(IntegrationTest): class TestCSOT(IntegrationTest):
RUN_ON_LOAD_BALANCER = True RUN_ON_LOAD_BALANCER = True
@flaky(reason="PYTHON-3522")
def test_timeout_nested(self): def test_timeout_nested(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
coll = self.db.coll coll = self.db.coll
self.assertEqual(_csot.get_timeout(), None) self.assertEqual(_csot.get_timeout(), None)
self.assertEqual(_csot.get_deadline(), float("inf")) self.assertEqual(_csot.get_deadline(), float("inf"))
@ -82,9 +82,8 @@ class TestCSOT(IntegrationTest):
self.assertEqual(_csot.get_rtt(), 0.0) self.assertEqual(_csot.get_rtt(), 0.0)
@client_context.require_change_streams @client_context.require_change_streams
@flaky(reason="PYTHON-3522")
def test_change_stream_can_resume_after_timeouts(self): def test_change_stream_can_resume_after_timeouts(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
coll = self.db.test coll = self.db.test
coll.insert_one({}) coll.insert_one({})
with coll.watch() as stream: with coll.watch() as stream:

View File

@ -31,6 +31,7 @@ import pymongo
sys.path[0:0] = [""] sys.path[0:0] = [""]
from test import IntegrationTest, client_context, unittest from test import IntegrationTest, client_context, unittest
from test.utils import flaky
from test.utils_shared import ( from test.utils_shared import (
AllowListEventListener, AllowListEventListener,
EventListener, EventListener,
@ -1397,9 +1398,8 @@ class TestCursor(IntegrationTest):
docs = c.to_list(3) docs = c.to_list(3)
self.assertEqual(len(docs), 2) self.assertEqual(len(docs), 2)
@flaky(reason="PYTHON-3522")
def test_to_list_csot_applied(self): def test_to_list_csot_applied(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
client = self.single_client(timeoutMS=500, w=1) client = self.single_client(timeoutMS=500, w=1)
coll = client.pymongo.test coll = client.pymongo.test
# Initialize the client with a larger timeout to help make test less flakey # Initialize the client with a larger timeout to help make test less flakey
@ -1440,9 +1440,8 @@ class TestCursor(IntegrationTest):
self.assertEqual(len(result.to_list(1)), 1) self.assertEqual(len(result.to_list(1)), 1)
@client_context.require_failCommand_blockConnection @client_context.require_failCommand_blockConnection
@flaky(reason="PYTHON-3522")
def test_command_cursor_to_list_csot_applied(self): def test_command_cursor_to_list_csot_applied(self):
if os.environ.get("SKIP_CSOT_TESTS", ""):
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
client = self.single_client(timeoutMS=500, w=1) client = self.single_client(timeoutMS=500, w=1)
coll = client.pymongo.test coll = client.pymongo.test
# Initialize the client with a larger timeout to help make test less flakey # Initialize the client with a larger timeout to help make test less flakey

View File

@ -32,6 +32,7 @@ import uuid
import warnings import warnings
from test import IntegrationTest, PyMongoTestCase, client_context from test import IntegrationTest, PyMongoTestCase, client_context
from test.test_bulk import BulkTestBase from test.test_bulk import BulkTestBase
from test.utils import flaky
from test.utils_spec_runner import SpecRunner, SpecTestCreator from test.utils_spec_runner import SpecRunner, SpecTestCreator
from threading import Thread from threading import Thread
from typing import Any, Dict, Mapping, Optional from typing import Any, Dict, Mapping, Optional
@ -3229,6 +3230,7 @@ class TestKmsRetryProse(EncryptionIntegrationTest):
class TestAutomaticDecryptionKeys(EncryptionIntegrationTest): class TestAutomaticDecryptionKeys(EncryptionIntegrationTest):
@client_context.require_no_standalone @client_context.require_no_standalone
@client_context.require_version_min(7, 0, -1) @client_context.require_version_min(7, 0, -1)
@flaky(reason="PYTHON-4982")
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.key1_document = json_data("etc", "data", "keys", "key1-document.json") self.key1_document = json_data("etc", "data", "keys", "key1-document.json")
@ -3471,6 +3473,8 @@ class TestNoSessionsSupport(EncryptionIntegrationTest):
self.assertNotIn("lsid", self.listener.started_events[1].command) self.assertNotIn("lsid", self.listener.started_events[1].command)
self.mongocryptd_client.close()
def test_explicit_session_errors_when_unsupported(self): def test_explicit_session_errors_when_unsupported(self):
self.listener.reset() self.listener.reset()
with self.mongocryptd_client.start_session() as s: with self.mongocryptd_client.start_session() as s:
@ -3483,6 +3487,8 @@ class TestNoSessionsSupport(EncryptionIntegrationTest):
): ):
self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s) self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s)
self.mongocryptd_client.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -20,7 +20,7 @@ import copy
import pprint import pprint
import sys import sys
import threading import threading
from test.utils import set_fail_point from test.utils import flaky, set_fail_point
sys.path[0:0] = [""] sys.path[0:0] = [""]
@ -464,6 +464,7 @@ class TestPoolPausedError(IntegrationTest):
@client_context.require_failCommand_blockConnection @client_context.require_failCommand_blockConnection
@client_context.require_retryable_writes @client_context.require_retryable_writes
@client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05)
@flaky(reason="PYTHON-5291")
def test_pool_paused_error_is_retryable(self): def test_pool_paused_error_is_retryable(self):
cmap_listener = CMAPListener() cmap_listener = CMAPListener()
cmd_listener = OvertCommandListener() cmd_listener = OvertCommandListener()

View File

@ -21,6 +21,7 @@ import threading
from pathlib import Path from pathlib import Path
from test import IntegrationTest, client_context, unittest from test import IntegrationTest, client_context, unittest
from test.helpers import ConcurrentRunner from test.helpers import ConcurrentRunner
from test.utils import flaky
from test.utils_selection_tests import create_topology from test.utils_selection_tests import create_topology
from test.utils_shared import ( from test.utils_shared import (
CMAPListener, CMAPListener,
@ -137,6 +138,7 @@ class TestProse(IntegrationTest):
@client_context.require_failCommand_appName @client_context.require_failCommand_appName
@client_context.require_multiple_mongoses @client_context.require_multiple_mongoses
@flaky(reason="PYTHON-3689")
def test_load_balancing(self): def test_load_balancing(self):
listener = OvertCommandListener() listener = OvertCommandListener()
cmap_listener = CMAPListener() cmap_listener = CMAPListener()

View File

@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio import asyncio
import sys import sys
import time import time
from test.utils import flaky
from test.utils_shared import FunctionCallRecorder from test.utils_shared import FunctionCallRecorder
from typing import Any from typing import Any
@ -254,6 +255,7 @@ class TestSrvPolling(PyMongoTestCase):
# Nodelist should reflect new valid DNS resolver response. # Nodelist should reflect new valid DNS resolver response.
self.assert_nodelist_change(response_final, client) self.assert_nodelist_change(response_final, client)
@flaky(reason="PYTHON-5315")
def test_recover_from_initially_empty_seedlist(self): def test_recover_from_initially_empty_seedlist(self):
def empty_seedlist(): def empty_seedlist():
return [] return []

View File

@ -23,7 +23,7 @@ sys.path[0:0] = [""]
from test import client_knobs, unittest from test import client_knobs, unittest
from test.pymongo_mocks import DummyMonitor from test.pymongo_mocks import DummyMonitor
from test.utils import MockPool from test.utils import MockPool, flaky
from test.utils_shared import wait_until from test.utils_shared import wait_until
from bson.objectid import ObjectId from bson.objectid import ObjectId
@ -750,6 +750,7 @@ def wait_for_primary(topology):
class TestTopologyErrors(TopologyTest): class TestTopologyErrors(TopologyTest):
# Errors when calling hello. # Errors when calling hello.
@flaky(reason="PYTHON-5366")
def test_pool_reset(self): def test_pool_reset(self):
# hello succeeds at first, then always raises socket error. # hello succeeds at first, then always raises socket error.
hello_count = [0] hello_count = [0]

View File

@ -38,7 +38,6 @@ from test import (
from test.unified_format_shared import ( from test.unified_format_shared import (
KMS_TLS_OPTS, KMS_TLS_OPTS,
PLACEHOLDER_MAP, PLACEHOLDER_MAP,
SKIP_CSOT_TESTS,
EventListenerUtil, EventListenerUtil,
MatchEvaluatorUtil, MatchEvaluatorUtil,
coerce_result, coerce_result,
@ -48,7 +47,7 @@ from test.unified_format_shared import (
parse_collection_or_database_options, parse_collection_or_database_options,
with_metaclass, with_metaclass,
) )
from test.utils import get_pool from test.utils import flaky, get_pool
from test.utils_shared import ( from test.utils_shared import (
camel_to_snake, camel_to_snake,
camel_to_snake_args, camel_to_snake_args,
@ -518,20 +517,38 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.skipTest("Implement PYTHON-1894") self.skipTest("Implement PYTHON-1894")
if "timeoutMS applied to entire download" in spec["description"]: if "timeoutMS applied to entire download" in spec["description"]:
self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime") self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime")
if (
"Error returned from connection pool clear with interruptInUseConnections=true is retryable"
in spec["description"]
and not _IS_SYNC
):
self.skipTest("PYTHON-5170 tests are flakey")
if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC:
self.skipTest("PYTHON-5174 tests are flakey")
class_name = self.__class__.__name__.lower() class_name = self.__class__.__name__.lower()
description = spec["description"].lower() description = spec["description"].lower()
if "csot" in class_name: if "csot" in class_name:
if "gridfs" in class_name and sys.platform == "win32": # Skip tests that are too slow to run on a given platform.
self.skipTest("PYTHON-3522 CSOT GridFS tests are flaky on Windows") slow_macos = [
"operation fails after two consecutive socket timeouts.*",
"operation succeeds after one socket timeout.*",
"Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset",
]
slow_win32 = [
*slow_macos,
"maxTimeMS value in the command is less than timeoutMS",
"timeoutMS applies to whole operation.*",
]
slow_pypy = [
"timeoutMS applies to whole operation.*",
]
if "CI" in os.environ and sys.platform == "win32" and "gridfs" in class_name:
self.skipTest("PYTHON-3522 CSOT GridFS test runs too slow on Windows")
if "CI" in os.environ and sys.platform == "win32":
for pat in slow_win32:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows")
if "CI" in os.environ and sys.platform == "darwin":
for pat in slow_macos:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on MacOS")
if "CI" in os.environ and sys.implementation.name.lower() == "pypy":
for pat in slow_pypy:
if re.match(pat.lower(), description):
self.skipTest("PYTHON-3522 CSOT test runs too slow on PyPy")
if "change" in description or "change" in class_name: if "change" in description or "change" in class_name:
self.skipTest("CSOT not implemented for watch()") self.skipTest("CSOT not implemented for watch()")
if "cursors" in class_name: if "cursors" in class_name:
@ -1340,38 +1357,31 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.assertListEqual(sorted_expected_documents, actual_documents) self.assertListEqual(sorted_expected_documents, actual_documents)
def run_scenario(self, spec, uri=None): def run_scenario(self, spec, uri=None):
if "csot" in self.id().lower() and SKIP_CSOT_TESTS:
raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...")
# Kill all sessions before and after each test to prevent an open # Kill all sessions before and after each test to prevent an open
# transaction (from a test failure) from blocking collection/database # transaction (from a test failure) from blocking collection/database
# operations during test set up and tear down. # operations during test set up and tear down.
self.kill_all_sessions() self.kill_all_sessions()
if "csot" in self.id().lower(): # Handle flaky tests.
# Retry CSOT tests up to 2 times to deal with flakey tests. flaky_tests = [
attempts = 3 ("PYTHON-5170", ".*test_discovery_and_monitoring.*"),
for i in range(attempts): ("PYTHON-5174", ".*Driver_extends_timeout_while_streaming"),
try: ("PYTHON-5315", ".*TestSrvPolling.test_recover_from_initially_.*"),
return self._run_scenario(spec, uri) ("PYTHON-4987", ".*UnknownTransactionCommitResult_labels_to_connection_errors"),
except (AssertionError, OperationFailure) as exc: ("PYTHON-3689", ".*TestProse.test_load_balancing"),
if isinstance(exc, OperationFailure) and ( ("PYTHON-3522", ".*csot.*"),
_IS_SYNC or "failpoint" not in exc._message ]
): for reason, flaky_test in flaky_tests:
raise if re.match(flaky_test.lower(), self.id().lower()) is not None:
if i < attempts - 1: func_name = self.id()
print( options = dict(reason=reason, reset_func=self.setUp, func_name=func_name)
f"Retrying after attempt {i+1} of {self.id()} failed with:\n" if "csot" in func_name.lower():
f"{traceback.format_exc()}", options["max_runs"] = 3
file=sys.stderr, options["affects_cpython_linux"] = True
) decorator = flaky(**options)
self.setUp() decorator(self._run_scenario)(spec, uri)
continue return
raise self._run_scenario(spec, uri)
return None
else:
self._run_scenario(spec, uri)
return None
def _run_scenario(self, spec, uri=None): def _run_scenario(self, spec, uri=None):
# maybe skip test manually # maybe skip test manually

View File

@ -91,8 +91,6 @@ from pymongo.results import BulkWriteResult
from pymongo.server_description import ServerDescription from pymongo.server_description import ServerDescription
from pymongo.topology_description import TopologyDescription from pymongo.topology_description import TopologyDescription
SKIP_CSOT_TESTS = os.getenv("SKIP_CSOT_TESTS")
JSON_OPTS = json_util.JSONOptions(tz_aware=False) JSON_OPTS = json_util.JSONOptions(tz_aware=False)
IS_INTERRUPTED = False IS_INTERRUPTED = False

View File

@ -17,10 +17,14 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import os
import random import random
import sys
import threading # Used in the synchronized version of this file import threading # Used in the synchronized version of this file
import time import time
import traceback
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from functools import wraps
from bson.son import SON from bson.son import SON
from pymongo import MongoClient from pymongo import MongoClient
@ -152,6 +156,65 @@ def joinall(tasks):
asyncio.wait([t.task for t in tasks if t is not None], timeout=300) asyncio.wait([t.task for t in tasks if t is not None], timeout=300)
def flaky(
    *,
    reason=None,
    max_runs=2,
    min_passes=1,
    delay=1,
    affects_cpython_linux=False,
    func_name=None,
    reset_func=None,
):
    """Decorate a test as flaky, re-running it when it fails.

    Retries are only active when ``CI`` or ``ENABLE_FLAKY`` is set in the
    environment and ``DISABLE_FLAKY`` is not. On CPython on Linux retries are
    additionally disabled unless ``affects_cpython_linux`` is true. When
    retries are disabled the test runs exactly once.

    :param reason: the reason why the test is flaky (required)
    :param max_runs: the maximum number of runs before raising an error
    :param min_passes: the minimum number of passing runs
    :param delay: the delay in seconds between retries
    :param affects_cpython_linux: whether the test is flaky on CPython on Linux
    :param func_name: the name of the function, used for the retry message
    :param reset_func: a function to call before retrying
    :raises ValueError: if ``reason`` is not given
    """
    # Local import: only the names this block needs, without relying on a
    # module-level ``unittest`` import being present.
    from unittest import SkipTest

    if reason is None:
        raise ValueError("flaky requires a reason input")
    is_cpython_linux = sys.platform == "linux" and sys.implementation.name == "cpython"
    disable_flaky = "DISABLE_FLAKY" in os.environ
    # Outside of CI, retries are opt-in via ENABLE_FLAKY.
    if "CI" not in os.environ and "ENABLE_FLAKY" not in os.environ:
        disable_flaky = True

    if disable_flaky or (is_cpython_linux and not affects_cpython_linux):
        max_runs = 1
        min_passes = 1

    def decorator(target_func):
        @wraps(target_func)
        def wrapper(*args, **kwargs):
            passes = 0
            result = None
            for i in range(max_runs):
                try:
                    result = target_func(*args, **kwargs)
                except SkipTest:
                    # A skip is a deliberate outcome, not a flaky failure.
                    raise
                except Exception:
                    remaining = max_runs - 1 - i
                    # Give up on the last run, or as soon as the remaining
                    # runs can no longer reach ``min_passes``; otherwise the
                    # loop would end quietly and report a false success.
                    if remaining == 0 or passes + remaining < min_passes:
                        raise
                    print(
                        f"Retrying after attempt {i+1} of {func_name or target_func.__name__} failed with ({reason}):\n"
                        f"{traceback.format_exc()}",
                        file=sys.stderr,
                    )
                    time.sleep(delay)
                    if reset_func:
                        reset_func()
                else:
                    passes += 1
                    if passes >= min_passes:
                        return result
            return result

        return wrapper

    return decorator
class MockConnection: class MockConnection:
def __init__(self): def __init__(self):
self.cancel_context = _CancellationContext() self.cancel_context = _CancellationContext()