1030 lines
28 KiB
Python
Executable File
1030 lines
28 KiB
Python
Executable File
#!/usr/bin/env python
|
|
"""SPF (Sender-Permitted From) implementation.
|
|
|
|
Copyright (c) 2003, Terence Way
|
|
This module is free software, and you may redistribute it and/or modify
|
|
it under the same terms as Python itself, so long as this copyright message
|
|
and disclaimer are retained in their original form.
|
|
|
|
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
|
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
|
|
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
|
|
DAMAGE.
|
|
|
|
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
|
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
|
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
|
|
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
|
|
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
|
|
|
For more information about SPF, a tool against email forgery, see
|
|
http://spf.pobox.com
|
|
|
|
For news, bugfixes, etc. visit the home page for this implementation at
|
|
http://www.wayforward.net/spf/
|
|
"""
|
|
|
|
# Changes:
|
|
# 9-dec-2003, v1.1, Meng Weng Wong added PTR code, THANK YOU
|
|
# 11-dec-2003, v1.2, ttw added macro expansion, exp=, and redirect=
|
|
# 13-dec-2003, v1.3, ttw added %{o} original domain macro,
|
|
# print spf result on command line, support default=,
|
|
# support localhost, follow DNS CNAMEs, cache DNS results
|
|
# during query, support Python 2.2 for Mac OS X
|
|
# 16-dec-2003, v1.4, ttw fixed include handling (include is a mechanism,
|
|
# complete with status results, so -include: should work.
|
|
# Expand macros AFTER looking for status characters ?-+
|
|
# so altavista.com SPF records work.
|
|
# 17-dec-2003, v1.5, ttw use socket.inet_aton() instead of DNS.addr2bin, so
|
|
# n, n.n, and n.n.n forms for IPv4 addresses work, and to
|
|
# ditch the annoying Python 2.4 FutureWarning
|
|
# 18-dec-2003, v1.6, Failures on Intel hardware: endianness. Use ! on
|
|
# struct.pack(), struct.unpack().
|
|
#
|
|
# Development taken over by Stuart Gathman <stuart@bmsi.com> since
|
|
# Terrence is not responding to email.
|
|
#
|
|
# $Log$
|
|
# Revision 1.24 2005/03/16 21:58:39 stuart
|
|
# Change Milter module to package.
|
|
#
|
|
# Revision 1.22 2005/02/09 17:52:59 stuart
|
|
# Report DNS errors as PermError rather than unknown.
|
|
#
|
|
# Revision 1.21 2004/11/20 16:37:03 stuart
|
|
# Handle multi-segment TXT records.
|
|
#
|
|
# Revision 1.20 2004/11/19 06:10:30 stuart
|
|
# Use PermError exception instead of reporting unknown.
|
|
#
|
|
# Revision 1.19 2004/11/09 23:00:18 stuart
|
|
# Limit recursion and DNS lookups separately.
|
|
#
|
|
#
|
|
# Revision 1.17 2004/09/10 18:08:26 stuart
|
|
# Return unknown for null mechanism
|
|
#
|
|
# Revision 1.16 2004/09/04 23:27:06 stuart
|
|
# More mechanism aliases.
|
|
#
|
|
# Revision 1.15 2004/08/30 21:19:05 stuart
|
|
# Return unknown for invalid ip syntax in mechanism
|
|
#
|
|
# Revision 1.14 2004/08/23 02:28:24 stuart
|
|
# Remove Perl usage message.
|
|
#
|
|
# Revision 1.13 2004/07/23 19:23:12 stuart
|
|
# Always fail to match on ip6, until we support it properly.
|
|
#
|
|
# Revision 1.12 2004/07/23 18:48:15 stuart
|
|
# Fold CID parsing into spf
|
|
#
|
|
# Revision 1.11 2004/07/21 21:32:01 stuart
|
|
# Handle CID records (Microsoft XML format).
|
|
#
|
|
# Revision 1.10 2004/04/19 22:12:11 stuart
|
|
# Release 0.6.9
|
|
#
|
|
# Revision 1.9 2004/04/18 03:29:35 stuart
|
|
# Pass most tests except -local and -rcpt-to
|
|
#
|
|
# Revision 1.8 2004/04/17 22:17:55 stuart
|
|
# Header comment method.
|
|
#
|
|
# Revision 1.7 2004/04/17 18:22:48 stuart
|
|
# Support default explanation.
|
|
#
|
|
# Revision 1.6 2004/04/06 20:18:02 stuart
|
|
# Fix bug in include
|
|
#
|
|
# Revision 1.5 2004/04/05 22:29:46 stuart
|
|
# SPF best_guess,
|
|
#
|
|
# Revision 1.4 2004/03/25 03:27:34 stuart
|
|
# Support delegation of SPF records.
|
|
#
|
|
# Revision 1.3 2004/03/13 12:23:23 stuart
|
|
# Expanded result codes. Tolerate common method misspellings.
|
|
#
|
|
|
|
__author__ = "Terence Way"
|
|
__email__ = "terry@wayforward.net"
|
|
__version__ = "1.6: December 18, 2003"
|
|
MODULE = 'spf'
|
|
|
|
USAGE = """To check an incoming mail request:
|
|
% python spf.py {ip} {sender} {helo}
|
|
% python spf.py 69.55.226.139 tway@optsw.com mx1.wayforward.net
|
|
|
|
To test an SPF record:
|
|
% python spf.py "v=spf1..." {ip} {sender} {helo}
|
|
% python spf.py "v=spf1 +mx +ip4:10.0.0.1 -all" 10.0.0.1 tway@foo.com a
|
|
|
|
To fetch an SPF record:
|
|
% python spf.py {domain}
|
|
% python spf.py wayforward.net
|
|
|
|
To test this script (and to output this usage message):
|
|
% python spf.py
|
|
"""
|
|
|
|
import re
|
|
import socket # for inet_ntoa() and inet_aton()
|
|
import struct # for pack() and unpack()
|
|
import time # for time()
|
|
|
|
import DNS # http://pydns.sourceforge.net
|
|
import xml.sax
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Convert a MS Caller-ID entry (XML) to a SPF entry
|
|
#
|
|
# (c) 2004 by Ernesto Baschny
|
|
# (c) 2004 Python version by Stuart Gathman
|
|
#
|
|
# Date: 2004-02-25
|
|
#
|
|
# A complete reverse translation (SPF -> CID) might be impossible, since
|
|
# there are no ways to handle:
|
|
# - PTR and EXISTS mechanism
|
|
# - MX mechanism with an different domain as argument
|
|
# - macros
|
|
#
|
|
# References:
|
|
# http://www.microsoft.com/mscorp/twc/privacy/spam_callerid.mspx
|
|
# http://spf.pobox.com/
|
|
#
|
|
# Known bugs:
|
|
# - Currently it won't handle the exclusions provided in the A and R
|
|
# tags (prefix '!'). They will show up "as-is" in the SPF record
|
|
# - I really haven't read the MS-CID specs in-depth, so there are probably
|
|
# other bugs too :)
|
|
#
|
|
# Ernesto Baschny <ernst@baschny.de>
|
|
#
|
|
|
|
class CIDParser(xml.sax.ContentHandler):
|
|
"Convert a MS Caller-ID entry (XML) to a SPF entry."
|
|
|
|
def __init__(self,q=None):
|
|
self.spf = []
|
|
self.action = '-all'
|
|
self.has_servers = None
|
|
self.spf_entry = None
|
|
if q:
|
|
self.spf_query = q
|
|
else:
|
|
self.spf_query = query(i='127.0.0.1', s='localhost', h='unknown')
|
|
|
|
def startElement(self,tag,attr):
|
|
if tag == 'm':
|
|
if self.has_servers != None and not self.has_servers:
|
|
raise ValueError(
|
|
"Declared <noMailServers\> and later <m>, this CID entry is not valid."
|
|
)
|
|
self.has_servers = True
|
|
elif tag == 'noMailServers':
|
|
if self.has_servers:
|
|
raise ValueError(
|
|
"Declared <m> and later <noMailServers\>, this CID entry is not valid."
|
|
)
|
|
self.has_servers = False
|
|
elif tag == 'ep':
|
|
if attr.has_key('testing') and attr.getValue('testing') == 'true':
|
|
# A CID with 'testing' found:
|
|
# From the MS-specs:
|
|
# "Documents in which such attribute is present with a true
|
|
# value SHOULD be entirely ignored (one should act as if the
|
|
# document were absent)"
|
|
# From the SPF-specs:
|
|
# "Neutral (?): The SPF client MUST proceed as if a domain did
|
|
# not publish SPF data."
|
|
# So we set SPF action to "neutral":
|
|
self.action = '?all'
|
|
elif tag == 'mx':
|
|
# The empty MX-tag, same as SPF's MX-mechanism
|
|
self.spf.append('mx')
|
|
self.tag = tag
|
|
|
|
def characters(self,text):
|
|
tag = self.tag
|
|
# Remove starting and trailing spaces from text:
|
|
text = text.strip()
|
|
|
|
if tag == 'a' or tag == 'r':
|
|
# The A and R tags from MS-CID are both handled by the
|
|
# ipv4/6-mechanisms from SPF:
|
|
if text.find(':') < 0:
|
|
mechanism = 'ip4'
|
|
else:
|
|
mechanism = 'ip6'
|
|
self.spf.append(mechanism + ':' + text)
|
|
elif tag == 'indirect':
|
|
# MS-CID's indirect is "sort of" the include from SPF:
|
|
# Not really true, because the <indirect> tag from MS-CID also
|
|
# provides a fallback in case the included domain doesn't provide
|
|
# _ep-records: The inbound MX-servers of the included domains
|
|
# are added to the list of allowed outgoing mailservers for the
|
|
# domain that declared the _ep-record with the <indirect> tag.
|
|
# In SPF you would use the 'mx:domain' to handle this, but this
|
|
# wouldn't depend on referred domain having or not SPF-records.
|
|
cid_xml = self.cid_txt(text)
|
|
if cid_xml:
|
|
p = CIDParser()
|
|
xml.sax.parseString(cid_xml,p)
|
|
if p.has_servers != False:
|
|
self.spf += p.spf
|
|
else:
|
|
self.spf.append('mx:' + text)
|
|
|
|
def cid_txt(self,domain):
|
|
q = self.spf_query
|
|
domain='_ep.' + domain
|
|
a = q.dns_txt(domain)
|
|
if not a: return None
|
|
if a[0].lower().startswith('<ep ') and a[-1].lower().endswith('</ep>'):
|
|
return ''.join(a)
|
|
return None
|
|
|
|
def endElement(self,tag):
|
|
if tag == 'ep':
|
|
# This is the end... assemble what we've got
|
|
spf_entry = ['v=spf1']
|
|
if self.has_servers != False:
|
|
spf_entry += self.spf
|
|
spf_entry.append(self.action)
|
|
self.spf_entry = ' '.join(spf_entry)
|
|
|
|
def spf_txt(self,cid_xml):
|
|
if not cid_xml.startswith('<'):
|
|
cid_xml = self.cid_txt(cid_xml)
|
|
if not cid_xml: return None
|
|
# Parse the beast. Any XML-problem will be reported by xlm.sax
|
|
self.spf_entry = None
|
|
xml.sax.parseString(cid_xml,self)
|
|
return self.spf_entry
|
|
|
|
# 32-bit IPv4 address mask
|
|
MASK = 0xFFFFFFFFL
|
|
|
|
# Regular expression to look for modifiers
|
|
RE_MODIFIER = re.compile(r'^([a-zA-Z]+)=')
|
|
|
|
# Regular expression to find macro expansions
|
|
RE_CHAR = re.compile(r'%(%|_|-|(\{[a-zA-Z][0-9]*r?[^\}]*\}))')
|
|
|
|
# Regular expression to break up a macro expansion
|
|
RE_ARGS = re.compile(r'([0-9]*)(r?)([^0-9a-zA-Z]*)')
|
|
|
|
# Local parts and senders have their delimiters replaced with '.' during
|
|
# macro expansion
|
|
#
|
|
JOINERS = {'l': '.', 's': '.'}
|
|
|
|
RESULTS = {'+': 'pass', '-': 'fail', '?': 'neutral', '~': 'softfail',
|
|
'pass': 'pass', 'fail': 'fail', 'unknown': 'unknown',
|
|
'neutral': 'neutral', 'softfail': 'softfail',
|
|
'none': 'none', 'deny': 'fail' }
|
|
|
|
EXPLANATIONS = {'pass': 'sender SPF verified', 'fail': 'access denied',
|
|
'unknown': 'SPF unknown',
|
|
'softfail': 'domain in transition',
|
|
'neutral': 'access neither permitted nor denied',
|
|
'none': ''
|
|
}
|
|
|
|
# if set to a domain name, search _spf.domain namespace if no SPF record
|
|
# found in source domain.
|
|
|
|
DELEGATE = None
|
|
|
|
# support pre 2.2.1....
|
|
try:
|
|
bool, True, False = bool, True, False
|
|
except NameError:
|
|
False, True = 0, 1
|
|
def bool(x): return not not x
|
|
# ...pre 2.2.1
|
|
|
|
# standard default SPF record
|
|
DEFAULT_SPF = 'v=spf1 a/24 mx/24 ptr'
|
|
|
|
# maximum DNS lookups allowed
|
|
MAX_LOOKUP = 100
|
|
MAX_RECURSION = 20
|
|
|
|
class TempError(Exception):
|
|
"Temporary SPF error"
|
|
|
|
class PermError(Exception):
|
|
"Permanent SPF error"
|
|
def __init__(self,msg,mech=None):
|
|
Exception.__init__(self,msg,mech)
|
|
self.msg = msg
|
|
self.mech = mech
|
|
def __str__(self):
|
|
if self.mech:
|
|
return '%s: %s'%(self.msg,self.mech)
|
|
return self.msg
|
|
|
|
def check(i, s, h,local=None,receiver=None):
|
|
"""Test an incoming MAIL FROM:<s>, from a client with ip address i.
|
|
h is the HELO/EHLO domain name.
|
|
|
|
Returns (result, mta-status-code, explanation) where result in
|
|
['pass', 'unknown', 'fail', 'error', 'softfail', 'none', 'neutral' ].
|
|
|
|
Example:
|
|
>>> check(i='127.0.0.1', s='terry@wayforward.net', h='localhost')
|
|
('pass', 250, 'local connections always pass')
|
|
|
|
#>>> check(i='61.51.192.42', s='liukebing@bcc.com', h='bmsi.com')
|
|
|
|
"""
|
|
return query(i=i, s=s, h=h,local=local,receiver=receiver).check()
|
|
|
|
class query(object):
|
|
"""A query object keeps the relevant information about a single SPF
|
|
query:
|
|
|
|
i: ip address of SMTP client
|
|
s: sender declared in MAIL FROM:<>
|
|
l: local part of sender s
|
|
d: current domain, initially domain part of sender s
|
|
h: EHLO/HELO domain
|
|
v: 'in-addr' for IPv4 clients and 'ip6' for IPv6 clients
|
|
t: current timestamp
|
|
p: SMTP client domain name
|
|
o: domain part of sender s
|
|
|
|
This is also, by design, the same variables used in SPF macro
|
|
expansion.
|
|
|
|
Also keeps cache: DNS cache.
|
|
"""
|
|
def __init__(self, i, s, h,local=None,receiver=None):
|
|
self.i, self.s, self.h = i, s, h
|
|
if not s and h:
|
|
self.s = 'postmaster@' + h
|
|
self.l, self.o = split_email(s, h)
|
|
self.t = str(int(time.time()))
|
|
self.v = 'in-addr'
|
|
self.d = self.o
|
|
self.p = None
|
|
if receiver:
|
|
self.r = receiver
|
|
self.cache = {}
|
|
self.exps = dict(EXPLANATIONS)
|
|
self.local = local # local policy
|
|
self.lookups = 0
|
|
|
|
def set_default_explanation(self,exp):
|
|
exps = self.exps
|
|
for i in 'softfail','fail','unknown':
|
|
exps[i] = exp
|
|
|
|
def getp(self):
|
|
if not self.p:
|
|
p = self.dns_ptr(self.i)
|
|
if len(p) > 0:
|
|
self.p = p[0]
|
|
else:
|
|
self.p = self.i
|
|
return self.p
|
|
|
|
def best_guess(self,spf=DEFAULT_SPF):
|
|
"""Return a best guess based on a default SPF record"""
|
|
return self.check(spf)
|
|
|
|
def check(self, spf=None):
|
|
"""
|
|
Returns (result, mta-status-code, explanation) where
|
|
result in ['fail', 'softfail', 'neutral' 'unknown', 'pass', 'error']
|
|
"""
|
|
if self.i.startswith('127.'):
|
|
return ('pass', 250, 'local connections always pass')
|
|
|
|
try:
|
|
self.lookups = 0
|
|
if not spf:
|
|
spf = self.dns_spf(self.d)
|
|
if self.local and spf:
|
|
spf += ' ' + self.local
|
|
return self.check1(spf, self.d, 0)
|
|
except DNS.DNSError,x:
|
|
return ('error', 450, 'SPF DNS Error: ' + str(x))
|
|
except TempError,x:
|
|
return ('error', 450, 'SPF Temporary Error: ' + str(x))
|
|
except PermError,x:
|
|
# Pre-Lentczner draft treats this as an unknown result
|
|
# and equivalent to no SPF record.
|
|
self.prob = x.msg
|
|
self.mech.append(x.mech)
|
|
return ('unknown', 550, 'SPF Permanent Error: ' + str(x))
|
|
|
|
def check1(self, spf, domain, recursion):
|
|
# spf rfc: 3.7 Processing Limits
|
|
#
|
|
if recursion > MAX_RECURSION:
|
|
self.prob = 'Too many levels of recursion'
|
|
return ('unknown', 250, 'SPF recursion limit exceeded')
|
|
try:
|
|
tmp, self.d = self.d, domain
|
|
return self.check0(spf,recursion)
|
|
finally:
|
|
self.d = tmp
|
|
|
|
def check0(self, spf,recursion):
|
|
"""Test this query information against SPF text.
|
|
|
|
Returns (result, mta-status-code, explanation) where
|
|
result in ['fail', 'unknown', 'pass', 'none']
|
|
"""
|
|
|
|
if not spf:
|
|
return ('none', 250, EXPLANATIONS['none'])
|
|
|
|
# split string by whitespace, drop the 'v=spf1'
|
|
#
|
|
spf = spf.split()[1:]
|
|
|
|
# copy of explanations to be modified by exp=
|
|
exps = self.exps
|
|
redirect = None
|
|
|
|
# no mechanisms at all cause unknown result, unless
|
|
# overridden with 'default=' modifier
|
|
#
|
|
default = 'neutral'
|
|
self.mech = [] # unknown mechanisms
|
|
|
|
# Look for modifiers
|
|
#
|
|
for m in spf:
|
|
m = RE_MODIFIER.split(m)[1:]
|
|
if len(m) != 2: continue
|
|
|
|
if m[0] == 'exp':
|
|
exps['fail'] = exps['unknown'] = \
|
|
self.get_explanation(m[1])
|
|
elif m[0] == 'redirect':
|
|
redirect = self.expand(m[1])
|
|
elif m[0] == 'default':
|
|
# default=- is the same as default=fail
|
|
default = RESULTS.get(m[1], default)
|
|
|
|
# spf rfc: 3.6 Unrecognized Mechanisms and Modifiers
|
|
|
|
# Look for mechanisms
|
|
#
|
|
for mech in spf:
|
|
if RE_MODIFIER.match(mech): continue
|
|
m, arg, cidrlength = parse_mechanism(mech, self.d)
|
|
|
|
# map '?' '+' or '-' to 'unknown' 'pass' or 'fail'
|
|
if m:
|
|
result = RESULTS.get(m[0])
|
|
if result:
|
|
# eat '?' '+' or '-'
|
|
m = m[1:]
|
|
else:
|
|
# default pass
|
|
result = 'pass'
|
|
|
|
if m in ['a', 'mx', 'ptr', 'prt', 'exists', 'include']:
|
|
arg = self.expand(arg)
|
|
|
|
if m == 'include':
|
|
if arg != self.d:
|
|
res,code,txt = self.check1(self.dns_spf(arg),
|
|
arg, recursion + 1)
|
|
if res == 'pass':
|
|
break
|
|
if res == 'none':
|
|
raise PermError(
|
|
'No valid SPF record for included domain: %s'%arg,
|
|
mech)
|
|
continue
|
|
else:
|
|
raise PermError('include mechanism missing domain',mech)
|
|
elif m == 'all':
|
|
break
|
|
|
|
elif m == 'exists':
|
|
if len(self.dns_a(arg)) > 0:
|
|
break
|
|
|
|
elif m == 'a':
|
|
if cidrmatch(self.i, self.dns_a(arg),
|
|
cidrlength):
|
|
break
|
|
|
|
elif m == 'mx':
|
|
if cidrmatch(self.i, self.dns_mx(arg),
|
|
cidrlength):
|
|
break
|
|
|
|
elif m in ('ip4', 'ipv4', 'ip') and arg != self.d:
|
|
try:
|
|
if cidrmatch(self.i, [arg], cidrlength):
|
|
break
|
|
except socket.error:
|
|
raise PermError('syntax error',mech)
|
|
|
|
elif m in ('ip6', 'ipv6'):
|
|
# Until we support IPV6, we should never
|
|
# get an IPv6 connection. So this mech
|
|
# will never match.
|
|
pass
|
|
|
|
elif m in ('ptr', 'prt'):
|
|
if domainmatch(self.validated_ptrs(self.i),
|
|
arg):
|
|
break
|
|
|
|
else:
|
|
# unknown mechanisms cause immediate unknown
|
|
# abort results
|
|
raise PermError('Unknown mechanism found',mech)
|
|
else:
|
|
# no matches
|
|
if redirect:
|
|
return self.check1(self.dns_spf(redirect),
|
|
redirect, recursion + 1)
|
|
else:
|
|
result = default
|
|
|
|
if result == 'fail':
|
|
return (result, 550, exps[result])
|
|
else:
|
|
return (result, 250, exps[result])
|
|
|
|
def get_explanation(self, spec):
|
|
"""Expand an explanation."""
|
|
if spec:
|
|
return self.expand(''.join(self.dns_txt(self.expand(spec))))
|
|
else:
|
|
return 'explanation : Required option is missing'
|
|
|
|
def expand(self, str):
|
|
"""Do SPF RFC macro expansion.
|
|
|
|
Examples:
|
|
>>> q = query(s='strong-bad@email.example.com',
|
|
... h='mx.example.org', i='192.0.2.3')
|
|
>>> q.p = 'mx.example.org'
|
|
|
|
>>> q.expand('%{d}')
|
|
'email.example.com'
|
|
|
|
>>> q.expand('%{d4}')
|
|
'email.example.com'
|
|
|
|
>>> q.expand('%{d3}')
|
|
'email.example.com'
|
|
|
|
>>> q.expand('%{d2}')
|
|
'example.com'
|
|
|
|
>>> q.expand('%{d1}')
|
|
'com'
|
|
|
|
>>> q.expand('%{p}')
|
|
'mx.example.org'
|
|
|
|
>>> q.expand('%{p2}')
|
|
'example.org'
|
|
|
|
>>> q.expand('%{dr}')
|
|
'com.example.email'
|
|
|
|
>>> q.expand('%{d2r}')
|
|
'example.email'
|
|
|
|
>>> q.expand('%{l}')
|
|
'strong-bad'
|
|
|
|
>>> q.expand('%{l-}')
|
|
'strong.bad'
|
|
|
|
>>> q.expand('%{lr}')
|
|
'strong-bad'
|
|
|
|
>>> q.expand('%{lr-}')
|
|
'bad.strong'
|
|
|
|
>>> q.expand('%{l1r-}')
|
|
'strong'
|
|
|
|
>>> q.expand('%{ir}.%{v}._spf.%{d2}')
|
|
'3.2.0.192.in-addr._spf.example.com'
|
|
|
|
>>> q.expand('%{lr-}.lp._spf.%{d2}')
|
|
'bad.strong.lp._spf.example.com'
|
|
|
|
>>> q.expand('%{lr-}.lp.%{ir}.%{v}._spf.%{d2}')
|
|
'bad.strong.lp.3.2.0.192.in-addr._spf.example.com'
|
|
|
|
>>> q.expand('%{ir}.%{v}.%{l1r-}.lp._spf.%{d2}')
|
|
'3.2.0.192.in-addr.strong.lp._spf.example.com'
|
|
|
|
>>> q.expand('%{p2}.trusted-domains.example.net')
|
|
'example.org.trusted-domains.example.net'
|
|
|
|
>>> q.expand('%{p2}.trusted-domains.example.net')
|
|
'example.org.trusted-domains.example.net'
|
|
|
|
"""
|
|
end = 0
|
|
result = ''
|
|
for i in RE_CHAR.finditer(str):
|
|
result += str[end:i.start()]
|
|
macro = str[i.start():i.end()]
|
|
if macro == '%%':
|
|
result += '%'
|
|
elif macro == '%_':
|
|
result += ' '
|
|
elif macro == '%-':
|
|
result += '%20'
|
|
else:
|
|
letter = macro[2].lower()
|
|
if letter == 'p':
|
|
self.getp()
|
|
expansion = getattr(self, letter, '')
|
|
if expansion:
|
|
result += expand_one(expansion,
|
|
macro[3:-1],
|
|
JOINERS.get(letter))
|
|
|
|
end = i.end()
|
|
return result + str[end:]
|
|
|
|
def dns_spf(self, domain):
|
|
"""Get the SPF record recorded in DNS for a specific domain
|
|
name. Returns None if not found, or if more than one record
|
|
is found.
|
|
"""
|
|
a = [t for t in self.dns_txt(domain) if t.startswith('v=spf1')]
|
|
if not a:
|
|
if DELEGATE:
|
|
a = [t
|
|
for t in self.dns_txt(domain+'._spf.'+DELEGATE)
|
|
if t.startswith('v=spf1')
|
|
]
|
|
if not a:
|
|
# No SPF record: convert and return CID if present
|
|
p = CIDParser(q=self)
|
|
try:
|
|
return p.spf_txt(domain)
|
|
except xml.sax._exceptions.SAXParseException,x:
|
|
raise PermError("Caller-ID parse error",domain)
|
|
|
|
if len(a) == 1:
|
|
return a[0]
|
|
else:
|
|
return None
|
|
|
|
def dns_txt(self, domainname):
|
|
"Get a list of TXT records for a domain name."
|
|
if domainname:
|
|
return [''.join(a) for a in self.dns(domainname, 'TXT')]
|
|
return []
|
|
|
|
def dns_mx(self, domainname):
|
|
"""Get a list of IP addresses for all MX exchanges for a
|
|
domain name.
|
|
"""
|
|
return [a for mx in self.dns(domainname, 'MX') \
|
|
for a in self.dns_a(mx[1])]
|
|
|
|
def dns_a(self, domainname):
|
|
"""Get a list of IP addresses for a domainname."""
|
|
return self.dns(domainname, 'A')
|
|
|
|
def dns_aaaa(self, domainname):
|
|
"""Get a list of IPv6 addresses for a domainname."""
|
|
return self.dns(domainname, 'AAAA')
|
|
|
|
def validated_ptrs(self, i):
|
|
"""Figure out the validated PTR domain names for a given IP
|
|
address.
|
|
"""
|
|
return [p for p in self.dns_ptr(i) if i in self.dns_a(p)]
|
|
|
|
def dns_ptr(self, i):
|
|
"""Get a list of domain names for an IP address."""
|
|
return self.dns(reverse_dots(i) + ".in-addr.arpa", 'PTR')
|
|
|
|
def dns(self, name, qtype):
|
|
"""DNS query.
|
|
|
|
If the result is in cache, return that. Otherwise pull the
|
|
result from DNS, and cache ALL answers, so additional info
|
|
is available for further queries later.
|
|
|
|
CNAMEs are followed.
|
|
|
|
If there is no data, [] is returned.
|
|
|
|
pre: qtype in ['A', 'AAAA', 'MX', 'PTR', 'TXT', 'SPF']
|
|
post: isinstance(__return__, types.ListType)
|
|
"""
|
|
self.lookups += 1
|
|
if self.lookups > MAX_LOOKUP:
|
|
raise PermError('Too many DNS lookups')
|
|
result = self.cache.get( (name, qtype) )
|
|
cname = None
|
|
if not result:
|
|
req = DNS.DnsRequest(name, qtype=qtype)
|
|
resp = req.req()
|
|
for a in resp.answers:
|
|
# key k: ('wayforward.net', 'A'), value v
|
|
k, v = (a['name'], a['typename']), a['data']
|
|
if k == (name, 'CNAME'):
|
|
cname = v
|
|
self.cache.setdefault(k, []).append(v)
|
|
result = self.cache.get( (name, qtype), [])
|
|
if not result and cname:
|
|
result = self.dns(cname, qtype)
|
|
return result
|
|
|
|
def get_header(self,res,receiver):
|
|
if res in ('pass','fail','softfail'):
|
|
return '%s (%s: %s) client-ip=%s; envelope-from=%s; helo=%s;' % (
|
|
res,receiver,self.get_header_comment(res),self.i,
|
|
self.l + '@' + self.o, self.h)
|
|
if res == 'unknown':
|
|
return '%s (%s: %s)' % (' '.join([res] + self.mech),
|
|
receiver,self.get_header_comment(res))
|
|
return '%s (%s: %s)' % (res,receiver,self.get_header_comment(res))
|
|
|
|
def get_header_comment(self,res):
|
|
"""Return comment for Received-SPF header.
|
|
"""
|
|
sender = self.o
|
|
if res == 'pass':
|
|
if self.i.startswith('127.'):
|
|
return "localhost is always allowed."
|
|
else: return \
|
|
"domain of %s designates %s as permitted sender" \
|
|
% (sender,self.i)
|
|
elif res == 'softfail': return \
|
|
"transitioning domain of %s does not designate %s as permitted sender" \
|
|
% (sender,self.i)
|
|
elif res == 'neutral': return \
|
|
"%s is neither permitted nor denied by domain of %s" \
|
|
% (self.i,sender)
|
|
elif res == 'none': return \
|
|
"%s is neither permitted nor denied by domain of %s" \
|
|
% (self.i,sender)
|
|
#"%s does not designate permitted sender hosts" % sender
|
|
elif res == 'unknown': return \
|
|
"error in processing during lookup of domain of %s: %s" \
|
|
% (sender, self.prob)
|
|
elif res == 'error': return \
|
|
"error in processing during lookup of %s" % sender
|
|
elif res == 'fail': return \
|
|
"domain of %s does not designate %s as permitted sender" \
|
|
% (sender,self.i)
|
|
raise ValueError("invalid SPF result for header comment: "+res)
|
|
|
|
def split_email(s, h):
|
|
"""Given a sender email s and a HELO domain h, create a valid tuple
|
|
(l, d) local-part and domain-part.
|
|
|
|
Examples:
|
|
>>> split_email('', 'wayforward.net')
|
|
('postmaster', 'wayforward.net')
|
|
|
|
>>> split_email('foo.com', 'wayforward.net')
|
|
('postmaster', 'foo.com')
|
|
|
|
>>> split_email('terry@wayforward.net', 'optsw.com')
|
|
('terry', 'wayforward.net')
|
|
"""
|
|
if not s:
|
|
return 'postmaster', h
|
|
else:
|
|
parts = s.split('@', 1)
|
|
if len(parts) == 2:
|
|
return tuple(parts)
|
|
else:
|
|
return 'postmaster', s
|
|
|
|
def parse_mechanism(str, d):
|
|
"""Breaks A, MX, IP4, and PTR mechanisms into a (name, domain,
|
|
cidr) tuple. The domain portion defaults to d if not present,
|
|
the cidr defaults to 32 if not present.
|
|
|
|
Examples:
|
|
>>> parse_mechanism('a', 'foo.com')
|
|
('a', 'foo.com', 32)
|
|
|
|
>>> parse_mechanism('a:bar.com', 'foo.com')
|
|
('a', 'bar.com', 32)
|
|
|
|
>>> parse_mechanism('a/24', 'foo.com')
|
|
('a', 'foo.com', 24)
|
|
|
|
>>> parse_mechanism('a:bar.com/16', 'foo.com')
|
|
('a', 'bar.com', 16)
|
|
"""
|
|
a = str.split('/')
|
|
if len(a) == 2:
|
|
a, port = a[0], int(a[1])
|
|
else:
|
|
a, port = str, 32
|
|
|
|
b = a.split(':')
|
|
if len(b) == 2:
|
|
return b[0], b[1], port
|
|
else:
|
|
return a, d, port
|
|
|
|
def reverse_dots(name):
|
|
"""Reverse dotted IP addresses or domain names.
|
|
|
|
Example:
|
|
>>> reverse_dots('192.168.0.145')
|
|
'145.0.168.192'
|
|
|
|
>>> reverse_dots('email.example.com')
|
|
'com.example.email'
|
|
"""
|
|
a = name.split('.')
|
|
a.reverse()
|
|
return '.'.join(a)
|
|
|
|
def domainmatch(ptrs, domainsuffix):
|
|
"""grep for a given domain suffix against a list of validated PTR
|
|
domain names.
|
|
|
|
Examples:
|
|
>>> domainmatch(['FOO.COM'], 'foo.com')
|
|
1
|
|
|
|
>>> domainmatch(['moo.foo.com'], 'FOO.COM')
|
|
1
|
|
|
|
>>> domainmatch(['moo.bar.com'], 'foo.com')
|
|
0
|
|
|
|
"""
|
|
domainsuffix = domainsuffix.lower()
|
|
for ptr in ptrs:
|
|
ptr = ptr.lower()
|
|
|
|
if ptr == domainsuffix or ptr.endswith('.' + domainsuffix):
|
|
return True
|
|
|
|
return False
|
|
|
|
def cidrmatch(i, ipaddrs, cidr_length = 32):
|
|
"""Match an IP address against a list of other IP addresses.
|
|
|
|
Examples:
|
|
>>> cidrmatch('192.168.0.45', ['192.168.0.44', '192.168.0.45'])
|
|
1
|
|
|
|
>>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'])
|
|
0
|
|
|
|
>>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'], 24)
|
|
1
|
|
"""
|
|
c = cidr(i, cidr_length)
|
|
for ip in ipaddrs:
|
|
if cidr(ip, cidr_length) == c:
|
|
return True
|
|
return False
|
|
|
|
def cidr(i, n):
|
|
"""Convert an IP address string with a CIDR mask into a 32-bit
|
|
integer.
|
|
|
|
i must be a string of numbers 0..255 separated by dots '.'::
|
|
pre: forall([0 <= int(p) < 256 for p in i.split('.')])
|
|
|
|
n is a number of bits to mask::
|
|
pre: 0 <= n <= 32
|
|
|
|
Examples:
|
|
>>> bin2addr(cidr('192.168.5.45', 32))
|
|
'192.168.5.45'
|
|
>>> bin2addr(cidr('192.168.5.45', 24))
|
|
'192.168.5.0'
|
|
>>> bin2addr(cidr('192.168.0.45', 8))
|
|
'192.0.0.0'
|
|
"""
|
|
return ~(MASK >> n) & MASK & addr2bin(i)
|
|
|
|
def addr2bin(str):
|
|
"""Convert a string IPv4 address into an unsigned integer.
|
|
|
|
Examples::
|
|
>>> addr2bin('127.0.0.1')
|
|
2130706433L
|
|
|
|
>>> addr2bin('127.0.0.1') == socket.INADDR_LOOPBACK
|
|
1
|
|
|
|
>>> addr2bin('255.255.255.254')
|
|
4294967294L
|
|
|
|
>>> addr2bin('192.168.0.1')
|
|
3232235521L
|
|
|
|
Unlike DNS.addr2bin, the n, n.n, and n.n.n forms for IP addresses
|
|
are handled as well::
|
|
>>> addr2bin('10.65536')
|
|
167837696L
|
|
>>> 10 * (2 ** 24) + 65536
|
|
167837696
|
|
|
|
>>> addr2bin('10.93.512')
|
|
173867520L
|
|
>>> 10 * (2 ** 24) + 93 * (2 ** 16) + 512
|
|
173867520
|
|
"""
|
|
return struct.unpack("!L", socket.inet_aton(str))[0]
|
|
|
|
def bin2addr(addr):
|
|
"""Convert a numeric IPv4 address into string n.n.n.n form.
|
|
|
|
Examples::
|
|
>>> bin2addr(socket.INADDR_LOOPBACK)
|
|
'127.0.0.1'
|
|
|
|
>>> bin2addr(socket.INADDR_ANY)
|
|
'0.0.0.0'
|
|
|
|
>>> bin2addr(socket.INADDR_NONE)
|
|
'255.255.255.255'
|
|
"""
|
|
return socket.inet_ntoa(struct.pack("!L", addr))
|
|
|
|
def expand_one(expansion, str, joiner):
|
|
if not str:
|
|
return expansion
|
|
len, reverse, delimiters = RE_ARGS.split(str)[1:4]
|
|
if not delimiters:
|
|
delimiters = '.'
|
|
expansion = split(expansion, delimiters, joiner)
|
|
if reverse: expansion.reverse()
|
|
if len: expansion = expansion[-int(len)*2+1:]
|
|
return ''.join(expansion)
|
|
|
|
def split(str, delimiters, joiner=None):
|
|
"""Split a string into pieces by a set of delimiter characters. The
|
|
resulting list is delimited by joiner, or the original delimiter if
|
|
joiner is not specified.
|
|
|
|
Examples:
|
|
>>> split('192.168.0.45', '.')
|
|
['192', '.', '168', '.', '0', '.', '45']
|
|
|
|
>>> split('terry@wayforward.net', '@.')
|
|
['terry', '@', 'wayforward', '.', 'net']
|
|
|
|
>>> split('terry@wayforward.net', '@.', '.')
|
|
['terry', '.', 'wayforward', '.', 'net']
|
|
"""
|
|
result, element = [], ''
|
|
for c in str:
|
|
if c in delimiters:
|
|
result.append(element)
|
|
element = ''
|
|
if joiner:
|
|
result.append(joiner)
|
|
else:
|
|
result.append(c)
|
|
else:
|
|
element += c
|
|
result.append(element)
|
|
return result
|
|
|
|
def _test():
|
|
import doctest, spf
|
|
return doctest.testmod(spf)
|
|
|
|
DNS.DiscoverNameServers() # Fails on Mac OS X? Add domain to /etc/resolv.conf
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
if len(sys.argv) == 1:
|
|
print USAGE
|
|
_test()
|
|
elif len(sys.argv) == 2:
|
|
q = query(i='127.0.0.1', s='localhost', h='unknown',
|
|
receiver=socket.gethostname())
|
|
print q.dns_spf(sys.argv[1])
|
|
elif len(sys.argv) == 4:
|
|
print check(i=sys.argv[1], s=sys.argv[2], h=sys.argv[3],
|
|
receiver=socket.gethostname())
|
|
elif len(sys.argv) == 5:
|
|
i, s, h = sys.argv[2:]
|
|
q = query(i=i, s=s, h=h, receiver=socket.gethostname())
|
|
print q.check(sys.argv[1])
|
|
else:
|
|
print USAGE
|