PYTHON-2868 Test Serverless behind a load balancer (#742)

This commit is contained in:
Shane Harvey 2021-09-23 15:57:57 -07:00 committed by GitHub
parent c7d80802be
commit 968ee7ba96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 151 additions and 52 deletions

View File

@ -417,9 +417,11 @@ functions:
fi
if [ -n "${test_serverless}" ]; then
export TEST_SERVERLESS=1
export MONGODB_URI="${MONGODB_URI}"
export MONGODB_URI="${SINGLE_ATLASPROXY_SERVERLESS_URI}"
export SERVERLESS_ATLAS_USER="${SERVERLESS_ATLAS_USER}"
export SERVERLESS_ATLAS_PASSWORD="${SERVERLESS_ATLAS_PASSWORD}"
export SINGLE_MONGOS_LB_URI="${SINGLE_ATLASPROXY_SERVERLESS_URI}"
export MULTI_MONGOS_LB_URI="${MULTI_ATLASPROXY_SERVERLESS_URI}"
fi
PYTHON_BINARY=${PYTHON_BINARY} \
@ -873,9 +875,10 @@ task_groups:
script: |
${PREPARE_SHELL}
set +o xtrace
SERVERLESS_DRIVERS_GROUP=${SERVERLESS_DRIVERS_GROUP} \
SERVERLESS_API_PUBLIC_KEY=${SERVERLESS_API_PUBLIC_KEY} \
SERVERLESS_API_PRIVATE_KEY=${SERVERLESS_API_PRIVATE_KEY} \
LOADBALANCED=ON \
SERVERLESS_DRIVERS_GROUP=${SERVERLESS_DRIVERS_GROUP} \
SERVERLESS_API_PUBLIC_KEY=${SERVERLESS_API_PUBLIC_KEY} \
SERVERLESS_API_PRIVATE_KEY=${SERVERLESS_API_PRIVATE_KEY} \
bash ${DRIVERS_TOOLS}/.evergreen/serverless/create-instance.sh
- command: expansions.update
params:
@ -887,9 +890,9 @@ task_groups:
${PREPARE_SHELL}
set +o xtrace
SERVERLESS_DRIVERS_GROUP=${SERVERLESS_DRIVERS_GROUP} \
SERVERLESS_API_PUBLIC_KEY=${SERVERLESS_API_PUBLIC_KEY} \
SERVERLESS_API_PRIVATE_KEY=${SERVERLESS_API_PRIVATE_KEY} \
SERVERLESS_INSTANCE_NAME=${SERVERLESS_INSTANCE_NAME} \
SERVERLESS_API_PUBLIC_KEY=${SERVERLESS_API_PUBLIC_KEY} \
SERVERLESS_API_PRIVATE_KEY=${SERVERLESS_API_PRIVATE_KEY} \
SERVERLESS_INSTANCE_NAME=${SERVERLESS_INSTANCE_NAME} \
bash ${DRIVERS_TOOLS}/.evergreen/serverless/delete-instance.sh
tasks:
- ".serverless"

View File

@ -107,12 +107,14 @@ if TEST_LOADBALANCER:
db_user = res['username'] or db_user
db_pwd = res['password'] or db_pwd
elif TEST_SERVERLESS:
res = parse_uri(os.environ["MONGODB_URI"])
host, port = res['nodelist'].pop(0)
additional_serverless_mongoses = res['nodelist']
TEST_LOADBALANCER = True
res = parse_uri(SINGLE_MONGOS_LB_URI)
host, port = res['nodelist'][0]
db_user = res['username'] or db_user
db_pwd = res['password'] or db_pwd
TLS_OPTIONS = {'tls': True}
# Spec says serverless tests must be run with compression.
COMPRESSORS = COMPRESSORS or 'zlib'
def is_server_resolvable():
@ -236,7 +238,7 @@ class ClientContext(object):
self.version = Version(-1) # Needs to be comparable with Version
self.auth_enabled = False
self.test_commands_enabled = False
self.server_parameters = None
self.server_parameters = {}
self.is_mongos = False
self.mongoses = []
self.is_rs = False
@ -251,7 +253,7 @@ class ClientContext(object):
self.is_data_lake = False
self.load_balancer = TEST_LOADBALANCER
self.serverless = TEST_SERVERLESS
if self.load_balancer:
if self.load_balancer or self.serverless:
self.default_client_options["loadBalanced"] = True
if COMPRESSORS:
self.default_client_options["compressors"] = COMPRESSORS
@ -402,7 +404,11 @@ class ClientContext(object):
self.w = len(hello.get("hosts", [])) or 1
self.version = Version.from_client(self.client)
if TEST_SERVERLESS:
if self.serverless:
self.server_parameters = {
'requireApiVersion': False,
'enableTestCommands': True,
}
self.test_commands_enabled = True
self.has_ipv6 = False
else:
@ -422,14 +428,11 @@ class ClientContext(object):
self.is_mongos = (self.hello.get('msg') == 'isdbgrid')
if self.is_mongos:
if self.serverless:
self.mongoses.append(self.client.address)
self.mongoses.extend(additional_serverless_mongoses)
else:
address = self.client.address
self.mongoses.append(address)
if not self.serverless:
# Check for another mongos on the next port.
address = self.client.address
next_address = address[0], address[1] + 1
self.mongoses.append(address)
mongos_client = self._connect(
*next_address, **self.default_client_options)
if mongos_client:

View File

@ -902,6 +902,11 @@
},
{
"description": "listCollections pins the cursor to a connection",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "listCollections",
@ -1151,6 +1156,11 @@
},
{
"description": "change streams pin to a connection",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "createChangeStream",

View File

@ -127,6 +127,11 @@
"tests": [
{
"description": "only connections for a specific serviceId are closed when pools are cleared",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "createFindCursor",
@ -255,7 +260,7 @@
]
},
{
"description": "errors during the initial connection hello are ignore",
"description": "errors during the initial connection hello are ignored",
"runOnRequirements": [
{
"minServerVersion": "4.9"
@ -274,7 +279,9 @@
},
"data": {
"failCommands": [
"isMaster"
"ismaster",
"isMaster",
"hello"
],
"closeConnection": true,
"appName": "lbSDAMErrorTestClient"

View File

@ -607,6 +607,11 @@
},
{
"description": "pinned connection is released after a transient non-network CRUD error",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "failPoint",
@ -715,6 +720,11 @@
},
{
"description": "pinned connection is released after a transient network CRUD error",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "failPoint",
@ -831,6 +841,11 @@
},
{
"description": "pinned connection is released after a transient non-network commit error",
"runOnRequirements": [
{
"serverless": "forbid"
}
],
"operations": [
{
"name": "failPoint",

View File

@ -15,7 +15,7 @@
"useMultipleMongoses": true,
"uriOptions": {
"maxPoolSize": 1,
"waitQueueTimeoutMS": 5
"waitQueueTimeoutMS": 50
},
"observeEvents": [
"connectionCheckedOutEvent",

View File

@ -889,7 +889,7 @@ class TestClient(IntegrationTest):
"pymongo_test", "user", "pass", roles=['userAdmin', 'readWrite'])
with self.assertRaises(OperationFailure):
connected(rs_or_single_client(
connected(rs_or_single_client_noauth(
"mongodb://a:b@%s:%d" % (host, port)))
# No error.
@ -899,7 +899,7 @@ class TestClient(IntegrationTest):
# Wrong database.
uri = "mongodb://admin:pass@%s:%d/pymongo_test" % (host, port)
with self.assertRaises(OperationFailure):
connected(rs_or_single_client(uri))
connected(rs_or_single_client_noauth(uri))
# No error.
connected(rs_or_single_client_noauth(
@ -923,7 +923,7 @@ class TestClient(IntegrationTest):
client_context.create_user("admin", "ad min", "pa/ss")
self.addCleanup(client_context.drop_user, "admin", "ad min")
c = rs_or_single_client(username="ad min", password="pa/ss")
c = rs_or_single_client_noauth(username="ad min", password="pa/ss")
# Username and password aren't in strings that will likely be logged.
self.assertNotIn("ad min", repr(c))
@ -935,7 +935,8 @@ class TestClient(IntegrationTest):
c.server_info()
with self.assertRaises(OperationFailure):
rs_or_single_client(username="ad min", password="foo").server_info()
rs_or_single_client_noauth(
username="ad min", password="foo").server_info()
@client_context.require_auth
def test_lazy_auth_raises_operation_failure(self):
@ -957,11 +958,7 @@ class TestClient(IntegrationTest):
if not os.access(mongodb_socket, os.R_OK):
raise SkipTest("Socket file is not accessible")
if client_context.auth_enabled:
uri = "mongodb://%s:%s@%s" % (db_user, db_pwd, encoded_socket)
else:
uri = "mongodb://%s" % encoded_socket
uri = "mongodb://%s" % encoded_socket
# Confirm we can do operations via the socket.
client = rs_or_single_client(uri)
client.pymongo_test.test.insert_one({"dummy": "object"})

View File

@ -55,6 +55,7 @@ from test.utils import (camel_to_snake,
OvertCommandListener,
rs_or_single_client,
single_client,
single_client_noauth,
TestCreator,
wait_until)
from test.utils_spec_runner import SpecRunnerThread
@ -334,7 +335,7 @@ class TestCMAP(IntegrationTest):
opts = '&'.join(['%s=%s' % (k, v)
for k, v in self.POOL_OPTIONS.items()])
uri = 'mongodb://%s/?%s' % (client_context.pair, opts)
client = rs_or_single_client(uri, **self.credentials)
client = rs_or_single_client(uri)
self.addCleanup(client.close)
pool_opts = get_pool(client).opts
self.assertEqual(pool_opts.non_default_options, self.POOL_OPTIONS)
@ -395,8 +396,9 @@ class TestCMAP(IntegrationTest):
def test_5_check_out_fails_auth_error(self):
listener = CMAPListener()
client = single_client(username="notauser", password="fail",
event_listeners=[listener])
client = single_client_noauth(
username="notauser", password="fail",
event_listeners=[listener])
self.addCleanup(client.close)
# Attempt to create a new connection.

View File

@ -38,6 +38,7 @@ globals().update(generate_test_classes(TEST_PATH, module=__name__))
class TestLB(IntegrationTest):
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
def test_connections_are_only_returned_once(self):
pool = get_pool(self.client)
@ -97,7 +98,6 @@ class TestLB(IntegrationTest):
"failCommands": [
"find", "aggregate"
],
"errorCode": 91,
"closeConnection": True,
}
}

View File

@ -33,7 +33,8 @@ globals().update(generate_test_classes(
class_name_prefix='UnifiedTestFormat',
expected_failures=[
'Client side error in command starting transaction', # PYTHON-1894
]))
],
RUN_ON_SERVERLESS=False))
globals().update(generate_test_classes(
@ -43,7 +44,8 @@ globals().update(generate_test_classes(
bypass_test_generation_errors=True,
expected_failures=[
'.*', # All tests expected to fail
]))
],
RUN_ON_SERVERLESS=False))
class TestMatchEvaluatorUtil(unittest.TestCase):

View File

@ -49,7 +49,7 @@
},
"tests": [
{
"description": "unpin after TransientTransctionError error on commit",
"description": "unpin after TransientTransactionError error on commit",
"runOnRequirements": [
{
"serverless": "forbid"
@ -108,6 +108,24 @@
"arguments": {
"session": "session0"
}
},
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"x": 1
},
"session": "session0"
}
},
{
"name": "abortTransaction",
"object": "session0"
}
]
},
@ -142,7 +160,7 @@
]
},
{
"description": "unpin after TransientTransctionError error on abort",
"description": "unpin after non-transient error on abort",
"runOnRequirements": [
{
"serverless": "forbid"
@ -192,11 +210,29 @@
"arguments": {
"session": "session0"
}
},
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"x": 1
},
"session": "session0"
}
},
{
"name": "abortTransaction",
"object": "session0"
}
]
},
{
"description": "unpin after non-transient error on abort",
"description": "unpin after TransientTransactionError error on abort",
"operations": [
{
"name": "startTransaction",
@ -241,6 +277,24 @@
"arguments": {
"session": "session0"
}
},
{
"name": "startTransaction",
"object": "session0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"x": 1
},
"session": "session0"
}
},
{
"name": "abortTransaction",
"object": "session0"
}
]
},

