Move parse_header to Milter.utils.

Test case for delayed DSN parsing.
Fix plock when source missing or cannot set owner/group.
This commit is contained in:
Stuart Gathman
2007-01-19 23:31:38 +00:00
parent 393aa6140a
commit 4c72135b0e
8 changed files with 112 additions and 66 deletions
+8 -1
View File
@@ -10,6 +10,9 @@
# CBV results.
#
# $Log$
# Revision 1.5 2007/01/11 19:59:40 customdesigned
# Purge old entries in auto_whitelist and send_dsn logs.
#
# Revision 1.4 2007/01/11 04:31:26 customdesigned
# Negative feedback for bad headers. Purge cache logs on startup.
#
@@ -51,7 +54,11 @@ class AddrCache(object):
changed = False
try:
too_old = now - age*24*60*60 # max age in days
for ln in open(self.fname):
try:
fp = open(self.fname)
except OSError:
fp = ()
for ln in fp:
try:
rcpt,ts = ln.strip().split(None,1)
l = time.strptime(ts,AddrCache.time_format)
+9 -5
View File
@@ -11,24 +11,28 @@ class PLock(object):
self.basename = basename
self.fp = None
def lock(self,lockname=None):
def lock(self,lockname=None,mode=0660,strict_perms=False):
"Start an update transaction. Return FILE to write new version."
self.unlock()
if not lockname:
lockname = self.basename + '.lock'
self.lockname = lockname
st = os.stat(self.basename)
try:
st = os.stat(self.basename)
mode |= st.st_mode
except OSError: pass
u = os.umask(0002)
try:
fd = os.open(lockname,os.O_WRONLY+os.O_CREAT+os.O_EXCL,st.st_mode|0660)
fd = os.open(lockname,os.O_WRONLY+os.O_CREAT+os.O_EXCL,mode)
finally:
os.umask(u)
self.fp = os.fdopen(fd,'w')
try:
os.chown(self.lockname,-1,st.st_gid)
except:
self.unlock()
raise
if strict_perms:
self.unlock()
raise
return self.fp
def wlock(self,lockname=None):
+27
View File
@@ -1,7 +1,9 @@
import re
import struct
import socket
import email.Errors
from fnmatch import fnmatchcase
from email.Header import decode_header
ip4re = re.compile(r'^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$')
@@ -56,3 +58,28 @@ def parse_addr(t):
pos = t.find('"@')
if pos > 0: return [t[1:pos],t[pos+2:]]
return t.split('@')
def parse_header(val):
"""Decode headers gratuitously encoded to hide the content.
"""
try:
h = decode_header(val)
if not len(h) or (not h[0][1] and len(h) == 1): return val
u = []
for s,enc in h:
if enc:
try:
u.append(unicode(s,enc))
except LookupError:
u.append(unicode(s))
else:
u.append(unicode(s))
u = ''.join(u)
for enc in ('us-ascii','iso-8859-1','utf8'):
try:
return u.encode(enc)
except UnicodeError: continue
except UnicodeDecodeError: pass
except LookupError: pass
except email.Errors.HeaderParseError: pass
return val