diff --git a/dkim/__init__.py b/dkim/__init__.py index 0762ce7..3533444 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -25,10 +25,7 @@ import logging import re import time -from dkim.canonicalization import ( - Relaxed, - Simple, - ) +from dkim.canonicalization import algorithms from dkim.crypto import ( DigestTooLargeError, parse_pem_private_key, @@ -234,7 +231,7 @@ def fold(header): def sign(message, selector, domain, privkey, identity=None, - canonicalize=(Simple, Simple), include_headers=None, length=False, + canonicalize=(b'simple', b'simple'), include_headers=None, length=False, logger=None): """Sign an RFC822 message and return the DKIM-Signature header line. @@ -261,7 +258,7 @@ def sign(message, selector, domain, privkey, identity=None, if identity is not None and not identity.endswith(domain): raise ParameterError("identity must end with domain") - headers = canonicalize[0].canonicalize_headers(headers) + headers = algorithms[canonicalize[0]].canonicalize_headers(headers) if include_headers is None: include_headers = [x[0].lower() for x in headers] @@ -269,7 +266,7 @@ def sign(message, selector, domain, privkey, identity=None, include_headers = [x.lower() for x in include_headers] sign_headers = [x for x in headers if x[0].lower() in include_headers] - body = canonicalize[1].canonicalize_body(body) + body = algorithms[canonicalize[1]].canonicalize_body(body) h = hashlib.sha256() h.update(body) @@ -278,7 +275,9 @@ def sign(message, selector, domain, privkey, identity=None, sigfields = [x for x in [ (b'v', b"1"), (b'a', b"rsa-sha256"), - (b'c', b"/".join((canonicalize[0].name, canonicalize[1].name))), + (b'c', b"/".join( + (algorithms[canonicalize[0]].name, + algorithms[canonicalize[1]].name))), (b'd', domain), (b'i', identity or b"@"+domain), length and (b'l', len(body)), @@ -291,7 +290,7 @@ def sign(message, selector, domain, privkey, identity=None, ] if x] sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields)) - dkim_header = canonicalize[0].canonicalize_headers([ + dkim_header = algorithms[canonicalize[0]].canonicalize_headers([ [b'DKIM-Signature', b' ' + sig_value]])[0] # the dkim sig is hashed with no trailing crlf, even if the # canonicalization algorithm would add one. @@ -356,23 +355,14 @@ def verify(message, logger=None, dnsfunc=dnstxt): else: can_body = b"simple" - if can_headers == b"simple": - canonicalize_headers = Simple - elif can_headers == b"relaxed": - canonicalize_headers = Relaxed - else: - logger.error("unknown header canonicalization (%s)" % can_headers) - return False - - headers = canonicalize_headers.canonicalize_headers(headers) - - if can_body == b"simple": - body = Simple.canonicalize_body(body) - elif can_body == b"relaxed": - body = Relaxed.canonicalize_body(body) - else: - logger.error("unknown body canonicalization (%s)" % can_body) + try: + header_algorithm = algorithms[can_headers] + body_algorithm = algorithms[can_body] + except KeyError as e: + logger.error("unknown canonicalization algorithm: %s" % e.message) return False + headers = header_algorithm.canonicalize_headers(headers) + body = body_algorithm.canonicalize_body(body) if sig[b'a'] == b"rsa-sha1": hasher = hashlib.sha1 @@ -418,7 +408,7 @@ def verify(message, logger=None, dnsfunc=dnstxt): include_headers = re.split(br"\s*:\s*", sig[b'h']) h = hasher() hash_headers( - h, canonicalize_headers, headers, include_headers, sigheaders, sig) + h, header_algorithm, headers, include_headers, sigheaders, sig) signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b'])) try: return RSASSA_PKCS1_v1_5_verify( diff --git a/dkim/canonicalization.py b/dkim/canonicalization.py index 2cff4c2..ef192c0 100644 --- a/dkim/canonicalization.py +++ b/dkim/canonicalization.py @@ -64,3 +64,6 @@ class Relaxed: # Ignore all empty lines at the end of the message body. removed_trailing_lines = re.sub(b"(\r\n)*$", b"\r\n", compressed_wsp) return removed_trailing_lines + + +algorithms = dict((c.name, c) for c in (Simple, Relaxed))