Extract and test DNS and canonicalization and some hashing functions.
This commit is contained in:
+28
-107
@@ -25,14 +25,17 @@ import logging
|
|||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from dkim.canonicalization import algorithms
|
||||||
from dkim.crypto import (
|
from dkim.crypto import (
|
||||||
DigestTooLargeError,
|
DigestTooLargeError,
|
||||||
|
HASH_ALGORITHMS,
|
||||||
parse_pem_private_key,
|
parse_pem_private_key,
|
||||||
parse_public_key,
|
parse_public_key,
|
||||||
RSASSA_PKCS1_v1_5_sign,
|
RSASSA_PKCS1_v1_5_sign,
|
||||||
RSASSA_PKCS1_v1_5_verify,
|
RSASSA_PKCS1_v1_5_verify,
|
||||||
UnparsableKeyError,
|
UnparsableKeyError,
|
||||||
)
|
)
|
||||||
|
from dkim.dns import get_txt
|
||||||
from dkim.util import (
|
from dkim.util import (
|
||||||
get_default_logger,
|
get_default_logger,
|
||||||
InvalidTagValueList,
|
InvalidTagValueList,
|
||||||
@@ -40,8 +43,6 @@ from dkim.util import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Simple",
|
|
||||||
"Relaxed",
|
|
||||||
"InternalError",
|
"InternalError",
|
||||||
"KeyFormatError",
|
"KeyFormatError",
|
||||||
"MessageFormatError",
|
"MessageFormatError",
|
||||||
@@ -50,42 +51,6 @@ __all__ = [
|
|||||||
"verify",
|
"verify",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class Simple:
|
|
||||||
"""Class that represents the "simple" canonicalization algorithm."""
|
|
||||||
|
|
||||||
name = b"simple"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def canonicalize_headers(headers):
|
|
||||||
# No changes to headers.
|
|
||||||
return headers
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def canonicalize_body(body):
|
|
||||||
# Ignore all empty lines at the end of the message body.
|
|
||||||
return re.sub(b"(\r\n)*$", b"\r\n", body)
|
|
||||||
|
|
||||||
class Relaxed:
|
|
||||||
"""Class that represents the "relaxed" canonicalization algorithm."""
|
|
||||||
|
|
||||||
name = b"relaxed"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def canonicalize_headers(headers):
|
|
||||||
# Convert all header field names to lowercase.
|
|
||||||
# Unfold all header lines.
|
|
||||||
# Compress WSP to single space.
|
|
||||||
# Remove all WSP at the start or end of the field value (strip).
|
|
||||||
return [(x[0].lower(), re.sub(br"\s+", b" ", re.sub(b"\r\n", b"", x[1])).strip()+b"\r\n") for x in headers]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def canonicalize_body(body):
|
|
||||||
# Remove all trailing WSP at end of lines.
|
|
||||||
# Compress non-line-ending WSP to single space.
|
|
||||||
# Ignore all empty lines at the end of the message body.
|
|
||||||
return re.sub(b"(\r\n)*$", b"\r\n", re.sub(br"[\x09\x20]+", b" ", re.sub(b"[\\x09\\x20]+\r\n", b"\r\n", body)))
|
|
||||||
|
|
||||||
class DKIMException(Exception):
|
class DKIMException(Exception):
|
||||||
"""Base class for DKIM errors."""
|
"""Base class for DKIM errors."""
|
||||||
pass
|
pass
|
||||||
@@ -217,36 +182,6 @@ def rfc822_parse(message):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def dnstxt_dnspython(name):
|
|
||||||
"""Return a TXT record associated with a DNS name."""
|
|
||||||
a = dns.resolver.query(name, dns.rdatatype.TXT)
|
|
||||||
for r in a.response.answer:
|
|
||||||
if r.rdtype == dns.rdatatype.TXT:
|
|
||||||
return b"".join(r.items[0].strings)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def dnstxt_pydns(name):
|
|
||||||
"""Return a TXT record associated with a DNS name."""
|
|
||||||
# Older pydns releases don't like a trailing dot.
|
|
||||||
if name.endswith('.'):
|
|
||||||
name = name[:-1]
|
|
||||||
DNS.ParseResolvConf()
|
|
||||||
response = DNS.DnsRequest(name, qtype='txt').req()
|
|
||||||
if not response.answers:
|
|
||||||
return None
|
|
||||||
return response.answers[0]['data'][0]
|
|
||||||
|
|
||||||
|
|
||||||
# Prefer dnspython if it's there, otherwise use pydns.
|
|
||||||
try:
|
|
||||||
import dns.resolver
|
|
||||||
dnstxt = dnstxt_dnspython
|
|
||||||
except ImportError:
|
|
||||||
import DNS
|
|
||||||
dnstxt = dnstxt_pydns
|
|
||||||
|
|
||||||
|
|
||||||
def fold(header):
|
def fold(header):
|
||||||
"""Fold a header line into multiple crlf-separated lines at column 72."""
|
"""Fold a header line into multiple crlf-separated lines at column 72."""
|
||||||
i = header.rfind(b"\r\n ")
|
i = header.rfind(b"\r\n ")
|
||||||
@@ -268,8 +203,9 @@ def fold(header):
|
|||||||
|
|
||||||
|
|
||||||
def sign(message, selector, domain, privkey, identity=None,
|
def sign(message, selector, domain, privkey, identity=None,
|
||||||
canonicalize=(Simple, Simple), include_headers=None, length=False,
|
canonicalize=(b'simple', b'simple'),
|
||||||
logger=None):
|
signature_algorithm=b'rsa-sha256',
|
||||||
|
include_headers=None, length=False, logger=None):
|
||||||
"""Sign an RFC822 message and return the DKIM-Signature header line.
|
"""Sign an RFC822 message and return the DKIM-Signature header line.
|
||||||
|
|
||||||
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
||||||
@@ -295,7 +231,7 @@ def sign(message, selector, domain, privkey, identity=None,
|
|||||||
if identity is not None and not identity.endswith(domain):
|
if identity is not None and not identity.endswith(domain):
|
||||||
raise ParameterError("identity must end with domain")
|
raise ParameterError("identity must end with domain")
|
||||||
|
|
||||||
headers = canonicalize[0].canonicalize_headers(headers)
|
headers = algorithms[canonicalize[0]].canonicalize_headers(headers)
|
||||||
|
|
||||||
if include_headers is None:
|
if include_headers is None:
|
||||||
include_headers = [x[0].lower() for x in headers]
|
include_headers = [x[0].lower() for x in headers]
|
||||||
@@ -303,7 +239,7 @@ def sign(message, selector, domain, privkey, identity=None,
|
|||||||
include_headers = [x.lower() for x in include_headers]
|
include_headers = [x.lower() for x in include_headers]
|
||||||
sign_headers = [x for x in headers if x[0].lower() in include_headers]
|
sign_headers = [x for x in headers if x[0].lower() in include_headers]
|
||||||
|
|
||||||
body = canonicalize[1].canonicalize_body(body)
|
body = algorithms[canonicalize[1]].canonicalize_body(body)
|
||||||
|
|
||||||
h = hashlib.sha256()
|
h = hashlib.sha256()
|
||||||
h.update(body)
|
h.update(body)
|
||||||
@@ -311,8 +247,10 @@ def sign(message, selector, domain, privkey, identity=None,
|
|||||||
|
|
||||||
sigfields = [x for x in [
|
sigfields = [x for x in [
|
||||||
(b'v', b"1"),
|
(b'v', b"1"),
|
||||||
(b'a', b"rsa-sha256"),
|
(b'a', signature_algorithm),
|
||||||
(b'c', b"/".join((canonicalize[0].name, canonicalize[1].name))),
|
(b'c', b"/".join(
|
||||||
|
(algorithms[canonicalize[0]].name,
|
||||||
|
algorithms[canonicalize[1]].name))),
|
||||||
(b'd', domain),
|
(b'd', domain),
|
||||||
(b'i', identity or b"@"+domain),
|
(b'i', identity or b"@"+domain),
|
||||||
length and (b'l', len(body)),
|
length and (b'l', len(body)),
|
||||||
@@ -325,7 +263,7 @@ def sign(message, selector, domain, privkey, identity=None,
|
|||||||
] if x]
|
] if x]
|
||||||
|
|
||||||
sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields))
|
sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields))
|
||||||
dkim_header = canonicalize[0].canonicalize_headers([
|
dkim_header = algorithms[canonicalize[0]].canonicalize_headers([
|
||||||
[b'DKIM-Signature', b' ' + sig_value]])[0]
|
[b'DKIM-Signature', b' ' + sig_value]])[0]
|
||||||
# the dkim sig is hashed with no trailing crlf, even if the
|
# the dkim sig is hashed with no trailing crlf, even if the
|
||||||
# canonicalization algorithm would add one.
|
# canonicalization algorithm would add one.
|
||||||
@@ -350,7 +288,7 @@ def sign(message, selector, domain, privkey, identity=None,
|
|||||||
return b'DKIM-Signature: ' + sig_value + b"\r\n"
|
return b'DKIM-Signature: ' + sig_value + b"\r\n"
|
||||||
|
|
||||||
|
|
||||||
def verify(message, logger=None, dnsfunc=dnstxt):
|
def verify(message, logger=None, dnsfunc=get_txt):
|
||||||
"""Verify a DKIM signature on an RFC822 formatted message.
|
"""Verify a DKIM signature on an RFC822 formatted message.
|
||||||
|
|
||||||
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
||||||
@@ -390,30 +328,19 @@ def verify(message, logger=None, dnsfunc=dnstxt):
|
|||||||
else:
|
else:
|
||||||
can_body = b"simple"
|
can_body = b"simple"
|
||||||
|
|
||||||
if can_headers == b"simple":
|
try:
|
||||||
canonicalize_headers = Simple
|
header_algorithm = algorithms[can_headers]
|
||||||
elif can_headers == b"relaxed":
|
body_algorithm = algorithms[can_body]
|
||||||
canonicalize_headers = Relaxed
|
except KeyError as e:
|
||||||
else:
|
logger.error("unknown canonicalization algorithm: %s" % e.message)
|
||||||
logger.error("unknown header canonicalization (%s)" % can_headers)
|
|
||||||
return False
|
return False
|
||||||
|
headers = header_algorithm.canonicalize_headers(headers)
|
||||||
|
body = body_algorithm.canonicalize_body(body)
|
||||||
|
|
||||||
headers = canonicalize_headers.canonicalize_headers(headers)
|
try:
|
||||||
|
hasher = HASH_ALGORITHMS[sig[b'a']]
|
||||||
if can_body == b"simple":
|
except KeyError as e:
|
||||||
body = Simple.canonicalize_body(body)
|
logger.error("unknown signature algorithm: %s" % e.message)
|
||||||
elif can_body == b"relaxed":
|
|
||||||
body = Relaxed.canonicalize_body(body)
|
|
||||||
else:
|
|
||||||
logger.error("unknown body canonicalization (%s)" % can_body)
|
|
||||||
return False
|
|
||||||
|
|
||||||
if sig[b'a'] == b"rsa-sha1":
|
|
||||||
hasher = hashlib.sha1
|
|
||||||
elif sig[b'a'] == b"rsa-sha256":
|
|
||||||
hasher = hashlib.sha256
|
|
||||||
else:
|
|
||||||
logger.error("unknown signature algorithm (%s)" % sig[b'a'])
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if b'l' in sig:
|
if b'l' in sig:
|
||||||
@@ -429,14 +356,8 @@ def verify(message, logger=None, dnsfunc=dnstxt):
|
|||||||
(base64.b64encode(bodyhash), sig[b'bh']))
|
(base64.b64encode(bodyhash), sig[b'bh']))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# dnstxt wants Unicode
|
name = sig[b's'] + b"._domainkey." + sig[b'd'] + b"."
|
||||||
try:
|
s = dnsfunc(name)
|
||||||
selector = sig[b's'].decode('ascii')
|
|
||||||
domain = sig[b'd'].decode('ascii')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
return False
|
|
||||||
name = "%s._domainkey.%s." % (selector, domain)
|
|
||||||
s = dnsfunc(name).encode('utf-8')
|
|
||||||
if not s:
|
if not s:
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
@@ -452,7 +373,7 @@ def verify(message, logger=None, dnsfunc=dnstxt):
|
|||||||
include_headers = re.split(br"\s*:\s*", sig[b'h'])
|
include_headers = re.split(br"\s*:\s*", sig[b'h'])
|
||||||
h = hasher()
|
h = hasher()
|
||||||
hash_headers(
|
hash_headers(
|
||||||
h, canonicalize_headers, headers, include_headers, sigheaders, sig)
|
h, header_algorithm, headers, include_headers, sigheaders, sig)
|
||||||
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
|
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
|
||||||
try:
|
try:
|
||||||
return RSASSA_PKCS1_v1_5_verify(
|
return RSASSA_PKCS1_v1_5_verify(
|
||||||
|
|||||||
@@ -0,0 +1,86 @@
|
|||||||
|
# This software is provided 'as-is', without any express or implied
|
||||||
|
# warranty. In no event will the author be held liable for any damages
|
||||||
|
# arising from the use of this software.
|
||||||
|
#
|
||||||
|
# Permission is granted to anyone to use this software for any purpose,
|
||||||
|
# including commercial applications, and to alter it and redistribute it
|
||||||
|
# freely, subject to the following restrictions:
|
||||||
|
#
|
||||||
|
# 1. The origin of this software must not be misrepresented; you must not
|
||||||
|
# claim that you wrote the original software. If you use this software
|
||||||
|
# in a product, an acknowledgment in the product documentation would be
|
||||||
|
# appreciated but is not required.
|
||||||
|
# 2. Altered source versions must be plainly marked as such, and must not be
|
||||||
|
# misrepresented as being the original software.
|
||||||
|
# 3. This notice may not be removed or altered from any source distribution.
|
||||||
|
#
|
||||||
|
# Copyright (c) 2008 Greg Hewgill http://hewgill.com
|
||||||
|
#
|
||||||
|
# This has been modified from the original software.
|
||||||
|
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'algorithms',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def strip_trailing_whitespace(content):
|
||||||
|
return re.sub(b"[\t ]+\r\n", b"\r\n", content)
|
||||||
|
|
||||||
|
|
||||||
|
def compress_whitespace(content):
|
||||||
|
return re.sub(b"[\t ]+", b" ", content)
|
||||||
|
|
||||||
|
|
||||||
|
def strip_trailing_lines(content):
|
||||||
|
return re.sub(b"(\r\n)*$", b"\r\n", content)
|
||||||
|
|
||||||
|
|
||||||
|
def unfold_header_value(content):
|
||||||
|
return re.sub(b"\r\n", b"", content)
|
||||||
|
|
||||||
|
|
||||||
|
class Simple:
|
||||||
|
"""Class that represents the "simple" canonicalization algorithm."""
|
||||||
|
|
||||||
|
name = b"simple"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def canonicalize_headers(headers):
|
||||||
|
# No changes to headers.
|
||||||
|
return headers
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def canonicalize_body(body):
|
||||||
|
# Ignore all empty lines at the end of the message body.
|
||||||
|
return strip_trailing_lines(body)
|
||||||
|
|
||||||
|
|
||||||
|
class Relaxed:
|
||||||
|
"""Class that represents the "relaxed" canonicalization algorithm."""
|
||||||
|
|
||||||
|
name = b"relaxed"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def canonicalize_headers(headers):
|
||||||
|
# Convert all header field names to lowercase.
|
||||||
|
# Unfold all header lines.
|
||||||
|
# Compress WSP to single space.
|
||||||
|
# Remove all WSP at the start or end of the field value (strip).
|
||||||
|
return [
|
||||||
|
(x[0].lower().rstrip(),
|
||||||
|
compress_whitespace(unfold_header_value(x[1])).strip() + b"\r\n")
|
||||||
|
for x in headers]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def canonicalize_body(body):
|
||||||
|
# Remove all trailing WSP at end of lines.
|
||||||
|
# Compress non-line-ending WSP to single space.
|
||||||
|
# Ignore all empty lines at the end of the message body.
|
||||||
|
return strip_trailing_lines(
|
||||||
|
compress_whitespace(strip_trailing_whitespace(body)))
|
||||||
|
|
||||||
|
|
||||||
|
algorithms = dict((c.name, c) for c in (Simple, Relaxed))
|
||||||
@@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'DigestTooLargeError',
|
'DigestTooLargeError',
|
||||||
|
'HASH_ALGORITHMS',
|
||||||
'parse_pem_private_key',
|
'parse_pem_private_key',
|
||||||
'parse_private_key',
|
'parse_private_key',
|
||||||
'parse_public_key',
|
'parse_public_key',
|
||||||
@@ -30,6 +31,7 @@ __all__ = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
|
import hashlib
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from dkim.asn1 import (
|
from dkim.asn1 import (
|
||||||
@@ -76,6 +78,10 @@ ASN1_RSAPrivateKey = [
|
|||||||
])
|
])
|
||||||
]
|
]
|
||||||
|
|
||||||
|
HASH_ALGORITHMS = {
|
||||||
|
b'rsa-sha1': hashlib.sha1,
|
||||||
|
b'rsa-sha256': hashlib.sha256,
|
||||||
|
}
|
||||||
|
|
||||||
# These values come from RFC 3447, section 9.2 Notes, page 43.
|
# These values come from RFC 3447, section 9.2 Notes, page 43.
|
||||||
HASH_ID_MAP = {
|
HASH_ID_MAP = {
|
||||||
|
|||||||
+68
@@ -0,0 +1,68 @@
|
|||||||
|
# This software is provided 'as-is', without any express or implied
|
||||||
|
# warranty. In no event will the author be held liable for any damages
|
||||||
|
# arising from the use of this software.
|
||||||
|
#
|
||||||
|
# Permission is granted to anyone to use this software for any purpose,
|
||||||
|
# including commercial applications, and to alter it and redistribute it
|
||||||
|
# freely, subject to the following restrictions:
|
||||||
|
#
|
||||||
|
# 1. The origin of this software must not be misrepresented; you must not
|
||||||
|
# claim that you wrote the original software. If you use this software
|
||||||
|
# in a product, an acknowledgment in the product documentation would be
|
||||||
|
# appreciated but is not required.
|
||||||
|
# 2. Altered source versions must be plainly marked as such, and must not be
|
||||||
|
# misrepresented as being the original software.
|
||||||
|
# 3. This notice may not be removed or altered from any source distribution.
|
||||||
|
#
|
||||||
|
# Copyright (c) 2008 Greg Hewgill http://hewgill.com
|
||||||
|
#
|
||||||
|
# This has been modified from the original software.
|
||||||
|
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'get_txt'
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_txt_dnspython(name):
|
||||||
|
"""Return a TXT record associated with a DNS name."""
|
||||||
|
a = dns.resolver.query(name, dns.rdatatype.TXT)
|
||||||
|
for r in a.response.answer:
|
||||||
|
if r.rdtype == dns.rdatatype.TXT:
|
||||||
|
return b"".join(r.items[0].strings)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_txt_pydns(name):
|
||||||
|
"""Return a TXT record associated with a DNS name."""
|
||||||
|
# Older pydns releases don't like a trailing dot.
|
||||||
|
if name.endswith('.'):
|
||||||
|
name = name[:-1]
|
||||||
|
DNS.ParseResolvConf()
|
||||||
|
response = DNS.DnsRequest(name, qtype='txt').req()
|
||||||
|
if not response.answers:
|
||||||
|
return None
|
||||||
|
return response.answers[0]['data'][0]
|
||||||
|
|
||||||
|
|
||||||
|
# Prefer dnspython if it's there, otherwise use pydns.
|
||||||
|
try:
|
||||||
|
import dns.resolver
|
||||||
|
_get_txt = get_txt_dnspython
|
||||||
|
except ImportError:
|
||||||
|
import DNS
|
||||||
|
_get_txt = get_txt_pydns
|
||||||
|
|
||||||
|
|
||||||
|
def get_txt(name):
|
||||||
|
"""Return a TXT record associated with a DNS name.
|
||||||
|
|
||||||
|
@param name: The bytestring domain name to look up.
|
||||||
|
"""
|
||||||
|
# pydns needs Unicode, but DKIM's d= is ASCII (already punycoded).
|
||||||
|
try:
|
||||||
|
unicode_name = name.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return None
|
||||||
|
return _get_txt(unicode_name).encode('utf-8')
|
||||||
@@ -21,11 +21,13 @@ import unittest
|
|||||||
|
|
||||||
def test_suite():
|
def test_suite():
|
||||||
from dkim.tests import (
|
from dkim.tests import (
|
||||||
|
test_canonicalization,
|
||||||
test_crypto,
|
test_crypto,
|
||||||
test_dkim,
|
test_dkim,
|
||||||
test_util,
|
test_util,
|
||||||
)
|
)
|
||||||
modules = [
|
modules = [
|
||||||
|
test_canonicalization,
|
||||||
test_crypto,
|
test_crypto,
|
||||||
test_dkim,
|
test_dkim,
|
||||||
test_util,
|
test_util,
|
||||||
|
|||||||
@@ -0,0 +1,99 @@
|
|||||||
|
# This software is provided 'as-is', without any express or implied
|
||||||
|
# warranty. In no event will the author be held liable for any damages
|
||||||
|
# arising from the use of this software.
|
||||||
|
#
|
||||||
|
# Permission is granted to anyone to use this software for any purpose,
|
||||||
|
# including commercial applications, and to alter it and redistribute it
|
||||||
|
# freely, subject to the following restrictions:
|
||||||
|
#
|
||||||
|
# 1. The origin of this software must not be misrepresented; you must not
|
||||||
|
# claim that you wrote the original software. If you use this software
|
||||||
|
# in a product, an acknowledgment in the product documentation would be
|
||||||
|
# appreciated but is not required.
|
||||||
|
# 2. Altered source versions must be plainly marked as such, and must not be
|
||||||
|
# misrepresented as being the original software.
|
||||||
|
# 3. This notice may not be removed or altered from any source distribution.
|
||||||
|
#
|
||||||
|
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from dkim.canonicalization import Simple, Relaxed
|
||||||
|
|
||||||
|
|
||||||
|
class BaseCanonicalizationTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def assertCanonicalForm(self, expected, input):
|
||||||
|
self.assertEqual(expected, self.func(expected))
|
||||||
|
self.assertEqual(expected, self.func(input))
|
||||||
|
|
||||||
|
|
||||||
|
class TestSimpleAlgorithmHeaders(BaseCanonicalizationTest):
|
||||||
|
|
||||||
|
func = staticmethod(Simple.canonicalize_headers)
|
||||||
|
|
||||||
|
def test_untouched(self):
|
||||||
|
test_headers = [(b'Foo ', b'bar\r\n'), (b'Foo', b'baz\r\n')]
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
test_headers,
|
||||||
|
test_headers)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSimpleAlgorithmBody(BaseCanonicalizationTest):
|
||||||
|
|
||||||
|
func = staticmethod(Simple.canonicalize_body)
|
||||||
|
|
||||||
|
def test_strips_trailing_empty_lines_from_body(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
b'Foo \tbar \r\n',
|
||||||
|
b'Foo \tbar \r\n\r\n')
|
||||||
|
|
||||||
|
|
||||||
|
class TestRelaxedAlgorithmHeaders(BaseCanonicalizationTest):
|
||||||
|
|
||||||
|
func = staticmethod(Relaxed.canonicalize_headers)
|
||||||
|
|
||||||
|
def test_lowercases_names(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
[(b'foo', b'Bar\r\n'), (b'baz', b'Foo\r\n')],
|
||||||
|
[(b'Foo', b'Bar\r\n'), (b'BaZ', b'Foo\r\n')])
|
||||||
|
|
||||||
|
def test_unfolds_values(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
[(b'foo', b'Bar baz\r\n')],
|
||||||
|
[(b'Foo', b'Bar\r\n baz\r\n')])
|
||||||
|
|
||||||
|
def test_wsp_compresses_values(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
[(b'foo', b'Bar baz\r\n')],
|
||||||
|
[(b'Foo', b'Bar \t baz\r\n')])
|
||||||
|
|
||||||
|
def test_wsp_strips(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
[(b'foo', b'Bar baz\r\n')],
|
||||||
|
[(b'Foo ', b' Bar \t baz \r\n')])
|
||||||
|
|
||||||
|
|
||||||
|
class TestRelaxedAlgorithmBody(BaseCanonicalizationTest):
|
||||||
|
|
||||||
|
func = staticmethod(Relaxed.canonicalize_body)
|
||||||
|
|
||||||
|
def test_strips_trailing_wsp(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
b'Foo\r\nbar\r\n',
|
||||||
|
b'Foo \t\r\nbar\r\n')
|
||||||
|
|
||||||
|
def test_wsp_compresses(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
b'Foo bar\r\n',
|
||||||
|
b'Foo \t bar\r\n')
|
||||||
|
|
||||||
|
def test_strips_trailing_empty_lines(self):
|
||||||
|
self.assertCanonicalForm(
|
||||||
|
b'Foo\r\nbar\r\n',
|
||||||
|
b'Foo\r\nbar\r\n\r\n\r\n')
|
||||||
|
|
||||||
|
|
||||||
|
def test_suite():
|
||||||
|
from unittest import TestLoader
|
||||||
|
return TestLoader().loadTestsFromName(__name__)
|
||||||
@@ -53,8 +53,12 @@ class TestSignAndVerify(unittest.TestCase):
|
|||||||
self.key = read_test_data("test.private")
|
self.key = read_test_data("test.private")
|
||||||
|
|
||||||
def dnsfunc(self, domain):
|
def dnsfunc(self, domain):
|
||||||
|
try:
|
||||||
|
domain = domain.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return None
|
||||||
self.assertEqual('test._domainkey.example.com.', domain)
|
self.assertEqual('test._domainkey.example.com.', domain)
|
||||||
return read_test_data("test.txt").decode('utf-8')
|
return read_test_data("test.txt")
|
||||||
|
|
||||||
def test_verifies(self):
|
def test_verifies(self):
|
||||||
# A message verifies after being signed.
|
# A message verifies after being signed.
|
||||||
|
|||||||
Reference in New Issue
Block a user