PYTHON-1119 - Implement the client metadata capture specification.
This commit is contained in:
parent
5afd19922a
commit
c9bbfa7ace
@ -110,6 +110,7 @@ def _parse_pool_options(options):
|
||||
wait_queue_timeout = options.get('waitqueuetimeoutms')
|
||||
wait_queue_multiple = options.get('waitqueuemultiple')
|
||||
event_listeners = options.get('event_listeners')
|
||||
appname = options.get('appname')
|
||||
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
|
||||
return PoolOptions(max_pool_size,
|
||||
min_pool_size,
|
||||
@ -117,7 +118,8 @@ def _parse_pool_options(options):
|
||||
connect_timeout, socket_timeout,
|
||||
wait_queue_timeout, wait_queue_multiple,
|
||||
ssl_context, ssl_match_hostname, socket_keepalive,
|
||||
_EventListeners(event_listeners))
|
||||
_EventListeners(event_listeners),
|
||||
appname)
|
||||
|
||||
|
||||
class ClientOptions(object):
|
||||
|
||||
@ -398,6 +398,17 @@ def validate_is_document_type(option, value):
|
||||
"collections.MutableMapping" % (option,))
|
||||
|
||||
|
||||
def validate_appname_or_none(option, value):
|
||||
"""Validate the appname option."""
|
||||
if value is None:
|
||||
return value
|
||||
validate_string(option, value)
|
||||
# We need length in bytes, so encode utf8 first.
|
||||
if len(value.encode('utf-8')) > 128:
|
||||
raise ValueError("%s must be <= 128 bytes" % (option,))
|
||||
return value
|
||||
|
||||
|
||||
def validate_ok_for_replace(replacement):
|
||||
"""Validate a replacement document."""
|
||||
validate_is_mapping("replacement", replacement)
|
||||
@ -451,6 +462,7 @@ URI_VALIDATORS = {
|
||||
'uuidrepresentation': validate_uuid_representation,
|
||||
'connect': validate_boolean_or_string,
|
||||
'minpoolsize': validate_non_negative_integer,
|
||||
'appname': validate_appname_or_none,
|
||||
}
|
||||
|
||||
TIMEOUT_VALIDATORS = {
|
||||
|
||||
@ -162,6 +162,11 @@ class MongoClient(common.BaseObject):
|
||||
- `heartbeatFrequencyMS`: (optional) The number of milliseconds
|
||||
between periodic server checks, or None to accept the default
|
||||
frequency of 10 seconds.
|
||||
- `appname`: (string or None) The name of the application that
|
||||
created this MongoClient instance. MongoDB 3.4 and newer will
|
||||
print this value in the server log upon establishing each
|
||||
connection. It is also recorded in the slow query log and
|
||||
profile collections.
|
||||
- `event_listeners`: a list or tuple of event listeners. See
|
||||
:mod:`~pymongo.monitoring` for details.
|
||||
|
||||
|
||||
@ -111,11 +111,16 @@ class Monitor(object):
|
||||
# server's pool. If a server was once connected, change its type
|
||||
# to Unknown only after retrying once.
|
||||
address = self._server_description.address
|
||||
retry = self._server_description.server_type != SERVER_TYPE.Unknown
|
||||
retry = True
|
||||
metadata = None
|
||||
if self._server_description.server_type == SERVER_TYPE.Unknown:
|
||||
retry = False
|
||||
metadata = self._pool.opts.metadata
|
||||
|
||||
start = _time()
|
||||
try:
|
||||
return self._check_once()
|
||||
# If the server type is unknown, send metadata with first check.
|
||||
return self._check_once(metadata=metadata)
|
||||
except ReferenceError:
|
||||
raise
|
||||
except Exception as error:
|
||||
@ -144,7 +149,7 @@ class Monitor(object):
|
||||
self._avg_round_trip_time.reset()
|
||||
return default
|
||||
|
||||
def _check_once(self):
|
||||
def _check_once(self, metadata=None):
|
||||
"""A single attempt to call ismaster.
|
||||
|
||||
Returns a ServerDescription, or raises an exception.
|
||||
@ -153,7 +158,8 @@ class Monitor(object):
|
||||
if self._publish:
|
||||
self._listeners.publish_server_heartbeat_started(address)
|
||||
with self._pool.get_socket({}) as sock_info:
|
||||
response, round_trip_time = self._check_with_socket(sock_info)
|
||||
response, round_trip_time = self._check_with_socket(
|
||||
sock_info, metadata=metadata)
|
||||
self._avg_round_trip_time.add_sample(round_trip_time)
|
||||
sd = ServerDescription(
|
||||
address=address,
|
||||
@ -165,14 +171,17 @@ class Monitor(object):
|
||||
|
||||
return sd
|
||||
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
"""Return (IsMaster, round_trip_time).
|
||||
|
||||
Can raise ConnectionFailure or OperationFailure.
|
||||
"""
|
||||
cmd = {'ismaster': 1}
|
||||
if metadata is not None:
|
||||
cmd['client'] = metadata
|
||||
start = _time()
|
||||
request_id, msg, max_doc_size = message.query(
|
||||
0, 'admin.$cmd', 0, -1, {'ismaster': 1},
|
||||
0, 'admin.$cmd', 0, -1, cmd,
|
||||
None, DEFAULT_CODEC_OPTIONS)
|
||||
|
||||
# TODO: use sock_info.command()
|
||||
|
||||
@ -15,11 +15,12 @@
|
||||
import contextlib
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from bson import DEFAULT_CODEC_OPTIONS
|
||||
from bson.py3compat import itervalues
|
||||
from pymongo import auth, helpers, thread_util
|
||||
from pymongo import auth, helpers, thread_util, __version__
|
||||
from pymongo.common import MAX_MESSAGE_SIZE
|
||||
from pymongo.errors import (AutoReconnect,
|
||||
ConnectionFailure,
|
||||
@ -38,6 +39,13 @@ from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_type import SERVER_TYPE
|
||||
|
||||
|
||||
_METADATA = {
|
||||
'driver': {'name': 'PyMongo', 'version': __version__},
|
||||
'os': {'type': sys.platform},
|
||||
'platform': sys.version
|
||||
}
|
||||
|
||||
|
||||
# If the first getaddrinfo call of this interpreter's life is on a thread,
|
||||
# while the main thread holds the import lock, getaddrinfo deadlocks trying
|
||||
# to import the IDNA codec. Import it here, where presumably we're on the
|
||||
@ -71,14 +79,14 @@ class PoolOptions(object):
|
||||
'__connect_timeout', '__socket_timeout',
|
||||
'__wait_queue_timeout', '__wait_queue_multiple',
|
||||
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
|
||||
'__event_listeners')
|
||||
'__event_listeners', '__metadata')
|
||||
|
||||
def __init__(self, max_pool_size=100, min_pool_size=0,
|
||||
max_idle_time_ms=None, connect_timeout=None,
|
||||
socket_timeout=None, wait_queue_timeout=None,
|
||||
wait_queue_multiple=None, ssl_context=None,
|
||||
ssl_match_hostname=True, socket_keepalive=False,
|
||||
event_listeners=None):
|
||||
event_listeners=None, appname=None):
|
||||
|
||||
self.__max_pool_size = max_pool_size
|
||||
self.__min_pool_size = min_pool_size
|
||||
@ -91,6 +99,9 @@ class PoolOptions(object):
|
||||
self.__ssl_match_hostname = ssl_match_hostname
|
||||
self.__socket_keepalive = socket_keepalive
|
||||
self.__event_listeners = event_listeners
|
||||
self.__metadata = _METADATA.copy()
|
||||
if appname:
|
||||
self.__metadata['application'] = {'name': appname}
|
||||
|
||||
@property
|
||||
def max_pool_size(self):
|
||||
@ -173,6 +184,12 @@ class PoolOptions(object):
|
||||
"""
|
||||
return self.__event_listeners
|
||||
|
||||
@property
|
||||
def metadata(self):
|
||||
"""A dict of metadata about the application, driver, os, and platform.
|
||||
"""
|
||||
return self.__metadata.copy()
|
||||
|
||||
|
||||
class SocketInfo(object):
|
||||
"""Store a socket with some metadata.
|
||||
@ -542,10 +559,14 @@ class Pool:
|
||||
try:
|
||||
sock = _configured_socket(self.address, self.opts)
|
||||
if self.handshake:
|
||||
ismaster = IsMaster(command(sock, 'admin', {'ismaster': 1},
|
||||
False, False,
|
||||
ReadPreference.PRIMARY,
|
||||
DEFAULT_CODEC_OPTIONS))
|
||||
ismaster = IsMaster(
|
||||
command(sock,
|
||||
'admin',
|
||||
{'ismaster': 1, 'client': self.opts.metadata},
|
||||
False,
|
||||
False,
|
||||
ReadPreference.PRIMARY,
|
||||
DEFAULT_CODEC_OPTIONS))
|
||||
else:
|
||||
ismaster = None
|
||||
return SocketInfo(sock, self, ismaster, self.address)
|
||||
|
||||
@ -76,7 +76,7 @@ class MockMonitor(Monitor):
|
||||
pool,
|
||||
topology_settings)
|
||||
|
||||
def _check_once(self):
|
||||
def _check_once(self, metadata=None):
|
||||
address = self._server_description.address
|
||||
response, rtt = self.client.mock_is_master('%s:%d' % address)
|
||||
return ServerDescription(address, IsMaster(response), rtt)
|
||||
|
||||
@ -43,7 +43,7 @@ from pymongo.errors import (AutoReconnect,
|
||||
from pymongo.monitoring import (ServerHeartbeatListener,
|
||||
ServerHeartbeatStartedEvent)
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.pool import SocketInfo
|
||||
from pymongo.pool import SocketInfo, _METADATA
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.server_selectors import (any_server_selector,
|
||||
writable_server_selector)
|
||||
@ -195,6 +195,20 @@ class ClientUnitTest(unittest.TestCase):
|
||||
MongoClient('mongodb://host/?'
|
||||
'readpreference=primary&readpreferencetags=dc:east')
|
||||
|
||||
def test_metadata(self):
|
||||
metadata = _METADATA.copy()
|
||||
metadata['application'] = {'name': 'foobar'}
|
||||
client = MongoClient(
|
||||
"mongodb://foo:27017/?appname=foobar&connect=false")
|
||||
options = client._MongoClient__options
|
||||
self.assertEqual(options.pool_options.metadata, metadata)
|
||||
client = MongoClient('foo', 27017, appname='foobar', connect=False)
|
||||
options = client._MongoClient__options
|
||||
self.assertEqual(options.pool_options.metadata, metadata)
|
||||
# No error
|
||||
MongoClient(appname='x' * 128)
|
||||
self.assertRaises(ValueError, MongoClient, appname='x' * 129)
|
||||
|
||||
|
||||
class TestClient(IntegrationTest):
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ from pymongo import monitoring
|
||||
from pymongo.errors import ConnectionFailure
|
||||
from pymongo.ismaster import IsMaster
|
||||
from pymongo.monitor import Monitor
|
||||
from pymongo.pool import PoolOptions
|
||||
from test import unittest, client_knobs
|
||||
from test.utils import HeartbeatEventListener, single_client, wait_until
|
||||
|
||||
@ -44,6 +45,7 @@ class MockPool(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.pool_id = 0
|
||||
self._lock = threading.Lock()
|
||||
self.opts = PoolOptions()
|
||||
|
||||
def get_socket(self, all_credentials):
|
||||
return MockSocketInfo()
|
||||
@ -75,7 +77,7 @@ class TestHeartbeatMonitoring(unittest.TestCase):
|
||||
min_heartbeat_interval=0.1,
|
||||
events_queue_frequency=0.1):
|
||||
class MockMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
if isinstance(responses[1], Exception):
|
||||
raise responses[1]
|
||||
return IsMaster(responses[1]), 99
|
||||
|
||||
@ -55,6 +55,7 @@ class MockPool(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.pool_id = 0
|
||||
self._lock = threading.Lock()
|
||||
self.opts = PoolOptions()
|
||||
|
||||
def get_socket(self, all_credentials):
|
||||
return MockSocketInfo()
|
||||
@ -238,7 +239,7 @@ class TestSingleServerTopology(TopologyTest):
|
||||
available = True
|
||||
|
||||
class TestMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
if available:
|
||||
return IsMaster({'ok': 1}), round_trip_time
|
||||
else:
|
||||
@ -541,7 +542,7 @@ class TestTopologyErrors(TopologyTest):
|
||||
ismaster_count = [0]
|
||||
|
||||
class TestMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
ismaster_count[0] += 1
|
||||
if ismaster_count[0] == 1:
|
||||
return IsMaster({'ok': 1}), 0
|
||||
@ -562,7 +563,7 @@ class TestTopologyErrors(TopologyTest):
|
||||
ismaster_count = [0]
|
||||
|
||||
class TestMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
ismaster_count[0] += 1
|
||||
if ismaster_count[0] in (1, 3):
|
||||
return IsMaster({'ok': 1}), 0
|
||||
@ -584,7 +585,7 @@ class TestTopologyErrors(TopologyTest):
|
||||
exception = AssertionError('internal error')
|
||||
|
||||
class TestMonitor(Monitor):
|
||||
def _check_with_socket(self, sock_info):
|
||||
def _check_with_socket(self, sock_info, metadata=None):
|
||||
raise exception
|
||||
|
||||
t = create_mock_topology(monitor_class=TestMonitor)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user