PYTHON-2268 Close clients in test suite

This commit is contained in:
Shane Harvey 2020-05-29 17:28:09 -07:00
parent 4760d07815
commit 5b49557c59
21 changed files with 128 additions and 41 deletions

View File

@ -50,6 +50,10 @@ class PeriodicExecutor(object):
self._thread_will_exit = False
self._lock = threading.Lock()
def __repr__(self):
return '<%s(name=%s) object at 0x%x>' % (
self.__class__.__name__, self._name, id(self))
def open(self):
"""Start. Multiple calls have no effect.

View File

@ -15,6 +15,7 @@
"""Represent MongoClient's configuration."""
import threading
import traceback
from bson.objectid import ObjectId
from pymongo import common, monitor, pool
@ -60,6 +61,9 @@ class TopologySettings(object):
self._heartbeat_frequency = heartbeat_frequency
self._direct = (len(self._seeds) == 1 and not replica_set_name)
self._topology_id = ObjectId()
# Store the allocation traceback to catch unclosed clients in the
# test suite.
self._stack = ''.join(traceback.format_stack())
@property
def seeds(self):

View File

@ -795,6 +795,44 @@ def setup():
warnings.simplefilter("always")
def _get_executors(topology):
executors = []
for server in topology._servers.values():
# Some MockMonitor do not have an _executor.
executors.append(getattr(server._monitor, '_executor', None))
executors.append(topology._Topology__events_executor)
if topology._srv_monitor:
executors.append(topology._srv_monitor._executor)
return [e for e in executors if e is not None]
def all_executors_stopped(topology):
running = [e for e in _get_executors(topology) if not e._stopped]
if running:
print(' Topology %s has THREADS RUNNING: %s, created at: %s' % (
topology, running, topology._settings._stack))
return False
return True
def print_unclosed_clients():
from pymongo.topology import Topology
processed = set()
# Call collect to manually cleanup any would-be gc'd clients to avoid
# false positives.
gc.collect()
for obj in gc.get_objects():
try:
if isinstance(obj, Topology):
# Avoid printing the same Topology multiple times.
if obj._topology_id in processed:
continue
all_executors_stopped(obj)
processed.add(obj._topology_id)
except ReferenceError:
pass
def teardown():
garbage = []
for g in gc.garbage:
@ -813,6 +851,10 @@ def teardown():
c.drop_database("pymongo_test_bernie")
c.close()
# Jython does not support gc.get_objects.
if not sys.platform.startswith('java'):
print_unclosed_clients()
class PymongoTestRunner(unittest.TextTestRunner):
def run(self, test):

View File

@ -65,8 +65,9 @@ class MockMonitor(Monitor):
topology,
pool,
topology_settings):
# MockMonitor gets a 'client' arg, regular monitors don't.
self.client = client
# MockMonitor gets a 'client' arg, regular monitors don't. Weakref it
# to avoid cycles.
self.client = weakref.proxy(client)
Monitor.__init__(
self,
server_description,
@ -75,8 +76,9 @@ class MockMonitor(Monitor):
topology_settings)
def _check_once(self):
client = self.client
address = self._server_description.address
response, rtt = self.client.mock_is_master('%s:%d' % address)
response, rtt = client.mock_is_master('%s:%d' % address)
return ServerDescription(address, IsMaster(response), rtt)

View File

@ -695,8 +695,6 @@ class TestAuthURIOptions(unittest.TestCase):
client_context.create_user('admin', 'admin', 'pass')
client_context.create_user(
'pymongo_test', 'user', 'pass', ['userAdmin', 'readWrite'])
self.client = rs_or_single_client_noauth(
username='admin', password='pass')
def tearDown(self):
client_context.drop_user('pymongo_test', 'user')

View File

