Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2025-08-20 14:11:31 -05:00
commit daf0f89c5a
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
17 changed files with 1627 additions and 559 deletions

View File

@ -3,11 +3,6 @@ PYMONGO=$(dirname "$(cd "$(dirname "$0")" || exit; pwd)")
rm $PYMONGO/test/transactions/legacy/errors-client.json # PYTHON-1894
rm $PYMONGO/test/connection_monitoring/wait-queue-fairness.json # PYTHON-1873
rm $PYMONGO/test/client-side-encryption/spec/unified/fle2v2-BypassQueryAnalysis.json # PYTHON-5143
rm $PYMONGO/test/client-side-encryption/spec/unified/fle2v2-EncryptedFields-vs-EncryptedFieldsMap.json # PYTHON-5143
rm $PYMONGO/test/client-side-encryption/spec/unified/localSchema.json # PYTHON-5143
rm $PYMONGO/test/client-side-encryption/spec/unified/maxWireVersion.json # PYTHON-5143
rm $PYMONGO/test/unified-test-format/valid-pass/poc-queryable-encryption.json # PYTHON-5143
rm $PYMONGO/test/discovery_and_monitoring/unified/pool-clear-application-error.json # PYTHON-4918
rm $PYMONGO/test/discovery_and_monitoring/unified/pool-clear-checkout-error.json # PYTHON-4918
rm $PYMONGO/test/discovery_and_monitoring/unified/pool-clear-min-pool-size-error.json # PYTHON-4918

View File

