Self consistent, but not externally verified ed25519 based on draft-ietf-dcrup-dkim-crypto-08

This commit is contained in:
Scott Kitterman
2018-02-05 03:53:03 -05:00
parent 10e12c60d8
commit 077bf4813c
18 changed files with 447 additions and 32 deletions
+82 -26
View File
@@ -44,6 +44,12 @@ try:
except:
pass
# only needed for ed25519 signing/verification
try:
import nacl.signing
import nacl.encoding
except:
pass
from dkim.canonicalization import (
CanonicalizationPolicy,
@@ -78,6 +84,8 @@ __all__ = [
"MessageFormatError",
"ParameterError",
"ValidationError",
"AuthresNotFoundError",
"NaClNotFoundError",
"CV_Pass",
"CV_Fail",
"CV_None",
@@ -152,6 +160,13 @@ class AuthresNotFoundError(DKIMException):
""" Authres Package not installed, needed for ARC """
pass
class NaClNotFoundError(DKIMException):
""" Nacl package not installed, needed for ed25119 signatures """
pass
class UnknownKeyTypeError(DKIMException):
""" Key type (k tag) is not known (rsa/ed25519) """
def select_headers(headers, include_headers):
"""Select message header fields to be signed/verified.
@@ -197,6 +212,20 @@ def hash_headers(hasher, canonicalize_headers, headers, include_headers,
hasher.update(y)
return sign_headers
def hash_headers_ed25519(pk, canonicalize_headers, headers, include_headers,
sigheader, sig):
"""Update hash for signed message header fields."""
hash_header = ''
sign_headers = select_headers(headers,include_headers)
# The call to _remove() assumes that the signature b= only appears
# once in the signature header
cheaders = canonicalize_headers.canonicalize_headers(
[(sigheader[0], RE_BTAG.sub(b'\\1',sigheader[1]))])
# the dkim sig is hashed with no trailing crlf, even if the
# canonicalization algorithm would add one.
for x,y in sign_headers + [(x, y.rstrip()) for x,y in cheaders]:
hash_header += x + y
return sign_headers, hash_header
def validate_signature_fields(sig, mandatory_fields=[b'v', b'a', b'b', b'bh', b'd', b'h', b's'], arc=False):
"""Validate DKIM or ARC Signature fields.
@@ -356,13 +385,22 @@ def load_pk_from_dns(name, dnsfunc=get_txt):
except InvalidTagValueList as e:
raise KeyFormatError(e)
try:
pk = parse_public_key(base64.b64decode(pub[b'p']))
keysize = bitsize(pk['modulus'])
if pub[b'k'] == b'ed25519':
pk = nacl.signing.VerifyKey(pub[b'p'], encoder=nacl.encoding.Base64Encoder)
keysize = 256
ktag = b'ed25519'
except KeyError:
raise KeyFormatError("incomplete public key: %s" % s)
except (TypeError,UnparsableKeyError) as e:
raise KeyFormatError("could not parse public key (%s): %s" % (pub[b'p'],e))
return pk, keysize
pub[b'k'] = b'rsa'
if pub[b'k'] == b'rsa':
try:
pk = parse_public_key(base64.b64decode(pub[b'p']))
keysize = bitsize(pk['modulus'])
except KeyError:
raise KeyFormatError("incomplete public key: %s" % s)
except (TypeError,UnparsableKeyError) as e:
raise KeyFormatError("could not parse public key (%s): %s" % (pub[b'p'],e))
ktag = b'rsa'
return pk, keysize, ktag
#: Abstract base class for holding messages and options during DKIM/ARC signing and verification.
class DomainSigner(object):
@@ -513,10 +551,14 @@ class DomainSigner(object):
h, canon_policy, headers, include_headers, header, sig)
self.logger.debug("sign %s headers: %r" % (header_name, h.hashed()))
try:
sig2 = RSASSA_PKCS1_v1_5_sign(h, pk)
except DigestTooLargeError:
raise ParameterError("digest too large for modulus")
if self.signature_algorithm == b'rsa-sha256' or self.signature_algorithm == b'rsa-sha1':
try:
sig2 = RSASSA_PKCS1_v1_5_sign(h, pk)
except DigestTooLargeError:
raise ParameterError("digest too large for modulus")
elif self.signature_algorithm == b'ed25519':
sigobj = pk.sign(h.digest())
sig2 = sigobj.signature
# Folding b= is explicity allowed, but yahoo and live.com are broken
#header_value += base64.b64encode(bytes(sig2))
# Instead of leaving unfolded (which lets an MTA fold it later and still
@@ -539,7 +581,7 @@ class DomainSigner(object):
def verify_sig(self, sig, include_headers, sig_header, dnsfunc):
name = sig[b's'] + b"._domainkey." + sig[b'd'] + b"."
try:
pk, self.keysize = load_pk_from_dns(name, dnsfunc)
pk, self.keysize, ktag = load_pk_from_dns(name, dnsfunc)
except KeyFormatError as e:
self.logger.error("%s" % e)
return False
@@ -584,16 +626,25 @@ class DomainSigner(object):
self.signed_headers = hash_headers(
h, canon_policy, headers, include_headers, sig_header, sig)
self.logger.debug("signed for %s: %r" % (sig_header[0], h.hashed()))
try:
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
res = RSASSA_PKCS1_v1_5_verify(h, signature, pk)
self.logger.debug("%s valid: %s" % (sig_header[0], res))
if res and self.keysize < self.minkey:
raise KeyFormatError("public key too small: %d" % self.keysize)
return res
except (TypeError,DigestTooLargeError) as e:
raise KeyFormatError("digest too large for modulus: %s"%e)
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
if ktag == b'rsa':
try:
res = RSASSA_PKCS1_v1_5_verify(h, signature, pk)
self.logger.debug("%s valid: %s" % (sig_header[0], res))
if res and self.keysize < self.minkey:
raise KeyFormatError("public key too small: %d" % self.keysize)
return res
except (TypeError,DigestTooLargeError) as e:
raise KeyFormatError("digest too large for modulus: %s"%e)
elif ktag == b'ed25519':
try:
pk.verify(h.digest(), signature)
self.logger.debug("%s valid" % (sig_header[0]))
return True
except (nacl.exceptions.BadSignatureError) as e:
return False
else:
raise UnknownKeyTypeError(ktag)
#: Hold messages and options during DKIM signing and verification.
class DKIM(DomainSigner):
@@ -632,12 +683,17 @@ class DKIM(DomainSigner):
#: @return: DKIM-Signature header field terminated by '\r\n'
#: @raise DKIMException: when the message, include_headers, or key are badly
#: formed.
def sign(self, selector, domain, privkey, identity=None,
def sign(self, selector, domain, privkey, signature_algorithm=None, identity=None,
canonicalize=(b'relaxed',b'simple'), include_headers=None, length=False):
try:
pk = parse_pem_private_key(privkey)
except UnparsableKeyError as e:
raise KeyFormatError(str(e))
if signature_algorithm:
self.signature_algorithm = signature_algorithm
if self.signature_algorithm == b'rsa-sha256' or self.signature_algorithm == b'rsa-sha1':
try:
pk = parse_pem_private_key(privkey)
except UnparsableKeyError as e:
raise KeyFormatError(str(e))
elif self.signature_algorithm == b'ed25519':
pk = nacl.signing.SigningKey(privkey, encoder=nacl.encoding.Base64Encoder)
if identity is not None and not identity.endswith(domain):
raise ParameterError("identity must end with domain")