diff --git a/dkim/__init__.py b/dkim/__init__.py index 71e906f..861a501 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -23,6 +23,11 @@ import time import dns.resolver +from dkim.util import ( + InvalidTagValueList, + parse_tag_value, + ) + __all__ = [ "Simple", "Relaxed", @@ -520,19 +525,10 @@ def verify(message, debuglog=None, dnsfunc=dnstxt): return False # Currently, we only validate the first DKIM-Signature line found. - - a = re.split(r"\s*;\s*", sigheaders[0][1].strip()) - if debuglog is not None: - print >>debuglog, "a:", a - sig = {} - for x in a: - if x: - m = re.match(r"(\w+)\s*=\s*(.*)", x, re.DOTALL) - if m is None: - if debuglog is not None: - print >>debuglog, "invalid format of signature part: %s" % x - return False - sig[m.group(1)] = m.group(2) + try: + sig = parse_tag_value(sigheaders[0][1]) + except InvalidTagValueList: + return False if debuglog is not None: print >>debuglog, "sig:", sig @@ -597,20 +593,10 @@ def verify(message, debuglog=None, dnsfunc=dnstxt): s = dnsfunc(sig['s']+"._domainkey."+sig['d']+".") if not s: return False - a = re.split(r"\s*;\s*", s) - # Trailing ';' on signature record is valid, see RFC 4871 3.2 - # tag-list = tag-spec 0*( ";" tag-spec ) [ ";" ] - if a[-1] == '': - a.pop(-1) - pub = {} - for f in a: - m = re.match(r"(\w+)=(.*)", f) - if m is not None: - pub[m.group(1)] = m.group(2) - else: - if debuglog is not None: - print >>debuglog, "invalid format in _domainkey txt record" - return False + try: + pub = parse_tag_value(s) + except InvalidTagValueList: + return False pk = parse_public_key(base64.b64decode(pub['p'])) modlen = len(int2str(pk['modulus'])) if debuglog is not None: diff --git a/dkim/tests/test_dkim.py b/dkim/tests/test_dkim.py index aecc326..9d6a2e6 100644 --- a/dkim/tests/test_dkim.py +++ b/dkim/tests/test_dkim.py @@ -20,6 +20,11 @@ import os.path import unittest import dkim +from dkim.util import ( + DuplicateTag, + InvalidTagSpec, + parse_tag_value, + ) def read_test_data(filename): @@ -68,5 +73,44 @@ class TestSignAndVerify(unittest.TestCase): self.assertFalse(res) +class TestParseTagValue(unittest.TestCase): + """Tag=Value parsing tests.""" + + def test_single(self): + self.assertEqual( + {'foo': 'bar'}, + parse_tag_value('foo=bar')) + + def test_trailing_separator_ignored(self): + self.assertEqual( + {'foo': 'bar'}, + parse_tag_value('foo=bar;')) + + def test_multiple(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'foo'}, + parse_tag_value('foo=bar;baz=foo')) + + def test_value_with_equals(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'foo=bar'}, + parse_tag_value('foo=bar;baz=foo=bar')) + + def test_whitespace_is_stripped(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'f oo=bar'}, + parse_tag_value(' foo \t= bar;\tbaz= f oo=bar ')) + + def test_missing_value_is_an_error(self): + self.assertRaisesRegexp( + InvalidTagSpec, 'baz', + parse_tag_value, 'foo=bar;baz') + + def test_duplicate_tag_is_an_error(self): + self.assertRaisesRegexp( + DuplicateTag, 'foo', + parse_tag_value, 'foo=bar;foo=baz') + + if __name__ == '__main__': unittest.main() diff --git a/dkim/util.py b/dkim/util.py new file mode 100644 index 0000000..8511ca2 --- /dev/null +++ b/dkim/util.py @@ -0,0 +1,60 @@ +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the author be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. +# +# Copyright (c) 2011 William Grant + +__all__ = [ + 'DuplicateTag', + 'InvalidTagSpec', + 'InvalidTagValueList', + 'parse_tag_value', + ] + + +class InvalidTagValueList(Exception): + pass + + +class DuplicateTag(InvalidTagValueList): + pass + + +class InvalidTagSpec(InvalidTagValueList): + pass + + +def parse_tag_value(tag_list): + """Parse a DKIM Tag=Value list. + + Interprets the syntax specified by RFC4871 section 3.2. + Assumes that folding whitespace is already unfolded. + + @param tag_list: A string containing a DKIM Tag=Value list. + """ + tags = {} + tag_specs = tag_list.split(';') + # Trailing semicolons are valid. + if not tag_specs[-1]: + tag_specs.pop() + for tag_spec in tag_specs: + try: + key, value = tag_spec.split('=', 1) + except ValueError: + raise InvalidTagSpec(tag_spec) + if key.strip() in tags: + raise DuplicateTag(key.strip()) + tags[key.strip()] = value.strip() + return tags