diff --git a/ChangeLog b/ChangeLog index 42397ce..5774634 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,5 @@ Version 1.0.0 + - Add support for RFC 8460 tlsrpt DKIM signature processing (LP: #1847020) - Add new timeout parameter to enable DNS lookup timeouts to be adjusted - Drop usage of pymilter Milter.dns in dnsplug since it doesn't support havine a timeout passed to it diff --git a/dkim/__init__.py b/dkim/__init__.py index 9bff51c..7203751 100644 --- a/dkim/__init__.py +++ b/dkim/__init__.py @@ -453,17 +453,20 @@ def load_pk_from_dns(name, dnsfunc=get_txt, timeout=5): ktag = b'rsa' if pub[b'k'] != b'rsa' and pub[b'k'] != b'ed25519': raise KeyFormatError('unknown algorithm in k= tag: {0}'.format(pub[b'k'])) + seqtlsrpt = False try: # Ignore unknown service types, RFC 6376 3.6.1 - if pub[b's'] != b'*' and pub[b's'] != b'email': + if pub[b's'] != b'*' and pub[b's'] != b'email' and pub[b's'] != b'tlsrpt': pk = None keysize = None ktag = None raise KeyFormatError('unknown service type in s= tag: {0}'.format(pub[b's'])) + elif pub[b's'] == b'tlsrpt': + seqtlsrpt = True except: # Default is '*' - all service types, so no error if missing from key record pass - return pk, keysize, ktag + return pk, keysize, ktag, seqtlsrpt #: Abstract base class for holding messages and options during DKIM/ARC signing and verification. @@ -478,8 +481,11 @@ class DomainSigner(object): #: @param debug_content: log headers and body after canonicalization (default False) #: @param linesep: use this line seperator for folding the headers #: @param timeout: number of seconds for DNS lookup timeout (default = 5) + #: @param tlsrpt: message is an RFC 8460 TLS report (default False) + #: False: Not a tlsrpt, True: Is a tlsrpt, 'strict': tlsrpt, invalid if + #: service type is missing. For signing, if True, length is never used. def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256', - minkey=1024, linesep=b'\r\n', debug_content=False, timeout=5): + minkey=1024, linesep=b'\r\n', debug_content=False, timeout=5, tlsrpt=False): self.set_message(message) if logger is None: logger = get_default_logger() @@ -504,6 +510,9 @@ class DomainSigner(object): # use this line seperator for output self.linesep = linesep self.timeout = timeout + self.tlsrpt = tlsrpt + # Service type in DKIM record is s=tlsrpt + self.seqtlsrpt = False #: Header fields to protect from additions by default. @@ -673,11 +682,19 @@ class DomainSigner(object): def verify_sig(self, sig, include_headers, sig_header, dnsfunc): name = sig[b's'] + b"._domainkey." + sig[b'd'] + b"." try: - pk, self.keysize, ktag = load_pk_from_dns(name, dnsfunc, timeout=self.timeout) + pk, self.keysize, ktag, self.seqtlsrpt = load_pk_from_dns(name, dnsfunc, timeout=self.timeout) except KeyFormatError as e: self.logger.error("%s" % e) return False + # RFC 8460 MAY ignore signatures without tlsrpt Service Type + if self.tlsrpt == 'strict' and not self.seqtlsrpt: + raise ValidationError("Message is tlsrpt and Service Type is not tlsrpt") + + # Inferred requirement from both RFC 8460 and RFC 6376 + if not self.tlsrpt and self.seqtlsrpt: + raise ValidationError("Message is not tlsrpt and Service Type is tlsrpt") + try: canon_policy = CanonicalizationPolicy.from_c_value(sig.get(b'c', b'simple/simple')) except InvalidCanonicalizationPolicyError as e: @@ -690,7 +707,7 @@ class DomainSigner(object): h = HashThrough(hasher(), self.debug_content) body = canon_policy.canonicalize_body(self.body) - if b'l' in sig: + if b'l' in sig and not self.tlsrpt: body = body[:int(sig[b'l'])] h.update(body) if self.debug_content: @@ -808,6 +825,10 @@ class DKIM(DomainSigner): # record what verify should extract self.include_headers = include_headers + if self.tlsrpt: + # RFC 8460 MUST NOT + length = False + # rfc4871 says FROM is required if b'from' not in include_headers: raise ParameterError("The From header field MUST be signed") @@ -1250,7 +1271,7 @@ def sign(message, selector, domain, privkey, identity=None, canonicalize=(b'relaxed', b'simple'), signature_algorithm=b'rsa-sha256', include_headers=None, length=False, logger=None, - linesep=b'\r\n'): + linesep=b'\r\n', tlsrpt=False): # type: (bytes, bytes, bytes, bytes, bytes, tuple, bytes, list, bool, any) -> bytes """Sign an RFC822 message and return the DKIM-Signature header line. @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @@ -1264,22 +1285,29 @@ def sign(message, selector, domain, privkey, identity=None, @param length: true if the l= tag should be included to indicate body length (default False) @param logger: a logger to which debug info will be written (default None) @param linesep: use this line seperator for folding the headers + @param tlsrpt: message is an RFC 8460 TLS report (default False) + False: Not a tlsrpt, True: Is a tlsrpt, 'strict': tlsrpt, invalid if + service type is missing. For signing, if True, length is never used. @return: DKIM-Signature header field terminated by \\r\\n @raise DKIMException: when the message, include_headers, or key are badly formed. """ - d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm,linesep=linesep) + d = DKIM(message,logger=logger,signature_algorithm=signature_algorithm,linesep=linesep,tlsrpt=tlsrpt) return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length) -def verify(message, logger=None, dnsfunc=get_txt, minkey=1024, timeout=5): +def verify(message, logger=None, dnsfunc=get_txt, minkey=1024, timeout=5, tlsrpt=False): """Verify the first (topmost) DKIM signature on an RFC822 formatted message. @param message: an RFC822 formatted message (with either \\n or \\r\\n line endings) @param logger: a logger to which debug info will be written (default None) + @param timeout: number of seconds for DNS lookup timeout (default = 5) + @param tlsrpt: message is an RFC 8460 TLS report (default False) + False: Not a tlsrpt, True: Is a tlsrpt, 'strict': tlsrpt, invalid if + service type is missing. For signing, if True, length is never used. @return: True if signature verifies or False otherwise """ # type: (bytes, any, function, int) -> bool - d = DKIM(message,logger=logger,minkey=minkey,timeout=timeout) + d = DKIM(message,logger=logger,minkey=minkey,timeout=timeout,tlsrpt=tlsrpt) try: return d.verify(dnsfunc=dnsfunc) except DKIMException as x: @@ -1326,6 +1354,7 @@ def arc_verify(message, logger=None, dnsfunc=get_txt, minkey=1024, timeout=5): @param logger: a logger to which debug info will be written (default None) @param dnsfunc: an optional function to lookup TXT resource records @param minkey: the minimum key size to accept + @param timeout: number of seconds for DNS lookup timeout (default = 5) @return: three-tuple of (CV Result (CV_Pass, CV_Fail or CV_None), list of result dictionaries, result reason) """ diff --git a/dkim/tests/__init__.py b/dkim/tests/__init__.py index e923974..bedc568 100644 --- a/dkim/tests/__init__.py +++ b/dkim/tests/__init__.py @@ -30,6 +30,7 @@ def test_suite(): test_crypto, test_dkim, test_dkim_ed25519, + test_dkim_tlsrpt, test_util, test_arc, test_dnsplug, @@ -39,6 +40,7 @@ def test_suite(): test_crypto, test_dkim, test_dkim_ed25519, + test_dkim_tlsrpt, test_util, test_arc, test_dnsplug, diff --git a/dkim/tests/data/test_tlsrpt.txt b/dkim/tests/data/test_tlsrpt.txt new file mode 100644 index 0000000..a057568 --- /dev/null +++ b/dkim/tests/data/test_tlsrpt.txt @@ -0,0 +1 @@ +v=DKIM1; g=*; k=rsa; s=tlsrpt; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkHlOQoBTzWRiGs5V6NpP3idY6Wk08a5qhdR6wy5bdOKb2jLQiY/J16JYi0Qvx/byYzCNb3W91y3FutACDfzwQ/BC/e/8uBsCR+yz1Lxj+PL6lHvqMKrM3rG4hstT5QjvHO9PzoxZyVYLzBfO2EeC3Ip3G+2kryOTIKT+l/K4w3QIDAQAB diff --git a/dkim/tests/test_dkim_tlsrpt.py b/dkim/tests/test_dkim_tlsrpt.py new file mode 100644 index 0000000..83b31b6 --- /dev/null +++ b/dkim/tests/test_dkim_tlsrpt.py @@ -0,0 +1,282 @@ +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the author be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. +# +# Copyright (c) 2011 William Grant + +import email +import os.path +import unittest +import time + +import dkim + + +def read_test_data(filename): + """Get the content of the given test data file. + + The files live in dkim/tests/data. + """ + path = os.path.join(os.path.dirname(__file__), 'data', filename) + with open(path, 'rb') as f: + return f.read() + + +class TestFold(unittest.TestCase): + + def test_short_line(self): + self.assertEqual( + b"foo", dkim.fold(b"foo")) + + def test_long_line(self): + # The function is terribly broken, not passing even this simple + # test. + self.assertEqual( + b"foo" * 24 + b"\r\n foo", dkim.fold(b"foo" * 25)) + + def test_linesep(self): + self.assertEqual( + b"foo" * 24 + b"\n foo", dkim.fold(b"foo" * 25, linesep=b"\n")) + + + +class TestSignAndVerify(unittest.TestCase): + """End-to-end signature and verification tests.""" + + def setUp(self): + self.message = read_test_data("test.message") + self.message3 = read_test_data("rfc6376.msg") + self.message4 = read_test_data("rfc6376.signed.msg") + self.key = read_test_data("test.private") + self.rfckey = read_test_data("rfc8032_7_1.key") + + def dnsfunc(self, domain, timeout=5): + sample_dns = """\ +k=rsa; s=email;\ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.example.com.': read_test_data("test.txt"), + '20120113._domainkey.gmail.com.': """k=rsa; \ +p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ ++eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ +s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ +hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ +MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ +Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + def dnsfunc2(self, domain, timeout=5): + sample_dns = """\ +k=rsa; \ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.example.com.': read_test_data("test2.txt"), + '20120113._domainkey.gmail.com.': """\ +p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ ++eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ +s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ +hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ +MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ +Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + def dnsfunc3(self, domain, timeout=5): + sample_dns = """\ +k=rsa; \ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.example.com.': read_test_data("badversion.txt"), + '20120113._domainkey.gmail.com.': """\ +p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ ++eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ +s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ +hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ +MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ +Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + def dnsfunc4(self, domain, timeout=5): + sample_dns = """\ +k=rsa; \ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.example.com.': read_test_data("badk.txt"), + '20120113._domainkey.gmail.com.': """\ +p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ ++eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ +s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ +hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ +MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ +Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + def dnsfunc5(self, domain, timeout=5): + sample_dns = """\ +k=rsa; \ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.football.example.com.': read_test_data("test.txt"), + 'brisbane._domainkey.football.example.com.': """v=DKIM1; k=ed25519; \ +p=11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo=""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + def dnsfunc6(self, domain, timeout=5): + sample_dns = """\ +k=rsa; s=email;\ +p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\ +b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==""" + + _dns_responses = { + 'example._domainkey.canonical.com.': sample_dns, + 'test._domainkey.example.com.': read_test_data("test_tlsrpt.txt"), + '20120113._domainkey.gmail.com.': """k=rsa; \ +p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Kd87/UeJjenpabgbFwh\ ++eBCsSTrqmwIYYvywlbhbqoo2DymndFkbjOVIPIldNs/m40KF+yzMn1skyoxcTUGCQ\ +s8g3FgD2Ap3ZB5DekAo5wMmk4wimDO+U8QzI3SD07y2+07wlNWwIt8svnxgdxGkVbb\ +hzY8i+RQ9DpSVpPbF7ykQxtKXkv/ahW3KjViiAH+ghvvIhkx4xYSIc9oSwVmAl5Oct\ +MEeWUwg8Istjqz8BZeTWbf41fbNhte7Y+YqZOwq1Sd0DbvYAD9NOZK9vlfuac0598H\ +Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB""" + } + try: + domain = domain.decode('ascii') + except UnicodeDecodeError: + return None + self.assertTrue(domain in _dns_responses,domain) + return _dns_responses[domain] + + + + + def test_ignores_tlsrptsvc(self): + # A non-tlsrpt signed with a key record with s=tlsrpt shouldn't verify. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo)) + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc6) + self.assertFalse(res) + + def test_tlsrpt_with_strict_tlsrptsvc(self): + # A tlsrpt signed with a key record with s=tlsrpt should verify with tlsrpt='strict'. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo)) + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc6, tlsrpt='strict') + self.assertTrue(res) + + def test_tlsrpt_with_tlsrptsvc(self): + # A tlsrpt signed with a key record with s=tlsrpt should verify. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo)) + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc6, tlsrpt=True) + self.assertTrue(res) + + def test_tlsrpt_with_strict_no_tlsrptsvc(self): + # A tlsrpt signed with a key record without s=tlsrpt not tlsrpt='strict' should not verify. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo)) + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc, tlsrpt='strict') + self.assertFalse(res) + + def test_tlsrpt_with_no_tlsrptsvc(self): + # A tlsrpt signed with a key record without s=tlsrpt and tlsrpt=True should verify. + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo)) + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc, tlsrpt=True) + self.assertTrue(res) + + def test_tlsrpt_ignore_l_verify(self): + # For a tlsrpt, ignore l= on verify + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo), length=True) + self.message += b'added more text' + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc, tlsrpt=True) + self.assertFalse(res) + + def test_tlsrpt_ignore_l_sign(self): + # For a tlsrpt, don't add l= when signing tlsrpt + for header_algo in (b"simple", b"relaxed"): + for body_algo in (b"simple", b"relaxed"): + sig = dkim.sign( + self.message, b"test", b"example.com", self.key, + canonicalize=(header_algo, body_algo), length=True, tlsrpt=True) + self.message += b'added more text' + res = dkim.verify(sig + self.message, dnsfunc=self.dnsfunc) + self.assertFalse(res) + + +def test_suite(): + from unittest import TestLoader + return TestLoader().loadTestsFromName(__name__)