Support MTAs with colon separator

This commit is contained in:
Stuart D. Gathman
2024-05-29 11:15:26 -04:00
parent c33de064ee
commit 35416dfc46
2 changed files with 17 additions and 4 deletions
+12 -4
View File
@@ -6,6 +6,8 @@ try:
else: raise RuntimeException('unsupported mode') else: raise RuntimeException('unsupported mode')
self.f = db.DB() self.f = db.DB()
self.f.open(fname,flags=flags) self.f.open(fname,flags=flags)
def __contains__(self,key):
return not not self.f.get(key)
def __getitem__(self,key): def __getitem__(self,key):
v = self.f.get(key) v = self.f.get(key)
if not v: raise KeyError(key) if not v: raise KeyError(key)
@@ -61,13 +63,19 @@ class MTAPolicy(object):
else: else:
sep = b'!' sep = b'!'
pfx = pfx.encode() + sep pfx = pfx.encode() + sep
try: try: # try with localpart@domain
return acf[pfx + self.sender.encode() + sfx].rstrip(b'\x00').decode() return acf[pfx + self.sender.encode() + sfx].rstrip(b'\x00').decode()
except KeyError: except KeyError:
try: try: # try with domain
return acf[pfx + self.domain.encode() + sfx].rstrip(b'\x00').decode() d = self.domain.encode()
k = pfx + d + sfx
while not k in acf and b'.' in d:
# check partial domains
d = b'.'.join(d.split(b'.')[1:])
k = pfx + b'.' + d + sfx
return acf[k].rstrip(b'\x00').decode()
except KeyError: except KeyError:
try: try: # try bare prefix
return acf[pfx + sfx].rstrip(b'\x00').decode() return acf[pfx + sfx].rstrip(b'\x00').decode()
except KeyError: except KeyError:
try: try:
+5
View File
@@ -23,6 +23,8 @@ class PolicyTestCase(unittest.TestCase):
print("Missing test/access") print("Missing test/access")
def testPolicy(self): def testPolicy(self):
self.config.use_colon = True
self.config.use_nulls = True
with MTAPolicy('good@example.com',conf=self.config) as p: with MTAPolicy('good@example.com',conf=self.config) as p:
pol = p.getPolicy('smtp-auth') pol = p.getPolicy('smtp-auth')
self.assertEqual(pol,'OK') self.assertEqual(pol,'OK')
@@ -35,6 +37,9 @@ class PolicyTestCase(unittest.TestCase):
with MTAPolicy('any@random.com',conf=self.config) as p: with MTAPolicy('any@random.com',conf=self.config) as p:
pol = p.getPolicy('smtp-test') pol = p.getPolicy('smtp-test')
self.assertEqual(pol,'REJECT') self.assertEqual(pol,'REJECT')
with MTAPolicy('foo@bar.baz.com',conf=self.config) as p:
pol = p.getPolicy('smtp-test')
self.assertEqual(pol,'WILDCARD')
def suite(): return unittest.makeSuite(PolicyTestCase,'test') def suite(): return unittest.makeSuite(PolicyTestCase,'test')