diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py new file mode 100644 index 000000000..b96a1750c --- /dev/null +++ b/pymongo/change_stream.py @@ -0,0 +1,22 @@ +# Copyright 2024-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Re-import of synchronous ChangeStream API for compatibility.""" +from __future__ import annotations + +from pymongo.synchronous.change_stream import * # noqa: F403 +from pymongo.synchronous.change_stream import __doc__ as original_doc + +__doc__ = original_doc +__all__ = ["ChangeStream", "ClusterChangeStream", "CollectionChangeStream", "DatabaseChangeStream"] # noqa: F405 diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index dc2f6bf2c..a971ad08c 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -22,13 +22,7 @@ from bson import CodecOptions, _bson_to_dict from bson.raw_bson import RawBSONDocument from bson.timestamp import Timestamp from pymongo import _csot, common -from pymongo.aggregation import ( - _AggregationCommand, - _CollectionAggregationCommand, - _DatabaseAggregationCommand, -) from pymongo.collation import validate_collation_or_none -from pymongo.command_cursor import CommandCursor from pymongo.errors import ( ConnectionFailure, CursorNotFound, @@ -37,8 +31,16 @@ from pymongo.errors import ( PyMongoError, ) from pymongo.operations import _Op +from pymongo.synchronous.aggregation import ( + _AggregationCommand, + _CollectionAggregationCommand, + _DatabaseAggregationCommand, +) +from pymongo.synchronous.command_cursor import CommandCursor from pymongo.typings import _CollationIn, _DocumentType, _Pipeline +_IS_SYNC = True + # The change streams spec considers the following server errors from the # getMore command non-resumable. All other getMore errors are resumable. _RESUMABLE_GETMORE_ERRORS = frozenset( @@ -65,11 +67,11 @@ _RESUMABLE_GETMORE_ERRORS = frozenset( if TYPE_CHECKING: - from pymongo.client_session import ClientSession - from pymongo.collection import Collection - from pymongo.database import Database - from pymongo.mongo_client import MongoClient - from pymongo.pool import Connection + from pymongo.synchronous.client_session import ClientSession + from pymongo.synchronous.collection import Collection + from pymongo.synchronous.database import Database + from pymongo.synchronous.mongo_client import MongoClient + from pymongo.synchronous.pool import Connection def _resumable(exc: PyMongoError) -> bool: @@ -100,7 +102,9 @@ class ChangeStream(Generic[_DocumentType]): def __init__( self, target: Union[ - MongoClient[_DocumentType], Database[_DocumentType], Collection[_DocumentType] + MongoClient[_DocumentType], + Database[_DocumentType], + Collection[_DocumentType], ], pipeline: Optional[_Pipeline], full_document: Optional[str], @@ -149,6 +153,8 @@ class ChangeStream(Generic[_DocumentType]): self._closed = False self._timeout = self._target._timeout self._show_expanded_events = show_expanded_events + + def _initialize_cursor(self) -> None: # Initialize cursor. self._cursor = self._create_cursor() @@ -180,7 +186,7 @@ class ChangeStream(Generic[_DocumentType]): else: options["resumeAfter"] = resume_token - if self._start_at_operation_time is not None: + elif self._start_at_operation_time is not None: options["startAtOperationTime"] = self._start_at_operation_time if self._show_expanded_events: