diff --git a/son.py b/son.py index f0d3c5ff3..0df84f795 100644 --- a/son.py +++ b/son.py @@ -5,6 +5,12 @@ of keys is important.""" import unittest from UserDict import DictMixin +from xml.parsers import expat +from objectid import ObjectId +from dbref import DBRef +import datetime +import re +import binascii class SON(DictMixin): """SON data. @@ -37,7 +43,7 @@ class SON(DictMixin): def __repr__(self): result = [] for key in self.__keys: - result.append("(%s, %s)" % (repr(key), repr(self.__data[key]))) + result.append("(%r, %r)" % (key, self.__data[key])) return "SON([%s])" % ", ".join(result) def update(self, data): @@ -69,6 +75,99 @@ class SON(DictMixin): other.__keys = self.__keys[:] return other + @classmethod + def from_xml(cls, xml): + """Create an instance of SON from an xml document. + """ + parser = expat.ParserCreate() + son_stack = [] + list_stack = [] + append_to = [] + info = {} + extra = {} + + def pad(list, index): + while index >= len(list): + list.append(None) + + def set(key, value): + if append_to[-1] == "array": + index = int(key) + pad(list_stack[-1], index) + list_stack[-1][int(key)] = value + else: + son_stack[-1][key] = value + + def start_element(name, attributes): + info["current"] = name + if name == "doc": + a = SON() + if attributes.has_key("name"): + set(attributes["name"], a) + son_stack.append(a) + append_to.append("doc") + elif name == "array": + a = [] + set(attributes["name"], a) + list_stack.append(a) + append_to.append("array") + elif name in ["id", "int", "string", "boolean", "number", "date", "code", "ref", "regex"]: + info["key"] = attributes["name"] + elif name == "null": + set(attributes["name"], None) + + def end_element(name): + if name == "doc": + append_to.pop() + info["last"] = son_stack.pop() + elif name == "array": + append_to.pop() + info["array"] = False + list_stack.pop() + elif name == "ref": + set(info["key"], DBRef(extra["ns"], extra["oid"])) + elif name == "regex": + set(info["key"], re.compile(extra["pattern"], extra["options"])) + + def char_data(data): + data = data.strip() + if not data: + return + if info["current"] == "id": + set(info["key"], ObjectId(binascii.unhexlify(data))) + elif info["current"] == "int": + set(info["key"], int(data)) + elif info["current"] == "string": + set(info["key"], data) + elif info["current"] == "code": + set(info["key"], data.encode("utf-8")) + elif info["current"] == "boolean": + set(info["key"], data == "true") + elif info["current"] == "number": + set(info["key"], float(data)) + elif info["current"] == "date": + set(info["key"], datetime.datetime.fromtimestamp(float(data) / 1000.0)) + elif info["current"] == "ns": + extra["ns"] = data + elif info["current"] == "oid": + extra["oid"] = ObjectId(binascii.unhexlify(data)) + elif info["current"] == "pattern": + extra["pattern"] = data + elif info["current"] == "options": + options = 0 + if "i" in data: + options |= re.IGNORECASE + if "m" in data: + options |= re.MULTILINE + extra["options"] = options + + parser.StartElementHandler = start_element + parser.EndElementHandler = end_element + parser.CharacterDataHandler = char_data + + parser.Parse(xml) + return info["last"] + class TestSON(unittest.TestCase): def setUp(self): pass @@ -84,5 +183,53 @@ class TestSON(unittest.TestCase): self.assertEqual(b["hello"], "world") self.assertRaises(KeyError, lambda: b["goodbye"]) + def test_from_xml(self): + smorgasbord = """ + + + + 285a664923b5fcd8ec000000 + 42 + foo + true + 3.14159265358979 + + x + y + z + + yup + + + 123144452057 + + namespace + ca5c67496c01d896f7010000 + + + foobar + i + + this is code + + + +""" + self.assertEqual(SON.from_xml(smorgasbord), + SON([(u"_id", ObjectId("\x28\x5A\x66\x49\x23\xB5\xFC\xD8\xEC\x00\x00\x00")), + (u"the_answer", 42), + (u"b", u"foo"), + (u"c", True), + (u"pi", 3.14159265358979), + (u"an_array", [u"x", u"y", u"z", SON([(u"subobject", u"yup")])]), + (u"now", datetime.datetime(1973, 11, 26, 1, 47, 32, 57000)), + (u"dbref", + DBRef("namespace", + ObjectId("\xCA\x5C\x67\x49\x6C\x01\xD8\x96\xF7\x01\x00\x00"))), + (u"regex", re.compile(u"foobar", re.IGNORECASE)), + (u"$where", "this is code"), + (u"mynull", None), + ])) + if __name__ == "__main__": unittest.main()