diff --git a/pymongo/database.py b/pymongo/database.py index e81193093..3aa726367 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -15,7 +15,9 @@ """Database level operations.""" import types +import md5 +from son import SON from son_manipulator import ObjectIdInjector from collection import Collection from errors import InvalidName, CollectionInvalid, OperationFailure @@ -274,3 +276,55 @@ class Database(object): def next(self): raise TypeError("'Database' object is not iterable") + + # TODO this should probably be private, but I'm using it for some tests right now... + def _password_digest(self, password): + """Get a password digest to use for authentication, given a password + """ + if not isinstance(password, types.StringTypes): + raise TypeError("password must be an instance of (str, unicode)") + + return unicode(md5.new("mongo" + password).hexdigest()) + + def authenticate(self, name, password): + """Authenticate to use this database. + + Once authenticated, the user has full read and write access to this + database. Raises TypeError if either name or password is not an instance + of (str, unicode). Authentication lasts for the life of the database + connection, or until `Database.logout` is called. + + The "admin" database is special. Authenticating on "admin" gives access + to *all* databases. Effectively, "admin" access means root access to the + database. + + Arguments: + - `name`: the name of the user to authenticate + - `password`: the password of the user to authenticate + """ + if not isinstance(name, types.StringTypes): + raise TypeError("name must be an instance of (str, unicode)") + if not isinstance(password, types.StringTypes): + raise TypeError("password must be an instance of (str, unicode)") + + nonce = self._command({"getnonce": 1}) + if nonce["ok"] != 1: + raise OperationFailure("failed to get nonce: %s" % result["errmsg"]) + nonce = nonce["nonce"] + result = self._command(SON([("authenticate", 1), + ("user", unicode(name)), + ("nonce", nonce), + ("key", unicode(md5.new("%s%s%s" % (nonce, + unicode(name), + self._password_digest(password) + )).hexdigest()))])) + return result["ok"] == 1 + + def logout(self): + """Deauthorize use of this database for this connection. + + Note that other databases may still be authorized. + """ + result = self._command({"logout": 1}) + if result["ok"] != 1: + raise OperationFailure("logout failed: %s" % result["errmsg"]) diff --git a/test/test_database.py b/test/test_database.py index 0bb5374cd..ab1024f39 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -190,6 +190,34 @@ class TestDatabase(unittest.TestCase): self.assertEqual(None, db.error()) self.assertEqual(None, db.previous_error()) + def test_password_digest(self): + db = self.connection.test + + self.assertRaises(TypeError, db._password_digest, 5) + self.assertRaises(TypeError, db._password_digest, True) + self.assertRaises(TypeError, db._password_digest, None) + + self.assertTrue(isinstance(db._password_digest("password"), types.UnicodeType)) + self.assertEqual(db._password_digest("password"), u"3b483f665f97e2fdf97575e72321fc5b") + self.assertEqual(db._password_digest("password"), db._password_digest(u"password")) + + def test_authenticate(self): + db = self.connection.test + db.system.users.remove({}) + db.system.users.insert({"user": u"mike", "pwd": db._password_digest("password")}) + + self.assertRaises(TypeError, db.authenticate, 5, "password") + self.assertRaises(TypeError, db.authenticate, "mike", 5) + + self.assertFalse(db.authenticate("mike", "not a real password")) + self.assertFalse(db.authenticate("faker", "password")) + self.assertTrue(db.authenticate("mike", "password")) + self.assertTrue(db.authenticate(u"mike", u"password")) + + # just make sure there are no exceptions here + db.logout() + db.logout() + # TODO some of these tests belong in the collection level testing. def test_save_find_one(self): db = Database(self.connection, "test")