Merge changes from pyspf to pass test suite.

This commit is contained in:
Stuart Gathman
2006-09-08 22:02:57 +00:00
parent 157f33edb8
commit e0f58cce1f
+350 -121
View File
@@ -2,8 +2,8 @@
"""SPF (Sender Policy Framework) implementation.
Copyright (c) 2003, Terence Way
Portions Copyright (c) 2004,2005,2006 Stuart Gathman <stuart@bmsi.com>
Portions Copyright (c) 2005,2006 Scott Kitterman <scott@kitterman.com>
Portions Copyright (c) 2004,2005 Stuart Gathman <stuart@bmsi.com>
Portions Copyright (c) 2005 Scott Kitterman <scott@kitterman.com>
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
@@ -48,6 +48,9 @@ For news, bugfixes, etc. visit the home page for this implementation at
# Terrence is not responding to email.
#
# $Log$
# Revision 1.25 2006/07/31 15:25:39 customdesigned
# Permerror for multiple TXT SPF records.
#
# Revision 1.24 2006/07/28 01:21:33 customdesigned
# Remove debug print
#
@@ -334,7 +337,7 @@ def isSPF(txt):
MASK = 0xFFFFFFFFL
# Regular expression to look for modifiers
RE_MODIFIER = re.compile(r'^([a-zA-Z0-9_\-\.]+)=')
RE_MODIFIER = re.compile(r'^([a-z][a-z0-9_\-\.]*)=',re.IGNORECASE)
# Regular expression to find macro expansions
RE_CHAR = re.compile(r'%(%|_|-|(\{[a-zA-Z][0-9]*r?[^\}]*\}))')
@@ -342,10 +345,28 @@ RE_CHAR = re.compile(r'%(%|_|-|(\{[a-zA-Z][0-9]*r?[^\}]*\}))')
# Regular expression to break up a macro expansion
RE_ARGS = re.compile(r'([0-9]*)(r?)([^0-9a-zA-Z]*)')
RE_CIDR = re.compile(r'/([1-9]|1[0-9]|2[0-9]|3[0-2])$')
RE_DUAL_CIDR = re.compile(r'//(0|[1-9]\d*)$')
RE_CIDR = re.compile(r'/(0|[1-9]\d*)$')
RE_IP4 = re.compile(r'\.'.join(
[r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])']*4)+'$')
PAT_IP4 = r'\.'.join([r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])']*4)
RE_IP4 = re.compile(PAT_IP4+'$')
RE_TOPLAB = re.compile(
r'\.[0-9a-z]*[a-z][0-9a-z]*|[0-9a-z]+-[0-9a-z-]*[0-9a-z]$',re.IGNORECASE)
RE_IP6 = re.compile( '(?:%(hex4)s:){6}%(ls32)s$'
'|::(?:%(hex4)s:){5}%(ls32)s$'
'|(?:%(hex4)s)?::(?:%(hex4)s:){4}%(ls32)s$'
'|(?:(?:%(hex4)s:){0,1}%(hex4)s)?::(?:%(hex4)s:){3}%(ls32)s$'
'|(?:(?:%(hex4)s:){0,2}%(hex4)s)?::(?:%(hex4)s:){2}%(ls32)s$'
'|(?:(?:%(hex4)s:){0,3}%(hex4)s)?::%(hex4)s:%(ls32)s$'
'|(?:(?:%(hex4)s:){0,4}%(hex4)s)?::%(ls32)s$'
'|(?:(?:%(hex4)s:){0,5}%(hex4)s)?::%(hex4)s$'
'|(?:(?:%(hex4)s:){0,6}%(hex4)s)?::$'
% {
'ls32': r'(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|%s)'%PAT_IP4,
'hex4': r'[0-9a-f]{1,4}'
}, re.IGNORECASE)
# Local parts and senders have their delimiters replaced with '.' during
# macro expansion
@@ -353,18 +374,55 @@ RE_IP4 = re.compile(r'\.'.join(
JOINERS = {'l': '.', 's': '.'}
RESULTS = {'+': 'pass', '-': 'fail', '?': 'neutral', '~': 'softfail',
'pass': 'pass', 'fail': 'fail', 'unknown': 'unknown',
'pass': 'pass', 'fail': 'fail', 'permerror': 'permerror',
'error': 'error', 'neutral': 'neutral', 'softfail': 'softfail',
'none': 'none', 'deny': 'fail' }
'none': 'none', 'local': 'local', 'trusted': 'trusted',
'ambiguous': 'ambiguous'}
EXPLANATIONS = {'pass': 'sender SPF verified', 'fail': 'access denied',
'unknown': 'permanent error in processing',
'error': 'temporary error in processing',
'softfail': 'domain in transition',
EXPLANATIONS = {'pass': 'sender SPF authorized',
'fail': 'SPF fail - not authorized',
'permerror': 'permanent error in processing',
'temperror': 'temporary DNS error in processing',
'softfail': 'domain owner discourages use of this host',
'neutral': 'access neither permitted nor denied',
'none': ''
'none': '',
#Note: The following are not formally SPF results
'local': 'No SPF result due to local policy',
'trusted': 'No SPF check - trusted-forwarder.org',
#Ambiguous only used in harsh mode for SPF validation
'ambiguous': 'No error, but results may vary'
}
#Default receiver policies - can be overridden.
POLICY = {'tfwl': False, #Check trusted-forwarder.org
'skip_localhost': True, #Don't check SPF on local connections
'always_helo': False, #Only works if helo_first is also True.
'spf_helo_mustpass': True, #Treat HELO test returning softfail or
#neutral as Fail - HELO should be a single IP per name. No reason to
#accept SPF relaxed provisions for HELO. No affect if None.
'reject_helo_fail': False,
'spf_reject_fail': True,
'spf_reject_neutral': False,
'spf_accept_softfail': True,
'spf_best_guess': True,
'spf_strict': True,
}
# Recommended SMTP codes for certain SPF results. For results not in
# this table the recommendation is to accept the message as authorized.
# An SPF result is never enough to recommend that a message be accepted for
# delivery. Additional checks are generally required.
# The softfail result requires special processing.
SMTP_CODES = {
'fail': [550,'5.7.1'],
'temperror': [451,'4.4.3'],
'permerror': [550,'5.5.2'],
'softfail': [451,'4.3.0']
}
if not POLICY['spf_accept_softfail']:
SMTP_CODES['softfail'] = (550,'5.7.1')
if POLICY['spf_reject_neutral']:
SMTP_CODES['neutral'] = (550,'5.7.1')
# if set to a domain name, search _spf.domain namespace if no SPF record
# found in source domain.
@@ -381,17 +439,46 @@ except NameError:
# standard default SPF record for best_guess
DEFAULT_SPF = 'v=spf1 a/24 mx/24 ptr'
#Whitelisted forwarders here. Additional locally trusted forwarders can be
#added to this record.
TRUSTED_FORWARDERS = 'v=spf1 ?include:spf.trusted-forwarder.org -all'
# maximum DNS lookups allowed
MAX_LOOKUP = 10 #RFC 4408 Para 10.1
MAX_MX = 10 #RFC 4408 Para 10.1
MAX_PTR = 10 #RFC 4408 Para 10.1
MAX_CNAME = 10 # analogous interpretation to MAX_PTR
MAX_RECURSION = 20
ALL_MECHANISMS = ('a', 'mx', 'ptr', 'exists', 'include', 'ip4', 'ip6', 'all')
COMMON_MISTAKES = { 'prt': 'ptr', 'ip': 'ip4', 'ipv4': 'ip4', 'ipv6': 'ip6' }
#If harsh processing, for the validator, is invoked, warn if results
#likely deviate from the publishers intention.
class AmbiguityWarning(Exception):
"SPF Warning - ambiguous results"
def __init__(self,msg,mech=None,ext=None):
Exception.__init__(self,msg,mech)
self.msg = msg
self.mech = mech
self.ext = ext
def __str__(self):
if self.mech:
return '%s: %s'%(self.msg,self.mech)
return self.msg
class TempError(Exception):
"Temporary SPF error"
def __init__(self,msg,mech=None,ext=None):
Exception.__init__(self,msg,mech)
self.msg = msg
self.mech = mech
self.ext = ext
def __str__(self):
if self.mech:
return '%s: %s'%(self.msg,self.mech)
return self.msg
class PermError(Exception):
"Permanent SPF error"
@@ -409,13 +496,10 @@ def check(i, s, h,local=None,receiver=None):
"""Test an incoming MAIL FROM:<s>, from a client with ip address i.
h is the HELO/EHLO domain name.
Returns (result, mta-status-code, explanation) where result in
['pass', 'unknown', 'fail', 'error', 'softfail', 'none', 'neutral' ].
Returns (result, code, explanation) where result in
['pass', 'permerror', 'fail', 'temperror', 'softfail', 'none', 'neutral' ].
Example:
>>> check(i='127.0.0.1', s='terry@wayforward.net', h='localhost')
('pass', 250, 'local connections always pass')
#>>> check(i='61.51.192.42', s='liukebing@bcc.com', h='bmsi.com')
"""
@@ -451,17 +535,22 @@ class query(object):
self.p = None
if receiver:
self.r = receiver
else:
self.r = 'unknown'
# Since the cache does not track Time To Live, it is created
# fresh for each query. It is important for efficiently using
# multiple results provided in DNS answers.
self.cache = {}
self.defexps = dict(EXPLANATIONS)
self.exps = dict(EXPLANATIONS)
self.local = local # local policy
self.libspf_local = local # local policy
self.lookups = 0
# strict can be False, True, or 2 for harsh
# strict can be False, True, or 2 (numeric) for harsh
self.strict = strict
self.perm_error = None
def set_default_explanation(self,exp):
exps = self.exps
for i in 'softfail','fail','unknown':
for i in 'softfail','fail','permerror':
exps[i] = exp
def getp(self):
@@ -477,10 +566,11 @@ class query(object):
"""Return a best guess based on a default SPF record"""
return self.check(spf)
def check(self, spf=None):
"""
Returns (result, mta-status-code, explanation) where result
in ['fail', 'softfail', 'neutral' 'unknown', 'pass', 'error', 'none']
in ['fail', 'softfail', 'neutral' 'permerror', 'pass', 'temperror', 'none']
Examples:
>>> q = query(s='strong-bad@email.example.com',
@@ -489,34 +579,42 @@ class query(object):
('neutral', 250, 'access neither permitted nor denied')
>>> q.check(spf='v=spf1 ip4:192.0.0.0/8 ?all moo')
('unknown', 550, 'SPF Permanent Error: Unknown mechanism found: moo')
('permerror', 550, 'SPF Permanent Error: Unknown mechanism found: moo')
>>> q.check(spf='v=spf1 =a ?all moo')
('unknown', 550, 'SPF Permanent Error: Unknown qualifier, RFC 4408 para 4.6.1, found in: =a')
('permerror', 550, 'SPF Permanent Error: Unknown qualifier, RFC 4408 para 4.6.1, found in: =a')
>>> q.check(spf='v=spf1 ip4:192.0.0.0/8 ~all')
('pass', 250, 'sender SPF verified')
('pass', 250, 'sender SPF authorized')
>>> q.check(spf='v=spf1 ip4:192.0.0.0/8 -all moo=')
('pass', 250, 'sender SPF authorized')
>>> q.check(spf='v=spf1 ip4:192.0.0.0/8 -all match.sub-domains_9=yes')
('pass', 250, 'sender SPF authorized')
>>> q.strict = False
>>> q.check(spf='v=spf1 ip4:192.0.0.0/8 -all moo')
('unknown', 550, 'SPF Permanent Error: Unknown mechanism found: moo')
>>> q.perm_error.ext
('pass', 250, 'sender SPF verified')
('pass', 250, 'sender SPF authorized')
>>> q.check(spf='v=spf1 ip4:192.1.0.0/16 moo -all')
('unknown', 550, 'SPF Permanent Error: Unknown mechanism found: moo')
>>> str(q.perm_error.ext)
'None'
('permerror', 550, 'SPF Permanent Error: Unknown mechanism found: moo')
>>> q.check(spf='v=spf1 ip4:192.1.0.0/16 ~all')
('softfail', 250, 'domain in transition')
('softfail', 250, 'domain owner discourages use of this host')
>>> q.check(spf='v=spf1 -ip4:192.1.0.0/6 ~all')
('fail', 550, 'access denied')
('fail', 550, 'SPF fail - not authorized')
# Assumes DNS available
>>> q.check()
('none', 250, '')
>>> q.check(spf='v=spf1 ip4:1.2.3.4 -a:example.net -all')
('fail', 550, 'SPF fail - not authorized')
>>> q.libspf_local='ip4:192.0.2.3 a:example.org'
>>> q.check(spf='v=spf1 ip4:1.2.3.4 -a:example.net -all')
('pass', 250, 'sender SPF authorized')
"""
self.mech = [] # unknown mechanisms
# If not strict, certain PermErrors (mispelled
@@ -524,23 +622,20 @@ class query(object):
# will continue processing. However, the exception
# that strict processing would raise is saved here
self.perm_error = None
if self.i.startswith('127.'):
return ('pass', 250, 'local connections always pass')
try:
self.lookups = 0
if not spf:
spf = self.dns_spf(self.d)
if self.local and spf:
spf += ' ' + self.local
rc = self.check1(spf, self.d, 0)
if self.perm_error:
# extended processing succeeded, but strict failed
self.perm_error.ext = rc
raise self.perm_error
return rc
if self.libspf_local and spf:
spf = insert_libspf_local_policy(
spf,self.libspf_local)
return self.check1(spf, self.d, 0)
except TempError,x:
return ('error', 450, 'SPF Temporary Error: ' + str(x))
self.prob = x.msg
if x.mech:
self.mech.append(x.mech)
return ('temperror',451, 'SPF Temporary Error: ' + str(x))
except PermError,x:
if not self.perm_error:
self.perm_error = x
@@ -549,7 +644,7 @@ class query(object):
self.mech.append(x.mech)
# Pre-Lentczner draft treats this as an unknown result
# and equivalent to no SPF record.
return ('unknown', 550, 'SPF Permanent Error: ' + str(x))
return ('permerror', 550, 'SPF Permanent Error: ' + str(x))
def check1(self, spf, domain, recursion):
# spf rfc: 3.7 Processing Limits
@@ -565,11 +660,17 @@ class query(object):
# As an extended result, however, it should be
# a PermError.
raise PermError('Too many levels of recursion')
try:
try:
tmp, self.d = self.d, domain
return self.check0(spf,recursion)
finally:
self.d = tmp
except AmbiguityWarning,x:
self.prob = x.msg
if x.mech:
self.mech.append(x.mech)
return ('ambiguous', 000, 'SPF Ambiguity Warning: %s' % x)
def note_error(self,*msg):
if self.strict:
@@ -579,6 +680,8 @@ class query(object):
try:
raise PermError(*msg)
except PermError, x:
# FIXME: keep a list of errors for even friendlier
# diagnostics.
self.perm_error = x
return self.perm_error
@@ -602,11 +705,11 @@ class query(object):
>>> try: q.validate_mechanism('ip4:1.2.3.4/247')
... except PermError,x: print x
Invalid IP4 address: ip4:1.2.3.4/247
Invalid IP4 CIDR length: ip4:1.2.3.4/247
>>> try: q.validate_mechanism('a:example.com:8080')
... except PermError,x: print x
Too many :. Not allowed in domain name.: a:example.com:8080
Invalid domain found (use FQDN): example.com:8080
>>> try: q.validate_mechanism('ip4:1.2.3.444/24')
... except PermError,x: print x
@@ -621,10 +724,13 @@ class query(object):
>>> q.validate_mechanism('~exists:%{i}.%{s1}.100/86400.rate.%{d}')
('~exists:%{i}.%{s1}.100/86400.rate.%{d}', 'exists', '192.0.2.3.com.100/86400.rate.email.example.com', 32, 'softfail')
>>> q.validate_mechanism('a:mail.example.com.')
('a:mail.example.com.', 'a', 'mail.example.com', 32, 'pass')
"""
# a mechanism
m, arg, cidrlength = parse_mechanism(mech, self.d)
# map '?' '+' or '-' to 'unknown' 'pass' or 'fail'
m, arg, cidrlength, cidr6length = parse_mechanism(mech, self.d)
# map '?' '+' or '-' to 'neutral' 'pass' or 'fail'
if m:
result = RESULTS.get(m[0])
if result:
@@ -641,34 +747,56 @@ class query(object):
x = self.note_error(
'Use the ip4 mechanism for ip4 addresses',mech)
m = 'ip4'
# Check for : within the arguement
if arg.count(':') > 0:
raise PermError('Too many :. Not allowed in domain name.',mech)
# validate cidr and dual-cidr
if m in ('a', 'mx', 'ptr'):
if cidrlength is None:
cidrlength = 32;
elif cidrlength > 32:
raise PermError('Invalid IP4 CIDR length',mech)
if cidr6length is None:
cidr6length = 128
elif cidr6length > 128:
raise PermError('Invalid IP6 CIDR length',mech)
elif m == 'ip4':
if cidr6length is not None:
raise PermError('Dual CIDR not allowed',mech)
if cidrlength is None:
cidrlength = 32;
elif cidrlength > 32:
raise PermError('Invalid IP4 CIDR length',mech)
if not RE_IP4.match(arg):
raise PermError('Invalid IP4 address',mech)
elif m == 'ip6':
if cidr6length is not None:
raise PermError('Dual CIDR not allowed',mech)
if cidrlength is None:
cidrlength = 128
elif cidrlength > 128:
raise PermError('Invalid IP6 CIDR length',mech)
if not RE_IP6.match(arg):
raise PermError('Invalid IP6 address',mech)
else:
if cidrlength is not None or cidr6length is not None:
raise PermError('Dual CIDR not allowed',mech)
cidrlength = 32
# validate domain-spec
if m in ('a', 'mx', 'ptr', 'exists', 'include'):
arg = self.expand(arg)
# FQDN must contain at least one '.'
pos = arg.rfind('.')
if not (0 < pos < len(arg) - 1):
raise PermError('Invalid domain found (use FQDN)',
arg)
#Test for all numeric TLD as recommended by RFC 3696
#Note this TLD test may pass non-existant TLDs. 3696
#recommends using DNS lookups to test beyond this
#initial test.
if arg[pos+1:].isdigit():
raise PermError('Top Level Domain may not be all numbers',
arg)
# any trailing dot was removed by expand()
if RE_TOPLAB.split(arg)[-1]:
raise PermError('Invalid domain found (use FQDN)', arg)
if m == 'include':
if arg == self.d:
if mech != 'include':
raise PermError('include has trivial recursion',mech)
raise PermError('include mechanism missing domain',mech)
return mech,m,arg,cidrlength,result
if m == 'ip4' and not RE_IP4.match(arg):
raise PermError('Invalid IP4 address',mech)
#validate 'all' mechanism per RFC 4408 ABNF
if m == 'all' and \
(arg != self.d or mech.count(':') or mech.count('/')):
# validate 'all' mechanism per RFC 4408 ABNF
if m == 'all' and mech.count(':'):
# print '|'+ arg + '|', mech, self.d,
self.note_error(
'Invalid all mechanism format - only qualifier allowed with all'
@@ -677,7 +805,7 @@ class query(object):
return mech,m,arg,cidrlength,result
if m[1:] in ALL_MECHANISMS:
x = self.note_error(
'Unknown qualifier, RFC 4408 para 4.6.1, found in', mech)
'Unknown qualifier, RFC 4408 para 4.6.1, found in',mech)
else:
x = self.note_error('Unknown mechanism found',mech)
return mech,m,arg,cidrlength,x
@@ -694,7 +822,7 @@ class query(object):
# split string by whitespace, drop the 'v=spf1'
spf = spf.split()
# Catch case where SPF record has no spaces
# Catch case where SPF record has no spaces.
# Can never happen with conforming dns_spf(), however
# in the future we might want to give permerror
# for common mistakes like IN TXT "v=spf1" "mx" "-all"
@@ -756,8 +884,13 @@ class query(object):
elif m == 'exists':
self.check_lookups()
try:
if len(self.dns_a(arg)) > 0:
break
except AmbiguityWarning:
# Exists wants no response sometimes so don't raise
# the warning.
pass
elif m == 'a':
self.check_lookups()
@@ -769,7 +902,10 @@ class query(object):
if cidrmatch(self.i, self.dns_mx(arg), cidrlength):
break
elif m == 'ip4' and arg != self.d:
elif m == 'ip4':
if arg == self.d:
raise PermError('Missing IP4 arg',mech)
try:
if cidrmatch(self.i, [arg], cidrlength):
break
@@ -777,6 +913,8 @@ class query(object):
raise PermError('syntax error',mech)
elif m == 'ip6':
if arg == self.d:
raise PermError('Missing IP6 arg',mech)
# Until we support IPV6, we should never
# get an IPv6 connection. So this mech
# will never match.
@@ -792,8 +930,13 @@ class query(object):
else:
# no matches
if redirect:
return self.check1(self.dns_spf(redirect),
redirect, recursion + 1)
#Catch redirect to a non-existant SPF record.
redirect_record = self.dns_spf(redirect)
if not redirect_record:
raise PermError('redirect domain has no SPF record',
redirect)
return self.check1(redirect_record, redirect,
recursion + 1)
else:
result = default
@@ -882,9 +1025,6 @@ class query(object):
>>> q.expand('%{p2}.trusted-domains.example.net')
'example.org.trusted-domains.example.net'
>>> q.expand('%{p2}.trusted-domains.example.net')
'example.org.trusted-domains.example.net'
>>> q.expand('%{p2}.trusted-domains.example.net.')
'example.org.trusted-domains.example.net'
@@ -930,12 +1070,19 @@ class query(object):
if len(a) == 1 and self.strict < 2:
return a[0]
# check official SPF type first when it becomes more popular
try:
b = [t for t in self.dns_99(domain) if isSPF(t)]
except TempError,x:
# some braindead DNS servers hang on type 99 query
if self.strict > 1: raise x
b = []
if len(b) > 1:
raise PermError('Two or more type SPF spf records found.')
if len(b) == 1:
# FIXME: really must fully parse each record
# and compare with appropriate parts case insensitive.
if self.strict >= 2 and len(a) == 1 and a[0] != b[0]:
raise PermError(
if self.strict > 1 and len(a) == 1 and a[0] != b[0]:
#Changed from permerror to warning based on RFC 4408 Auth 48 change
raise AmbiguityWarning(
'v=spf1 records of both type TXT and SPF (type 99) present, but not identical')
return b[0]
if len(a) == 1:
@@ -956,14 +1103,7 @@ class query(object):
def dns_99(self, domainname):
"Get a list of type SPF=99 records for a domain name."
if domainname:
try:
return [''.join(a) for a in self.dns(domainname, 'SPF')]
except TempError,x:
if self.strict: raise x
self.note_error(
'DNS responds, but times out on type99 (SPF) query: %s'%
domainname
)
return []
def dns_mx(self, domainname):
@@ -972,18 +1112,29 @@ class query(object):
"""
# RFC 4408 section 5.4 "mx"
# To prevent DoS attacks, more than 10 MX names MUST NOT be looked up
mxnames = self.dns(domainname, 'MX')
if len(mxnames) > MAX_MX:
self.note_error('More than %d MX records returned'%MAX_MX)
if self.strict:
max = MAX_MX
if self.strict > 1 and len(mxnames) == 0:
raise AmbiguityWarning(
'No MX records found for mx mechanism', domainname)
else:
max = MAX_MX * 4
return [a for mx in self.dns(domainname, 'MX')[:max] \
for a in self.dns_a(mx[1])]
return [a for mx in mxnames[:max] for a in self.dns_a(mx[1])]
def dns_a(self, domainname):
"""Get a list of IP addresses for a domainname."""
if domainname:
if not domainname: return []
if self.strict > 1:
alist = self.dns(domainname, 'A')
if len(alist) == 0:
raise AmbiguityWarning('No A records found for',
domainname)
else:
return alist
return self.dns(domainname, 'A')
return []
def dns_aaaa(self, domainname):
"""Get a list of IPv6 addresses for a domainname."""
@@ -996,6 +1147,20 @@ class query(object):
# To prevent DoS attacks, more than 10 PTR names MUST NOT be looked up
if self.strict:
max = MAX_PTR
if self.strict > 1:
#Break out the number of PTR records returned for testing
try:
ptrnames = self.dns_ptr(i)
ptrip = [p for p in ptrnames if i in self.dns_a(p)]
if len(ptrnames) > max:
warning = 'More than ' + str(max) + ' PTR records returned'
raise AmbiguityWarning(warning, i)
else:
if len(ptrnames) == 0:
raise AmbiguityWarning('No PTR records found for ptr mechanism', ptrnames)
return ptrip
except:
raise AmbiguityWarning('No PTR records found for ptr mechanism', ptrnames)
else:
max = MAX_PTR * 4
return [p for p in self.dns_ptr(i)[:max] if i in self.dns_a(p)]
@@ -1030,6 +1195,7 @@ class query(object):
if not cnames:
cnames = {}
elif len(cnames) >= MAX_CNAME:
#return result # if too many == NX_DOMAIN
raise PermError(
'Length of CNAME chain exceeds %d' % MAX_CNAME)
cnames[name] = cname
@@ -1041,22 +1207,21 @@ class query(object):
def get_header(self,res,receiver=None):
if not receiver:
receiver = self.r
if res in ('unknown','permerror'):
txt = ' '.join([res] + self.mech)
else:
txt = res
if res in ('pass','fail','softfail'):
return '%s (%s: %s) client-ip=%s; envelope-from=%s; helo=%s;' % (
txt,receiver,self.get_header_comment(res),self.i,
res,receiver,self.get_header_comment(res),self.i,
self.l + '@' + self.o, self.h)
if res == 'permerror':
return '%s (%s: %s)' % (' '.join([res] + self.mech),
receiver,self.get_header_comment(res))
return '%s (%s: %s)' % (res,receiver,self.get_header_comment(res))
def get_header_comment(self,res):
"""Return comment for Received-SPF header.
"""
sender = self.o
if res == 'pass':
if self.i.startswith('127.'):
return "localhost is always allowed."
else: return \
return \
"domain of %s designates %s as permitted sender" \
% (sender,self.i)
elif res == 'softfail': return \
@@ -1069,10 +1234,10 @@ class query(object):
"%s is neither permitted nor denied by domain of %s" \
% (self.i,sender)
#"%s does not designate permitted sender hosts" % sender
elif res in ('unknown','permerror'): return \
elif res == 'permerror': return \
"permanent error in processing domain of %s: %s" \
% (sender, self.prob)
elif res in ('error','temperror'): return \
elif res == 'error': return \
"temporary error in processing during lookup of %s" % sender
elif res == 'fail': return \
"domain of %s does not designate %s as permitted sender" \
@@ -1104,45 +1269,50 @@ def split_email(s, h):
def parse_mechanism(str, d):
"""Breaks A, MX, IP4, and PTR mechanisms into a (name, domain,
cidr) tuple. The domain portion defaults to d if not present,
cidr,cidr6) tuple. The domain portion defaults to d if not present,
the cidr defaults to 32 if not present.
Examples:
>>> parse_mechanism('a', 'foo.com')
('a', 'foo.com', 32)
('a', 'foo.com', None, None)
>>> parse_mechanism('a:bar.com', 'foo.com')
('a', 'bar.com', 32)
('a', 'bar.com', None, None)
>>> parse_mechanism('a/24', 'foo.com')
('a', 'foo.com', 24)
('a', 'foo.com', 24, None)
>>> parse_mechanism('A:foo:bar.com/16', 'foo.com')
('a', 'foo:bar.com', 16)
('a', 'foo:bar.com', 16, None)
>>> parse_mechanism('-exists:%{i}.%{s1}.100/86400.rate.%{d}','foo.com')
('-exists', '%{i}.%{s1}.100/86400.rate.%{d}', 32)
('-exists', '%{i}.%{s1}.100/86400.rate.%{d}', None, None)
>>> parse_mechanism('mx:%%%_/.Claranet.de/27','foo.com')
('mx', '%%%_/.Claranet.de', 27)
('mx', '%%%_/.Claranet.de', 27, None)
>>> parse_mechanism('mx:%{d}/27','foo.com')
('mx', '%{d}', 27)
('mx', '%{d}', 27, None)
>>> parse_mechanism('iP4:192.0.0.0/8','foo.com')
('ip4', '192.0.0.0', 8)
('ip4', '192.0.0.0', 8, None)
"""
a = RE_DUAL_CIDR.split(str)
if len(a) == 3:
str, cidr6 = a[0], int(a[1])
else:
cidr6 = None
a = RE_CIDR.split(str)
if len(a) == 3:
a, port = a[0], int(a[1])
str, cidr = a[0], int(a[1])
else:
a, port = str, 32
cidr = None
b = a.split(':',1)
if len(b) == 2:
return b[0].lower(), b[1], port
else:
return a.lower(), d, port
a = str.split(':',1)
if len(a) < 2:
return str.lower(), d, cidr, cidr6
return a[0].lower(), a[1], cidr, cidr6
def reverse_dots(name):
"""Reverse dotted IP addresses or domain names.
@@ -1195,15 +1365,17 @@ def cidrmatch(i, ipaddrs, cidr_length = 32):
>>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'], 24)
1
"""
try:
c = cidr(i, cidr_length)
for ip in ipaddrs:
if cidr(ip, cidr_length) == c:
return True
except socket.error: pass
return False
def cidr(i, n):
"""Convert an IP address string with a CIDR mask into a 32-bit
integer.
or 128-bit integer.
i must be a string of numbers 0..255 separated by dots '.'::
pre: forall([0 <= int(p) < 256 for p in i.split('.')])
@@ -1249,7 +1421,12 @@ def addr2bin(str):
>>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
173867520
"""
try:
return struct.unpack("!L", socket.inet_aton(str))[0]
except socket.error:
if not socket.has_ipv6: raise
h,l = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6,str))
return h << 64 | l;
def bin2addr(addr):
"""Convert a numeric IPv4 address into string n.n.n.n form.
@@ -1306,6 +1483,57 @@ def split(str, delimiters, joiner=None):
result.append(element)
return result
def insert_libspf_local_policy(spftxt,local=None):
"""Returns spftxt with local inserted just before last non-fail
mechanism. This is how the libspf{2} libraries handle "local-policy".
Examples:
>>> insert_libspf_local_policy('v=spf1 -all')
'v=spf1 -all'
>>> insert_libspf_local_policy('v=spf1 -all','mx')
'v=spf1 -all'
>>> insert_libspf_local_policy('v=spf1','a mx ptr')
'v=spf1 a mx ptr'
>>> insert_libspf_local_policy('v=spf1 mx -all','a ptr')
'v=spf1 mx a ptr -all'
>>> insert_libspf_local_policy('v=spf1 mx -include:foo.co +all','a ptr')
'v=spf1 mx a ptr -include:foo.co +all'
# FIXME: is this right? If so, "last non-fail" is a bogus description.
>>> insert_libspf_local_policy('v=spf1 mx ?include:foo.co +all','a ptr')
'v=spf1 mx a ptr ?include:foo.co +all'
>>> spf='v=spf1 ip4:1.2.3.4 -a:example.net -all'
>>> local='ip4:192.0.2.3 a:example.org'
>>> insert_libspf_local_policy(spf,local)
'v=spf1 ip4:1.2.3.4 ip4:192.0.2.3 a:example.org -a:example.net -all'
"""
# look to find the all (if any) and then put local
# just after last non-fail mechanism. This is how
# libspf2 handles "local policy", and some people
# apparently find it useful (don't ask me why).
if not local: return spftxt
spf = spftxt.split()[1:]
if spf:
# local policy is SPF mechanisms/modifiers with no
# 'v=spf1' at the start
spf.reverse() #find the last non-fail mechanism
for mech in spf:
# map '?' '+' or '-' to 'neutral' 'pass'
# or 'fail'
if not RESULTS.get(mech[0]):
# actually finds last mech with default result
where = spf.index(mech)
spf[where:where] = [local]
spf.reverse()
local = ' '.join(spf)
break
else:
return spftxt # No local policy adds for v=spf1 -all
# Processing limits not applied to local policy. Suggest
# inserting 'local' mechanism to handle this properly
#MAX_LOOKUP = 100
return 'v=spf1 '+local
def _test():
import doctest, spf
return doctest.testmod(spf)
@@ -1329,6 +1557,7 @@ if __name__ == '__main__':
q = query(i=i, s=s, h=h, receiver=socket.gethostname(),
strict=False)
print q.check(sys.argv[1])
if q.perm_error: print q.perm_error.ext
if q.perm_error and q.perm_error.ext:
print q.perm_error.ext
else:
print USAGE