EMSA_PKCS1_v_1_5_encode now works out the hash ID itself.
This commit is contained in:
+2
-14
@@ -189,10 +189,6 @@ def validate_signature_fields(sig, debuglog=None):
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# These values come from RFC 3447, section 9.2 Notes, page 43.
|
|
||||||
HASHID_SHA1 = "\x2b\x0e\x03\x02\x1a"
|
|
||||||
HASHID_SHA256 = "\x60\x86\x48\x01\x65\x03\x04\x02\x01"
|
|
||||||
|
|
||||||
def rfc822_parse(message):
|
def rfc822_parse(message):
|
||||||
"""Parse a message in RFC822 format.
|
"""Parse a message in RFC822 format.
|
||||||
|
|
||||||
@@ -321,13 +317,10 @@ def sign(message, selector, domain, privkey, identity=None, canonicalize=(Simple
|
|||||||
h.update(x[0])
|
h.update(x[0])
|
||||||
h.update(":")
|
h.update(":")
|
||||||
h.update(x[1])
|
h.update(x[1])
|
||||||
d = h.digest()
|
|
||||||
if debuglog is not None:
|
|
||||||
print >>debuglog, "sign digest:", " ".join("%02x" % ord(x) for x in d)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sig2 = RSASSA_PKCS1_v1_5_sign(
|
sig2 = RSASSA_PKCS1_v1_5_sign(
|
||||||
d, HASHID_SHA256, pk['privateExponent'], pk['modulus'])
|
h, pk['privateExponent'], pk['modulus'])
|
||||||
except DigestTooLargeError:
|
except DigestTooLargeError:
|
||||||
raise ParameterError("digest too large for modulus")
|
raise ParameterError("digest too large for modulus")
|
||||||
sig_value += base64.b64encode(sig2)
|
sig_value += base64.b64encode(sig2)
|
||||||
@@ -392,10 +385,8 @@ def verify(message, debuglog=None, dnsfunc=dnstxt):
|
|||||||
|
|
||||||
if sig['a'] == "rsa-sha1":
|
if sig['a'] == "rsa-sha1":
|
||||||
hasher = hashlib.sha1
|
hasher = hashlib.sha1
|
||||||
hashid = HASHID_SHA1
|
|
||||||
elif sig['a'] == "rsa-sha256":
|
elif sig['a'] == "rsa-sha256":
|
||||||
hasher = hashlib.sha256
|
hasher = hashlib.sha256
|
||||||
hashid = HASHID_SHA256
|
|
||||||
else:
|
else:
|
||||||
if debuglog is not None:
|
if debuglog is not None:
|
||||||
print >>debuglog, "Unknown signature algorithm (%s)" % sig['a']
|
print >>debuglog, "Unknown signature algorithm (%s)" % sig['a']
|
||||||
@@ -432,13 +423,10 @@ def verify(message, debuglog=None, dnsfunc=dnstxt):
|
|||||||
h = hasher()
|
h = hasher()
|
||||||
hash_headers(
|
hash_headers(
|
||||||
h, canonicalize_headers, headers, include_headers, sigheaders, sig)
|
h, canonicalize_headers, headers, include_headers, sigheaders, sig)
|
||||||
d = h.digest()
|
|
||||||
if debuglog is not None:
|
|
||||||
print >>debuglog, "verify digest:", " ".join("%02x" % ord(x) for x in d)
|
|
||||||
signature = base64.b64decode(re.sub(r"\s+", "", sig['b']))
|
signature = base64.b64decode(re.sub(r"\s+", "", sig['b']))
|
||||||
try:
|
try:
|
||||||
return RSASSA_PKCS1_v1_5_verify(
|
return RSASSA_PKCS1_v1_5_verify(
|
||||||
d, hashid, signature, pk['publicExponent'], pk['modulus'])
|
h, signature, pk['publicExponent'], pk['modulus'])
|
||||||
except DigestTooLargeError:
|
except DigestTooLargeError:
|
||||||
if debuglog is not None:
|
if debuglog is not None:
|
||||||
print >>debuglog, "digest too large for modulus"
|
print >>debuglog, "digest too large for modulus"
|
||||||
|
|||||||
+17
-14
@@ -76,6 +76,13 @@ ASN1_RSAPrivateKey = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# These values come from RFC 3447, section 9.2 Notes, page 43.
|
||||||
|
HASH_ID_MAP = {
|
||||||
|
'sha1': "\x2b\x0e\x03\x02\x1a",
|
||||||
|
'sha256': "\x60\x86\x48\x01\x65\x03\x04\x02\x01",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class DigestTooLargeError(Exception):
|
class DigestTooLargeError(Exception):
|
||||||
"""The digest is too large to fit within the requested length."""
|
"""The digest is too large to fit within the requested length."""
|
||||||
pass
|
pass
|
||||||
@@ -146,21 +153,20 @@ def parse_pem_private_key(data):
|
|||||||
return parse_private_key(pkdata)
|
return parse_private_key(pkdata)
|
||||||
|
|
||||||
|
|
||||||
def EMSA_PKCS1_v1_5_encode(digest, mlen, hashid):
|
def EMSA_PKCS1_v1_5_encode(hash, mlen):
|
||||||
"""Encode a digest with RFC3447 EMSA-PKCS1-v1_5.
|
"""Encode a digest with RFC3447 EMSA-PKCS1-v1_5.
|
||||||
|
|
||||||
@param digest: digest byte string to encode
|
@param hash: hash object to encode
|
||||||
@param mlen: desired message length
|
@param mlen: desired message length
|
||||||
@param hashid: ID of the hash used to generate the digest
|
|
||||||
@return: encoded digest byte string
|
@return: encoded digest byte string
|
||||||
"""
|
"""
|
||||||
dinfo = asn1_build(
|
dinfo = asn1_build(
|
||||||
(SEQUENCE, [
|
(SEQUENCE, [
|
||||||
(SEQUENCE, [
|
(SEQUENCE, [
|
||||||
(OBJECT_IDENTIFIER, hashid),
|
(OBJECT_IDENTIFIER, HASH_ID_MAP[hash.name]),
|
||||||
(NULL, None),
|
(NULL, None),
|
||||||
]),
|
]),
|
||||||
(OCTET_STRING, digest),
|
(OCTET_STRING, hash.digest()),
|
||||||
]))
|
]))
|
||||||
if len(dinfo)+3 > mlen:
|
if len(dinfo)+3 > mlen:
|
||||||
raise DigestTooLargeError()
|
raise DigestTooLargeError()
|
||||||
@@ -211,32 +217,29 @@ def perform_rsa(message, exponent, modulus, mlen):
|
|||||||
return int2str(pow(str2int(message), exponent, modulus), mlen)
|
return int2str(pow(str2int(message), exponent, modulus), mlen)
|
||||||
|
|
||||||
|
|
||||||
def RSASSA_PKCS1_v1_5_sign(digest, hashid, private_exponent, modulus):
|
def RSASSA_PKCS1_v1_5_sign(hash, private_exponent, modulus):
|
||||||
"""Sign a digest with RFC3447 RSASSA-PKCS1-v1_5.
|
"""Sign a digest with RFC3447 RSASSA-PKCS1-v1_5.
|
||||||
|
|
||||||
@param digest: digest byte string to sign
|
@param hash: hash object to sign
|
||||||
@param hashid: ID of the hash used to generate the digest
|
|
||||||
@param private_exponent: private key exponent
|
@param private_exponent: private key exponent
|
||||||
@param modulus: key modulus
|
@param modulus: key modulus
|
||||||
@return: signed digest byte string
|
@return: signed digest byte string
|
||||||
"""
|
"""
|
||||||
modlen = len(int2str(modulus))
|
modlen = len(int2str(modulus))
|
||||||
encoded_digest = EMSA_PKCS1_v1_5_encode(digest, modlen, hashid)
|
encoded_digest = EMSA_PKCS1_v1_5_encode(hash, modlen)
|
||||||
return perform_rsa(encoded_digest, private_exponent, modulus, modlen)
|
return perform_rsa(encoded_digest, private_exponent, modulus, modlen)
|
||||||
|
|
||||||
|
|
||||||
def RSASSA_PKCS1_v1_5_verify(digest, hashid, signature, public_exponent,
|
def RSASSA_PKCS1_v1_5_verify(hash, signature, public_exponent, modulus):
|
||||||
modulus):
|
|
||||||
"""Verify a digest signed with RFC3447 RSASSA-PKCS1-v1_5.
|
"""Verify a digest signed with RFC3447 RSASSA-PKCS1-v1_5.
|
||||||
|
|
||||||
@param digest: digest byte string to check
|
@param hash: hash object to check
|
||||||
@param hashid: ID of the hash used to generate the digest
|
|
||||||
@param signature: signed digest byte string
|
@param signature: signed digest byte string
|
||||||
@param public_exponent: public key exponent
|
@param public_exponent: public key exponent
|
||||||
@param modulus: key modulus
|
@param modulus: key modulus
|
||||||
@return: True if the signature is valid, False otherwise
|
@return: True if the signature is valid, False otherwise
|
||||||
"""
|
"""
|
||||||
modlen = len(int2str(modulus))
|
modlen = len(int2str(modulus))
|
||||||
encoded_digest = EMSA_PKCS1_v1_5_encode(digest, modlen, hashid)
|
encoded_digest = EMSA_PKCS1_v1_5_encode(hash, modlen)
|
||||||
signed_digest = perform_rsa(signature, public_exponent, modulus, modlen)
|
signed_digest = perform_rsa(signature, public_exponent, modulus, modlen)
|
||||||
return encoded_digest == signed_digest
|
return encoded_digest == signed_digest
|
||||||
|
|||||||
+19
-23
@@ -17,12 +17,9 @@
|
|||||||
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
|
# Copyright (c) 2011 William Grant <me@williamgrant.id.au>
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
|
import hashlib
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from dkim import (
|
|
||||||
HASHID_SHA1,
|
|
||||||
HASHID_SHA256,
|
|
||||||
)
|
|
||||||
from dkim.crypto import (
|
from dkim.crypto import (
|
||||||
DigestTooLargeError,
|
DigestTooLargeError,
|
||||||
EMSA_PKCS1_v1_5_encode,
|
EMSA_PKCS1_v1_5_encode,
|
||||||
@@ -87,28 +84,28 @@ class TestParseKeys(unittest.TestCase):
|
|||||||
class TestEMSA_PKCS1_v1_5(unittest.TestCase):
|
class TestEMSA_PKCS1_v1_5(unittest.TestCase):
|
||||||
|
|
||||||
def test_encode_sha256(self):
|
def test_encode_sha256(self):
|
||||||
digest = '0123456789abcdef0123456789abcdef'
|
hash = hashlib.sha256('message')
|
||||||
self.assertEquals(
|
self.assertEquals(
|
||||||
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
|
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
|
||||||
'010\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04 '
|
'010\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04 '
|
||||||
+ digest,
|
+ hash.digest(),
|
||||||
EMSA_PKCS1_v1_5_encode(digest, 62, HASHID_SHA256))
|
EMSA_PKCS1_v1_5_encode(hash, 62))
|
||||||
|
|
||||||
def test_encode_sha1(self):
|
def test_encode_sha1(self):
|
||||||
digest = '0123456789abcdef0123'
|
hash = hashlib.sha1('message')
|
||||||
self.assertEquals(
|
self.assertEquals(
|
||||||
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
|
'\x00\x01\xff\xff\xff\xff\xff\xff\xff\xff\x00'
|
||||||
'0!0\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
|
'0!0\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
|
||||||
+ digest,
|
+ hash.digest(),
|
||||||
EMSA_PKCS1_v1_5_encode(digest, 46, HASHID_SHA1))
|
EMSA_PKCS1_v1_5_encode(hash, 46))
|
||||||
|
|
||||||
def test_encode_forbids_too_short(self):
|
def test_encode_forbids_too_short(self):
|
||||||
# PKCS#1 requires at least 8 bytes of padding, so there must be
|
# PKCS#1 requires at least 8 bytes of padding, so there must be
|
||||||
# at least that much space.
|
# at least that much space.
|
||||||
digest = '0123456789abcdef0123'
|
hash = hashlib.sha1('message')
|
||||||
self.assertRaises(
|
self.assertRaises(
|
||||||
DigestTooLargeError,
|
DigestTooLargeError,
|
||||||
EMSA_PKCS1_v1_5_encode, digest, 45, HASHID_SHA1)
|
EMSA_PKCS1_v1_5_encode, hash, 45)
|
||||||
|
|
||||||
|
|
||||||
class TestRSA(unittest.TestCase):
|
class TestRSA(unittest.TestCase):
|
||||||
@@ -136,31 +133,30 @@ class TestRSASSA(unittest.TestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.key = parse_pem_private_key(read_test_data('test.private'))
|
self.key = parse_pem_private_key(read_test_data('test.private'))
|
||||||
|
self.hash = hashlib.sha1(self.test_digest)
|
||||||
|
|
||||||
test_digest = '0123456789abcdef0123'
|
test_digest = '0123456789abcdef0123'
|
||||||
test_signature = (
|
test_signature = (
|
||||||
'3702809f62db933a5c3d18c2c76a3470658d2e79868fac98eaaca7e87d0cdc7'
|
'cc8d3647d64dd3bc12984947a27bdfbb565041fcc9db781afb4b60d29d288d8d60de'
|
||||||
'fd091182673ed57c66531835d814ff367ffa3d764e74ca8ab301982d13eabb5'
|
'9e1916d6f81569c3e72af442538dd6aecb50a6de9a14565fdd679c46ff7842482e15'
|
||||||
'dbe90e5c46ea223c5d3ee835aa74aaffe06e8018affeb78b5178818cb33656c'
|
'e5aa078549621b6f12ca8cd57ecfad95b18e53581e131c6c3c7cd01cb153adeb439d'
|
||||||
'ed462905bc0dc608e354f6ed3d4ec160ce9326ed227ccb0c1e5ba22098e10e6'
|
'2d6ab8b215b19be0e69ef490885004a474eb26d747a219693e8c').decode('hex')
|
||||||
'c083').decode('hex')
|
|
||||||
|
|
||||||
def test_sign_and_verify(self):
|
def test_sign_and_verify(self):
|
||||||
signature = RSASSA_PKCS1_v1_5_sign(
|
signature = RSASSA_PKCS1_v1_5_sign(
|
||||||
self.test_digest, HASHID_SHA1, TEST_KEY_PRIVATE_EXPONENT,
|
self.hash, TEST_KEY_PRIVATE_EXPONENT, TEST_KEY_MODULUS)
|
||||||
TEST_KEY_MODULUS)
|
|
||||||
self.assertEquals(
|
self.assertEquals(
|
||||||
self.test_signature, signature)
|
self.test_signature, signature)
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
RSASSA_PKCS1_v1_5_verify(
|
RSASSA_PKCS1_v1_5_verify(
|
||||||
self.test_digest, HASHID_SHA1, signature,
|
self.hash, signature, TEST_KEY_PUBLIC_EXPONENT,
|
||||||
TEST_KEY_PUBLIC_EXPONENT, TEST_KEY_MODULUS))
|
TEST_KEY_MODULUS))
|
||||||
|
|
||||||
def test_invalid_signature(self):
|
def test_invalid_signature(self):
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
RSASSA_PKCS1_v1_5_verify(
|
RSASSA_PKCS1_v1_5_verify(
|
||||||
self.test_digest, HASHID_SHA1, self.test_signature,
|
self.hash, self.test_signature, TEST_KEY_PUBLIC_EXPONENT,
|
||||||
TEST_KEY_PUBLIC_EXPONENT, TEST_KEY_MODULUS + 1))
|
TEST_KEY_MODULUS + 1))
|
||||||
|
|
||||||
|
|
||||||
def test_suite():
|
def test_suite():
|
||||||
|
|||||||
Reference in New Issue
Block a user