largely untested attempt at pooling as described on wiki. no auth support yet though
This commit is contained in:
parent
ef429d86d4
commit
35a34461b8
@ -104,14 +104,14 @@ class Collection(object):
|
||||
"""
|
||||
return self.__collection_name
|
||||
|
||||
def _send_message(self, operation, data, sock=None):
|
||||
def _send_message(self, operation, data):
|
||||
"""Wrap up a message and send it.
|
||||
"""
|
||||
# reserved int, full collection name, message data
|
||||
message = _ZERO
|
||||
message += bson._make_c_string(self.full_name())
|
||||
message += data
|
||||
return self.__database.connection()._send_message(operation, message, sock=sock)
|
||||
return self.__database.connection()._send_message(operation, message)
|
||||
|
||||
def database(self):
|
||||
"""Get the database that this collection is a part of.
|
||||
|
||||
@ -35,45 +35,66 @@ _TIMEOUT = 20.0
|
||||
class Connection(object):
|
||||
"""A connection to Mongo.
|
||||
"""
|
||||
def __init__(self, host="localhost", port=27017, pool_size=1, _connect=True):
|
||||
def __init__(self, host="localhost", port=27017, options={}, _connect=True):
|
||||
"""Open a new connection to a Mongo instance at host:port.
|
||||
|
||||
The resultant connection object has connection-pooling built in. The
|
||||
maximum size of the pool is given by `pool_size`.
|
||||
The resultant connection object has connection-pooling built in.
|
||||
|
||||
Raises TypeError if host is not an instance of string or port is not an
|
||||
instance of int. Raises ConnectionFailure if the connection cannot be
|
||||
made. Raises TypeError if `pool_size` is not an instance of int. Raises
|
||||
ValueError if `pool_size` is less than or equal to zero.
|
||||
made.
|
||||
|
||||
The keyword argument `options` is a dictionary used for specifying
|
||||
additional connection level options. Valid options are:
|
||||
|
||||
- pool_size (default=1): maximum size of the connection's built in
|
||||
connection pool - must be greater than or equal to one.
|
||||
- auto_start_request (default=True): automatically start a request
|
||||
on every operation - see documentation for `start_request`.
|
||||
|
||||
:Parameters:
|
||||
- `host` (optional): hostname or IPv4 address of the instance to
|
||||
connect to
|
||||
- `port` (optional): port number on which to connect
|
||||
- `pool_size` (optional): maximum size of built in connection pool
|
||||
- `options` (optional): dictionary of connection options
|
||||
"""
|
||||
if not isinstance(host, types.StringType):
|
||||
raise TypeError("host must be an instance of str")
|
||||
if not isinstance(port, types.IntType):
|
||||
raise TypeError("port must be an instance of int")
|
||||
if not isinstance(pool_size, types.IntType):
|
||||
raise TypeError("pool_size must be an instance of int")
|
||||
if pool_size <= 0:
|
||||
raise ValueError("pool_size must be positive")
|
||||
if not isinstance(options, types.DictType):
|
||||
raise TypeError("options must be an instance of dict")
|
||||
|
||||
# host and port for the master
|
||||
self.__host = None
|
||||
self.__port = None
|
||||
|
||||
self.__nodes = [(host, port)]
|
||||
|
||||
# current request_id
|
||||
self.__id = 1
|
||||
self.__id_lock = threading.Lock()
|
||||
|
||||
self.__cursor_manager = CursorManager(self)
|
||||
|
||||
self.__pool_size = options.get("pool_size", 1)
|
||||
if not isinstance(self.__pool_size, types.IntType):
|
||||
raise TypeError("pool_size must be an instance of int")
|
||||
if self.__pool_size <= 0:
|
||||
raise ValueError("pool_size must be positive")
|
||||
self.__auto_start_request = options.get("auto_start_request", True)
|
||||
|
||||
# map from threads to sockets
|
||||
self.__thread_map = {}
|
||||
self.__pool_size = pool_size
|
||||
self.__locks = [threading.Lock() for _ in range(pool_size)]
|
||||
self.__sockets = [None for _ in range(pool_size)]
|
||||
|
||||
# count of how many threads are mapped to each socket
|
||||
self.__thread_count = [0 for _ in range(self.__pool_size)]
|
||||
|
||||
# locks to be used around socket-level operations
|
||||
self.__locks = [threading.Lock() for _ in range(self.__pool_size)]
|
||||
|
||||
# sockets that make up the pool
|
||||
self.__sockets = [None for _ in range(self.__pool_size)]
|
||||
|
||||
if _connect:
|
||||
self.__find_master()
|
||||
@ -99,7 +120,7 @@ class Connection(object):
|
||||
self.__find_master()
|
||||
|
||||
@classmethod
|
||||
def paired(cls, left, right=("localhost", 27017)):
|
||||
def paired(cls, left, right=("localhost", 27017), options={}):
|
||||
"""Open a new paired connection to Mongo.
|
||||
|
||||
Raises TypeError if either `left` or `right` is not a tuple of the form
|
||||
@ -108,21 +129,24 @@ class Connection(object):
|
||||
:Parameters:
|
||||
- `left`: (host, port) pair for the left Mongo instance
|
||||
- `right` (optional): (host, port) pair for the right Mongo instance
|
||||
- `options` (optional): dictionary of connection options - see
|
||||
`__init__` documentation for information on what options are
|
||||
available
|
||||
"""
|
||||
left = list(left)
|
||||
left.append(False) # _connect
|
||||
connection = cls(*left)
|
||||
connection = cls(*left, options=options)
|
||||
connection.__pair_with(*right)
|
||||
return connection
|
||||
|
||||
def __increment_id(self):
|
||||
self.__id_lock.acquire(1)
|
||||
self.__id_lock.acquire()
|
||||
result = self.__id
|
||||
self.__id += 1
|
||||
self.__id_lock.release()
|
||||
return result
|
||||
|
||||
def _master(self, sock):
|
||||
def __master(self, sock):
|
||||
"""Get the hostname and port of the master Mongo instance.
|
||||
|
||||
Return a tuple (host, port).
|
||||
@ -162,7 +186,7 @@ class Connection(object):
|
||||
_logger.debug("trying %r:%r" % (host, port))
|
||||
try:
|
||||
sock.connect((host, port))
|
||||
master = self._master(sock)
|
||||
master = self.__master(sock)
|
||||
if master is True:
|
||||
self.__host = host
|
||||
self.__port = port
|
||||
@ -219,32 +243,36 @@ class Connection(object):
|
||||
|
||||
self.__cursor_manager = manager
|
||||
|
||||
def _send_message(self, operation, data, sock=None):
|
||||
"""Say something to Mongo.
|
||||
|
||||
Raises ConnectionFailure if the message cannot be sent. Returns the
|
||||
request id of the sent message.
|
||||
|
||||
:Parameters:
|
||||
- `operation`: opcode of the message
|
||||
- `data`: data to send
|
||||
- `sock`: socket on which to send the message (as returned by
|
||||
`_acquire_socket`, or None
|
||||
def __pick_and_acquire_socket(self):
|
||||
"""Acquire a socket to use for synchronous send and receive operations.
|
||||
"""
|
||||
if sock is None:
|
||||
thread = threading.current_thread()
|
||||
if thread in self.__thread_map:
|
||||
sock = self.__thread_map[thread]
|
||||
else:
|
||||
sock = random.randint(0, self.__pool_size - 1)
|
||||
choices = range(self.__pool_size)
|
||||
choices = sorted(choices, lambda x, y: cmp(self.__thread_count[x],
|
||||
self.__thread_count[y]))
|
||||
|
||||
if isinstance(sock, types.IntType):
|
||||
if self.__sockets[sock] is None:
|
||||
self.__connect(sock)
|
||||
sock = self.__sockets[sock]
|
||||
elif not isinstance(sock, socket.socket):
|
||||
raise TypeError("sock must be a socket id or instance")
|
||||
for choice in choices:
|
||||
if self.__locks[choice].acquire(False):
|
||||
return choice
|
||||
|
||||
self.__locks[choices[0]].acquire()
|
||||
return choices[0]
|
||||
|
||||
def __get_socket(self):
|
||||
thread = threading.current_thread()
|
||||
if self.__thread_map.get(thread, -1) >= 0:
|
||||
sock = self.__thread_map[thread]
|
||||
self.__locks[sock].acquire()
|
||||
else:
|
||||
sock = self.__pick_and_acquire_socket()
|
||||
if self.__auto_start_request or thread in self.__thread_map:
|
||||
self.__thread_map[thread] = sock
|
||||
self.__thread_count[sock] += 1
|
||||
|
||||
if not self.__sockets[sock]:
|
||||
self.__connect(sock)
|
||||
return sock
|
||||
|
||||
def __send_message_on_socket(self, operation, data, sock):
|
||||
# header
|
||||
request_id = self.__increment_id()
|
||||
to_send = struct.pack("<i", 16 + len(data))
|
||||
@ -263,24 +291,22 @@ class Connection(object):
|
||||
|
||||
return request_id
|
||||
|
||||
def _receive_message(self, sock, operation, request_id):
|
||||
"""Receive a message from Mongo.
|
||||
def _send_message(self, operation, data):
|
||||
"""Say something to Mongo.
|
||||
|
||||
Returns the message body. Asserts that the message uses the given opcode
|
||||
and request id. Calls to receive_message and send_message should be done
|
||||
synchronously.
|
||||
Raises ConnectionFailure if the message cannot be sent. Returns the
|
||||
request id of the sent message.
|
||||
|
||||
:Parameters:
|
||||
- `sock`: socket on which to receive the message (as returned by
|
||||
`acquire_socket`.
|
||||
- `operation`: opcode of the message
|
||||
- `request_id`: request id that the message should be in response to
|
||||
- `data`: data to send
|
||||
"""
|
||||
if isinstance(sock, types.IntType):
|
||||
sock = self.__sockets[sock]
|
||||
elif not isinstance(sock, socket.socket):
|
||||
raise TypeError("sock must be a socket id or instance")
|
||||
sock_number = self.__get_socket()
|
||||
sock = self.__sockets[sock_number]
|
||||
self.__send_message_on_socket(operation, data, sock)
|
||||
self.__locks[sock_number].release()
|
||||
|
||||
def __receive_message_on_socket(self, operation, request_id, sock):
|
||||
def receive(length):
|
||||
message = ""
|
||||
while len(message) < length:
|
||||
@ -297,6 +323,44 @@ class Connection(object):
|
||||
|
||||
return receive(length - 16)
|
||||
|
||||
__hack_socket_lock = threading.Lock()
|
||||
def _receive_message(self, operation, data, _sock=None):
|
||||
"""Receive a message from Mongo.
|
||||
|
||||
Sends the given message and returns the response.
|
||||
|
||||
:Parameters:
|
||||
- `operation`: opcode of the message to send
|
||||
- `data`: data to send
|
||||
"""
|
||||
# hack so we can do find_master on a specific socket...
|
||||
if _sock:
|
||||
self.__hack_socket_lock.acquire()
|
||||
try:
|
||||
request_id = self.__send_message_on_socket(operation, data, _sock)
|
||||
result = self.__receive_message_on_socket(1, request_id, _sock)
|
||||
return result
|
||||
finally:
|
||||
self.__hack_socket_lock.release()
|
||||
|
||||
sock_number = self.__get_socket()
|
||||
sock = self.__sockets[sock_number]
|
||||
request_id = self.__send_message_on_socket(operation, data, sock)
|
||||
result = self.__receive_message_on_socket(1, request_id, sock)
|
||||
self.__locks[sock_number].release()
|
||||
return result
|
||||
|
||||
def start_request(self):
|
||||
if not self.__auto_start_request:
|
||||
self.end_request()
|
||||
self.__thread_map[threading.current_thread()] = -1
|
||||
|
||||
def end_request(self):
|
||||
thread = threading.current_thread()
|
||||
if self.__thread_map.get(thread, -1) >= 0:
|
||||
sock_number = self.__thread_map.pop(thread)
|
||||
self.__thread_count[sock_number] -= 1
|
||||
|
||||
def __cmp__(self, other):
|
||||
if isinstance(other, Connection):
|
||||
return cmp((self.__host, self.__port), (other.__host, other.__port))
|
||||
@ -396,21 +460,3 @@ class Connection(object):
|
||||
|
||||
def next(self):
|
||||
raise TypeError("'Connection' object is not iterable")
|
||||
|
||||
def _acquire_socket(self):
|
||||
"""Acquire a socket to use for synchronous send and receive operations.
|
||||
"""
|
||||
thread = threading.current_thread()
|
||||
if thread in self.__thread_map:
|
||||
sock = self.__thread_map[thread]
|
||||
else:
|
||||
sock = random.randint(0, self.__pool_size - 1)
|
||||
self.__thread_map[thread] = sock
|
||||
|
||||
self.__locks[sock].acquire(1)
|
||||
return sock
|
||||
|
||||
def _release_socket(self, socket_number):
|
||||
"""Release a socket that was acquired using `_acquire_socket`.
|
||||
"""
|
||||
self.__locks[socket_number].release()
|
||||
|
||||
@ -25,6 +25,7 @@ from code import Code
|
||||
from errors import InvalidOperation, OperationFailure
|
||||
|
||||
_query_lock = Lock()
|
||||
_ZERO = "\x00\x00\x00\x00"
|
||||
|
||||
class Cursor(object):
|
||||
"""A cursor / iterator over Mongo query results.
|
||||
@ -234,16 +235,7 @@ class Cursor(object):
|
||||
return len(self.__data)
|
||||
|
||||
def send_message(operation, message):
|
||||
if self.__socket is None:
|
||||
socket_number = self.__collection.database().connection()._acquire_socket()
|
||||
request_id = self.__collection._send_message(operation, message, sock=socket_number)
|
||||
response = self.__collection.database().connection()._receive_message(socket_number, 1, request_id)
|
||||
self.__collection.database().connection()._release_socket(socket_number)
|
||||
else:
|
||||
_query_lock.acquire(1)
|
||||
request_id = self.__collection._send_message(operation, message, sock=self.__socket)
|
||||
response = self.__collection.database().connection()._receive_message(self.__socket, 1, request_id)
|
||||
_query_lock.release()
|
||||
response = self.__collection.database().connection()._receive_message(operation, message, _sock=self.__socket)
|
||||
|
||||
response_flag = struct.unpack("<i", response[:4])[0]
|
||||
if response_flag == 1:
|
||||
@ -262,9 +254,11 @@ class Cursor(object):
|
||||
self.__data = bson._to_dicts(response[20:])
|
||||
assert len(self.__data) == number_returned
|
||||
|
||||
message = _ZERO
|
||||
message += bson._make_c_string(self.__collection.full_name())
|
||||
if self.__id is None:
|
||||
# Query
|
||||
message = struct.pack("<i", self.__skip)
|
||||
message += struct.pack("<i", self.__skip)
|
||||
message += struct.pack("<i", -self.__limit)
|
||||
message += bson.BSON.from_dict(self.__query_spec())
|
||||
if self.__fields:
|
||||
@ -283,7 +277,7 @@ class Cursor(object):
|
||||
self.__killed = True
|
||||
return 0
|
||||
|
||||
message = struct.pack("<i", -limit)
|
||||
message += struct.pack("<i", -limit)
|
||||
message += struct.pack("<q", self.__id)
|
||||
|
||||
send_message(2005, message)
|
||||
|
||||
@ -25,16 +25,21 @@ class TestPooling(unittest.TestCase):
|
||||
self.host = os.environ.get("DB_IP", "localhost")
|
||||
self.port = int(os.environ.get("DB_PORT", 27017))
|
||||
default_connection = Connection(self.host, self.port)
|
||||
pooled_connection = Connection(self.host, self.port, 10)
|
||||
pooled_connection = Connection(self.host, self.port, {"pool_size": 10})
|
||||
self.default_db = default_connection["pymongo_test"]
|
||||
self.pooled_db = pooled_connection["pymongo_test"]
|
||||
|
||||
def test_exceptions(self):
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port, None)
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port, "one")
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port, [])
|
||||
self.assertRaises(ValueError, Connection, self.host, self.port, -10)
|
||||
self.assertRaises(ValueError, Connection, self.host, self.port, 0)
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port,
|
||||
{"pool_size": None})
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port,
|
||||
{"pool_size": "one"})
|
||||
self.assertRaises(TypeError, Connection, self.host, self.port,
|
||||
{"pool_size": []})
|
||||
self.assertRaises(ValueError, Connection, self.host, self.port,
|
||||
{"pool_size": -10})
|
||||
self.assertRaises(ValueError, Connection, self.host, self.port,
|
||||
{"pool_size": 0})
|
||||
|
||||
# TODO more tests for this!
|
||||
# test auth support
|
||||
|
||||
Loading…
Reference in New Issue
Block a user