Drop Python 2.5 support, add 3.1 and 3.2.

This commit is contained in:
William Grant
2011-06-03 00:18:38 +10:00
11 changed files with 221 additions and 179 deletions
+2 -2
View File
@@ -12,8 +12,8 @@ This is pydkim 0.3.
REQUIREMENTS
Python 2.5 or later is required.
The dnspython library (http://www.dnspython.org) library is required.
- Python 2.x >= 2.6, or Python 3.x >= 3.1.
- dnspython or pydns. dnspython is preferred if both are present.
INSTALLATION
+92 -82
View File
@@ -54,7 +54,7 @@ __all__ = [
class Simple:
"""Class that represents the "simple" canonicalization algorithm."""
name = "simple"
name = b"simple"
@staticmethod
def canonicalize_headers(headers):
@@ -64,12 +64,12 @@ class Simple:
@staticmethod
def canonicalize_body(body):
# Ignore all empty lines at the end of the message body.
return re.sub("(\r\n)*$", "\r\n", body)
return re.sub(b"(\r\n)*$", b"\r\n", body)
class Relaxed:
"""Class that represents the "relaxed" canonicalization algorithm."""
name = "relaxed"
name = b"relaxed"
@staticmethod
def canonicalize_headers(headers):
@@ -77,14 +77,14 @@ class Relaxed:
# Unfold all header lines.
# Compress WSP to single space.
# Remove all WSP at the start or end of the field value (strip).
return [(x[0].lower(), re.sub(r"\s+", " ", re.sub("\r\n", "", x[1])).strip()+"\r\n") for x in headers]
return [(x[0].lower(), re.sub(br"\s+", b" ", re.sub(b"\r\n", b"", x[1])).strip()+b"\r\n") for x in headers]
@staticmethod
def canonicalize_body(body):
# Remove all trailing WSP at end of lines.
# Compress non-line-ending WSP to single space.
# Ignore all empty lines at the end of the message body.
return re.sub("(\r\n)*$", "\r\n", re.sub(r"[\x09\x20]+", " ", re.sub("[\\x09\\x20]+\r\n", "\r\n", body)))
return re.sub(b"(\r\n)*$", b"\r\n", re.sub(br"[\x09\x20]+", b" ", re.sub(b"[\\x09\\x20]+\r\n", b"\r\n", body)))
class DKIMException(Exception):
"""Base class for DKIM errors."""
@@ -131,11 +131,11 @@ def hash_headers(hasher, canonicalize_headers, headers, include_headers,
# The call to _remove() assumes that the signature b= only appears
# once in the signature header
cheaders = canonicalize_headers.canonicalize_headers(
[(sigheaders[0][0], _remove(sigheaders[0][1], sig['b']))])
[(sigheaders[0][0], _remove(sigheaders[0][1], sig[b'b']))])
sign_headers += [(x[0], x[1].rstrip()) for x in cheaders]
for x in sign_headers:
hasher.update(x[0])
hasher.update(":")
hasher.update(b":")
hasher.update(x[1])
@@ -147,40 +147,43 @@ def validate_signature_fields(sig):
@param sig: A dict mapping field keys to values.
"""
mandatory_fields = ('v', 'a', 'b', 'bh', 'd', 'h', 's')
mandatory_fields = (b'v', b'a', b'b', b'bh', b'd', b'h', b's')
for field in mandatory_fields:
if field not in sig:
raise ValidationError("signature missing %s=" % field)
if sig['v'] != "1":
raise ValidationError("v= value is not 1 (%s)" % sig['v'])
if re.match(r"[\s0-9A-Za-z+/]+=*$", sig['b']) is None:
raise ValidationError("b= value is not valid base64 (%s)" % sig['b'])
if re.match(r"[\s0-9A-Za-z+/]+=*$", sig['bh']) is None:
if sig[b'v'] != b"1":
raise ValidationError("v= value is not 1 (%s)" % sig[b'v'])
if re.match(br"[\s0-9A-Za-z+/]+=*$", sig[b'b']) is None:
raise ValidationError("b= value is not valid base64 (%s)" % sig[b'b'])
if re.match(br"[\s0-9A-Za-z+/]+=*$", sig[b'bh']) is None:
raise ValidationError(
"bh= value is not valid base64 (%s)" % sig['bh'])
if 'i' in sig and (
not sig['i'].endswith(sig['d']) or
sig['i'][-len(sig['d'])-1] not in "@."):
"bh= value is not valid base64 (%s)" % sig[b'bh'])
# Nasty hack to support both str and bytes... check for both the
# character and integer values.
if b'i' in sig and (
not sig[b'i'].endswith(sig[b'd']) or
sig[b'i'][-len(sig[b'd'])-1] not in ('@', '.', 64, 46)):
raise ValidationError(
"i= domain is not a subdomain of d= (i=%s d=%d)" %
(sig['i'], sig['d']))
if 'l' in sig and re.match(r"\d{,76}$", sig['l']) is None:
(sig[b'i'], sig[b'd']))
if b'l' in sig and re.match(br"\d{,76}$", sig['l']) is None:
raise ValidationError(
"l= value is not a decimal integer (%s)" % sig['l'])
if 'q' in sig and sig['q'] != "dns/txt":
raise ValidationError("q= value is not dns/txt (%s)" % sig['q'])
if 't' in sig and re.match(r"\d+$", sig['t']) is None:
"l= value is not a decimal integer (%s)" % sig[b'l'])
if b'q' in sig and sig[b'q'] != b"dns/txt":
raise ValidationError("q= value is not dns/txt (%s)" % sig[b'q'])
if b't' in sig and re.match(br"\d+$", sig[b't']) is None:
raise ValidationError(
"t= value is not a decimal integer (%s)" % sig['t'])
if 'x' in sig:
if re.match(r"\d+$", sig['x']) is None:
"t= value is not a decimal integer (%s)" % sig[b't'])
if b'x' in sig:
if re.match(br"\d+$", sig[b'x']) is None:
raise ValidationError(
"x= value is not a decimal integer (%s)" % sig['x'])
if int(sig['x']) < int(sig['t']):
"x= value is not a decimal integer (%s)" % sig[b'x'])
if int(sig[b'x']) < int(sig[b't']):
raise ValidationError(
"x= value is less than t= value (x=%s t=%s)" %
(sig['x'], sig['t']))
(sig[b'x'], sig[b't']))
def rfc822_parse(message):
"""Parse a message in RFC822 format.
@@ -191,27 +194,27 @@ def rfc822_parse(message):
The body is a CRLF-separated string.
"""
headers = []
lines = re.split("\r?\n", message)
lines = re.split(b"\r?\n", message)
i = 0
while i < len(lines):
if len(lines[i]) == 0:
# End of headers, return what we have plus the body, excluding the blank line.
i += 1
break
if re.match(r"[\x09\x20]", lines[i][0]):
headers[-1][1] += lines[i]+"\r\n"
if lines[i][0] in ("\x09", "\x20", 0x09, 0x20):
headers[-1][1] += lines[i]+b"\r\n"
else:
m = re.match(r"([\x21-\x7e]+?):", lines[i])
m = re.match(br"([\x21-\x7e]+?):", lines[i])
if m is not None:
headers.append([m.group(1), lines[i][m.end(0):]+"\r\n"])
elif lines[i].startswith("From "):
headers.append([m.group(1), lines[i][m.end(0):]+b"\r\n"])
elif lines[i].startswith(b"From "):
pass
else:
raise MessageFormatError("Unexpected characters in RFC822 header: %s" % lines[i])
i += 1
return (headers, "\r\n".join(lines[i:]))
return (headers, b"\r\n".join(lines[i:]))
def dnstxt_dnspython(name):
@@ -219,7 +222,7 @@ def dnstxt_dnspython(name):
a = dns.resolver.query(name, dns.rdatatype.TXT)
for r in a.response.answer:
if r.rdtype == dns.rdatatype.TXT:
return "".join(r.items[0].strings)
return b"".join(r.items[0].strings)
return None
@@ -246,20 +249,20 @@ except ImportError:
def fold(header):
"""Fold a header line into multiple crlf-separated lines at column 72."""
i = header.rfind("\r\n ")
i = header.rfind(b"\r\n ")
if i == -1:
pre = ""
pre = b""
else:
i += 3
pre = header[:i]
header = header[i:]
while len(header) > 72:
i = header[:72].rfind(" ")
i = header[:72].rfind(b" ")
if i == -1:
j = i
else:
j = i + 1
pre += header[:i] + "\r\n "
pre += header[:i] + b"\r\n "
header = header[j:]
return pre + header
@@ -286,7 +289,7 @@ def sign(message, selector, domain, privkey, identity=None,
try:
pk = parse_pem_private_key(privkey)
except UnparsableKeyError, e:
except UnparsableKeyError as e:
raise KeyFormatError(str(e))
if identity is not None and not identity.endswith(domain):
@@ -307,26 +310,26 @@ def sign(message, selector, domain, privkey, identity=None,
bodyhash = base64.b64encode(h.digest())
sigfields = [x for x in [
('v', "1"),
('a', "rsa-sha256"),
('c', "%s/%s" % (canonicalize[0].name, canonicalize[1].name)),
('d', domain),
('i', identity or "@"+domain),
length and ('l', len(body)),
('q', "dns/txt"),
('s', selector),
('t', str(int(time.time()))),
('h', " : ".join(x[0] for x in sign_headers)),
('bh', bodyhash),
('b', ""),
(b'v', b"1"),
(b'a', b"rsa-sha256"),
(b'c', b"/".join((canonicalize[0].name, canonicalize[1].name))),
(b'd', domain),
(b'i', identity or b"@"+domain),
length and (b'l', len(body)),
(b'q', b"dns/txt"),
(b's', selector),
(b't', str(int(time.time())).encode('ascii')),
(b'h', b" : ".join(x[0] for x in sign_headers)),
(b'bh', bodyhash),
(b'b', b""),
] if x]
sig_value = fold("; ".join("%s=%s" % x for x in sigfields))
sig_value = fold(b"; ".join(b"=".join(x) for x in sigfields))
dkim_header = canonicalize[0].canonicalize_headers([
['DKIM-Signature', ' ' + sig_value]])[0]
[b'DKIM-Signature', b' ' + sig_value]])[0]
# the dkim sig is hashed with no trailing crlf, even if the
# canonicalization algorithm would add one.
if dkim_header[1][-2:] == '\r\n':
if dkim_header[1][-2:] == b'\r\n':
dkim_header = (dkim_header[0], dkim_header[1][:-2])
sign_headers.append(dkim_header)
@@ -334,7 +337,7 @@ def sign(message, selector, domain, privkey, identity=None,
h = hashlib.sha256()
for x in sign_headers:
h.update(x[0])
h.update(":")
h.update(b":")
h.update(x[1])
try:
@@ -342,9 +345,9 @@ def sign(message, selector, domain, privkey, identity=None,
h, pk['privateExponent'], pk['modulus'])
except DigestTooLargeError:
raise ParameterError("digest too large for modulus")
sig_value += base64.b64encode(sig2)
sig_value += base64.b64encode(bytes(sig2))
return 'DKIM-Signature: ' + sig_value + "\r\n"
return b'DKIM-Signature: ' + sig_value + b"\r\n"
def verify(message, logger=None, dnsfunc=dnstxt):
@@ -359,7 +362,7 @@ def verify(message, logger=None, dnsfunc=dnstxt):
(headers, body) = rfc822_parse(message)
sigheaders = [x for x in headers if x[0].lower() == "dkim-signature"]
sigheaders = [x for x in headers if x[0].lower() == b"dkim-signature"]
if len(sigheaders) < 1:
return False
@@ -372,24 +375,24 @@ def verify(message, logger=None, dnsfunc=dnstxt):
try:
validate_signature_fields(sig)
except ValidationError, e:
except ValidationError as e:
logger.error("signature fields failed to validate: %s" % e)
return False
m = re.match("(\w+)(?:/(\w+))?$", sig['c'])
m = re.match(b"(\w+)(?:/(\w+))?$", sig[b'c'])
if m is None:
logger.error(
"c= value is not in format method/method (%s)" % sig['c'])
"c= value is not in format method/method (%s)" % sig[b'c'])
return False
can_headers = m.group(1)
if m.group(2) is not None:
can_body = m.group(2)
else:
can_body = "simple"
can_body = b"simple"
if can_headers == "simple":
if can_headers == b"simple":
canonicalize_headers = Simple
elif can_headers == "relaxed":
elif can_headers == b"relaxed":
canonicalize_headers = Relaxed
else:
logger.error("unknown header canonicalization (%s)" % can_headers)
@@ -397,36 +400,43 @@ def verify(message, logger=None, dnsfunc=dnstxt):
headers = canonicalize_headers.canonicalize_headers(headers)
if can_body == "simple":
if can_body == b"simple":
body = Simple.canonicalize_body(body)
elif can_body == "relaxed":
elif can_body == b"relaxed":
body = Relaxed.canonicalize_body(body)
else:
logger.error("unknown body canonicalization (%s)" % can_body)
return False
if sig['a'] == "rsa-sha1":
if sig[b'a'] == b"rsa-sha1":
hasher = hashlib.sha1
elif sig['a'] == "rsa-sha256":
elif sig[b'a'] == b"rsa-sha256":
hasher = hashlib.sha256
else:
logger.error("unknown signature algorithm (%s)" % sig['a'])
logger.error("unknown signature algorithm (%s)" % sig[b'a'])
return False
if 'l' in sig:
body = body[:int(sig['l'])]
if b'l' in sig:
body = body[:int(sig[b'l'])]
h = hasher()
h.update(body)
bodyhash = h.digest()
logger.debug("bh: %s" % base64.b64encode(bodyhash))
if bodyhash != base64.b64decode(re.sub(r"\s+", "", sig['bh'])):
if bodyhash != base64.b64decode(re.sub(br"\s+", b"", sig[b'bh'])):
logger.error(
"body hash mismatch (got %s, expected %s)" %
(base64.b64encode(bodyhash), sig['bh']))
(base64.b64encode(bodyhash), sig[b'bh']))
return False
s = dnsfunc(sig['s']+"._domainkey."+sig['d']+".")
# dnstxt wants Unicode
try:
selector = sig[b's'].decode('ascii')
domain = sig[b'd'].decode('ascii')
except UnicodeDecodeError:
return False
name = "%s._domainkey.%s." % (selector, domain)
s = dnsfunc(name).encode('utf-8')
if not s:
return False
try:
@@ -434,16 +444,16 @@ def verify(message, logger=None, dnsfunc=dnstxt):
except InvalidTagValueList:
return False
try:
pk = parse_public_key(base64.b64decode(pub['p']))
except UnparsableKeyError, e:
pk = parse_public_key(base64.b64decode(pub[b'p']))
except UnparsableKeyError as e:
logger.error("could not parse public key: %s" % e)
return False
include_headers = re.split(r"\s*:\s*", sig['h'])
include_headers = re.split(br"\s*:\s*", sig[b'h'])
h = hasher()
hash_headers(
h, canonicalize_headers, headers, include_headers, sigheaders, sig)
signature = base64.b64decode(re.sub(r"\s+", "", sig['b']))
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
try:
return RSASSA_PKCS1_v1_5_verify(
h, signature, pk['publicExponent'], pk['modulus'])
+20 -13
View File
@@ -50,25 +50,25 @@ def asn1_parse(template, data):
@param data: byte string data to parse
@return: decoded structure
"""
data = bytearray(data)
r = []
i = 0
for t in template:
tag = ord(data[i])
tag = data[i]
i += 1
if tag == t[0]:
length = ord(data[i])
length = data[i]
i += 1
if length & 0x80:
n = length & 0x7f
length = 0
for j in range(n):
length = (length << 8) | ord(data[i])
length = (length << 8) | data[i]
i += 1
if tag == INTEGER:
n = 0
for j in range(length):
n = (n << 8) | ord(data[i])
n = (n << 8) | data[i]
i += 1
r.append(n)
elif tag == BIT_STRING:
@@ -100,14 +100,21 @@ def asn1_length(n):
"""
assert n >= 0
if n < 0x7f:
return chr(n)
r = ""
return bytearray([n])
r = bytearray()
while n > 0:
r = chr(n & 0xff) + r
r.insert(n & 0xff)
n >>= 8
return r
def asn1_encode(type, data):
length = asn1_length(len(data))
length.insert(0, type)
length.extend(data)
return length
def asn1_build(node):
"""Build a DER-encoded ASN.1 data structure.
@@ -115,16 +122,16 @@ def asn1_build(node):
@return: DER-encoded ASN.1 byte string
"""
if node[0] == OCTET_STRING:
return chr(OCTET_STRING) + asn1_length(len(node[1])) + node[1]
return asn1_encode(OCTET_STRING, node[1])
if node[0] == NULL:
assert node[1] is None
return chr(NULL) + asn1_length(0)
return asn1_encode(NULL, b'')
elif node[0] == OBJECT_IDENTIFIER:
return chr(OBJECT_IDENTIFIER) + asn1_length(len(node[1])) + node[1]
return asn1_encode(OBJECT_IDENTIFIER, node[1])
elif node[0] == SEQUENCE:
r = ""
r = bytearray()
for x in node[1]:
r += asn1_build(x)
return chr(SEQUENCE) + asn1_length(len(r)) + r
return asn1_encode(SEQUENCE, r)
else:
raise ASN1FormatError("Unexpected tag in template: %02x" % node[0])
+12 -11
View File
@@ -79,8 +79,8 @@ ASN1_RSAPrivateKey = [
# These values come from RFC 3447, section 9.2 Notes, page 43.
HASH_ID_MAP = {
'sha1': "\x2b\x0e\x03\x02\x1a",
'sha256': "\x60\x86\x48\x01\x65\x03\x04\x02\x01",
'sha1': b"\x2b\x0e\x03\x02\x1a",
'sha256': b"\x60\x86\x48\x01\x65\x03\x04\x02\x01",
}
@@ -105,7 +105,7 @@ def parse_public_key(data):
# Not sure why the [1:] is necessary to skip a byte.
x = asn1_parse(ASN1_Object, data)
pkd = asn1_parse(ASN1_RSAPublicKey, x[0][1][1:])
except ASN1FormatError, e:
except ASN1FormatError as e:
raise UnparsableKeyError(str(e))
pk = {
'modulus': pkd[0][0],
@@ -122,7 +122,7 @@ def parse_private_key(data):
"""
try:
pka = asn1_parse(ASN1_RSAPrivateKey, data)
except ASN1FormatError, e:
except ASN1FormatError as e:
raise UnparsableKeyError(str(e))
pk = {
'version': pka[0][0],
@@ -144,12 +144,12 @@ def parse_pem_private_key(data):
@param data: RFC3447 RSAPrivateKey in PEM format.
@return: RSA private key
"""
m = re.search("--\n(.*?)\n--", data, re.DOTALL)
m = re.search(b"--\n(.*?)\n--", data, re.DOTALL)
if m is None:
raise UnparsableKeyError("Private key not found")
try:
pkdata = base64.b64decode(m.group(1))
except TypeError, e:
except TypeError as e:
raise UnparsableKeyError(str(e))
return parse_private_key(pkdata)
@@ -171,7 +171,7 @@ def EMSA_PKCS1_v1_5_encode(hash, mlen):
]))
if len(dinfo) + 11 > mlen:
raise DigestTooLargeError()
return "\x00\x01"+"\xff"*(mlen-len(dinfo)-3)+"\x00"+dinfo
return b"\x00\x01"+b"\xff"*(mlen-len(dinfo)-3)+b"\x00"+dinfo
def str2int(s):
@@ -180,9 +180,10 @@ def str2int(s):
@param s: byte string representing a positive integer to convert
@return: converted integer
"""
s = bytearray(s)
r = 0
for c in s:
r = (r << 8) | ord(c)
r = (r << 8) | c
return r
@@ -195,15 +196,15 @@ def int2str(n, length=-1):
specified
"""
assert n >= 0
r = []
r = bytearray()
while length < 0 or len(r) < length:
r.append(chr(n & 0xff))
r.append(n & 0xff)
n >>= 8
if length < 0 and n == 0:
break
r.reverse()
assert length < 0 or len(r) == length
return ''.join(r)
return r
def perform_rsa(message, exponent, modulus, mlen):
+1 -1
View File
@@ -30,5 +30,5 @@ def test_suite():
test_dkim,
test_util,
]
suites = map(lambda x: x.test_suite(), modules)
suites = [x.test_suite() for x in modules]
return unittest.TestSuite(suites)
+30 -30
View File
@@ -17,6 +17,7 @@
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
import base64
import binascii
import hashlib
import unittest
@@ -54,13 +55,13 @@ TEST_KEY_PRIVATE_EXPONENT = int(
class TestStrIntConversion(unittest.TestCase):
def test_str2int(self):
self.assertEquals(1234, str2int('\x04\xd2'))
self.assertEqual(1234, str2int(b'\x04\xd2'))
def test_int2str(self):
self.assertEquals('\x04\xd2', int2str(1234))
self.assertEqual(b'\x04\xd2', int2str(1234))
def test_int2str_with_length(self):
self.assertEquals('\x00\x00\x04\xd2', int2str(1234, 4))
self.assertEqual(b'\x00\x00\x04\xd2', int2str(1234, 4))
def test_int2str_fails_on_negative(self):
self.assertRaises(AssertionError, int2str, -1)
@@ -70,39 +71,39 @@ class TestParseKeys(unittest.TestCase):
def test_parse_pem_private_key(self):
key = parse_pem_private_key(read_test_data('test.private'))
self.assertEquals(key['modulus'], TEST_KEY_MODULUS)
self.assertEquals(key['publicExponent'], TEST_KEY_PUBLIC_EXPONENT)
self.assertEquals(key['privateExponent'], TEST_KEY_PRIVATE_EXPONENT)
self.assertEqual(key['modulus'], TEST_KEY_MODULUS)
self.assertEqual(key['publicExponent'], TEST_KEY_PUBLIC_EXPONENT)
self.assertEqual(key['privateExponent'], TEST_KEY_PRIVATE_EXPONENT)
def test_parse_public_key(self):
data = read_test_data('test.txt')
key = parse_public_key(base64.b64decode(parse_tag_value(data)['p']))
self.assertEquals(key['modulus'], TEST_KEY_MODULUS)
self.assertEquals(key['publicExponent'], TEST_KEY_PUBLIC_EXPONENT)
key = parse_public_key(base64.b64decode(parse_tag_value(data)[b'p']))
self.assertEqual(key['modulus'], TEST_KEY_MODULUS)
self.assertEqual(key['publicExponent'], TEST_KEY_PUBLIC_EXPONENT)
class TestEMSA_PKCS1_v1_5(unittest.TestCase):
def test_encode_sha256(self):
hash = hashlib.sha256('message')
self.assertEquals(
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
'010\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04 '
+ hash.digest(),
hash = hashlib.sha256(b'message')
self.assertEqual(
b'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
b'010\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04'
b' ' + hash.digest(),
EMSA_PKCS1_v1_5_encode(hash, 62))
def test_encode_sha1(self):
hash = hashlib.sha1('message')
self.assertEquals(
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
'0!0\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
hash = hashlib.sha1(b'message')
self.assertEqual(
b'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
b'0!0\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
+ hash.digest(),
EMSA_PKCS1_v1_5_encode(hash, 46))
def test_encode_forbids_too_short(self):
# PKCS#1 requires at least 8 bytes of padding, so there must be
# at least that much space.
hash = hashlib.sha1('message')
hash = hashlib.sha1(b'message')
self.assertRaises(
DigestTooLargeError,
EMSA_PKCS1_v1_5_encode, hash, 45)
@@ -110,7 +111,7 @@ class TestEMSA_PKCS1_v1_5(unittest.TestCase):
class TestRSA(unittest.TestCase):
message = '0004fb'.decode('hex')
message = binascii.unhexlify(b'0004fb')
modulus = 186101
modlen = 3
public_exponent = 907
@@ -119,14 +120,14 @@ class TestRSA(unittest.TestCase):
def test_perform(self):
signed = perform_rsa(
self.message, self.private_exponent, self.modulus, self.modlen)
self.assertEquals('01f140'.decode('hex'), signed)
self.assertEqual(binascii.unhexlify(b'01f140'), signed)
def test_sign_and_verify(self):
signed = perform_rsa(
self.message, self.private_exponent, self.modulus, self.modlen)
unsigned = perform_rsa(
signed, self.public_exponent, self.modulus, self.modlen)
self.assertEquals(self.message, unsigned)
self.assertEqual(self.message, unsigned)
class TestRSASSA(unittest.TestCase):
@@ -135,18 +136,17 @@ class TestRSASSA(unittest.TestCase):
self.key = parse_pem_private_key(read_test_data('test.private'))
self.hash = hashlib.sha1(self.test_digest)
test_digest = '0123456789abcdef0123'
test_signature = (
'cc8d3647d64dd3bc12984947a27bdfbb565041fcc9db781afb4b60d29d288d8d60de'
'9e1916d6f81569c3e72af442538dd6aecb50a6de9a14565fdd679c46ff7842482e15'
'e5aa078549621b6f12ca8cd57ecfad95b18e53581e131c6c3c7cd01cb153adeb439d'
'2d6ab8b215b19be0e69ef490885004a474eb26d747a219693e8c').decode('hex')
test_digest = b'0123456789abcdef0123'
test_signature = binascii.unhexlify(
b'cc8d3647d64dd3bc12984947a27bdfbb565041fcc9db781afb4b60d29d288d8d60d'
b'e9e1916d6f81569c3e72af442538dd6aecb50a6de9a14565fdd679c46ff7842482e'
b'15e5aa078549621b6f12ca8cd57ecfad95b18e53581e131c6c3c7cd01cb153adeb4'
b'39d2d6ab8b215b19be0e69ef490885004a474eb26d747a219693e8c')
def test_sign_and_verify(self):
signature = RSASSA_PKCS1_v1_5_sign(
self.hash, TEST_KEY_PRIVATE_EXPONENT, TEST_KEY_MODULUS)
self.assertEquals(
self.test_signature, signature)
self.assertEqual(self.test_signature, signature)
self.assertTrue(
RSASSA_PKCS1_v1_5_verify(
self.hash, signature, TEST_KEY_PUBLIC_EXPONENT,
+14 -7
View File
@@ -28,20 +28,21 @@ def read_test_data(filename):
The files live in dkim/tests/data.
"""
path = os.path.join(os.path.dirname(__file__), 'data', filename)
return open(path).read()
with open(path, 'rb') as f:
return f.read()
class TestFold(unittest.TestCase):
def test_short_line(self):
self.assertEqual(
"foo", dkim.fold("foo"))
b"foo", dkim.fold(b"foo"))
def DISABLED_test_long_line(self):
# The function is terribly broken, not passing even this simple
# test.
self.assertEqual(
"foo"*24 + "\r\n foo", dkim.fold("foo" * 25))
b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25))
class TestSignAndVerify(unittest.TestCase):
@@ -53,18 +54,24 @@ class TestSignAndVerify(unittest.TestCase):
def dnsfunc(self, domain):
self.assertEqual('test._domainkey.example.com.', domain)
return read_test_data("test.txt")
return read_test_data("test.txt").decode('utf-8')
def test_verifies(self):
# A message verifies after being signed.
sig = dkim.sign(self.message, "test", "example.com", self.key)
sig = dkim.sign(self.message, b"test", b"example.com", self.key)
res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
self.assertTrue(res)
def test_altered_body_fails(self):
# An altered body fails verification.
sig = dkim.sign(self.message, "test", "example.com", self.key)
res = dkim.verify(sig + self.message + "foo", dnsfunc=self.dnsfunc)
sig = dkim.sign(self.message, b"test", b"example.com", self.key)
res = dkim.verify(sig + self.message + b"foo", dnsfunc=self.dnsfunc)
self.assertFalse(res)
def test_badly_encoded_domain_fails(self):
# Domains should be ASCII. Bad ASCII causes verification to fail.
sig = dkim.sign(self.message, b"test", b"example.com\xe9", self.key)
res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
self.assertFalse(res)
+14 -16
View File
@@ -30,38 +30,36 @@ class TestParseTagValue(unittest.TestCase):
def test_single(self):
self.assertEqual(
{'foo': 'bar'},
parse_tag_value('foo=bar'))
{b'foo': b'bar'},
parse_tag_value(b'foo=bar'))
def test_trailing_separator_ignored(self):
self.assertEqual(
{'foo': 'bar'},
parse_tag_value('foo=bar;'))
{b'foo': b'bar'},
parse_tag_value(b'foo=bar;'))
def test_multiple(self):
self.assertEqual(
{'foo': 'bar', 'baz': 'foo'},
parse_tag_value('foo=bar;baz=foo'))
{b'foo': b'bar', b'baz': b'foo'},
parse_tag_value(b'foo=bar;baz=foo'))
def test_value_with_equals(self):
self.assertEqual(
{'foo': 'bar', 'baz': 'foo=bar'},
parse_tag_value('foo=bar;baz=foo=bar'))
{b'foo': b'bar', b'baz': b'foo=bar'},
parse_tag_value(b'foo=bar;baz=foo=bar'))
def test_whitespace_is_stripped(self):
self.assertEqual(
{'foo': 'bar', 'baz': 'f oo=bar'},
parse_tag_value(' foo \t= bar;\tbaz= f oo=bar '))
{b'foo': b'bar', b'baz': b'f oo=bar'},
parse_tag_value(b' foo \t= bar;\tbaz= f oo=bar '))
def test_missing_value_is_an_error(self):
self.assertRaisesRegexp(
InvalidTagSpec, 'baz',
parse_tag_value, 'foo=bar;baz')
self.assertRaises(
InvalidTagSpec, parse_tag_value, b'foo=bar;baz')
def test_duplicate_tag_is_an_error(self):
self.assertRaisesRegexp(
DuplicateTag, 'foo',
parse_tag_value, 'foo=bar;foo=baz')
self.assertRaises(
DuplicateTag, parse_tag_value, b'foo=bar;foo=baz')
def test_suite():
+2 -2
View File
@@ -55,13 +55,13 @@ def parse_tag_value(tag_list):
@param tag_list: A string containing a DKIM Tag=Value list.
"""
tags = {}
tag_specs = tag_list.split(';')
tag_specs = tag_list.split(b';')
# Trailing semicolons are valid.
if not tag_specs[-1]:
tag_specs.pop()
for tag_spec in tag_specs:
try:
key, value = tag_spec.split('=', 1)
key, value = tag_spec.split(b'=', 1)
except ValueError:
raise InvalidTagSpec(tag_spec)
if key.strip() in tags:
+20 -10
View File
@@ -3,11 +3,11 @@
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the author be held liable for any damages
# arising from the use of this software.
#
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
@@ -15,30 +15,40 @@
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
#
#
# Copyright (c) 2008 Greg Hewgill http://hewgill.com
#
# This has been modified from the original software.
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
from __future__ import print_function
import sys
import dkim
if len(sys.argv) < 4 or len(sys.argv) > 5:
print >>sys.stderr, "Usage: dkimsign.py selector domain privatekeyfile [identity]"
print("Usage: dkimsign.py selector domain privatekeyfile [identity]", file=sys.stderr)
sys.exit(1)
selector = sys.argv[1]
domain = sys.argv[2]
if sys.version_info[0] >= 3:
# Make sys.stdin and stdout binary streams.
sys.stdin = sys.stdin.detach()
sys.stdout = sys.stdout.detach()
selector = sys.argv[1].encode('ascii')
domain = sys.argv[2].encode('ascii')
privatekeyfile = sys.argv[3]
if len(sys.argv) > 5:
identity = sys.argv[4]
identity = sys.argv[4].encode('ascii')
else:
identity = None
message = sys.stdin.read()
try:
sig = dkim.sign(message, selector, domain, open(privatekeyfile, "r").read(), identity = identity)
sig = dkim.sign(message, selector, domain, open(privatekeyfile, "rb").read(), identity = identity)
sys.stdout.write(sig)
sys.stdout.write(message)
except Exception, e:
print >>sys.stderr, e
except Exception as e:
print(e, file=sys.stderr)
sys.stdout.write(message)
+14 -5
View File
@@ -3,11 +3,11 @@
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the author be held liable for any damages
# arising from the use of this software.
#
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
@@ -15,15 +15,24 @@
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
#
#
# Copyright (c) 2008 Greg Hewgill http://hewgill.com
#
# This has been modified from the original software.
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
from __future__ import print_function
import sys
import dkim
if sys.version_info[0] >= 3:
# Make sys.stdin a binary stream.
sys.stdin = sys.stdin.detach()
message = sys.stdin.read()
if not dkim.verify(message):
print "signature verification failed"
print("signature verification failed")
sys.exit(1)
print "signature ok"
print("signature ok")