PYTHON-2762 Avoid duplicating unified test files for LB testing (#649)

Create new client for each cursor/session __del__ test.
Always close cursors in spec tests.
This commit is contained in:
Shane Harvey 2021-06-25 16:20:21 -07:00 committed by GitHub
parent 14160aed04
commit b4b7a07b81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 97 additions and 264 deletions

View File

@ -197,11 +197,7 @@ if [ -z "$GREEN_FRAMEWORK" ]; then
$PYTHON -c "from bson import _cbson; from pymongo import _cmessage"
fi
if [ -n "$TEST_LOADBALANCER" ]; then
$PYTHON -m xmlrunner discover -s test/load_balancer -v --locals -o $XUNIT_DIR
else
$PYTHON $COVERAGE_ARGS setup.py $C_EXTENSIONS test $TEST_ARGS $OUTPUT
fi
$PYTHON $COVERAGE_ARGS setup.py $C_EXTENSIONS test $TEST_ARGS $OUTPUT
else
# --no_ext has to come before "test" so there is no way to toggle extensions here.
$PYTHON green_framework_test.py $GREEN_FRAMEWORK $OUTPUT

View File

@ -589,6 +589,24 @@ class ClientContext(object):
return self._require(lambda: sec_count() >= count,
"Not enough secondaries available")
@property
def supports_secondary_read_pref(self):
if self.has_secondaries:
return True
if self.is_mongos:
shard = self.client.config.shards.find_one()['host']
num_members = shard.count(',') + 1
return num_members > 1
return False
def require_secondary_read_pref(self):
"""Run a test only if the client is connected to a cluster that
supports secondary read preference
"""
return self._require(lambda: self.supports_secondary_read_pref,
"This cluster does not support secondary read "
"preference")
def require_no_replica_set(self, func):
"""Run a test if the client is *not* connected to a replica set."""
return self._require(
@ -639,6 +657,13 @@ class ClientContext(object):
"Must be connected to a load balancer",
func=func)
def require_no_load_balancer(self, func):
"""Run a test only if the client is not connected to a load balancer.
"""
return self._require(lambda: not self.load_balancer,
"Must not be connected to a load balancer",
func=func)
def check_auth_with_sharding(self, func):
"""Skip a test when connected to mongos < 2.0 and running with auth."""
condition = lambda: not (self.auth_enabled and
@ -852,6 +877,9 @@ class IntegrationTest(PyMongoTestCase):
@classmethod
@client_context.require_connection
def setUpClass(cls):
if (client_context.load_balancer and
not getattr(cls, 'RUN_ON_LOAD_BALANCER', False)):
raise SkipTest('this test does not support load balancers')
cls.client = client_context.client
cls.db = cls.client.pymongo_test
if client_context.auth_enabled:
@ -875,6 +903,14 @@ class MockClientTest(unittest.TestCase):
The class temporarily overrides HEARTBEAT_FREQUENCY to speed up tests.
"""
# MockClients tests that use replicaSet, directConnection=True, pass
# multiple seed addresses, or wait for heartbeat events are incompatible
# with loadBalanced=True.
@classmethod
@client_context.require_no_load_balancer
def setUpClass(cls):
pass
def setUp(self):
super(MockClientTest, self).setUp()

View File

@ -1,23 +0,0 @@
# Copyright 2015-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_command_monitoring_unified import *
if __name__ == "__main__":
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_crud_unified import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_dns import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,26 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the Load Balancer unified spec tests."""
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_load_balancer import *
if __name__ == "__main__":
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_change_stream import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_retryable_reads import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_retryable_writes import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_transactions_unified import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_uri_spec import *
if __name__ == '__main__':
unittest.main()

View File

@ -1,23 +0,0 @@
# Copyright 2021-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path[0:0] = [""]
from test import unittest
from test.test_versioned_api import *
if __name__ == '__main__':
unittest.main()

View File

@ -49,6 +49,8 @@ from test.utils import (
class TestChangeStreamBase(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
def change_stream_with_client(self, client, *args, **kwargs):
"""Create a change stream using the given client and return it."""
raise NotImplementedError
@ -1038,7 +1040,8 @@ class TestCollectionChangeStream(TestChangeStreamBase, APITestsMixin,
pass
class TestAllLegacyScenarios(unittest.TestCase):
class TestAllLegacyScenarios(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
@classmethod
@client_context.require_connection

View File

@ -22,12 +22,12 @@ sys.path[0:0] = [""]
from pymongo.errors import ConnectionFailure
from pymongo.ismaster import IsMaster
from pymongo.monitor import Monitor
from test import unittest, client_knobs
from test import unittest, client_knobs, IntegrationTest
from test.utils import (HeartbeatEventListener, MockPool, single_client,
wait_until)
class TestHeartbeatMonitoring(unittest.TestCase):
class TestHeartbeatMonitoring(IntegrationTest):
def create_mock_monitor(self, responses, uri, expected_results):
listener = HeartbeatEventListener()

View File

@ -22,18 +22,22 @@ import threading
sys.path[0:0] = [""]
from test import unittest, IntegrationTest, client_context
from test.utils import get_pool, wait_until, ExceptionCatchingThread
from test.utils import (ExceptionCatchingThread,
get_pool,
rs_client,
wait_until)
from test.unified_format import generate_test_classes
# Location of JSON test specifications.
TEST_PATH = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'load_balancer', 'unified')
os.path.dirname(os.path.realpath(__file__)), 'load_balancer')
# Generate unified tests.
globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestLB(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
def test_connections_are_only_returned_once(self):
pool = get_pool(self.client)
@ -45,11 +49,14 @@ class TestLB(IntegrationTest):
@client_context.require_load_balancer
def test_unpin_committed_transaction(self):
pool = get_pool(self.client)
with self.client.start_session() as session:
client = rs_client()
self.addCleanup(client.close)
pool = get_pool(client)
coll = client[self.db.name].test
with client.start_session() as session:
with session.start_transaction():
self.assertEqual(pool.active_sockets, 0)
self.db.test.insert_one({}, session=session)
coll.insert_one({}, session=session)
self.assertEqual(pool.active_sockets, 1) # Pinned.
self.assertEqual(pool.active_sockets, 1) # Still pinned.
self.assertEqual(pool.active_sockets, 0) # Unpinned.
@ -75,9 +82,12 @@ class TestLB(IntegrationTest):
self._test_no_gc_deadlock(create_resource)
def _test_no_gc_deadlock(self, create_resource):
pool = get_pool(self.client)
client = rs_client()
self.addCleanup(client.close)
pool = get_pool(client)
coll = client[self.db.name].test
coll.insert_many([{} for _ in range(10)])
self.assertEqual(pool.active_sockets, 0)
self.db.test.insert_many([{} for _ in range(10)])
# Cause the initial find attempt to fail to induce a reference cycle.
args = {
"mode": {
@ -92,7 +102,7 @@ class TestLB(IntegrationTest):
}
}
with self.fail_point(args):
resource = create_resource(self.db.test)
resource = create_resource(coll)
if client_context.load_balancer:
self.assertEqual(pool.active_sockets, 1) # Pinned.
@ -102,7 +112,9 @@ class TestLB(IntegrationTest):
# Garbage collect the resource while the pool is locked to ensure we
# don't deadlock.
del resource
gc.collect()
# On PyPy it can take a few rounds to collect the cursor.
for _ in range(3):
gc.collect()
thread.unlock.set()
thread.join(5)
self.assertFalse(thread.is_alive())
@ -110,15 +122,16 @@ class TestLB(IntegrationTest):
wait_until(lambda: pool.active_sockets == 0, 'return socket')
# Run another operation to ensure the socket still works.
self.db.test.delete_many({})
coll.delete_many({})
@client_context.require_transactions
def test_session_gc(self):
pool = get_pool(self.client)
self.assertEqual(pool.active_sockets, 0)
session = self.client.start_session()
client = rs_client()
self.addCleanup(client.close)
pool = get_pool(client)
session = client.start_session()
session.start_transaction()
self.client.test_session_gc.test.find_one({}, session=session)
client.test_session_gc.test.find_one({}, session=session)
if client_context.load_balancer:
self.assertEqual(pool.active_sockets, 1) # Pinned.
@ -138,7 +151,7 @@ class TestLB(IntegrationTest):
wait_until(lambda: pool.active_sockets == 0, 'return socket')
# Run another operation to ensure the socket still works.
self.db.test.delete_many({})
client[self.db.name].test.delete_many({})
class PoolLocker(ExceptionCatchingThread):

View File

@ -28,6 +28,7 @@ from test.utils import connected, wait_until
@client_context.require_connection
@client_context.require_no_load_balancer
def setUpModule():
pass

View File

@ -601,10 +601,11 @@ class TestMongosAndReadPreference(unittest.TestCase):
def test_send_hedge(self):
cases = {
'primaryPreferred': PrimaryPreferred,
'secondary': Secondary,
'secondaryPreferred': SecondaryPreferred,
'nearest': Nearest,
}
if client_context.supports_secondary_read_pref:
cases['secondary'] = Secondary
listener = OvertCommandListener()
client = rs_client(event_listeners=[listener])
self.addCleanup(client.close)

View File

@ -26,6 +26,7 @@ from test.utils import wait_until
@client_context.require_connection
@client_context.require_no_load_balancer
def setUpModule():
pass

View File

@ -51,6 +51,7 @@ class TestClientOptions(PyMongoTestCase):
class TestSpec(SpecRunner):
RUN_ON_LOAD_BALANCER = True
@classmethod
@client_context.require_failCommand_fail_point

View File

@ -54,6 +54,7 @@ _TEST_PATH = os.path.join(
class TestAllScenarios(SpecRunner):
RUN_ON_LOAD_BALANCER = True
def get_object_name(self, op):
return op.get('object', 'collection')
@ -121,6 +122,7 @@ def non_retryable_single_statement_ops(coll):
class IgnoreDeprecationsTest(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
@classmethod
def setUpClass(cls):
@ -417,6 +419,8 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
class TestWriteConcernError(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
@classmethod
@client_context.require_replica_set
@client_context.require_no_mmap

View File

@ -168,12 +168,11 @@ def compare_multiple_events(i, expected_results, actual_results):
return j, True, ''
class TestAllScenarios(unittest.TestCase):
class TestAllScenarios(IntegrationTest):
@classmethod
@client_context.require_connection
def setUp(cls):
cls.all_listener = ServerAndTopologyEventListener()
def setUp(self):
super(TestAllScenarios, self).setUp()
self.all_listener = ServerAndTopologyEventListener()
def create_test(scenario_def):

View File

@ -33,6 +33,8 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestServerApi(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
def test_server_api_defaults(self):
api = ServerApi(ServerApiVersion.V1)
self.assertEqual(api.version, '1')

View File

@ -638,6 +638,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string('1.5')
RUN_ON_LOAD_BALANCER = True
@staticmethod
def should_run_on(run_on_spec):
@ -776,7 +777,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
self.skipTest("MMAPv1 does not support change streams")
self.__raise_if_unsupported(
'createChangeStream', target, MongoClient, Database, Collection)
return target.watch(*args, **kwargs)
stream = target.watch(*args, **kwargs)
self.addCleanup(stream.close)
return stream
def _clientOperation_createChangeStream(self, target, *args, **kwargs):
return self.__entityOperation_createChangeStream(
@ -821,7 +824,9 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
def _collectionOperation_createFindCursor(self, target, *args, **kwargs):
self.__raise_if_unsupported('find', target, Collection)
return NonLazyCursor(target.find(*args, **kwargs))
cursor = NonLazyCursor(target.find(*args, **kwargs))
self.addCleanup(cursor.close)
return cursor
def _collectionOperation_listIndexes(self, target, *args, **kwargs):
if 'batch_size' in kwargs:

View File

@ -299,6 +299,10 @@ class SpecRunner(IntegrationTest):
arguments = args
result = cmd(**dict(arguments))
# Cleanup open change stream cursors.
if name == "watch":
self.addCleanup(result.close)
if name == "aggregate":
if arguments["pipeline"] and "$out" in arguments["pipeline"][-1]:
# Read from the primary to ensure causal consistency.