Resolve merge conflicts

This commit is contained in:
Scott Kitterman
2019-04-13 21:21:49 -04:00
6 changed files with 69 additions and 15 deletions
+19 -12
View File
@@ -367,7 +367,7 @@ def text(s):
return s.encode('ascii') return s.encode('ascii')
def fold(header, namelen=0): def fold(header, namelen=0, linesep=b'\r\n'):
"""Fold a header line into multiple crlf-separated lines of text at column """Fold a header line into multiple crlf-separated lines of text at column
72. The crlf does not count for line length. 72. The crlf does not count for line length.
@@ -404,10 +404,10 @@ def fold(header, namelen=0):
i = header[:maxleng].rfind(b" ") i = header[:maxleng].rfind(b" ")
if i == -1: if i == -1:
j = maxleng j = maxleng
pre += header[:j] + b"\r\n " pre += header[:j] + linesep + b" "
else: else:
j = i + 1 j = i + 1
pre += header[:i] + b"\r\n " pre += header[:i] + linesep + b" "
header = header[j:] header = header[j:]
maxleng = 71 maxleng = 71
if len(header) > 2: if len(header) > 2:
@@ -466,8 +466,9 @@ class DomainSigner(object):
#: @param logger: a logger to which debug info will be written (default None) #: @param logger: a logger to which debug info will be written (default None)
#: @param signature_algorithm: the signing algorithm to use when signing #: @param signature_algorithm: the signing algorithm to use when signing
#: @param debug_content: log headers and body after canonicalization (default False) #: @param debug_content: log headers and body after canonicalization (default False)
#: @param linesep: use this line seperator for folding the headers
def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256', def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256',
minkey=1024, debug_content=False): minkey=1024, linesep=b'\r\n', debug_content=False):
self.set_message(message) self.set_message(message)
if logger is None: if logger is None:
logger = get_default_logger() logger = get_default_logger()
@@ -489,6 +490,9 @@ class DomainSigner(object):
#: Minimum public key size. Shorter keys raise KeyFormatError. The #: Minimum public key size. Shorter keys raise KeyFormatError. The
#: default is 1024 #: default is 1024
self.minkey = minkey self.minkey = minkey
# use this line seperator for output
self.linesep = linesep
#: Header fields to protect from additions by default. #: Header fields to protect from additions by default.
#: #:
@@ -615,7 +619,7 @@ class DomainSigner(object):
header_value = b"; ".join(b"=".join(x) for x in fields) header_value = b"; ".join(b"=".join(x) for x in fields)
if not standardize: if not standardize:
header_value = fold(header_value, namelen=len(header_name)) header_value = fold(header_value, namelen=len(header_name), linesep=b'\r\n')
header_value = RE_BTAG.sub(b'\\1',header_value) header_value = RE_BTAG.sub(b'\\1',header_value)
header = (header_name, b' ' + header_value) header = (header_name, b' ' + header_value)
h = HashThrough(self.hasher(), self.debug_content) h = HashThrough(self.hasher(), self.debug_content)
@@ -642,10 +646,10 @@ class DomainSigner(object):
# relaxed/simple (for broken receivers), and fold now. # relaxed/simple (for broken receivers), and fold now.
idx = [i for i in range(len(fields)) if fields[i][0] == b'b'][0] idx = [i for i in range(len(fields)) if fields[i][0] == b'b'][0]
fields[idx] = (b'b', base64.b64encode(bytes(sig2))) fields[idx] = (b'b', base64.b64encode(bytes(sig2)))
header_value = b"; ".join(b"=".join(x) for x in fields) + b"\r\n" header_value = b"; ".join(b"=".join(x) for x in fields) + self.linesep
if not standardize: if not standardize:
header_value = fold(header_value, namelen=len(header_name)) header_value = fold(header_value, namelen=len(header_name), linesep=self.linesep)
return header_value return header_value
@@ -957,7 +961,7 @@ class ARC(DomainSigner):
results_lists = [raw.replace(srv_id + b';', b'').strip() for (raw, parsed) in auth_headers] results_lists = [raw.replace(srv_id + b';', b'').strip() for (raw, parsed) in auth_headers]
results_lists = [tags.split(b';') for tags in results_lists] results_lists = [tags.split(b';') for tags in results_lists]
results = [tag.strip() for sublist in results_lists for tag in sublist] results = [tag.strip() for sublist in results_lists for tag in sublist]
auth_results = srv_id + b'; ' + b';\r\n '.join(results) auth_results = srv_id + b'; ' + (b';' + self.linesep + b' ').join(results)
# extract cv # extract cv
parsed_auth_results = AuthenticationResultsHeader.parse('Authentication-Results: ' + auth_results.decode('utf-8')) parsed_auth_results = AuthenticationResultsHeader.parse('Authentication-Results: ' + auth_results.decode('utf-8'))
@@ -1227,7 +1231,8 @@ class ARC(DomainSigner):
def sign(message, selector, domain, privkey, identity=None, def sign(message, selector, domain, privkey, identity=None,
canonicalize=(b'relaxed', b'simple'), canonicalize=(b'relaxed', b'simple'),
signature_algorithm=b'rsa-sha256', signature_algorithm=b'rsa-sha256',
include_headers=None, length=False, logger=None): include_headers=None, length=False, logger=None,
linesep=b'\r\n'):
# type: (bytes, bytes, bytes, bytes, bytes, tuple, bytes, list, bool, any) -> bytes # type: (bytes, bytes, bytes, bytes, bytes, tuple, bytes, list, bool, any) -> bytes
"""Sign an RFC822 message and return the DKIM-Signature header line. """Sign an RFC822 message and return the DKIM-Signature header line.
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
@@ -1240,11 +1245,12 @@ def sign(message, selector, domain, privkey, identity=None,
@param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign) @param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign)
@param length: true if the l= tag should be included to indicate body length (default False) @param length: true if the l= tag should be included to indicate body length (default False)
@param logger: a logger to which debug info will be written (default None) @param logger: a logger to which debug info will be written (default None)
@param linesep: use this line seperator for folding the headers
@return: DKIM-Signature header field terminated by \\r\\n @return: DKIM-Signature header field terminated by \\r\\n
@raise DKIMException: when the message, include_headers, or key are badly formed. @raise DKIMException: when the message, include_headers, or key are badly formed.
""" """
d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm) d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm,linesep=linesep)
return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length) return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length)
@@ -1271,7 +1277,7 @@ dkim_verify = verify
def arc_sign(message, selector, domain, privkey, def arc_sign(message, selector, domain, privkey,
srv_id, signature_algorithm=b'rsa-sha256', srv_id, signature_algorithm=b'rsa-sha256',
include_headers=None, timestamp=None, include_headers=None, timestamp=None,
logger=None, standardize=False): logger=None, standardize=False, linesep=b'\r\n'):
# type: (bytes, bytes, bytes, bytes, bytes, bytes, list, any, any, bool) -> list # type: (bytes, bytes, bytes, bytes, bytes, bytes, list, any, any, bool) -> list
"""Sign an RFC822 message and return the ARC set header lines for the next instance """Sign an RFC822 message and return the ARC set header lines for the next instance
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
@@ -1283,11 +1289,12 @@ def arc_sign(message, selector, domain, privkey,
@param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign) @param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign)
@param timestamp: the time in integer seconds when the message is sealed (default is int(time.time) based on platform, can be string or int) @param timestamp: the time in integer seconds when the message is sealed (default is int(time.time) based on platform, can be string or int)
@param logger: a logger to which debug info will be written (default None) @param logger: a logger to which debug info will be written (default None)
@param linesep: use this line seperator for folding the headers
@return: A list containing the ARC set of header fields for the next instance @return: A list containing the ARC set of header fields for the next instance
@raise DKIMException: when the message, include_headers, or key are badly formed. @raise DKIMException: when the message, include_headers, or key are badly formed.
""" """
a = ARC(message,logger=logger,signature_algorithm=b'rsa-sha256') a = ARC(message,logger=logger,signature_algorithm=b'rsa-sha256',linesep=linesep)
if not include_headers: if not include_headers:
include_headers = a.default_sign_headers() include_headers = a.default_sign_headers()
return a.sign(selector, domain, privkey, srv_id, include_headers=include_headers, return a.sign(selector, domain, privkey, srv_id, include_headers=include_headers,
+1 -1
View File
@@ -59,7 +59,7 @@ def main():
#try: #try:
sig = dkim.arc_sign(message, selector, domain, open(privatekeyfile, "rb").read(), sig = dkim.arc_sign(message, selector, domain, open(privatekeyfile, "rb").read(),
domain + ": none", cv) domain + ": none", cv, dkim.util.get_linesep(message))
for line in sig: for line in sig:
sys.stdout.write(line) sys.stdout.write(line)
sys.stdout.write(message) sys.stdout.write(message)
+2 -2
View File
@@ -70,8 +70,8 @@ def main():
message = sys.stdin.read() message = sys.stdin.read()
try: try:
d = dkim.DKIM(message,logger=logger, d = dkim.DKIM(message,logger=logger, signature_algorithm=args.signalg,
signature_algorithm=args.signalg) linesep=dkim.util.get_linesep(message))
sig = d.sign(args.selector, args.domain, open( sig = d.sign(args.selector, args.domain, open(
args.privatekeyfile, "rb").read(), identity = args.identity, args.privatekeyfile, "rb").read(), identity = args.identity,
canonicalize=canonicalize, include_headers=include_headers, canonicalize=canonicalize, include_headers=include_headers,
+16
View File
@@ -46,6 +46,11 @@ class TestFold(unittest.TestCase):
self.assertEqual( self.assertEqual(
b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25)) b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25))
def test_linesep(self):
self.assertEqual(
b"foo" * 24 + b"\n foo", dkim.fold(b"foo" * 25, linesep=b"\n"))
class TestSignAndVerify(unittest.TestCase): class TestSignAndVerify(unittest.TestCase):
"""End-to-end signature and verification tests.""" """End-to-end signature and verification tests."""
@@ -203,6 +208,17 @@ p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo="""
res = d.verify(dnsfunc=self.dnsfunc5) res = d.verify(dnsfunc=self.dnsfunc5)
self.assertTrue(res) self.assertTrue(res)
def test_verifies_lflinesep(self):
# A message verifies after being signed.
for header_algo in (b"simple", b"relaxed"):
for body_algo in (b"simple", b"relaxed"):
sig = dkim.sign(
self.message, b"test", b"example.com", self.key,
canonicalize=(header_algo, body_algo), linesep=b"\n")
res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc)
self.assertFalse(b'\r\n' in sig)
self.assertTrue(res)
def test_implicit_k(self): def test_implicit_k(self):
# A message verifies after being signed when k= tag is not provided. # A message verifies after being signed when k= tag is not provided.
for header_algo in (b"simple", b"relaxed"): for header_algo in (b"simple", b"relaxed"):
+25
View File
@@ -22,6 +22,7 @@ from dkim.util import (
DuplicateTag, DuplicateTag,
InvalidTagSpec, InvalidTagSpec,
parse_tag_value, parse_tag_value,
get_linesep,
) )
@@ -75,6 +76,30 @@ class TestParseTagValue(unittest.TestCase):
self.assertEqual(len(sig),11) self.assertEqual(len(sig),11)
class TestGetLineSep(unittest.TestCase):
"""Line seperator probing tests."""
def test_default(self):
self.assertEqual(
b'\r\n',
get_linesep(b'abc'))
def test_withcrlf(self):
self.assertEqual(
b'\r\n',
get_linesep(b'abc\r\n'))
def test_withlf(self):
self.assertEqual(
b'\n',
get_linesep(b'abc\n'))
def test_toosmall(self):
self.assertEqual(
b'\r\n',
get_linesep(b'a'))
def test_suite(): def test_suite():
from unittest import TestLoader from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__) return TestLoader().loadTestsFromName(__name__)
+6
View File
@@ -33,6 +33,7 @@ __all__ = [
'InvalidTagSpec', 'InvalidTagSpec',
'InvalidTagValueList', 'InvalidTagValueList',
'parse_tag_value', 'parse_tag_value',
'get_linesep',
] ]
@@ -80,3 +81,8 @@ def get_default_logger():
if not logger.handlers: if not logger.handlers:
logger.addHandler(NullHandler()) logger.addHandler(NullHandler())
return logger return logger
def get_linesep(msg):
if msg[-2:] != b'\r\n' and msg[-1:] == b'\n':
return b'\n'
return b'\r\n'