View File

@ -279,7 +279,7 @@ class EntityMapUtil(object):
self._listeners[spec['id']] = listener
kwargs['event_listeners'] = [listener]
if spec.get('useMultipleMongoses'):
if client_context.load_balancer:
if client_context.load_balancer or client_context.serverless:
kwargs['h'] = client_context.MULTI_MONGOS_LB_URI
elif client_context.is_mongos:
kwargs['h'] = client_context.mongos_seeds()
@ -658,6 +658,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
"""
SCHEMA_VERSION = Version.from_string('1.5')
RUN_ON_LOAD_BALANCER = True
RUN_ON_SERVERLESS = True
@staticmethod
def should_run_on(run_on_spec):

View File

@ -47,6 +47,7 @@ from pymongo.server_selectors import (any_server_selector,
writable_server_selector)
from pymongo.server_type import SERVER_TYPE
from pymongo.write_concern import WriteConcern
from pymongo.uri_parser import parse_uri
from test import (client_context,
db_user,
@ -507,13 +508,10 @@ class TestCreator(object):
setattr(self._test_class, new_test.__name__, new_test)
def _connection_string(h, authenticate):
if h.startswith("mongodb://"):
def _connection_string(h):
if h.startswith("mongodb://") or h.startswith("mongodb+srv://"):
return h
elif client_context.auth_enabled and authenticate:
return "mongodb://%s:%s@%s" % (db_user, db_pwd, str(h))
else:
return "mongodb://%s" % (str(h),)
return "mongodb://%s" % (str(h),)
def _mongo_client(host, port, authenticate=True, directConnection=None,
@ -528,10 +526,17 @@ def _mongo_client(host, port, authenticate=True, directConnection=None,
client_options['directConnection'] = directConnection
client_options.update(kwargs)
client = MongoClient(_connection_string(host, authenticate), port,
**client_options)
uri = _connection_string(host)
if client_context.auth_enabled and authenticate:
# Only add the default username or password if one is not provided.
res = parse_uri(uri)
if (not res['username'] and not res['password'] and
'username' not in client_options and
'password' not in client_options):
client_options['username'] = db_user
client_options['password'] = db_pwd
return client
return MongoClient(uri, port, **client_options)
def single_client_noauth(h=None, p=None, **kwargs):

View File

@ -510,7 +510,7 @@ class SpecRunner(IntegrationTest):
use_multi_mongos = test['useMultipleMongoses']
host = None
if use_multi_mongos:
if client_context.load_balancer:
if client_context.load_balancer or client_context.serverless:
host = client_context.MULTI_MONGOS_LB_URI
elif client_context.is_mongos:
host = client_context.mongos_seeds()

View File

@ -1,6 +1,6 @@
{
"description": "Transaction handling",
"schemaVersion": "1.1",
"schemaVersion": "1.3",
"runOnRequirements": [
{
"minServerVersion": "4.9",