From 5ebd2938bdef4995321fdc44c07caab9ed09ad5d Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 26 Mar 2019 13:39:57 -0700 Subject: [PATCH] PYTHON-1792 More reliable tests for ChangeStream.try_next --- test/test_change_stream.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 58d6518df..6738f6691 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -36,13 +36,13 @@ from bson.binary import (ALL_UUID_REPRESENTATIONS, from bson.py3compat import iteritems from bson.raw_bson import DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument -from pymongo import monitoring from pymongo.change_stream import _NON_RESUMABLE_GETMORE_ERRORS from pymongo.command_cursor import CommandCursor from pymongo.errors import (InvalidOperation, OperationFailure, ServerSelectionTimeoutError) from pymongo.message import _CursorAddress from pymongo.read_concern import ReadConcern +from pymongo.write_concern import WriteConcern from test import client_context, unittest, IntegrationTest from test.utils import ( @@ -70,10 +70,13 @@ class ChangeStreamTryNextMixin(object): client._close_cursor_now(cursor.cursor_id, address) def test_try_next(self): - coll = self.watched_collection() + # ChangeStreams only read majority committed data so use w:majority. + coll = self.watched_collection().with_options( + write_concern=WriteConcern("majority")) + coll.drop() coll.insert_one({}) self.addCleanup(coll.drop) - with self.change_stream(max_await_time_ms=100) as stream: + with self.change_stream(max_await_time_ms=250) as stream: self.assertIsNone(stream.try_next()) self.assertIsNone(stream._resume_token) coll.insert_one({}) @@ -88,14 +91,16 @@ class ChangeStreamTryNextMixin(object): # Connect to the cluster. client.admin.command('ping') listener.results.clear() - coll = self.watched_collection() + # ChangeStreams only read majority committed data so use w:majority. + coll = self.watched_collection().with_options( + write_concern=WriteConcern("majority")) coll.drop() # Create the watched collection before starting the change stream to # skip any "create" events. coll.insert_one({'_id': 1}) self.addCleanup(coll.drop) with self.change_stream_with_client( - client, max_await_time_ms=100) as stream: + client, max_await_time_ms=250) as stream: self.assertEqual(listener.started_command_names(), ["aggregate"]) listener.results.clear()