From 35a34461b8a5b44fe4671ce5ca166cb16f2e45cd Mon Sep 17 00:00:00 2001 From: Mike Dirolf Date: Wed, 11 Feb 2009 17:50:27 -0500 Subject: [PATCH] largely untested attempt at pooling as described on wiki. no auth support yet though --- pymongo/collection.py | 4 +- pymongo/connection.py | 188 ++++++++++++++++++++++++++---------------- pymongo/cursor.py | 18 ++-- test/test_pooling.py | 17 ++-- 4 files changed, 136 insertions(+), 91 deletions(-) diff --git a/pymongo/collection.py b/pymongo/collection.py index 8aceb8da8..50b25ef6f 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -104,14 +104,14 @@ class Collection(object): """ return self.__collection_name - def _send_message(self, operation, data, sock=None): + def _send_message(self, operation, data): """Wrap up a message and send it. """ # reserved int, full collection name, message data message = _ZERO message += bson._make_c_string(self.full_name()) message += data - return self.__database.connection()._send_message(operation, message, sock=sock) + return self.__database.connection()._send_message(operation, message) def database(self): """Get the database that this collection is a part of. diff --git a/pymongo/connection.py b/pymongo/connection.py index 24338c3d5..b3f81b0b7 100644 --- a/pymongo/connection.py +++ b/pymongo/connection.py @@ -35,45 +35,66 @@ _TIMEOUT = 20.0 class Connection(object): """A connection to Mongo. """ - def __init__(self, host="localhost", port=27017, pool_size=1, _connect=True): + def __init__(self, host="localhost", port=27017, options={}, _connect=True): """Open a new connection to a Mongo instance at host:port. - The resultant connection object has connection-pooling built in. The - maximum size of the pool is given by `pool_size`. + The resultant connection object has connection-pooling built in. Raises TypeError if host is not an instance of string or port is not an instance of int. Raises ConnectionFailure if the connection cannot be - made. Raises TypeError if `pool_size` is not an instance of int. Raises - ValueError if `pool_size` is less than or equal to zero. + made. + + The keyword argument `options` is a dictionary used for specifying + additional connection level options. Valid options are: + + - pool_size (default=1): maximum size of the connection's built in + connection pool - must be greater than or equal to one. + - auto_start_request (default=True): automatically start a request + on every operation - see documentation for `start_request`. :Parameters: - `host` (optional): hostname or IPv4 address of the instance to connect to - `port` (optional): port number on which to connect - - `pool_size` (optional): maximum size of built in connection pool + - `options` (optional): dictionary of connection options """ if not isinstance(host, types.StringType): raise TypeError("host must be an instance of str") if not isinstance(port, types.IntType): raise TypeError("port must be an instance of int") - if not isinstance(pool_size, types.IntType): - raise TypeError("pool_size must be an instance of int") - if pool_size <= 0: - raise ValueError("pool_size must be positive") + if not isinstance(options, types.DictType): + raise TypeError("options must be an instance of dict") # host and port for the master self.__host = None self.__port = None self.__nodes = [(host, port)] + + # current request_id self.__id = 1 self.__id_lock = threading.Lock() + self.__cursor_manager = CursorManager(self) + self.__pool_size = options.get("pool_size", 1) + if not isinstance(self.__pool_size, types.IntType): + raise TypeError("pool_size must be an instance of int") + if self.__pool_size <= 0: + raise ValueError("pool_size must be positive") + self.__auto_start_request = options.get("auto_start_request", True) + + # map from threads to sockets self.__thread_map = {} - self.__pool_size = pool_size - self.__locks = [threading.Lock() for _ in range(pool_size)] - self.__sockets = [None for _ in range(pool_size)] + + # count of how many threads are mapped to each socket + self.__thread_count = [0 for _ in range(self.__pool_size)] + + # locks to be used around socket-level operations + self.__locks = [threading.Lock() for _ in range(self.__pool_size)] + + # sockets that make up the pool + self.__sockets = [None for _ in range(self.__pool_size)] if _connect: self.__find_master() @@ -99,7 +120,7 @@ class Connection(object): self.__find_master() @classmethod - def paired(cls, left, right=("localhost", 27017)): + def paired(cls, left, right=("localhost", 27017), options={}): """Open a new paired connection to Mongo. Raises TypeError if either `left` or `right` is not a tuple of the form @@ -108,21 +129,24 @@ class Connection(object): :Parameters: - `left`: (host, port) pair for the left Mongo instance - `right` (optional): (host, port) pair for the right Mongo instance + - `options` (optional): dictionary of connection options - see + `__init__` documentation for information on what options are + available """ left = list(left) left.append(False) # _connect - connection = cls(*left) + connection = cls(*left, options=options) connection.__pair_with(*right) return connection def __increment_id(self): - self.__id_lock.acquire(1) + self.__id_lock.acquire() result = self.__id self.__id += 1 self.__id_lock.release() return result - def _master(self, sock): + def __master(self, sock): """Get the hostname and port of the master Mongo instance. Return a tuple (host, port). @@ -162,7 +186,7 @@ class Connection(object): _logger.debug("trying %r:%r" % (host, port)) try: sock.connect((host, port)) - master = self._master(sock) + master = self.__master(sock) if master is True: self.__host = host self.__port = port @@ -219,32 +243,36 @@ class Connection(object): self.__cursor_manager = manager - def _send_message(self, operation, data, sock=None): - """Say something to Mongo. - - Raises ConnectionFailure if the message cannot be sent. Returns the - request id of the sent message. - - :Parameters: - - `operation`: opcode of the message - - `data`: data to send - - `sock`: socket on which to send the message (as returned by - `_acquire_socket`, or None + def __pick_and_acquire_socket(self): + """Acquire a socket to use for synchronous send and receive operations. """ - if sock is None: - thread = threading.current_thread() - if thread in self.__thread_map: - sock = self.__thread_map[thread] - else: - sock = random.randint(0, self.__pool_size - 1) + choices = range(self.__pool_size) + choices = sorted(choices, lambda x, y: cmp(self.__thread_count[x], + self.__thread_count[y])) - if isinstance(sock, types.IntType): - if self.__sockets[sock] is None: - self.__connect(sock) - sock = self.__sockets[sock] - elif not isinstance(sock, socket.socket): - raise TypeError("sock must be a socket id or instance") + for choice in choices: + if self.__locks[choice].acquire(False): + return choice + self.__locks[choices[0]].acquire() + return choices[0] + + def __get_socket(self): + thread = threading.current_thread() + if self.__thread_map.get(thread, -1) >= 0: + sock = self.__thread_map[thread] + self.__locks[sock].acquire() + else: + sock = self.__pick_and_acquire_socket() + if self.__auto_start_request or thread in self.__thread_map: + self.__thread_map[thread] = sock + self.__thread_count[sock] += 1 + + if not self.__sockets[sock]: + self.__connect(sock) + return sock + + def __send_message_on_socket(self, operation, data, sock): # header request_id = self.__increment_id() to_send = struct.pack("= 0: + sock_number = self.__thread_map.pop(thread) + self.__thread_count[sock_number] -= 1 + def __cmp__(self, other): if isinstance(other, Connection): return cmp((self.__host, self.__port), (other.__host, other.__port)) @@ -396,21 +460,3 @@ class Connection(object): def next(self): raise TypeError("'Connection' object is not iterable") - - def _acquire_socket(self): - """Acquire a socket to use for synchronous send and receive operations. - """ - thread = threading.current_thread() - if thread in self.__thread_map: - sock = self.__thread_map[thread] - else: - sock = random.randint(0, self.__pool_size - 1) - self.__thread_map[thread] = sock - - self.__locks[sock].acquire(1) - return sock - - def _release_socket(self, socket_number): - """Release a socket that was acquired using `_acquire_socket`. - """ - self.__locks[socket_number].release() diff --git a/pymongo/cursor.py b/pymongo/cursor.py index a738c1386..3188d71b6 100644 --- a/pymongo/cursor.py +++ b/pymongo/cursor.py @@ -25,6 +25,7 @@ from code import Code from errors import InvalidOperation, OperationFailure _query_lock = Lock() +_ZERO = "\x00\x00\x00\x00" class Cursor(object): """A cursor / iterator over Mongo query results. @@ -234,16 +235,7 @@ class Cursor(object): return len(self.__data) def send_message(operation, message): - if self.__socket is None: - socket_number = self.__collection.database().connection()._acquire_socket() - request_id = self.__collection._send_message(operation, message, sock=socket_number) - response = self.__collection.database().connection()._receive_message(socket_number, 1, request_id) - self.__collection.database().connection()._release_socket(socket_number) - else: - _query_lock.acquire(1) - request_id = self.__collection._send_message(operation, message, sock=self.__socket) - response = self.__collection.database().connection()._receive_message(self.__socket, 1, request_id) - _query_lock.release() + response = self.__collection.database().connection()._receive_message(operation, message, _sock=self.__socket) response_flag = struct.unpack("