PEP8 Blank Lines Style Issues, lp:1782596

This commit is contained in:
Thomas Ward
2018-10-27 20:53:32 -04:00
committed by Scott Kitterman
parent 45266f6f05
commit a1ca321fa5
13 changed files with 50 additions and 1 deletions
+27
View File
@@ -109,6 +109,7 @@ CV_Pass = b'pass'
CV_Fail = b'fail'
CV_None = b'none'
class HashThrough(object):
def __init__(self, hasher):
self.data = []
@@ -128,45 +129,56 @@ class HashThrough(object):
def hashed(self):
return b''.join(self.data)
def bitsize(x):
"""Return size of long in bits."""
return len(bin(x)) - 2
class DKIMException(Exception):
"""Base class for DKIM errors."""
pass
class InternalError(DKIMException):
"""Internal error in dkim module. Should never happen."""
pass
class KeyFormatError(DKIMException):
"""Key format error while parsing an RSA public or private key."""
pass
class MessageFormatError(DKIMException):
"""RFC822 message format error."""
pass
class ParameterError(DKIMException):
"""Input parameter error."""
pass
class ValidationError(DKIMException):
"""Validation error."""
pass
class AuthresNotFoundError(DKIMException):
""" Authres Package not installed, needed for ARC """
pass
class NaClNotFoundError(DKIMException):
""" Nacl package not installed, needed for ed25119 signatures """
pass
class UnknownKeyTypeError(DKIMException):
""" Key type (k tag) is not known (rsa/ed25519) """
def select_headers(headers, include_headers):
"""Select message header fields to be signed/verified.
@@ -192,10 +204,12 @@ def select_headers(headers, include_headers):
lastindex[h] = i
return sign_headers
# FWS = ([*WSP CRLF] 1*WSP) / obs-FWS ; Folding white space [RFC5322]
FWS = br'(?:(?:\s*\r?\n)?\s+)?'
RE_BTAG = re.compile(br'([;\s]b'+FWS+br'=)(?:'+FWS+br'[a-zA-Z0-9+/=])*(?:\r?\n\Z)?')
def hash_headers(hasher, canonicalize_headers, headers, include_headers,
sigheader, sig):
"""Update hash for signed message header fields."""
@@ -212,6 +226,7 @@ def hash_headers(hasher, canonicalize_headers, headers, include_headers,
hasher.update(y)
return sign_headers
def hash_headers_ed25519(pk, canonicalize_headers, headers, include_headers,
sigheader, sig):
"""Update hash for signed message header fields."""
@@ -227,6 +242,7 @@ def hash_headers_ed25519(pk, canonicalize_headers, headers, include_headers,
hash_header += x + y
return sign_headers, hash_header
def validate_signature_fields(sig, mandatory_fields=[b'v', b'a', b'b', b'bh', b'd', b'h', b's'], arc=False):
"""Validate DKIM or ARC Signature fields.
Basic checks for presence and correct formatting of mandatory fields.
@@ -299,6 +315,7 @@ def validate_signature_fields(sig, mandatory_fields=[b'v', b'a', b'b', b'bh', b'
"x= value is less than t= value (x=%s t=%s)" %
(sig[b'x'], sig[b't']))
def rfc822_parse(message):
"""Parse a message in RFC822 format.
@@ -327,6 +344,7 @@ def rfc822_parse(message):
i += 1
return (headers, b"\r\n".join(lines[i:]))
def text(s):
"""Normalize bytes/str to str for python 2/3 compatible doctests.
>>> text(b'foo')
@@ -341,6 +359,7 @@ def text(s):
if type(s) is str: return s
return s.encode('ascii')
def fold(header, namelen=0):
"""Fold a header line into multiple crlf-separated lines at column 72.
@@ -388,6 +407,7 @@ def fold(header, namelen=0):
else:
return pre + header
def load_pk_from_dns(name, dnsfunc=get_txt):
s = dnsfunc(name)
if not s:
@@ -424,6 +444,7 @@ def load_pk_from_dns(name, dnsfunc=get_txt):
raise KeyFormatError('unknown algorithm in k= tag: {0}'.format(pub[b'k']))
return pk, keysize, ktag
#: Abstract base class for holding messages and options during DKIM/ARC signing and verification.
class DomainSigner(object):
# NOTE - the first 2 indentation levels are 2 instead of 4
@@ -668,6 +689,7 @@ class DomainSigner(object):
else:
raise UnknownKeyTypeError(ktag)
#: Hold messages and options during DKIM signing and verification.
class DKIM(DomainSigner):
#: Sign an RFC822 message and return the DKIM-Signature header line.
@@ -807,6 +829,7 @@ class DKIM(DomainSigner):
return self.verify_sig(sig, include_headers, sigheaders[idx], dnsfunc)
#: Hold messages and options during ARC signing and verification.
class ARC(DomainSigner):
#: Header fields used by ARC
@@ -1156,6 +1179,7 @@ class ARC(DomainSigner):
self.logger.debug("as valid: %r" % as_valid)
return output
def sign(message, selector, domain, privkey, identity=None,
canonicalize=(b'relaxed', b'simple'),
signature_algorithm=b'rsa-sha256',
@@ -1178,6 +1202,7 @@ def sign(message, selector, domain, privkey, identity=None,
d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm)
return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length)
def verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
"""Verify the first (topmost) DKIM signature on an RFC822 formatted message.
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
@@ -1196,6 +1221,7 @@ def verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
dkim_sign = sign
dkim_verify = verify
def arc_sign(message, selector, domain, privkey,
srv_id, signature_algorithm=b'rsa-sha256',
include_headers=None, timestamp=None,
@@ -1219,6 +1245,7 @@ def arc_sign(message, selector, domain, privkey,
return a.sign(selector, domain, privkey, srv_id, include_headers=include_headers,
timestamp=timestamp, standardize=standardize)
def arc_verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
"""Verify the ARC chain on an RFC822 formatted message.
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
+2
View File
@@ -35,6 +35,7 @@ import dkim
logging.basicConfig(level=10)
def main():
if len(sys.argv) != 4:
print("Usage: arcsign.py selector domain privatekeyfile", file=sys.stderr)
@@ -66,5 +67,6 @@ def main():
# print(e, file=sys.stderr)
#sys.stdout.write(message)
if __name__ == "__main__":
main()
+2
View File
@@ -32,6 +32,7 @@ import sys
import dkim
def main():
if sys.version_info[0] >= 3:
# Make sys.stdin a binary stream.
@@ -50,5 +51,6 @@ def main():
if verbose:
print(repr(results))
if __name__ == "__main__":
main()
+1
View File
@@ -94,6 +94,7 @@ def asn1_parse(template, data):
except IndexError:
raise ASN1FormatError("Data truncated at byte %d"%i)
def asn1_length(n):
"""Return a string representing a field length in ASN.1 format.
+2
View File
@@ -29,6 +29,7 @@ import argparse
import dkim
def main():
# Backward compatibility hack because argparse doesn't support optional
# positional arguments
@@ -81,5 +82,6 @@ def main():
print(e, file=sys.stderr)
sys.stdout.write(message)
if __name__ == "__main__":
main()
+2
View File
@@ -27,6 +27,7 @@ import sys
import dkim
def main():
if sys.version_info[0] >= 3:
# Make sys.stdin a binary stream.
@@ -45,5 +46,6 @@ def main():
sys.exit(1)
print("signature ok")
if __name__ == "__main__":
main()
+5
View File
@@ -39,6 +39,7 @@ BITS_REQUIRED = 2048
# what openssl binary do we use to do key manipulation?
OPENSSL_BINARY = '/usr/bin/openssl'
def GenRSAKeys(private_key_file):
""" Generates a suitable private key. Output is unprotected.
You should encrypt your keys.
@@ -47,6 +48,7 @@ def GenRSAKeys(private_key_file):
subprocess.check_call([OPENSSL_BINARY, 'genrsa', '-out', private_key_file,
str(BITS_REQUIRED)])
def GenEd25519Keys(private_key_file):
"""Generates a base64 encoded private key for ed25519 DKIM signing.
Output is unprotected. You should encrypt your keys.
@@ -62,6 +64,7 @@ def GenEd25519Keys(private_key_file):
pkf.close()
return(priv_key)
def ExtractRSADnsPublicKey(private_key_file, dns_file):
""" Given a key, extract the bit we should place in DNS.
"""
@@ -79,6 +82,7 @@ def ExtractRSADnsPublicKey(private_key_file, dns_file):
print >> dns_fp, "k=rsa; h=sha256; p={0}".format(output)
dns_fp.close()
def ExtractEd25519PublicKey(private_key_file, dns_file, priv_key):
""" Given a ed25519 key, extract the bit we should place in DNS.
"""
@@ -90,6 +94,7 @@ def ExtractEd25519PublicKey(private_key_file, dns_file, priv_key):
print >> dns_fp, "k=ed25519; p={0}".format(output)
dns_fp.close()
def main():
parser = argparse.ArgumentParser(
description='Produce DKIM keys.',)
+3
View File
@@ -46,6 +46,7 @@ def get_txt_pydns(name):
return None
return b''.join(response.answers[0]['data'])
def get_txt_Milter_dns(name):
"""Return a TXT record associated with a DNS name."""
# Older pydns releases don't like a trailing dot.
@@ -56,6 +57,7 @@ def get_txt_Milter_dns(name):
if a: return b''.join(a[0])
return None
# Prefer dnspython if it's there, otherwise use pydns.
try:
import dns.resolver
@@ -69,6 +71,7 @@ except ImportError:
DNS.DiscoverNameServers()
_get_txt = get_txt_pydns
def get_txt(name):
"""Return a TXT record associated with a DNS name.
+1
View File
@@ -80,6 +80,7 @@ Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
(cv, res, reason) = dkim.arc_verify(b''.join(sig_lines) + self.message, dnsfunc=self.dnsfunc)
self.assertEqual(cv, dkim.CV_Pass)
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
+1
View File
@@ -115,6 +115,7 @@ class TestParseKeys(unittest.TestCase):
except UnparsableKeyError: return
self.fail("failed to reject invalid public key")
class TestEMSA_PKCS1_v1_5(unittest.TestCase):
def test_encode_sha256(self):
+1
View File
@@ -432,6 +432,7 @@ b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==
sigX[b'x'] = str(now - 24*3600).encode('ascii')
self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX)
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
+1
View File
@@ -292,6 +292,7 @@ yi50DjK5O9pqbFpNHklsv9lqaS0ArSYu02qp1S0DW1Y=\
sigX[b'x'] = str(now - 24*3600).encode('ascii')
self.assertRaises(dkim.ValidationError, dkim.validate_signature_fields, sigX)
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)
+2 -1
View File
@@ -21,6 +21,7 @@
import unittest
import dkim.dnsplug
class TestDNSPlug(unittest.TestCase):
def test_get_txt(self):
@@ -29,7 +30,7 @@ class TestDNSPlug(unittest.TestCase):
self.assertEqual(res, b"out")
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)