mongo-python-driver/pymongo/connection.py
2009-01-22 17:13:32 -05:00

231 lines
7.6 KiB
Python

# Copyright 2009 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Low level connection to Mongo."""
import socket
import struct
import types
import traceback
from errors import ConnectionFailure, InvalidName, OperationFailure
from database import Database
from cursor_manager import CursorManager
class Connection(object):
"""A connection to Mongo.
"""
def __init__(self, host="localhost", port=27017):
"""Open a new connection to the database at host:port.
Raises TypeError if host is not an instance of string or port is not an
instance of int. Raises ConnectionFailure if the connection cannot be
made.
Arguments:
- `host` (optional): the hostname or IPv4 address of the database to
connect to
- `port` (optional): the port number on which to connect
"""
if not isinstance(host, types.StringType):
raise TypeError("host must be an instance of str")
if not isinstance(port, types.IntType):
raise TypeError("port must be an instance of int")
self.__host = host
self.__port = port
self.__id = 1
self.__cursor_manager = CursorManager(self)
self.__connect()
def set_cursor_manager(self, manager_class):
"""Set this connections cursor manager.
Raises TypeError if manager_class is not a subclass of CursorManager. A
cursor manager handles closing cursors. Different managers can implement
different policies in terms of when to actually kill a cursor that has
been closed.
"""
manager = manager_class(self)
if not isinstance(manager, CursorManager):
raise TypeError("manager_class must be a subclass of CursorManager")
self.__cursor_manager = manager
def host(self):
"""Get the connection host.
"""
return self.__host
def port(self):
"""Get the connection port.
"""
return self.__port
def __connect(self):
"""(Re-)connect to Mongo."""
try:
self.__socket = socket.socket()
self.__socket.connect((self.__host, self.__port))
except socket.error:
raise ConnectionFailure("could not connect to %s:%s, got: %s" %
(self.__host, self.__port,
traceback.format_exc()))
def _send_message(self, operation, data):
"""Say something to Mongo.
Raises ConnectionFailure if the message cannot be sent. Returns the
request id of the sent message.
Arguments:
- `operation`: the opcode of the message
- `data`: the data to send
"""
# header
to_send = struct.pack("<i", 16 + len(data))
to_send += struct.pack("<i", self.__id)
self.__id += 1
to_send += struct.pack("<i", 0) # responseTo
to_send += struct.pack("<i", operation)
to_send += data
total_sent = 0
while total_sent < len(to_send):
sent = self.__socket.send(to_send[total_sent:])
if sent == 0:
raise ConnectionFailure("connection closed")
total_sent += sent
return self.__id - 1
def _receive_message(self, operation, request_id):
"""Receive a message from Mongo.
Returns the message body. Asserts that the message uses the given opcode
and request id. Calls to receive_message and send_message should be done
synchronously.
Arguments:
- `operation`: the opcode of the message
- `request_id`: the request id that the message should be in response to
"""
def receive(length):
message = ""
while len(message) < length:
chunk = self.__socket.recv(length - len(message))
if chunk == "":
raise ConnectionFailure("connection closed")
message += chunk
return message
header = receive(16)
length = struct.unpack("<i", header[:4])[0]
assert request_id == struct.unpack("<i", header[8:12])[0]
assert operation == struct.unpack("<i", header[12:])[0]
return receive(length - 16)
def __cmp__(self, other):
if isinstance(other, Connection):
return cmp((self.__host, self.__port), (other.__host, other.__port))
return NotImplemented
def __repr__(self):
return "Connection(%r, %r)" % (self.__host, self.__port)
def __getattr__(self, name):
"""Get a database by name.
Raises InvalidName if an invalid database name is used.
Arguments:
- `name`: the name of the database to get
"""
return Database(self, name)
def __getitem__(self, name):
"""Get a database by name.
Raises InvalidName if an invalid database name is used.
Arguments:
- `name`: the name of the database to get
"""
return self.__getattr__(name)
def close_cursor(self, cursor_id):
"""Close a single database cursor.
Raises TypeError if cursor_id is not an instance of (int, long). What
closing the cursor actually means depends on this connection's cursor
manager.
Arguments:
- `cursor_id`: cursor id to close
"""
if not isinstance(cursor_id, (types.IntType, types.LongType)):
raise TypeError("cursor_id must be an instance of (int, long)")
self.__cursor_manager.close(cursor_id)
def kill_cursors(self, cursor_ids):
"""Kill database cursors with the given ids.
Raises TypeError if cursor_ids is not an instance of list.
Arguments:
- `cursor_ids`: list of cursor ids to kill
"""
if not isinstance(cursor_ids, types.ListType):
raise TypeError("cursor_ids must be a list")
message = "\x00\x00\x00\x00"
message += struct.pack("<i", len(cursor_ids))
for cursor_id in cursor_ids:
message += struct.pack("<q", cursor_id)
self._send_message(2007, message)
def __database_info(self):
"""Get a dictionary of (database_name: size_on_disk).
"""
result = self.admin._command({"listDatabases": 1})
if result["ok"] != 1:
raise OperationFailure("failed to get database names")
info = result["databases"]
return dict([(db["name"], db["sizeOnDisk"]) for db in info])
def database_names(self):
"""Get a list of all database names.
"""
return self.__database_info().keys()
def drop_database(self, name_or_database):
"""Drop a database.
Arguments:
- `name_or_database`: the name of a database to drop or the object
itself
"""
name = name_or_database
if isinstance(name, Database):
name = name.name()
if not isinstance(name, types.StringTypes):
raise TypeError("name_or_database must be an instance of (Database, str, unicode)")
result = self[name]._command({"dropDatabase": 1})
if result["ok"] != 1:
raise OperationFailure("failed to drop database")