Handle Unicode in get_txt.
This commit is contained in:
+2
-8
@@ -356,14 +356,8 @@ def verify(message, logger=None, dnsfunc=get_txt):
|
|||||||
(base64.b64encode(bodyhash), sig[b'bh']))
|
(base64.b64encode(bodyhash), sig[b'bh']))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# dnstxt wants Unicode
|
name = sig[b's'] + b"._domainkey." + sig[b'd'] + b"."
|
||||||
try:
|
s = dnsfunc(name)
|
||||||
selector = sig[b's'].decode('ascii')
|
|
||||||
domain = sig[b'd'].decode('ascii')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
return False
|
|
||||||
name = "%s._domainkey.%s." % (selector, domain)
|
|
||||||
s = dnsfunc(name).encode('utf-8')
|
|
||||||
if not s:
|
if not s:
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
|
|||||||
+15
-2
@@ -49,7 +49,20 @@ def get_txt_pydns(name):
|
|||||||
# Prefer dnspython if it's there, otherwise use pydns.
|
# Prefer dnspython if it's there, otherwise use pydns.
|
||||||
try:
|
try:
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
get_txt = get_txt_dnspython
|
_get_txt = get_txt_dnspython
|
||||||
except ImportError:
|
except ImportError:
|
||||||
import DNS
|
import DNS
|
||||||
get_txt = get_txt_pydns
|
_get_txt = get_txt_pydns
|
||||||
|
|
||||||
|
|
||||||
|
def get_txt(name):
|
||||||
|
"""Return a TXT record associated with a DNS name.
|
||||||
|
|
||||||
|
@param name: The bytestring domain name to look up.
|
||||||
|
"""
|
||||||
|
# pydns needs Unicode, but DKIM's d= is ASCII (already punycoded).
|
||||||
|
try:
|
||||||
|
unicode_name = name.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return None
|
||||||
|
return _get_txt(unicode_name).decode('utf-8')
|
||||||
|
|||||||
@@ -53,8 +53,12 @@ class TestSignAndVerify(unittest.TestCase):
|
|||||||
self.key = read_test_data("test.private")
|
self.key = read_test_data("test.private")
|
||||||
|
|
||||||
def dnsfunc(self, domain):
|
def dnsfunc(self, domain):
|
||||||
|
try:
|
||||||
|
domain = domain.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
return None
|
||||||
self.assertEqual('test._domainkey.example.com.', domain)
|
self.assertEqual('test._domainkey.example.com.', domain)
|
||||||
return read_test_data("test.txt").decode('utf-8')
|
return read_test_data("test.txt")
|
||||||
|
|
||||||
def test_verifies(self):
|
def test_verifies(self):
|
||||||
# A message verifies after being signed.
|
# A message verifies after being signed.
|
||||||
|
|||||||
Reference in New Issue
Block a user