dkim.canonicalization now has an algorithms dict mapping names to algorithms. Use it.

This commit is contained in:
William Grant
2011-06-03 21:52:33 +10:00
parent a1fc55bcaa
commit 28aaa6f2f2
2 changed files with 19 additions and 26 deletions
+16 -26
View File
@@ -25,10 +25,7 @@ import logging
import re import re
import time import time
from dkim.canonicalization import ( from dkim.canonicalization import algorithms
Relaxed,
Simple,
)
from dkim.crypto import ( from dkim.crypto import (
DigestTooLargeError, DigestTooLargeError,
parse_pem_private_key, parse_pem_private_key,
@@ -234,7 +231,7 @@ def fold(header):
def sign(message, selector, domain, privkey, identity=None, def sign(message, selector, domain, privkey, identity=None,
canonicalize=(Simple, Simple), include_headers=None, length=False, canonicalize=(b'simple', b'simple'), include_headers=None, length=False,
logger=None): logger=None):
"""Sign an RFC822 message and return the DKIM-Signature header line. """Sign an RFC822 message and return the DKIM-Signature header line.
@@ -261,7 +258,7 @@ def sign(message, selector, domain, privkey, identity=None,
if identity is not None and not identity.endswith(domain): if identity is not None and not identity.endswith(domain):
raise ParameterError("identity must end with domain") raise ParameterError("identity must end with domain")
headers = canonicalize[0].canonicalize_headers(headers) headers = algorithms[canonicalize[0]].canonicalize_headers(headers)
if include_headers is None: if include_headers is None:
include_headers = [x[0].lower() for x in headers] include_headers = [x[0].lower() for x in headers]
@@ -269,7 +266,7 @@ def sign(message, selector, domain, privkey, identity=None,
include_headers = [x.lower() for x in include_headers] include_headers = [x.lower() for x in include_headers]
sign_headers = [x for x in headers if x[0].lower() in include_headers] sign_headers = [x for x in headers if x[0].lower() in include_headers]
body = canonicalize[1].canonicalize_body(body) body = algorithms[canonicalize[1]].canonicalize_body(body)
h = hashlib.sha256() h = hashlib.sha256()
h.update(body) h.update(body)
@@ -278,7 +275,9 @@ def sign(message, selector, domain, privkey, identity=None,
sigfields = [x for x in [ sigfields = [x for x in [
(b'v', b"1"), (b'v', b"1"),
(b'a', b"rsa-sha256"), (b'a', b"rsa-sha256"),
(b'c', b"/".join((canonicalize[0].name, canonicalize[1].name))), (b'c', b"/".join(
(algorithms[canonicalize[0]].name,
algorithms[canonicalize[1]].name))),
(b'd', domain), (b'd', domain),
(b'i', identity or b"@"+domain), (b'i', identity or b"@"+domain),
length and (b'l', len(body)), length and (b'l', len(body)),
@@ -291,7 +290,7 @@ def sign(message, selector, domain, privkey, identity=None,
] if x] ] if x]
sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields)) sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields))
dkim_header = canonicalize[0].canonicalize_headers([ dkim_header = algorithms[canonicalize[0]].canonicalize_headers([
[b'DKIM-Signature', b' ' + sig_value]])[0] [b'DKIM-Signature', b' ' + sig_value]])[0]
# the dkim sig is hashed with no trailing crlf, even if the # the dkim sig is hashed with no trailing crlf, even if the
# canonicalization algorithm would add one. # canonicalization algorithm would add one.
@@ -356,23 +355,14 @@ def verify(message, logger=None, dnsfunc=dnstxt):
else: else:
can_body = b"simple" can_body = b"simple"
if can_headers == b"simple": try:
canonicalize_headers = Simple header_algorithm = algorithms[can_headers]
elif can_headers == b"relaxed": body_algorithm = algorithms[can_body]
canonicalize_headers = Relaxed except KeyError as e:
else: logger.error("unknown canonicalization algorithm: %s" % e.message)
logger.error("unknown header canonicalization (%s)" % can_headers)
return False
headers = canonicalize_headers.canonicalize_headers(headers)
if can_body == b"simple":
body = Simple.canonicalize_body(body)
elif can_body == b"relaxed":
body = Relaxed.canonicalize_body(body)
else:
logger.error("unknown body canonicalization (%s)" % can_body)
return False return False
headers = header_algorithm.canonicalize_headers(headers)
body = body_algorithm.canonicalize_body(body)
if sig[b'a'] == b"rsa-sha1": if sig[b'a'] == b"rsa-sha1":
hasher = hashlib.sha1 hasher = hashlib.sha1
@@ -418,7 +408,7 @@ def verify(message, logger=None, dnsfunc=dnstxt):
include_headers = re.split(br"\s*:\s*", sig[b'h']) include_headers = re.split(br"\s*:\s*", sig[b'h'])
h = hasher() h = hasher()
hash_headers( hash_headers(
h, canonicalize_headers, headers, include_headers, sigheaders, sig) h, header_algorithm, headers, include_headers, sigheaders, sig)
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b'])) signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
try: try:
return RSASSA_PKCS1_v1_5_verify( return RSASSA_PKCS1_v1_5_verify(
+3
View File
@@ -64,3 +64,6 @@ class Relaxed:
# Ignore all empty lines at the end of the message body. # Ignore all empty lines at the end of the message body.
removed_trailing_lines = re.sub(b"(\r\n)*$", b"\r\n", compressed_wsp) removed_trailing_lines = re.sub(b"(\r\n)*$", b"\r\n", compressed_wsp)
return removed_trailing_lines return removed_trailing_lines
algorithms = dict((c.name, c) for c in (Simple, Relaxed))