150 lines
4.3 KiB
Python
150 lines
4.3 KiB
Python
import unittest
|
|
import Milter
|
|
import sample
|
|
import mime
|
|
import rfc822
|
|
import StringIO
|
|
|
|
class TestMilter(sample.sampleMilter):
    """Test harness around sample.sampleMilter.

    Instead of talking to a real MTA, the harness reads a message from a
    file under ``test/``, drives the milter callbacks itself (see feedMsg)
    and records what the milter asks to change:

    * ``_msg``           -- the rfc822.Message being filtered (set before eom)
    * ``_body``          -- StringIO buffer; after feedMsg it holds the
                            resulting message (headers + blank line + body)
    * ``bodyreplaced``   -- set to 1 once replacebody() has been called
    * ``headerschanged`` -- set to 1 once chgheader()/addheader() was called
    """

    def __init__(self):
        """Open the test log in append mode.

        NOTE(review): sample.sampleMilter.__init__ is not called here --
        confirm that this is intentional.
        """
        self.logfp = open("test/milter.log", "a")

    def log(self, *msg):
        """Write the given items, space separated, as one line of the test log."""
        for item in msg:
            print >>self.logfp, item,
        print >>self.logfp

    def replacebody(self, chunk):
        """Append ``chunk`` to the replacement body and flag the body as replaced.

        Raises IOError when no body buffer exists, i.e. when called outside
        the window in which feedMsg() runs eom().
        """
        if self._body is None:
            raise IOError("replacebody not called from eom()")
        self._body.write(chunk)
        self.bodyreplaced = 1

    # FIXME: rfc822 indexing does not really reflect the way chg/add header
    # work for a milter
    def chgheader(self, field, idx, value):
        """Replace header ``field`` with ``value``; an empty value deletes it.

        ``idx`` is only logged: the change is applied by header name to the
        rfc822 message (see the FIXME above).
        """
        self.log('chgheader: %s[%d]=%s' % (field, idx, value))
        if value == '':
            del self._msg[field]
        else:
            self._msg[field] = value
        self.headerschanged = 1

    def addheader(self, field, value):
        """Set header ``field`` to ``value`` on the message and flag the change."""
        self.log('addheader: %s=%s' % (field, value))
        self._msg[field] = value
        self.headerschanged = 1

    def _feedheader(self, line):
        """Pass one complete raw header (continuation lines included) to header().

        The line is split at the first colon and the value is stripped of
        surrounding whitespace.  A header with nothing after the colon is
        passed with an empty value.  Returns the result of self.header().
        """
        parts = line.split(':', 1)
        name = parts[0]
        if len(parts) > 1:
            value = parts[1].strip()
        else:
            value = ''
        return self.header(name, value)

    def feedMsg(self, fname):
        """Run the message stored in ``test/<fname>`` through the milter.

        Callbacks are invoked in this order: envfrom, envrcpt, header (once
        per header), eoh, body (in chunks of up to 8192 bytes) and eom.
        As soon as a callback before eom returns something other than
        Milter.CONTINUE, that code is returned and the remaining callbacks
        are skipped.

        When eom has run, ``self._body`` is rebuilt to hold the resulting
        message: the header lines of the message object, a blank line, and
        either the replacement body or the original one.

        Returns the result of the last callback invoked.
        """
        self._body = None
        self.bodyreplaced = 0
        self.headerschanged = 0
        fp = open('test/' + fname, 'r')
        # The file must stay open until the original body has been re-read
        # after eom(), and must be closed on every exit path.
        try:
            msg = rfc822.Message(fp)
            rc = self.envfrom('<spam@advertisements.com>')
            if rc != Milter.CONTINUE:
                return rc
            rc = self.envrcpt('<victim@lamb.com>')
            if rc != Milter.CONTINUE:
                return rc
            # msg.headers contains raw lines; a line starting with
            # whitespace continues the previous header, so a header can only
            # be delivered once the start of the next one has been seen.
            line = None
            for h in msg.headers:
                if line and h[:1].isspace():
                    line = line + h
                    continue
                if line:
                    rc = self._feedheader(line)
                    if rc != Milter.CONTINUE:
                        return rc
                line = h
            if line:
                # Deliver the final pending header.
                rc = self._feedheader(line)
                if rc != Milter.CONTINUE:
                    return rc
            rc = self.eoh()
            if rc != Milter.CONTINUE:
                return rc
            while 1:
                buf = fp.read(8192)
                if not buf:
                    break
                rc = self.body(buf)
                if rc != Milter.CONTINUE:
                    return rc
            # From here on replacebody/chgheader/addheader are allowed:
            # they operate on self._body and self._msg.
            self._msg = msg
            self._body = StringIO.StringIO()
            rc = self.eom()
            if self.bodyreplaced:
                body = self._body.getvalue()
            else:
                msg.rewindbody()
                body = msg.fp.read()
        finally:
            fp.close()
        # Assemble the complete resulting message for the caller to inspect.
        self._body = StringIO.StringIO()
        self._body.writelines(msg.headers)
        self._body.write('\n')
        self._body.write(body)
        return rc

    def connect(self, host='localhost'):
        """Simulate a connection from ``host`` followed by the HELO step.

        Calls sample.sampleMilter.connect and then hello('spamrelay').
        The milter is closed when connect returns something other than
        CONTINUE/ACCEPT, or when hello returns something other than
        CONTINUE.  Returns the result of the last step performed.
        """
        self._body = None
        self.bodyreplaced = 0
        rc = sample.sampleMilter.connect(self, host, 1, 0)
        if rc not in (Milter.CONTINUE, Milter.ACCEPT):
            self.close()
            return rc
        rc = self.hello('spamrelay')
        if rc != Milter.CONTINUE:
            self.close()
        return rc
