Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2025-06-10 06:08:30 -05:00
commit 826ebef69a
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
42 changed files with 108 additions and 424 deletions

View File

@ -3060,16 +3060,6 @@ tasks:
- sharded_cluster-auth-ssl
- sync
# Serverless tests
- name: test-serverless
commands:
- func: run tests
vars:
TEST_NAME: serverless
AUTH: auth
SSL: ssl
tags: [serverless]
# Standard tests
- name: test-standard-v4.0-python3.9-sync-noauth-nossl-standalone
commands:

View File

@ -586,26 +586,6 @@ buildvariants:
- rhel87-small
tags: [coverage_tag]
# Serverless tests
- name: serverless-rhel8-python3.9
tasks:
- name: .serverless
display_name: Serverless RHEL8 Python3.9
run_on:
- rhel87-small
batchtime: 10080
expansions:
PYTHON_BINARY: /opt/python/3.9/bin/python3
- name: serverless-rhel8-python3.13
tasks:
- name: .serverless
display_name: Serverless RHEL8 Python3.13
run_on:
- rhel87-small
batchtime: 10080
expansions:
PYTHON_BINARY: /opt/python/3.13/bin/python3
# Stable api tests
- name: stable-api-require-v1-rhel8-auth
tasks:

View File

@ -352,23 +352,6 @@ def create_disable_test_commands_variants():
return [create_variant(tasks, display_name, host=host, python=python, expansions=expansions)]
def create_serverless_variants():
host = DEFAULT_HOST
batchtime = BATCHTIME_WEEK
tasks = [".serverless"]
base_name = "Serverless"
return [
create_variant(
tasks,
get_variant_name(base_name, host, python=python),
host=host,
python=python,
batchtime=batchtime,
)
for python in MIN_MAX_PYTHON
]
def create_oidc_auth_variants():
variants = []
for host_name in ["ubuntu22", "macos", "win64"]:
@ -968,14 +951,6 @@ def create_free_threading_tasks():
return [EvgTask(name=task_name, tags=tags, commands=[server_func, test_func])]
def create_serverless_tasks():
vars = dict(TEST_NAME="serverless", AUTH="auth", SSL="ssl")
test_func = FunctionCall(func="run tests", vars=vars)
tags = ["serverless"]
task_name = "test-serverless"
return [EvgTask(name=task_name, tags=tags, commands=[test_func])]
##############
# Functions
##############

View File

@ -229,14 +229,6 @@ def handle_test_env() -> None:
config = read_env(f"{DRIVERS_TOOLS}/.evergreen/atlas_data_lake/secrets-export.sh")
DB_USER = config["ADL_USERNAME"]
DB_PASSWORD = config["ADL_PASSWORD"]
elif test_name == "serverless":
run_command(f"bash {DRIVERS_TOOLS}/.evergreen/serverless/setup.sh")
config = read_env(f"{DRIVERS_TOOLS}/.evergreen/serverless/secrets-export.sh")
DB_USER = config["SERVERLESS_ATLAS_USER"]
DB_PASSWORD = config["SERVERLESS_ATLAS_PASSWORD"]
write_env("MONGODB_URI", config["SERVERLESS_URI"])
write_env("SINGLE_MONGOS_LB_URI", config["SERVERLESS_URI"])
write_env("MULTI_MONGOS_LB_URI", config["SERVERLESS_URI"])
elif test_name == "auth_oidc":
DB_USER = config["OIDC_ADMIN_USER"]
DB_PASSWORD = config["OIDC_ADMIN_PWD"]

View File

@ -36,10 +36,6 @@ elif TEST_NAME == "auth_oidc":
elif TEST_NAME == "ocsp":
run_command(f"bash {DRIVERS_TOOLS}/.evergreen/ocsp/teardown.sh")
# Tear down serverless if applicable.
elif TEST_NAME == "serverless":
run_command(f"bash {DRIVERS_TOOLS}/.evergreen/serverless/teardown.sh")
# Tear down atlas cluster if applicable.
if TEST_NAME in ["aws_lambda", "search_index"]:
run_command(f"bash {DRIVERS_TOOLS}/.evergreen/atlas/teardown-atlas-cluster.sh")

View File

@ -45,7 +45,6 @@ TEST_SUITE_MAP = {
"mockupdb": "mockupdb",
"ocsp": "ocsp",
"perf": "perf",
"serverless": "",
}
# Tests that require a sub test suite.
@ -60,7 +59,6 @@ NO_RUN_ORCHESTRATION = [
"aws_lambda",
"data_lake",
"mockupdb",
"serverless",
"ocsp",
]

View File

