diff --git a/dkim/__init__.py b/dkim/__init__.py index a8d3bbe..b0b5721 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -365,7 +365,7 @@ def text(s): return s.encode('ascii') -def fold(header, namelen=0): +def fold(header, namelen=0, linesep=b'\r\n'): """Fold a header line into multiple crlf-separated lines at column 72. >>> text(fold(b'foo')) @@ -399,10 +399,10 @@ def fold(header, namelen=0): i = header[:maxleng].rfind(b" ") if i == -1: j = maxleng - pre += header[:j] + b"\r\n " + pre += header[:j] + linesep + b" " else: j = i + 1 - pre += header[:i] + b"\r\n " + pre += header[:i] + linesep + b" " header = header[j:] maxleng = 72 if len(header) > 2: @@ -460,8 +460,9 @@ class DomainSigner(object): #: (with either \\n or \\r\\n line endings) #: @param logger: a logger to which debug info will be written (default None) #: @param signature_algorithm: the signing algorithm to use when signing + #: @param linesep: use this line seperator for folding the headers def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256', - minkey=1024): + minkey=1024, linesep=b'\r\n'): self.set_message(message) if logger is None: logger = get_default_logger() @@ -482,6 +483,9 @@ class DomainSigner(object): #: Minimum public key size. Shorter keys raise KeyFormatError. The #: default is 1024 self.minkey = minkey + # use this line seperator for output + self.linesep = linesep + #: Header fields to protect from additions by default. #: @@ -608,7 +612,7 @@ class DomainSigner(object): header_value = b"; ".join(b"=".join(x) for x in fields) if not standardize: - header_value = fold(header_value, namelen=len(header_name)) + header_value = fold(header_value, namelen=len(header_name), linesep=b'\r\n') header_value = RE_BTAG.sub(b'\\1',header_value) header = (header_name, b' ' + header_value) h = HashThrough(self.hasher()) @@ -634,10 +638,10 @@ class DomainSigner(object): # relaxed/simple (for broken receivers), and fold now. idx = [i for i in range(len(fields)) if fields[i][0] == b'b'][0] fields[idx] = (b'b', base64.b64encode(bytes(sig2))) - header_value = b"; ".join(b"=".join(x) for x in fields) + b"\r\n" + header_value = b"; ".join(b"=".join(x) for x in fields) + self.linesep if not standardize: - header_value = fold(header_value, namelen=len(header_name)) + header_value = fold(header_value, namelen=len(header_name), linesep=self.linesep) return header_value @@ -947,7 +951,7 @@ class ARC(DomainSigner): results_lists = [raw.replace(srv_id + b';', b'').strip() for (raw, parsed) in auth_headers] results_lists = [tags.split(b';') for tags in results_lists] results = [tag.strip() for sublist in results_lists for tag in sublist] - auth_results = srv_id + b'; ' + b';\r\n '.join(results) + auth_results = srv_id + b'; ' + (b';' + self.linesep + b' ').join(results) # extract cv parsed_auth_results = AuthenticationResultsHeader.parse('Authentication-Results: ' + auth_results.decode('utf-8')) @@ -1216,7 +1220,8 @@ class ARC(DomainSigner): def sign(message, selector, domain, privkey, identity=None, canonicalize=(b'relaxed', b'simple'), signature_algorithm=b'rsa-sha256', - include_headers=None, length=False, logger=None): + include_headers=None, length=False, logger=None, + linesep=b'\r\n'): # type: (bytes, bytes, bytes, bytes, bytes, tuple, bytes, list, bool, any) -> bytes """Sign an RFC822 message and return the DKIM-Signature header line. @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @@ -1229,11 +1234,12 @@ def sign(message, selector, domain, privkey, identity=None, @param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign) @param length: true if the l= tag should be included to indicate body length (default False) @param logger: a logger to which debug info will be written (default None) + @param linesep: use this line seperator for folding the headers @return: DKIM-Signature header field terminated by \\r\\n @raise DKIMException: when the message, include_headers, or key are badly formed. """ - d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm) + d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm,linesep=linesep) return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length) @@ -1260,7 +1266,7 @@ dkim_verify = verify def arc_sign(message, selector, domain, privkey, srv_id, signature_algorithm=b'rsa-sha256', include_headers=None, timestamp=None, - logger=None, standardize=False): + logger=None, standardize=False, linesep=b'\r\n'): # type: (bytes, bytes, bytes, bytes, bytes, bytes, list, any, any, bool) -> list """Sign an RFC822 message and return the ARC set header lines for the next instance @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @@ -1272,11 +1278,12 @@ def arc_sign(message, selector, domain, privkey, @param include_headers: a list of strings indicating which headers are to be signed (default all headers not listed as SHOULD NOT sign) @param timestamp: the time in integer seconds when the message is sealed (default is int(time.time) based on platform, can be string or int) @param logger: a logger to which debug info will be written (default None) + @param linesep: use this line seperator for folding the headers @return: A list containing the ARC set of header fields for the next instance @raise DKIMException: when the message, include_headers, or key are badly formed. """ - a = ARC(message,logger=logger,signature_algorithm=b'rsa-sha256') + a = ARC(message,logger=logger,signature_algorithm=b'rsa-sha256',linesep=linesep) if not include_headers: include_headers = a.default_sign_headers() return a.sign(selector, domain, privkey, srv_id, include_headers=include_headers, diff --git a/dkim/arcsign.py b/dkim/arcsign.py index 2d668b9..ff5a263 100644 --- a/dkim/arcsign.py +++ b/dkim/arcsign.py @@ -59,7 +59,7 @@ def main(): #try: sig = dkim.arc_sign(message, selector, domain, open(privatekeyfile, "rb").read(), - domain + ": none", cv) + domain + ": none", cv, dkim.util.get_linesep(message)) for line in sig: sys.stdout.write(line) sys.stdout.write(message) diff --git a/dkim/dkimsign.py b/dkim/dkimsign.py index 73f9de4..227b8eb 100644 --- a/dkim/dkimsign.py +++ b/dkim/dkimsign.py @@ -70,8 +70,8 @@ def main(): message = sys.stdin.read() try: - d = dkim.DKIM(message,logger=logger, - signature_algorithm=args.signalg) + d = dkim.DKIM(message,logger=logger, signature_algorithm=args.signalg, + linesep=dkim.util.get_linesep(message)) sig = d.sign(args.selector, args.domain, open( args.privatekeyfile, "rb").read(), identity = args.identity, canonicalize=canonicalize, include_headers=include_headers, diff --git a/dkim/tests/test_dkim.py b/dkim/tests/test_dkim.py index fbe241b..a94937c 100644 --- a/dkim/tests/test_dkim.py +++ b/dkim/tests/test_dkim.py @@ -46,6 +46,11 @@ class TestFold(unittest.TestCase): self.assertEqual( b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25)) + def test_linesep(self): + self.assertEqual( + b"foo" * 24 + b"\n foo", dkim.fold(b"foo" * 25, linesep=b"\n")) + + class TestSignAndVerify(unittest.TestCase): """End-to-end signature and verification tests.""" @@ -203,6 +208,17 @@ p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo=""" res = d.verify(dnsfunc=self.dnsfunc5) self.assertTrue(res) + def test_verifies_lflinesep(self): + # A message verifies after being signed. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo), linesep=b"\n") + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) + self.assertFalse(b'\r\n' in sig) + self.assertTrue(res) + def test_implicit_k(self): # A message verifies after being signed when k= tag is not provided. for header_algo in (b"simple", b"relaxed"): diff --git a/dkim/tests/test_util.py b/dkim/tests/test_util.py index e01b3b5..bf39566 100644 --- a/dkim/tests/test_util.py +++ b/dkim/tests/test_util.py @@ -22,6 +22,7 @@ from dkim.util import ( DuplicateTag, InvalidTagSpec, parse_tag_value, + get_linesep, ) @@ -75,6 +76,30 @@ class TestParseTagValue(unittest.TestCase): self.assertEqual(len(sig),11) +class TestGetLineSep(unittest.TestCase): + """Line seperator probing tests.""" + + def test_default(self): + self.assertEqual( + b'\r\n', + get_linesep(b'abc')) + + def test_withcrlf(self): + self.assertEqual( + b'\r\n', + get_linesep(b'abc\r\n')) + + def test_withlf(self): + self.assertEqual( + b'\n', + get_linesep(b'abc\n')) + + def test_toosmall(self): + self.assertEqual( + b'\r\n', + get_linesep(b'a')) + + def test_suite(): from unittest import TestLoader return TestLoader().loadTestsFromName(__name__) diff --git a/dkim/util.py b/dkim/util.py index 3d1f722..97b59c9 100644 --- a/dkim/util.py +++ b/dkim/util.py @@ -33,6 +33,7 @@ __all__ = [ 'InvalidTagSpec', 'InvalidTagValueList', 'parse_tag_value', + 'get_linesep', ] @@ -80,3 +81,8 @@ def get_default_logger(): if not logger.handlers: logger.addHandler(NullHandler()) return logger + +def get_linesep(msg): + if msg[-2:] != b'\r\n' and msg[-1:] == b'\n': + return b'\n' + return b'\r\n'