- Add new timeout parameter to enable DNS lookup timeouts to be adjusted
- Drop usage of pymilter Milter.dns in dnsplug since it doesn't support
havine a timeout passed to it
This commit is contained in:
@@ -1,3 +1,8 @@
|
||||
Version 1.0.0
|
||||
- Add new timeout parameter to enable DNS lookup timeouts to be adjusted
|
||||
- Drop usage of pymilter Milter.dns in dnsplug since it doesn't support
|
||||
havine a timeout passed to it
|
||||
|
||||
2019-10-07 Version 0.9.5
|
||||
- Ignore unknown service types in key records (LP: #1847020)
|
||||
- This is required by RFC 6376 and predecessors. It becomes important
|
||||
|
||||
@@ -11,7 +11,7 @@ signing and verification.
|
||||
|
||||
VERSION
|
||||
|
||||
This is dkimpy 0.9.5.
|
||||
This is dkimpy 1.0.0.
|
||||
|
||||
REQUIREMENTS
|
||||
|
||||
|
||||
+9
-8
@@ -419,8 +419,8 @@ def fold(header, namelen=0, linesep=b'\r\n'):
|
||||
return pre + header
|
||||
|
||||
|
||||
def load_pk_from_dns(name, dnsfunc=get_txt):
|
||||
s = dnsfunc(name)
|
||||
def load_pk_from_dns(name, dnsfunc=get_txt, timeout=5):
|
||||
s = dnsfunc(name, timeout=timeout)
|
||||
if not s:
|
||||
raise KeyFormatError("missing public key: %s"%name)
|
||||
try:
|
||||
@@ -478,7 +478,7 @@ class DomainSigner(object):
|
||||
#: @param debug_content: log headers and body after canonicalization (default False)
|
||||
#: @param linesep: use this line seperator for folding the headers
|
||||
def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256',
|
||||
minkey=1024, linesep=b'\r\n', debug_content=False):
|
||||
minkey=1024, linesep=b'\r\n', debug_content=False, timeout=5):
|
||||
self.set_message(message)
|
||||
if logger is None:
|
||||
logger = get_default_logger()
|
||||
@@ -502,6 +502,7 @@ class DomainSigner(object):
|
||||
self.minkey = minkey
|
||||
# use this line seperator for output
|
||||
self.linesep = linesep
|
||||
self.timeout = timeout
|
||||
|
||||
|
||||
#: Header fields to protect from additions by default.
|
||||
@@ -671,7 +672,7 @@ class DomainSigner(object):
|
||||
def verify_sig(self, sig, include_headers, sig_header, dnsfunc):
|
||||
name = sig[b's'] + b"._domainkey." + sig[b'd'] + b"."
|
||||
try:
|
||||
pk, self.keysize, ktag = load_pk_from_dns(name, dnsfunc)
|
||||
pk, self.keysize, ktag = load_pk_from_dns(name, dnsfunc, timeout=self.timeout)
|
||||
except KeyFormatError as e:
|
||||
self.logger.error("%s" % e)
|
||||
return False
|
||||
@@ -1270,14 +1271,14 @@ def sign(message, selector, domain, privkey, identity=None,
|
||||
return d.sign(selector, domain, privkey, identity=identity, canonicalize=canonicalize, include_headers=include_headers, length=length)
|
||||
|
||||
|
||||
def verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
|
||||
def verify(message, logger=None, dnsfunc=get_txt, minkey=1024, timeout=5):
|
||||
"""Verify the first (topmost) DKIM signature on an RFC822 formatted message.
|
||||
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
||||
@param logger: a logger to which debug info will be written (default None)
|
||||
@return: True if signature verifies or False otherwise
|
||||
"""
|
||||
# type: (bytes, any, function, int) -> bool
|
||||
d = DKIM(message,logger=logger,minkey=minkey)
|
||||
d = DKIM(message,logger=logger,minkey=minkey,timeout=timeout)
|
||||
try:
|
||||
return d.verify(dnsfunc=dnsfunc)
|
||||
except DKIMException as x:
|
||||
@@ -1317,7 +1318,7 @@ def arc_sign(message, selector, domain, privkey,
|
||||
timestamp=timestamp, standardize=standardize)
|
||||
|
||||
|
||||
def arc_verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
|
||||
def arc_verify(message, logger=None, dnsfunc=get_txt, minkey=1024, timeout=5):
|
||||
# type: (bytes, any, function, int) -> tuple
|
||||
"""Verify the ARC chain on an RFC822 formatted message.
|
||||
@param message: an RFC822 formatted message (with either \\n or \\r\\n line endings)
|
||||
@@ -1327,7 +1328,7 @@ def arc_verify(message, logger=None, dnsfunc=get_txt, minkey=1024):
|
||||
@return: three-tuple of (CV Result (CV_Pass, CV_Fail or CV_None), list of
|
||||
result dictionaries, result reason)
|
||||
"""
|
||||
a = ARC(message,logger=logger,minkey=minkey)
|
||||
a = ARC(message,logger=logger,minkey=minkey,timeout=5)
|
||||
try:
|
||||
return a.verify(dnsfunc=dnsfunc)
|
||||
except DKIMException as x:
|
||||
|
||||
+9
-10
@@ -25,10 +25,10 @@ __all__ = [
|
||||
]
|
||||
|
||||
|
||||
def get_txt_dnspython(name):
|
||||
def get_txt_dnspython(name, timeout=5):
|
||||
"""Return a TXT record associated with a DNS name."""
|
||||
try:
|
||||
a = dns.resolver.query(name, dns.rdatatype.TXT,raise_on_no_answer=False)
|
||||
a = dns.resolver.query(name, dns.rdatatype.TXT,raise_on_no_answer=False, lifetime=timeout)
|
||||
for r in a.response.answer:
|
||||
if r.rdtype == dns.rdatatype.TXT:
|
||||
return b"".join(r.items[0].strings)
|
||||
@@ -36,18 +36,18 @@ def get_txt_dnspython(name):
|
||||
return None
|
||||
|
||||
|
||||
def get_txt_pydns(name):
|
||||
def get_txt_pydns(name, timeout=5):
|
||||
"""Return a TXT record associated with a DNS name."""
|
||||
# Older pydns releases don't like a trailing dot.
|
||||
if name.endswith('.'):
|
||||
name = name[:-1]
|
||||
response = DNS.DnsRequest(name, qtype='txt').req()
|
||||
response = DNS.DnsRequest(name, qtype='txt', timeout=timeout).req()
|
||||
if not response.answers:
|
||||
return None
|
||||
return b''.join(response.answers[0]['data'])
|
||||
|
||||
|
||||
def get_txt_Milter_dns(name):
|
||||
def get_txt_Milter_dns(name, timeout=5):
|
||||
"""Return a TXT record associated with a DNS name."""
|
||||
# Older pydns releases don't like a trailing dot.
|
||||
if name.endswith('.'):
|
||||
@@ -64,15 +64,14 @@ try:
|
||||
_get_txt = get_txt_dnspython
|
||||
except ImportError:
|
||||
try:
|
||||
from Milter.dns import Session
|
||||
_get_txt = get_txt_Milter_dns
|
||||
except ImportError:
|
||||
import DNS
|
||||
DNS.DiscoverNameServers()
|
||||
_get_txt = get_txt_pydns
|
||||
except:
|
||||
raise
|
||||
|
||||
|
||||
def get_txt(name):
|
||||
def get_txt(name, timeout=5):
|
||||
"""Return a TXT record associated with a DNS name.
|
||||
|
||||
@param name: The bytestring domain name to look up.
|
||||
@@ -82,7 +81,7 @@ def get_txt(name):
|
||||
unicode_name = name.decode('UTF-8')
|
||||
except UnicodeDecodeError:
|
||||
return None
|
||||
txt = _get_txt(unicode_name)
|
||||
txt = _get_txt(unicode_name, timeout)
|
||||
if type(txt) is str:
|
||||
txt = txt.encode('utf-8')
|
||||
return txt
|
||||
|
||||
@@ -42,7 +42,7 @@ class TestSignAndVerify(unittest.TestCase):
|
||||
self.message = read_test_data("test.message")
|
||||
self.key = read_test_data("test.private")
|
||||
|
||||
def dnsfunc(self, domain):
|
||||
def dnsfunc(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; \
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
|
||||
@@ -62,7 +62,7 @@ class TestSignAndVerify(unittest.TestCase):
|
||||
self.key = read_test_data("test.private")
|
||||
self.rfckey = read_test_data("rfc8032_7_1.key")
|
||||
|
||||
def dnsfunc(self, domain):
|
||||
def dnsfunc(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; s=email;\
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
@@ -86,7 +86,7 @@ Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
|
||||
self.assertTrue(domain in _dns_responses,domain)
|
||||
return _dns_responses[domain]
|
||||
|
||||
def dnsfunc2(self, domain):
|
||||
def dnsfunc2(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; \
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
@@ -110,7 +110,7 @@ Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
|
||||
self.assertTrue(domain in _dns_responses,domain)
|
||||
return _dns_responses[domain]
|
||||
|
||||
def dnsfunc3(self, domain):
|
||||
def dnsfunc3(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; \
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
@@ -134,7 +134,7 @@ Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
|
||||
self.assertTrue(domain in _dns_responses,domain)
|
||||
return _dns_responses[domain]
|
||||
|
||||
def dnsfunc4(self, domain):
|
||||
def dnsfunc4(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; \
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
@@ -158,7 +158,7 @@ Y+vtSBczUiKERHv1yRbcaQtZFh5wtiRrN04BLUTD21MycBX5jYchHjPY/wIDAQAB"""
|
||||
self.assertTrue(domain in _dns_responses,domain)
|
||||
return _dns_responses[domain]
|
||||
|
||||
def dnsfunc5(self, domain):
|
||||
def dnsfunc5(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=rsa; \
|
||||
p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
|
||||
|
||||
@@ -59,7 +59,7 @@ class TestSignAndVerify(unittest.TestCase):
|
||||
self.key = read_test_data("ed25519test.key")
|
||||
self.rfckey = read_test_data("rfc8032_7_1.key")
|
||||
|
||||
def dnsfunc(self, domain):
|
||||
def dnsfunc(self, domain, timeout=5):
|
||||
sample_dns = """\
|
||||
k=ed25519; \
|
||||
p=yi50DjK5O9pqbFpNHklsv9lqaS0ArSYu02qp1S0DW1Y="""
|
||||
|
||||
Reference in New Issue
Block a user