Compare commits

..

8 Commits

13 changed files with 238 additions and 130 deletions

View File

@ -505,13 +505,20 @@ python3 ./.evergreen/scripts/resync-all-specs.py
Follow the [Python Driver Release Process Wiki](https://wiki.corp.mongodb.com/display/DRIVERS/Python+Driver+Release+Process).
## Asyncio considerations
## Project Structure and Asyncio Considerations
PyMongo adds asyncio capability by modifying the source files in `*/asynchronous` to `*/synchronous` using
[unasync](https://github.com/python-trio/unasync/) and some custom transforms.
This section describes the layout of the `pymongo/` package.
Where possible, edit the code in `*/asynchronous/*.py` and not the synchronous files.
You can run `pre-commit run --all-files synchro` before running tests if you are testing synchronous code.
Within `pymongo/`, the code is further divided into the `pymongo/asynchronous` and `pymongo/synchronous` subdirectories.
Files in `pymongo/synchronous` are generated from `pymongo/asynchronous` using the `synchro` pre-commit hook, which uses [unasync](https://github.com/python-trio/unasync/) and some custom transforms.
As a result, **all modifications** within `pymongo` must be made in either the top-level `pymongo` directory when they have to exhibit differing behavior between sync and async contexts or the `pymongo/asynchronous` directory, not `pymongo/synchronous`.
Any changes made directly to files in the `pymongo/synchronous` directory will be overwritten by the `synchro` hook when it is run, which happens automatically on commit.
Some top-level files (e.g. `pymongo/collection.py`) are re-export files for existing import compatibility and should not be modified directly.
The other top-level files (e.g. `pymongo/network_layer.py`, `pymongo/pool_shared.py`) contain either shared code used in both the asynchronous and synchronous APIs, or code that is very different between the two APIs and therefore cannot be generated from the async version using `synchro`.
Run `pre-commit run --all-files synchro` before running tests to generate the latest version of the synchronous code.
To prevent the `synchro` hook from accidentally overwriting code, it first checks to see whether a sync version
of a file is changing and not its async counterpart, and will fail.

View File

@ -109,6 +109,7 @@ struct module_state {
#define DATETIME_CLAMP 2
#define DATETIME_MS 3
#define DATETIME_AUTO 4
#define PYTHON_3_12 0x030C0000
/* Converts integer to its string representation in decimal notation. */
extern int cbson_long_long_to_str(long long num, char* str, size_t size) {
@ -249,6 +250,67 @@ static int _write_element_to_buffer(PyObject* self, buffer_t buffer,
*/
static int write_raw_doc(buffer_t buffer, PyObject* raw, PyObject* _raw);
#if PY_VERSION_HEX >= PYTHON_3_12
/* Transfer traceback from old_exc to new_exc.
* Steals reference to old_exc. */
static PyObject* _transfer_traceback(PyObject *old_exc, PyObject *new_exc) {
PyObject *tb = PyException_GetTraceback(old_exc);
if (tb) {
PyException_SetTraceback(new_exc, tb);
Py_DECREF(tb);
}
Py_DECREF(old_exc);
return new_exc;
}
#endif
/* Rewrap the current exception as InvalidBSON(str(e)) if it is not already an InvalidBSON error. */
static void _rewrap_as_invalid_bson(void) {
#if PY_VERSION_HEX >= PYTHON_3_12
PyObject *exc = PyErr_GetRaisedException();
if (exc && PyErr_GivenExceptionMatches(exc, PyExc_Exception)) {
PyObject *InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
if (!PyErr_GivenExceptionMatches(exc, InvalidBSON)) {
PyObject *err_msg = PyObject_Str(exc);
if (err_msg) {
PyObject *new_exc = PyObject_CallOneArg(InvalidBSON, err_msg);
if (new_exc) {
exc = _transfer_traceback(exc, new_exc);
}
}
Py_XDECREF(err_msg);
}
Py_DECREF(InvalidBSON);
}
}
/* Steals reference to exc. */
PyErr_SetRaisedException(exc);
#else
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject *InvalidBSON = NULL;
PyErr_Fetch(&etype, &evalue, &etrace);
if (PyErr_GivenExceptionMatches(etype, PyExc_Exception)) {
InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
if (!PyErr_GivenExceptionMatches(etype, InvalidBSON)) {
Py_DECREF(etype);
etype = InvalidBSON;
if (evalue) {
PyObject *msg = PyObject_Str(evalue);
Py_DECREF(evalue);
evalue = msg;
}
PyErr_NormalizeException(&etype, &evalue, &etrace);
} else {
Py_DECREF(InvalidBSON);
}
}
}
PyErr_Restore(etype, evalue, etrace);
#endif
}
/* Date stuff */
static PyObject* datetime_from_millis(long long millis) {
/* To encode a datetime instance like datetime(9999, 12, 31, 23, 59, 59, 999999)
@ -294,34 +356,57 @@ static PyObject* datetime_from_millis(long long millis) {
timeinfo.tm_sec,
microseconds);
if(!datetime) {
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
#if PY_VERSION_HEX >= PYTHON_3_12
PyObject *exc = PyErr_GetRaisedException();
/*
* Calling _error clears the error state, so fetch it first.
*/
PyErr_Fetch(&etype, &evalue, &etrace);
/* Only add addition error message on ValueError exceptions. */
if (PyErr_GivenExceptionMatches(etype, PyExc_ValueError)) {
if (evalue) {
PyObject* err_msg = PyObject_Str(evalue);
/* Only add additional error message on ValueError exceptions. */
if (exc && PyErr_GivenExceptionMatches(exc, PyExc_ValueError)) {
PyObject* err_msg = PyObject_Str(exc);
if (err_msg) {
PyObject* appendage = PyUnicode_FromString(" (Consider Using CodecOptions(datetime_conversion=DATETIME_AUTO) or MongoClient(datetime_conversion='DATETIME_AUTO')). See: https://www.mongodb.com/docs/languages/python/pymongo-driver/current/data-formats/dates-and-times/#handling-out-of-range-datetimes");
if (appendage) {
PyObject* msg = PyUnicode_Concat(err_msg, appendage);
if (msg) {
Py_DECREF(evalue);
evalue = msg;
PyObject* new_exc = PyObject_CallOneArg(PyExc_ValueError, msg);
if (new_exc) {
exc = _transfer_traceback(exc, new_exc);
}
Py_DECREF(msg);
}
}
Py_XDECREF(appendage);
}
Py_XDECREF(err_msg);
}
PyErr_NormalizeException(&etype, &evalue, &etrace);
}
/* Steals references to args. */
PyErr_Restore(etype, evalue, etrace);
/* Steals reference to exc. */
PyErr_SetRaisedException(exc);
#else
/* Calling _error clears the error state, so fetch it first.*/
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyErr_Fetch(&etype, &evalue, &etrace);
/* Only add additional error message on ValueError exceptions. */
if (PyErr_GivenExceptionMatches(etype, PyExc_ValueError)) {
if (evalue) {
PyObject* err_msg = PyObject_Str(evalue);
if (err_msg) {
PyObject* appendage = PyUnicode_FromString(" (Consider Using CodecOptions(datetime_conversion=DATETIME_AUTO) or MongoClient(datetime_conversion='DATETIME_AUTO')). See: https://www.mongodb.com/docs/languages/python/pymongo-driver/current/data-formats/dates-and-times/#handling-out-of-range-datetimes");
if (appendage) {
PyObject* msg = PyUnicode_Concat(err_msg, appendage);
if (msg) {
Py_DECREF(evalue);
evalue = msg;
}
}
Py_XDECREF(appendage);
}
Py_XDECREF(err_msg);
}
PyErr_NormalizeException(&etype, &evalue, &etrace);
}
/* Steals references to args. */
PyErr_Restore(etype, evalue, etrace);
#endif
}
return datetime;
}
@ -1681,6 +1766,46 @@ fail:
/* Update Invalid Document error to include doc as a property.
*/
void handle_invalid_doc_error(PyObject* dict) {
#if PY_VERSION_HEX >= PYTHON_3_12
PyObject *exc = PyErr_GetRaisedException();
PyObject *msg = NULL, *new_msg = NULL;
PyObject *InvalidDocument = NULL;
if (exc == NULL) {
return;
}
InvalidDocument = _error("InvalidDocument");
if (InvalidDocument == NULL) {
goto cleanup;
}
if (PyErr_GivenExceptionMatches(exc, InvalidDocument)) {
msg = PyObject_Str(exc);
if (msg) {
const char *msg_utf8 = PyUnicode_AsUTF8(msg);
if (msg_utf8 == NULL) {
goto cleanup;
}
new_msg = PyUnicode_FromFormat("Invalid document: %s", msg_utf8);
if (new_msg == NULL) {
goto cleanup;
}
/* Add doc to the error instance as a property. */
PyObject* exc_args[2] = {new_msg, dict};
PyObject* new_exc = PyObject_Vectorcall(InvalidDocument, exc_args, 2, NULL);
if (new_exc) {
exc = _transfer_traceback(exc, new_exc);
}
}
}
cleanup:
/* Steals reference to exc. */
PyErr_SetRaisedException(exc);
Py_XDECREF(msg);
Py_XDECREF(InvalidDocument);
Py_XDECREF(new_msg);
#else
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject *msg = NULL, *new_msg = NULL, *new_evalue = NULL;
PyErr_Fetch(&etype, &evalue, &etrace);
@ -1723,6 +1848,7 @@ cleanup:
Py_XDECREF(InvalidDocument);
Py_XDECREF(new_evalue);
Py_XDECREF(new_msg);
#endif
}
@ -2155,7 +2281,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
}
memcpy(&length, buffer + *position, 4);
length = BSON_UINT32_FROM_LE(length);
if (max < length) {
if (max - 5 < length) { // Account for 5-byte header. max >= 5 guaranteed above
goto invalid;
}
@ -2654,42 +2780,7 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
* Wrap any non-InvalidBSON errors in InvalidBSON.
*/
if (PyErr_Occurred()) {
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject *InvalidBSON = NULL;
/*
* Calling _error clears the error state, so fetch it first.
*/
PyErr_Fetch(&etype, &evalue, &etrace);
/* Dont reraise anything but PyExc_Exceptions as InvalidBSON. */
if (PyErr_GivenExceptionMatches(etype, PyExc_Exception)) {
InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
if (!PyErr_GivenExceptionMatches(etype, InvalidBSON)) {
/*
* Raise InvalidBSON(str(e)).
*/
Py_DECREF(etype);
etype = InvalidBSON;
if (evalue) {
PyObject *msg = PyObject_Str(evalue);
Py_DECREF(evalue);
evalue = msg;
}
PyErr_NormalizeException(&etype, &evalue, &etrace);
} else {
/*
* The current exception matches InvalidBSON, so we don't
* need this reference after all.
*/
Py_DECREF(InvalidBSON);
}
}
}
/* Steals references to args. */
PyErr_Restore(etype, evalue, etrace);
_rewrap_as_invalid_bson();
} else {
PyObject *InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
@ -2727,25 +2818,7 @@ static int _element_to_dict(PyObject* self, const char* string,
if (!*name) {
/* If NULL is returned then wrap the UnicodeDecodeError
in an InvalidBSON error */
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
PyObject *InvalidBSON = NULL;
PyErr_Fetch(&etype, &evalue, &etrace);
if (PyErr_GivenExceptionMatches(etype, PyExc_Exception)) {
InvalidBSON = _error("InvalidBSON");
if (InvalidBSON) {
Py_DECREF(etype);
etype = InvalidBSON;
if (evalue) {
PyObject *msg = PyObject_Str(evalue);
Py_DECREF(evalue);
evalue = msg;
}
PyErr_NormalizeException(&etype, &evalue, &etrace);
}
}
PyErr_Restore(etype, evalue, etrace);
_rewrap_as_invalid_bson();
return -1;
}
position += (unsigned)name_length + 1;

View File

@ -17,6 +17,7 @@ from __future__ import annotations
import json
from typing import Any, Optional
from urllib.parse import quote
def _get_azure_response(
@ -29,7 +30,7 @@ def _get_azure_response(
url += "?api-version=2018-02-01"
url += f"&resource={resource}"
if client_id:
url += f"&client_id={client_id}"
url += f"&client_id={quote(client_id)}"
headers = {"Metadata": "true", "Accept": "application/json"}
request = Request(url, headers=headers) # noqa: S310
try:

View File

@ -760,11 +760,7 @@ class Pool:
self._pending = 0
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
# Log before publishing event to prevent potential listener preemption in tests
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -774,6 +770,11 @@ class Pool:
serverPort=self.address[1],
**self.opts.non_default_options,
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# Retain references to pinned connections to prevent the CPython GC
@ -788,9 +789,6 @@ class Pool:
async with self.lock:
if self.state != PoolState.READY:
self.state = PoolState.READY
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -799,6 +797,9 @@ class Pool:
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
@property
def closed(self) -> bool:
@ -859,9 +860,6 @@ class Pool:
else:
for conn in sockets:
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -870,15 +868,11 @@ class Pool:
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -888,6 +882,13 @@ class Pool:
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if not _IS_SYNC:
await asyncio.gather(
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]

View File

@ -758,11 +758,7 @@ class Pool:
self._pending = 0
self._max_connecting = self.opts.max_connecting
self._client_id = client_id
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
# Log before publishing event to prevent potential listener preemption in tests
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -772,6 +768,11 @@ class Pool:
serverPort=self.address[1],
**self.opts.non_default_options,
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_created(
self.address, self.opts.non_default_options
)
# Similar to active_sockets but includes threads in the wait queue.
self.operation_count: int = 0
# Retain references to pinned connections to prevent the CPython GC
@ -786,9 +787,6 @@ class Pool:
with self.lock:
if self.state != PoolState.READY:
self.state = PoolState.READY
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -797,6 +795,9 @@ class Pool:
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert self.opts._event_listeners is not None
self.opts._event_listeners.publish_pool_ready(self.address)
@property
def closed(self) -> bool:
@ -857,9 +858,6 @@ class Pool:
else:
for conn in sockets:
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -868,15 +866,11 @@ class Pool:
serverHost=self.address[0],
serverPort=self.address[1],
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_closed(self.address)
else:
if old_state != PoolState.PAUSED:
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_CONNECTION_LOGGER,
@ -886,6 +880,13 @@ class Pool:
serverPort=self.address[1],
serviceId=service_id,
)
if self.enabled_for_cmap:
assert listeners is not None
listeners.publish_pool_cleared(
self.address,
service_id=service_id,
interrupt_connections=interrupt_connections,
)
if not _IS_SYNC:
asyncio.gather(
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets], # type: ignore[func-returns-value]

View File

@ -1 +1 @@
python-snappy>=0.7.3
python-snappy>=0.6.0

View File

@ -2711,11 +2711,11 @@ class TestClientPool(AsyncMockClientTest):
await async_wait_until(lambda: len(c.nodes) == 1, "connect")
self.assertEqual(await c.address, ("c", 3))
# Assert that we create 1 pooled connection.
# Wait for the pooled connection to be registered
await listener.async_wait_for_event(monitoring.ConnectionReadyEvent, 1)
self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1)
arbiter = c._topology.get_server_by_address(("c", 3))
self.assertEqual(len(arbiter.pool.conns), 1)
await async_wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection")
# Arbiter pool is marked ready.
self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1)

View File

@ -876,8 +876,6 @@ class TestViews(AsyncEncryptionIntegrationTest):
class TestCorpus(AsyncEncryptionIntegrationTest):
# PYTHON-5708: Encryption tests sending large payloads fail on some mongocryptd versions.
@async_client_context.require_version_max(6, 99)
@unittest.skipUnless(any(AWS_CREDS.values()), "AWS environment credentials are not set")
async def asyncSetUp(self):
await super().asyncSetUp()
@ -1054,8 +1052,6 @@ class TestBsonSizeBatches(AsyncEncryptionIntegrationTest):
client_encrypted: AsyncMongoClient
listener: OvertCommandListener
# PYTHON-5708: Encryption tests sending large payloads fail on some mongocryptd versions.
@async_client_context.require_version_max(6, 99)
async def asyncSetUp(self):
await super().asyncSetUp()
db = async_client_context.client.db

View File

@ -150,6 +150,20 @@ class TestGetAzureResponse(unittest.TestCase):
_, kwargs = mock_open.call_args
self.assertEqual(kwargs["timeout"], 42)
def test_client_id_is_url_encoded(self):
"""Ensure special characters in client_id are percent-encoded."""
body = json.dumps({"access_token": "tok", "expires_in": "3600"})
with _mock_urlopen(200, body) as mock_open:
self._call(client_id="id with spaces&special=chars")
url = mock_open.call_args[0][0].full_url
# '&' and '=' must be percent-encoded so they don't inject extra query params
self.assertIn("client_id=id%20with%20spaces%26special%3Dchars", url)
# The encoded client_id should not introduce a raw '&'
# Count params: api-version, resource, client_id — exactly 3
query_string = url.split("?", 1)[1]
self.assertEqual(query_string.count("&"), 2)
if __name__ == "__main__":
unittest.main()

View File

@ -1269,6 +1269,22 @@ class TestBSON(unittest.TestCase):
encode(doc)
self.assertEqual(cm.exception.document, doc)
def test_binary_length_accounts_for_header(self):
size = 20
binary_length = 12 # 5 more than the actual 7 bytes
payload = b""
payload += struct.pack("<i", size) # document size
payload += b"\x05" # type = Binary
payload += b"a\x00" # key "a"
payload += struct.pack("<I", binary_length) # Binary length (inflated)
payload += b"\x00" # subtype 0
payload += b"\x41" * 7 # value
payload += b"\x00" # EOO
with self.assertRaises(InvalidBSON):
decode(payload)
class TestCodecOptions(unittest.TestCase):
def test_document_class(self):

View File

@ -2666,11 +2666,11 @@ class TestClientPool(MockClientTest):
wait_until(lambda: len(c.nodes) == 1, "connect")
self.assertEqual(c.address, ("c", 3))
# Assert that we create 1 pooled connection.
# Wait for the pooled connection to be registered
listener.wait_for_event(monitoring.ConnectionReadyEvent, 1)
self.assertEqual(listener.event_count(monitoring.ConnectionCreatedEvent), 1)
arbiter = c._topology.get_server_by_address(("c", 3))
self.assertEqual(len(arbiter.pool.conns), 1)
wait_until(lambda: len(arbiter.pool.conns) == 1, "create 1 pooled connection")
# Arbiter pool is marked ready.
self.assertEqual(listener.event_count(monitoring.PoolReadyEvent), 1)

View File

@ -872,8 +872,6 @@ class TestViews(EncryptionIntegrationTest):
class TestCorpus(EncryptionIntegrationTest):
# PYTHON-5708: Encryption tests sending large payloads fail on some mongocryptd versions.
@client_context.require_version_max(6, 99)
@unittest.skipUnless(any(AWS_CREDS.values()), "AWS environment credentials are not set")
def setUp(self):
super().setUp()
@ -1050,8 +1048,6 @@ class TestBsonSizeBatches(EncryptionIntegrationTest):
client_encrypted: MongoClient
listener: OvertCommandListener
# PYTHON-5708: Encryption tests sending large payloads fail on some mongocryptd versions.
@client_context.require_version_max(6, 99)
def setUp(self):
super().setUp()
db = client_context.client.db

View File

@ -23,7 +23,7 @@ sys.path[0:0] = [""]
from test import client_knobs, unittest
from test.pymongo_mocks import DummyMonitor
from test.utils import MockPool, flaky
from test.utils import MockPool
from test.utils_shared import wait_until
from bson.objectid import ObjectId
@ -755,7 +755,6 @@ def wait_for_primary(topology):
class TestTopologyErrors(TopologyTest):
# Errors when calling hello.
@flaky(reason="PYTHON-5366")
def test_pool_reset(self):
# hello succeeds at first, then always raises socket error.
hello_count = [0]
@ -776,7 +775,11 @@ class TestTopologyErrors(TopologyTest):
# Pool is reset by hello failure.
t.request_check_all()
self.assertNotEqual(generation, server.pool.gen.get_overall())
# Wait for the monitor's hello failure to trigger Pool.reset() and bump the generation.
wait_until(
lambda: server.pool.gen.get_overall() != generation,
"pool reset after failed monitor check",
)
def test_hello_retry(self):
# hello succeeds at first, then raises socket error, then succeeds.