@ -286,6 +286,7 @@ async def _async_socket_receive(
_PYPY = "PyPy" in sys.version
_WINDOWS = sys.platform == "win32"
def wait_for_read(conn: Connection, deadline: Optional[float]) -> None:
@ -337,7 +338,8 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
while bytes_read < length:
try:
# Use the legacy wait_for_read cancellation approach on PyPy due to PYTHON-5011.
if _PYPY:
# also use it on Windows due to PYTHON-5405
if _PYPY or _WINDOWS:
wait_for_read(conn, deadline)
if _csot.get_timeout() and deadline is not None:
conn.set_conn_timeout(max(deadline - time.monotonic(), 0))
@ -359,6 +361,7 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
raise _OperationCancelled("operation cancelled") from None
if (
_PYPY
or _WINDOWS
or not conn.is_sdam
and deadline is not None
and deadline - time.monotonic() < 0

View File

@ -64,7 +64,6 @@ from test.helpers import (
MONGODB_API_VERSION,
MULTI_MONGOS_LB_URI,
TEST_LOADBALANCER,
TEST_SERVERLESS,
TLS_OPTIONS,
SystemCertsPatcher,
client_knobs,
@ -123,9 +122,8 @@ class ClientContext:
self.conn_lock = threading.Lock()
self.is_data_lake = False
self.load_balancer = TEST_LOADBALANCER
self.serverless = TEST_SERVERLESS
self._fips_enabled = None
if self.load_balancer or self.serverless:
if self.load_balancer:
self.default_client_options["loadBalanced"] = True
if COMPRESSORS:
self.default_client_options["compressors"] = COMPRESSORS
@ -167,7 +165,7 @@ class ClientContext:
@property
def hello(self):
if not self._hello:
if self.serverless or self.load_balancer:
if self.load_balancer:
self._hello = self.client.admin.command(HelloCompat.CMD)
else:
self._hello = self.client.admin.command(HelloCompat.LEGACY_CMD)
@ -222,24 +220,21 @@ class ClientContext:
if self.client:
self.connected = True
if self.serverless:
self.auth_enabled = True
else:
try:
self.cmd_line = self.client.admin.command("getCmdLineOpts")
except pymongo.errors.OperationFailure as e:
assert e.details is not None
msg = e.details.get("errmsg", "")
if e.code == 13 or "unauthorized" in msg or "login" in msg:
# Unauthorized.
self.auth_enabled = True
else:
raise
try:
self.cmd_line = self.client.admin.command("getCmdLineOpts")
except pymongo.errors.OperationFailure as e:
assert e.details is not None
msg = e.details.get("errmsg", "")
if e.code == 13 or "unauthorized" in msg or "login" in msg:
# Unauthorized.
self.auth_enabled = True
else:
self.auth_enabled = self._server_started_with_auth()
raise
else:
self.auth_enabled = self._server_started_with_auth()
if self.auth_enabled:
if not self.serverless and not IS_SRV:
if not IS_SRV:
# See if db_user already exists.
if not self._check_user_provided():
_create_user(self.client.admin, db_user, db_pwd)
@ -259,13 +254,10 @@ class ClientContext:
# May not have this if OperationFailure was raised earlier.
self.cmd_line = self.client.admin.command("getCmdLineOpts")
if self.serverless:
self.server_status = {}
else:
self.server_status = self.client.admin.command("serverStatus")
if self.storage_engine == "mmapv1":
# MMAPv1 does not support retryWrites=True.
self.default_client_options["retryWrites"] = False
self.server_status = self.client.admin.command("serverStatus")
if self.storage_engine == "mmapv1":
# MMAPv1 does not support retryWrites=True.
self.default_client_options["retryWrites"] = False
hello = self.hello
self.sessions_enabled = "logicalSessionTimeoutMinutes" in hello
@ -302,42 +294,33 @@ class ClientContext:
self.w = len(hello.get("hosts", [])) or 1
self.version = Version.from_client(self.client)
if self.serverless:
self.server_parameters = {
"requireApiVersion": False,
"enableTestCommands": True,
}
self.server_parameters = self.client.admin.command("getParameter", "*")
assert self.cmd_line is not None
if self.server_parameters["enableTestCommands"]:
self.test_commands_enabled = True
self.has_ipv6 = False
else:
self.server_parameters = self.client.admin.command("getParameter", "*")
assert self.cmd_line is not None
if self.server_parameters["enableTestCommands"]:
elif "parsed" in self.cmd_line:
params = self.cmd_line["parsed"].get("setParameter", [])
if "enableTestCommands=1" in params:
self.test_commands_enabled = True
elif "parsed" in self.cmd_line:
params = self.cmd_line["parsed"].get("setParameter", [])
if "enableTestCommands=1" in params:
else:
params = self.cmd_line["parsed"].get("setParameter", {})
if params.get("enableTestCommands") == "1":
self.test_commands_enabled = True
else:
params = self.cmd_line["parsed"].get("setParameter", {})
if params.get("enableTestCommands") == "1":
self.test_commands_enabled = True
self.has_ipv6 = self._server_started_with_ipv6()
self.has_ipv6 = self._server_started_with_ipv6()
self.is_mongos = (self.hello).get("msg") == "isdbgrid"
if self.is_mongos:
address = self.client.address
self.mongoses.append(address)
if not self.serverless:
# Check for another mongos on the next port.
assert address is not None
next_address = address[0], address[1] + 1
mongos_client = self._connect(*next_address, **self.default_client_options)
if mongos_client:
hello = mongos_client.admin.command(HelloCompat.LEGACY_CMD)
if hello.get("msg") == "isdbgrid":
self.mongoses.append(next_address)
mongos_client.close()
# Check for another mongos on the next port.
assert address is not None
next_address = address[0], address[1] + 1
mongos_client = self._connect(*next_address, **self.default_client_options)
if mongos_client:
hello = mongos_client.admin.command(HelloCompat.LEGACY_CMD)
if hello.get("msg") == "isdbgrid":
self.mongoses.append(next_address)
mongos_client.close()
def init(self):
with self.conn_lock:
@ -666,15 +649,9 @@ class ClientContext:
lambda: not self.load_balancer, "Must not be connected to a load balancer", func=func
)
def require_no_serverless(self, func):
"""Run a test only if the client is not connected to serverless."""
return self._require(
lambda: not self.serverless, "Must not be connected to serverless", func=func
)
def require_change_streams(self, func):
"""Run a test only if the server supports change streams."""
return self.require_no_mmap(self.require_no_standalone(self.require_no_serverless(func)))
return self.require_no_mmap(self.require_no_standalone(func))
def is_topology_type(self, topologies):
unknown = set(topologies) - {
@ -1195,8 +1172,6 @@ class IntegrationTest(PyMongoTestCase):
def setUp(self) -> None:
if client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False):
raise SkipTest("this test does not support load balancers")
if client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False):
raise SkipTest("this test does not support serverless")
self.client = client_context.client
self.db = self.client.pymongo_test
if client_context.auth_enabled:

View File

@ -64,7 +64,6 @@ from test.helpers import (
MONGODB_API_VERSION,
MULTI_MONGOS_LB_URI,
TEST_LOADBALANCER,
TEST_SERVERLESS,
TLS_OPTIONS,
SystemCertsPatcher,
client_knobs,
@ -123,9 +122,8 @@ class AsyncClientContext:
self.conn_lock = threading.Lock()
self.is_data_lake = False
self.load_balancer = TEST_LOADBALANCER
self.serverless = TEST_SERVERLESS
self._fips_enabled = None
if self.load_balancer or self.serverless:
if self.load_balancer:
self.default_client_options["loadBalanced"] = True
if COMPRESSORS:
self.default_client_options["compressors"] = COMPRESSORS
@ -167,7 +165,7 @@ class AsyncClientContext:
@property
async def hello(self):
if not self._hello:
if self.serverless or self.load_balancer:
if self.load_balancer:
self._hello = await self.client.admin.command(HelloCompat.CMD)
else:
self._hello = await self.client.admin.command(HelloCompat.LEGACY_CMD)
@ -222,24 +220,21 @@ class AsyncClientContext:
if self.client:
self.connected = True
if self.serverless:
self.auth_enabled = True
else:
try:
self.cmd_line = await self.client.admin.command("getCmdLineOpts")
except pymongo.errors.OperationFailure as e:
assert e.details is not None
msg = e.details.get("errmsg", "")
if e.code == 13 or "unauthorized" in msg or "login" in msg:
# Unauthorized.
self.auth_enabled = True
else:
raise
try:
self.cmd_line = await self.client.admin.command("getCmdLineOpts")
except pymongo.errors.OperationFailure as e:
assert e.details is not None
msg = e.details.get("errmsg", "")
if e.code == 13 or "unauthorized" in msg or "login" in msg:
# Unauthorized.
self.auth_enabled = True
else:
self.auth_enabled = self._server_started_with_auth()
raise
else:
self.auth_enabled = self._server_started_with_auth()
if self.auth_enabled:
if not self.serverless and not IS_SRV:
if not IS_SRV:
# See if db_user already exists.
if not await self._check_user_provided():
await _create_user(self.client.admin, db_user, db_pwd)
@ -259,13 +254,10 @@ class AsyncClientContext:
# May not have this if OperationFailure was raised earlier.
self.cmd_line = await self.client.admin.command("getCmdLineOpts")
if self.serverless:
self.server_status = {}
else:
self.server_status = await self.client.admin.command("serverStatus")
if self.storage_engine == "mmapv1":
# MMAPv1 does not support retryWrites=True.
self.default_client_options["retryWrites"] = False
self.server_status = await self.client.admin.command("serverStatus")
if self.storage_engine == "mmapv1":
# MMAPv1 does not support retryWrites=True.
self.default_client_options["retryWrites"] = False
hello = await self.hello
self.sessions_enabled = "logicalSessionTimeoutMinutes" in hello
@ -302,44 +294,33 @@ class AsyncClientContext:
self.w = len(hello.get("hosts", [])) or 1
self.version = await Version.async_from_client(self.client)
if self.serverless:
self.server_parameters = {
"requireApiVersion": False,
"enableTestCommands": True,
}
self.server_parameters = await self.client.admin.command("getParameter", "*")
assert self.cmd_line is not None
if self.server_parameters["enableTestCommands"]:
self.test_commands_enabled = True
self.has_ipv6 = False
else:
self.server_parameters = await self.client.admin.command("getParameter", "*")
assert self.cmd_line is not None
if self.server_parameters["enableTestCommands"]:
elif "parsed" in self.cmd_line:
params = self.cmd_line["parsed"].get("setParameter", [])
if "enableTestCommands=1" in params:
self.test_commands_enabled = True
elif "parsed" in self.cmd_line:
params = self.cmd_line["parsed"].get("setParameter", [])
if "enableTestCommands=1" in params:
else:
params = self.cmd_line["parsed"].get("setParameter", {})
if params.get("enableTestCommands") == "1":
self.test_commands_enabled = True
else:
params = self.cmd_line["parsed"].get("setParameter", {})
if params.get("enableTestCommands") == "1":
self.test_commands_enabled = True
self.has_ipv6 = await self._server_started_with_ipv6()
self.has_ipv6 = await self._server_started_with_ipv6()
self.is_mongos = (await self.hello).get("msg") == "isdbgrid"
if self.is_mongos:
address = await self.client.address
self.mongoses.append(address)
if not self.serverless:
# Check for another mongos on the next port.
assert address is not None
next_address = address[0], address[1] + 1
mongos_client = await self._connect(
*next_address, **self.default_client_options
)
if mongos_client:
hello = await mongos_client.admin.command(HelloCompat.LEGACY_CMD)
if hello.get("msg") == "isdbgrid":
self.mongoses.append(next_address)
await mongos_client.close()
# Check for another mongos on the next port.
assert address is not None
next_address = address[0], address[1] + 1
mongos_client = await self._connect(*next_address, **self.default_client_options)
if mongos_client:
hello = await mongos_client.admin.command(HelloCompat.LEGACY_CMD)
if hello.get("msg") == "isdbgrid":
self.mongoses.append(next_address)
await mongos_client.close()
async def init(self):
with self.conn_lock:
@ -668,15 +649,9 @@ class AsyncClientContext:
lambda: not self.load_balancer, "Must not be connected to a load balancer", func=func
)
def require_no_serverless(self, func):
"""Run a test only if the client is not connected to serverless."""
return self._require(
lambda: not self.serverless, "Must not be connected to serverless", func=func
)
def require_change_streams(self, func):
"""Run a test only if the server supports change streams."""
return self.require_no_mmap(self.require_no_standalone(self.require_no_serverless(func)))
return self.require_no_mmap(self.require_no_standalone(func))
async def is_topology_type(self, topologies):
unknown = set(topologies) - {
@ -1213,8 +1188,6 @@ class AsyncIntegrationTest(AsyncPyMongoTestCase):
async def asyncSetUp(self) -> None:
if async_client_context.load_balancer and not getattr(self, "RUN_ON_LOAD_BALANCER", False):
raise SkipTest("this test does not support load balancers")
if async_client_context.serverless and not getattr(self, "RUN_ON_SERVERLESS", False):
raise SkipTest("this test does not support serverless")
self.client = async_client_context.client
self.db = self.client.pymongo_test
if async_client_context.auth_enabled:

View File

@ -82,7 +82,6 @@ if CA_PEM:
COMPRESSORS = os.environ.get("COMPRESSORS")
MONGODB_API_VERSION = os.environ.get("MONGODB_API_VERSION")
TEST_LOADBALANCER = bool(os.environ.get("TEST_LOAD_BALANCER"))
TEST_SERVERLESS = bool(os.environ.get("TEST_SERVERLESS"))
SINGLE_MONGOS_LB_URI = os.environ.get("SINGLE_MONGOS_LB_URI")
MULTI_MONGOS_LB_URI = os.environ.get("MULTI_MONGOS_LB_URI")
@ -91,15 +90,6 @@ if TEST_LOADBALANCER:
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
elif TEST_SERVERLESS:
TEST_LOADBALANCER = True
res = parse_uri(SINGLE_MONGOS_LB_URI or "")
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
TLS_OPTIONS = {"tls": True}
# Spec says serverless tests must be run with compression.
COMPRESSORS = COMPRESSORS or "zlib"
# Shared KMS data.

View File

@ -2006,7 +2006,7 @@ class TestClient(AsyncIntegrationTest):
self.assertEqual(len(client.topology_description.server_descriptions()), 2)
@unittest.skipIf(
async_client_context.load_balancer or async_client_context.serverless,
async_client_context.load_balancer,
"loadBalanced clients do not run SDAM",
)
@unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP")

View File

@ -47,7 +47,6 @@ _IS_SYNC = False
class TestClientBulkWrite(AsyncIntegrationTest):
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_returns_error_if_no_namespace_provided(self):
models = [InsertOne(document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -58,7 +57,6 @@ class TestClientBulkWrite(AsyncIntegrationTest):
)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_handles_non_pymongo_error(self):
with patch.object(
_AsyncClientBulk, "write_command", return_value={"error": TypeError("mock type error")}
@ -70,7 +68,6 @@ class TestClientBulkWrite(AsyncIntegrationTest):
self.assertFalse(hasattr(context.exception.error, "details"))
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_formats_write_error_correctly(self):
models = [
InsertOne(namespace="db.coll", document={"_id": 1}),
@ -94,7 +91,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -119,7 +115,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(first_event.operation_id, second_event.operation_id)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_batch_splits_if_ops_payload_too_large(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -151,7 +146,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(first_event.operation_id, second_event.operation_id)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
@async_client_context.require_failCommand_fail_point
async def test_collects_write_concern_errors_across_batches(self):
listener = OvertCommandListener()
@ -194,7 +188,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_collects_write_errors_across_batches_unordered(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -224,7 +217,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_collects_write_errors_across_batches_ordered(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -254,7 +246,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(len(bulk_write_events), 1)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_handles_cursor_requiring_getMore(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -294,7 +285,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertTrue(get_more_event)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
@async_client_context.require_no_standalone
async def test_handles_cursor_requiring_getMore_within_transaction(self):
listener = OvertCommandListener()
@ -337,7 +327,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertTrue(get_more_event)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
@async_client_context.require_failCommand_fail_point
async def test_handles_getMore_error(self):
listener = OvertCommandListener()
@ -392,7 +381,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertTrue(kill_cursors_event)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_returns_error_if_unacknowledged_too_large_insert(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -452,7 +440,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
return num_models, models
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_no_batch_splits_if_new_namespace_is_not_too_large(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -483,7 +470,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(event.command["nsInfo"][0]["ns"], "db.coll")
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_batch_splits_if_new_namespace_is_too_large(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -521,7 +507,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(second_event.command["nsInfo"][0]["ns"], namespace)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_returns_error_if_no_writes_can_be_added_to_ops(self):
client = await self.async_rs_or_single_client()
@ -539,7 +524,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
await client.bulk_write(models=models)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
@unittest.skipUnless(_HAVE_PYMONGOCRYPT, "pymongocrypt is not installed")
async def test_returns_error_if_auto_encryption_configured(self):
opts = AutoEncryptionOpts(
@ -556,7 +540,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_upserted_result(self):
client = await self.async_rs_or_single_client()
@ -596,7 +579,6 @@ class TestClientBulkWriteCRUD(AsyncIntegrationTest):
self.assertEqual(result.update_results[2].did_upsert, False)
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])
@ -645,7 +627,6 @@ class TestClientBulkWriteCSOT(AsyncIntegrationTest):
self.max_message_size_bytes = await async_client_context.max_message_size_bytes
@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
@async_client_context.require_failCommand_fail_point
async def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500

View File

@ -36,16 +36,6 @@ class TestAsyncClientContext(AsyncUnitTest):
),
)
def test_serverless(self):
if not os.environ.get("TEST_SERVERLESS"):
raise SkipTest("TEST_SERVERLESS is not set")
self.assertTrue(
async_client_context.connected and async_client_context.serverless,
"client context must be connected to serverless when "
f"TEST_SERVERLESS is set. Failed attempts:\n{async_client_context.connection_attempt_info()}",
)
def test_enableTestCommands_is_disabled(self):
if not os.environ.get("DISABLE_TEST_COMMANDS"):
raise SkipTest("DISABLE_TEST_COMMANDS is not set")

View File

@ -33,7 +33,7 @@ else:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent.parent, "crud", "unified")
# Generate unified tests.
globals().update(generate_test_classes(_TEST_PATH, module=__name__, RUN_ON_SERVERLESS=True))
globals().update(generate_test_classes(_TEST_PATH, module=__name__))
if __name__ == "__main__":
unittest.main()

View File

@ -41,7 +41,6 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestCSOT(AsyncIntegrationTest):
RUN_ON_SERVERLESS = True
RUN_ON_LOAD_BALANCER = True
async def test_timeout_nested(self):

View File

@ -445,7 +445,6 @@ class TestPoolManagement(AsyncIntegrationTest):
class TestServerMonitoringMode(AsyncIntegrationTest):
@async_client_context.require_no_serverless
@async_client_context.require_no_load_balancer
async def asyncSetUp(self):
await super().asyncSetUp()

View File

@ -54,7 +54,6 @@ globals().update(generate_test_classes(_TEST_PATH, module=__name__))
class TestLB(AsyncIntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
async def test_connections_are_only_returned_once(self):
if "PyPy" in sys.version:
@ -142,10 +141,8 @@ class TestLB(AsyncIntegrationTest):
session = client.start_session()
await session.start_transaction()
await client.test_session_gc.test.find_one({}, session=session)
# Cleanup the transaction left open on the server unless we're
# testing serverless which does not support killSessions.
if not async_client_context.serverless:
self.addAsyncCleanup(self.client.admin.command, "killSessions", [session.session_id])
# Cleanup the transaction left open on the server
self.addAsyncCleanup(self.client.admin.command, "killSessions", [session.session_id])
if async_client_context.load_balancer:
self.assertEqual(pool.active_sockets, 1) # Pinned.

View File

@ -80,7 +80,6 @@ class FindThread(threading.Thread):
class TestPoolPausedError(AsyncIntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@async_client_context.require_sync
@async_client_context.require_failCommand_blockConnection

View File

@ -129,7 +129,6 @@ def non_retryable_single_statement_ops(coll):
class IgnoreDeprecationsTest(AsyncIntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
deprecation_filter: DeprecationFilter
async def asyncSetUp(self) -> None:
@ -423,7 +422,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
class TestWriteConcernError(AsyncIntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
fail_insert: dict
@async_client_context.require_replica_set
@ -494,7 +492,6 @@ class InsertThread(threading.Thread):
class TestPoolPausedError(AsyncIntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@async_client_context.require_sync
@async_client_context.require_failCommand_blockConnection

View File

@ -194,10 +194,11 @@ class TestSession(AsyncIntegrationTest):
# successful connection checkout" test from Driver Sessions Spec.
succeeded = False
lsid_set = set()
failures = 0
for _ in range(5):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener], maxPoolSize=1)
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener], maxPoolSize=1)
# Retry up to 10 times because there is a known race that can cause multiple
# sessions to be used: connection check in happens before session check in
for _ in range(10):
cursor = client.db.test.find({})
ops: List[Tuple[Callable, List[Any]]] = [
(client.db.test.find_one, [{"_id": 1}]),
@ -240,9 +241,9 @@ class TestSession(AsyncIntegrationTest):
if i.command.get("lsid"):
lsid_set.add(i.command.get("lsid")["id"])
if len(lsid_set) == 1:
# Break on first success.
succeeded = True
else:
failures += 1
break
self.assertTrue(succeeded, lsid_set)
async def test_pool_lifo(self):

View File

@ -73,8 +73,6 @@ class AsyncTransactionsBase(AsyncSpecRunner):
class TestTransactions(AsyncTransactionsBase):
RUN_ON_SERVERLESS = True
@async_client_context.require_transactions
def test_transaction_options_validation(self):
default_options = TransactionOptions()

View File

@ -42,7 +42,6 @@ globals().update(
expected_failures=[
"Client side error in command starting transaction", # PYTHON-1894
],
RUN_ON_SERVERLESS=False,
)
)
@ -56,7 +55,6 @@ globals().update(
expected_failures=[
".*", # All tests expected to fail
],
RUN_ON_SERVERLESS=False,
)
)

View File

@ -40,7 +40,6 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestServerApiIntegration(AsyncIntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
def assertServerApi(self, event):
self.assertIn("apiVersion", event.command)

View File

@ -131,14 +131,6 @@ async def is_run_on_requirement_satisfied(requirement):
if req_max_server_version:
max_version_satisfied = Version.from_string(req_max_server_version) >= server_version
serverless = requirement.get("serverless")
if serverless == "require":
serverless_satisfied = async_client_context.serverless
elif serverless == "forbid":
serverless_satisfied = not async_client_context.serverless
else: # unset or "allow"
serverless_satisfied = True
params_satisfied = True
params = requirement.get("serverParameters")
if params:
@ -168,7 +160,6 @@ async def is_run_on_requirement_satisfied(requirement):
topology_satisfied
and min_version_satisfied
and max_version_satisfied
and serverless_satisfied
and params_satisfied
and auth_satisfied
and csfle_satisfied
@ -284,7 +275,7 @@ class EntityMapUtil:
self._listeners[spec["id"]] = listener
kwargs["event_listeners"] = [listener]
if spec.get("useMultipleMongoses"):
if async_client_context.load_balancer or async_client_context.serverless:
if async_client_context.load_balancer:
kwargs["h"] = async_client_context.MULTI_MONGOS_LB_URI
elif async_client_context.is_mongos:
kwargs["h"] = async_client_context.mongos_seeds()
@ -440,7 +431,6 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
SCHEMA_VERSION = Version.from_string("1.22")
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
TEST_SPEC: Any
TEST_PATH = "" # This gets filled in by generate_test_classes
mongos_clients: list[AsyncMongoClient] = []
@ -511,11 +501,7 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
# Handle mongos_clients for transactions tests.
self.mongos_clients = []
if (
async_client_context.supports_transactions()
and not async_client_context.load_balancer
and not async_client_context.serverless
):
if async_client_context.supports_transactions() and not async_client_context.load_balancer:
for address in async_client_context.mongoses:
self.mongos_clients.append(await self.async_single_client("{}:{}".format(*address)))
@ -552,12 +538,6 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
self.skipTest("PYTHON-5170 tests are flakey")
if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC:
self.skipTest("PYTHON-5174 tests are flakey")
if (
"inserting _id with type null via clientBulkWrite" in spec["description"]
or "commitTransaction fails after Interrupted" in spec["description"]
or "commit is not retried after MaxTimeMSExpired error" in spec["description"]
) and async_client_context.serverless:
self.skipTest("PYTHON-5326 known serverless failures")
class_name = self.__class__.__name__.lower()
description = spec["description"].lower()

View File

@ -124,18 +124,6 @@ class AsyncSpecTestCreator:
if max_ver is not None:
method = async_client_context.require_version_max(*max_ver)(method)
if "serverless" in scenario_def:
serverless = scenario_def["serverless"]
if serverless == "require":
serverless_satisfied = async_client_context.serverless
elif serverless == "forbid":
serverless_satisfied = not async_client_context.serverless
else: # unset or "allow"
serverless_satisfied = True
method = unittest.skipUnless(
serverless_satisfied, "Serverless requirement not satisfied"
)(method)
return method
@staticmethod
@ -168,16 +156,6 @@ class AsyncSpecTestCreator:
return not async_client_context.auth_enabled
return True
@staticmethod
def serverless_ok(run_on_req):
serverless = run_on_req["serverless"]
if serverless == "require":
return async_client_context.serverless
elif serverless == "forbid":
return not async_client_context.serverless
else: # unset or "allow"
return True
async def should_run_on(self, scenario_def):
run_on = scenario_def.get("runOn", [])
if not run_on:
@ -190,7 +168,6 @@ class AsyncSpecTestCreator:
and self.min_server_version(req)
and self.max_server_version(req)
and self.valid_auth_enabled(req)
and self.serverless_ok(req)
):
return True
return False
@ -680,7 +657,7 @@ class AsyncSpecRunner(AsyncIntegrationTest):
use_multi_mongos = test["useMultipleMongoses"]
host = None
if use_multi_mongos:
if async_client_context.load_balancer or async_client_context.serverless:
if async_client_context.load_balancer:
host = async_client_context.MULTI_MONGOS_LB_URI
elif async_client_context.is_mongos:
host = async_client_context.mongos_seeds()

View File

@ -37,13 +37,11 @@ URIS = {
"ATLAS_FREE": os.environ.get("ATLAS_FREE"),
"ATLAS_TLS11": os.environ.get("ATLAS_TLS11"),
"ATLAS_TLS12": os.environ.get("ATLAS_TLS12"),
"ATLAS_SERVERLESS": os.environ.get("ATLAS_SERVERLESS"),
"ATLAS_SRV_REPL": os.environ.get("ATLAS_SRV_REPL"),
"ATLAS_SRV_SHRD": os.environ.get("ATLAS_SRV_SHRD"),
"ATLAS_SRV_FREE": os.environ.get("ATLAS_SRV_FREE"),
"ATLAS_SRV_TLS11": os.environ.get("ATLAS_SRV_TLS11"),
"ATLAS_SRV_TLS12": os.environ.get("ATLAS_SRV_TLS12"),
"ATLAS_SRV_SERVERLESS": os.environ.get("ATLAS_SRV_SERVERLESS"),
}
@ -73,9 +71,6 @@ class TestAtlasConnect(PyMongoTestCase):
def test_tls_12(self):
self.connect(URIS["ATLAS_TLS12"])
def test_serverless(self):
self.connect(URIS["ATLAS_SERVERLESS"])
def connect_srv(self, uri):
self.connect(uri)
self.assertIn("mongodb+srv://", uri)
@ -96,9 +91,6 @@ class TestAtlasConnect(PyMongoTestCase):
def test_srv_tls_12(self):
self.connect_srv(URIS["ATLAS_SRV_TLS12"])
def test_srv_serverless(self):
self.connect_srv(URIS["ATLAS_SRV_SERVERLESS"])
def test_uniqueness(self):
"""Ensure that we don't accidentally duplicate the test URIs."""
uri_to_names = defaultdict(list)

View File

@ -82,7 +82,6 @@ if CA_PEM:
COMPRESSORS = os.environ.get("COMPRESSORS")
MONGODB_API_VERSION = os.environ.get("MONGODB_API_VERSION")
TEST_LOADBALANCER = bool(os.environ.get("TEST_LOAD_BALANCER"))
TEST_SERVERLESS = bool(os.environ.get("TEST_SERVERLESS"))
SINGLE_MONGOS_LB_URI = os.environ.get("SINGLE_MONGOS_LB_URI")
MULTI_MONGOS_LB_URI = os.environ.get("MULTI_MONGOS_LB_URI")
@ -91,15 +90,6 @@ if TEST_LOADBALANCER:
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
elif TEST_SERVERLESS:
TEST_LOADBALANCER = True
res = parse_uri(SINGLE_MONGOS_LB_URI or "")
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
TLS_OPTIONS = {"tls": True}
# Spec says serverless tests must be run with compression.
COMPRESSORS = COMPRESSORS or "zlib"
# Shared KMS data.

View File

@ -1963,7 +1963,7 @@ class TestClient(IntegrationTest):
self.assertEqual(len(client.topology_description.server_descriptions()), 2)
@unittest.skipIf(
client_context.load_balancer or client_context.serverless,
client_context.load_balancer,
"loadBalanced clients do not run SDAM",
)
@unittest.skipIf(sys.platform == "win32", "Windows does not support SIGSTOP")

View File

@ -47,7 +47,6 @@ _IS_SYNC = True
class TestClientBulkWrite(IntegrationTest):
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_returns_error_if_no_namespace_provided(self):
models = [InsertOne(document={"a": "b"})]
with self.assertRaises(InvalidOperation) as context:
@ -58,7 +57,6 @@ class TestClientBulkWrite(IntegrationTest):
)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_handles_non_pymongo_error(self):
with patch.object(
_ClientBulk, "write_command", return_value={"error": TypeError("mock type error")}
@ -70,7 +68,6 @@ class TestClientBulkWrite(IntegrationTest):
self.assertFalse(hasattr(context.exception.error, "details"))
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_formats_write_error_correctly(self):
models = [
InsertOne(namespace="db.coll", document={"_id": 1}),
@ -94,7 +91,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_batch_splits_if_num_operations_too_large(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -119,7 +115,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(first_event.operation_id, second_event.operation_id)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_batch_splits_if_ops_payload_too_large(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -151,7 +146,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(first_event.operation_id, second_event.operation_id)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
@client_context.require_failCommand_fail_point
def test_collects_write_concern_errors_across_batches(self):
listener = OvertCommandListener()
@ -194,7 +188,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_collects_write_errors_across_batches_unordered(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -224,7 +217,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(bulk_write_events), 2)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_collects_write_errors_across_batches_ordered(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -254,7 +246,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(len(bulk_write_events), 1)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_handles_cursor_requiring_getMore(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -294,7 +285,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertTrue(get_more_event)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
@client_context.require_no_standalone
def test_handles_cursor_requiring_getMore_within_transaction(self):
listener = OvertCommandListener()
@ -337,7 +327,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertTrue(get_more_event)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
@client_context.require_failCommand_fail_point
def test_handles_getMore_error(self):
listener = OvertCommandListener()
@ -392,7 +381,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertTrue(kill_cursors_event)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_returns_error_if_unacknowledged_too_large_insert(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -448,7 +436,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
return num_models, models
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_no_batch_splits_if_new_namespace_is_not_too_large(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -479,7 +466,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(event.command["nsInfo"][0]["ns"], "db.coll")
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_batch_splits_if_new_namespace_is_too_large(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -517,7 +503,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(second_event.command["nsInfo"][0]["ns"], namespace)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_returns_error_if_no_writes_can_be_added_to_ops(self):
client = self.rs_or_single_client()
@ -535,7 +520,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
client.bulk_write(models=models)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
@unittest.skipUnless(_HAVE_PYMONGOCRYPT, "pymongocrypt is not installed")
def test_returns_error_if_auto_encryption_configured(self):
opts = AutoEncryptionOpts(
@ -552,7 +536,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_upserted_result(self):
client = self.rs_or_single_client()
@ -592,7 +575,6 @@ class TestClientBulkWriteCRUD(IntegrationTest):
self.assertEqual(result.update_results[2].did_upsert, False)
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])
@ -641,7 +623,6 @@ class TestClientBulkWriteCSOT(IntegrationTest):
self.max_message_size_bytes = client_context.max_message_size_bytes
@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
@client_context.require_failCommand_fail_point
def test_timeout_in_multi_batch_bulk_write(self):
_OVERHEAD = 500

View File

@ -36,16 +36,6 @@ class TestClientContext(UnitTest):
),
)
def test_serverless(self):
if not os.environ.get("TEST_SERVERLESS"):
raise SkipTest("TEST_SERVERLESS is not set")
self.assertTrue(
client_context.connected and client_context.serverless,
"client context must be connected to serverless when "
f"TEST_SERVERLESS is set. Failed attempts:\n{client_context.connection_attempt_info()}",
)
def test_enableTestCommands_is_disabled(self):
if not os.environ.get("DISABLE_TEST_COMMANDS"):
raise SkipTest("DISABLE_TEST_COMMANDS is not set")

View File

@ -33,7 +33,7 @@ else:
_TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent.parent, "crud", "unified")
# Generate unified tests.
globals().update(generate_test_classes(_TEST_PATH, module=__name__, RUN_ON_SERVERLESS=True))
globals().update(generate_test_classes(_TEST_PATH, module=__name__))
if __name__ == "__main__":
unittest.main()

View File

@ -41,7 +41,6 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestCSOT(IntegrationTest):
RUN_ON_SERVERLESS = True
RUN_ON_LOAD_BALANCER = True
def test_timeout_nested(self):

View File

@ -443,7 +443,6 @@ class TestPoolManagement(IntegrationTest):
class TestServerMonitoringMode(IntegrationTest):
@client_context.require_no_serverless
@client_context.require_no_load_balancer
def setUp(self):
super().setUp()

View File

@ -54,7 +54,6 @@ globals().update(generate_test_classes(_TEST_PATH, module=__name__))
class TestLB(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
def test_connections_are_only_returned_once(self):
if "PyPy" in sys.version:
@ -142,10 +141,8 @@ class TestLB(IntegrationTest):
session = client.start_session()
session.start_transaction()
client.test_session_gc.test.find_one({}, session=session)
# Cleanup the transaction left open on the server unless we're
# testing serverless which does not support killSessions.
if not client_context.serverless:
self.addCleanup(self.client.admin.command, "killSessions", [session.session_id])
# Cleanup the transaction left open on the server
self.addCleanup(self.client.admin.command, "killSessions", [session.session_id])
if client_context.load_balancer:
self.assertEqual(pool.active_sockets, 1) # Pinned.

View File

@ -80,7 +80,6 @@ class FindThread(threading.Thread):
class TestPoolPausedError(IntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@client_context.require_sync
@client_context.require_failCommand_blockConnection

View File

@ -129,7 +129,6 @@ def non_retryable_single_statement_ops(coll):
class IgnoreDeprecationsTest(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
deprecation_filter: DeprecationFilter
def setUp(self) -> None:
@ -421,7 +420,6 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
class TestWriteConcernError(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
fail_insert: dict
@client_context.require_replica_set
@ -492,7 +490,6 @@ class InsertThread(threading.Thread):
class TestPoolPausedError(IntegrationTest):
# Pools don't get paused in load balanced mode.
RUN_ON_LOAD_BALANCER = False
RUN_ON_SERVERLESS = False
@client_context.require_sync
@client_context.require_failCommand_blockConnection

View File

@ -194,10 +194,11 @@ class TestSession(IntegrationTest):
# successful connection checkout" test from Driver Sessions Spec.
succeeded = False
lsid_set = set()
failures = 0
for _ in range(5):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener], maxPoolSize=1)
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener], maxPoolSize=1)
# Retry up to 10 times because there is a known race that can cause multiple
# sessions to be used: connection check in happens before session check in
for _ in range(10):
cursor = client.db.test.find({})
ops: List[Tuple[Callable, List[Any]]] = [
(client.db.test.find_one, [{"_id": 1}]),
@ -240,9 +241,9 @@ class TestSession(IntegrationTest):
if i.command.get("lsid"):
lsid_set.add(i.command.get("lsid")["id"])
if len(lsid_set) == 1:
# Break on first success.
succeeded = True
else:
failures += 1
break
self.assertTrue(succeeded, lsid_set)
def test_pool_lifo(self):

View File

@ -73,8 +73,6 @@ class TransactionsBase(SpecRunner):
class TestTransactions(TransactionsBase):
RUN_ON_SERVERLESS = True
@client_context.require_transactions
def test_transaction_options_validation(self):
default_options = TransactionOptions()

View File

@ -42,7 +42,6 @@ globals().update(
expected_failures=[
"Client side error in command starting transaction", # PYTHON-1894
],
RUN_ON_SERVERLESS=False,
)
)
@ -56,7 +55,6 @@ globals().update(
expected_failures=[
".*", # All tests expected to fail
],
RUN_ON_SERVERLESS=False,
)
)

View File

@ -40,7 +40,6 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestServerApiIntegration(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
def assertServerApi(self, event):
self.assertIn("apiVersion", event.command)

View File

@ -130,14 +130,6 @@ def is_run_on_requirement_satisfied(requirement):
if req_max_server_version:
max_version_satisfied = Version.from_string(req_max_server_version) >= server_version
serverless = requirement.get("serverless")
if serverless == "require":
serverless_satisfied = client_context.serverless
elif serverless == "forbid":
serverless_satisfied = not client_context.serverless
else: # unset or "allow"
serverless_satisfied = True
params_satisfied = True
params = requirement.get("serverParameters")
if params:
@ -167,7 +159,6 @@ def is_run_on_requirement_satisfied(requirement):
topology_satisfied
and min_version_satisfied
and max_version_satisfied
and serverless_satisfied
and params_satisfied
and auth_satisfied
and csfle_satisfied
@ -283,7 +274,7 @@ class EntityMapUtil:
self._listeners[spec["id"]] = listener
kwargs["event_listeners"] = [listener]
if spec.get("useMultipleMongoses"):
if client_context.load_balancer or client_context.serverless:
if client_context.load_balancer:
kwargs["h"] = client_context.MULTI_MONGOS_LB_URI
elif client_context.is_mongos:
kwargs["h"] = client_context.mongos_seeds()
@ -439,7 +430,6 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
SCHEMA_VERSION = Version.from_string("1.22")
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
TEST_SPEC: Any
TEST_PATH = "" # This gets filled in by generate_test_classes
mongos_clients: list[MongoClient] = []
@ -510,11 +500,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
# Handle mongos_clients for transactions tests.
self.mongos_clients = []
if (
client_context.supports_transactions()
and not client_context.load_balancer
and not client_context.serverless
):
if client_context.supports_transactions() and not client_context.load_balancer:
for address in client_context.mongoses:
self.mongos_clients.append(self.single_client("{}:{}".format(*address)))
@ -551,12 +537,6 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.skipTest("PYTHON-5170 tests are flakey")
if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC:
self.skipTest("PYTHON-5174 tests are flakey")
if (
"inserting _id with type null via clientBulkWrite" in spec["description"]
or "commitTransaction fails after Interrupted" in spec["description"]
or "commit is not retried after MaxTimeMSExpired error" in spec["description"]
) and client_context.serverless:
self.skipTest("PYTHON-5326 known serverless failures")
class_name = self.__class__.__name__.lower()
description = spec["description"].lower()

View File

@ -124,18 +124,6 @@ class SpecTestCreator:
if max_ver is not None:
method = client_context.require_version_max(*max_ver)(method)
if "serverless" in scenario_def:
serverless = scenario_def["serverless"]
if serverless == "require":
serverless_satisfied = client_context.serverless
elif serverless == "forbid":
serverless_satisfied = not client_context.serverless
else: # unset or "allow"
serverless_satisfied = True
method = unittest.skipUnless(
serverless_satisfied, "Serverless requirement not satisfied"
)(method)
return method
@staticmethod
@ -168,16 +156,6 @@ class SpecTestCreator:
return not client_context.auth_enabled
return True
@staticmethod
def serverless_ok(run_on_req):
serverless = run_on_req["serverless"]
if serverless == "require":
return client_context.serverless
elif serverless == "forbid":
return not client_context.serverless
else: # unset or "allow"
return True
def should_run_on(self, scenario_def):
run_on = scenario_def.get("runOn", [])
if not run_on:
@ -190,7 +168,6 @@ class SpecTestCreator:
and self.min_server_version(req)
and self.max_server_version(req)
and self.valid_auth_enabled(req)
and self.serverless_ok(req)
):
return True
return False
@ -677,7 +654,7 @@ class SpecRunner(IntegrationTest):
use_multi_mongos = test["useMultipleMongoses"]
host = None
if use_multi_mongos:
if client_context.load_balancer or client_context.serverless:
if client_context.load_balancer:
host = client_context.MULTI_MONGOS_LB_URI
elif client_context.is_mongos:
host = client_context.mongos_seeds()