From 7b1a3f70dcfc77e85d1cef3b3488a81c67596ea2 Mon Sep 17 00:00:00 2001 From: William Grant Date: Sat, 4 Jun 2011 14:05:54 +1000 Subject: [PATCH 1/5] Add CanonicalizationPolicy, which encapsulates the combined hybrid simple/relaxed schemes. --- dkim/__init__.py | 10 +++++++--- dkim/canonicalization.py | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/dkim/__init__.py b/dkim/__init__.py index dee2b7a..6311ab1 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -25,7 +25,10 @@ import logging import re import time -from dkim.canonicalization import algorithms +from dkim.canonicalization import ( + algorithms, + CanonicalizationPolicy, + ) from dkim.crypto import ( DigestTooLargeError, HASH_ALGORITHMS, @@ -334,8 +337,9 @@ def verify(message, logger=None, dnsfunc=get_txt): except KeyError as e: logger.error("unknown canonicalization algorithm: %s" % e.message) return False - headers = header_algorithm.canonicalize_headers(headers) - body = body_algorithm.canonicalize_body(body) + canon_policy = CanonicalizationPolicy(header_algorithm, body_algorithm) + headers = canon_policy.canonicalize_headers(headers) + body = canon_policy.canonicalize_body(body) try: hasher = HASH_ALGORITHMS[sig[b'a']] diff --git a/dkim/canonicalization.py b/dkim/canonicalization.py index 9b191f2..7023dc8 100644 --- a/dkim/canonicalization.py +++ b/dkim/canonicalization.py @@ -23,6 +23,7 @@ import re __all__ = [ 'algorithms', + 'CanonicalizationPolicy', ] @@ -83,4 +84,17 @@ class Relaxed: compress_whitespace(strip_trailing_whitespace(body))) +class CanonicalizationPolicy: + + def __init__(self, header_algorithm, body_algorithm): + self.header_algorithm = header_algorithm + self.body_algorithm = body_algorithm + + def canonicalize_headers(self, headers): + return self.header_algorithm.canonicalize_headers(headers) + + def canonicalize_body(self, body): + return self.body_algorithm.canonicalize_body(body) + + algorithms = dict((c.name, c) for c in (Simple, Relaxed)) From 206c86089022c9aee00666591193321eaf4d42b4 Mon Sep 17 00:00:00 2001 From: William Grant Date: Sat, 4 Jun 2011 14:30:19 +1000 Subject: [PATCH 2/5] Pull c= value parsing out into Canonicalizationpolicy.from_c_value. --- dkim/__init__.py | 22 +++------------- dkim/canonicalization.py | 29 +++++++++++++++++++++ dkim/tests/test_canonicalization.py | 39 ++++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/dkim/__init__.py b/dkim/__init__.py index 6311ab1..cee4ae0 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -320,24 +320,9 @@ def verify(message, logger=None, dnsfunc=get_txt): logger.error("signature fields failed to validate: %s" % e) return False - m = re.match(b"(\w+)(?:/(\w+))?$", sig[b'c']) - if m is None: - logger.error( - "c= value is not in format method/method (%s)" % sig[b'c']) + canon_policy = CanonicalizationPolicy.from_c_value(sig.get(b'c'), logger) + if canon_policy is None: return False - can_headers = m.group(1) - if m.group(2) is not None: - can_body = m.group(2) - else: - can_body = b"simple" - - try: - header_algorithm = algorithms[can_headers] - body_algorithm = algorithms[can_body] - except KeyError as e: - logger.error("unknown canonicalization algorithm: %s" % e.message) - return False - canon_policy = CanonicalizationPolicy(header_algorithm, body_algorithm) headers = canon_policy.canonicalize_headers(headers) body = canon_policy.canonicalize_body(body) @@ -376,8 +361,7 @@ def verify(message, logger=None, dnsfunc=get_txt): include_headers = re.split(br"\s*:\s*", sig[b'h']) h = hasher() - hash_headers( - h, header_algorithm, headers, include_headers, sigheaders, sig) + hash_headers(h, canon_policy, headers, include_headers, sigheaders, sig) signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b'])) try: return RSASSA_PKCS1_v1_5_verify( diff --git a/dkim/canonicalization.py b/dkim/canonicalization.py index 7023dc8..7395dd4 100644 --- a/dkim/canonicalization.py +++ b/dkim/canonicalization.py @@ -90,6 +90,35 @@ class CanonicalizationPolicy: self.header_algorithm = header_algorithm self.body_algorithm = body_algorithm + @classmethod + def from_c_value(cls, c, logger=None): + """Construct the canonicalization policy described by a c= value. + + @param c: c= value from a DKIM-Signature header field + @return: a C{CanonicalizationPolicy}, or C{None} if the value is + invalid + """ + if c is None: + c = b'simple/simple' + m = c.split(b'/') + if len(m) not in (1, 2): + if logger: + logger.error( + "c= value is not in format method/method: %s" % c) + return None + if len(m) == 1: + m.append(b'simple') + can_headers, can_body = m + try: + header_algorithm = algorithms[can_headers] + body_algorithm = algorithms[can_body] + except KeyError as e: + if logger: + logger.error( + "unknown canonicalization algorithm: %s" % e.message) + return None + return cls(header_algorithm, body_algorithm) + def canonicalize_headers(self, headers): return self.header_algorithm.canonicalize_headers(headers) diff --git a/dkim/tests/test_canonicalization.py b/dkim/tests/test_canonicalization.py index 5269f72..fb97d44 100644 --- a/dkim/tests/test_canonicalization.py +++ b/dkim/tests/test_canonicalization.py @@ -18,7 +18,11 @@ import unittest -from dkim.canonicalization import Simple, Relaxed +from dkim.canonicalization import ( + CanonicalizationPolicy, + Simple, + Relaxed, + ) class BaseCanonicalizationTest(unittest.TestCase): @@ -94,6 +98,39 @@ class TestRelaxedAlgorithmBody(BaseCanonicalizationTest): b'Foo\r\nbar\r\n\r\n\r\n') +class TestCanonicalizationPolicyFromCValue(unittest.TestCase): + + def assertAlgorithms(self, header_algo, body_algo, c_value): + p = CanonicalizationPolicy.from_c_value(c_value) + self.assertEqual( + (header_algo, body_algo), + (p.header_algorithm, p.body_algorithm)) + + def assertValueDoesNotParse(self, c_value): + self.assertIs(None, CanonicalizationPolicy.from_c_value(c_value)) + + def test_both_default_to_simple(self): + self.assertAlgorithms(Simple, Simple, None) + + def test_relaxed_headers(self): + self.assertAlgorithms(Relaxed, Simple, b'relaxed') + + def test_relaxed_body(self): + self.assertAlgorithms(Simple, Relaxed, b'simple/relaxed') + + def test_relaxed_both(self): + self.assertAlgorithms(Relaxed, Relaxed, b'relaxed/relaxed') + + def test_explict_simple_both(self): + self.assertAlgorithms(Simple, Simple, b'simple/simple') + + def test_corruption_is_ignored(self): + self.assertValueDoesNotParse(b'') + self.assertValueDoesNotParse(b'simple/simple/simple') + self.assertValueDoesNotParse(b'relaxed/stressed') + self.assertValueDoesNotParse(b'worried') + + def test_suite(): from unittest import TestLoader return TestLoader().loadTestsFromName(__name__) From 6b4b98478b36a2093cb0a0d5f86a5e8549d92bfa Mon Sep 17 00:00:00 2001 From: William Grant Date: Sat, 4 Jun 2011 14:37:07 +1000 Subject: [PATCH 3/5] Add CanonicalizationPolicy.to_c_value(). --- dkim/canonicalization.py | 4 ++++ dkim/tests/test_canonicalization.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/dkim/canonicalization.py b/dkim/canonicalization.py index 7395dd4..98e54bb 100644 --- a/dkim/canonicalization.py +++ b/dkim/canonicalization.py @@ -119,6 +119,10 @@ class CanonicalizationPolicy: return None return cls(header_algorithm, body_algorithm) + def to_c_value(self): + return b'/'.join( + (self.header_algorithm.name, self.body_algorithm.name)) + def canonicalize_headers(self, headers): return self.header_algorithm.canonicalize_headers(headers) diff --git a/dkim/tests/test_canonicalization.py b/dkim/tests/test_canonicalization.py index fb97d44..ce0685d 100644 --- a/dkim/tests/test_canonicalization.py +++ b/dkim/tests/test_canonicalization.py @@ -131,6 +131,26 @@ class TestCanonicalizationPolicyFromCValue(unittest.TestCase): self.assertValueDoesNotParse(b'worried') +class TestCanonicalizationpolicyToCValue(unittest.TestCase): + + def assertCValue(self, c_value, header_algo, body_algo): + self.assertEqual( + c_value, + CanonicalizationPolicy(header_algo, body_algo).to_c_value()) + + def test_both_simple(self): + self.assertCValue(b'simple/simple', Simple, Simple) + + def test_relaxed_body(self): + self.assertCValue(b'simple/relaxed', Simple, Relaxed) + + def test_both_relaxed(self): + self.assertCValue(b'relaxed/relaxed', Relaxed, Relaxed) + + def test_relaxed_headers(self): + self.assertCValue(b'relaxed/simple', Relaxed, Simple) + + def test_suite(): from unittest import TestLoader return TestLoader().loadTestsFromName(__name__) From f8e938c1529e15c08157f2d81b57ddf29c30e506 Mon Sep 17 00:00:00 2001 From: William Grant Date: Sat, 4 Jun 2011 14:37:18 +1000 Subject: [PATCH 4/5] Use CanonicalizationPolicy in sign(). --- dkim/__init__.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/dkim/__init__.py b/dkim/__init__.py index cee4ae0..7224b4d 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -25,10 +25,7 @@ import logging import re import time -from dkim.canonicalization import ( - algorithms, - CanonicalizationPolicy, - ) +from dkim.canonicalization import CanonicalizationPolicy from dkim.crypto import ( DigestTooLargeError, HASH_ALGORITHMS, @@ -234,7 +231,9 @@ def sign(message, selector, domain, privkey, identity=None, if identity is not None and not identity.endswith(domain): raise ParameterError("identity must end with domain") - headers = algorithms[canonicalize[0]].canonicalize_headers(headers) + canon_policy = CanonicalizationPolicy.from_c_value( + b'/'.join(canonicalize)) + headers = canon_policy.canonicalize_headers(headers) if include_headers is None: include_headers = [x[0].lower() for x in headers] @@ -242,7 +241,7 @@ def sign(message, selector, domain, privkey, identity=None, include_headers = [x.lower() for x in include_headers] sign_headers = [x for x in headers if x[0].lower() in include_headers] - body = algorithms[canonicalize[1]].canonicalize_body(body) + body = canon_policy.canonicalize_body(body) h = hashlib.sha256() h.update(body) @@ -251,9 +250,7 @@ def sign(message, selector, domain, privkey, identity=None, sigfields = [x for x in [ (b'v', b"1"), (b'a', signature_algorithm), - (b'c', b"/".join( - (algorithms[canonicalize[0]].name, - algorithms[canonicalize[1]].name))), + (b'c', canon_policy.to_c_value()), (b'd', domain), (b'i', identity or b"@"+domain), length and (b'l', len(body)), @@ -266,7 +263,7 @@ def sign(message, selector, domain, privkey, identity=None, ] if x] sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields)) - dkim_header = algorithms[canonicalize[0]].canonicalize_headers([ + dkim_header = canon_policy.canonicalize_headers([ [b'DKIM-Signature', b' ' + sig_value]])[0] # the dkim sig is hashed with no trailing crlf, even if the # canonicalization algorithm would add one. From 3008ad76a7e2422da5ac96dd950b2d5e63d05612 Mon Sep 17 00:00:00 2001 From: William Grant Date: Sat, 4 Jun 2011 14:38:29 +1000 Subject: [PATCH 5/5] Capitalise and unexport dkim.canonicalization.algorithms. --- dkim/canonicalization.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dkim/canonicalization.py b/dkim/canonicalization.py index 98e54bb..0e05981 100644 --- a/dkim/canonicalization.py +++ b/dkim/canonicalization.py @@ -22,7 +22,6 @@ import re __all__ = [ - 'algorithms', 'CanonicalizationPolicy', ] @@ -110,8 +109,8 @@ class CanonicalizationPolicy: m.append(b'simple') can_headers, can_body = m try: - header_algorithm = algorithms[can_headers] - body_algorithm = algorithms[can_body] + header_algorithm = ALGORITHMS[can_headers] + body_algorithm = ALGORITHMS[can_body] except KeyError as e: if logger: logger.error( @@ -130,4 +129,4 @@ class CanonicalizationPolicy: return self.body_algorithm.canonicalize_body(body) -algorithms = dict((c.name, c) for c in (Simple, Relaxed)) +ALGORITHMS = dict((c.name, c) for c in (Simple, Relaxed))