diff --git a/src/saml2/client.py b/src/saml2/client.py index d64bd80..6ee5ef3 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -64,21 +64,73 @@ class Saml2Client(Base): :return: session id and AuthnRequest info """ - destination = self._sso_location(entityid, binding) + reqid, negotiated_binding, info = self.prepare_for_negotiated_authenticate( + entityid=entityid, + relay_state=relay_state, + binding=binding, + vorg=vorg, + nameid_format=nameid_format, + scoping=scoping, + consent=consent, + extensions=extensions, + sign=sign, + response_binding=response_binding, + **kwargs) - reqid, req = self.create_authn_request(destination, vorg, scoping, - response_binding, nameid_format, - consent=consent, - extensions=extensions, sign=sign, - **kwargs) - _req_str = "%s" % req - - logger.info("AuthNReq: %s" % _req_str) - - info = self.apply_binding(binding, _req_str, destination, relay_state) + assert negotiated_binding == binding return reqid, info + def prepare_for_negotiated_authenticate(self, entityid=None, relay_state="", + binding=None, vorg="", + nameid_format=None, + scoping=None, consent=None, extensions=None, + sign=None, + response_binding=saml2.BINDING_HTTP_POST, + **kwargs): + """ Makes all necessary preparations for an authentication request that negotiates + which binding to use for authentication. + + :param entityid: The entity ID of the IdP to send the request to + :param relay_state: To where the user should be returned after + successfull log in. + :param binding: Which binding to use for sending the request + :param vorg: The entity_id of the virtual organization I'm a member of + :param scoping: For which IdPs this query are aimed. + :param consent: Whether the principal have given her consent + :param extensions: Possible extensions + :param sign: Whether the request should be signed or not. + :param response_binding: Which binding to use for receiving the response + :param kwargs: Extra key word arguments + :return: session id and AuthnRequest info + """ + + expected_binding = binding + + for binding in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST]: + if expected_binding and binding != expected_binding: + continue + + destination = self._sso_location(entityid, binding) + logger.info("destination to provider: %s" % destination) + + reqid, request = self.create_authn_request( + destination, vorg, scoping, response_binding, nameid_format, + consent=consent, + extensions=extensions, sign=sign, + **kwargs) + + _req_str = str(request) + + logger.info("AuthNReq: %s" % _req_str) + + http_info = self.apply_binding(binding, _req_str, destination, + relay_state) + + return reqid, binding, http_info + else: + raise SignOnError("No supported bindings available for authentication") + def global_logout(self, name_id, reason="", expire=None, sign=None): """ More or less a layer of indirection :-/ Bootstrapping the whole thing by finding all the IdPs that should diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py index de43387..4e9cbdf 100644 --- a/src/saml2/client_base.py +++ b/src/saml2/client_base.py @@ -71,6 +71,10 @@ class VerifyError(SAMLError): pass +class SignOnError(SAMLError): + pass + + class LogoutError(SAMLError): pass diff --git a/tests/test_51_client.py b/tests/test_51_client.py index a82a278..26b9da8 100644 --- a/tests/test_51_client.py +++ b/tests/test_51_client.py @@ -635,6 +635,26 @@ class TestClientWithDummy(): resp_args = self.server.response_args(req.message, [response_binding]) assert resp_args["binding"] == response_binding + def test_do_negotiated_authn(self): + binding = BINDING_HTTP_REDIRECT + response_binding = BINDING_HTTP_POST + sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate( + IDP, "http://www.example.com/relay_state", + binding=binding, response_binding=response_binding) + + assert binding == auth_binding + assert isinstance(sid, basestring) + assert len(http_args) == 4 + assert http_args["headers"][0][0] == "Location" + assert http_args["data"] == [] + redirect_url = http_args["headers"][0][1] + _, _, _, _, qs, _ = urlparse.urlparse(redirect_url) + qs_dict = urlparse.parse_qs(qs) + req = self.server.parse_authn_request(qs_dict["SAMLRequest"][0], + binding) + resp_args = self.server.response_args(req.message, [response_binding]) + assert resp_args["binding"] == response_binding + def test_do_attribute_query(self): response = self.client.do_attribute_query( IDP, "_e7b68a04488f715cda642fbdd90099f5", @@ -699,6 +719,41 @@ class TestClientWithDummy(): 'http://www.example.com/login' assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD + def test_negotiated_post_sso(self): + binding = BINDING_HTTP_POST + response_binding = BINDING_HTTP_POST + sid, auth_binding, http_args = self.client.prepare_for_negotiated_authenticate( + "urn:mace:example.com:saml:roland:idp", relay_state="really", + binding=binding, response_binding=response_binding) + _dic = unpack_form(http_args["data"][3]) + + assert binding == auth_binding + + req = self.server.parse_authn_request(_dic["SAMLRequest"], binding) + resp_args = self.server.response_args(req.message, [response_binding]) + assert resp_args["binding"] == response_binding + + # Normally a response would now be sent back to the users web client + # Here I fake what the client will do + # create the form post + + http_args["data"] = urllib.urlencode(_dic) + http_args["method"] = "POST" + http_args["dummy"] = _dic["SAMLRequest"] + http_args["headers"] = [('Content-type', + 'application/x-www-form-urlencoded')] + + response = self.client.send(**http_args) + print response.text + _dic = unpack_form(response.text[3], "SAMLResponse") + resp = self.client.parse_authn_request_response(_dic["SAMLResponse"], + BINDING_HTTP_POST, + {sid: "/"}) + ac = resp.assertion.authn_statement[0].authn_context + assert ac.authenticating_authority[0].text == \ + 'http://www.example.com/login' + assert ac.authn_context_class_ref.text == INTERNETPROTOCOLPASSWORD + # if __name__ == "__main__": # tc = TestClient() @@ -708,4 +763,4 @@ class TestClientWithDummy(): if __name__ == "__main__": tc = TestClient() tc.setup_class() - tc.test_sign_then_encrypt_assertion_advice() \ No newline at end of file + tc.test_sign_then_encrypt_assertion_advice()