From 35416dfc46157aa1f7b157f2be5a419ccc6c58a2 Mon Sep 17 00:00:00 2001 From: "Stuart D. Gathman" Date: Wed, 29 May 2024 11:15:26 -0400 Subject: [PATCH] Support MTAs with colon separator --- Milter/policy.py | 16 ++++++++++++---- testpolicy.py | 5 +++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/Milter/policy.py b/Milter/policy.py index c1d1711..0adc5a7 100644 --- a/Milter/policy.py +++ b/Milter/policy.py @@ -6,6 +6,8 @@ try: else: raise RuntimeException('unsupported mode') self.f = db.DB() self.f.open(fname,flags=flags) + def __contains__(self,key): + return not not self.f.get(key) def __getitem__(self,key): v = self.f.get(key) if not v: raise KeyError(key) @@ -61,13 +63,19 @@ class MTAPolicy(object): else: sep = b'!' pfx = pfx.encode() + sep - try: + try: # try with localpart@domain return acf[pfx + self.sender.encode() + sfx].rstrip(b'\x00').decode() except KeyError: - try: - return acf[pfx + self.domain.encode() + sfx].rstrip(b'\x00').decode() + try: # try with domain + d = self.domain.encode() + k = pfx + d + sfx + while not k in acf and b'.' in d: + # check partial domains + d = b'.'.join(d.split(b'.')[1:]) + k = pfx + b'.' + d + sfx + return acf[k].rstrip(b'\x00').decode() except KeyError: - try: + try: # try bare prefix return acf[pfx + sfx].rstrip(b'\x00').decode() except KeyError: try: diff --git a/testpolicy.py b/testpolicy.py index 42b6b20..f09acbb 100644 --- a/testpolicy.py +++ b/testpolicy.py @@ -23,6 +23,8 @@ class PolicyTestCase(unittest.TestCase): print("Missing test/access") def testPolicy(self): + self.config.use_colon = True + self.config.use_nulls = True with MTAPolicy('good@example.com',conf=self.config) as p: pol = p.getPolicy('smtp-auth') self.assertEqual(pol,'OK') @@ -35,6 +37,9 @@ class PolicyTestCase(unittest.TestCase): with MTAPolicy('any@random.com',conf=self.config) as p: pol = p.getPolicy('smtp-test') self.assertEqual(pol,'REJECT') + with MTAPolicy('foo@bar.baz.com',conf=self.config) as p: + pol = p.getPolicy('smtp-test') + self.assertEqual(pol,'WILDCARD') def suite(): return unittest.makeSuite(PolicyTestCase,'test')