108 lines
3.2 KiB
Python
108 lines
3.2 KiB
Python
import pytest
|
|
import requests
|
|
from mock import create_autospec, patch
|
|
|
|
from dcos import auth
|
|
from dcos.errors import DCOSException
|
|
|
|
|
|
def test_get_auth_scheme():
    """Exercise scheme detection with accepted, unknown and missing
    'www-authenticate' headers."""
    # Both accepted schemes must be handed back unchanged.
    for accepted in ('acsjwt', 'oauthjwt'):
        _get_auth_scheme({'www-authenticate': accepted}, scheme=accepted)

    # An unrecognised scheme is rejected with a message naming it.
    unknown_scheme_msg = (
        "Server responded with an HTTP 'www-authenticate' field of "
        "'foobar', DC/OS only supports ['oauthjwt', 'acsjwt']")
    _get_auth_scheme_exception(
        {'www-authenticate': 'foobar'}, unknown_scheme_msg)

    # No header at all is rejected with its own message.
    missing_header_msg = (
        "Invalid HTTP response: server returned an HTTP 401 response "
        "with no 'www-authenticate' field")
    _get_auth_scheme_exception({}, missing_header_msg)
|
|
|
|
|
|
def _get_auth_scheme(header, scheme):
    """Assert that ``auth._get_auth_scheme`` returns ``scheme`` for a
    patched response carrying ``header`` as its headers.

    :param header: mapping assigned to the patched response's ``headers``
    :param scheme: value the call under test is expected to return
    """
    with patch('requests.Response') as fake_response:
        fake_response.headers = header
        assert auth._get_auth_scheme(fake_response) == scheme
|
|
|
|
|
|
def _get_auth_scheme_exception(header, err_msg):
    """Assert that ``auth._get_auth_scheme`` raises ``DCOSException``
    whose text equals ``err_msg`` for a patched response carrying
    ``header`` as its headers.

    :param header: mapping assigned to the patched response's ``headers``
    :param err_msg: exact string expected from ``str()`` of the exception
    """
    with patch('requests.Response') as fake_response:
        fake_response.headers = header

        with pytest.raises(DCOSException) as raised:
            auth._get_auth_scheme(fake_response)

        assert err_msg == str(raised.value)
|
|
|
|
|
|
@patch('dcos.http._request')
@patch('dcos.config.set_val')
def test_get_dcostoken_by_post_with_creds(config, req):
    """A 200 login response must lead to a POST against the login
    endpoint and to the returned token being passed to ``set_val``."""
    # Arrange: the mocked request hands back a 200 carrying a token.
    login_response = create_autospec(requests.Response)
    login_response.status_code = 200
    login_response.json.return_value = {"token": "foo"}
    req.return_value = login_response
    credentials = {"foobar"}

    # Act
    auth._get_dcostoken_by_post_with_creds("http://url", credentials)

    # Assert: credentials were posted and the token was written to config.
    req.assert_called_with(
        "post",
        "http://url/acs/api/v1/auth/login",
        json=credentials,
    )
    config.assert_called_with("core.dcos_acs_token", "foo")
|
|
|
|
|
|
@patch('dcos.http._request')
@patch('dcos.auth._get_dcostoken_by_oidc_implicit_flow')
@patch('dcos.auth._get_dcostoken_by_dcos_uid_password_auth')
def test_header_challenge_auth(cred_auth, oidc_auth, req):
    """A 401 challenge must be routed to the handler matching the
    scheme advertised in its 'www-authenticate' header."""
    # (advertised scheme, patched handler expected to be invoked once)
    scenarios = (
        ("oauthjwt", oidc_auth),
        ("acsjwt", cred_auth),
    )

    for advertised_scheme, expected_handler in scenarios:
        challenge = create_autospec(requests.Response)
        challenge.status_code = 401
        challenge.headers = {"www-authenticate": advertised_scheme}
        req.return_value = challenge

        auth.header_challenge_auth("url")
        expected_handler.assert_called_once()
|
|
|
|
|
|
@patch('dcos.http.get')
@patch('dcos.config.get_config_val')
def test_get_providers(config, req_mock):
    """The providers URL must come out the same whether or not the
    configured base URL ends with a slash."""
    providers_response = create_autospec(requests.Response)
    providers_response.return_value = {}
    req_mock.return_value = providers_response

    expected_url = "http://localhost/acs/api/v1/auth/providers"

    # The second entry checks url construction with a trailing slash.
    for base_url in ("http://localhost", "http://localhost/"):
        config.return_value = base_url

        auth.get_providers()
        req_mock.assert_called_with(expected_url)
|
|
|
|
|
|
@patch('dcos.http._request')
@patch('dcos.config.get_config_val')
def test_get_providers_errors(config, req):
    """A 404 from the mocked request must surface as a DCOSException
    with the 'not supported' message."""
    not_found = create_autospec(requests.Response)
    not_found.status_code = 404
    req.return_value = not_found
    config.return_value = "http://localhost"

    with pytest.raises(DCOSException) as raised:
        auth.get_providers()

    expected_text = "This command is not supported for your cluster"
    assert expected_text == str(raised.value)
|