PYTHON-684 - Ignore wnote/jnote from legacy servers.

Stop unnecessarily raising OperationFailure in the Bulk API
when a pre-2.6 server returns a result with a wnote or jnote
field.
This commit is contained in:
Bernie Hackett 2014-04-29 11:41:35 -07:00
parent 7d55d77072
commit f61b0e4f59
2 changed files with 104 additions and 121 deletions

View File

@ -82,16 +82,6 @@ def _make_error(index, code, errmsg, operation):
def _merge_legacy(run, full_result, result, index):
"""Merge a result from a legacy opcode into the full results.
"""
# MongoDB 2.6 returns {'ok': 0, 'code': 2, ...} if the j write
# concern option is used with --nojournal or w > 1 is used with
# a standalone mongod instance. Raise immediately here for
# consistency when talking to older servers. Since these are
# configuration errors related to write concern the entire batch
# will fail.
note = result.get("jnote", result.get("wnote"))
if note:
raise OperationFailure(note, _BAD_VALUE, result)
affected = result.get('n', 0)
errmsg = result.get("errmsg", result.get("err", ""))

View File

@ -903,30 +903,58 @@ class TestBulkWriteConcern(BulkTestBase):
OperationFailure,
batch.execute, {'fsync': True, 'j': True})
def test_j_without_journal(self):
client = self.coll.database.connection
if not server_started_with_nojournal(client):
raise SkipTest("Need mongod started with --nojournal")
# Using j=True without journaling is a hard failure.
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({})
self.assertRaises(OperationFailure, batch.execute, {'j': True})
def test_write_concern_failure_ordered(self):
if not self.is_repl:
raise SkipTest("Need a replica set to test.")
# Ensure we don't raise on wnote.
batch = self.coll.initialize_ordered_bulk_op()
batch.find({"something": "that does not exist"}).remove()
self.assertTrue(batch.execute({"w": self.w}))
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({'a': 1})
batch.insert({'a': 2})
# Using w > 1 with no replication is a hard failure.
if not self.is_repl:
self.assertRaises(OperationFailure,
batch.execute, {'w': 5, 'wtimeout': 1})
# Replication wtimeout is a 'soft' error.
# It shouldn't stop batch processing.
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 2,
'nRemoved': 0,
'upserted': [],
'writeErrors': []},
result)
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 0)
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
self.coll.remove()
self.coll.ensure_index('a', unique=True)
# Fail due to write concern support as well
# as duplicate key error on ordered batch.
try:
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 3}).upsert().replace_one({'b': 1})
batch.insert({'a': 1})
batch.insert({'a': 2})
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
@ -938,74 +966,66 @@ class TestBulkWriteConcern(BulkTestBase):
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 0,
'nInserted': 2,
'nUpserted': 1,
'nInserted': 1,
'nRemoved': 0,
'upserted': [],
'writeErrors': []},
'upserted': [{'index': 1, '_id': '...'}],
'writeErrors': [
{'index': 2,
'code': 11000,
'errmsg': '...',
'op': {'_id': '...', 'a': 1}}]},
result)
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 0)
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
self.coll.remove()
self.coll.ensure_index('a', unique=True)
# Fail due to write concern support as well
# as duplicate key error on ordered batch.
try:
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 3}).upsert().replace_one({'b': 1})
batch.insert({'a': 1})
batch.insert({'a': 2})
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 1,
'nInserted': 1,
'nRemoved': 0,
'upserted': [{'index': 1, '_id': '...'}],
'writeErrors': [
{'index': 2,
'code': 11000,
'errmsg': '...',
'op': {'_id': '...', 'a': 1}}]},
result)
self.assertEqual(2, len(result['writeConcernErrors']))
failed = result['writeErrors'][0]
self.assertTrue("duplicate" in failed['errmsg'])
finally:
self.coll.drop_index([('a', 1)])
self.assertEqual(2, len(result['writeConcernErrors']))
failed = result['writeErrors'][0]
self.assertTrue("duplicate" in failed['errmsg'])
finally:
self.coll.drop_index([('a', 1)])
def test_write_concern_failure_unordered(self):
if not self.is_repl:
raise SkipTest("Need a replica set to test.")
# Ensure we don't raise on wnote.
batch = self.coll.initialize_ordered_bulk_op()
batch.find({"something": "that does not exist"}).remove()
self.assertTrue(batch.execute({"w": self.w}))
batch = self.coll.initialize_unordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3, 'b': 1}})
batch.insert({'a': 2})
# Using w > 1 with no replication is a hard failure.
if not self.is_repl:
self.assertRaises(OperationFailure,
batch.execute, {'w': 5, 'wtimeout': 1})
# Replication wtimeout is a 'soft' error.
# It shouldn't stop batch processing.
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
self.assertEqual(2, result['nInserted'])
self.assertEqual(1, result['nUpserted'])
self.assertEqual(0, len(result['writeErrors']))
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 1)
self.coll.remove()
self.coll.ensure_index('a', unique=True)
# Fail due to write concern support as well
# as duplicate key error on unordered batch.
try:
batch = self.coll.initialize_unordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3,
'b': 1}})
batch.insert({'a': 1})
batch.insert({'a': 2})
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
@ -1016,54 +1036,27 @@ class TestBulkWriteConcern(BulkTestBase):
self.assertEqual(2, result['nInserted'])
self.assertEqual(1, result['nUpserted'])
self.assertEqual(0, len(result['writeErrors']))
self.assertEqual(1, len(result['writeErrors']))
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 1)
self.coll.remove()
self.coll.ensure_index('a', unique=True)
failed = result['writeErrors'][0]
self.assertEqual(2, failed['index'])
self.assertEqual(11000, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
self.assertEqual(1, failed['op']['a'])
# Fail due to write concern support as well
# as duplicate key error on unordered batch.
try:
batch = self.coll.initialize_unordered_bulk_op()
batch.insert({'a': 1})
batch.find({'a': 3}).upsert().update_one({'$set': {'a': 3,
'b': 1}})
batch.insert({'a': 1})
batch.insert({'a': 2})
try:
batch.execute({'w': self.w + 1, 'wtimeout': 1})
except BulkWriteError, exc:
result = exc.details
self.assertEqual(exc.code, 65)
else:
self.fail("Error not raised")
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
self.assertEqual(2, result['nInserted'])
self.assertEqual(1, result['nUpserted'])
self.assertEqual(1, len(result['writeErrors']))
# When talking to legacy servers there will be a
# write concern error for each operation.
self.assertTrue(len(result['writeConcernErrors']) > 1)
failed = result['writeErrors'][0]
self.assertEqual(2, failed['index'])
self.assertEqual(11000, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
self.assertEqual(1, failed['op']['a'])
failed = result['writeConcernErrors'][0]
self.assertEqual(64, failed['code'])
self.assertTrue(isinstance(failed['errmsg'], basestring))
upserts = result['upserted']
self.assertEqual(1, len(upserts))
self.assertEqual(1, upserts[0]['index'])
self.assertTrue(upserts[0].get('_id'))
finally:
self.coll.drop_index([('a', 1)])
upserts = result['upserted']
self.assertEqual(1, len(upserts))
self.assertEqual(1, upserts[0]['index'])
self.assertTrue(upserts[0].get('_id'))
finally:
self.coll.drop_index([('a', 1)])
class TestBulkNoResults(BulkTestBase):