random generator for mongo documents, test that to_dict o from_dict is identity. fix get_c_string to check for 0 bytes
This commit is contained in:
parent
fa6b5a314d
commit
8ed0c63e86
25
bson.py
25
bson.py
@ -35,7 +35,15 @@ def _get_c_string(data):
|
||||
return (unicode(data[:end], "utf-8"), data[end + 1:])
|
||||
|
||||
def _make_c_string(string):
|
||||
return string.encode("utf-8") + "\x00"
|
||||
string = string.encode("utf-8")
|
||||
try:
|
||||
string.index("\x00")
|
||||
raise InvalidDocument("cannot encode string %r: contains '\\x00'" % string)
|
||||
except InvalidDocument:
|
||||
raise
|
||||
except ValueError:
|
||||
pass
|
||||
return string + "\x00"
|
||||
|
||||
def _validate_number(data):
|
||||
assert len(data) >= 8
|
||||
@ -47,9 +55,8 @@ def _validate_string(data):
|
||||
assert data[length - 1] == "\x00"
|
||||
return data[length:]
|
||||
|
||||
_valid_object_name = re.compile("^.*$")
|
||||
def _validate_object(data):
|
||||
return _validate_document(data, _valid_object_name)
|
||||
return _validate_document(data, None)
|
||||
|
||||
_valid_array_name = re.compile("^\d+$")
|
||||
def _validate_array(data):
|
||||
@ -120,19 +127,20 @@ def _validate_element_data(type, data):
|
||||
try:
|
||||
return _element_validator[type](data)
|
||||
except KeyError:
|
||||
raise InvalidBSON()
|
||||
raise InvalidBSON("unrecognized type: %s" % type)
|
||||
|
||||
def _validate_element(data, valid_name):
|
||||
element_type = data[0]
|
||||
(element_name, data) = _get_c_string(data[1:])
|
||||
assert valid_name.match(element_name)
|
||||
if valid_name:
|
||||
assert valid_name.match(element_name), "%r doesn't match %s" % (element_name, valid_name.pattern)
|
||||
return _validate_element_data(element_type, data)
|
||||
|
||||
def _validate_elements(data, valid_name):
|
||||
while data:
|
||||
data = _validate_element(data, valid_name)
|
||||
|
||||
def _validate_document(data, valid_name=_valid_object_name):
|
||||
def _validate_document(data, valid_name=None):
|
||||
try:
|
||||
obj_size = struct.unpack("<i", data[:4])[0]
|
||||
except struct.error:
|
||||
@ -364,5 +372,10 @@ class TestBSON(unittest.TestCase):
|
||||
helper({"an array": [1, True, 3.8, u"world"]})
|
||||
helper({"an object": {"test": u"something"}})
|
||||
|
||||
def from_then_to_dict(dict):
|
||||
return dict == (BSON.from_dict(dict)).to_dict()
|
||||
|
||||
qcheck.check_unittest(self, from_then_to_dict, qcheck.gen_mongo_dict(3))
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import random
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
gen_target = 100
|
||||
examples = 5
|
||||
@ -6,14 +8,59 @@ examples = 5
|
||||
def gen_range(start, stop):
|
||||
return lambda: random.randint(start, stop)
|
||||
|
||||
def char():
|
||||
return chr(random.randint(0, 255))
|
||||
def gen_int():
|
||||
return lambda: random.randint(-sys.maxint-1, sys.maxint)
|
||||
|
||||
def gen_float():
|
||||
return lambda: (random.random() - 0.5) * sys.maxint
|
||||
|
||||
def gen_boolean():
|
||||
return lambda: random.choice([True, False])
|
||||
|
||||
def gen_printable_char():
|
||||
return lambda: chr(random.randint(32, 126))
|
||||
|
||||
def gen_printable_string(gen_length):
|
||||
return lambda: "".join(gen_list(gen_printable_char(), gen_length)())
|
||||
|
||||
def gen_char():
|
||||
return lambda: chr(random.randint(0, 255))
|
||||
|
||||
def gen_string(gen_length):
|
||||
return lambda: "".join(gen_list(char, gen_length))
|
||||
return lambda: "".join(gen_list(gen_char(), gen_length)())
|
||||
|
||||
def gen_list(generator, length):
|
||||
return [generator() for _ in range(length())]
|
||||
def gen_unichar():
|
||||
return lambda: unichr(random.randint(1, 0xFFFF))
|
||||
|
||||
def gen_unicode(gen_length):
|
||||
return lambda: u"".join(gen_list(gen_unichar(), gen_length)())
|
||||
|
||||
def gen_list(generator, gen_length):
|
||||
return lambda: [generator() for _ in range(gen_length())]
|
||||
|
||||
def gen_dict(gen_key, gen_value, gen_length):
|
||||
def a_dict(gen_key, gen_value, length):
|
||||
result = {}
|
||||
for _ in range(length):
|
||||
result[gen_key()] = gen_value()
|
||||
return result
|
||||
return lambda: a_dict(gen_key, gen_value, gen_length())
|
||||
|
||||
def gen_mongo_value(depth):
|
||||
choices = [gen_unicode(gen_range(0, 50)),
|
||||
gen_int(),
|
||||
gen_float(),
|
||||
gen_boolean()]
|
||||
if depth > 0:
|
||||
choices.append(gen_mongo_list(depth))
|
||||
choices.append(gen_mongo_dict(depth))
|
||||
return lambda: random.choice(choices)()
|
||||
|
||||
def gen_mongo_list(depth):
|
||||
return gen_list(gen_mongo_value(depth - 1), gen_range(0, 10))
|
||||
|
||||
def gen_mongo_dict(depth):
|
||||
return gen_dict(gen_unicode(gen_range(0, 20)), gen_mongo_value(depth - 1), gen_range(0, 10))
|
||||
|
||||
|
||||
def isnt(predicate):
|
||||
@ -22,10 +69,13 @@ def isnt(predicate):
|
||||
|
||||
def check(predicate, generator):
|
||||
counter_examples = []
|
||||
for i in range(gen_target):
|
||||
for _ in range(gen_target):
|
||||
case = generator()
|
||||
if not predicate(case):
|
||||
counter_examples.append(repr(case))
|
||||
try:
|
||||
if not predicate(case):
|
||||
counter_examples.append(repr(case))
|
||||
except:
|
||||
counter_examples.append("%r : %s" % (case, traceback.format_exc()))
|
||||
return counter_examples
|
||||
|
||||
def check_unittest(test, predicate, generator):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user