mongo-python-driver/test/test_collection.py
2010-08-11 03:44:06 +08:00

1078 lines
39 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2009-2010 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test the collection module."""
import itertools
import re
import sys
import time
import unittest
import warnings
from nose.plugins.skip import SkipTest
sys.path[0:0] = [""]
from pymongo import ASCENDING, DESCENDING
from pymongo.binary import Binary
from pymongo.code import Code
from pymongo.collection import Collection
from pymongo.errors import (DuplicateKeyError,
InvalidDocument,
InvalidName,
InvalidOperation,
OperationFailure,
TimeoutError)
from pymongo.objectid import ObjectId
from pymongo.son import SON
from test.test_connection import get_connection
from test import (qcheck,
version)
class TestCollection(unittest.TestCase):
def setUp(self):
self.connection = get_connection()
self.db = self.connection.pymongo_test
def test_collection(self):
self.assertRaises(TypeError, Collection, self.db, 5)
def make_col(base, name):
return base[name]
self.assertRaises(InvalidName, make_col, self.db, "")
self.assertRaises(InvalidName, make_col, self.db, "te$t")
self.assertRaises(InvalidName, make_col, self.db, ".test")
self.assertRaises(InvalidName, make_col, self.db, "test.")
self.assertRaises(InvalidName, make_col, self.db, "tes..t")
self.assertRaises(InvalidName, make_col, self.db.test, "")
self.assertRaises(InvalidName, make_col, self.db.test, "te$t")
self.assertRaises(InvalidName, make_col, self.db.test, ".test")
self.assertRaises(InvalidName, make_col, self.db.test, "test.")
self.assertRaises(InvalidName, make_col, self.db.test, "tes..t")
self.assertRaises(InvalidName, make_col, self.db.test, "tes\x00t")
self.assert_(isinstance(self.db.test, Collection))
self.assertEqual(self.db.test, self.db["test"])
self.assertEqual(self.db.test, Collection(self.db, "test"))
self.assertEqual(self.db.test.mike, self.db["test.mike"])
self.assertEqual(self.db.test["mike"], self.db["test.mike"])
def test_create_index(self):
db = self.db
self.assertRaises(TypeError, db.test.create_index, 5)
self.assertRaises(TypeError, db.test.create_index, {"hello": 1})
self.assertRaises(ValueError, db.test.create_index, [])
db.test.drop_indexes()
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 1)
db.test.create_index("hello")
db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)])
count = 0
for _ in db.system.indexes.find({"ns": u"pymongo_test.test"}):
count += 1
self.assertEqual(count, 3)
db.test.drop_indexes()
ix = db.test.create_index([("hello", DESCENDING),
("world", ASCENDING)], name="hello_world")
self.assertEquals(ix, "hello_world")
db.test.drop_indexes()
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 1)
db.test.create_index("hello")
self.assert_(SON([(u"name", u"hello_1"),
(u"ns", u"pymongo_test.test"),
(u"key", SON([(u"hello", 1)]))]) in
list(db.system.indexes
.find({"ns": u"pymongo_test.test"})))
db.test.drop_indexes()
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 1)
db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)])
self.assert_(SON([(u"name", u"hello_-1_world_1"),
(u"ns", u"pymongo_test.test"),
(u"key", SON([(u"hello", -1),
(u"world", 1)]))]) in
list(db.system.indexes
.find({"ns": u"pymongo_test.test"})))
def test_ensure_index(self):
db = self.db
self.assertRaises(TypeError, db.test.ensure_index, {"hello": 1})
db.test.drop_indexes()
self.assertEqual("hello_1", db.test.create_index("hello"))
self.assertEqual("hello_1", db.test.create_index("hello"))
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db.test.drop_indexes()
self.assertEqual("foo",
db.test.ensure_index("goodbye", name="foo"))
self.assertEqual(None, db.test.ensure_index("goodbye", name="foo"))
db.test.drop_indexes()
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db.test.drop_index("goodbye_1")
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db.drop_collection("test")
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db_name = self.db.name
self.connection.drop_database(self.db.name)
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db.test.drop_index("goodbye_1")
self.assertEqual("goodbye_1",
db.test.create_index("goodbye"))
self.assertEqual(None, db.test.ensure_index("goodbye"))
db.test.drop_index("goodbye_1")
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye", ttl=1))
time.sleep(1.1)
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
db.test.drop_index("goodbye_1")
self.assertEqual("goodbye_1",
db.test.create_index("goodbye", ttl=1))
time.sleep(1.1)
self.assertEqual("goodbye_1",
db.test.ensure_index("goodbye"))
def test_index_on_binary(self):
db = self.db
db.drop_collection("test")
db.test.save({"bin": Binary("def")})
db.test.save({"bin": Binary("abc")})
db.test.save({"bin": Binary("ghi")})
self.assertEqual(db.test.find({"bin": Binary("abc")})
.explain()["nscanned"], 3)
db.test.create_index("bin")
self.assertEqual(db.test.find({"bin": Binary("abc")})
.explain()["nscanned"], 1)
def test_drop_index(self):
db = self.db
db.test.drop_indexes()
db.test.create_index("hello")
name = db.test.create_index("goodbye")
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 3)
self.assertEqual(name, "goodbye_1")
db.test.drop_index(name)
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 2)
self.assert_(SON([(u"name", u"hello_1"),
(u"ns", u"pymongo_test.test"),
(u"key", SON([(u"hello", 1)]))]) in
list(db.system.indexes
.find({"ns": u"pymongo_test.test"})))
db.test.drop_indexes()
db.test.create_index("hello")
name = db.test.create_index("goodbye")
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 3)
self.assertEqual(name, "goodbye_1")
db.test.drop_index([("goodbye", ASCENDING)])
self.assertEqual(db.system.indexes.find({"ns": u"pymongo_test.test"})
.count(), 2)
self.assert_(SON([(u"name", u"hello_1"),
(u"ns", u"pymongo_test.test"),
(u"key", SON([(u"hello", 1)]))]) in
list(db.system.indexes
.find({"ns": u"pymongo_test.test"})))
def test_index_info(self):
db = self.db
db.test.drop_indexes()
db.test.remove({})
db.test.save({}) # create collection
self.assertEqual(len(db.test.index_information()), 1)
self.assert_("_id_" in db.test.index_information())
db.test.create_index("hello")
self.assertEqual(len(db.test.index_information()), 2)
self.assertEqual(db.test.index_information()["hello_1"],
{"key": [("hello", ASCENDING)]})
db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)],
unique=True)
self.assertEqual(db.test.index_information()["hello_1"],
{"key": [("hello", ASCENDING)]})
self.assertEqual(len(db.test.index_information()), 3)
self.assertEqual({"key": [("hello", DESCENDING), ("world", ASCENDING)],
"unique": True},
db.test.index_information()["hello_-1_world_1"])
def test_field_selection(self):
db = self.db
db.drop_collection("test")
doc = {"a": 1, "b": 5, "c": {"d": 5, "e": 10}}
db.test.insert(doc)
self.assertEqual(db.test.find({}, ["_id"]).next().keys(), ["_id"])
l = db.test.find({}, ["a"]).next().keys()
l.sort()
self.assertEqual(l, ["_id", "a"])
l = db.test.find({}, ["b"]).next().keys()
l.sort()
self.assertEqual(l, ["_id", "b"])
l = db.test.find({}, ["c"]).next().keys()
l.sort()
self.assertEqual(l, ["_id", "c"])
self.assertEqual(db.test.find({}, ["a"]).next()["a"], 1)
self.assertEqual(db.test.find({}, ["b"]).next()["b"], 5)
self.assertEqual(db.test.find({}, ["c"]).next()["c"],
{"d": 5, "e": 10})
self.assertEqual(db.test.find({}, ["c.d"]).next()["c"], {"d": 5})
self.assertEqual(db.test.find({}, ["c.e"]).next()["c"], {"e": 10})
self.assertEqual(db.test.find({}, ["b", "c.e"]).next()["c"],
{"e": 10})
l = db.test.find({}, ["b", "c.e"]).next().keys()
l.sort()
self.assertEqual(l, ["_id", "b", "c"])
self.assertEqual(db.test.find({}, ["b", "c.e"]).next()["b"], 5)
def test_options(self):
db = self.db
db.drop_collection("test")
db.test.save({})
self.assertEqual(db.test.options(), {})
self.assertEqual(db.test.doesnotexist.options(), {})
db.drop_collection("test")
db.create_collection("test", capped=True)
self.assertEqual(db.test.options(), {"capped": True})
db.drop_collection("test")
def test_insert_find_one(self):
db = self.db
db.test.remove({})
self.assertEqual(db.test.find().count(), 0)
doc = {"hello": u"world"}
id = db.test.insert(doc)
self.assertEqual(db.test.find().count(), 1)
self.assertEqual(doc, db.test.find_one())
self.assertEqual(doc["_id"], id)
self.assert_(isinstance(id, ObjectId))
def remove_insert_find_one(dict):
db.test.remove({})
db.test.insert(dict)
return db.test.find_one() == dict
qcheck.check_unittest(self, remove_insert_find_one,
qcheck.gen_mongo_dict(3))
def test_remove_all(self):
self.db.test.remove()
self.assertEqual(0, self.db.test.count())
self.db.test.insert({"x": 1})
self.db.test.insert({"y": 1})
self.assertEqual(2, self.db.test.count())
self.db.test.remove()
self.assertEqual(0, self.db.test.count())
def test_find_w_fields(self):
db = self.db
db.test.remove({})
db.test.insert({"x": 1, "mike": "awesome",
"extra thing": "abcdefghijklmnopqrstuvwxyz"})
self.assertEqual(1, db.test.count())
self.assert_("x" in db.test.find({}).next())
self.assert_("mike" in db.test.find({}).next())
self.assert_("extra thing" in db.test.find({}).next())
self.assert_("x" in db.test.find({}, ["x", "mike"]).next())
self.assert_("mike" in db.test.find({}, ["x", "mike"]).next())
self.assertFalse("extra thing" in db.test.find({}, ["x", "mike"]).next())
self.assertFalse("x" in db.test.find({}, ["mike"]).next())
self.assert_("mike" in db.test.find({}, ["mike"]).next())
self.assertFalse("extra thing" in db.test.find({}, ["mike"]).next())
def test_fields_specifier_as_dict(self):
db = self.db
db.test.remove({})
db.test.insert({"x": [1, 2, 3], "mike": "awesome"})
self.assertEqual([1,2,3], db.test.find_one()["x"])
if version.at_least(db.connection, (1, 5, 1)):
self.assertEqual([2,3],
db.test.find_one(fields={"x": {"$slice":
-2}})["x"])
self.assert_("x" not in db.test.find_one(fields={"x": 0}))
self.assert_("mike" in db.test.find_one(fields={"x": 0}))
def test_find_w_regex(self):
db = self.db
db.test.remove({})
db.test.insert({"x": "hello_world"})
db.test.insert({"x": "hello_mike"})
db.test.insert({"x": "hello_mikey"})
db.test.insert({"x": "hello_test"})
self.assertEqual(db.test.find().count(), 4)
self.assertEqual(db.test.find({"x":
re.compile("^hello.*")}).count(), 4)
self.assertEqual(db.test.find({"x":
re.compile("ello")}).count(), 4)
self.assertEqual(db.test.find({"x":
re.compile("^hello$")}).count(), 0)
self.assertEqual(db.test.find({"x":
re.compile("^hello_mi.*$")}).count(), 2)
def test_id_can_be_anything(self):
db = self.db
db.test.remove({})
auto_id = {"hello": "world"}
db.test.insert(auto_id)
self.assert_(isinstance(auto_id["_id"], ObjectId))
numeric = {"_id": 240, "hello": "world"}
db.test.insert(numeric)
self.assertEqual(numeric["_id"], 240)
object = {"_id": numeric, "hello": "world"}
db.test.insert(object)
self.assertEqual(object["_id"], numeric)
for x in db.test.find():
self.assertEqual(x["hello"], u"world")
self.assert_("_id" in x)
def test_iteration(self):
db = self.db
def iterate():
[a for a in db.test]
self.assertRaises(TypeError, iterate)
def test_invalid_key_names(self):
db = self.db
db.test.remove({})
db.test.insert({"hello": "world"})
db.test.insert({"hello": {"hello": "world"}})
self.assertRaises(InvalidName, db.test.insert, {"$hello": "world"})
self.assertRaises(InvalidName, db.test.insert,
{"hello": {"$hello": "world"}})
db.test.insert({"he$llo": "world"})
db.test.insert({"hello": {"hello$": "world"}})
self.assertRaises(InvalidName, db.test.insert,
{".hello": "world"})
self.assertRaises(InvalidName, db.test.insert,
{"hello": {".hello": "world"}})
self.assertRaises(InvalidName, db.test.insert,
{"hello.": "world"})
self.assertRaises(InvalidName, db.test.insert,
{"hello": {"hello.": "world"}})
self.assertRaises(InvalidName, db.test.insert,
{"hel.lo": "world"})
self.assertRaises(InvalidName, db.test.insert,
{"hello": {"hel.lo": "world"}})
db.test.update({"hello": "world"}, {"$inc": "hello"})
def test_insert_multiple(self):
db = self.db
db.drop_collection("test")
doc1 = {"hello": u"world"}
doc2 = {"hello": u"mike"}
self.assertEqual(db.test.find().count(), 0)
ids = db.test.insert([doc1, doc2])
self.assertEqual(db.test.find().count(), 2)
self.assertEqual(doc1, db.test.find_one({"hello": u"world"}))
self.assertEqual(doc2, db.test.find_one({"hello": u"mike"}))
self.assertEqual(2, len(ids))
self.assertEqual(doc1["_id"], ids[0])
self.assertEqual(doc2["_id"], ids[1])
id = db.test.insert([{"hello": 1}])
self.assert_(isinstance(id, list))
self.assertEqual(1, len(id))
self.assertRaises(InvalidOperation, db.test.insert, [])
def test_insert_iterables(self):
db = self.db
self.assertRaises(TypeError, db.test.insert, 4)
self.assertRaises(TypeError, db.test.insert, None)
self.assertRaises(TypeError, db.test.insert, True)
db.drop_collection("test")
self.assertEqual(db.test.find().count(), 0)
ids = db.test.insert(({"hello": u"world"}, {"hello": u"world"}))
self.assertEqual(db.test.find().count(), 2)
db.drop_collection("test")
self.assertEqual(db.test.find().count(), 0)
ids = db.test.insert(itertools.imap(lambda x: {"hello": "world"}, itertools.repeat(None, 10)))
self.assertEqual(db.test.find().count(), 10)
def test_save(self):
self.db.drop_collection("test")
id = self.db.test.save({"hello": "world"})
self.assertEqual(self.db.test.find_one()["_id"], id)
self.assert_(isinstance(id, ObjectId))
def test_unique_index(self):
db = self.db
db.drop_collection("test")
db.test.create_index("hello")
db.test.save({"hello": "world"})
db.test.save({"hello": "mike"})
db.test.save({"hello": "world"})
self.assertFalse(db.error())
db.drop_collection("test")
db.test.create_index("hello", unique=True)
db.test.save({"hello": "world"})
db.test.save({"hello": "mike"})
db.test.save({"hello": "world"})
self.assert_(db.error())
def test_duplicate_key_error(self):
db = self.db
db.drop_collection("test")
db.test.create_index("x", unique=True)
db.test.insert({"_id": 1, "x": 1}, safe=True)
db.test.insert({"_id": 2, "x": 2}, safe=True)
expected_error = OperationFailure
if version.at_least(db.connection, (1, 3)):
expected_error = DuplicateKeyError
self.assertRaises(expected_error,
db.test.insert, {"_id": 1}, safe=True)
self.assertRaises(expected_error,
db.test.insert, {"x": 1}, safe=True)
self.assertRaises(expected_error,
db.test.save, {"x": 2}, safe=True)
self.assertRaises(expected_error,
db.test.update, {"x": 1}, {"$inc": {"x": 1}}, safe=True)
def test_error_code(self):
try:
self.db.test.update({}, {"$thismodifierdoesntexist": 1}, safe=True)
self.fail()
except OperationFailure, e:
if version.at_least(self.db.connection, (1, 3)):
self.assertEqual(10147, e.code)
def test_index_on_subfield(self):
db = self.db
db.drop_collection("test")
db.test.insert({"hello": {"a": 4, "b": 5}})
db.test.insert({"hello": {"a": 7, "b": 2}})
db.test.insert({"hello": {"a": 4, "b": 10}})
self.assertFalse(db.error())
db.drop_collection("test")
db.test.create_index("hello.a", unique=True)
db.test.insert({"hello": {"a": 4, "b": 5}})
db.test.insert({"hello": {"a": 7, "b": 2}})
db.test.insert({"hello": {"a": 4, "b": 10}})
self.assert_(db.error())
def test_safe_insert(self):
db = self.db
db.drop_collection("test")
a = {"hello": "world"}
db.test.insert(a)
db.test.insert(a)
self.assert_("E11000" in db.error()["err"])
self.assertRaises(OperationFailure, db.test.insert, a, safe=True)
def test_update(self):
db = self.db
db.drop_collection("test")
id1 = db.test.save({"x": 5})
db.test.update({}, {"$inc": {"x": 1}})
self.assertEqual(db.test.find_one(id1)["x"], 6)
id2 = db.test.save({"x": 1})
db.test.update({"x": 6}, {"$inc": {"x": 1}})
self.assertEqual(db.test.find_one(id1)["x"], 7)
self.assertEqual(db.test.find_one(id2)["x"], 1)
def test_multi_update(self):
db = self.db
if not version.at_least(db.connection, (1, 1, 3, -1)):
raise SkipTest()
db.drop_collection("test")
db.test.save({"x": 4, "y": 3})
db.test.save({"x": 5, "y": 5})
db.test.save({"x": 4, "y": 4})
db.test.update({"x": 4}, {"$set": {"y": 5}}, multi=True)
self.assertEqual(3, db.test.count())
for doc in db.test.find():
self.assertEqual(5, doc["y"])
self.assertEqual(2, db.test.update({"x": 4}, {"$set": {"y": 6}},
multi=True, safe=True)["n"])
def test_upsert(self):
db = self.db
db.drop_collection("test")
db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True)
db.test.update({"page": "/"}, {"$inc": {"count": 1}}, upsert=True)
self.assertEqual(1, db.test.count())
self.assertEqual(2, db.test.find_one()["count"])
def test_safe_update(self):
db = self.db
v113minus = version.at_least(db.connection, (1, 1, 3, -1))
db.drop_collection("test")
db.test.create_index("x", unique=True)
db.test.insert({"x": 5})
id = db.test.insert({"x": 4})
self.assertEqual(None, db.test.update({"_id": id}, {"$inc": {"x": 1}}))
if v113minus:
self.assert_(db.error()["err"].startswith("E11001"))
else:
self.assert_(db.error()["err"].startswith("E12011"))
self.assertRaises(OperationFailure, db.test.update,
{"_id": id}, {"$inc": {"x": 1}}, safe=True)
self.assertEqual(1, db.test.update({"_id": id}, {"$inc": {"x": 2}}, safe=True)["n"])
self.assertEqual(0, db.test.update({"_id": "foo"}, {"$inc": {"x": 2}}, safe=True)["n"])
def test_safe_save(self):
db = self.db
db.drop_collection("test")
db.test.create_index("hello", unique=True)
db.test.save({"hello": "world"})
db.test.save({"hello": "world"})
self.assert_("E11000" in db.error()["err"])
self.assertRaises(OperationFailure, db.test.save, {"hello": "world"}, safe=True)
def test_safe_remove(self):
db = self.db
db.drop_collection("test")
db.create_collection("test", capped=True, size=1000)
db.test.insert({"x": 1})
self.assertEqual(1, db.test.count())
self.assertEqual(None, db.test.remove({"x": 1}))
self.assertEqual(1, db.test.count())
if version.at_least(db.connection, (1, 1, 3, -1)):
self.assertRaises(OperationFailure, db.test.remove, {"x": 1}, safe=True)
else: # Just test that it doesn't blow up
db.test.remove({"x": 1}, safe=True)
db.drop_collection("test")
db.test.insert({"x": 1})
db.test.insert({"x": 1})
self.assertEqual(2, db.test.remove({}, safe=True)["n"])
self.assertEqual(0, db.test.remove({}, safe=True)["n"])
def test_last_error_options(self):
if not version.at_least(self.connection, (1, 5, 1)):
raise SkipTest()
self.assertRaises(TimeoutError, self.db.test.save, {"x": 1}, w=2, wtimeout=1)
self.assertRaises(TimeoutError, self.db.test.insert, {"x": 1}, w=2, wtimeout=1)
self.assertRaises(TimeoutError, self.db.test.update, {"x": 1}, {"y": 2}, w=2, wtimeout=1)
self.assertRaises(TimeoutError, self.db.test.remove, {"x": 1}, {"y": 2}, w=2, wtimeout=1)
self.db.test.save({"x": 1}, w=1, wtimeout=1)
self.db.test.insert({"x": 1}, w=1, wtimeout=1)
self.db.test.remove({"x": 1}, w=1, wtimeout=1)
self.db.test.update({"x": 1}, {"y": 2}, w=1, wtimeout=1)
def test_count(self):
db = self.db
db.drop_collection("test")
self.assertEqual(db.test.count(), 0)
db.test.save({})
db.test.save({})
self.assertEqual(db.test.count(), 2)
def test_group(self):
db = self.db
db.drop_collection("test")
def group_checker(args, expected):
eval = db.test.group(*args)
self.assertEqual(eval, expected)
self.assertEqual([], db.test.group([], {}, {"count": 0},
"function (obj, prev) { prev.count++; }"))
db.test.save({"a": 2})
db.test.save({"b": 5})
db.test.save({"a": 1})
self.assertEqual([{"count": 3}],
db.test.group([], {}, {"count": 0},
"function (obj, prev) { prev.count++; }"))
self.assertEqual([{"count": 1}],
db.test.group([], {"a": {"$gt": 1}}, {"count": 0},
"function (obj, prev) { prev.count++; }"))
db.test.save({"a": 2, "b": 3})
self.assertEqual([{"a": 2, "count": 2},
{"a": None, "count": 1},
{"a": 1, "count": 1}],
db.test.group(["a"], {}, {"count": 0},
"function (obj, prev) { prev.count++; }"))
# modifying finalize
self.assertEqual([{"a": 2, "count": 3},
{"a": None, "count": 2},
{"a": 1, "count": 2}],
db.test.group(["a"], {}, {"count": 0},
"function (obj, prev) { prev.count++; }",
"function (obj) { obj.count++; }"))
# returning finalize
self.assertEqual([2, 1, 1],
db.test.group(["a"], {}, {"count": 0},
"function (obj, prev) { prev.count++; }",
"function (obj) { return obj.count; }"))
# keyf
self.assertEqual([2, 2],
db.test.group("function (obj) { if (obj.a == 2) { return {a: true} }; "
"return {b: true}; }", {}, {"count": 0},
"function (obj, prev) { prev.count++; }",
"function (obj) { return obj.count; }"))
# no key
self.assertEqual([{"count": 4}],
db.test.group(None, {}, {"count": 0},
"function (obj, prev) { prev.count++; }"))
warnings.simplefilter("error")
self.assertRaises(DeprecationWarning,
db.test.group, [], {}, {"count": 0},
"function (obj, prev) { prev.count++; }",
command=False)
warnings.simplefilter("default")
self.assertRaises(OperationFailure, db.test.group, [], {}, {}, "5 ++ 5")
def test_group_with_scope(self):
db = self.db
db.drop_collection("test")
db.test.save({"a": 1})
db.test.save({"b": 1})
reduce_function = "function (obj, prev) { prev.count += inc_value; }"
self.assertEqual(2, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 1}))[0]['count'])
self.assertEqual(4, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 2}))[0]['count'])
self.assertEqual(1, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 0.5}))[0]['count'])
if version.at_least(db.connection, (1, 1)):
self.assertEqual(2, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 1}),
command=True)[0]['count'])
self.assertEqual(4, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 2}),
command=True)[0]['count'])
self.assertEqual(1, db.test.group([], {}, {"count": 0},
Code(reduce_function,
{"inc_value": 0.5}),
command=True)[0]['count'])
def test_large_limit(self):
db = self.db
db.drop_collection("test")
for i in range(2000):
db.test.insert({"x": i, "y": "mongomongo" * 1000})
self.assertEqual(2000, db.test.count())
i = 0
y = 0
for doc in db.test.find(limit=1900):
i += 1
y += doc["x"]
self.assertEqual(1900, i)
self.assertEqual(1804050, y)
def test_find_kwargs(self):
db = self.db
db.drop_collection("test")
for i in range(10):
db.test.insert({"x": i})
self.assertEqual(10, db.test.count())
sum = 0
for x in db.test.find({}, skip=4, limit=2):
sum += x["x"]
self.assertEqual(9, sum)
def test_rename(self):
db = self.db
db.drop_collection("test")
db.drop_collection("foo")
self.assertRaises(TypeError, db.test.rename, 5)
self.assertRaises(InvalidName, db.test.rename, "")
self.assertRaises(InvalidName, db.test.rename, "te$t")
self.assertRaises(InvalidName, db.test.rename, ".test")
self.assertRaises(InvalidName, db.test.rename, "test.")
self.assertRaises(InvalidName, db.test.rename, "tes..t")
self.assertEqual(0, db.test.count())
self.assertEqual(0, db.foo.count())
for i in range(10):
db.test.insert({"x": i})
self.assertEqual(10, db.test.count())
db.test.rename("foo")
self.assertEqual(0, db.test.count())
self.assertEqual(10, db.foo.count())
x = 0
for doc in db.foo.find():
self.assertEqual(x, doc["x"])
x += 1
db.test.insert({})
self.assertRaises(OperationFailure, db.foo.rename, "test")
db.foo.rename("test", dropTarget=True)
# doesn't really test functionality, just that the option is set correctly
def test_snapshot(self):
db = self.db
self.assertRaises(TypeError, db.test.find, snapshot=5)
list(db.test.find(snapshot=True))
self.assertRaises(OperationFailure, list, db.test.find(snapshot=True).sort("foo", 1))
def test_find_one(self):
db = self.db
db.drop_collection("test")
id = db.test.save({"hello": "world", "foo": "bar"})
self.assertEqual("world", db.test.find_one()["hello"])
self.assertEqual(db.test.find_one(id), db.test.find_one())
self.assertEqual(db.test.find_one(None), db.test.find_one())
self.assertEqual(db.test.find_one({}), db.test.find_one())
self.assertEqual(db.test.find_one({"hello": "world"}), db.test.find_one())
self.assert_("hello" in db.test.find_one(fields=["hello"]))
self.assert_("hello" not in db.test.find_one(fields=["foo"]))
self.assertEqual(["_id"], db.test.find_one(fields=[]).keys())
self.assertEqual(None, db.test.find_one({"hello": "foo"}))
self.assertEqual(None, db.test.find_one(ObjectId()))
def test_find_one_non_objectid(self):
db = self.db
db.drop_collection("test")
db.test.save({"_id": 5})
self.assert_(db.test.find_one(5))
self.assertFalse(db.test.find_one(6))
def test_remove_non_objectid(self):
db = self.db
db.drop_collection("test")
db.test.save({"_id": 5})
self.assertEqual(1, db.test.count())
db.test.remove(5)
self.assertEqual(0, db.test.count())
def test_find_one_with_find_args(self):
db = self.db
db.drop_collection("test")
db.test.save({"x": 1})
db.test.save({"x": 2})
db.test.save({"x": 3})
self.assertEqual(1, db.test.find_one()["x"])
self.assertEqual(2, db.test.find_one(skip=1, limit=2)["x"])
def test_find_with_sort(self):
db = self.db
db.drop_collection("test")
db.test.save({"x": 2})
db.test.save({"x": 1})
db.test.save({"x": 3})
self.assertEqual(2, db.test.find_one()["x"])
self.assertEqual(1, db.test.find_one(sort=[("x", 1)])["x"])
self.assertEqual(3, db.test.find_one(sort=[("x", -1)])["x"])
def to_list(foo):
return [bar["x"] for bar in foo]
self.assertEqual([2,1,3], to_list(db.test.find()))
self.assertEqual([1,2,3], to_list(db.test.find(sort=[("x", 1)])))
self.assertEqual([3,2,1], to_list(db.test.find(sort=[("x", -1)])))
self.assertRaises(TypeError, db.test.find, sort=5)
self.assertRaises(TypeError, db.test.find, sort="hello")
self.assertRaises(ValueError, db.test.find, sort=["hello", 1])
def test_insert_adds_id(self):
doc = {"hello": "world"}
self.db.test.insert(doc)
self.assert_("_id" in doc)
docs = [{"hello": "world"}, {"hello": "world"}]
self.db.test.insert(docs)
for doc in docs:
self.assert_("_id" in doc)
def test_save_adds_id(self):
doc = {"hello": "world"}
self.db.test.save(doc)
self.assert_("_id" in doc)
# TODO doesn't actually test functionality, just that it doesn't blow up
def test_cursor_timeout(self):
list(self.db.test.find(timeout=False))
list(self.db.test.find(timeout=True))
def test_distinct(self):
if not version.at_least(self.db.connection, (1, 1)):
raise SkipTest()
self.db.drop_collection("test")
self.db.test.save({"a": 1})
self.db.test.save({"a": 2})
self.db.test.save({"a": 2})
self.db.test.save({"a": 2})
self.db.test.save({"a": 3})
distinct = self.db.test.distinct("a")
distinct.sort()
self.assertEqual([1, 2, 3], distinct)
self.db.drop_collection("test")
self.db.test.save({"a": {"b": "a"}, "c": 12})
self.db.test.save({"a": {"b": "b"}, "c": 12})
self.db.test.save({"a": {"b": "c"}, "c": 12})
self.db.test.save({"a": {"b": "c"}, "c": 12})
distinct = self.db.test.distinct("a.b")
distinct.sort()
self.assertEqual(["a", "b", "c"], distinct)
def test_query_on_query_field(self):
self.db.drop_collection("test")
self.db.test.save({"query": "foo"})
self.db.test.save({"bar": "foo"})
self.assertEqual(1, self.db.test.find({"query": {"$ne": None}}).count())
self.assertEqual(1, len(list(self.db.test.find({"query": {"$ne": None}}))))
def test_min_query(self):
self.db.drop_collection("test")
self.db.test.save({"x": 1})
self.db.test.save({"x": 2})
self.db.test.create_index("x")
self.assertEqual(1, len(list(self.db.test.find({"$min": {"x": 2},
"$query": {}}))))
self.assertEqual(2, self.db.test.find({"$min": {"x": 2},
"$query": {}})[0]["x"])
def test_insert_large_document(self):
self.assertRaises(InvalidDocument, self.db.test.insert,
{"foo": "x" * 4 * 1024 * 1024})
self.assertRaises(InvalidDocument, self.db.test.insert,
[{"x": 1}, {"foo": "x" * 4 * 1024 * 1024}])
self.db.test.insert([{"foo": "x" * 2 * 1024 * 1024},
{"foo": "x" * 2 * 1024 * 1024}], safe=True)
def test_map_reduce(self):
if not version.at_least(self.db.connection, (1, 1, 1)):
raise SkipTest()
db = self.db
db.drop_collection("test")
db.test.insert({"id": 1, "tags": ["dog", "cat"]})
db.test.insert({"id": 2, "tags": ["cat"]})
db.test.insert({"id": 3, "tags": ["mouse", "cat", "dog"]})
db.test.insert({"id": 4, "tags": []})
map = Code("function () {"
" this.tags.forEach(function(z) {"
" emit(z, 1);"
" });"
"}")
reduce = Code("function (key, values) {"
" var total = 0;"
" for (var i = 0; i < values.length; i++) {"
" total += values[i];"
" }"
" return total;"
"}")
result = db.test.map_reduce(map, reduce)
self.assertEqual(3, result.find_one({"_id": "cat"})["value"])
self.assertEqual(2, result.find_one({"_id": "dog"})["value"])
self.assertEqual(1, result.find_one({"_id": "mouse"})["value"])
full_result = db.test.map_reduce(map, reduce, full_response=True)
self.assertEqual(6, full_result["counts"]["emit"])
result = db.test.map_reduce(map, reduce, limit=2)
self.assertEqual(2, result.find_one({"_id": "cat"})["value"])
self.assertEqual(1, result.find_one({"_id": "dog"})["value"])
self.assertEqual(None, result.find_one({"_id": "mouse"}))
def test_messages_with_unicode_collection_names(self):
db = self.db
db[u"Employés"].insert({"x": 1})
db[u"Employés"].update({"x": 1}, {"x": 2})
db[u"Employés"].remove({})
db[u"Employés"].find_one()
list(db[u"Employés"].find())
def test_drop_indexes_non_existant(self):
self.db.drop_collection("test")
self.db.test.drop_indexes()
# This is really a bson test but easier to just reproduce it here...
# (Shame on me)
def test_bad_encode(self):
c = self.db.test
warnings.simplefilter("ignore")
self.assertRaises(InvalidDocument, c.save, {"x": c})
warnings.simplefilter("default")
def test_as_class(self):
c = self.db.test
c.remove()
c.insert({"x": 1})
self.assert_(isinstance(c.find().next(), dict))
self.assertFalse(isinstance(c.find().next(), SON))
self.assert_(isinstance(c.find(as_class=SON).next(), SON))
self.assert_(isinstance(c.find_one(), dict))
self.assertFalse(isinstance(c.find_one(), SON))
self.assert_(isinstance(c.find_one(as_class=SON), SON))
self.assertEqual(1, c.find_one(as_class=SON)["x"])
self.assertEqual(1, c.find(as_class=SON).next()["x"])
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()