|
|
|
|
class BMSMilterTestCase(unittest.TestCase):
    """Feed sample messages from ``test/`` through TestMilter and check results.

    Each test writes the resulting message to ``test/<name>.tstout`` so it
    can be inspected afterwards.
    """

    def _savebody(self, milter, name):
        """Write the milter's resulting message to ``test/<name>.tstout``."""
        out = open('test/' + name + ".tstout", "w")
        try:
            out.write(milter._body.getvalue())
        finally:
            out.close()

    def testDefang(self, fname='virus1'):
        """The message must be accepted with its body replaced."""
        milter = TestMilter()
        # Close the milter even when an assertion fails.
        try:
            rc = milter.connect()
            self.assertEqual(rc, Milter.CONTINUE)
            rc = milter.feedMsg(fname)
            self.assertEqual(rc, Milter.ACCEPT)
            self.failUnless(milter.bodyreplaced, "Message body not replaced")
            fp = milter._body
            self._savebody(milter, fname)
            #self.failUnless(fp.getvalue() == open("test/virus1.out","r").read())
            # Re-parse the result and log the payload of its second part.
            fp.seek(0)
            msg = mime.MimeMessage(fp)
            s = msg.get_payload(1).get_payload()
            milter.log(s)
        finally:
            milter.close()

    def testParse(self, fname='spam7'):
        """The message must be accepted without its body being replaced."""
        milter = TestMilter()
        try:
            milter.connect('somehost')
            rc = milter.feedMsg(fname)
            self.assertEqual(rc, Milter.ACCEPT)
            self.failIf(milter.bodyreplaced, "Milter needlessly replaced body.")
            self._savebody(milter, fname)
        finally:
            milter.close()

    def testDefang2(self):
        """Feed several messages over one connection.

        ``samp1`` must be left alone, ``virus3`` must get a new body, and
        ``virus6`` must get a new body and adjusted headers.
        """
        milter = TestMilter()
        try:
            milter.connect('somehost')
            rc = milter.feedMsg('samp1')
            self.assertEqual(rc, Milter.ACCEPT)
            self.failIf(milter.bodyreplaced, "Milter needlessly replaced body.")
            rc = milter.feedMsg("virus3")
            self.assertEqual(rc, Milter.ACCEPT)
            self.failUnless(milter.bodyreplaced, "Message body not replaced")
            self._savebody(milter, "virus3")
            #self.failUnless(fp.getvalue() == open("test/virus3.out","r").read())
            rc = milter.feedMsg("virus6")
            self.assertEqual(rc, Milter.ACCEPT)
            self.failUnless(milter.bodyreplaced, "Message body not replaced")
            self.failUnless(milter.headerschanged, "Message headers not adjusted")
            self._savebody(milter, "virus6")
        finally:
            milter.close()
|
|
|
|
def suite():
    """Build a suite from the BMSMilterTestCase methods prefixed with 'test'."""
    tests = unittest.makeSuite(BMSMilterTestCase, 'test')
    return tests
|
|
|
|
# Run the test cases when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|