authenticate() and logout()

This commit is contained in:
Mike Dirolf 2009-01-23 14:43:48 -05:00
parent 4180922a45
commit 94d51a6e39
2 changed files with 82 additions and 0 deletions

View File

@ -15,7 +15,9 @@
"""Database level operations."""
import types
import md5
from son import SON
from son_manipulator import ObjectIdInjector
from collection import Collection
from errors import InvalidName, CollectionInvalid, OperationFailure
@ -274,3 +276,55 @@ class Database(object):
def next(self):
raise TypeError("'Database' object is not iterable")
# TODO this should probably be private, but I'm using it for some tests right now...
def _password_digest(self, password):
"""Get a password digest to use for authentication, given a password
"""
if not isinstance(password, types.StringTypes):
raise TypeError("password must be an instance of (str, unicode)")
return unicode(md5.new("mongo" + password).hexdigest())
def authenticate(self, name, password):
"""Authenticate to use this database.
Once authenticated, the user has full read and write access to this
database. Raises TypeError if either name or password is not an instance
of (str, unicode). Authentication lasts for the life of the database
connection, or until `Database.logout` is called.
The "admin" database is special. Authenticating on "admin" gives access
to *all* databases. Effectively, "admin" access means root access to the
database.
Arguments:
- `name`: the name of the user to authenticate
- `password`: the password of the user to authenticate
"""
if not isinstance(name, types.StringTypes):
raise TypeError("name must be an instance of (str, unicode)")
if not isinstance(password, types.StringTypes):
raise TypeError("password must be an instance of (str, unicode)")
nonce = self._command({"getnonce": 1})
if nonce["ok"] != 1:
raise OperationFailure("failed to get nonce: %s" % result["errmsg"])
nonce = nonce["nonce"]
result = self._command(SON([("authenticate", 1),
("user", unicode(name)),
("nonce", nonce),
("key", unicode(md5.new("%s%s%s" % (nonce,
unicode(name),
self._password_digest(password)
)).hexdigest()))]))
return result["ok"] == 1
def logout(self):
"""Deauthorize use of this database for this connection.
Note that other databases may still be authorized.
"""
result = self._command({"logout": 1})
if result["ok"] != 1:
raise OperationFailure("logout failed: %s" % result["errmsg"])

View File

@ -190,6 +190,34 @@ class TestDatabase(unittest.TestCase):
self.assertEqual(None, db.error())
self.assertEqual(None, db.previous_error())
def test_password_digest(self):
db = self.connection.test
self.assertRaises(TypeError, db._password_digest, 5)
self.assertRaises(TypeError, db._password_digest, True)
self.assertRaises(TypeError, db._password_digest, None)
self.assertTrue(isinstance(db._password_digest("password"), types.UnicodeType))
self.assertEqual(db._password_digest("password"), u"3b483f665f97e2fdf97575e72321fc5b")
self.assertEqual(db._password_digest("password"), db._password_digest(u"password"))
def test_authenticate(self):
db = self.connection.test
db.system.users.remove({})
db.system.users.insert({"user": u"mike", "pwd": db._password_digest("password")})
self.assertRaises(TypeError, db.authenticate, 5, "password")
self.assertRaises(TypeError, db.authenticate, "mike", 5)
self.assertFalse(db.authenticate("mike", "not a real password"))
self.assertFalse(db.authenticate("faker", "password"))
self.assertTrue(db.authenticate("mike", "password"))
self.assertTrue(db.authenticate(u"mike", u"password"))
# just make sure there are no exceptions here
db.logout()
db.logout()
# TODO some of these tests belong in the collection level testing.
def test_save_find_one(self):
db = Database(self.connection, "test")