DKIM.verify: Refactor to minimize code duplication in dkim.asyncsupport.
This commit is contained in:
+16
-9
@@ -888,15 +888,11 @@ class DKIM(DomainSigner):
|
|||||||
self.signature_fields = dict(sigfields)
|
self.signature_fields = dict(sigfields)
|
||||||
return b'DKIM-Signature: ' + res
|
return b'DKIM-Signature: ' + res
|
||||||
|
|
||||||
#: Verify a DKIM signature.
|
|
||||||
#: @type idx: int
|
def verify_headerprep(self, idx=0):
|
||||||
#: @param idx: which signature to verify. The first (topmost) signature is 0.
|
"""Non-DNS verify parts to minimize asyncio code duplication."""
|
||||||
#: @type dnsfunc: callable
|
|
||||||
#: @param dnsfunc: an option function to lookup TXT resource records
|
|
||||||
#: for a DNS domain. The default uses dnspython or pydns.
|
|
||||||
#: @return: True if signature verifies or False otherwise
|
|
||||||
#: @raise DKIMException: when the message, signature, or key are badly formed
|
|
||||||
def verify(self,idx=0,dnsfunc=get_txt):
|
|
||||||
sigheaders = [(x,y) for x,y in self.headers if x.lower() == b"dkim-signature"]
|
sigheaders = [(x,y) for x,y in self.headers if x.lower() == b"dkim-signature"]
|
||||||
if len(sigheaders) <= idx:
|
if len(sigheaders) <= idx:
|
||||||
return False
|
return False
|
||||||
@@ -916,7 +912,18 @@ class DKIM(DomainSigner):
|
|||||||
|
|
||||||
include_headers = [x.lower() for x in re.split(br"\s*:\s*", sig[b'h'])]
|
include_headers = [x.lower() for x in re.split(br"\s*:\s*", sig[b'h'])]
|
||||||
self.include_headers = tuple(include_headers)
|
self.include_headers = tuple(include_headers)
|
||||||
|
return sig, include_headers, sigheaders
|
||||||
|
|
||||||
|
#: Verify a DKIM signature.
|
||||||
|
#: @type idx: int
|
||||||
|
#: @param idx: which signature to verify. The first (topmost) signature is 0.
|
||||||
|
#: @type dnsfunc: callable
|
||||||
|
#: @param dnsfunc: an option function to lookup TXT resource records
|
||||||
|
#: for a DNS domain. The default uses dnspython or pydns.
|
||||||
|
#: @return: True if signature verifies or False otherwise
|
||||||
|
#: @raise DKIMException: when the message, signature, or key are badly formed
|
||||||
|
def verify(self,idx=0,dnsfunc=get_txt):
|
||||||
|
sig, include_headers, sigheaders = self.verify_headerprep(idx=0)
|
||||||
return self.verify_sig(sig, include_headers, sigheaders[idx], dnsfunc)
|
return self.verify_sig(sig, include_headers, sigheaders[idx], dnsfunc)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
+1
-20
@@ -94,26 +94,7 @@ class DKIM(dkim.DKIM):
|
|||||||
|
|
||||||
|
|
||||||
async def verify(self,idx=0,dnsfunc=get_txt_async):
|
async def verify(self,idx=0,dnsfunc=get_txt_async):
|
||||||
sigheaders = [(x,y) for x,y in self.headers if x.lower() == b"dkim-signature"]
|
sig, include_headers, sigheaders = self.verify_headerprep(idx=0)
|
||||||
if len(sigheaders) <= idx:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# By default, we validate the first DKIM-Signature line found.
|
|
||||||
try:
|
|
||||||
sig = dkim.parse_tag_value(sigheaders[idx][1])
|
|
||||||
self.signature_fields = sig
|
|
||||||
except dkim.InvalidTagValueList as e:
|
|
||||||
raise dkim.MessageFormatError(e)
|
|
||||||
|
|
||||||
self.logger.debug("sig: %r" % sig)
|
|
||||||
|
|
||||||
dkim.validate_signature_fields(sig)
|
|
||||||
self.domain = sig[b'd']
|
|
||||||
self.selector = sig[b's']
|
|
||||||
|
|
||||||
include_headers = [x.lower() for x in re.split(br"\s*:\s*", sig[b'h'])]
|
|
||||||
self.include_headers = tuple(include_headers)
|
|
||||||
|
|
||||||
return await self.verify_sig(sig, include_headers, sigheaders[idx], dnsfunc)
|
return await self.verify_sig(sig, include_headers, sigheaders[idx], dnsfunc)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user