PYTHON-952 - killCursors monitoring
This commit is contained in:
parent
8ed682b48f
commit
5dba74c005
@ -21,7 +21,7 @@ from collections import deque
|
||||
from bson.py3compat import integer_types
|
||||
from pymongo import helpers, monitoring
|
||||
from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure
|
||||
from pymongo.message import _GetMore
|
||||
from pymongo.message import _CursorAddress, _GetMore
|
||||
|
||||
|
||||
class CommandCursor(object):
|
||||
@ -52,8 +52,8 @@ class CommandCursor(object):
|
||||
"""Closes this cursor.
|
||||
"""
|
||||
if self.__id and not self.__killed:
|
||||
self.__collection.database.client.close_cursor(self.__id,
|
||||
self.__address)
|
||||
self.__collection.database.client.close_cursor(
|
||||
self.__id, _CursorAddress(self.__address, self.__ns))
|
||||
self.__killed = True
|
||||
|
||||
def close(self):
|
||||
|
||||
@ -32,7 +32,7 @@ from pymongo.errors import (AutoReconnect,
|
||||
InvalidOperation,
|
||||
NotMasterError,
|
||||
OperationFailure)
|
||||
from pymongo.message import _GetMore, _Query
|
||||
from pymongo.message import _CursorAddress, _GetMore, _Query
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
|
||||
_QUERY_OPTIONS = {
|
||||
@ -269,8 +269,10 @@ class Cursor(object):
|
||||
# to stop the server from sending more data.
|
||||
self.__exhaust_mgr.sock.close()
|
||||
else:
|
||||
self.__collection.database.client.close_cursor(self.__id,
|
||||
self.__address)
|
||||
self.__collection.database.client.close_cursor(
|
||||
self.__id,
|
||||
_CursorAddress(
|
||||
self.__address, self.__collection.full_name))
|
||||
if self.__exhaust and self.__exhaust_mgr:
|
||||
self.__exhaust_mgr.close()
|
||||
self.__killed = True
|
||||
|
||||
@ -219,6 +219,20 @@ class _GetMore(object):
|
||||
return get_more(self.ns, self.ntoreturn, self.cursor_id)
|
||||
|
||||
|
||||
class _CursorAddress(tuple):
|
||||
"""The server address (host, port) of a cursor, with namespace property."""
|
||||
|
||||
def __new__(cls, address, namespace):
|
||||
self = tuple.__new__(cls, address)
|
||||
self.__namespace = namespace
|
||||
return self
|
||||
|
||||
@property
|
||||
def namespace(self):
|
||||
"""The namespace this cursor."""
|
||||
return self.__namespace
|
||||
|
||||
|
||||
def __last_error(namespace, args):
|
||||
"""Data to send to do a lastError.
|
||||
"""
|
||||
|
||||
@ -40,9 +40,11 @@ from collections import defaultdict
|
||||
|
||||
from bson.py3compat import (integer_types,
|
||||
string_type)
|
||||
from bson.son import SON
|
||||
from pymongo import (common,
|
||||
database,
|
||||
message,
|
||||
monitoring,
|
||||
periodic_executor,
|
||||
uri_parser)
|
||||
from pymongo.client_options import ClientOptions
|
||||
@ -910,6 +912,7 @@ class MongoClient(common.BaseObject):
|
||||
|
||||
# Don't re-open topology if it's closed and there's no pending cursors.
|
||||
if address_to_cursor_ids:
|
||||
publish = monitoring.enabled()
|
||||
topology = self._get_topology()
|
||||
for address, cursor_ids in address_to_cursor_ids.items():
|
||||
try:
|
||||
@ -920,8 +923,28 @@ class MongoClient(common.BaseObject):
|
||||
server = topology.select_server(
|
||||
writable_server_selector)
|
||||
|
||||
server.send_message(message.kill_cursors(cursor_ids),
|
||||
self.__all_credentials)
|
||||
if publish:
|
||||
start = datetime.datetime.now()
|
||||
data = message.kill_cursors(cursor_ids)
|
||||
if publish:
|
||||
duration = datetime.datetime.now() - start
|
||||
try:
|
||||
dbname, collname = address.namespace.split(".", 1)
|
||||
except AttributeError:
|
||||
dbname = collname = 'OP_KILL_CURSORS'
|
||||
command = SON([('killCursors', collname),
|
||||
('cursors', cursor_ids)])
|
||||
monitoring.publish_command_start(
|
||||
command, dbname, data[0], address)
|
||||
start = datetime.datetime.now()
|
||||
server.send_message(data, self.__all_credentials)
|
||||
if publish:
|
||||
duration = (datetime.datetime.now() - start) + duration
|
||||
# OP_KILL_CURSORS returns no reply, fake one.
|
||||
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
|
||||
monitoring.publish_command_success(
|
||||
duration, reply, 'killCursors', data[0], address)
|
||||
|
||||
except ConnectionFailure as exc:
|
||||
warnings.warn("couldn't close cursor on %s: %s"
|
||||
% (address, exc))
|
||||
|
||||
@ -13,14 +13,15 @@
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path[0:0] = [""]
|
||||
|
||||
from bson.son import SON
|
||||
from pymongo import CursorType, MongoClient, monitoring
|
||||
from pymongo import CursorType, monitoring
|
||||
from pymongo.command_cursor import CommandCursor
|
||||
from pymongo.errors import NotMasterError, OperationFailure
|
||||
from test import unittest, IntegrationTest, client_context
|
||||
from test import unittest, IntegrationTest, client_context, client_knobs
|
||||
from test.utils import single_client
|
||||
|
||||
|
||||
@ -421,6 +422,39 @@ class TestCommandMonitoring(IntegrationTest):
|
||||
'ok': 1}
|
||||
self.assertEqual(expected_result, succeeded.reply)
|
||||
|
||||
def test_kill_cursors(self):
|
||||
with client_knobs(kill_cursor_frequency=0.01):
|
||||
self.client.pymongo_test.test.drop()
|
||||
self.client.pymongo_test.test.insert_many([{} for _ in range(10)])
|
||||
cursor = self.client.pymongo_test.test.find().batch_size(5)
|
||||
next(cursor)
|
||||
cursor_id = cursor.cursor_id
|
||||
self.listener.results = {}
|
||||
cursor.close()
|
||||
time.sleep(2)
|
||||
results = self.listener.results
|
||||
started = results.get('started')
|
||||
succeeded = results.get('succeeded')
|
||||
self.assertIsNone(results.get('failed'))
|
||||
self.assertTrue(
|
||||
isinstance(started, monitoring.CommandStartedEvent))
|
||||
# There could be more than one cursor_id here depending on
|
||||
# when the thread last ran.
|
||||
self.assertIn(cursor_id, started.command['cursors'])
|
||||
self.assertEqual('killCursors', started.command_name)
|
||||
self.assertEqual(cursor.address, started.connection_id)
|
||||
self.assertEqual('pymongo_test', started.database_name)
|
||||
self.assertTrue(isinstance(started.request_id, int))
|
||||
self.assertTrue(
|
||||
isinstance(succeeded, monitoring.CommandSucceededEvent))
|
||||
self.assertTrue(isinstance(succeeded.duration_micros, int))
|
||||
self.assertEqual('killCursors', succeeded.command_name)
|
||||
self.assertTrue(isinstance(succeeded.request_id, int))
|
||||
self.assertEqual(cursor.address, succeeded.connection_id)
|
||||
# There could be more than one cursor_id here depending on
|
||||
# when the thread last ran.
|
||||
self.assertIn(cursor_id, succeeded.reply['cursorsUnknown'])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user