Add function _run_xmlsec, thus avoiding repeating code.
This commit is contained in:
@@ -584,10 +584,6 @@ def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="pem",
|
||||
if node_id:
|
||||
com_list.extend(["--node-id", node_id])
|
||||
|
||||
com_list.append(fil)
|
||||
|
||||
logger.debug("com_list: %s" % com_list)
|
||||
|
||||
if __DEBUG:
|
||||
try:
|
||||
print " ".join(com_list)
|
||||
@@ -600,18 +596,8 @@ def verify_signature(enctext, xmlsec_binary, cert_file=None, cert_type="pem",
|
||||
print "%s: %s" % (cert_file, os.access(cert_file, os.F_OK))
|
||||
print "%s: %s" % (fil, os.access(fil, os.F_OK))
|
||||
|
||||
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
|
||||
p_out = pof.stdout.read()
|
||||
try:
|
||||
p_err = pof.stderr.read()
|
||||
if __DEBUG:
|
||||
print p_err
|
||||
verified = parse_xmlsec_output(p_err)
|
||||
except XmlsecError, exc:
|
||||
logger.error(LOG_LINE % (p_out, exc))
|
||||
raise SignatureError("%s" % (exc,))
|
||||
|
||||
return verified
|
||||
(_stdout, stderr, _output) = _run_xmlsec(com_list, [fil], exception=SignatureError)
|
||||
return parse_xmlsec_output(stderr)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -652,6 +638,36 @@ def read_cert_from_file(cert_file, cert_type):
|
||||
return base64.b64encode(str(data))
|
||||
|
||||
|
||||
def _run_xmlsec(com_list, extra_args, validate_output=True, exception=XmlsecError):
|
||||
"""
|
||||
Common code to invoke xmlsec and parse the output.
|
||||
:param com_list: Key-value parameter list for xmlsec
|
||||
:param extra_args: Positional parameters to be appended after all key-value parameters
|
||||
:param validate_output: Parse and validate the output
|
||||
:param exception: The exception class to raise on errors
|
||||
:result: Whatever xmlsec wrote to an --output temporary file
|
||||
"""
|
||||
ntf = NamedTemporaryFile()
|
||||
com_list.append(["--output", ntf.name])
|
||||
com_list += extra_args
|
||||
|
||||
logger.debug("xmlsec command: %s" % " ".join(com_list))
|
||||
|
||||
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
|
||||
|
||||
p_out = pof.stdout.read()
|
||||
p_err = pof.stderr.read()
|
||||
try:
|
||||
if validate_output:
|
||||
parse_xmlsec_output(p_err)
|
||||
except XmlsecError, exc:
|
||||
logger.error(LOG_LINE_2 % (p_out, p_err, exc))
|
||||
raise exception("%s" % (exc,))
|
||||
|
||||
ntf.seek(0)
|
||||
return (p_out, p_err, ntf.read())
|
||||
|
||||
|
||||
def security_context(conf, debug=None):
|
||||
""" Creates a security context based on the configuration
|
||||
|
||||
@@ -725,28 +741,15 @@ class SecurityContext(object):
|
||||
if not template:
|
||||
template = self.template
|
||||
|
||||
logger.info("input len: %d" % len(text))
|
||||
logger.info("Encryption input len: %d" % len(text))
|
||||
_, fil = make_temp("%s" % text, decode=False)
|
||||
ntf = NamedTemporaryFile()
|
||||
|
||||
com_list = [self.xmlsec, "--encrypt", "--pubkey-pem", recv_key,
|
||||
"--session-key", key_type, "--xml-data", fil,
|
||||
"--output", ntf.name, template]
|
||||
]
|
||||
|
||||
logger.debug("Encryption command: %s" % " ".join(com_list))
|
||||
|
||||
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
|
||||
|
||||
p_err = pof.stderr.read()
|
||||
try:
|
||||
parse_xmlsec_output(p_err)
|
||||
except XmlsecError, exc:
|
||||
p_out = pof.stdout.read()
|
||||
logger.error(LOG_LINE_2 % (p_out, p_err, exc))
|
||||
raise DecryptError("%s" % (exc,))
|
||||
|
||||
ntf.seek(0)
|
||||
return ntf.read()
|
||||
(_stdout, _stderr, output) = _run_xmlsec(com_list, [template], exception=DecryptError)
|
||||
return output
|
||||
|
||||
def decrypt(self, enctext):
|
||||
""" Decrypting an encrypted text by the use of a private key.
|
||||
@@ -755,28 +758,15 @@ class SecurityContext(object):
|
||||
:return: The decrypted text
|
||||
"""
|
||||
|
||||
logger.info("input len: %d" % len(enctext))
|
||||
logger.info("Decrypt input len: %d" % len(enctext))
|
||||
_, fil = make_temp("%s" % enctext, decode=False)
|
||||
ntf = NamedTemporaryFile()
|
||||
|
||||
com_list = [self.xmlsec, "--decrypt", "--privkey-pem",
|
||||
self.key_file, "--output", ntf.name,
|
||||
"--id-attr:%s" % ID_ATTR, ENC_KEY_CLASS, fil]
|
||||
self.key_file, "--id-attr:%s" % ID_ATTR, ENC_KEY_CLASS,
|
||||
]
|
||||
|
||||
logger.debug("Decrypt command: %s" % " ".join(com_list))
|
||||
|
||||
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
|
||||
|
||||
p_err = pof.stderr.read()
|
||||
try:
|
||||
parse_xmlsec_output(p_err)
|
||||
except XmlsecError, exc:
|
||||
p_out = pof.stdout.read()
|
||||
logger.error(LOG_LINE_2 % (p_out, p_err, exc))
|
||||
raise DecryptError("%s" % (exc,))
|
||||
|
||||
ntf.seek(0)
|
||||
return ntf.read()
|
||||
(_stdout, _stderr, output) = _run_xmlsec(com_list, [fil], exception=DecryptError)
|
||||
return output
|
||||
|
||||
def verify_signature(self, enctext, cert_file=None, cert_type="pem",
|
||||
node_name=NODE_NAME, node_id=None, id_attr=""):
|
||||
@@ -1046,10 +1036,7 @@ class SecurityContext(object):
|
||||
|
||||
_, fil = make_temp("%s" % statement, decode=False)
|
||||
|
||||
ntf = NamedTemporaryFile()
|
||||
|
||||
com_list = [self.xmlsec, "--sign",
|
||||
"--output", ntf.name,
|
||||
"--privkey-pem", key_file,
|
||||
"--id-attr:%s" % id_attr, klass_namn
|
||||
#"--store-signatures"
|
||||
@@ -1057,24 +1044,15 @@ class SecurityContext(object):
|
||||
if nodeid:
|
||||
com_list.extend(["--node-id", nodeid])
|
||||
|
||||
com_list.append(fil)
|
||||
|
||||
pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
|
||||
p_out = pof.stdout.read()
|
||||
p_err = pof.stderr.read()
|
||||
|
||||
# this doesn't work if --store-signatures are used
|
||||
if p_out == "":
|
||||
ntf.seek(0)
|
||||
signed_statement = ntf.read()
|
||||
if not signed_statement:
|
||||
print >> sys.stderr, p_err
|
||||
raise Exception("Signing failed")
|
||||
else:
|
||||
return signed_statement
|
||||
else:
|
||||
print >> sys.stderr, p_out
|
||||
print "E", p_err
|
||||
try:
|
||||
(stdout, stderr, signed_statement) = _run_xmlsec(com_list, [fil])
|
||||
# this doesn't work if --store-signatures are used
|
||||
if stdout == "":
|
||||
if signed_statement:
|
||||
return signed_statement
|
||||
logger.error("Signing operation failed :\nstdout : %s\nstderr : %s" % (stdout, stderr))
|
||||
raise Exception("Signing failed")
|
||||
except DecryptError, exc:
|
||||
raise Exception("Signing failed")
|
||||
|
||||
def sign_assertion_using_xmlsec(self, statement, key=None, key_file=None,
|
||||
@@ -1135,6 +1113,7 @@ class SecurityContext(object):
|
||||
id_attr=id_attr)
|
||||
return statement
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user