Fix some test cases and bugs found on py3 bmsmilter install.
This commit is contained in:
+4
-4
@@ -358,9 +358,9 @@ class Base(object):
|
||||
if e == 'bytes':
|
||||
#self.envfrom_bytes = self.envfrom
|
||||
return self.envfrom(*b)
|
||||
s = (v.decode(encoding='utf-8',errors=e) for v in b)
|
||||
s = [v.decode(encoding='utf-8',errors=e) for v in b]
|
||||
except UnicodeDecodeError: s = b
|
||||
return self.envfrom(fld,*s)
|
||||
return self.envfrom(s[0],*s[1:])
|
||||
## Called when the SMTP client says MAIL FROM. Called by the
|
||||
# <a href="milter_api/xxfi_envfrom.html">
|
||||
# xxfi_envfrom</a> callback.
|
||||
@@ -383,9 +383,9 @@ class Base(object):
|
||||
if e == 'bytes':
|
||||
#self.envrcpt_bytes = self.envrcpt
|
||||
return self.envrcpt(*b)
|
||||
s = (v.decode(encoding='utf-8',errors=e) for v in b)
|
||||
s = [v.decode(encoding='utf-8',errors=e) for v in b]
|
||||
except UnicodeDecodeError: s = b
|
||||
return self.envrcpt(fld,*s)
|
||||
return self.envrcpt(s[0],*s[1:])
|
||||
## Called when the SMTP client says RCPT TO. Called by the
|
||||
# <a href="milter_api/xxfi_envrcpt.html">
|
||||
# xxfi_envrcpt</a> callback.
|
||||
|
||||
@@ -16,6 +16,7 @@ try:
|
||||
f = DB()
|
||||
f.open(fname,mode)
|
||||
return f
|
||||
except ModuleNotFoundError: raise
|
||||
except:
|
||||
import anydbm as dbm
|
||||
dbmopen = dbm.open
|
||||
|
||||
@@ -125,7 +125,7 @@ class MimeMessage(Message):
|
||||
"""Return a list of (attr,name) pairs of attributes that IE might
|
||||
interpret as a name - and hence decide to execute this message."""
|
||||
names = []
|
||||
for attr,val in self._get_params_preserve([],'content-type'):
|
||||
for attr,val in self.get_params([],'content-type',False):
|
||||
if isinstance(val, tuple):
|
||||
# It's an RFC 2231 encoded parameter
|
||||
newvalue = _unquotevalue(val)
|
||||
@@ -195,6 +195,11 @@ class MimeMessage(Message):
|
||||
|
||||
def get_payload(self,i=None,decode=False):
|
||||
msg = self.submsg
|
||||
if msg is None:
|
||||
t = self.get_content_type().lower()
|
||||
if t == 'message/rfc822' or t.startswith('multipart/'):
|
||||
msg = super().get_payload()
|
||||
self.submsg = msg
|
||||
if isinstance(msg,Message) and msg.ismodified():
|
||||
self.set_payload([msg])
|
||||
return Message.get_payload(self,i,decode)
|
||||
@@ -213,7 +218,11 @@ class MimeMessage(Message):
|
||||
if t == 'message/rfc822' or t.startswith('multipart/'):
|
||||
if not self.submsg:
|
||||
txt = self.get_payload()
|
||||
if type(txt) == str:
|
||||
if type(txt) is bytes:
|
||||
self.submsg = email.message_from_bytes(txt,MimeMessage)
|
||||
for part in self.submsg.walk():
|
||||
part.modified = False
|
||||
elif type(txt) is str:
|
||||
txt = self.get_payload(decode=True)
|
||||
self.submsg = email.message_from_string(txt,MimeMessage)
|
||||
for part in self.submsg.walk():
|
||||
@@ -274,17 +283,24 @@ def check_name(msg,savname=None,ckname=check_ext,scan_zip=False):
|
||||
msg["Content-Type"] = "text/plain; name="+name
|
||||
return Milter.CONTINUE
|
||||
|
||||
def check_attachments(msg,check):
|
||||
def check_attachments(msg,check,lev=None):
|
||||
"""Scan attachments.
|
||||
msg MimeMessage
|
||||
check function(MimeMessage): int
|
||||
Return CONTINUE, REJECT, ACCEPT
|
||||
"""
|
||||
if msg.is_multipart():
|
||||
if not lev: lev = []
|
||||
lev.append(1)
|
||||
if msg.get_content_type().endswith('/rfc822'):
|
||||
foo = 1
|
||||
for i in msg.get_payload():
|
||||
rc = check_attachments(i,check)
|
||||
print('chkm',lev,msg.get_content_type())
|
||||
rc = check_attachments(i,check,lev=lev)
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
lev[-1] += 1
|
||||
return Milter.CONTINUE
|
||||
print('chk',lev,msg.get_content_type())
|
||||
return check(msg)
|
||||
|
||||
# save call context for Python without nested_scopes
|
||||
|
||||
+27
@@ -192,6 +192,33 @@ class MimeTestCase(unittest.TestCase):
|
||||
self.assertEqual(self.filename,"7501'S FOR TWO GOLDEN SOURCES SHIPMENTS FOR TAX & DUTY PURPOSES ONLY.PDF")
|
||||
self.assertEqual(rc,Milter.CONTINUE)
|
||||
|
||||
def test_getnames(self):
|
||||
names = []
|
||||
self.sawpif = False
|
||||
def do_part(m):
|
||||
n = m.getnames()
|
||||
a = names
|
||||
a += n
|
||||
return Milter.CONTINUE
|
||||
def chk_part(m):
|
||||
for k,n in m.getnames():
|
||||
if n and n.lower().endswith('.pif'):
|
||||
self.sawpif = True
|
||||
s = m.get_submsg()
|
||||
print(m.get_content_type(),type(s),'modified:',m.ismodified())
|
||||
if isinstance(s,email.message.Message):
|
||||
return mime.check_attachments(s,chk_part)
|
||||
return Milter.CONTINUE
|
||||
|
||||
with self.zf.open('virus7','r') as fp:
|
||||
msg = mime.message_from_file(fp)
|
||||
self.assertTrue(msg.ismultipart())
|
||||
mime.check_attachments(msg,do_part)
|
||||
self.assertTrue(('filename','application.pif') in names)
|
||||
self.assertFalse(self.sawpif)
|
||||
mime.check_attachments(msg,chk_part)
|
||||
self.assertTrue(self.sawpif)
|
||||
|
||||
def testHTML(self,fname=""):
|
||||
result = StringIO()
|
||||
filter = mime.HTMLScriptFilter(result)
|
||||
|
||||
Reference in New Issue
Block a user