Don't log message content by default.
Right now, it is quite easy to end up logging the whole message in the log when verifying signatures. This can result in wasted resources writing the log to disk and increasing memory usage. This can also be a private data leak if logging is put in DEBUG in a production environment.
This commit is contained in:
+13
-6
@@ -112,12 +112,14 @@ CV_None = b'none'
|
||||
|
||||
|
||||
class HashThrough(object):
|
||||
def __init__(self, hasher):
|
||||
def __init__(self, hasher, debug=False):
|
||||
self.data = []
|
||||
self.hasher = hasher
|
||||
self.name = hasher.name
|
||||
self.debug = debug
|
||||
|
||||
def update(self, data):
|
||||
if self.debug:
|
||||
self.data.append(data)
|
||||
return self.hasher.update(data)
|
||||
|
||||
@@ -460,11 +462,12 @@ class DomainSigner(object):
|
||||
#: @param logger: a logger to which debug info will be written (default None)
|
||||
#: @param signature_algorithm: the signing algorithm to use when signing
|
||||
def __init__(self,message=None,logger=None,signature_algorithm=b'rsa-sha256',
|
||||
minkey=1024):
|
||||
minkey=1024, debug_content=False):
|
||||
self.set_message(message)
|
||||
if logger is None:
|
||||
logger = get_default_logger()
|
||||
self.logger = logger
|
||||
self.debug_content = debug_content and logger.isEnabledFor(logging.DEBUG)
|
||||
if signature_algorithm not in HASH_ALGORITHMS:
|
||||
raise ParameterError(
|
||||
"Unsupported signature algorithm: "+signature_algorithm)
|
||||
@@ -610,12 +613,13 @@ class DomainSigner(object):
|
||||
header_value = fold(header_value, namelen=len(header_name))
|
||||
header_value = RE_BTAG.sub(b'\\1',header_value)
|
||||
header = (header_name, b' ' + header_value)
|
||||
h = HashThrough(self.hasher())
|
||||
h = HashThrough(self.hasher(), self.debug_content)
|
||||
sig = dict(fields)
|
||||
|
||||
headers = canon_policy.canonicalize_headers(self.headers)
|
||||
self.signed_headers = hash_headers(
|
||||
h, canon_policy, headers, include_headers, header, sig)
|
||||
if self.debug_content:
|
||||
self.logger.debug("sign %s headers: %r" % (header_name, h.hashed()))
|
||||
|
||||
if self.signature_algorithm == b'rsa-sha256' or self.signature_algorithm == b'rsa-sha1':
|
||||
@@ -662,12 +666,13 @@ class DomainSigner(object):
|
||||
|
||||
# validate body if present
|
||||
if b'bh' in sig:
|
||||
h = HashThrough(hasher())
|
||||
h = HashThrough(hasher(), self.debug_content)
|
||||
|
||||
body = canon_policy.canonicalize_body(self.body)
|
||||
if b'l' in sig:
|
||||
body = body[:int(sig[b'l'])]
|
||||
h.update(body)
|
||||
if self.debug_content:
|
||||
self.logger.debug("body hashed: %r" % h.hashed())
|
||||
bodyhash = h.digest()
|
||||
|
||||
@@ -687,11 +692,12 @@ class DomainSigner(object):
|
||||
# generalized to check for extras of other singleton headers.
|
||||
if b'from' in include_headers:
|
||||
include_headers.append(b'from')
|
||||
h = HashThrough(hasher())
|
||||
h = HashThrough(hasher(), self.debug_content)
|
||||
|
||||
headers = canon_policy.canonicalize_headers(self.headers)
|
||||
self.signed_headers = hash_headers(
|
||||
h, canon_policy, headers, include_headers, sig_header, sig)
|
||||
if self.debug_content:
|
||||
self.logger.debug("signed for %s: %r" % (sig_header[0], h.hashed()))
|
||||
signature = base64.b64decode(re.sub(br"\s+", b"", sig[b'b']))
|
||||
if ktag == b'rsa':
|
||||
@@ -1008,8 +1014,9 @@ class ARC(DomainSigner):
|
||||
canon_policy = CanonicalizationPolicy.from_c_value(b'relaxed/relaxed')
|
||||
|
||||
self.hasher = HASH_ALGORITHMS[self.signature_algorithm]
|
||||
h = HashThrough(self.hasher())
|
||||
h = HashThrough(self.hasher(), self.debug_content)
|
||||
h.update(canon_policy.canonicalize_body(self.body))
|
||||
if self.debug_content:
|
||||
self.logger.debug("sign ams body hashed: %r" % h.hashed())
|
||||
bodyhash = base64.b64encode(h.digest())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user