PYTHON-4617 Skip unified retryable writes tests on MMAPv1 (#1841)

This commit is contained in:
Shane Harvey 2024-09-06 10:46:10 -07:00 committed by GitHub
parent 1eb3b8550e
commit 6bdaf19c78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 62 additions and 248 deletions

View File

@ -15,7 +15,6 @@
"""Execute Transactions Spec tests."""
from __future__ import annotations
import os
import sys
from io import BytesIO
@ -23,8 +22,7 @@ from gridfs.asynchronous.grid_file import AsyncGridFS, AsyncGridFSBucket
sys.path[0:0] = [""]
from test.asynchronous import async_client_context, unittest
from test.asynchronous.utils_spec_runner import AsyncSpecRunner
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
from test.utils import (
OvertCommandListener,
async_rs_client,
@ -54,8 +52,6 @@ from pymongo.read_preferences import ReadPreference
_IS_SYNC = False
_TXN_TESTS_DEBUG = os.environ.get("TRANSACTION_TESTS_DEBUG")
# Max number of operations to perform after a transaction to prove unpinning
# occurs. Chosen so that there's a low false positive rate. With 2 mongoses,
# 50 attempts yields a one in a quadrillion chance of a false positive
@ -63,31 +59,7 @@ _TXN_TESTS_DEBUG = os.environ.get("TRANSACTION_TESTS_DEBUG")
UNPIN_TEST_MAX_ATTEMPTS = 50
class AsyncTransactionsBase(AsyncSpecRunner):
@classmethod
async def _setup_class(cls):
await super()._setup_class()
if async_client_context.supports_transactions():
for address in async_client_context.mongoses:
cls.mongos_clients.append(await async_single_client("{}:{}".format(*address)))
@classmethod
async def _tearDown_class(cls):
for client in cls.mongos_clients:
await client.close()
await super()._tearDown_class()
def maybe_skip_scenario(self, test):
super().maybe_skip_scenario(test)
if (
"secondary" in self.id()
and not async_client_context.is_mongos
and not async_client_context.has_secondaries
):
raise unittest.SkipTest("No secondaries")
class TestTransactions(AsyncTransactionsBase):
class TestTransactions(AsyncIntegrationTest):
RUN_ON_SERVERLESS = True
@async_client_context.require_transactions
@ -421,7 +393,31 @@ class PatchSessionTimeout:
client_session._WITH_TRANSACTION_RETRY_TIME_LIMIT = self.real_timeout
class TestTransactionsConvenientAPI(AsyncTransactionsBase):
class TestTransactionsConvenientAPI(AsyncIntegrationTest):
@classmethod
async def _setup_class(cls):
await super()._setup_class()
cls.mongos_clients = []
if async_client_context.supports_transactions():
for address in async_client_context.mongoses:
cls.mongos_clients.append(await async_single_client("{}:{}".format(*address)))
@classmethod
async def _tearDown_class(cls):
for client in cls.mongos_clients:
await client.close()
await super()._tearDown_class()
async def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
await client.admin.command(cmd)
async def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
await self._set_fail_point(client, command_args)
@async_client_context.require_transactions
async def test_callback_raises_custom_error(self):
class _MyException(Exception):

View File

@ -1,55 +0,0 @@
# Copyright 2020-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""v2 format CRUD test runner.
https://github.com/mongodb/specifications/blob/master/source/crud/tests/README.rst
"""
from __future__ import annotations
from test.utils_spec_runner import SpecRunner
class TestCrudV2(SpecRunner):
# Default test database and collection names.
TEST_DB = None
TEST_COLLECTION = None
def allowable_errors(self, op):
"""Override expected error classes."""
errors = super().allowable_errors(op)
errors += (ValueError,)
return errors
def get_scenario_db_name(self, scenario_def):
"""Crud spec says database_name is optional."""
return scenario_def.get("database_name", self.TEST_DB)
def get_scenario_coll_name(self, scenario_def):
"""Crud spec says collection_name is optional."""
return scenario_def.get("collection_name", self.TEST_COLLECTION)
def get_object_name(self, op):
"""Crud spec says object is optional and defaults to 'collection'."""
return op.get("object", "collection")
def get_outcome_coll_name(self, outcome, collection):
"""Crud spec says outcome has an optional 'collection.name'."""
return outcome["collection"].get("name", collection.name)
def setup_scenario(self, scenario_def):
"""Allow specs to override a test's setup."""
# PYTHON-1935 Only create the collection if there is data to insert.
if scenario_def["data"]:
super().setup_scenario(scenario_def)

View File

@ -20,7 +20,6 @@ import pprint
import sys
import threading
from bson import SON
from pymongo.errors import AutoReconnect
sys.path[0:0] = [""]
@ -34,14 +33,10 @@ from test import (
)
from test.utils import (
CMAPListener,
EventListener,
OvertCommandListener,
SpecTestCreator,
rs_client,
rs_or_single_client,
set_fail_point,
)
from test.utils_spec_runner import SpecRunner
from pymongo.monitoring import (
ConnectionCheckedOutEvent,
@ -50,7 +45,6 @@ from pymongo.monitoring import (
PoolClearedEvent,
)
from pymongo.synchronous.mongo_client import MongoClient
from pymongo.write_concern import WriteConcern
# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_reads", "legacy")
@ -74,81 +68,6 @@ class TestClientOptions(PyMongoTestCase):
self.assertEqual(client.options.retry_reads, False)
class TestSpec(SpecRunner):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
@classmethod
@client_context.require_failCommand_fail_point
# TODO: remove this once PYTHON-1948 is done.
@client_context.require_no_mmap
def setUpClass(cls):
super().setUpClass()
def maybe_skip_scenario(self, test):
super().maybe_skip_scenario(test)
skip_names = ["listCollectionObjects", "listIndexNames", "listDatabaseObjects"]
for name in skip_names:
if name.lower() in test["description"].lower():
self.skipTest(f"PyMongo does not support {name}")
# Serverless does not support $out and collation.
if client_context.serverless:
for operation in test["operations"]:
if operation["name"] == "aggregate":
for stage in operation["arguments"]["pipeline"]:
if "$out" in stage:
self.skipTest("MongoDB Serverless does not support $out")
if "collation" in operation["arguments"]:
self.skipTest("MongoDB Serverless does not support collations")
# Skip changeStream related tests on MMAPv1 and serverless.
test_name = self.id().rsplit(".")[-1]
if "changestream" in test_name.lower():
if client_context.storage_engine == "mmapv1":
self.skipTest("MMAPv1 does not support change streams.")
if client_context.serverless:
self.skipTest("Serverless does not support change streams.")
def get_scenario_coll_name(self, scenario_def):
"""Override a test's collection name to support GridFS tests."""
if "bucket_name" in scenario_def:
return scenario_def["bucket_name"]
return super().get_scenario_coll_name(scenario_def)
def setup_scenario(self, scenario_def):
"""Override a test's setup to support GridFS tests."""
if "bucket_name" in scenario_def:
data = scenario_def["data"]
db_name = self.get_scenario_db_name(scenario_def)
db = client_context.client[db_name]
# Create a bucket for the retryable reads GridFS tests with as few
# majority writes as possible.
wc = WriteConcern(w="majority")
if data:
db["fs.chunks"].drop()
db["fs.files"].drop()
db["fs.chunks"].insert_many(data["fs.chunks"])
db.get_collection("fs.files", write_concern=wc).insert_many(data["fs.files"])
else:
db.get_collection("fs.chunks").drop()
db.get_collection("fs.files", write_concern=wc).drop()
else:
super().setup_scenario(scenario_def)
def create_test(scenario_def, test, name):
@client_context.require_test_commands
def run_scenario(self):
self.run_scenario(scenario_def, test)
return run_scenario
test_creator = SpecTestCreator(create_test, TestSpec, _TEST_PATH)
test_creator.create_tests()
class FindThread(threading.Thread):
def __init__(self, collection):
super().__init__()

View File

@ -16,7 +16,6 @@
from __future__ import annotations
import copy
import os
import pprint
import sys
import threading
@ -29,11 +28,9 @@ from test.utils import (
DeprecationFilter,
EventListener,
OvertCommandListener,
SpecTestCreator,
rs_or_single_client,
set_fail_point,
)
from test.utils_spec_runner import SpecRunner
from test.version import Version
from bson.codec_options import DEFAULT_CODEC_OPTIONS
@ -65,9 +62,6 @@ from pymongo.operations import (
from pymongo.synchronous.mongo_client import MongoClient
from pymongo.write_concern import WriteConcern
# Location of JSON test specifications.
_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "retryable_writes", "legacy")
class InsertEventListener(EventListener):
def succeeded(self, event: CommandSucceededEvent) -> None:
@ -89,44 +83,6 @@ class InsertEventListener(EventListener):
)
class TestAllScenarios(SpecRunner):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
def get_object_name(self, op):
return op.get("object", "collection")
def get_scenario_db_name(self, scenario_def):
return scenario_def.get("database_name", "pymongo_test")
def get_scenario_coll_name(self, scenario_def):
return scenario_def.get("collection_name", "test")
def run_test_ops(self, sessions, collection, test):
# Transform retryable writes spec format into transactions.
operation = test["operation"]
outcome = test["outcome"]
if "error" in outcome:
operation["error"] = outcome["error"]
if "result" in outcome:
operation["result"] = outcome["result"]
test["operations"] = [operation]
super().run_test_ops(sessions, collection, test)
def create_test(scenario_def, test, name):
@client_context.require_test_commands
@client_context.require_no_mmap
def run_scenario(self):
self.run_scenario(scenario_def, test)
return run_scenario
test_creator = SpecTestCreator(create_test, TestAllScenarios, _TEST_PATH)
test_creator.create_tests()
def retryable_single_statement_ops(coll):
return [
(coll.bulk_write, [[InsertOne({}), InsertOne({})]], {}),

View File

@ -15,7 +15,6 @@
"""Execute Transactions Spec tests."""
from __future__ import annotations
import os
import sys
from io import BytesIO
@ -23,14 +22,13 @@ from gridfs.synchronous.grid_file import GridFS, GridFSBucket
sys.path[0:0] = [""]
from test import client_context, unittest
from test import IntegrationTest, client_context, unittest
from test.utils import (
OvertCommandListener,
rs_client,
single_client,
wait_until,
)
from test.utils_spec_runner import SpecRunner
from typing import List
from bson import encode
@ -54,8 +52,6 @@ from pymongo.synchronous.helpers import next
_IS_SYNC = True
_TXN_TESTS_DEBUG = os.environ.get("TRANSACTION_TESTS_DEBUG")
# Max number of operations to perform after a transaction to prove unpinning
# occurs. Chosen so that there's a low false positive rate. With 2 mongoses,
# 50 attempts yields a one in a quadrillion chance of a false positive
@ -63,31 +59,7 @@ _TXN_TESTS_DEBUG = os.environ.get("TRANSACTION_TESTS_DEBUG")
UNPIN_TEST_MAX_ATTEMPTS = 50
class TransactionsBase(SpecRunner):
@classmethod
def _setup_class(cls):
super()._setup_class()
if client_context.supports_transactions():
for address in client_context.mongoses:
cls.mongos_clients.append(single_client("{}:{}".format(*address)))
@classmethod
def _tearDown_class(cls):
for client in cls.mongos_clients:
client.close()
super()._tearDown_class()
def maybe_skip_scenario(self, test):
super().maybe_skip_scenario(test)
if (
"secondary" in self.id()
and not client_context.is_mongos
and not client_context.has_secondaries
):
raise unittest.SkipTest("No secondaries")
class TestTransactions(TransactionsBase):
class TestTransactions(IntegrationTest):
RUN_ON_SERVERLESS = True
@client_context.require_transactions
@ -417,7 +389,31 @@ class PatchSessionTimeout:
client_session._WITH_TRANSACTION_RETRY_TIME_LIMIT = self.real_timeout
class TestTransactionsConvenientAPI(TransactionsBase):
class TestTransactionsConvenientAPI(IntegrationTest):
@classmethod
def _setup_class(cls):
super()._setup_class()
cls.mongos_clients = []
if client_context.supports_transactions():
for address in client_context.mongoses:
cls.mongos_clients.append(single_client("{}:{}".format(*address)))
@classmethod
def _tearDown_class(cls):
for client in cls.mongos_clients:
client.close()
super()._tearDown_class()
def _set_fail_point(self, client, command_args):
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
client.admin.command(cmd)
def set_fail_point(self, command_args):
clients = self.mongos_clients if self.mongos_clients else [self.client]
for client in clients:
self._set_fail_point(client, command_args)
@client_context.require_transactions
def test_callback_raises_custom_error(self):
class _MyException(Exception):

View File

@ -1100,6 +1100,13 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
if not cls.should_run_on(run_on_spec):
raise unittest.SkipTest(f"{cls.__name__} runOnRequirements not satisfied")
# add any special-casing for skipping tests here
if client_context.storage_engine == "mmapv1":
if "retryable-writes" in cls.TEST_SPEC["description"] or "retryable_writes" in str(
cls.TEST_PATH
):
raise unittest.SkipTest("MMAPv1 does not support retryWrites=True")
# Handle mongos_clients for transactions tests.
cls.mongos_clients = []
if (
@ -1110,11 +1117,6 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
for address in client_context.mongoses:
cls.mongos_clients.append(single_client("{}:{}".format(*address)))
# add any special-casing for skipping tests here
if client_context.storage_engine == "mmapv1":
if "retryable-writes" in cls.TEST_SPEC["description"]:
raise unittest.SkipTest("MMAPv1 does not support retryWrites=True")
# Speed up the tests by decreasing the heartbeat frequency.
cls.knobs = client_knobs(
heartbeat_frequency=0.1,
@ -2157,7 +2159,7 @@ def generate_test_classes(
raise ValueError(
f"test file '{fpath}' has unsupported schemaVersion '{schema_version}'"
)
module_dict = {"__module__": module}
module_dict = {"__module__": module, "TEST_PATH": test_path}
module_dict.update(kwargs)
test_klasses[class_name] = type(
class_name,