@ -101,6 +101,10 @@ class ClientUnitTest(unittest.TestCase):
cls.client = rs_or_single_client(connect=False,
serverSelectionTimeoutMS=100)
@classmethod
def tearDownClass(cls):
cls.client.close()
def test_keyword_arg_defaults(self):
client = MongoClient(socketTimeoutMS=None,
connectTimeoutMS=20000,

View File

@ -105,6 +105,7 @@ class TestCollation(unittest.TestCase):
def tearDownClass(cls):
cls.warn_context.__exit__()
cls.warn_context = None
cls.client.close()
def tearDown(self):
self.listener.results.clear()

View File

@ -48,6 +48,10 @@ class TestAllScenarios(unittest.TestCase):
cls.listener = EventListener()
cls.client = single_client(event_listeners=[cls.listener])
@classmethod
def tearDownClass(cls):
cls.client.close()
def tearDown(self):
self.listener.results.clear()

View File

@ -51,6 +51,10 @@ class TestConnectionsSurvivePrimaryStepDown(IntegrationTest):
cls.coll = cls.db.get_collection(
"step-down", write_concern=WriteConcern("majority"))
@classmethod
def tearDownClass(cls):
cls.client.close()
def setUp(self):
# Note that all ops use same write-concern as self.db (majority).
self.db.drop_collection("step-down")

View File

@ -911,6 +911,7 @@ class TestClusterChangeStreamsWCustomTypes(
kwargs['type_registry'] = codec_options.type_registry
kwargs['document_class'] = codec_options.document_class
self.watched_target = rs_client(*args, **kwargs)
self.addCleanup(self.watched_target.close)
self.input_target = self.watched_target[self.db.name].test
# Insert a record to ensure db, coll are created.
self.input_target.insert_one({'data': 'dummy'})

View File

@ -27,27 +27,26 @@ from pymongo.read_preferences import ReadPreference
from pymongo.write_concern import WriteConcern
from test import client_context, unittest, IntegrationTest
from test.utils import rs_client, rs_or_single_client
from test.utils import rs_client
class TestSampleShellCommands(unittest.TestCase):
class TestSampleShellCommands(IntegrationTest):
@classmethod
@client_context.require_connection
def setUpClass(cls):
cls.client = rs_or_single_client(w="majority")
super(TestSampleShellCommands, cls).setUpClass()
# Run once before any tests run.
cls.client.pymongo_test.inventory.drop()
cls.db.inventory.drop()
@classmethod
def tearDownClass(cls):
client_context.client.drop_database("pymongo_test")
cls.client.drop_database("pymongo_test")
def tearDown(self):
# Run after every test.
self.client.pymongo_test.inventory.drop()
self.db.inventory.drop()
def test_first_three_examples(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 1
db.inventory.insert_one(
@ -84,7 +83,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(db.inventory.count_documents({}), 4)
def test_query_top_level_fields(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 6
db.inventory.insert_many([
@ -151,7 +150,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(list(cursor)), 2)
def test_query_embedded_documents(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 14
# Subdocument key order matters in a few of these examples so we have
@ -214,7 +213,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(list(cursor)), 1)
def test_query_arrays(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 20
db.inventory.insert_many([
@ -290,7 +289,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(list(cursor)), 1)
def test_query_array_of_documents(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 29
# Subdocument key order matters in a few of these examples so we have
@ -372,7 +371,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(list(cursor)), 2)
def test_query_null(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 38
db.inventory.insert_many([{"_id": 1, "item": None}, {"_id": 2}])
@ -397,7 +396,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(list(cursor)), 1)
def test_projection(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 42
db.inventory.insert_many([
@ -528,7 +527,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(doc["instock"]), 1)
def test_update_and_replace(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 51
db.inventory.insert_many([
@ -614,7 +613,7 @@ class TestSampleShellCommands(unittest.TestCase):
self.assertEqual(len(doc["instock"]), 2)
def test_delete(self):
db = client_context.client.pymongo_test
db = self.db
# Start Example 55
db.inventory.insert_many([
@ -664,7 +663,7 @@ class TestSampleShellCommands(unittest.TestCase):
@client_context.require_replica_set
@client_context.require_no_mmap
def test_change_streams(self):
db = client_context.client.pymongo_test
db = self.db
done = False
def insert_docs():
@ -706,7 +705,7 @@ class TestSampleShellCommands(unittest.TestCase):
t.join()
def test_aggregate_examples(self):
db = client_context.client.pymongo_test
db = self.db
# Start Aggregation Example 1
db.sales.aggregate([
@ -792,7 +791,7 @@ class TestSampleShellCommands(unittest.TestCase):
# End Aggregation Example 4
def test_commands(self):
db = client_context.client.pymongo_test
db = self.db
db.restaurants.insert_one({})
# Start runCommand Example 1
@ -804,7 +803,7 @@ class TestSampleShellCommands(unittest.TestCase):
# End runCommand Example 2
def test_index_management(self):
db = client_context.client.pymongo_test
db = self.db
# Start Index Example 1
db.records.create_index("score")
@ -821,7 +820,7 @@ class TestSampleShellCommands(unittest.TestCase):
@client_context.require_replica_set
def test_misc(self):
# Marketing examples
client = client_context.client
client = self.client
self.addCleanup(client.drop_database, "test")
self.addCleanup(client.drop_database, "my_database")
@ -843,13 +842,6 @@ class TestSampleShellCommands(unittest.TestCase):
class TestTransactionExamples(IntegrationTest):
@classmethod
@client_context.require_connection
def setUpClass(cls):
super(TestTransactionExamples, cls).setUpClass()
cls.client = rs_or_single_client(w="majority")
@client_context.require_version_max(4, 4, 99) # PYTHON-2154 skip on 4.5+
@client_context.require_transactions
def test_transactions(self):

View File

@ -2306,6 +2306,8 @@ class TestLegacyBulkWriteConcern(BulkTestBase):
@classmethod
def tearDownClass(cls):
cls.deprecation_filter.stop()
if cls.secondary:
cls.secondary.close()
def cause_wtimeout(self, batch):
if self.need_replication_stopped:

View File

@ -71,6 +71,7 @@ class TestMongosLoadBalancing(MockClientTest):
host='a:1,b:2,c:3',
connect=False,
**kwargs)
self.addCleanup(mock_client.close)
# Latencies in seconds.
mock_client.mock_rtts['a:1'] = 0.020

View File

@ -52,6 +52,10 @@ class TestCommandMonitoring(PyMongoTestCase):
event_listeners=[cls.listener],
retryWrites=False)
@classmethod
def tearDownClass(cls):
cls.client.close()
def tearDown(self):
self.listener.results.clear()
@ -1401,6 +1405,7 @@ class TestGlobalListener(PyMongoTestCase):
@classmethod
def tearDownClass(cls):
monitoring._LISTENERS = cls.saved_listeners
cls.client.close()
def setUp(self):
self.listener.results.clear()

View File

@ -161,6 +161,9 @@ class _TestPoolingBase(unittest.TestCase):
db.unique.insert_one({"_id": "jesse"})
db.test.insert_many([{} for _ in range(10)])
def tearDown(self):
self.c.close()
def create_pool(
self,
pair=(client_context.host, client_context.port),

View File

@ -34,6 +34,7 @@ class TestReadConcern(PyMongoTestCase):
@classmethod
def tearDownClass(cls):
cls.client.close()
client_context.client.pymongo_test.drop_collection('coll')
def tearDown(self):

View File

@ -370,6 +370,7 @@ class TestCommandAndReadPreference(TestReplicaSetClientBase):
@classmethod
def tearDownClass(cls):
cls.c.drop_database('pymongo_test')
cls.c.close()
def executed_on_which_server(self, client, fn, *args, **kwargs):
"""Execute fn(*args, **kwargs) and return the Server instance used."""

View File

@ -300,6 +300,7 @@ class TestReplicaSetWireVersion(MockClientTest):
host='a:1',
replicaSet='rs',
connect=False)
self.addCleanup(c.close)
c.set_wire_version_range('a:1', 3, 7)
c.set_wire_version_range('b:2', 2, 3)
@ -330,15 +331,17 @@ class TestReplicaSetClientInternalIPs(MockClientTest):
def test_connect_with_internal_ips(self):
# Client is passed an IP it can reach, 'a:1', but the RS config
# only contains unreachable IPs like 'internal-ip'. PYTHON-608.
client = MockClient(
standalones=[],
members=['a:1'],
mongoses=[],
ismaster_hosts=['internal-ip:27017'],
host='a:1',
replicaSet='rs',
serverSelectionTimeoutMS=100)
self.addCleanup(client.close)
with self.assertRaises(AutoReconnect) as context:
connected(MockClient(
standalones=[],
members=['a:1'],
mongoses=[],
ismaster_hosts=['internal-ip:27017'],
host='a:1',
replicaSet='rs',
serverSelectionTimeoutMS=100))
connected(client)
self.assertEqual(
"Could not reach any servers in [('internal-ip', 27017)]."
@ -356,6 +359,7 @@ class TestReplicaSetClientMaxWriteBatchSize(MockClientTest):
host='a:1',
replicaSet='rs',
connect=False)
self.addCleanup(c.close)
c.set_max_write_batch_size('a:1', 1)
c.set_max_write_batch_size('b:2', 2)

View File

@ -192,6 +192,7 @@ class TestRetryableWritesMMAPv1(IgnoreDeprecationsTest):
@classmethod
def tearDownClass(cls):
cls.knobs.disable()
cls.client.close()
@client_context.require_version_min(3, 5)
@client_context.require_no_standalone
@ -226,6 +227,7 @@ class TestRetryableWrites(IgnoreDeprecationsTest):
@classmethod
def tearDownClass(cls):
cls.knobs.disable()
cls.client.close()
super(TestRetryableWrites, cls).tearDownClass()
def setUp(self):

View File

@ -78,6 +78,7 @@ class TestSession(IntegrationTest):
@classmethod
def tearDownClass(cls):
monitoring._SENSITIVE_COMMANDS.update(cls.sensitive_commands)
cls.client2.close()
super(TestSession, cls).tearDownClass()
def setUp(self):
@ -85,6 +86,7 @@ class TestSession(IntegrationTest):
self.session_checker_listener = SessionTestListener()
self.client = rs_or_single_client(
event_listeners=[self.listener, self.session_checker_listener])
self.addCleanup(self.client.close)
self.db = self.client.pymongo_test
self.initial_lsids = set(s['id'] for s in session_ids(self.client))
@ -783,6 +785,10 @@ class TestCausalConsistency(unittest.TestCase):
cls.listener = SessionTestListener()
cls.client = rs_or_single_client(event_listeners=[cls.listener])
@classmethod
def tearDownClass(cls):
cls.client.close()
@client_context.require_sessions
def setUp(self):
super(TestCausalConsistency, self).setUp()

View File

@ -56,6 +56,12 @@ class TransactionsBase(SpecRunner):
for address in client_context.mongoses:
cls.mongos_clients.append(single_client('%s:%s' % address))
@classmethod
def tearDownClass(cls):
for client in cls.mongos_clients:
client.close()
super(TransactionsBase, cls).tearDownClass()
def maybe_skip_scenario(self, test):
super(TransactionsBase, self).maybe_skip_scenario(test)
if ('secondary' in self.id() and