Fix strings/bytes python3 issues in sigver

Various points require binary data to feed into xmlsec or strings to
make logical sense of things. This may introduce some requirement that
saml documents are utf-8, where previously binary data would be passed
through without problems. Without the proper encoding metadata passed
through that is not a simple problem to solve.
This commit is contained in:
Clint Byrum 2015-05-23 09:39:10 -07:00
parent 9c2b951be6
commit 9d901afed5
3 changed files with 30 additions and 19 deletions

View File

@ -83,6 +83,8 @@ def create_class_from_xml_string(target_class, xml_string):
the contents of the XML - or None if the root XML tag and namespace did the contents of the XML - or None if the root XML tag and namespace did
not match those of the target class. not match those of the target class.
""" """
if not isinstance(xml_string, six.binary_type):
xml_string = xml_string.encode('utf-8')
tree = ElementTree.fromstring(xml_string) tree = ElementTree.fromstring(xml_string)
return create_class_from_element_tree(target_class, tree) return create_class_from_element_tree(target_class, tree)

View File

@ -266,7 +266,7 @@ def _instance(klass, ava, seccont, base64encode=False, elements_to_sign=None):
#print("# %s" % (prop)) #print("# %s" % (prop))
if prop in ava: if prop in ava:
if isinstance(ava[prop], bool): if isinstance(ava[prop], bool):
setattr(instance, prop, "%s" % ava[prop]) setattr(instance, prop, str(ava[prop]).encode('utf-8'))
elif isinstance(ava[prop], int): elif isinstance(ava[prop], int):
setattr(instance, prop, "%d" % ava[prop]) setattr(instance, prop, "%d" % ava[prop])
else: else:
@ -313,7 +313,7 @@ def signed_instance_factory(instance, seccont, elements_to_sign=None):
:return: A class instance if not signed otherwise a string :return: A class instance if not signed otherwise a string
""" """
if elements_to_sign: if elements_to_sign:
signed_xml = "%s" % instance signed_xml = str(instance).encode('utf-8')
for (node_name, nodeid) in elements_to_sign: for (node_name, nodeid) in elements_to_sign:
signed_xml = seccont.sign_statement( signed_xml = seccont.sign_statement(
signed_xml, node_name=node_name, node_id=nodeid) signed_xml, node_name=node_name, node_id=nodeid)
@ -351,6 +351,7 @@ def make_temp(string, suffix="", decode=True, delete=True):
xmlsec function). xmlsec function).
""" """
ntf = NamedTemporaryFile(suffix=suffix, delete=delete) ntf = NamedTemporaryFile(suffix=suffix, delete=delete)
assert isinstance(string, six.binary_type)
if decode: if decode:
ntf.write(base64.b64decode(string)) ntf.write(base64.b64decode(string))
else: else:
@ -543,7 +544,7 @@ def extract_rsa_key_from_x509_cert(pem):
def pem_format(key): def pem_format(key):
return "\n".join(["-----BEGIN CERTIFICATE-----", return "\n".join(["-----BEGIN CERTIFICATE-----",
key, "-----END CERTIFICATE-----"]) key, "-----END CERTIFICATE-----"]).encode('ascii')
def import_rsa_key_from_file(filename): def import_rsa_key_from_file(filename):
@ -740,8 +741,9 @@ class CryptoBackendXmlSec1(CryptoBackend):
def version(self): def version(self):
com_list = [self.xmlsec, "--version"] com_list = [self.xmlsec, "--version"]
pof = Popen(com_list, stderr=PIPE, stdout=PIPE) pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
content = pof.stdout.read().decode('ascii')
try: try:
return pof.stdout.read().split(" ")[1] return content.split(" ")[1]
except IndexError: except IndexError:
return "" return ""
@ -757,7 +759,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
:return: :return:
""" """
logger.debug("Encryption input len: %d" % len(text)) logger.debug("Encryption input len: %d" % len(text))
_, fil = make_temp("%s" % text, decode=False) _, fil = make_temp(str(text).encode('utf-8'), decode=False)
com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key, com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key,
"--session-key", session_key_type, "--xml-data", fil] "--session-key", session_key_type, "--xml-data", fil]
@ -768,6 +770,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [template], (_stdout, _stderr, output) = self._run_xmlsec(com_list, [template],
exception=DecryptError, exception=DecryptError,
validate_output=False) validate_output=False)
if isinstance(output, six.binary_type):
output = output.decode('utf-8')
return output return output
def encrypt_assertion(self, statement, enc_key, template, def encrypt_assertion(self, statement, enc_key, template,
@ -785,8 +789,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
if isinstance(statement, SamlBase): if isinstance(statement, SamlBase):
statement = pre_encrypt_assertion(statement) statement = pre_encrypt_assertion(statement)
_, fil = make_temp("%s" % statement, decode=False, delete=False) _, fil = make_temp(str(statement).encode('utf-8'), decode=False, delete=False)
_, tmpl = make_temp("%s" % template, decode=False) _, tmpl = make_temp(str(template).encode('utf-8'), decode=False)
if not node_xpath: if not node_xpath:
node_xpath = ASSERT_XPATH node_xpath = ASSERT_XPATH
@ -815,7 +819,7 @@ class CryptoBackendXmlSec1(CryptoBackend):
""" """
logger.debug("Decrypt input len: %d" % len(enctext)) logger.debug("Decrypt input len: %d" % len(enctext))
_, fil = make_temp("%s" % enctext, decode=False) _, fil = make_temp(str(enctext).encode('utf-8'), decode=False)
com_list = [self.xmlsec, "--decrypt", "--privkey-pem", com_list = [self.xmlsec, "--decrypt", "--privkey-pem",
key_file, "--id-attr:%s" % ID_ATTR, ENC_KEY_CLASS] key_file, "--id-attr:%s" % ID_ATTR, ENC_KEY_CLASS]
@ -838,9 +842,11 @@ class CryptoBackendXmlSec1(CryptoBackend):
'id','Id' or 'ID' 'id','Id' or 'ID'
:return: The signed statement :return: The signed statement
""" """
if not isinstance(statement, six.binary_type):
statement = str(statement).encode('utf-8')
_, fil = make_temp("%s" % statement, suffix=".xml", decode=False, _, fil = make_temp(statement, suffix=".xml",
delete=self._xmlsec_delete_tmpfiles) decode=False, delete=self._xmlsec_delete_tmpfiles)
com_list = [self.xmlsec, "--sign", com_list = [self.xmlsec, "--sign",
"--privkey-pem", key_file, "--privkey-pem", key_file,
@ -875,6 +881,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
:param id_attr: Should normally be one of "id", "Id" or "ID" :param id_attr: Should normally be one of "id", "Id" or "ID"
:return: Boolean True if the signature was correct otherwise False. :return: Boolean True if the signature was correct otherwise False.
""" """
if not isinstance(signedtext, six.binary_type):
signedtext = signedtext.encode('utf-8')
_, fil = make_temp(signedtext, suffix=".xml", _, fil = make_temp(signedtext, suffix=".xml",
decode=False, delete=self._xmlsec_delete_tmpfiles) decode=False, delete=self._xmlsec_delete_tmpfiles)
@ -924,8 +932,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
pof = Popen(com_list, stderr=PIPE, stdout=PIPE) pof = Popen(com_list, stderr=PIPE, stdout=PIPE)
p_out = pof.stdout.read() p_out = pof.stdout.read().decode('utf-8')
p_err = pof.stderr.read() p_err = pof.stderr.read().decode('utf-8')
if pof.returncode is not None and pof.returncode < 0: if pof.returncode is not None and pof.returncode < 0:
logger.error(LOG_LINE % (p_out, p_err)) logger.error(LOG_LINE % (p_out, p_err))
@ -1685,7 +1693,7 @@ class SecurityContext(object):
id_attr = ID_ATTR id_attr = ID_ATTR
if not key_file and key: if not key_file and key:
_, key_file = make_temp("%s" % key, ".pem") _, key_file = make_temp(str(key).encode('utf-8'), ".pem")
if not key and not key_file: if not key and not key_file:
key_file = self.key_file key_file = self.key_file

View File

@ -299,10 +299,11 @@ class TestSecurity():
to_sign = [(class_name(self._assertion), self._assertion.id), to_sign = [(class_name(self._assertion), self._assertion.id),
(class_name(response), response.id)] (class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign) s_response = sigver.signed_instance_factory(response, self.sec,
to_sign)
print(s_response) print(s_response)
res = self.sec.verify_signature("%s" % s_response, res = self.sec.verify_signature(s_response,
node_name=class_name(samlp.Response())) node_name=class_name(samlp.Response()))
print(res) print(res)
@ -327,7 +328,7 @@ class TestSecurity():
assert ci == self.sec.my_cert assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_response, res = self.sec.verify_signature(s_response,
node_name=class_name(samlp.Response())) node_name=class_name(samlp.Response()))
assert res assert res
@ -358,7 +359,7 @@ class TestSecurity():
ci = "".join(sigver.cert_from_instance(ass)[0].split()) ci = "".join(sigver.cert_from_instance(ass)[0].split())
assert ci == self.sec.my_cert assert ci == self.sec.my_cert
res = self.sec.verify_signature("%s" % s_assertion, res = self.sec.verify_signature(s_assertion,
node_name=class_name(ass)) node_name=class_name(ass))
assert res assert res
@ -447,9 +448,9 @@ def test_xbox():
encrypted_assertion = EncryptedAssertion() encrypted_assertion = EncryptedAssertion()
encrypted_assertion.add_extension_element(_ass0) encrypted_assertion.add_extension_element(_ass0)
_, pre = make_temp("%s" % pre_encryption_part(), decode=False) _, pre = make_temp(str(pre_encryption_part()).encode('utf-8'), decode=False)
enctext = sec.crypto.encrypt( enctext = sec.crypto.encrypt(
"%s" % encrypted_assertion, conf.cert_file, pre, "des-192", str(encrypted_assertion), conf.cert_file, pre, "des-192",
'/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]') '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')