diff --git a/.bzrignore b/.bzrignore index a9b310f..6a691e0 100644 --- a/.bzrignore +++ b/.bzrignore @@ -1 +1 @@ -.testrepository +/.testrepository diff --git a/dkim/__init__.py b/dkim/__init__.py index 4845793..223d7cf 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -30,6 +30,10 @@ from dkim.crypto import ( RSASSA_PKCS1_v1_5_sign, RSASSA_PKCS1_v1_5_verify, ) +from dkim.util import ( + InvalidTagValueList, + parse_tag_value, + ) __all__ = [ "Simple", @@ -104,6 +108,7 @@ def _remove(s, t): def hash_headers(hasher, canonicalize_headers, headers, include_headers, sigheaders, sig): + """Sign message header fields.""" sign_headers = [] lastindex = {} for h in include_headers: @@ -126,6 +131,14 @@ def hash_headers(hasher, canonicalize_headers, headers, include_headers, def validate_signature_fields(sig, debuglog=None): + """Validate DKIM-Signature fields. + + Basic checks for presence and correct formatting of mandatory fields. + + @param sig: A dict mapping field keys to values. + @param debuglog: A file-like object to which details will be written + on error. + """ mandatory_fields = ('v', 'a', 'b', 'bh', 'd', 'h', 's') for field in mandatory_fields: if field not in sig: @@ -329,19 +342,10 @@ def verify(message, debuglog=None, dnsfunc=dnstxt): return False # Currently, we only validate the first DKIM-Signature line found. - - a = re.split(r"\s*;\s*", sigheaders[0][1].strip()) - if debuglog is not None: - print >>debuglog, "a:", a - sig = {} - for x in a: - if x: - m = re.match(r"(\w+)\s*=\s*(.*)", x, re.DOTALL) - if m is None: - if debuglog is not None: - print >>debuglog, "invalid format of signature part: %s" % x - return False - sig[m.group(1)] = m.group(2) + try: + sig = parse_tag_value(sigheaders[0][1]) + except InvalidTagValueList: + return False if debuglog is not None: print >>debuglog, "sig:", sig @@ -406,20 +410,10 @@ def verify(message, debuglog=None, dnsfunc=dnstxt): s = dnsfunc(sig['s']+"._domainkey."+sig['d']+".") if not s: return False - a = re.split(r"\s*;\s*", s) - # Trailing ';' on signature record is valid, see RFC 4871 3.2 - # tag-list = tag-spec 0*( ";" tag-spec ) [ ";" ] - if a[-1] == '': - a.pop(-1) - pub = {} - for f in a: - m = re.match(r"(\w+)=(.*)", f) - if m is not None: - pub[m.group(1)] = m.group(2) - else: - if debuglog is not None: - print >>debuglog, "invalid format in _domainkey txt record" - return False + try: + pub = parse_tag_value(s) + except InvalidTagValueList: + return False pk = parse_public_key(base64.b64decode(pub['p'])) include_headers = re.split(r"\s*:\s*", sig['h']) diff --git a/dkim/crypto.py b/dkim/crypto.py index c2bba28..499ad2c 100644 --- a/dkim/crypto.py +++ b/dkim/crypto.py @@ -69,6 +69,11 @@ ASN1_RSAPrivateKey = [ def parse_public_key(data): + """Parse an RSA public key. + + @param data: A DER-encoded X.509 subjectPublicKeyInfo + containing an RFC3447 RSAPublicKey. + """ x = asn1_parse(ASN1_Object, data) # Not sure why the [1:] is necessary to skip a byte. pkd = asn1_parse(ASN1_RSAPublicKey, x[0][1][1:]) @@ -96,6 +101,14 @@ def parse_private_key(data): def EMSA_PKCS1_v1_5_encode(digest, modlen, hashid): + """Encode a digest with EMSA-PKCS1-v1_5. + + Defined in RFC3447 section 9.2. + + @param digest: A digest value to encode. + @param modlen: The desired message length. + @param hashid: The ID of the hash used to generate the digest. + """ dinfo = asn1_build( (SEQUENCE, [ (SEQUENCE, [ @@ -103,8 +116,7 @@ def EMSA_PKCS1_v1_5_encode(digest, modlen, hashid): (NULL, None), ]), (OCTET_STRING, digest), - ]), - ) + ])) if len(dinfo)+3 > modlen: raise Exception("Hash too large for modulus") # XXX: DKIMException return "\x00\x01"+"\xff"*(modlen-len(dinfo)-3)+"\x00"+dinfo diff --git a/dkim/tests/test_dkim.py b/dkim/tests/test_dkim.py index 1baff54..9d6a2e6 100644 --- a/dkim/tests/test_dkim.py +++ b/dkim/tests/test_dkim.py @@ -1,7 +1,30 @@ +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the author be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. +# +# Copyright (c) 2011 William Grant + import os.path import unittest import dkim +from dkim.util import ( + DuplicateTag, + InvalidTagSpec, + parse_tag_value, + ) def read_test_data(filename): @@ -50,5 +73,44 @@ class TestSignAndVerify(unittest.TestCase): self.assertFalse(res) +class TestParseTagValue(unittest.TestCase): + """Tag=Value parsing tests.""" + + def test_single(self): + self.assertEqual( + {'foo': 'bar'}, + parse_tag_value('foo=bar')) + + def test_trailing_separator_ignored(self): + self.assertEqual( + {'foo': 'bar'}, + parse_tag_value('foo=bar;')) + + def test_multiple(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'foo'}, + parse_tag_value('foo=bar;baz=foo')) + + def test_value_with_equals(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'foo=bar'}, + parse_tag_value('foo=bar;baz=foo=bar')) + + def test_whitespace_is_stripped(self): + self.assertEqual( + {'foo': 'bar', 'baz': 'f oo=bar'}, + parse_tag_value(' foo \t= bar;\tbaz= f oo=bar ')) + + def test_missing_value_is_an_error(self): + self.assertRaisesRegexp( + InvalidTagSpec, 'baz', + parse_tag_value, 'foo=bar;baz') + + def test_duplicate_tag_is_an_error(self): + self.assertRaisesRegexp( + DuplicateTag, 'foo', + parse_tag_value, 'foo=bar;foo=baz') + + if __name__ == '__main__': unittest.main() diff --git a/dkim/util.py b/dkim/util.py new file mode 100644 index 0000000..8511ca2 --- /dev/null +++ b/dkim/util.py @@ -0,0 +1,60 @@ +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the author be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. +# +# Copyright (c) 2011 William Grant + +__all__ = [ + 'DuplicateTag', + 'InvalidTagSpec', + 'InvalidTagValueList', + 'parse_tag_value', + ] + + +class InvalidTagValueList(Exception): + pass + + +class DuplicateTag(InvalidTagValueList): + pass + + +class InvalidTagSpec(InvalidTagValueList): + pass + + +def parse_tag_value(tag_list): + """Parse a DKIM Tag=Value list. + + Interprets the syntax specified by RFC4871 section 3.2. + Assumes that folding whitespace is already unfolded. + + @param tag_list: A string containing a DKIM Tag=Value list. + """ + tags = {} + tag_specs = tag_list.split(';') + # Trailing semicolons are valid. + if not tag_specs[-1]: + tag_specs.pop() + for tag_spec in tag_specs: + try: + key, value = tag_spec.split('=', 1) + except ValueError: + raise InvalidTagSpec(tag_spec) + if key.strip() in tags: + raise DuplicateTag(key.strip()) + tags[key.strip()] = value.strip() + return tags