Merge branch 'master' of github.com:mongodb/mongo-python-driver

This commit is contained in:
Steven Silvester 2024-06-24 11:33:50 -05:00
commit 5e3ebf1c90
No known key found for this signature in database
GPG Key ID: B1BF5EC3A8B32F91
20 changed files with 478 additions and 242 deletions

View File

@ -962,7 +962,7 @@ task_groups:
- ${DRIVERS_TOOLS}/.evergreen/csfle/azurekms/delete-vm.sh
- func: "upload test results"
setup_group_can_fail_task: true
teardown_group_can_fail_task: true
teardown_task_can_fail_task: true
setup_group_timeout_secs: 1800
tasks:
- testazurekms-task
@ -2220,9 +2220,9 @@ axes:
display_name: "RHEL 8.x"
run_on: rhel87-small
batchtime: 10080 # 7 days
- id: rhel80-fips
display_name: "RHEL 8.0 FIPS"
run_on: rhel80-fips
- id: rhel92-fips
display_name: "RHEL 9.2 FIPS"
run_on: rhel92-fips
batchtime: 10080 # 7 days
- id: ubuntu-22.04
display_name: "Ubuntu 22.04"
@ -2596,7 +2596,7 @@ buildvariants:
- matrix_name: "tests-fips"
matrix_spec:
platform:
- rhel80-fips
- rhel92-fips
auth: "auth"
ssl: "ssl"
display_name: "${platform} ${auth} ${ssl}"

View File

@ -5,6 +5,11 @@ on:
branches: [ "master", "v*"]
tags: ['*']
pull_request:
workflow_call:
inputs:
ref:
required: true
type: string
schedule:
- cron: '17 10 * * 2'
@ -35,6 +40,8 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ inputs.ref }}
- uses: actions/setup-python@v3
# Initializes the CodeQL tools for scanning.

140
.github/workflows/dist.yml vendored Normal file
View File

