diff --git a/Milter/test.py b/Milter/test.py new file mode 100644 index 0000000..77edbc1 --- /dev/null +++ b/Milter/test.py @@ -0,0 +1,134 @@ +## @package Milter.test +# A test framework for milters + +import rfc822 +import StringIO +import Milter + +## +# +class TestBase(object): + + _protocol = 0 + def __init__(self): + self.logfp = open("test/milter.log","a") + self._delrcpt = [] # record deleted rcpts for testing + self._addrcpt = [] # record added rcpts for testing + self._macros = { } + + def log(self,*msg): + for i in msg: print >>self.logfp, i, + print >>self.logfp + + def setsymval(self,name,val,step=None): + self._macros[name] = val + + def getsymval(self,name): + return self._macros.get(name,'') + + def replacebody(self,chunk): + if self._body: + self._body.write(chunk) + self.bodyreplaced = True + else: + raise IOError,"replacebody not called from eom()" + + # FIXME: rfc822 indexing does not really reflect the way chg/add header + # work for a milter + def chgheader(self,field,idx,value): + if not self._body: + raise IOError,"chgheader not called from eom()" + self.log('chgheader: %s[%d]=%s' % (field,idx,value)) + if value == '': + del self._msg[field] + else: + self._msg[field] = value + self.headerschanged = True + + def addheader(self,field,value,idx=-1): + if not self._body: + raise IOError,"addheader not called from eom()" + self.log('addheader: %s=%s' % (field,value)) + self._msg[field] = value + self.headerschanged = True + + def delrcpt(self,rcpt): + if not self._body: + raise IOError,"delrcpt not called from eom()" + self._delrcpt.append(rcpt) + + def addrcpt(self,rcpt): + if not self._body: + raise IOError,"addrcpt not called from eom()" + self._addrcpt.append(rcpt) + + def setreply(self,rcode,xcode,msg): + self.reply = (rcode,xcode,msg) + + def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com"): + self._body = None + self.bodyreplaced = False + self.headerschanged = False + self.reply = None + msg = rfc822.Message(fp) + rc = self.envfrom('<%s>'%sender) + if rc != Milter.CONTINUE: return rc + rc = self.envrcpt('<%s>'%rcpt) + if rc != Milter.CONTINUE: return rc + line = None + for h in msg.headers: + if h[:1].isspace(): + line = line + h + continue + if not line: + line = h + continue + s = line.split(': ',1) + if len(s) > 1: val = s[1].strip() + else: val = '' + rc = self.header(s[0],val) + if rc != Milter.CONTINUE: return rc + line = h + if line: + s = line.split(': ',1) + rc = self.header(s[0],s[1]) + if rc != Milter.CONTINUE: return rc + rc = self.eoh() + if rc != Milter.CONTINUE: return rc + while 1: + buf = fp.read(8192) + if len(buf) == 0: break + rc = self.body(buf) + if rc != Milter.CONTINUE: return rc + self._msg = msg + self._body = StringIO.StringIO() + rc = self.eom() + if self.bodyreplaced: + body = self._body.getvalue() + else: + msg.rewindbody() + body = msg.fp.read() + self._body = StringIO.StringIO() + self._body.writelines(msg.headers) + self._body.write('\n') + self._body.write(body) + return rc + + def feedMsg(self,fname,sender="spam@adv.com",rcpt="victim@lamb.com"): + fp = open('test/'+fname,'r') + rc = self.feedFile(fp,sender,rcpt) + fp.close() + return rc + + def connect(self,host='localhost',helo='spamrelay',ip='1.2.3.4'): + self._body = None + self.bodyreplaced = False + rc = super(TestBase,self).connect(host,1,(ip,1234)) + if rc != Milter.CONTINUE and rc != Milter.ACCEPT: + self.close() + return rc + rc = self.hello(helo) + if rc != Milter.CONTINUE: + self.close() + return rc + diff --git a/testsample.py b/testsample.py index 98ae969..2e08454 100644 --- a/testsample.py +++ b/testsample.py @@ -4,97 +4,12 @@ import sample import mime import rfc822 import StringIO +from Milter.test import TestBase -class TestMilter(sample.sampleMilter): - - _protocol = 0 +class TestMilter(TestBase,sample.sampleMilter): def __init__(self): - self.logfp = open("test/milter.log","a") - - def log(self,*msg): - for i in msg: print >>self.logfp, i, - print >>self.logfp - - def replacebody(self,chunk): - if self._body: - self._body.write(chunk) - self.bodyreplaced = True - else: - raise IOError,"replacebody not called from eom()" - - # FIXME: rfc822 indexing does not really reflect the way chg/add header - # work for a milter - def chgheader(self,field,idx,value): - self.log('chgheader: %s[%d]=%s' % (field,idx,value)) - if value == '': - del self._msg[field] - else: - self._msg[field] = value - self.headerschanged = True - - def addheader(self,field,value): - self.log('addheader: %s=%s' % (field,value)) - self._msg[field] = value - self.headerschanged = True - - def feedMsg(self,fname): - self._body = None - self.bodyreplaced = False - self.headerschanged = 0 - fp = open('test/'+fname,'r') - msg = rfc822.Message(fp) - rc = self.envfrom('') - if rc != Milter.CONTINUE: return rc - rc = self.envrcpt('') - if rc != Milter.CONTINUE: return rc - line = None - for h in msg.headers: - if h[:1].isspace(): - line = line + h - continue - if not line: - line = h - continue - s = line.split(': ',1) - rc = self.header(s[0],s[1].strip()) - if rc != Milter.CONTINUE: return rc - line = h - if line: - s = line.split(': ',1) - rc = self.header(s[0],s[1]) - if rc != Milter.CONTINUE: return rc - rc = self.eoh() - if rc != Milter.CONTINUE: return rc - while 1: - buf = fp.read(8192) - if len(buf) == 0: break - rc = self.body(buf) - if rc != Milter.CONTINUE: return rc - self._msg = msg - self._body = StringIO.StringIO() - rc = self.eom() - if self.bodyreplaced: - body = self._body.getvalue() - else: - msg.rewindbody() - body = msg.fp.read() - self._body = StringIO.StringIO() - self._body.writelines(msg.headers) - self._body.write('\n') - self._body.write(body) - return rc - - def connect(self,host='localhost'): - self._body = None - self.bodyreplaced = False - rc = sample.sampleMilter.connect(self,host,1,0) - if rc != Milter.CONTINUE and rc != Milter.ACCEPT: - self.close() - return rc - rc = self.hello('spamrelay') - if rc != Milter.CONTINUE: - self.close() - return rc + TestBase.__init__(self) + sample.sampleMilter.__init__(self) class BMSMilterTestCase(unittest.TestCase):