1352 lines
64 KiB
Python
1352 lines
64 KiB
Python
# Copyright 2015-present MongoDB, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import copy
|
|
import datetime
|
|
import sys
|
|
import time
|
|
import warnings
|
|
|
|
sys.path[0:0] = [""]
|
|
|
|
from bson.objectid import ObjectId
|
|
from bson.py3compat import text_type
|
|
from bson.son import SON
|
|
from pymongo import CursorType, monitoring, InsertOne, UpdateOne, DeleteOne
|
|
from pymongo.command_cursor import CommandCursor
|
|
from pymongo.errors import NotMasterError, OperationFailure
|
|
from pymongo.read_preferences import ReadPreference
|
|
from pymongo.write_concern import WriteConcern
|
|
from test import (client_context,
|
|
client_knobs,
|
|
PyMongoTestCase,
|
|
sanitize_cmd,
|
|
unittest)
|
|
from test.utils import (EventListener,
|
|
rs_or_single_client,
|
|
single_client,
|
|
wait_until)
|
|
|
|
|
|
class TestCommandMonitoring(PyMongoTestCase):
|
|
|
|
@classmethod
|
|
@client_context.require_connection
|
|
def setUpClass(cls):
|
|
cls.listener = EventListener()
|
|
cls.client = rs_or_single_client(
|
|
event_listeners=[cls.listener],
|
|
retryWrites=False)
|
|
|
|
def tearDown(self):
|
|
self.listener.results.clear()
|
|
|
|
def test_started_simple(self):
|
|
self.client.pymongo_test.command('ismaster')
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(SON([('ismaster', 1)]), started.command)
|
|
self.assertEqual('ismaster', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
|
|
def test_succeeded_simple(self):
|
|
self.client.pymongo_test.command('ismaster')
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertEqual('ismaster', succeeded.command_name)
|
|
self.assertEqual(self.client.address, succeeded.connection_id)
|
|
self.assertEqual(1, succeeded.reply.get('ok'))
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
|
|
def test_failed_simple(self):
|
|
try:
|
|
self.client.pymongo_test.command('oops!')
|
|
except OperationFailure:
|
|
pass
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
failed = results['failed'][0]
|
|
self.assertEqual(0, len(results['succeeded']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertTrue(
|
|
isinstance(failed, monitoring.CommandFailedEvent))
|
|
self.assertEqual('oops!', failed.command_name)
|
|
self.assertEqual(self.client.address, failed.connection_id)
|
|
self.assertEqual(0, failed.failure.get('ok'))
|
|
self.assertTrue(isinstance(failed.request_id, int))
|
|
self.assertTrue(isinstance(failed.duration_micros, int))
|
|
|
|
def test_find_one(self):
|
|
self.client.pymongo_test.test.find_one()
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('find', 'test'),
|
|
('filter', {}),
|
|
('limit', 1),
|
|
('singleBatch', True)]),
|
|
started.command)
|
|
self.assertEqual('find', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
|
|
def test_find_and_get_more(self):
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
|
|
self.listener.results.clear()
|
|
cursor = self.client.pymongo_test.test.find(
|
|
projection={'_id': False},
|
|
batch_size=4)
|
|
for _ in range(4):
|
|
next(cursor)
|
|
cursor_id = cursor.cursor_id
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('find', 'test'),
|
|
('filter', {}),
|
|
('projection', {'_id': False}),
|
|
('batchSize', 4)]),
|
|
started.command)
|
|
self.assertEqual('find', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('find', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
csr = succeeded.reply["cursor"]
|
|
self.assertEqual(csr["id"], cursor_id)
|
|
self.assertEqual(csr["ns"], "pymongo_test.test")
|
|
self.assertEqual(csr["firstBatch"], [{} for _ in range(4)])
|
|
|
|
self.listener.results.clear()
|
|
# Next batch. Exhausting the cursor could cause a getMore
|
|
# that returns id of 0 and no results.
|
|
next(cursor)
|
|
try:
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('getMore', cursor_id),
|
|
('collection', 'test'),
|
|
('batchSize', 4)]),
|
|
started.command)
|
|
self.assertEqual('getMore', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('getMore', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
csr = succeeded.reply["cursor"]
|
|
self.assertEqual(csr["id"], cursor_id)
|
|
self.assertEqual(csr["ns"], "pymongo_test.test")
|
|
self.assertEqual(csr["nextBatch"], [{} for _ in range(4)])
|
|
finally:
|
|
# Exhaust the cursor to avoid kill cursors.
|
|
tuple(cursor)
|
|
|
|
def test_find_with_explain(self):
|
|
cmd = SON([('explain', SON([('find', 'test'),
|
|
('filter', {})]))])
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.pymongo_test.test.insert_one({})
|
|
self.listener.results.clear()
|
|
coll = self.client.pymongo_test.test
|
|
# Test that we publish the unwrapped command.
|
|
if self.client.is_mongos:
|
|
coll = coll.with_options(
|
|
read_preference=ReadPreference.PRIMARY_PREFERRED)
|
|
res = coll.find().explain()
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(cmd, started.command)
|
|
self.assertEqual('explain', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('explain', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(self.client.address, succeeded.connection_id)
|
|
self.assertEqual(res, succeeded.reply)
|
|
|
|
def _test_find_options(self, query, expected_cmd):
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
coll.create_index('x')
|
|
coll.insert_many([{'x': i} for i in range(5)])
|
|
|
|
# Test that we publish the unwrapped command.
|
|
self.listener.results.clear()
|
|
if self.client.is_mongos:
|
|
coll = coll.with_options(
|
|
read_preference=ReadPreference.PRIMARY_PREFERRED)
|
|
|
|
cursor = coll.find(**query)
|
|
|
|
next(cursor)
|
|
try:
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(expected_cmd, started.command)
|
|
self.assertEqual('find', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('find', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(self.client.address, succeeded.connection_id)
|
|
finally:
|
|
# Exhaust the cursor to avoid kill cursors.
|
|
tuple(cursor)
|
|
|
|
def test_find_options(self):
|
|
query = dict(filter={},
|
|
hint=[('x', 1)],
|
|
max_time_ms=10000,
|
|
max={'x': 10},
|
|
min={'x': -10},
|
|
return_key=True,
|
|
show_record_id=True,
|
|
projection={'x': False},
|
|
skip=1,
|
|
no_cursor_timeout=True,
|
|
sort=[('_id', 1)],
|
|
allow_partial_results=True,
|
|
comment='this is a test',
|
|
batch_size=2)
|
|
|
|
cmd = dict(find='test',
|
|
filter={},
|
|
hint=SON([('x', 1)]),
|
|
comment='this is a test',
|
|
maxTimeMS=10000,
|
|
max={'x': 10},
|
|
min={'x': -10},
|
|
returnKey=True,
|
|
showRecordId=True,
|
|
sort=SON([('_id', 1)]),
|
|
projection={'x': False},
|
|
skip=1,
|
|
batchSize=2,
|
|
noCursorTimeout=True,
|
|
allowPartialResults=True)
|
|
|
|
if client_context.version < (4, 1, 0, -1):
|
|
query['max_scan'] = 10
|
|
cmd['maxScan'] = 10
|
|
|
|
self._test_find_options(query, cmd)
|
|
|
|
@client_context.require_version_max(3, 7, 2)
|
|
def test_find_snapshot(self):
|
|
# Test "snapshot" parameter separately, can't combine with "sort".
|
|
query = dict(filter={},
|
|
snapshot=True)
|
|
|
|
cmd = dict(find='test',
|
|
filter={},
|
|
snapshot=True)
|
|
|
|
self._test_find_options(query, cmd)
|
|
|
|
def test_command_and_get_more(self):
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.pymongo_test.test.insert_many(
|
|
[{'x': 1} for _ in range(10)])
|
|
self.listener.results.clear()
|
|
coll = self.client.pymongo_test.test
|
|
# Test that we publish the unwrapped command.
|
|
if self.client.is_mongos:
|
|
coll = coll.with_options(
|
|
read_preference=ReadPreference.PRIMARY_PREFERRED)
|
|
cursor = coll.aggregate(
|
|
[{'$project': {'_id': False, 'x': 1}}], batchSize=4)
|
|
for _ in range(4):
|
|
next(cursor)
|
|
cursor_id = cursor.cursor_id
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('aggregate', 'test'),
|
|
('pipeline', [{'$project': {'_id': False, 'x': 1}}]),
|
|
('cursor', {'batchSize': 4})]),
|
|
started.command)
|
|
self.assertEqual('aggregate', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('aggregate', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
expected_cursor = {'id': cursor_id,
|
|
'ns': 'pymongo_test.test',
|
|
'firstBatch': [{'x': 1} for _ in range(4)]}
|
|
self.assertEqualCommand(expected_cursor, succeeded.reply.get('cursor'))
|
|
|
|
self.listener.results.clear()
|
|
next(cursor)
|
|
try:
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('getMore', cursor_id),
|
|
('collection', 'test'),
|
|
('batchSize', 4)]),
|
|
started.command)
|
|
self.assertEqual('getMore', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('getMore', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
expected_result = {
|
|
'cursor': {'id': cursor_id,
|
|
'ns': 'pymongo_test.test',
|
|
'nextBatch': [{'x': 1} for _ in range(4)]},
|
|
'ok': 1.0}
|
|
self.assertEqualReply(expected_result, succeeded.reply)
|
|
finally:
|
|
# Exhaust the cursor to avoid kill cursors.
|
|
tuple(cursor)
|
|
|
|
def test_get_more_failure(self):
|
|
address = self.client.address
|
|
coll = self.client.pymongo_test.test
|
|
cursor_doc = {"id": 12345, "firstBatch": [], "ns": coll.full_name}
|
|
cursor = CommandCursor(coll, cursor_doc, address)
|
|
try:
|
|
next(cursor)
|
|
except Exception:
|
|
pass
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
self.assertEqual(0, len(results['succeeded']))
|
|
failed = results['failed'][0]
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('getMore', 12345),
|
|
('collection', 'test')]),
|
|
started.command)
|
|
self.assertEqual('getMore', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(failed, monitoring.CommandFailedEvent))
|
|
self.assertTrue(isinstance(failed.duration_micros, int))
|
|
self.assertEqual('getMore', failed.command_name)
|
|
self.assertTrue(isinstance(failed.request_id, int))
|
|
self.assertEqual(cursor.address, failed.connection_id)
|
|
self.assertEqual(0, failed.failure.get("ok"))
|
|
|
|
@client_context.require_replica_set
|
|
@client_context.require_secondaries_count(1)
|
|
def test_not_master_error(self):
|
|
address = next(iter(client_context.client.secondaries))
|
|
client = single_client(*address, event_listeners=[self.listener])
|
|
# Clear authentication command results from the listener.
|
|
client.admin.command('ismaster')
|
|
self.listener.results.clear()
|
|
error = None
|
|
try:
|
|
client.pymongo_test.test.find_one_and_delete({})
|
|
except NotMasterError as exc:
|
|
error = exc.errors
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
failed = results['failed'][0]
|
|
self.assertEqual(0, len(results['succeeded']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertTrue(
|
|
isinstance(failed, monitoring.CommandFailedEvent))
|
|
self.assertEqual('findAndModify', failed.command_name)
|
|
self.assertEqual(address, failed.connection_id)
|
|
self.assertEqual(0, failed.failure.get('ok'))
|
|
self.assertTrue(isinstance(failed.request_id, int))
|
|
self.assertTrue(isinstance(failed.duration_micros, int))
|
|
self.assertEqual(error, failed.failure)
|
|
|
|
@client_context.require_no_mongos
|
|
def test_exhaust(self):
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
|
|
self.listener.results.clear()
|
|
cursor = self.client.pymongo_test.test.find(
|
|
projection={'_id': False},
|
|
batch_size=5,
|
|
cursor_type=CursorType.EXHAUST)
|
|
next(cursor)
|
|
cursor_id = cursor.cursor_id
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('find', 'test'),
|
|
('filter', {}),
|
|
('projection', {'_id': False}),
|
|
('batchSize', 5)]),
|
|
started.command)
|
|
self.assertEqual('find', started.command_name)
|
|
self.assertEqual(cursor.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('find', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
expected_result = {
|
|
'cursor': {'id': cursor_id,
|
|
'ns': 'pymongo_test.test',
|
|
'firstBatch': [{} for _ in range(5)]},
|
|
'ok': 1}
|
|
self.assertEqualReply(expected_result, succeeded.reply)
|
|
|
|
self.listener.results.clear()
|
|
tuple(cursor)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(
|
|
SON([('getMore', cursor_id),
|
|
('collection', 'test'),
|
|
('batchSize', 5)]),
|
|
started.command)
|
|
self.assertEqual('getMore', started.command_name)
|
|
self.assertEqual(cursor.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('getMore', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
expected_result = {
|
|
'cursor': {'id': 0,
|
|
'ns': 'pymongo_test.test',
|
|
'nextBatch': [{} for _ in range(5)]},
|
|
'ok': 1}
|
|
self.assertEqualReply(expected_result, succeeded.reply)
|
|
|
|
def test_kill_cursors(self):
|
|
with client_knobs(kill_cursor_frequency=0.01):
|
|
self.client.pymongo_test.test.drop()
|
|
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
|
|
cursor = self.client.pymongo_test.test.find().batch_size(5)
|
|
next(cursor)
|
|
cursor_id = cursor.cursor_id
|
|
self.listener.results.clear()
|
|
cursor.close()
|
|
time.sleep(2)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
# There could be more than one cursor_id here depending on
|
|
# when the thread last ran.
|
|
self.assertIn(cursor_id, started.command['cursors'])
|
|
self.assertEqual('killCursors', started.command_name)
|
|
self.assertIs(type(started.connection_id), tuple)
|
|
self.assertEqual(cursor.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
|
self.assertEqual('killCursors', succeeded.command_name)
|
|
self.assertTrue(isinstance(succeeded.request_id, int))
|
|
self.assertIs(type(succeeded.connection_id), tuple)
|
|
self.assertEqual(cursor.address, succeeded.connection_id)
|
|
# There could be more than one cursor_id here depending on
|
|
# when the thread last ran.
|
|
self.assertTrue(cursor_id in succeeded.reply['cursorsUnknown']
|
|
or cursor_id in succeeded.reply['cursorsKilled'])
|
|
|
|
def test_non_bulk_writes(self):
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
# Implied write concern insert_one
|
|
res = coll.insert_one({'x': 1})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': res.inserted_id, 'x': 1}])])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# Unacknowledged insert_one
|
|
self.listener.results.clear()
|
|
coll = coll.with_options(write_concern=WriteConcern(w=0))
|
|
res = coll.insert_one({'x': 1})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': res.inserted_id, 'x': 1}]),
|
|
('writeConcern', {'w': 0})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertEqualReply(succeeded.reply, {'ok': 1})
|
|
|
|
# Explicit write concern insert_one
|
|
self.listener.results.clear()
|
|
coll = coll.with_options(write_concern=WriteConcern(w=1))
|
|
res = coll.insert_one({'x': 1})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': res.inserted_id, 'x': 1}]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# delete_many
|
|
self.listener.results.clear()
|
|
res = coll.delete_many({'x': 1})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('delete', coll.name),
|
|
('ordered', True),
|
|
('deletes', [SON([('q', {'x': 1}),
|
|
('limit', 0)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('delete', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(res.deleted_count, reply.get('n'))
|
|
|
|
# replace_one
|
|
self.listener.results.clear()
|
|
oid = ObjectId()
|
|
res = coll.replace_one({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'_id': oid}),
|
|
('u', {'_id': oid, 'x': 1}),
|
|
('multi', False),
|
|
('upsert', True)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
|
|
|
|
# update_one
|
|
self.listener.results.clear()
|
|
res = coll.update_one({'x': 1}, {'$inc': {'x': 1}})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'x': 1}),
|
|
('u', {'$inc': {'x': 1}}),
|
|
('multi', False),
|
|
('upsert', False)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# update_many
|
|
self.listener.results.clear()
|
|
res = coll.update_many({'x': 2}, {'$inc': {'x': 1}})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'x': 2}),
|
|
('u', {'$inc': {'x': 1}}),
|
|
('multi', True),
|
|
('upsert', False)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# delete_one
|
|
self.listener.results.clear()
|
|
res = coll.delete_one({'x': 3})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('delete', coll.name),
|
|
('ordered', True),
|
|
('deletes', [SON([('q', {'x': 3}),
|
|
('limit', 1)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('delete', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
self.assertEqual(0, coll.count_documents({}))
|
|
|
|
# write errors
|
|
coll.insert_one({'_id': 1})
|
|
try:
|
|
self.listener.results.clear()
|
|
coll.insert_one({'_id': 1})
|
|
except OperationFailure:
|
|
pass
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': 1}]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(0, reply.get('n'))
|
|
errors = reply.get('writeErrors')
|
|
self.assertIsInstance(errors, list)
|
|
error = errors[0]
|
|
self.assertEqual(0, error.get('index'))
|
|
self.assertIsInstance(error.get('code'), int)
|
|
self.assertIsInstance(error.get('errmsg'), text_type)
|
|
|
|
def test_legacy_writes(self):
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", DeprecationWarning)
|
|
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
# Implied write concern insert
|
|
_id = coll.insert({'x': 1})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': _id, 'x': 1}])])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# Unacknowledged insert
|
|
self.listener.results.clear()
|
|
_id = coll.insert({'x': 1}, w=0)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': _id, 'x': 1}]),
|
|
('writeConcern', {'w': 0})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertEqual(succeeded.reply, {'ok': 1})
|
|
|
|
# Explicit write concern insert
|
|
self.listener.results.clear()
|
|
_id = coll.insert({'x': 1}, w=1)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': _id, 'x': 1}]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('insert', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# remove all
|
|
self.listener.results.clear()
|
|
res = coll.remove({'x': 1}, w=1)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('delete', coll.name),
|
|
('ordered', True),
|
|
('deletes', [SON([('q', {'x': 1}),
|
|
('limit', 0)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('delete', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(res['n'], reply.get('n'))
|
|
|
|
# upsert
|
|
self.listener.results.clear()
|
|
oid = ObjectId()
|
|
coll.update({'_id': oid}, {'_id': oid, 'x': 1}, upsert=True, w=1)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'_id': oid}),
|
|
('u', {'_id': oid, 'x': 1}),
|
|
('multi', False),
|
|
('upsert', True)])]),
|
|
('writeConcern', {'w': 1})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
self.assertEqual([{'index': 0, '_id': oid}], reply.get('upserted'))
|
|
|
|
# update one
|
|
self.listener.results.clear()
|
|
coll.update({'x': 1}, {'$inc': {'x': 1}})
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'x': 1}),
|
|
('u', {'$inc': {'x': 1}}),
|
|
('multi', False),
|
|
('upsert', False)])])])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# update many
|
|
self.listener.results.clear()
|
|
coll.update({'x': 2}, {'$inc': {'x': 1}}, multi=True)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'x': 2}),
|
|
('u', {'$inc': {'x': 1}}),
|
|
('multi', True),
|
|
('upsert', False)])])])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('update', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
# remove one
|
|
self.listener.results.clear()
|
|
coll.remove({'x': 3}, multi=False)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('delete', coll.name),
|
|
('ordered', True),
|
|
('deletes', [SON([('q', {'x': 3}),
|
|
('limit', 1)])])])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('delete', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
reply = succeeded.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
self.assertEqual(1, reply.get('n'))
|
|
|
|
self.assertEqual(0, coll.count_documents({}))
|
|
|
|
def test_insert_many(self):
|
|
# This always uses the bulk API.
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
big = 'x' * (1024 * 1024 * 4)
|
|
docs = [{'_id': i, 'big': big} for i in range(6)]
|
|
coll.insert_many(docs)
|
|
results = self.listener.results
|
|
started = results['started']
|
|
succeeded = results['succeeded']
|
|
self.assertEqual(0, len(results['failed']))
|
|
documents = []
|
|
count = 0
|
|
operation_id = started[0].operation_id
|
|
self.assertIsInstance(operation_id, int)
|
|
for start, succeed in zip(started, succeeded):
|
|
self.assertIsInstance(start, monitoring.CommandStartedEvent)
|
|
cmd = sanitize_cmd(start.command)
|
|
self.assertEqual(['insert', 'ordered', 'documents'],
|
|
list(cmd.keys()))
|
|
self.assertEqual(coll.name, cmd['insert'])
|
|
self.assertIs(True, cmd['ordered'])
|
|
documents.extend(cmd['documents'])
|
|
self.assertEqual('pymongo_test', start.database_name)
|
|
self.assertEqual('insert', start.command_name)
|
|
self.assertIsInstance(start.request_id, int)
|
|
self.assertEqual(self.client.address, start.connection_id)
|
|
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeed.duration_micros, int)
|
|
self.assertEqual(start.command_name, succeed.command_name)
|
|
self.assertEqual(start.request_id, succeed.request_id)
|
|
self.assertEqual(start.connection_id, succeed.connection_id)
|
|
self.assertEqual(start.operation_id, operation_id)
|
|
self.assertEqual(succeed.operation_id, operation_id)
|
|
reply = succeed.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
count += reply.get('n', 0)
|
|
self.assertEqual(documents, docs)
|
|
self.assertEqual(6, count)
|
|
|
|
def test_legacy_insert_many(self):
|
|
# On legacy servers this uses bulk OP_INSERT.
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", DeprecationWarning)
|
|
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
# Force two batches on legacy servers.
|
|
big = 'x' * (1024 * 1024 * 12)
|
|
docs = [{'_id': i, 'big': big} for i in range(6)]
|
|
coll.insert(docs)
|
|
results = self.listener.results
|
|
started = results['started']
|
|
succeeded = results['succeeded']
|
|
self.assertEqual(0, len(results['failed']))
|
|
documents = []
|
|
count = 0
|
|
operation_id = started[0].operation_id
|
|
self.assertIsInstance(operation_id, int)
|
|
for start, succeed in zip(started, succeeded):
|
|
self.assertIsInstance(start, monitoring.CommandStartedEvent)
|
|
cmd = sanitize_cmd(start.command)
|
|
self.assertEqual(['insert', 'ordered', 'documents'],
|
|
list(cmd.keys()))
|
|
self.assertEqual(coll.name, cmd['insert'])
|
|
self.assertIs(True, cmd['ordered'])
|
|
documents.extend(cmd['documents'])
|
|
self.assertEqual('pymongo_test', start.database_name)
|
|
self.assertEqual('insert', start.command_name)
|
|
self.assertIsInstance(start.request_id, int)
|
|
self.assertEqual(self.client.address, start.connection_id)
|
|
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeed.duration_micros, int)
|
|
self.assertEqual(start.command_name, succeed.command_name)
|
|
self.assertEqual(start.request_id, succeed.request_id)
|
|
self.assertEqual(start.connection_id, succeed.connection_id)
|
|
self.assertEqual(start.operation_id, operation_id)
|
|
self.assertEqual(succeed.operation_id, operation_id)
|
|
reply = succeed.reply
|
|
self.assertEqual(1, reply.get('ok'))
|
|
count += reply.get('n', 0)
|
|
self.assertEqual(documents, docs)
|
|
self.assertEqual(6, count)
|
|
|
|
def test_bulk_write(self):
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
coll.bulk_write([InsertOne({'_id': 1}),
|
|
UpdateOne({'_id': 1}, {'$set': {'x': 1}}),
|
|
DeleteOne({'_id': 1})])
|
|
results = self.listener.results
|
|
started = results['started']
|
|
succeeded = results['succeeded']
|
|
self.assertEqual(0, len(results['failed']))
|
|
operation_id = started[0].operation_id
|
|
pairs = list(zip(started, succeeded))
|
|
self.assertEqual(3, len(pairs))
|
|
for start, succeed in pairs:
|
|
self.assertIsInstance(start, monitoring.CommandStartedEvent)
|
|
self.assertEqual('pymongo_test', start.database_name)
|
|
self.assertIsInstance(start.request_id, int)
|
|
self.assertEqual(self.client.address, start.connection_id)
|
|
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeed.duration_micros, int)
|
|
self.assertEqual(start.command_name, succeed.command_name)
|
|
self.assertEqual(start.request_id, succeed.request_id)
|
|
self.assertEqual(start.connection_id, succeed.connection_id)
|
|
self.assertEqual(start.operation_id, operation_id)
|
|
self.assertEqual(succeed.operation_id, operation_id)
|
|
|
|
expected = SON([('insert', coll.name),
|
|
('ordered', True),
|
|
('documents', [{'_id': 1}])])
|
|
self.assertEqualCommand(expected, started[0].command)
|
|
expected = SON([('update', coll.name),
|
|
('ordered', True),
|
|
('updates', [SON([('q', {'_id': 1}),
|
|
('u', {'$set': {'x': 1}}),
|
|
('multi', False),
|
|
('upsert', False)])])])
|
|
self.assertEqualCommand(expected, started[1].command)
|
|
expected = SON([('delete', coll.name),
|
|
('ordered', True),
|
|
('deletes', [SON([('q', {'_id': 1}),
|
|
('limit', 1)])])])
|
|
self.assertEqualCommand(expected, started[2].command)
|
|
|
|
def test_write_errors(self):
|
|
coll = self.client.pymongo_test.test
|
|
coll.drop()
|
|
self.listener.results.clear()
|
|
|
|
try:
|
|
coll.bulk_write([InsertOne({'_id': 1}),
|
|
InsertOne({'_id': 1}),
|
|
InsertOne({'_id': 1}),
|
|
DeleteOne({'_id': 1})],
|
|
ordered=False)
|
|
except OperationFailure:
|
|
pass
|
|
results = self.listener.results
|
|
started = results['started']
|
|
succeeded = results['succeeded']
|
|
self.assertEqual(0, len(results['failed']))
|
|
operation_id = started[0].operation_id
|
|
pairs = list(zip(started, succeeded))
|
|
errors = []
|
|
for start, succeed in pairs:
|
|
self.assertIsInstance(start, monitoring.CommandStartedEvent)
|
|
self.assertEqual('pymongo_test', start.database_name)
|
|
self.assertIsInstance(start.request_id, int)
|
|
self.assertEqual(self.client.address, start.connection_id)
|
|
self.assertIsInstance(succeed, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeed.duration_micros, int)
|
|
self.assertEqual(start.command_name, succeed.command_name)
|
|
self.assertEqual(start.request_id, succeed.request_id)
|
|
self.assertEqual(start.connection_id, succeed.connection_id)
|
|
self.assertEqual(start.operation_id, operation_id)
|
|
self.assertEqual(succeed.operation_id, operation_id)
|
|
if 'writeErrors' in succeed.reply:
|
|
errors.extend(succeed.reply['writeErrors'])
|
|
|
|
self.assertEqual(2, len(errors))
|
|
fields = set(['index', 'code', 'errmsg'])
|
|
for error in errors:
|
|
self.assertTrue(fields.issubset(set(error)))
|
|
|
|
def test_first_batch_helper(self):
|
|
# Regardless of server version and use of helpers._first_batch
|
|
# this test should still pass.
|
|
self.listener.results.clear()
|
|
tuple(self.client.pymongo_test.test.list_indexes())
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('listIndexes', 'test'), ('cursor', {})])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('listIndexes', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertTrue('cursor' in succeeded.reply)
|
|
self.assertTrue('ok' in succeeded.reply)
|
|
|
|
self.listener.results.clear()
|
|
self.client.pymongo_test.current_op(True)
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = SON([('currentOp', 1), ('$all', True)])
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('admin', started.database_name)
|
|
self.assertEqual('currentOp', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertTrue('inprog' in succeeded.reply)
|
|
self.assertTrue('ok' in succeeded.reply)
|
|
|
|
if not client_context.is_mongos:
|
|
self.client.fsync(lock=True)
|
|
self.listener.results.clear()
|
|
self.client.unlock()
|
|
# Wait for async unlock...
|
|
wait_until(
|
|
lambda: not self.client.is_locked, "unlock the database")
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
expected = {'fsyncUnlock': 1}
|
|
self.assertEqualCommand(expected, started.command)
|
|
self.assertEqual('admin', started.database_name)
|
|
self.assertEqual('fsyncUnlock', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertIsInstance(succeeded.duration_micros, int)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertTrue('info' in succeeded.reply)
|
|
self.assertTrue('ok' in succeeded.reply)
|
|
|
|
def test_sensitive_commands(self):
|
|
listeners = self.client._event_listeners
|
|
|
|
self.listener.results.clear()
|
|
cmd = SON([("getnonce", 1)])
|
|
listeners.publish_command_start(
|
|
cmd, "pymongo_test", 12345, self.client.address)
|
|
delta = datetime.timedelta(milliseconds=100)
|
|
listeners.publish_command_success(
|
|
delta, {'nonce': 'e474f4561c5eb40b', 'ok': 1.0},
|
|
"getnonce", 12345, self.client.address)
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertIsInstance(started, monitoring.CommandStartedEvent)
|
|
self.assertEqual({}, started.command)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertEqual('getnonce', started.command_name)
|
|
self.assertIsInstance(started.request_id, int)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent)
|
|
self.assertEqual(succeeded.duration_micros, 100000)
|
|
self.assertEqual(started.command_name, succeeded.command_name)
|
|
self.assertEqual(started.request_id, succeeded.request_id)
|
|
self.assertEqual(started.connection_id, succeeded.connection_id)
|
|
self.assertEqual({}, succeeded.reply)
|
|
|
|
|
|
class TestGlobalListener(PyMongoTestCase):
|
|
|
|
@classmethod
|
|
@client_context.require_connection
|
|
def setUpClass(cls):
|
|
cls.listener = EventListener()
|
|
# We plan to call register(), which internally modifies _LISTENERS.
|
|
cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS)
|
|
monitoring.register(cls.listener)
|
|
cls.client = single_client()
|
|
# Get one (authenticated) socket in the pool.
|
|
cls.client.pymongo_test.command('ismaster')
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
monitoring._LISTENERS = cls.saved_listeners
|
|
|
|
def setUp(self):
|
|
self.listener.results.clear()
|
|
|
|
def test_simple(self):
|
|
self.client.pymongo_test.command('ismaster')
|
|
results = self.listener.results
|
|
started = results['started'][0]
|
|
succeeded = results['succeeded'][0]
|
|
self.assertEqual(0, len(results['failed']))
|
|
self.assertTrue(
|
|
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
|
self.assertTrue(
|
|
isinstance(started, monitoring.CommandStartedEvent))
|
|
self.assertEqualCommand(SON([('ismaster', 1)]), started.command)
|
|
self.assertEqual('ismaster', started.command_name)
|
|
self.assertEqual(self.client.address, started.connection_id)
|
|
self.assertEqual('pymongo_test', started.database_name)
|
|
self.assertTrue(isinstance(started.request_id, int))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|