@ -59,7 +59,8 @@ from pymongo.synchronous.mongo_client import MongoClient
sys.path[0:0] = [""]
from test.helpers import (
from test.helpers import client_knobs, global_knobs
from test.helpers_shared import (
COMPRESSORS,
IS_SRV,
MONGODB_API_VERSION,
@ -67,10 +68,8 @@ from test.helpers import (
TEST_LOADBALANCER,
TLS_OPTIONS,
SystemCertsPatcher,
client_knobs,
db_pwd,
db_user,
global_knobs,
host,
is_server_resolvable,
port,

View File

@ -59,7 +59,8 @@ from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
sys.path[0:0] = [""]
from test.helpers import (
from test.asynchronous.helpers import client_knobs, global_knobs
from test.helpers_shared import (
COMPRESSORS,
IS_SRV,
MONGODB_API_VERSION,
@ -67,10 +68,8 @@ from test.helpers import (
TEST_LOADBALANCER,
TLS_OPTIONS,
SystemCertsPatcher,
client_knobs,
db_pwd,
db_user,
global_knobs,
host,
is_server_resolvable,
port,

View File

@ -12,137 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared constants and helper methods for pymongo, bson, and gridfs test suites."""
"""Shared helper methods for pymongo, bson, and gridfs test suites."""
from __future__ import annotations
import asyncio
import base64
import gc
import multiprocessing
import os
import signal
import socket
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings
from inspect import iscoroutinefunction
from pymongo._asyncio_task import create_task
try:
import ipaddress
HAVE_IPADDRESS = True
except ImportError:
HAVE_IPADDRESS = False
from functools import wraps
from typing import Any, Callable, Dict, Generator, Optional, no_type_check
from unittest import SkipTest
from typing import Optional, no_type_check
from bson.son import SON
from pymongo import common, message
from bson import SON
from pymongo import common
from pymongo._asyncio_task import create_task
from pymongo.read_preferences import ReadPreference
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
from pymongo.synchronous.uri_parser import parse_uri
if HAVE_SSL:
import ssl
_IS_SYNC = False
# Enable debug output for uncollectable objects. PyPy does not have set_debug.
if hasattr(gc, "set_debug"):
gc.set_debug(
gc.DEBUG_UNCOLLECTABLE | getattr(gc, "DEBUG_OBJECTS", 0) | getattr(gc, "DEBUG_INSTANCES", 0)
)
# The host and port of a single mongod or mongos, or the seed host
# for a replica set.
host = os.environ.get("DB_IP", "localhost")
port = int(os.environ.get("DB_PORT", 27017))
IS_SRV = "mongodb+srv" in host
db_user = os.environ.get("DB_USER", "user")
db_pwd = os.environ.get("DB_PASSWORD", "password")
CERT_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "certificates")
CLIENT_PEM = os.environ.get("CLIENT_PEM", os.path.join(CERT_PATH, "client.pem"))
CA_PEM = os.environ.get("CA_PEM", os.path.join(CERT_PATH, "ca.pem"))
TLS_OPTIONS: Dict = {"tls": True}
if CLIENT_PEM:
TLS_OPTIONS["tlsCertificateKeyFile"] = CLIENT_PEM
if CA_PEM:
TLS_OPTIONS["tlsCAFile"] = CA_PEM
COMPRESSORS = os.environ.get("COMPRESSORS")
MONGODB_API_VERSION = os.environ.get("MONGODB_API_VERSION")
TEST_LOADBALANCER = bool(os.environ.get("TEST_LOAD_BALANCER"))
SINGLE_MONGOS_LB_URI = os.environ.get("SINGLE_MONGOS_LB_URI")
MULTI_MONGOS_LB_URI = os.environ.get("MULTI_MONGOS_LB_URI")
if TEST_LOADBALANCER:
res = parse_uri(SINGLE_MONGOS_LB_URI or "")
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
# Shared KMS data.
LOCAL_MASTER_KEY = base64.b64decode(
b"Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ"
b"5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
)
AWS_CREDS = {
"accessKeyId": os.environ.get("FLE_AWS_KEY", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET", ""),
}
AWS_CREDS_2 = {
"accessKeyId": os.environ.get("FLE_AWS_KEY2", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET2", ""),
}
AZURE_CREDS = {
"tenantId": os.environ.get("FLE_AZURE_TENANTID", ""),
"clientId": os.environ.get("FLE_AZURE_CLIENTID", ""),
"clientSecret": os.environ.get("FLE_AZURE_CLIENTSECRET", ""),
}
GCP_CREDS = {
"email": os.environ.get("FLE_GCP_EMAIL", ""),
"privateKey": os.environ.get("FLE_GCP_PRIVATEKEY", ""),
}
KMIP_CREDS = {"endpoint": os.environ.get("FLE_KMIP_ENDPOINT", "localhost:5698")}
# Ensure Evergreen metadata doesn't result in truncation
os.environ.setdefault("MONGOB_LOG_MAX_DOCUMENT_LENGTH", "2000")
def is_server_resolvable():
"""Returns True if 'server' is resolvable."""
socket_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(1)
try:
try:
socket.gethostbyname("server")
return True
except OSError:
return False
finally:
socket.setdefaulttimeout(socket_timeout)
def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
cmd = SON([("createUser", user)])
# X509 doesn't use a password
if pwd:
cmd["pwd"] = pwd
cmd["roles"] = roles or ["root"]
cmd.update(**kwargs)
return authdb.command(cmd)
async def async_repl_set_step_down(client, **kwargs):
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
@ -237,133 +122,10 @@ class client_knobs:
raise Exception(msg)
def _all_users(db):
return {u["user"] for u in db.command("usersInfo").get("users", [])}
def sanitize_cmd(cmd):
cp = cmd.copy()
cp.pop("$clusterTime", None)
cp.pop("$db", None)
cp.pop("$readPreference", None)
cp.pop("lsid", None)
if MONGODB_API_VERSION:
# Stable API parameters
cp.pop("apiVersion", None)
# OP_MSG encoding may move the payload type one field to the
# end of the command. Do the same here.
name = next(iter(cp))
try:
identifier = message._FIELD_MAP[name]
docs = cp.pop(identifier)
cp[identifier] = docs
except KeyError:
pass
return cp
def sanitize_reply(reply):
cp = reply.copy()
cp.pop("$clusterTime", None)
cp.pop("operationTime", None)
return cp
def print_thread_tracebacks() -> None:
"""Print all Python thread tracebacks."""
for thread_id, frame in sys._current_frames().items():
sys.stderr.write(f"\n--- Traceback for thread {thread_id} ---\n")
traceback.print_stack(frame, file=sys.stderr)
def print_thread_stacks(pid: int) -> None:
"""Print all C-level thread stacks for a given process id."""
if sys.platform == "darwin":
cmd = ["lldb", "--attach-pid", f"{pid}", "--batch", "--one-line", '"thread backtrace all"']
else:
cmd = ["gdb", f"--pid={pid}", "--batch", '--eval-command="thread apply all bt"']
try:
res = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
)
except Exception as exc:
sys.stderr.write(f"Could not print C-level thread stacks because {cmd[0]} failed: {exc}")
else:
sys.stderr.write(res.stdout)
# Global knobs to speed up the test suite.
global_knobs = client_knobs(events_queue_frequency=0.05)
def _get_executors(topology):
executors = []
for server in topology._servers.values():
# Some MockMonitor do not have an _executor.
if hasattr(server._monitor, "_executor"):
executors.append(server._monitor._executor)
if hasattr(server._monitor, "_rtt_monitor"):
executors.append(server._monitor._rtt_monitor._executor)
executors.append(topology._Topology__events_executor)
if topology._srv_monitor:
executors.append(topology._srv_monitor._executor)
return [e for e in executors if e is not None]
def print_running_topology(topology):
running = [e for e in _get_executors(topology) if not e._stopped]
if running:
print(
"WARNING: found Topology with running threads:\n"
f" Threads: {running}\n"
f" Topology: {topology}\n"
f" Creation traceback:\n{topology._settings._stack}"
)
def test_cases(suite):
"""Iterator over all TestCases within a TestSuite."""
for suite_or_case in suite._tests:
if isinstance(suite_or_case, unittest.TestCase):
# unittest.TestCase
yield suite_or_case
else:
# unittest.TestSuite
yield from test_cases(suite_or_case)
# Helper method to workaround https://bugs.python.org/issue21724
def clear_warning_registry():
"""Clear the __warningregistry__ for all modules."""
for _, module in list(sys.modules.items()):
if hasattr(module, "__warningregistry__"):
module.__warningregistry__ = {} # type:ignore[attr-defined]
class SystemCertsPatcher:
def __init__(self, ca_certs):
if (
ssl.OPENSSL_VERSION.lower().startswith("libressl")
and sys.platform == "darwin"
and not _ssl.IS_PYOPENSSL
):
raise SkipTest(
"LibreSSL on OSX doesn't support setting CA certificates "
"using SSL_CERT_FILE environment variable."
)
self.original_certs = os.environ.get("SSL_CERT_FILE")
# Tell OpenSSL where CA certificates live.
os.environ["SSL_CERT_FILE"] = ca_certs
def disable(self):
if self.original_certs is None:
os.environ.pop("SSL_CERT_FILE")
else:
os.environ["SSL_CERT_FILE"] = self.original_certs
if _IS_SYNC:
PARENT = threading.Thread
else:

View File

@ -57,11 +57,14 @@ from test import (
from test.asynchronous.test_bulk import AsyncBulkTestBase
from test.asynchronous.unified_format import generate_test_classes
from test.asynchronous.utils_spec_runner import AsyncSpecRunner
from test.helpers import (
from test.helpers_shared import (
ALL_KMS_PROVIDERS,
AWS_CREDS,
AWS_TEMP_CREDS,
AZURE_CREDS,
CA_PEM,
CLIENT_PEM,
DEFAULT_KMS_TLS,
GCP_CREDS,
KMIP_CREDS,
LOCAL_MASTER_KEY,
@ -204,7 +207,7 @@ class TestAutoEncryptionOpts(AsyncPyMongoTestCase):
opts = AutoEncryptionOpts(
{},
"k.d",
kms_tls_options={"kmip": {"tlsCAFile": CA_PEM, "tlsCertificateKeyFile": CLIENT_PEM}},
kms_tls_options=DEFAULT_KMS_TLS,
)
_kms_ssl_contexts = _parse_kms_tls_options(opts._kms_tls_options, _IS_SYNC)
ctx = _kms_ssl_contexts["kmip"]
@ -616,17 +619,10 @@ class TestExplicitSimple(AsyncEncryptionIntegrationTest):
# Spec tests
AWS_TEMP_CREDS = {
"accessKeyId": os.environ.get("CSFLE_AWS_TEMP_ACCESS_KEY_ID", ""),
"secretAccessKey": os.environ.get("CSFLE_AWS_TEMP_SECRET_ACCESS_KEY", ""),
"sessionToken": os.environ.get("CSFLE_AWS_TEMP_SESSION_TOKEN", ""),
}
AWS_TEMP_NO_SESSION_CREDS = {
"accessKeyId": os.environ.get("CSFLE_AWS_TEMP_ACCESS_KEY_ID", ""),
"secretAccessKey": os.environ.get("CSFLE_AWS_TEMP_SECRET_ACCESS_KEY", ""),
}
KMS_TLS_OPTS = {"kmip": {"tlsCAFile": CA_PEM, "tlsCertificateKeyFile": CLIENT_PEM}}
class AsyncTestSpec(AsyncSpecRunner):
@ -663,7 +659,7 @@ class AsyncTestSpec(AsyncSpecRunner):
self.skipTest("GCP environment credentials are not set")
if "kmip" in kms_providers:
kms_providers["kmip"] = KMIP_CREDS
opts["kms_tls_options"] = KMS_TLS_OPTS
opts["kms_tls_options"] = DEFAULT_KMS_TLS
if "key_vault_namespace" not in opts:
opts["key_vault_namespace"] = "keyvault.datakeys"
if "extra_options" in opts:
@ -757,14 +753,6 @@ if _HAVE_PYMONGOCRYPT:
)
# Prose Tests
ALL_KMS_PROVIDERS = {
"aws": AWS_CREDS,
"azure": AZURE_CREDS,
"gcp": GCP_CREDS,
"kmip": KMIP_CREDS,
"local": {"key": LOCAL_MASTER_KEY},
}
LOCAL_KEY_ID = Binary(base64.b64decode(b"LOCALAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
AWS_KEY_ID = Binary(base64.b64decode(b"AWSAAAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
AZURE_KEY_ID = Binary(base64.b64decode(b"AZUREAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
@ -851,13 +839,17 @@ class TestDataKeyDoubleEncryption(AsyncEncryptionIntegrationTest):
self.KMS_PROVIDERS,
"keyvault.datakeys",
schema_map=schemas,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self.client_encrypted = await self.async_rs_or_single_client(
auto_encryption_opts=opts, uuidRepresentation="standard"
)
self.client_encryption = self.create_client_encryption(
self.KMS_PROVIDERS, "keyvault.datakeys", self.client, OPTS, kms_tls_options=KMS_TLS_OPTS
self.KMS_PROVIDERS,
"keyvault.datakeys",
self.client,
OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self.listener.reset()
@ -1066,7 +1058,7 @@ class TestCorpus(AsyncEncryptionIntegrationTest):
"keyvault.datakeys",
async_client_context.client,
OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
corpus = self.fix_up_curpus(json_data("corpus", "corpus.json"))
@ -1158,7 +1150,7 @@ class TestCorpus(AsyncEncryptionIntegrationTest):
async def test_corpus(self):
opts = AutoEncryptionOpts(
self.kms_providers(), "keyvault.datakeys", kms_tls_options=KMS_TLS_OPTS
self.kms_providers(), "keyvault.datakeys", kms_tls_options=DEFAULT_KMS_TLS
)
await self._test_corpus(opts)
@ -1169,7 +1161,7 @@ class TestCorpus(AsyncEncryptionIntegrationTest):
self.kms_providers(),
"keyvault.datakeys",
schema_map=schemas,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
await self._test_corpus(opts)
@ -1300,7 +1292,7 @@ class TestCustomEndpoint(AsyncEncryptionIntegrationTest):
key_vault_namespace="keyvault.datakeys",
key_vault_client=async_client_context.client,
codec_options=OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
kms_providers_invalid = copy.deepcopy(kms_providers)
@ -1312,7 +1304,7 @@ class TestCustomEndpoint(AsyncEncryptionIntegrationTest):
key_vault_namespace="keyvault.datakeys",
key_vault_client=async_client_context.client,
codec_options=OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self._kmip_host_error = None
self._invalid_host_error = None
@ -2752,7 +2744,7 @@ class TestRewrapWithSeparateClientEncryption(AsyncEncryptionIntegrationTest):
key_vault_client=self.client,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
codec_options=OPTS,
)
@ -2772,7 +2764,7 @@ class TestRewrapWithSeparateClientEncryption(AsyncEncryptionIntegrationTest):
key_vault_client=client2,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
codec_options=OPTS,
)

View File

@ -37,6 +37,7 @@ from test.asynchronous import (
)
from test.asynchronous.utils import async_get_pool, flaky
from test.asynchronous.utils_spec_runner import SpecRunnerTask
from test.helpers_shared import ALL_KMS_PROVIDERS, DEFAULT_KMS_TLS
from test.unified_format_shared import (
KMS_TLS_OPTS,
PLACEHOLDER_MAP,
@ -61,6 +62,8 @@ from test.utils_shared import (
from test.version import Version
from typing import Any, Dict, List, Mapping, Optional
import pytest
import pymongo
from bson import SON, json_util
from bson.codec_options import DEFAULT_CODEC_OPTIONS
@ -76,7 +79,7 @@ from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.encryption import AsyncClientEncryption
from pymongo.asynchronous.helpers import anext
from pymongo.driver_info import DriverInfo
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT, AutoEncryptionOpts
from pymongo.errors import (
AutoReconnect,
BulkWriteError,
@ -259,6 +262,23 @@ class EntityMapUtil:
kwargs: dict = {}
observe_events = spec.get("observeEvents", [])
if "autoEncryptOpts" in spec:
auto_encrypt_opts = spec["autoEncryptOpts"].copy()
auto_encrypt_kwargs: dict = dict(kms_tls_options=DEFAULT_KMS_TLS)
kms_providers = ALL_KMS_PROVIDERS.copy()
key_vault_namespace = auto_encrypt_opts.pop("keyVaultNamespace")
for provider_name, provider_value in auto_encrypt_opts.pop("kmsProviders").items():
kms_providers[provider_name].update(provider_value)
extra_opts = auto_encrypt_opts.pop("extraOptions", {})
for key, value in extra_opts.items():
auto_encrypt_kwargs[camel_to_snake(key)] = value
for key, value in auto_encrypt_opts.items():
auto_encrypt_kwargs[camel_to_snake(key)] = value
auto_encryption_opts = AutoEncryptionOpts(
kms_providers, key_vault_namespace, **auto_encrypt_kwargs
)
kwargs["auto_encryption_opts"] = auto_encryption_opts
# The unified tests use topologyOpeningEvent, we use topologyOpenedEvent
for i in range(len(observe_events)):
if "topologyOpeningEvent" == observe_events[i]:
@ -430,7 +450,7 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string("1.22")
SCHEMA_VERSION = Version.from_string("1.23")
RUN_ON_LOAD_BALANCER = True
TEST_SPEC: Any
TEST_PATH = "" # This gets filled in by generate_test_classes
@ -462,6 +482,13 @@ class UnifiedSpecTestMixinV1(AsyncIntegrationTest):
wc = WriteConcern(w="majority")
else:
wc = WriteConcern(w=1)
# Remove any encryption collections associated with the collection.
collections = await db.list_collection_names()
for collection in collections:
if collection in [f"enxcol_.{coll_name}.esc", f"enxcol_.{coll_name}.ecoc"]:
await db.drop_collection(collection)
if documents:
if opts:
await db.create_collection(coll_name, **opts)
@ -1516,7 +1543,14 @@ def generate_test_classes(
TEST_SPEC = test_spec
EXPECTED_FAILURES = expected_failures
return SpecTestBase
base = SpecTestBase
# Add "encryption" marker if the "csfle" runOnRequirement is set.
for req in test_spec.get("runOnRequirements", []):
if req.get("csfle", False):
base = pytest.mark.encryption(base)
return base
for dirpath, _, filenames in os.walk(test_path):
dirname = os.path.split(dirpath)[-1]

View File

@ -0,0 +1,322 @@
{
"description": "fle2v2-BypassQueryAnalysis",
"schemaVersion": "1.23",
"runOnRequirements": [
{
"minServerVersion": "7.0.0",
"serverless": "forbid",
"csfle": true,
"topologies": [
"replicaset",
"sharded",
"load-balanced"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"autoEncryptOpts": {
"kmsProviders": {
"local": {
"key": "Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
}
},
"keyVaultNamespace": "keyvault.datakeys",
"bypassQueryAnalysis": true
},
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "encryptedDB",
"client": "client0",
"databaseName": "default"
}
},
{
"collection": {
"id": "encryptedColl",
"database": "encryptedDB",
"collectionName": "default"
}
},
{
"client": {
"id": "client1"
}
},
{
"database": {
"id": "unencryptedDB",
"client": "client1",
"databaseName": "default"
}
},
{
"collection": {
"id": "unencryptedColl",
"database": "unencryptedDB",
"collectionName": "default"
}
}
],
"initialData": [
{
"databaseName": "keyvault",
"collectionName": "datakeys",
"documents": [
{
"_id": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"keyMaterial": {
"$binary": {
"base64": "sHe0kz57YW7v8g9VP9sf/+K1ex4JqKc5rf/URX3n3p8XdZ6+15uXPaSayC6adWbNxkFskuMCOifDoTT+rkqMtFkDclOy884RuGGtUysq3X7zkAWYTKi8QAfKkajvVbZl2y23UqgVasdQu3OVBQCrH/xY00nNAs/52e958nVjBuzQkSb1T8pKJAyjZsHJ60+FtnfafDZSTAIBJYn7UWBCwQ==",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1648914851981"
}
},
"updateDate": {
"$date": {
"$numberLong": "1648914851981"
}
},
"status": {
"$numberInt": "0"
},
"masterKey": {
"provider": "local"
}
}
]
},
{
"databaseName": "default",
"collectionName": "default",
"documents": [],
"createOptions": {
"encryptedFields": {
"fields": [
{
"keyId": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedIndexed",
"bsonType": "string",
"queries": {
"queryType": "equality",
"contention": {
"$numberLong": "0"
}
}
},
{
"keyId": {
"$binary": {
"base64": "q83vqxI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedUnindexed",
"bsonType": "string"
}
]
}
}
}
],
"tests": [
{
"description": "BypassQueryAnalysis decrypts",
"operations": [
{
"object": "encryptedColl",
"name": "insertOne",
"arguments": {
"document": {
"_id": 1,
"encryptedIndexed": {
"$binary": {
"base64": "C18BAAAFZAAgAAAAANnt+eLTkv4GdDPl8IAfJOvTzArOgFJQ2S/DcLza4W0DBXMAIAAAAAD2u+omZme3P2gBPehMQyQHQ153tPN1+z7bksYA9jKTpAVwADAAAAAAUnCOQqIvmR65YKyYnsiVfVrg9hwUVO3RhhKExo3RWOzgaS0QdsBL5xKFS0JhZSoWBXUAEAAAAAQSNFZ4EjSYdhI0EjRWeJASEHQAAgAAAAV2AFAAAAAAEjRWeBI0mHYSNBI0VniQEpQbp/ZJpWBKeDtKLiXb0P2E9wvc0g3f373jnYQYlJquOrlPOoEy3ngsHPJuSUijvWDsrQzqYa349K7G/66qaXEFZQAgAAAAAOuac/eRLYakKX6B0vZ1r3QodOQFfjqJD+xlGiPu4/PsBWwAIAAAAACkm0o9bj6j0HuADKc0svbqO2UHj6GrlNdF6yKNxh63xRJrAAAAAAAAAAAAAA==",
"subType": "06"
}
}
}
}
},
{
"object": "encryptedColl",
"name": "find",
"arguments": {
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"encryptedIndexed": "123"
}
]
},
{
"object": "unencryptedColl",
"name": "find",
"arguments": {
"filter": {}
},
"expectResult": [
{
"_id": 1,
"encryptedIndexed": {
"$$type": "binData"
},
"__safeContent__": [
{
"$binary": {
"base64": "31eCYlbQoVboc5zwC8IoyJVSkag9PxREka8dkmbXJeY=",
"subType": "00"
}
}
]
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"listCollections": 1,
"filter": {
"name": "default"
}
},
"commandName": "listCollections"
}
},
{
"commandStartedEvent": {
"command": {
"insert": "default",
"documents": [
{
"_id": 1,
"encryptedIndexed": {
"$binary": {
"base64": "C18BAAAFZAAgAAAAANnt+eLTkv4GdDPl8IAfJOvTzArOgFJQ2S/DcLza4W0DBXMAIAAAAAD2u+omZme3P2gBPehMQyQHQ153tPN1+z7bksYA9jKTpAVwADAAAAAAUnCOQqIvmR65YKyYnsiVfVrg9hwUVO3RhhKExo3RWOzgaS0QdsBL5xKFS0JhZSoWBXUAEAAAAAQSNFZ4EjSYdhI0EjRWeJASEHQAAgAAAAV2AFAAAAAAEjRWeBI0mHYSNBI0VniQEpQbp/ZJpWBKeDtKLiXb0P2E9wvc0g3f373jnYQYlJquOrlPOoEy3ngsHPJuSUijvWDsrQzqYa349K7G/66qaXEFZQAgAAAAAOuac/eRLYakKX6B0vZ1r3QodOQFfjqJD+xlGiPu4/PsBWwAIAAAAACkm0o9bj6j0HuADKc0svbqO2UHj6GrlNdF6yKNxh63xRJrAAAAAAAAAAAAAA==",
"subType": "06"
}
}
}
],
"ordered": true,
"encryptionInformation": {
"type": 1,
"schema": {
"default.default": {
"escCollection": "enxcol_.default.esc",
"ecocCollection": "enxcol_.default.ecoc",
"fields": [
{
"keyId": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedIndexed",
"bsonType": "string",
"queries": {
"queryType": "equality",
"contention": {
"$numberLong": "0"
}
}
},
{
"keyId": {
"$binary": {
"base64": "q83vqxI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedUnindexed",
"bsonType": "string"
}
]
}
}
}
},
"commandName": "insert"
}
},
{
"commandStartedEvent": {
"command": {
"find": "default",
"filter": {
"_id": 1
}
},
"commandName": "find"
}
},
{
"commandStartedEvent": {
"command": {
"find": "datakeys",
"filter": {
"$or": [
{
"_id": {
"$in": [
{
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
}
]
}
},
{
"keyAltNames": {
"$in": []
}
}
]
},
"$db": "keyvault",
"readConcern": {
"level": "majority"
}
},
"commandName": "find"
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,256 @@
{
"description": "fle2v2-EncryptedFields-vs-EncryptedFieldsMap",
"schemaVersion": "1.23",
"runOnRequirements": [
{
"minServerVersion": "7.0.0",
"serverless": "forbid",
"csfle": true,
"topologies": [
"replicaset",
"sharded",
"load-balanced"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"autoEncryptOpts": {
"kmsProviders": {
"local": {
"key": "Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
}
},
"keyVaultNamespace": "keyvault.datakeys",
"encryptedFieldsMap": {
"default.default": {
"fields": []
}
}
},
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "encryptedDB",
"client": "client0",
"databaseName": "default"
}
},
{
"collection": {
"id": "encryptedColl",
"database": "encryptedDB",
"collectionName": "default"
}
}
],
"initialData": [
{
"databaseName": "keyvault",
"collectionName": "datakeys",
"documents": [
{
"_id": {
"$binary": {
"base64": "q83vqxI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"keyMaterial": {
"$binary": {
"base64": "HBk9BWihXExNDvTp1lUxOuxuZK2Pe2ZdVdlsxPEBkiO1bS4mG5NNDsQ7zVxJAH8BtdOYp72Ku4Y3nwc0BUpIKsvAKX4eYXtlhv5zUQxWdeNFhg9qK7qb8nqhnnLeT0f25jFSqzWJoT379hfwDeu0bebJHr35QrJ8myZdPMTEDYF08QYQ48ShRBli0S+QzBHHAQiM2iJNr4svg2WR8JSeWQ==",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1648914851981"
}
},
"updateDate": {
"$date": {
"$numberLong": "1648914851981"
}
},
"status": {
"$numberInt": "0"
},
"masterKey": {
"provider": "local"
}
}
]
},
{
"databaseName": "default",
"collectionName": "default",
"documents": [],
"createOptions": {
"encryptedFields": {
"fields": [
{
"keyId": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedIndexed",
"bsonType": "string",
"queries": {
"queryType": "equality",
"contention": {
"$numberLong": "0"
}
}
},
{
"keyId": {
"$binary": {
"base64": "q83vqxI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedUnindexed",
"bsonType": "string"
}
]
}
}
}
],
"tests": [
{
"description": "encryptedFieldsMap is preferred over remote encryptedFields",
"operations": [
{
"object": "encryptedColl",
"name": "insertOne",
"arguments": {
"document": {
"_id": 1,
"encryptedUnindexed": {
"$binary": {
"base64": "BqvN76sSNJh2EjQSNFZ4kBICTQaVZPWgXp41I7mPV1rLFTtw1tXzjcdSEyxpKKqujlko5TeizkB9hHQ009dVY1+fgIiDcefh+eQrm3CkhQ==",
"subType": "06"
}
}
}
}
},
{
"object": "encryptedColl",
"name": "find",
"arguments": {
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"encryptedUnindexed": "value123"
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"databaseName": "default",
"commandName": "insert",
"command": {
"insert": "default",
"documents": [
{
"_id": 1,
"encryptedUnindexed": {
"$binary": {
"base64": "BqvN76sSNJh2EjQSNFZ4kBICTQaVZPWgXp41I7mPV1rLFTtw1tXzjcdSEyxpKKqujlko5TeizkB9hHQ009dVY1+fgIiDcefh+eQrm3CkhQ==",
"subType": "06"
}
}
}
],
"ordered": true
}
}
},
{
"commandStartedEvent": {
"databaseName": "default",
"commandName": "find",
"command": {
"find": "default",
"filter": {
"_id": 1
}
}
}
},
{
"commandStartedEvent": {
"databaseName": "keyvault",
"commandName": "find",
"command": {
"find": "datakeys",
"filter": {
"$or": [
{
"_id": {
"$in": [
{
"$binary": {
"base64": "q83vqxI0mHYSNBI0VniQEg==",
"subType": "04"
}
}
]
}
},
{
"keyAltNames": {
"$in": []
}
}
]
},
"$db": "keyvault",
"readConcern": {
"level": "majority"
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "default",
"databaseName": "default",
"documents": [
{
"_id": 1,
"encryptedUnindexed": {
"$binary": {
"base64": "BqvN76sSNJh2EjQSNFZ4kBICTQaVZPWgXp41I7mPV1rLFTtw1tXzjcdSEyxpKKqujlko5TeizkB9hHQ009dVY1+fgIiDcefh+eQrm3CkhQ==",
"subType": "06"
}
}
}
]
}
]
}
]
}

View File

@ -0,0 +1,343 @@
{
"description": "localSchema",
"schemaVersion": "1.23",
"runOnRequirements": [
{
"minServerVersion": "4.1.10",
"csfle": true
}
],
"createEntities": [
{
"client": {
"id": "client0",
"autoEncryptOpts": {
"schemaMap": {
"default.default": {
"properties": {
"encrypted_w_altname": {
"encrypt": {
"keyId": "/altname",
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"encrypted_string": {
"encrypt": {
"keyId": [
{
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
}
],
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
}
},
"random": {
"encrypt": {
"keyId": [
{
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
}
],
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Random"
}
},
"encrypted_string_equivalent": {
"encrypt": {
"keyId": [
{
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
}
],
"bsonType": "string",
"algorithm": "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"
}
}
},
"bsonType": "object"
}
},
"keyVaultNamespace": "keyvault.datakeys",
"kmsProviders": {
"aws": {
"accessKeyId": {
"$$placeholder": 1
},
"secretAccessKey": {
"$$placeholder": 1
},
"sessionToken": {
"$$placeholder": 1
}
}
}
},
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"client": {
"id": "client1",
"autoEncryptOpts": {
"schemaMap": {
"default.default": {
"properties": {
"test": {
"bsonType": "string"
}
},
"bsonType": "object",
"required": [
"test"
]
}
},
"keyVaultNamespace": "keyvault.datakeys",
"kmsProviders": {
"aws": {
"accessKeyId": {
"$$placeholder": 1
},
"secretAccessKey": {
"$$placeholder": 1
},
"sessionToken": {
"$$placeholder": 1
}
}
}
},
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "encryptedDB",
"client": "client0",
"databaseName": "default"
}
},
{
"collection": {
"id": "encryptedColl",
"database": "encryptedDB",
"collectionName": "default"
}
},
{
"database": {
"id": "encryptedDB2",
"client": "client1",
"databaseName": "default"
}
},
{
"collection": {
"id": "encryptedColl2",
"database": "encryptedDB2",
"collectionName": "default"
}
}
],
"initialData": [
{
"databaseName": "keyvault",
"collectionName": "datakeys",
"documents": [
{
"status": 1,
"_id": {
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
},
"masterKey": {
"provider": "aws",
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
"region": "us-east-1"
},
"updateDate": {
"$date": {
"$numberLong": "1552949630483"
}
},
"keyMaterial": {
"$binary": {
"base64": "AQICAHhQNmWG2CzOm1dq3kWLM+iDUZhEqnhJwH9wZVpuZ94A8gEqnsxXlR51T5EbEVezUqqKAAAAwjCBvwYJKoZIhvcNAQcGoIGxMIGuAgEAMIGoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDHa4jo6yp0Z18KgbUgIBEIB74sKxWtV8/YHje5lv5THTl0HIbhSwM6EqRlmBiFFatmEWaeMk4tO4xBX65eq670I5TWPSLMzpp8ncGHMmvHqRajNBnmFtbYxN3E3/WjxmdbOOe+OXpnGJPcGsftc7cB2shRfA4lICPnE26+oVNXT6p0Lo20nY5XC7jyCO",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1552949630483"
}
},
"keyAltNames": [
"altname",
"another_altname"
]
}
]
},
{
"databaseName": "default",
"collectionName": "default",
"documents": []
}
],
"tests": [
{
"description": "A local schema should override",
"operations": [
{
"object": "encryptedColl",
"name": "insertOne",
"arguments": {
"document": {
"_id": 1,
"encrypted_string": "string0"
}
}
},
{
"object": "encryptedColl",
"name": "find",
"arguments": {
"filter": {
"_id": 1
}
},
"expectResult": [
{
"_id": 1,
"encrypted_string": "string0"
}
]
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"databaseName": "keyvault",
"commandName": "find",
"command": {
"find": "datakeys",
"filter": {
"$or": [
{
"_id": {
"$in": [
{
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
}
]
}
},
{
"keyAltNames": {
"$in": []
}
}
]
},
"readConcern": {
"level": "majority"
}
}
}
},
{
"commandStartedEvent": {
"commandName": "insert",
"command": {
"insert": "default",
"documents": [
{
"_id": 1,
"encrypted_string": {
"$binary": {
"base64": "AQAAAAAAAAAAAAAAAAAAAAACwj+3zkv2VM+aTfk60RqhXq6a/77WlLwu/BxXFkL7EppGsju/m8f0x5kBDD3EZTtGALGXlym5jnpZAoSIkswHoA==",
"subType": "06"
}
}
}
],
"ordered": true
}
}
},
{
"commandStartedEvent": {
"commandName": "find",
"command": {
"find": "default",
"filter": {
"_id": 1
}
}
}
}
]
}
],
"outcome": [
{
"collectionName": "default",
"databaseName": "default",
"documents": [
{
"_id": 1,
"encrypted_string": {
"$binary": {
"base64": "AQAAAAAAAAAAAAAAAAAAAAACwj+3zkv2VM+aTfk60RqhXq6a/77WlLwu/BxXFkL7EppGsju/m8f0x5kBDD3EZTtGALGXlym5jnpZAoSIkswHoA==",
"subType": "06"
}
}
}
]
}
]
},
{
"description": "A local schema with no encryption is an error",
"operations": [
{
"object": "encryptedColl2",
"name": "insertOne",
"arguments": {
"document": {
"_id": 1,
"encrypted_string": "string0"
}
},
"expectError": {
"isError": true,
"errorContains": "JSON schema keyword 'required' is only allowed with a remote schema"
}
}
]
}
]
}

View File

@ -0,0 +1,101 @@
{
"description": "maxWireVersion",
"schemaVersion": "1.23",
"runOnRequirements": [
{
"maxServerVersion": "4.0.99",
"csfle": true
}
],
"createEntities": [
{
"client": {
"id": "client0",
"autoEncryptOpts": {
"kmsProviders": {
"aws": {}
},
"keyVaultNamespace": "keyvault.datakeys",
"extraOptions": {
"mongocryptdBypassSpawn": true
}
}
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "default"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "default"
}
}
],
"initialData": [
{
"databaseName": "keyvault",
"collectionName": "datakeys",
"documents": [
{
"status": 1,
"_id": {
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAA==",
"subType": "04"
}
},
"masterKey": {
"provider": "aws",
"key": "arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0",
"region": "us-east-1"
},
"updateDate": {
"$date": {
"$numberLong": "1552949630483"
}
},
"keyMaterial": {
"$binary": {
"base64": "AQICAHhQNmWG2CzOm1dq3kWLM+iDUZhEqnhJwH9wZVpuZ94A8gEqnsxXlR51T5EbEVezUqqKAAAAwjCBvwYJKoZIhvcNAQcGoIGxMIGuAgEAMIGoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDHa4jo6yp0Z18KgbUgIBEIB74sKxWtV8/YHje5lv5THTl0HIbhSwM6EqRlmBiFFatmEWaeMk4tO4xBX65eq670I5TWPSLMzpp8ncGHMmvHqRajNBnmFtbYxN3E3/WjxmdbOOe+OXpnGJPcGsftc7cB2shRfA4lICPnE26+oVNXT6p0Lo20nY5XC7jyCO",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1552949630483"
}
},
"keyAltNames": [
"altname",
"another_altname"
]
}
]
}
],
"tests": [
{
"description": "operation fails with maxWireVersion < 8",
"operations": [
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"encrypted_string": "string0"
}
},
"expectError": {
"errorContains": "Auto-encryption requires a minimum MongoDB version of 4.2"
}
}
]
}
]
}

View File

@ -12,137 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared constants and helper methods for pymongo, bson, and gridfs test suites."""
"""Shared helper methods for pymongo, bson, and gridfs test suites."""
from __future__ import annotations
import asyncio
import base64
import gc
import multiprocessing
import os
import signal
import socket
import subprocess
import sys
import threading
import time
import traceback
import unittest
import warnings
from inspect import iscoroutinefunction
from pymongo._asyncio_task import create_task
try:
import ipaddress
HAVE_IPADDRESS = True
except ImportError:
HAVE_IPADDRESS = False
from functools import wraps
from typing import Any, Callable, Dict, Generator, Optional, no_type_check
from unittest import SkipTest
from typing import Optional, no_type_check
from bson.son import SON
from pymongo import common, message
from bson import SON
from pymongo import common
from pymongo._asyncio_task import create_task
from pymongo.read_preferences import ReadPreference
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
from pymongo.synchronous.uri_parser import parse_uri
if HAVE_SSL:
import ssl
_IS_SYNC = True
# Enable debug output for uncollectable objects. PyPy does not have set_debug.
if hasattr(gc, "set_debug"):
gc.set_debug(
gc.DEBUG_UNCOLLECTABLE | getattr(gc, "DEBUG_OBJECTS", 0) | getattr(gc, "DEBUG_INSTANCES", 0)
)
# The host and port of a single mongod or mongos, or the seed host
# for a replica set.
host = os.environ.get("DB_IP", "localhost")
port = int(os.environ.get("DB_PORT", 27017))
IS_SRV = "mongodb+srv" in host
db_user = os.environ.get("DB_USER", "user")
db_pwd = os.environ.get("DB_PASSWORD", "password")
CERT_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "certificates")
CLIENT_PEM = os.environ.get("CLIENT_PEM", os.path.join(CERT_PATH, "client.pem"))
CA_PEM = os.environ.get("CA_PEM", os.path.join(CERT_PATH, "ca.pem"))
TLS_OPTIONS: Dict = {"tls": True}
if CLIENT_PEM:
TLS_OPTIONS["tlsCertificateKeyFile"] = CLIENT_PEM
if CA_PEM:
TLS_OPTIONS["tlsCAFile"] = CA_PEM
COMPRESSORS = os.environ.get("COMPRESSORS")
MONGODB_API_VERSION = os.environ.get("MONGODB_API_VERSION")
TEST_LOADBALANCER = bool(os.environ.get("TEST_LOAD_BALANCER"))
SINGLE_MONGOS_LB_URI = os.environ.get("SINGLE_MONGOS_LB_URI")
MULTI_MONGOS_LB_URI = os.environ.get("MULTI_MONGOS_LB_URI")
if TEST_LOADBALANCER:
res = parse_uri(SINGLE_MONGOS_LB_URI or "")
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
# Shared KMS data.
LOCAL_MASTER_KEY = base64.b64decode(
b"Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ"
b"5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
)
AWS_CREDS = {
"accessKeyId": os.environ.get("FLE_AWS_KEY", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET", ""),
}
AWS_CREDS_2 = {
"accessKeyId": os.environ.get("FLE_AWS_KEY2", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET2", ""),
}
AZURE_CREDS = {
"tenantId": os.environ.get("FLE_AZURE_TENANTID", ""),
"clientId": os.environ.get("FLE_AZURE_CLIENTID", ""),
"clientSecret": os.environ.get("FLE_AZURE_CLIENTSECRET", ""),
}
GCP_CREDS = {
"email": os.environ.get("FLE_GCP_EMAIL", ""),
"privateKey": os.environ.get("FLE_GCP_PRIVATEKEY", ""),
}
KMIP_CREDS = {"endpoint": os.environ.get("FLE_KMIP_ENDPOINT", "localhost:5698")}
# Ensure Evergreen metadata doesn't result in truncation
os.environ.setdefault("MONGOB_LOG_MAX_DOCUMENT_LENGTH", "2000")
def is_server_resolvable():
"""Returns True if 'server' is resolvable."""
socket_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(1)
try:
try:
socket.gethostbyname("server")
return True
except OSError:
return False
finally:
socket.setdefaulttimeout(socket_timeout)
def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
cmd = SON([("createUser", user)])
# X509 doesn't use a password
if pwd:
cmd["pwd"] = pwd
cmd["roles"] = roles or ["root"]
cmd.update(**kwargs)
return authdb.command(cmd)
def repl_set_step_down(client, **kwargs):
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
@ -237,133 +122,10 @@ class client_knobs:
raise Exception(msg)
def _all_users(db):
return {u["user"] for u in db.command("usersInfo").get("users", [])}
def sanitize_cmd(cmd):
cp = cmd.copy()
cp.pop("$clusterTime", None)
cp.pop("$db", None)
cp.pop("$readPreference", None)
cp.pop("lsid", None)
if MONGODB_API_VERSION:
# Stable API parameters
cp.pop("apiVersion", None)
# OP_MSG encoding may move the payload type one field to the
# end of the command. Do the same here.
name = next(iter(cp))
try:
identifier = message._FIELD_MAP[name]
docs = cp.pop(identifier)
cp[identifier] = docs
except KeyError:
pass
return cp
def sanitize_reply(reply):
cp = reply.copy()
cp.pop("$clusterTime", None)
cp.pop("operationTime", None)
return cp
def print_thread_tracebacks() -> None:
"""Print all Python thread tracebacks."""
for thread_id, frame in sys._current_frames().items():
sys.stderr.write(f"\n--- Traceback for thread {thread_id} ---\n")
traceback.print_stack(frame, file=sys.stderr)
def print_thread_stacks(pid: int) -> None:
"""Print all C-level thread stacks for a given process id."""
if sys.platform == "darwin":
cmd = ["lldb", "--attach-pid", f"{pid}", "--batch", "--one-line", '"thread backtrace all"']
else:
cmd = ["gdb", f"--pid={pid}", "--batch", '--eval-command="thread apply all bt"']
try:
res = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
)
except Exception as exc:
sys.stderr.write(f"Could not print C-level thread stacks because {cmd[0]} failed: {exc}")
else:
sys.stderr.write(res.stdout)
# Global knobs to speed up the test suite.
global_knobs = client_knobs(events_queue_frequency=0.05)
def _get_executors(topology):
executors = []
for server in topology._servers.values():
# Some MockMonitor do not have an _executor.
if hasattr(server._monitor, "_executor"):
executors.append(server._monitor._executor)
if hasattr(server._monitor, "_rtt_monitor"):
executors.append(server._monitor._rtt_monitor._executor)
executors.append(topology._Topology__events_executor)
if topology._srv_monitor:
executors.append(topology._srv_monitor._executor)
return [e for e in executors if e is not None]
def print_running_topology(topology):
running = [e for e in _get_executors(topology) if not e._stopped]
if running:
print(
"WARNING: found Topology with running threads:\n"
f" Threads: {running}\n"
f" Topology: {topology}\n"
f" Creation traceback:\n{topology._settings._stack}"
)
def test_cases(suite):
"""Iterator over all TestCases within a TestSuite."""
for suite_or_case in suite._tests:
if isinstance(suite_or_case, unittest.TestCase):
# unittest.TestCase
yield suite_or_case
else:
# unittest.TestSuite
yield from test_cases(suite_or_case)
# Helper method to workaround https://bugs.python.org/issue21724
def clear_warning_registry():
"""Clear the __warningregistry__ for all modules."""
for _, module in list(sys.modules.items()):
if hasattr(module, "__warningregistry__"):
module.__warningregistry__ = {} # type:ignore[attr-defined]
class SystemCertsPatcher:
def __init__(self, ca_certs):
if (
ssl.OPENSSL_VERSION.lower().startswith("libressl")
and sys.platform == "darwin"
and not _ssl.IS_PYOPENSSL
):
raise SkipTest(
"LibreSSL on OSX doesn't support setting CA certificates "
"using SSL_CERT_FILE environment variable."
)
self.original_certs = os.environ.get("SSL_CERT_FILE")
# Tell OpenSSL where CA certificates live.
os.environ["SSL_CERT_FILE"] = ca_certs
def disable(self):
if self.original_certs is None:
os.environ.pop("SSL_CERT_FILE")
else:
os.environ["SSL_CERT_FILE"] = self.original_certs
if _IS_SYNC:
PARENT = threading.Thread
else:

271
test/helpers_shared.py Normal file
View File

@ -0,0 +1,271 @@
# Copyright 2019-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import base64
import gc
import os
import socket
import subprocess
import sys
import traceback
import unittest
from pathlib import Path
try:
import ipaddress
HAVE_IPADDRESS = True
except ImportError:
HAVE_IPADDRESS = False
from functools import wraps
from typing import no_type_check
from unittest import SkipTest
from bson.son import SON
from pymongo import message
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
from pymongo.synchronous.uri_parser import parse_uri
if HAVE_SSL:
import ssl
# Enable debug output for uncollectable objects. PyPy does not have set_debug.
if hasattr(gc, "set_debug"):
gc.set_debug(
gc.DEBUG_UNCOLLECTABLE | getattr(gc, "DEBUG_OBJECTS", 0) | getattr(gc, "DEBUG_INSTANCES", 0)
)
# The host and port of a single mongod or mongos, or the seed host
# for a replica set.
host = os.environ.get("DB_IP", "localhost")
port = int(os.environ.get("DB_PORT", 27017))
IS_SRV = "mongodb+srv" in host
db_user = os.environ.get("DB_USER", "user")
db_pwd = os.environ.get("DB_PASSWORD", "password")
HERE = Path(__file__).absolute()
CERT_PATH = str(HERE.parent / "certificates")
CLIENT_PEM = os.environ.get("CLIENT_PEM", os.path.join(CERT_PATH, "client.pem"))
CA_PEM = os.environ.get("CA_PEM", os.path.join(CERT_PATH, "ca.pem"))
TLS_OPTIONS: dict = {"tls": True}
if CLIENT_PEM:
TLS_OPTIONS["tlsCertificateKeyFile"] = CLIENT_PEM
if CA_PEM:
TLS_OPTIONS["tlsCAFile"] = CA_PEM
COMPRESSORS = os.environ.get("COMPRESSORS")
MONGODB_API_VERSION = os.environ.get("MONGODB_API_VERSION")
TEST_LOADBALANCER = bool(os.environ.get("TEST_LOAD_BALANCER"))
SINGLE_MONGOS_LB_URI = os.environ.get("SINGLE_MONGOS_LB_URI")
MULTI_MONGOS_LB_URI = os.environ.get("MULTI_MONGOS_LB_URI")
if TEST_LOADBALANCER:
res = parse_uri(SINGLE_MONGOS_LB_URI or "")
host, port = res["nodelist"][0]
db_user = res["username"] or db_user
db_pwd = res["password"] or db_pwd
# Shared KMS data.
LOCAL_MASTER_KEY = base64.b64decode(
b"Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ"
b"5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
)
AWS_CREDS = {
"accessKeyId": os.environ.get("FLE_AWS_KEY", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET", ""),
}
AWS_CREDS_2 = {
"accessKeyId": os.environ.get("FLE_AWS_KEY2", ""),
"secretAccessKey": os.environ.get("FLE_AWS_SECRET2", ""),
}
AZURE_CREDS = {
"tenantId": os.environ.get("FLE_AZURE_TENANTID", ""),
"clientId": os.environ.get("FLE_AZURE_CLIENTID", ""),
"clientSecret": os.environ.get("FLE_AZURE_CLIENTSECRET", ""),
}
GCP_CREDS = {
"email": os.environ.get("FLE_GCP_EMAIL", ""),
"privateKey": os.environ.get("FLE_GCP_PRIVATEKEY", ""),
}
KMIP_CREDS = {"endpoint": os.environ.get("FLE_KMIP_ENDPOINT", "localhost:5698")}
AWS_TEMP_CREDS = {
"accessKeyId": os.environ.get("CSFLE_AWS_TEMP_ACCESS_KEY_ID", ""),
"secretAccessKey": os.environ.get("CSFLE_AWS_TEMP_SECRET_ACCESS_KEY", ""),
"sessionToken": os.environ.get("CSFLE_AWS_TEMP_SESSION_TOKEN", ""),
}
ALL_KMS_PROVIDERS = dict(
aws=AWS_CREDS,
azure=AZURE_CREDS,
gcp=GCP_CREDS,
local=dict(key=LOCAL_MASTER_KEY),
kmip=KMIP_CREDS,
)
DEFAULT_KMS_TLS = dict(kmip=dict(tlsCAFile=CA_PEM, tlsCertificateKeyFile=CLIENT_PEM))
# Ensure Evergreen metadata doesn't result in truncation
os.environ.setdefault("MONGOB_LOG_MAX_DOCUMENT_LENGTH", "2000")
def is_server_resolvable():
"""Returns True if 'server' is resolvable."""
socket_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(1)
try:
try:
socket.gethostbyname("server")
return True
except OSError:
return False
finally:
socket.setdefaulttimeout(socket_timeout)
def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
cmd = SON([("createUser", user)])
# X509 doesn't use a password
if pwd:
cmd["pwd"] = pwd
cmd["roles"] = roles or ["root"]
cmd.update(**kwargs)
return authdb.command(cmd)
def _all_users(db):
return {u["user"] for u in db.command("usersInfo").get("users", [])}
def sanitize_cmd(cmd):
cp = cmd.copy()
cp.pop("$clusterTime", None)
cp.pop("$db", None)
cp.pop("$readPreference", None)
cp.pop("lsid", None)
if MONGODB_API_VERSION:
# Stable API parameters
cp.pop("apiVersion", None)
# OP_MSG encoding may move the payload type one field to the
# end of the command. Do the same here.
name = next(iter(cp))
try:
identifier = message._FIELD_MAP[name]
docs = cp.pop(identifier)
cp[identifier] = docs
except KeyError:
pass
return cp
def sanitize_reply(reply):
cp = reply.copy()
cp.pop("$clusterTime", None)
cp.pop("operationTime", None)
return cp
def print_thread_tracebacks() -> None:
"""Print all Python thread tracebacks."""
for thread_id, frame in sys._current_frames().items():
sys.stderr.write(f"\n--- Traceback for thread {thread_id} ---\n")
traceback.print_stack(frame, file=sys.stderr)
def print_thread_stacks(pid: int) -> None:
"""Print all C-level thread stacks for a given process id."""
if sys.platform == "darwin":
cmd = ["lldb", "--attach-pid", f"{pid}", "--batch", "--one-line", '"thread backtrace all"']
else:
cmd = ["gdb", f"--pid={pid}", "--batch", '--eval-command="thread apply all bt"']
try:
res = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
)
except Exception as exc:
sys.stderr.write(f"Could not print C-level thread stacks because {cmd[0]} failed: {exc}")
else:
sys.stderr.write(res.stdout)
def _get_executors(topology):
executors = []
for server in topology._servers.values():
# Some MockMonitor do not have an _executor.
if hasattr(server._monitor, "_executor"):
executors.append(server._monitor._executor)
if hasattr(server._monitor, "_rtt_monitor"):
executors.append(server._monitor._rtt_monitor._executor)
executors.append(topology._Topology__events_executor)
if topology._srv_monitor:
executors.append(topology._srv_monitor._executor)
return [e for e in executors if e is not None]
def print_running_topology(topology):
running = [e for e in _get_executors(topology) if not e._stopped]
if running:
print(
"WARNING: found Topology with running threads:\n"
f" Threads: {running}\n"
f" Topology: {topology}\n"
f" Creation traceback:\n{topology._settings._stack}"
)
def test_cases(suite):
"""Iterator over all TestCases within a TestSuite."""
for suite_or_case in suite._tests:
if isinstance(suite_or_case, unittest.TestCase):
# unittest.TestCase
yield suite_or_case
else:
# unittest.TestSuite
yield from test_cases(suite_or_case)
# Helper method to workaround https://bugs.python.org/issue21724
def clear_warning_registry():
"""Clear the __warningregistry__ for all modules."""
for _, module in list(sys.modules.items()):
if hasattr(module, "__warningregistry__"):
module.__warningregistry__ = {} # type:ignore[attr-defined]
class SystemCertsPatcher:
def __init__(self, ca_certs):
if (
ssl.OPENSSL_VERSION.lower().startswith("libressl")
and sys.platform == "darwin"
and not _ssl.IS_PYOPENSSL
):
raise SkipTest(
"LibreSSL on OSX doesn't support setting CA certificates "
"using SSL_CERT_FILE environment variable."
)
self.original_certs = os.environ.get("SSL_CERT_FILE")
# Tell OpenSSL where CA certificates live.
os.environ["SSL_CERT_FILE"] = ca_certs
def disable(self):
if self.original_certs is None:
os.environ.pop("SSL_CERT_FILE")
else:
os.environ["SSL_CERT_FILE"] = self.original_certs

View File

@ -54,11 +54,14 @@ sys.path[0:0] = [""]
from test import (
unittest,
)
from test.helpers import (
from test.helpers_shared import (
ALL_KMS_PROVIDERS,
AWS_CREDS,
AWS_TEMP_CREDS,
AZURE_CREDS,
CA_PEM,
CLIENT_PEM,
DEFAULT_KMS_TLS,
GCP_CREDS,
KMIP_CREDS,
LOCAL_MASTER_KEY,
@ -204,7 +207,7 @@ class TestAutoEncryptionOpts(PyMongoTestCase):
opts = AutoEncryptionOpts(
{},
"k.d",
kms_tls_options={"kmip": {"tlsCAFile": CA_PEM, "tlsCertificateKeyFile": CLIENT_PEM}},
kms_tls_options=DEFAULT_KMS_TLS,
)
_kms_ssl_contexts = _parse_kms_tls_options(opts._kms_tls_options, _IS_SYNC)
ctx = _kms_ssl_contexts["kmip"]
@ -614,17 +617,10 @@ class TestExplicitSimple(EncryptionIntegrationTest):
# Spec tests
AWS_TEMP_CREDS = {
"accessKeyId": os.environ.get("CSFLE_AWS_TEMP_ACCESS_KEY_ID", ""),
"secretAccessKey": os.environ.get("CSFLE_AWS_TEMP_SECRET_ACCESS_KEY", ""),
"sessionToken": os.environ.get("CSFLE_AWS_TEMP_SESSION_TOKEN", ""),
}
AWS_TEMP_NO_SESSION_CREDS = {
"accessKeyId": os.environ.get("CSFLE_AWS_TEMP_ACCESS_KEY_ID", ""),
"secretAccessKey": os.environ.get("CSFLE_AWS_TEMP_SECRET_ACCESS_KEY", ""),
}
KMS_TLS_OPTS = {"kmip": {"tlsCAFile": CA_PEM, "tlsCertificateKeyFile": CLIENT_PEM}}
class TestSpec(SpecRunner):
@ -661,7 +657,7 @@ class TestSpec(SpecRunner):
self.skipTest("GCP environment credentials are not set")
if "kmip" in kms_providers:
kms_providers["kmip"] = KMIP_CREDS
opts["kms_tls_options"] = KMS_TLS_OPTS
opts["kms_tls_options"] = DEFAULT_KMS_TLS
if "key_vault_namespace" not in opts:
opts["key_vault_namespace"] = "keyvault.datakeys"
if "extra_options" in opts:
@ -755,14 +751,6 @@ if _HAVE_PYMONGOCRYPT:
)
# Prose Tests
ALL_KMS_PROVIDERS = {
"aws": AWS_CREDS,
"azure": AZURE_CREDS,
"gcp": GCP_CREDS,
"kmip": KMIP_CREDS,
"local": {"key": LOCAL_MASTER_KEY},
}
LOCAL_KEY_ID = Binary(base64.b64decode(b"LOCALAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
AWS_KEY_ID = Binary(base64.b64decode(b"AWSAAAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
AZURE_KEY_ID = Binary(base64.b64decode(b"AZUREAAAAAAAAAAAAAAAAA=="), UUID_SUBTYPE)
@ -849,13 +837,17 @@ class TestDataKeyDoubleEncryption(EncryptionIntegrationTest):
self.KMS_PROVIDERS,
"keyvault.datakeys",
schema_map=schemas,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self.client_encrypted = self.rs_or_single_client(
auto_encryption_opts=opts, uuidRepresentation="standard"
)
self.client_encryption = self.create_client_encryption(
self.KMS_PROVIDERS, "keyvault.datakeys", self.client, OPTS, kms_tls_options=KMS_TLS_OPTS
self.KMS_PROVIDERS,
"keyvault.datakeys",
self.client,
OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self.listener.reset()
@ -1062,7 +1054,7 @@ class TestCorpus(EncryptionIntegrationTest):
"keyvault.datakeys",
client_context.client,
OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
corpus = self.fix_up_curpus(json_data("corpus", "corpus.json"))
@ -1154,7 +1146,7 @@ class TestCorpus(EncryptionIntegrationTest):
def test_corpus(self):
opts = AutoEncryptionOpts(
self.kms_providers(), "keyvault.datakeys", kms_tls_options=KMS_TLS_OPTS
self.kms_providers(), "keyvault.datakeys", kms_tls_options=DEFAULT_KMS_TLS
)
self._test_corpus(opts)
@ -1165,7 +1157,7 @@ class TestCorpus(EncryptionIntegrationTest):
self.kms_providers(),
"keyvault.datakeys",
schema_map=schemas,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self._test_corpus(opts)
@ -1296,7 +1288,7 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
key_vault_namespace="keyvault.datakeys",
key_vault_client=client_context.client,
codec_options=OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
kms_providers_invalid = copy.deepcopy(kms_providers)
@ -1308,7 +1300,7 @@ class TestCustomEndpoint(EncryptionIntegrationTest):
key_vault_namespace="keyvault.datakeys",
key_vault_client=client_context.client,
codec_options=OPTS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
)
self._kmip_host_error = None
self._invalid_host_error = None
@ -2736,7 +2728,7 @@ class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest):
key_vault_client=self.client,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
codec_options=OPTS,
)
@ -2756,7 +2748,7 @@ class TestRewrapWithSeparateClientEncryption(EncryptionIntegrationTest):
key_vault_client=client2,
key_vault_namespace="keyvault.datakeys",
kms_providers=ALL_KMS_PROVIDERS,
kms_tls_options=KMS_TLS_OPTS,
kms_tls_options=DEFAULT_KMS_TLS,
codec_options=OPTS,
)

View File

@ -25,7 +25,7 @@ import warnings
sys.path[0:0] = [""]
from test import unittest
from test.helpers import clear_warning_registry
from test.helpers_shared import clear_warning_registry
from pymongo.common import INTERNAL_URI_OPTION_NAME_MAP, _CaseInsensitiveDictionary, validate
from pymongo.compression_support import _have_snappy

View File

@ -0,0 +1,193 @@
{
"description": "poc-queryable-encryption",
"schemaVersion": "1.23",
"runOnRequirements": [
{
"minServerVersion": "7.0",
"csfle": true,
"topologies": [
"replicaset",
"load-balanced",
"sharded"
]
}
],
"createEntities": [
{
"client": {
"id": "client0",
"autoEncryptOpts": {
"keyVaultNamespace": "keyvault.datakeys",
"kmsProviders": {
"local": {
"key": "Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
}
}
}
}
},
{
"database": {
"id": "encryptedDB",
"client": "client0",
"databaseName": "poc-queryable-encryption"
}
},
{
"collection": {
"id": "encryptedColl",
"database": "encryptedDB",
"collectionName": "encrypted"
}
},
{
"client": {
"id": "client1"
}
},
{
"database": {
"id": "unencryptedDB",
"client": "client1",
"databaseName": "poc-queryable-encryption"
}
},
{
"collection": {
"id": "unencryptedColl",
"database": "unencryptedDB",
"collectionName": "encrypted"
}
}
],
"initialData": [
{
"databaseName": "keyvault",
"collectionName": "datakeys",
"documents": [
{
"_id": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"keyMaterial": {
"$binary": {
"base64": "sHe0kz57YW7v8g9VP9sf/+K1ex4JqKc5rf/URX3n3p8XdZ6+15uXPaSayC6adWbNxkFskuMCOifDoTT+rkqMtFkDclOy884RuGGtUysq3X7zkAWYTKi8QAfKkajvVbZl2y23UqgVasdQu3OVBQCrH/xY00nNAs/52e958nVjBuzQkSb1T8pKJAyjZsHJ60+FtnfafDZSTAIBJYn7UWBCwQ==",
"subType": "00"
}
},
"creationDate": {
"$date": {
"$numberLong": "1641024000000"
}
},
"updateDate": {
"$date": {
"$numberLong": "1641024000000"
}
},
"status": 1,
"masterKey": {
"provider": "local"
}
}
]
},
{
"databaseName": "poc-queryable-encryption",
"collectionName": "encrypted",
"documents": [],
"createOptions": {
"encryptedFields": {
"fields": [
{
"keyId": {
"$binary": {
"base64": "EjRWeBI0mHYSNBI0VniQEg==",
"subType": "04"
}
},
"path": "encryptedInt",
"bsonType": "int",
"queries": {
"queryType": "equality",
"contention": {
"$numberLong": "0"
}
}
}
]
}
}
}
],
"tests": [
{
"description": "insert, replace, and find with queryable encryption",
"operations": [
{
"object": "encryptedColl",
"name": "insertOne",
"arguments": {
"document": {
"_id": 1,
"encryptedInt": 11
}
}
},
{
"object": "encryptedColl",
"name": "replaceOne",
"arguments": {
"filter": {
"encryptedInt": 11
},
"replacement": {
"encryptedInt": 22
}
}
},
{
"object": "encryptedColl",
"name": "find",
"arguments": {
"filter": {
"encryptedInt": 22
}
},
"expectResult": [
{
"_id": 1,
"encryptedInt": 22
}
]
},
{
"object": "unencryptedColl",
"name": "find",
"arguments": {
"filter": {}
},
"expectResult": [
{
"_id": 1,
"encryptedInt": {
"$$type": "binData"
},
"__safeContent__": [
{
"$binary": {
"base64": "rhS16TJojgDDBtbluxBokvcotP1mQTGeYpNt8xd3MJQ=",
"subType": "00"
}
}
]
}
]
}
]
}
]
}

View File

@ -35,6 +35,7 @@ from test import (
client_knobs,
unittest,
)
from test.helpers_shared import ALL_KMS_PROVIDERS, DEFAULT_KMS_TLS
from test.unified_format_shared import (
KMS_TLS_OPTS,
PLACEHOLDER_MAP,
@ -60,6 +61,8 @@ from test.utils_spec_runner import SpecRunnerThread
from test.version import Version
from typing import Any, Dict, List, Mapping, Optional
import pytest
import pymongo
from bson import SON, json_util
from bson.codec_options import DEFAULT_CODEC_OPTIONS
@ -68,7 +71,7 @@ from gridfs import GridFSBucket, GridOut, NoFile
from gridfs.errors import CorruptGridFile
from pymongo import ASCENDING, CursorType, MongoClient, _csot
from pymongo.driver_info import DriverInfo
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT
from pymongo.encryption_options import _HAVE_PYMONGOCRYPT, AutoEncryptionOpts
from pymongo.errors import (
AutoReconnect,
BulkWriteError,
@ -258,6 +261,23 @@ class EntityMapUtil:
kwargs: dict = {}
observe_events = spec.get("observeEvents", [])
if "autoEncryptOpts" in spec:
auto_encrypt_opts = spec["autoEncryptOpts"].copy()
auto_encrypt_kwargs: dict = dict(kms_tls_options=DEFAULT_KMS_TLS)
kms_providers = ALL_KMS_PROVIDERS.copy()
key_vault_namespace = auto_encrypt_opts.pop("keyVaultNamespace")
for provider_name, provider_value in auto_encrypt_opts.pop("kmsProviders").items():
kms_providers[provider_name].update(provider_value)
extra_opts = auto_encrypt_opts.pop("extraOptions", {})
for key, value in extra_opts.items():
auto_encrypt_kwargs[camel_to_snake(key)] = value
for key, value in auto_encrypt_opts.items():
auto_encrypt_kwargs[camel_to_snake(key)] = value
auto_encryption_opts = AutoEncryptionOpts(
kms_providers, key_vault_namespace, **auto_encrypt_kwargs
)
kwargs["auto_encryption_opts"] = auto_encryption_opts
# The unified tests use topologyOpeningEvent, we use topologyOpenedEvent
for i in range(len(observe_events)):
if "topologyOpeningEvent" == observe_events[i]:
@ -429,7 +449,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
a class attribute ``TEST_SPEC``.
"""
SCHEMA_VERSION = Version.from_string("1.22")
SCHEMA_VERSION = Version.from_string("1.23")
RUN_ON_LOAD_BALANCER = True
TEST_SPEC: Any
TEST_PATH = "" # This gets filled in by generate_test_classes
@ -461,6 +481,13 @@ class UnifiedSpecTestMixinV1(IntegrationTest):
wc = WriteConcern(w="majority")
else:
wc = WriteConcern(w=1)
# Remove any encryption collections associated with the collection.
collections = db.list_collection_names()
for collection in collections:
if collection in [f"enxcol_.{coll_name}.esc", f"enxcol_.{coll_name}.ecoc"]:
db.drop_collection(collection)
if documents:
if opts:
db.create_collection(coll_name, **opts)
@ -1501,7 +1528,14 @@ def generate_test_classes(
TEST_SPEC = test_spec
EXPECTED_FAILURES = expected_failures
return SpecTestBase
base = SpecTestBase
# Add "encryption" marker if the "csfle" runOnRequirement is set.
for req in test_spec.get("runOnRequirements", []):
if req.get("csfle", False):
base = pytest.mark.encryption(base)
return base
for dirpath, _, filenames in os.walk(test_path):
dirname = os.path.split(dirpath)[-1]

View File

@ -25,9 +25,10 @@ import os
import time
import types
from collections import abc
from test.helpers import (
from test.helpers_shared import (
AWS_CREDS,
AWS_CREDS_2,
AWS_TEMP_CREDS,
AZURE_CREDS,
CA_PEM,
CLIENT_PEM,
@ -118,10 +119,22 @@ for provider_name, provider_data in [
("kmip", KMIP_CREDS),
("kmip:name1", KMIP_CREDS),
]:
# Use the temp aws creds for autoEncryptOpts.
if provider_name == "aws":
for key, value in AWS_TEMP_CREDS.items():
placeholder = f"/autoEncryptOpts/kmsProviders/{provider_name}/{key}"
PLACEHOLDER_MAP[placeholder] = value
for key, value in provider_data.items():
placeholder = f"/clientEncryptionOpts/kmsProviders/{provider_name}/{key}"
PLACEHOLDER_MAP[placeholder] = value
if provider_name == "aws":
continue
placeholder = f"/autoEncryptOpts/kmsProviders/{provider_name}/{key}"
PLACEHOLDER_MAP[placeholder] = value
OIDC_ENV = os.environ.get("OIDC_ENV", "test")
if OIDC_ENV == "test":
PLACEHOLDER_MAP["/uriOptions/authMechanismProperties"] = {"ENVIRONMENT": "test"}