diff --git a/pymongo/periodic_executor.py b/pymongo/periodic_executor.py index f087dd386..07f383724 100644 --- a/pymongo/periodic_executor.py +++ b/pymongo/periodic_executor.py @@ -116,7 +116,10 @@ class PeriodicExecutor(object): self._stopped = True break except: - self._stopped = True + with self._lock: + self._stopped = True + self._thread_will_exit = True + raise deadline = _time() + self._interval diff --git a/pymongo/topology.py b/pymongo/topology.py index cfc6c9958..deb7b7008 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -350,6 +350,8 @@ class Topology(object): # Mark all servers Unknown. self._description = self._description.reset() self._update_servers() + self._opened = False + # Publish only after releasing the lock. if self._publish_tp: self._events.put((self._listeners.publish_topology_closed, diff --git a/test/test_threads.py b/test/test_threads.py index 4f26aad66..f9e17f3a4 100644 --- a/test/test_threads.py +++ b/test/test_threads.py @@ -21,7 +21,7 @@ from test import (client_context, db_pwd, IntegrationTest, unittest) -from test.utils import rs_or_single_client_noauth +from test.utils import rs_or_single_client_noauth, rs_or_single_client from test.utils import frequent_thread_switches, joinall from pymongo.errors import OperationFailure @@ -183,25 +183,22 @@ class TestThreads(IntegrationTest): error.join() okay.join() - @unittest.skip("PYTHON-1204") def test_client_disconnect(self): - self.db.drop_collection("test") - self.db.test.insert_many([{"x": i} for i in range(1000)]) + db = rs_or_single_client(serverSelectionTimeoutMS=30000).pymongo_test + db.drop_collection("test") + db.test.insert_many([{"x": i} for i in range(1000)]) # Start 10 threads that execute a query, and 10 threads that call # client.close() 10 times in a row. - threads = [SaveAndFind(self.db.test) for _ in range(10)] - threads.extend(Disconnect(self.db.client, 10) for _ in range(10)) + threads = [SaveAndFind(db.test) for _ in range(10)] + threads.extend(Disconnect(db.client, 10) for _ in range(10)) with frequent_thread_switches(): - # Frequent thread switches hurt performance badly enough to - # prevent reconnection within 5 seconds, especially in Python 2 - # on a Windows build slave. for t in threads: t.start() for t in threads: - t.join(30) + t.join(300) for t in threads: self.assertTrue(t.passed)