From f61b0e4f591ff09ed9382a6fe99cb0c6a87fa764 Mon Sep 17 00:00:00 2001 From: Bernie Hackett Date: Tue, 29 Apr 2014 11:41:35 -0700 Subject: [PATCH] PYTHON-684 - Ignore wnote/jnote from legacy servers. Stop unnecessarily raising OperationFailure in the Bulk API when a pre-2.6 server returns a result with a wnote or jnote field. --- pymongo/bulk.py | 10 --- test/test_bulk.py | 215 ++++++++++++++++++++++------------------------ 2 files changed, 104 insertions(+), 121 deletions(-) diff --git a/pymongo/bulk.py b/pymongo/bulk.py index 58515f83f..4eda2f36b 100644 --- a/pymongo/bulk.py +++ b/pymongo/bulk.py @@ -82,16 +82,6 @@ def _make_error(index, code, errmsg, operation): def _merge_legacy(run, full_result, result, index): """Merge a result from a legacy opcode into the full results. """ - # MongoDB 2.6 returns {'ok': 0, 'code': 2, ...} if the j write - # concern option is used with --nojournal or w > 1 is used with - # a standalone mongod instance. Raise immediately here for - # consistency when talking to older servers. Since these are - # configuration errors related to write concern the entire batch - # will fail. - note = result.get("jnote", result.get("wnote")) - if note: - raise OperationFailure(note, _BAD_VALUE, result) - affected = result.get('n', 0) errmsg = result.get("errmsg", result.get("err", "")) diff --git a/test/test_bulk.py b/test/test_bulk.py index b6471b9f0..ff02c493c 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -903,30 +903,58 @@ class TestBulkWriteConcern(BulkTestBase): OperationFailure, batch.execute, {'fsync': True, 'j': True}) - def test_j_without_journal(self): - client = self.coll.database.connection - if not server_started_with_nojournal(client): - raise SkipTest("Need mongod started with --nojournal") - - # Using j=True without journaling is a hard failure. - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({}) - self.assertRaises(OperationFailure, batch.execute, {'j': True}) - def test_write_concern_failure_ordered(self): + if not self.is_repl: + raise SkipTest("Need a replica set to test.") + + # Ensure we don't raise on wnote. + batch = self.coll.initialize_ordered_bulk_op() + batch.find({"something": "that does not exist"}).remove() + self.assertTrue(batch.execute({"w": self.w})) batch = self.coll.initialize_ordered_bulk_op() batch.insert({'a': 1}) batch.insert({'a': 2}) - # Using w > 1 with no replication is a hard failure. - if not self.is_repl: - self.assertRaises(OperationFailure, - batch.execute, {'w': 5, 'wtimeout': 1}) - # Replication wtimeout is a 'soft' error. # It shouldn't stop batch processing. + try: + batch.execute({'w': self.w + 1, 'wtimeout': 1}) + except BulkWriteError, exc: + result = exc.details + self.assertEqual(exc.code, 65) else: + self.fail("Error not raised") + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 0, + 'nInserted': 2, + 'nRemoved': 0, + 'upserted': [], + 'writeErrors': []}, + result) + + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 0) + + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], basestring)) + + self.coll.remove() + self.coll.ensure_index('a', unique=True) + + # Fail due to write concern support as well + # as duplicate key error on ordered batch. + try: + batch = self.coll.initialize_ordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 3}).upsert().replace_one({'b': 1}) + batch.insert({'a': 1}) + batch.insert({'a': 2}) try: batch.execute({'w': self.w + 1, 'wtimeout': 1}) except BulkWriteError, exc: @@ -938,74 +966,66 @@ class TestBulkWriteConcern(BulkTestBase): self.assertEqualResponse( {'nMatched': 0, 'nModified': 0, - 'nUpserted': 0, - 'nInserted': 2, + 'nUpserted': 1, + 'nInserted': 1, 'nRemoved': 0, - 'upserted': [], - 'writeErrors': []}, + 'upserted': [{'index': 1, '_id': '...'}], + 'writeErrors': [ + {'index': 2, + 'code': 11000, + 'errmsg': '...', + 'op': {'_id': '...', 'a': 1}}]}, result) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 0) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], basestring)) - - self.coll.remove() - self.coll.ensure_index('a', unique=True) - - # Fail due to write concern support as well - # as duplicate key error on ordered batch. - try: - batch = self.coll.initialize_ordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().replace_one({'b': 1}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - batch.execute({'w': self.w + 1, 'wtimeout': 1}) - except BulkWriteError, exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") - - self.assertEqualResponse( - {'nMatched': 0, - 'nModified': 0, - 'nUpserted': 1, - 'nInserted': 1, - 'nRemoved': 0, - 'upserted': [{'index': 1, '_id': '...'}], - 'writeErrors': [ - {'index': 2, - 'code': 11000, - 'errmsg': '...', - 'op': {'_id': '...', 'a': 1}}]}, - result) - - self.assertEqual(2, len(result['writeConcernErrors'])) - failed = result['writeErrors'][0] - self.assertTrue("duplicate" in failed['errmsg']) - finally: - self.coll.drop_index([('a', 1)]) + self.assertEqual(2, len(result['writeConcernErrors'])) + failed = result['writeErrors'][0] + self.assertTrue("duplicate" in failed['errmsg']) + finally: + self.coll.drop_index([('a', 1)]) def test_write_concern_failure_unordered(self): + if not self.is_repl: + raise SkipTest("Need a replica set to test.") + + # Ensure we don't raise on wnote. + batch = self.coll.initialize_ordered_bulk_op() + batch.find({"something": "that does not exist"}).remove() + self.assertTrue(batch.execute({"w": self.w})) batch = self.coll.initialize_unordered_bulk_op() batch.insert({'a': 1}) batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}}) batch.insert({'a': 2}) - # Using w > 1 with no replication is a hard failure. - if not self.is_repl: - self.assertRaises(OperationFailure, - batch.execute, {'w': 5, 'wtimeout': 1}) # Replication wtimeout is a 'soft' error. # It shouldn't stop batch processing. + try: + batch.execute({'w': self.w + 1, 'wtimeout': 1}) + except BulkWriteError, exc: + result = exc.details + self.assertEqual(exc.code, 65) else: + self.fail("Error not raised") + + self.assertEqual(2, result['nInserted']) + self.assertEqual(1, result['nUpserted']) + self.assertEqual(0, len(result['writeErrors'])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(result['writeConcernErrors']) > 1) + + self.coll.remove() + self.coll.ensure_index('a', unique=True) + + # Fail due to write concern support as well + # as duplicate key error on unordered batch. + try: + batch = self.coll.initialize_unordered_bulk_op() + batch.insert({'a': 1}) + batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, + 'b': 1}}) + batch.insert({'a': 1}) + batch.insert({'a': 2}) try: batch.execute({'w': self.w + 1, 'wtimeout': 1}) except BulkWriteError, exc: @@ -1016,54 +1036,27 @@ class TestBulkWriteConcern(BulkTestBase): self.assertEqual(2, result['nInserted']) self.assertEqual(1, result['nUpserted']) - self.assertEqual(0, len(result['writeErrors'])) + self.assertEqual(1, len(result['writeErrors'])) # When talking to legacy servers there will be a # write concern error for each operation. self.assertTrue(len(result['writeConcernErrors']) > 1) - self.coll.remove() - self.coll.ensure_index('a', unique=True) + failed = result['writeErrors'][0] + self.assertEqual(2, failed['index']) + self.assertEqual(11000, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], basestring)) + self.assertEqual(1, failed['op']['a']) - # Fail due to write concern support as well - # as duplicate key error on unordered batch. - try: - batch = self.coll.initialize_unordered_bulk_op() - batch.insert({'a': 1}) - batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, - 'b': 1}}) - batch.insert({'a': 1}) - batch.insert({'a': 2}) - try: - batch.execute({'w': self.w + 1, 'wtimeout': 1}) - except BulkWriteError, exc: - result = exc.details - self.assertEqual(exc.code, 65) - else: - self.fail("Error not raised") + failed = result['writeConcernErrors'][0] + self.assertEqual(64, failed['code']) + self.assertTrue(isinstance(failed['errmsg'], basestring)) - self.assertEqual(2, result['nInserted']) - self.assertEqual(1, result['nUpserted']) - self.assertEqual(1, len(result['writeErrors'])) - # When talking to legacy servers there will be a - # write concern error for each operation. - self.assertTrue(len(result['writeConcernErrors']) > 1) - - failed = result['writeErrors'][0] - self.assertEqual(2, failed['index']) - self.assertEqual(11000, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], basestring)) - self.assertEqual(1, failed['op']['a']) - - failed = result['writeConcernErrors'][0] - self.assertEqual(64, failed['code']) - self.assertTrue(isinstance(failed['errmsg'], basestring)) - - upserts = result['upserted'] - self.assertEqual(1, len(upserts)) - self.assertEqual(1, upserts[0]['index']) - self.assertTrue(upserts[0].get('_id')) - finally: - self.coll.drop_index([('a', 1)]) + upserts = result['upserted'] + self.assertEqual(1, len(upserts)) + self.assertEqual(1, upserts[0]['index']) + self.assertTrue(upserts[0].get('_id')) + finally: + self.coll.drop_index([('a', 1)]) class TestBulkNoResults(BulkTestBase):