Initial sourceforge import.
This commit is contained in:
+290
@@ -0,0 +1,290 @@
|
||||
import unittest
|
||||
import Milter
|
||||
import bms
|
||||
import mime
|
||||
import rfc822
|
||||
import StringIO
|
||||
#import pdb
|
||||
|
||||
class TestMilter(bms.bmsMilter):
|
||||
|
||||
def __init__(self):
|
||||
bms.bmsMilter.__init__(self)
|
||||
self.logfp = open("test/milter.log","a")
|
||||
self._delrcpt = [] # record deleted rcpts for testing
|
||||
self._addrcpt = [] # record added rcpts for testing
|
||||
|
||||
def log(self,*msg):
|
||||
for i in msg: print >>self.logfp, i,
|
||||
print >>self.logfp
|
||||
|
||||
def getsymval(self,name):
|
||||
if name == 'j': return 'test.milter.org'
|
||||
return bms.bmsMilter.getsymval(self,name)
|
||||
|
||||
def replacebody(self,chunk):
|
||||
if self._body:
|
||||
self._body.write(chunk)
|
||||
self.bodyreplaced = 1
|
||||
else:
|
||||
raise IOError,"replacebody not called from eom()"
|
||||
|
||||
# FIXME: rfc822 indexing does not really reflect the way chg/add header
|
||||
# work for a milter
|
||||
def chgheader(self,field,idx,value):
|
||||
if not self._body:
|
||||
raise IOError,"chgheader not called from eom()"
|
||||
self.log('chgheader: %s[%d]=%s' % (field,idx,value))
|
||||
if value == '':
|
||||
del self._msg[field]
|
||||
else:
|
||||
self._msg[field] = value
|
||||
self.headerschanged = 1
|
||||
|
||||
def addheader(self,field,value):
|
||||
if not self._body:
|
||||
raise IOError,"addheader not called from eom()"
|
||||
self.log('addheader: %s=%s' % (field,value))
|
||||
self._msg[field] = value
|
||||
self.headerschanged = 1
|
||||
|
||||
def delrcpt(self,rcpt):
|
||||
if not self._body:
|
||||
raise IOError,"delrcpt not called from eom()"
|
||||
self._delrcpt.append(rcpt)
|
||||
|
||||
def addrcpt(self,rcpt):
|
||||
if not self._body:
|
||||
raise IOError,"addrcpt not called from eom()"
|
||||
self._addrcpt.append(rcpt)
|
||||
|
||||
def setreply(self,rcode,xcode,msg):
|
||||
self.reply = (rcode,xcode,msg)
|
||||
|
||||
def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com"):
|
||||
self._body = None
|
||||
self.bodyreplaced = 0
|
||||
self.headerschanged = 0
|
||||
self.reply = None
|
||||
msg = rfc822.Message(fp)
|
||||
rc = self.envfrom('<%s>'%sender)
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
rc = self.envrcpt('<%s>'%rcpt)
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
line = None
|
||||
for h in msg.headers:
|
||||
if h[:1].isspace():
|
||||
line = line + h
|
||||
continue
|
||||
if not line:
|
||||
line = h
|
||||
continue
|
||||
s = line.split(': ',1)
|
||||
if len(s) > 1: val = s[1].strip()
|
||||
else: val = ''
|
||||
rc = self.header(s[0],val)
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
line = h
|
||||
if line:
|
||||
s = line.split(': ',1)
|
||||
rc = self.header(s[0],s[1])
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
rc = self.eoh()
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
while 1:
|
||||
buf = fp.read(8192)
|
||||
if len(buf) == 0: break
|
||||
rc = self.body(buf)
|
||||
if rc != Milter.CONTINUE: return rc
|
||||
self._msg = msg
|
||||
self._body = StringIO.StringIO()
|
||||
rc = self.eom()
|
||||
if self.bodyreplaced:
|
||||
body = self._body.getvalue()
|
||||
else:
|
||||
msg.rewindbody()
|
||||
body = msg.fp.read()
|
||||
self._body = StringIO.StringIO()
|
||||
self._body.writelines(msg.headers)
|
||||
self._body.write('\n')
|
||||
self._body.write(body)
|
||||
return rc
|
||||
|
||||
def feedMsg(self,fname,sender="spam@adv.com",rcpt="victim@lamb.com"):
|
||||
fp = open('test/'+fname,'r')
|
||||
rc = self.feedFile(fp,sender,rcpt)
|
||||
fp.close()
|
||||
return rc
|
||||
|
||||
def connect(self,host='localhost'):
|
||||
self._body = None
|
||||
self.bodyreplaced = 0
|
||||
rc = bms.bmsMilter.connect(self,host,1,('1.2.3.4',1234))
|
||||
if rc != Milter.CONTINUE and rc != Milter.ACCEPT:
|
||||
self.close()
|
||||
return rc
|
||||
rc = self.hello('spamrelay')
|
||||
if rc != Milter.CONTINUE:
|
||||
self.close()
|
||||
return rc
|
||||
|
||||
class BMSMilterTestCase(unittest.TestCase):
|
||||
|
||||
def testDefang(self,fname='virus1'):
|
||||
milter = TestMilter()
|
||||
rc = milter.connect('testDefang')
|
||||
self.assertEqual(rc,Milter.CONTINUE)
|
||||
rc = milter.feedMsg(fname)
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
fp = milter._body
|
||||
open('test/'+fname+".tstout","w").write(fp.getvalue())
|
||||
#self.failUnless(fp.getvalue() == open("test/virus1.out","r").read())
|
||||
fp.seek(0)
|
||||
msg = mime.MimeMessage(fp)
|
||||
str = msg.get_payload(1).get_payload()
|
||||
milter.log(str)
|
||||
milter.close()
|
||||
|
||||
# test some spams that crashed our parser
|
||||
def testParse(self,fname='spam7'):
|
||||
milter = TestMilter()
|
||||
milter.connect('testParse')
|
||||
rc = milter.feedMsg(fname)
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
|
||||
fp = milter._body
|
||||
open('test/'+fname+".tstout","w").write(fp.getvalue())
|
||||
milter.connect('pro-send.com')
|
||||
rc = milter.feedMsg('spam8')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
|
||||
rc = milter.feedMsg('bounce')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
|
||||
rc = milter.feedMsg('bounce1')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
|
||||
milter.close()
|
||||
|
||||
def testDefang2(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testDefang2')
|
||||
rc = milter.feedMsg('samp1')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
|
||||
rc = milter.feedMsg("virus3")
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
fp = milter._body
|
||||
open("test/virus3.tstout","w").write(fp.getvalue())
|
||||
#self.failUnless(fp.getvalue() == open("test/virus3.out","r").read())
|
||||
rc = milter.feedMsg("virus6")
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
self.failUnless(milter.headerschanged,"Message headers not adjusted")
|
||||
fp = milter._body
|
||||
open("test/virus6.tstout","w").write(fp.getvalue())
|
||||
milter.close()
|
||||
|
||||
def testDefang3(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testDefang3')
|
||||
# test script removal on complex HTML attachment
|
||||
rc = milter.feedMsg('amazon')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
fp = milter._body
|
||||
open("test/amazon.tstout","w").write(fp.getvalue())
|
||||
# test defanging Klez virus
|
||||
rc = milter.feedMsg("virus13")
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
fp = milter._body
|
||||
open("test/virus13.tstout","w").write(fp.getvalue())
|
||||
# test script removal on quoted-printable HTML attachment
|
||||
# sgmllib can't handle the <![if cond]> syntax
|
||||
rc = milter.feedMsg('spam44')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failIf(milter.bodyreplaced,"Message body replaced")
|
||||
fp = milter._body
|
||||
open("test/spam44.tstout","w").write(fp.getvalue())
|
||||
milter.close()
|
||||
|
||||
def testRFC822(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testRFC822')
|
||||
# test encoded rfc822 attachment
|
||||
#pdb.set_trace()
|
||||
rc = milter.feedMsg('test8')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
#self.failIf(milter.bodyreplaced,"Message body replaced")
|
||||
fp = milter._body
|
||||
open("test/test8.tstout","w").write(fp.getvalue())
|
||||
rc = milter.feedMsg('virus7')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
#self.failIf(milter.bodyreplaced,"Message body replaced")
|
||||
fp = milter._body
|
||||
open("test/virus7.tstout","w").write(fp.getvalue())
|
||||
|
||||
def testSmartAlias(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testSmartAlias')
|
||||
# test smart alias feature
|
||||
key = ('foo@bar.com','baz@bat.com')
|
||||
bms.smart_alias[key] = ['ham@eggs.com']
|
||||
rc = milter.feedMsg('test8',key[0],key[1])
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
self.failUnless(milter._delrcpt == ['<baz@bat.com>'])
|
||||
self.failUnless(milter._addrcpt == ['<ham@eggs.com>'])
|
||||
|
||||
def testBadBoundary(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testBadBoundary')
|
||||
# test rfc822 attachment with invalid boundaries
|
||||
#pdb.set_trace()
|
||||
rc = milter.feedMsg('bound')
|
||||
self.assertEqual(rc,Milter.REJECT)
|
||||
self.assertEqual(milter.reply[0],'554')
|
||||
#self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
self.failIf(milter.bodyreplaced,"Message body replaced")
|
||||
fp = milter._body
|
||||
open("test/bound.tstout","w").write(fp.getvalue())
|
||||
|
||||
def testCompoundFilename(self):
|
||||
milter = TestMilter()
|
||||
milter.connect('testCompoundFilename')
|
||||
# test rfc822 attachment with invalid boundaries
|
||||
#pdb.set_trace()
|
||||
rc = milter.feedMsg('test1')
|
||||
self.assertEqual(rc,Milter.ACCEPT)
|
||||
#self.failUnless(milter.bodyreplaced,"Message body not replaced")
|
||||
self.failIf(milter.bodyreplaced,"Message body replaced")
|
||||
fp = milter._body
|
||||
open("test/test1.tstout","w").write(fp.getvalue())
|
||||
|
||||
# def testReject(self):
|
||||
# "Test content based spam rejection."
|
||||
# milter = TestMilter()
|
||||
# milter.connect('gogo-china.com')
|
||||
# rc = milter.feedMsg('big5');
|
||||
# self.failUnless(rc == Milter.REJECT)
|
||||
# milter.close();
|
||||
|
||||
def suite(): return unittest.makeSuite(BMSMilterTestCase,'test')
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) > 1:
|
||||
for fname in sys.argv[1:]:
|
||||
milter = TestMilter()
|
||||
milter.connect('main')
|
||||
fp = open(fname,'r')
|
||||
rc = milter.feedFile(fp)
|
||||
fp = milter._body
|
||||
sys.stdout.write(fp.getvalue())
|
||||
else:
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user