@ -0,0 +1,140 @@
name: Python Dist
on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+.post[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+[a-b][0-9]+"
- "[0-9]+.[0-9]+.[0-9]+rc[0-9]+"
workflow_dispatch:
pull_request:
workflow_call:
concurrency:
group: dist-${{ github.ref }}
cancel-in-progress: true
defaults:
run:
shell: bash -eux {0}
jobs:
build_wheels:
name: Build wheels for ${{ matrix.buildplat[1] }}
runs-on: ${{ matrix.buildplat[0] }}
strategy:
# Ensure that a wheel builder finishes even if another fails
fail-fast: false
matrix:
# Github Actions doesn't support pairing matrix values together, let's improvise
# https://github.com/github/feedback/discussions/7835#discussioncomment-1769026
buildplat:
- [ubuntu-20.04, "manylinux_x86_64", "cp3*-manylinux_x86_64"]
- [ubuntu-20.04, "manylinux_aarch64", "cp3*-manylinux_aarch64"]
- [ubuntu-20.04, "manylinux_ppc64le", "cp3*-manylinux_ppc64le"]
- [ubuntu-20.04, "manylinux_s390x", "cp3*-manylinux_s390x"]
- [ubuntu-20.04, "manylinux_i686", "cp3*-manylinux_i686"]
- [windows-2019, "win_amd6", "cp3*-win_amd64"]
- [windows-2019, "win32", "cp3*-win32"]
- [macos-14, "macos", "cp*-macosx_*"]
steps:
- name: Checkout pymongo
uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
cache: 'pip'
python-version: 3.8
cache-dependency-path: 'pyproject.toml'
allow-prereleases: true
- name: Set up QEMU
if: runner.os == 'Linux'
uses: docker/setup-qemu-action@v3
with:
platforms: all
- name: Install cibuildwheel
# Note: the default manylinux is manylinux2014
run: |
python -m pip install -U pip
python -m pip install "cibuildwheel>=2.17,<3"
- name: Build wheels
env:
CIBW_BUILD: ${{ matrix.buildplat[2] }}
run: python -m cibuildwheel --output-dir wheelhouse
- name: Build manylinux1 wheels
if: ${{ matrix.buildplat[1] == 'manylinux_x86_64' || matrix.buildplat[1] == 'manylinux_i686' }}
env:
CIBW_MANYLINUX_X86_64_IMAGE: manylinux1
CIBW_MANYLINUX_I686_IMAGE: manylinux1
CIBW_BUILD: "cp38-${{ matrix.buildplat[1] }} cp39-${{ matrix.buildplat[1] }}"
run: python -m cibuildwheel --output-dir wheelhouse
- name: Assert all versions in wheelhouse
if: ${{ ! startsWith(matrix.buildplat[1], 'macos') }}
run: |
ls wheelhouse/*cp38*.whl
ls wheelhouse/*cp39*.whl
ls wheelhouse/*cp310*.whl
ls wheelhouse/*cp311*.whl
ls wheelhouse/*cp312*.whl
- uses: actions/upload-artifact@v4
with:
name: wheel-${{ matrix.buildplat[1] }}
path: ./wheelhouse/*.whl
if-no-files-found: error
make_sdist:
name: Make SDist
runs-on: macos-13
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
# Build sdist on lowest supported Python
python-version: '3.8'
- name: Build SDist
run: |
set -ex
python -m pip install -U pip build
python -m build --sdist .
- name: Test SDist
run: |
python -m pip install dist/*.gz
cd ..
python -c "from pymongo import has_c; assert has_c()"
- uses: actions/upload-artifact@v4
with:
name: "sdist"
path: ./dist/*.tar.gz
collect_dist:
runs-on: ubuntu-latest
needs: [build_wheels, make_sdist]
name: Download Wheels
steps:
- name: Download all workflow run artifacts
uses: actions/download-artifact@v4
- name: Flatten directory
working-directory: .
run: |
find . -mindepth 2 -type f -exec mv {} . \;
find . -type d -empty -delete
- uses: actions/upload-artifact@v4
with:
name: all-dist-${{ github.run_id }}
path: "./*"

View File

@ -1,156 +1,86 @@
name: Python Wheels
name: Release
on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+.post[0-9]+"
- "[0-9]+.[0-9]+.[0-9]+[a-b][0-9]+"
- "[0-9]+.[0-9]+.[0-9]+rc[0-9]+"
workflow_dispatch:
pull_request:
inputs:
version:
description: "The new version to set"
required: true
following_version:
description: "The post (dev) version to set"
required: true
dry_run:
description: "Dry Run?"
default: false
type: boolean
concurrency:
group: wheels-${{ github.ref }}
cancel-in-progress: true
env:
# Changes per repo
PRODUCT_NAME: PyMongo
# Changes per branch
SILK_ASSET_GROUP: mongodb-python-driver
defaults:
run:
shell: bash -eux {0}
jobs:
build_wheels:
name: Build wheels for ${{ matrix.buildplat[1] }}
runs-on: ${{ matrix.buildplat[0] }}
strategy:
# Ensure that a wheel builder finishes even if another fails
fail-fast: false
matrix:
# Github Actions doesn't support pairing matrix values together, let's improvise
# https://github.com/github/feedback/discussions/7835#discussioncomment-1769026
buildplat:
- [ubuntu-20.04, "manylinux_x86_64", "cp3*-manylinux_x86_64"]
- [ubuntu-20.04, "manylinux_aarch64", "cp3*-manylinux_aarch64"]
- [ubuntu-20.04, "manylinux_ppc64le", "cp3*-manylinux_ppc64le"]
- [ubuntu-20.04, "manylinux_s390x", "cp3*-manylinux_s390x"]
- [ubuntu-20.04, "manylinux_i686", "cp3*-manylinux_i686"]
- [windows-2019, "win_amd6", "cp3*-win_amd64"]
- [windows-2019, "win32", "cp3*-win32"]
- [macos-14, "macos", "cp*-macosx_*"]
steps:
- name: Checkout pymongo
uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
cache: 'pip'
python-version: 3.8
cache-dependency-path: 'pyproject.toml'
allow-prereleases: true
- name: Set up QEMU
if: runner.os == 'Linux'
uses: docker/setup-qemu-action@v3
with:
platforms: all
- name: Install cibuildwheel
# Note: the default manylinux is manylinux2014
run: |
python -m pip install -U pip
python -m pip install "cibuildwheel>=2.17,<3"
- name: Build wheels
env:
CIBW_BUILD: ${{ matrix.buildplat[2] }}
run: python -m cibuildwheel --output-dir wheelhouse
- name: Build manylinux1 wheels
if: ${{ matrix.buildplat[1] == 'manylinux_x86_64' || matrix.buildplat[1] == 'manylinux_i686' }}
env:
CIBW_MANYLINUX_X86_64_IMAGE: manylinux1
CIBW_MANYLINUX_I686_IMAGE: manylinux1
CIBW_BUILD: "cp38-${{ matrix.buildplat[1] }} cp39-${{ matrix.buildplat[1] }}"
run: python -m cibuildwheel --output-dir wheelhouse
- name: Assert all versions in wheelhouse
if: ${{ ! startsWith(matrix.buildplat[1], 'macos') }}
run: |
ls wheelhouse/*cp38*.whl
ls wheelhouse/*cp39*.whl
ls wheelhouse/*cp310*.whl
ls wheelhouse/*cp311*.whl
ls wheelhouse/*cp312*.whl
- uses: actions/upload-artifact@v4
with:
name: wheel-${{ matrix.buildplat[1] }}
path: ./wheelhouse/*.whl
if-no-files-found: error
make_sdist:
name: Make SDist
runs-on: macos-13
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
# Build sdist on lowest supported Python
python-version: '3.8'
- name: Build SDist
run: |
set -ex
python -m pip install -U pip build
python -m build --sdist .
- name: Test SDist
run: |
python -m pip install dist/*.gz
cd ..
python -c "from pymongo import has_c; assert has_c()"
- uses: actions/upload-artifact@v4
with:
name: "sdist"
path: ./dist/*.tar.gz
collect_dist:
pre-publish:
environment: release
runs-on: ubuntu-latest
needs: [build_wheels, make_sdist]
name: Download Wheels
permissions:
id-token: write
contents: write
steps:
- name: Download all workflow run artifacts
uses: actions/download-artifact@v4
- name: Flatten directory
working-directory: .
run: |
find . -mindepth 2 -type f -exec mv {} . \;
find . -type d -empty -delete
- uses: actions/upload-artifact@v4
- uses: mongodb-labs/drivers-github-tools/secure-checkout@v2
with:
name: all-dist-${{ github.run_id }}
path: "./*"
app_id: ${{ vars.APP_ID }}
private_key: ${{ secrets.APP_PRIVATE_KEY }}
- uses: mongodb-labs/drivers-github-tools/setup@v2
with:
aws_role_arn: ${{ secrets.AWS_ROLE_ARN }}
aws_region_name: ${{ vars.AWS_REGION_NAME }}
aws_secret_id: ${{ secrets.AWS_SECRET_ID }}
artifactory_username: ${{ vars.ARTIFACTORY_USERNAME }}
- uses: mongodb-labs/drivers-github-tools/python/pre-publish@v2
with:
version: ${{ inputs.version }}
dry_run: ${{ inputs.dry_run }}
build-dist:
needs: [pre-publish]
uses: ./.github/workflows/dist.yml
static-scan:
needs: [pre-publish]
uses: ./.github/workflows/codeql.yml
with:
ref: ${{ inputs.version }}
publish:
# https://packaging.python.org/en/latest/guides/publishing-package-distribution-releases-using-github-actions-ci-cd-workflows/#publishing-the-distribution-to-pypi
needs: [collect_dist]
if: startsWith(github.ref, 'refs/tags/')
needs: [build-dist, static-scan]
runs-on: ubuntu-latest
environment: release
permissions:
id-token: write
contents: write
security-events: write
steps:
- name: Download all the dists
uses: actions/download-artifact@v4
with:
name: all-dist-${{ github.run_id }}
path: dist/
- name: Publish distribution 📦 to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
- uses: mongodb-labs/drivers-github-tools/secure-checkout@v2
with:
app_id: ${{ vars.APP_ID }}
private_key: ${{ secrets.APP_PRIVATE_KEY }}
- uses: mongodb-labs/drivers-github-tools/setup@v2
with:
aws_role_arn: ${{ secrets.AWS_ROLE_ARN }}
aws_region_name: ${{ vars.AWS_REGION_NAME }}
aws_secret_id: ${{ secrets.AWS_SECRET_ID }}
artifactory_username: ${{ vars.ARTIFACTORY_USERNAME }}
- uses: mongodb-labs/drivers-github-tools/python/publish@v2
with:
version: ${{ inputs.version }}
following_version: ${{ inputs.following_version }}
product_name: ${{ env.PRODUCT_NAME }}
silk_asset_group: ${{ env.SILK_ASSET_GROUP }}
token: ${{ github.token }}
dry_run: ${{ inputs.dry_run }}

View File

@ -531,6 +531,12 @@ class ClientSession:
self._client._return_server_session(self._server_session)
self._server_session = None
def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None
def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")

View File

@ -81,8 +81,8 @@ class AsyncCommandCursor(Generic[_DocumentType]):
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()
if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
@ -95,8 +95,7 @@ class AsyncCommandCursor(Generic[_DocumentType]):
raise TypeError("max_await_time_ms must be an integer or None")
def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()
def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
@ -198,8 +197,7 @@ class AsyncCommandCursor(Generic[_DocumentType]):
return self._session
return None
async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
@ -210,8 +208,22 @@ class AsyncCommandCursor(Generic[_DocumentType]):
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address
def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None
async def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
@ -222,14 +234,14 @@ class AsyncCommandCursor(Generic[_DocumentType]):
self._session = None
self._sock_mgr = None
async def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
await self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None
async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()
async def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
@ -243,7 +255,7 @@ class AsyncCommandCursor(Generic[_DocumentType]):
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
await self.close()
@ -305,7 +317,7 @@ class AsyncCommandCursor(Generic[_DocumentType]):
)
)
else: # Cursor id is zero nothing else to return
await self._die(True)
await self._die_lock()
return len(self._data)

View File

@ -259,8 +259,7 @@ class AsyncCursor(Generic[_DocumentType]):
return self._retrieved
def __del__(self) -> None:
if _IS_SYNC:
self._die() # type: ignore[unused-coroutine]
self._die_no_lock()
def clone(self) -> AsyncCursor[_DocumentType]:
"""Get a clone of this cursor.
@ -996,14 +995,7 @@ class AsyncCursor(Generic[_DocumentType]):
y[key] = value
return y
async def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
self._killed = True
if self._id and not already_killed:
cursor_id = self._id
@ -1013,8 +1005,34 @@ class AsyncCursor(Generic[_DocumentType]):
# Skip killCursors.
cursor_id = 0
address = None
await self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address
def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
cursor_id, address = self._prepare_to_die(already_killed)
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None
async def _die_lock(self) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
cursor_id, address = self._prepare_to_die(already_killed)
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
@ -1027,7 +1045,7 @@ class AsyncCursor(Generic[_DocumentType]):
async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die(True)
await self._die_lock()
async def distinct(self, key: str) -> list:
"""Get a list of distinct values for `key` among all documents
@ -1080,7 +1098,7 @@ class AsyncCursor(Generic[_DocumentType]):
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
await self._die(False)
self._die_no_lock()
else:
await self.close()
# If this is a tailable cursor the error is likely

View File

@ -1857,48 +1857,63 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
async with self._tmp_session(session) as s:
return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
async def _cleanup_cursor(
def _cleanup_cursor_no_lock(
self,
locks_allowed: bool,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() or __del__.
"""Cleanup a cursor from __del__ without locking.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection attached at the time the cursor
was garbage collected.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
"""
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if session and not explicit_session:
session._end_implicit_session()
async def _cleanup_cursor_lock(
self,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() using a lock.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection or implicit session attached at the time the cursor
was closed or garbage collected.
:param locks_allowed: True if we are allowed to acquire locks.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
:param session: The cursor's session.
:param explicit_session: True if the session was passed explicitly.
"""
if locks_allowed:
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(
cursor_id, address, session=session, conn_mgr=conn_mgr
)
if conn_mgr:
await conn_mgr.close()
else:
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
await conn_mgr.close()
if session and not explicit_session:
await session._end_session(lock=locks_allowed)
session._end_implicit_session()
async def _close_cursor_now(
self,
@ -1978,7 +1993,7 @@ class AsyncMongoClient(common.BaseObject, Generic[_DocumentType]):
for address, cursor_id, conn_mgr in pinned_cursors:
try:
await self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False)
await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it

View File

@ -531,6 +531,12 @@ class ClientSession:
self._client._return_server_session(self._server_session)
self._server_session = None
def _end_implicit_session(self) -> None:
# Implicit sessions can't be part of transactions or pinned connections
if self._server_session is not None:
self._client._return_server_session(self._server_session)
self._server_session = None
def _check_ended(self) -> None:
if self._server_session is None:
raise InvalidOperation("Cannot use ended session")

View File

@ -81,8 +81,8 @@ class CommandCursor(Generic[_DocumentType]):
self._explicit_session = explicit_session
self._killed = self._id == 0
self._comment = comment
if _IS_SYNC and self._killed:
self._end_session(True) # type: ignore[unused-coroutine]
if self._killed:
self._end_session()
if "ns" in cursor_info: # noqa: SIM401
self._ns = cursor_info["ns"]
@ -95,8 +95,7 @@ class CommandCursor(Generic[_DocumentType]):
raise TypeError("max_await_time_ms must be an integer or None")
def __del__(self) -> None:
if _IS_SYNC:
self._die(False) # type: ignore[unused-coroutine]
self._die_no_lock()
def batch_size(self, batch_size: int) -> CommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
@ -198,8 +197,7 @@ class CommandCursor(Generic[_DocumentType]):
return self._session
return None
def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
@ -210,8 +208,22 @@ class CommandCursor(Generic[_DocumentType]):
# Skip killCursors.
cursor_id = 0
address = None
self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address
def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None
def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
@ -222,14 +234,14 @@ class CommandCursor(Generic[_DocumentType]):
self._session = None
self._sock_mgr = None
def _end_session(self, synchronous: bool) -> None:
def _end_session(self) -> None:
if self._session and not self._explicit_session:
self._session._end_session(lock=synchronous)
self._session._end_implicit_session()
self._session = None
def close(self) -> None:
"""Explicitly close / kill this cursor."""
self._die(True)
self._die_lock()
def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
@ -243,7 +255,7 @@ class CommandCursor(Generic[_DocumentType]):
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
self._die(False)
self._die_no_lock()
else:
# Return the session and pinned connection, if necessary.
self.close()
@ -305,7 +317,7 @@ class CommandCursor(Generic[_DocumentType]):
)
)
else: # Cursor id is zero nothing else to return
self._die(True)
self._die_lock()
return len(self._data)

View File

@ -259,8 +259,7 @@ class Cursor(Generic[_DocumentType]):
return self._retrieved
def __del__(self) -> None:
if _IS_SYNC:
self._die() # type: ignore[unused-coroutine]
self._die_no_lock()
def clone(self) -> Cursor[_DocumentType]:
"""Get a clone of this cursor.
@ -994,14 +993,7 @@ class Cursor(Generic[_DocumentType]):
y[key] = value
return y
def _die(self, synchronous: bool = False) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
def _prepare_to_die(self, already_killed: bool) -> tuple[int, Optional[_CursorAddress]]:
self._killed = True
if self._id and not already_killed:
cursor_id = self._id
@ -1011,8 +1003,34 @@ class Cursor(Generic[_DocumentType]):
# Skip killCursors.
cursor_id = 0
address = None
self._collection.database.client._cleanup_cursor(
synchronous,
return cursor_id, address
def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
cursor_id, address = self._prepare_to_die(already_killed)
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session, self._explicit_session
)
if not self._explicit_session:
self._session = None
self._sock_mgr = None
def _die_lock(self) -> None:
"""Closes this cursor."""
try:
already_killed = self._killed
except AttributeError:
# ___init__ did not run to completion (or at all).
return
cursor_id, address = self._prepare_to_die(already_killed)
self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
@ -1025,7 +1043,7 @@ class Cursor(Generic[_DocumentType]):
def close(self) -> None:
"""Explicitly close / kill this cursor."""
self._die(True)
self._die_lock()
def distinct(self, key: str) -> list:
"""Get a list of distinct values for `key` among all documents
@ -1078,7 +1096,7 @@ class Cursor(Generic[_DocumentType]):
# Don't send killCursors because the cursor is already closed.
self._killed = True
if exc.timeout:
self._die(False)
self._die_no_lock()
else:
self.close()
# If this is a tailable cursor the error is likely

View File

@ -1854,46 +1854,63 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
with self._tmp_session(session) as s:
return self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
def _cleanup_cursor(
def _cleanup_cursor_no_lock(
self,
locks_allowed: bool,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() or __del__.
"""Cleanup a cursor from __del__ without locking.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection attached at the time the cursor
was garbage collected.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
"""
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if session and not explicit_session:
session._end_implicit_session()
def _cleanup_cursor_lock(
self,
cursor_id: int,
address: Optional[_CursorAddress],
conn_mgr: _ConnectionManager,
session: Optional[ClientSession],
explicit_session: bool,
) -> None:
"""Cleanup a cursor from cursor.close() using a lock.
This method handles cleanup for Cursors/CommandCursors including any
pinned connection or implicit session attached at the time the cursor
was closed or garbage collected.
:param locks_allowed: True if we are allowed to acquire locks.
:param cursor_id: The cursor id which may be 0.
:param address: The _CursorAddress.
:param conn_mgr: The _ConnectionManager for the pinned connection or None.
:param session: The cursor's session.
:param explicit_session: True if the session was passed explicitly.
"""
if locks_allowed:
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
conn_mgr.close()
else:
# The cursor will be closed later in a different session.
if cursor_id or conn_mgr:
self._close_cursor_soon(cursor_id, address, conn_mgr)
if cursor_id:
if conn_mgr and conn_mgr.more_to_come:
# If this is an exhaust cursor and we haven't completely
# exhausted the result set we *must* close the socket
# to stop the server from sending more data.
assert conn_mgr.conn is not None
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
else:
self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
if conn_mgr:
conn_mgr.close()
if session and not explicit_session:
session._end_session(lock=locks_allowed)
session._end_implicit_session()
def _close_cursor_now(
self,
@ -1973,7 +1990,7 @@ class MongoClient(common.BaseObject, Generic[_DocumentType]):
for address, cursor_id, conn_mgr in pinned_cursors:
try:
self._cleanup_cursor(True, cursor_id, address, conn_mgr, None, False)
self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False)
except Exception as exc:
if isinstance(exc, InvalidOperation) and self._topology._closed:
# Raise the exception when client is closed so that it

View File

@ -50,6 +50,7 @@ Tracker = "https://jira.mongodb.org/projects/PYTHON/issues"
[tool.hatch.version]
path = "pymongo/_version.py"
validate-bump = false
[tool.hatch.build.targets.wheel]
packages = ["bson","gridfs", "pymongo"]

View File

@ -277,6 +277,7 @@ class ClientContext:
self.is_data_lake = False
self.load_balancer = TEST_LOADBALANCER
self.serverless = TEST_SERVERLESS
self._fips_enabled = None
if self.load_balancer or self.serverless:
self.default_client_options["loadBalanced"] = True
if COMPRESSORS:
@ -523,6 +524,17 @@ class ClientContext:
# Raised if self.server_status is None.
return None
@property
def fips_enabled(self):
if self._fips_enabled is not None:
return self._fips_enabled
try:
subprocess.check_call(["fips-mode-setup", "--is-enabled"])
self._fips_enabled = True
except (subprocess.SubprocessError, FileNotFoundError):
self._fips_enabled = False
return self._fips_enabled
def check_auth_type(self, auth_type):
auth_mechs = self.server_parameters.get("authenticationMechanisms", [])
return auth_type in auth_mechs
@ -670,6 +682,12 @@ class ClientContext:
lambda: self.auth_enabled, "Authentication is not enabled on the server", func=func
)
def require_no_fips(self, func):
"""Run a test only if the host does not have FIPS enabled."""
return self._require(
lambda: not self.fips_enabled, "Test cannot run on a FIPS-enabled host", func=func
)
def require_no_auth(self, func):
"""Run a test only if the server is running without auth enabled."""
return self._require(

View File

@ -107,7 +107,9 @@
}
],
"auth": null,
"options": null
"options": {
"authMechanism": "MONGODB-OIDC"
}
}
]
}

View File

@ -344,6 +344,7 @@ class TestSCRAMSHA1(IntegrationTest):
client_context.drop_user("pymongo_test", "user")
super().tearDown()
@client_context.require_no_fips
def test_scram_sha1(self):
host, port = client_context.host, client_context.port
@ -405,6 +406,7 @@ class TestSCRAM(IntegrationTest):
else:
self.assertEqual(started, ["saslStart", "saslContinue", "saslContinue"])
@client_context.require_no_fips
def test_scram(self):
# Step 1: create users
client_context.create_user(

View File

@ -23,6 +23,7 @@ import mmap
import os
import pickle
import re
import struct
import sys
import tempfile
import uuid
@ -489,6 +490,33 @@ class TestBSON(unittest.TestCase):
b"\x00",
)
def test_bad_code(self):
# Assert that decoding invalid Code with scope does not include a field name.
def generate_payload(length: int) -> bytes:
string_size = length - 0x1E
return bytes.fromhex(
struct.pack("<I", length).hex() # payload size
+ "0f" # type "code with scope"
+ "3100" # key (cstring)
+ "0a000000" # c_w_s_size
+ "04000000" # code_size
+ "41004200" # code (cstring)
+ "feffffff" # scope_size
+ "02" # type "string"
+ "3200" # key (cstring)
+ struct.pack("<I", string_size).hex() # string size
+ "00" * string_size # value (cstring)
# next bytes is a field name for type \x00
# type \x00 is invalid so bson throws an exception
)
for i in range(100):
payload = generate_payload(0x54F + i)
with self.assertRaisesRegex(InvalidBSON, "invalid") as ctx:
bson.decode(payload)
self.assertNotIn("fieldname", str(ctx.exception))
def test_unknown_type(self):
# Repr value differs with major python version
part = "type {!r} for fieldname 'foo'".format(b"\x14")

View File

@ -1021,6 +1021,7 @@ class TestClient(IntegrationTest):
MongoClient("http://localhost")
@client_context.require_auth
@client_context.require_no_fips
def test_auth_from_uri(self):
host, port = client_context.host, client_context.port
client_context.create_user("admin", "admin", "pass")
@ -1077,6 +1078,7 @@ class TestClient(IntegrationTest):
rs_or_single_client_noauth(username="ad min", password="foo").server_info()
@client_context.require_auth
@client_context.require_no_fips
def test_lazy_auth_raises_operation_failure(self):
lazy_client = rs_or_single_client_noauth(
f"mongodb://user:wrong@{client_context.host}/pymongo_test", connect=False

View File

@ -400,6 +400,7 @@ class TestCMAP(IntegrationTest):
failed_event = listener.events[3]
self.assertEqual(failed_event.reason, ConnectionCheckOutFailedReason.CONN_ERROR)
@client_context.require_no_fips
def test_5_check_out_fails_auth_error(self):
listener = CMAPListener()
client = single_client_noauth(

View File

@ -432,6 +432,7 @@ class TestDatabase(IntegrationTest):
def test_cursor_command_invalid(self):
self.assertRaises(InvalidOperation, self.db.cursor_command, "usersInfo", "test")
@client_context.require_no_fips
def test_password_digest(self):
self.assertRaises(TypeError, auth._password_digest, 5)
self.assertRaises(TypeError, auth._password_digest, True)