PYTHON-1792 More reliable tests for ChangeStream.try_next

This commit is contained in:
Shane Harvey 2019-03-26 13:39:57 -07:00
parent cd787dbb2c
commit 5ebd2938bd

View File

@ -36,13 +36,13 @@ from bson.binary import (ALL_UUID_REPRESENTATIONS,
from bson.py3compat import iteritems
from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument
from pymongo import monitoring
from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS
from pymongo.command_cursor import CommandCursor
from pymongo.errors import (InvalidOperation, OperationFailure,
ServerSelectionTimeoutError)
from pymongo.message import _CursorAddress
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from test import client_context, unittest, IntegrationTest
from test.utils import (
@ -70,10 +70,13 @@ class ChangeStreamTryNextMixin(object):
client._close_cursor_now(cursor.cursor_id, address)
def test_try_next(self):
coll = self.watched_collection()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(
write_concern=WriteConcern("majority"))
coll.drop()
coll.insert_one({})
self.addCleanup(coll.drop)
with self.change_stream(max_await_time_ms=100) as stream:
with self.change_stream(max_await_time_ms=250) as stream:
self.assertIsNone(stream.try_next())
self.assertIsNone(stream._resume_token)
coll.insert_one({})
@ -88,14 +91,16 @@ class ChangeStreamTryNextMixin(object):
# Connect to the cluster.
client.admin.command('ping')
listener.results.clear()
coll = self.watched_collection()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(
write_concern=WriteConcern("majority"))
coll.drop()
# Create the watched collection before starting the change stream to
# skip any "create" events.
coll.insert_one({'_id': 1})
self.addCleanup(coll.drop)
with self.change_stream_with_client(
client, max_await_time_ms=100) as stream:
client, max_await_time_ms=250) as stream:
self.assertEqual(listener.started_command_names(), ["aggregate"])
listener.results.clear()