Merge "tempauth: Support fernet tokens"

This commit is contained in:
Zuul 2025-04-30 23:45:49 +00:00 committed by Gerrit Code Review
commit cd1cc537d9
8 changed files with 273 additions and 88 deletions

View File

@ -459,6 +459,19 @@ use = egg:swift#tempauth
# This can be useful with an SSL load balancer in front of a non-SSL server.
# storage_url_scheme = default
#
# Fernet keys may be used for storage, rather than relying on memcached.
# Multiple keys may be configured using options named 'fernet_key_<key_id>'
# where 'key_id' is a unique identifier. The value should be 32 url-safe
# base64-encoded bytes, such as may be generated using
# `openssl rand -base64 32 | tr '+/' '-_'`
# Any of these keys may be used for decryption. Only one key may be used
# for encryption by a proxy at any given time; configure it with the
# 'active_fernet_key_id' option. All proxies in the cluster should know
# about a key before it is activated. If blank (the default),
# memcached-backed tokens will be issued.
# fernet_key_myid = <32 url-safe base64-encoded bytes>
# active_fernet_key_id = myid
#
# Lastly, you need to list all the accounts/users you want here. The format is:
# user_<account>_<user> = <key> [group] [group] [...] [storage_url]
# or if you want underscores in <account> or <user>, you can base64 encode them

View File

@ -19,7 +19,7 @@ from swift.common.exceptions import UnknownSecretIdError
from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.swob import Request, HTTPException, wsgi_to_str, str_to_wsgi
from swift.common.utils import readconf, strict_b64decode, get_logger, \
split_path
split_path, load_multikey_opts
from swift.common.wsgi import WSGIContext
@ -282,17 +282,6 @@ class BaseKeyMaster(object):
return readconf(self.keymaster_config_path,
self.keymaster_conf_section)
def _load_multikey_opts(self, conf, prefix):
result = []
for k, v in conf.items():
if not k.startswith(prefix):
continue
suffix = k[len(prefix):]
if suffix and (suffix[0] != '_' or len(suffix) < 2):
raise ValueError('Malformed root secret option name %s' % k)
result.append((k, suffix[1:] or None, v))
return sorted(result)
def __call__(self, env, start_response):
req = Request(env)
@ -366,8 +355,8 @@ class KeyMaster(BaseKeyMaster):
:rtype: dict
"""
root_secrets = {}
for opt, secret_id, value in self._load_multikey_opts(
conf, 'encryption_root_secret'):
for opt, secret_id, value in load_multikey_opts(
conf, 'encryption_root_secret', allow_none_key=True):
try:
secret = self._decode_root_secret(value)
except ValueError:

View File

@ -17,7 +17,7 @@ import logging
import os
from swift.common.middleware.crypto import keymaster
from swift.common.utils import LogLevelFilter
from swift.common.utils import LogLevelFilter, load_multikey_opts
from kmip.pie.client import ProxyKmipClient
@ -145,7 +145,7 @@ class KmipKeyMaster(keymaster.BaseKeyMaster):
return conf
def _get_root_secret(self, conf):
multikey_opts = self._load_multikey_opts(conf, 'key_id')
multikey_opts = load_multikey_opts(conf, 'key_id', allow_none_key=True)
kmip_to_secret = {}
root_secrets = {}
with self.proxy_kmip_client as client:

View File

@ -16,6 +16,7 @@ from castellan import key_manager, options
from castellan.common.credentials import keystone_password
from oslo_config import cfg
from swift.common.middleware.crypto.keymaster import BaseKeyMaster
from swift.common.utils import load_multikey_opts
class KmsKeyMaster(BaseKeyMaster):
@ -74,8 +75,8 @@ class KmsKeyMaster(BaseKeyMaster):
manager = key_manager.API(oslo_conf)
root_secrets = {}
for opt, secret_id, key_id in self._load_multikey_opts(
conf, 'key_id'):
for opt, secret_id, key_id in load_multikey_opts(
conf, 'key_id', allow_none_key=True):
key = manager.get(ctxt, key_id)
if key is None:
raise ValueError("Retrieval of encryption root secret with "

View File

@ -179,7 +179,9 @@ from time import time
from traceback import format_exc
from uuid import uuid4
import base64
import zlib
from cryptography import fernet
from eventlet import Timeout
from swift.common.memcached import MemcacheConnectionError
from swift.common.swob import (
@ -192,7 +194,7 @@ from swift.common.request_helpers import get_sys_meta_prefix
from swift.common.middleware.acl import (
clean_acl, parse_acl, referrer_allowed, acls_from_account_info)
from swift.common.utils import cache_from_env, get_logger, \
split_path, config_true_value
split_path, config_true_value, load_multikey_opts
from swift.common.registry import register_swift_info
from swift.common.utils import config_read_reseller_options, quote
from swift.proxy.controllers.base import get_account_info
@ -229,6 +231,17 @@ class TempAuth(object):
if not self.auth_prefix.endswith('/'):
self.auth_prefix += '/'
self.token_life = int(conf.get('token_life', DEFAULT_TOKEN_LIFE))
self.fernet_keys = {
key_id: fernet.Fernet(key)
for _, key_id, key in load_multikey_opts(conf, 'fernet_key')
}
self.fernet = (fernet.MultiFernet(self.fernet_keys.values())
if self.fernet_keys else None)
self.active_fernet_key_id = conf.get('active_fernet_key_id')
if self.active_fernet_key_id and \
self.active_fernet_key_id not in self.fernet_keys:
raise ValueError("key_id %r not found; %r are available" % (
self.active_fernet_key_id, sorted(self.fernet_keys.keys())))
self.allow_overrides = config_true_value(
conf.get('allow_overrides', 't'))
self.storage_url_scheme = conf.get('storage_url_scheme', 'default')
@ -425,6 +438,40 @@ class TempAuth(object):
groups = ','.join(groups)
return groups
def groups_from_fernet(self, env, token):
try:
if self.fernet:
return self.fernet.decrypt(
token.encode('ascii'),
ttl=self.token_life).decode('utf8')
except (ValueError, fernet.InvalidToken):
pass
return None
def groups_from_compressed_fernet(self, env, token):
try:
if self.fernet:
return zlib.decompress(self.fernet.decrypt(
token.encode('ascii'),
ttl=self.token_life)).decode('utf8')
except (ValueError, fernet.InvalidToken):
pass
return None
def groups_from_memcache(self, env, token):
memcache_client = cache_from_env(env)
if not memcache_client:
raise Exception('Memcache required')
memcache_token_key = '%s/token/%stk%s' % (
self.reseller_prefix, self.reseller_prefix, token)
cached_auth_data = memcache_client.get(memcache_token_key)
groups = None
if cached_auth_data:
expires, groups = cached_auth_data
if expires < time():
groups = None
return groups
def get_groups(self, env, token):
"""
Get groups for the given token.
@ -436,38 +483,40 @@ class TempAuth(object):
of. The first group in the list is also considered a unique
identifier for that user.
"""
groups = None
memcache_client = cache_from_env(env)
if not memcache_client:
raise Exception('Memcache required')
memcache_token_key = '%s/token/%s' % (self.reseller_prefix, token)
cached_auth_data = memcache_client.get(memcache_token_key)
if cached_auth_data:
expires, groups = cached_auth_data
if expires < time():
groups = None
handlers = [
('zftk', self.groups_from_compressed_fernet),
('ftk', self.groups_from_fernet),
('tk', self.groups_from_memcache),
]
if token:
for prefix, handler in handlers:
prefix = self.reseller_prefix + prefix
if token.startswith(prefix):
groups = handler(env, token[len(prefix):])
if groups:
return groups
s3_auth_details = env.get('s3api.auth_details') or\
env.get('swift3.auth_details')
if s3_auth_details:
if 'check_signature' not in s3_auth_details:
self.logger.warning(
'Swift3 did not provide a check_signature function; '
'upgrade Swift3 if you want to use it with tempauth')
return None
account_user = s3_auth_details['access_key']
if account_user not in self.users:
return None
user = self.users[account_user]
account = account_user.split(':', 1)[0]
account_id = user['url'].rsplit('/', 1)[-1]
if not s3_auth_details['check_signature'](user['key']):
return None
env['PATH_INFO'] = env['PATH_INFO'].replace(
str_to_wsgi(account_user), wsgi_unquote(account_id), 1)
groups = self._get_user_groups(account, account_user, account_id)
if not s3_auth_details:
return None
return groups
if 'check_signature' not in s3_auth_details:
self.logger.warning(
'Swift3 did not provide a check_signature function; '
'upgrade Swift3 if you want to use it with tempauth')
return None
account_user = s3_auth_details['access_key']
if account_user not in self.users:
return None
user = self.users[account_user]
account = account_user.split(':', 1)[0]
account_id = user['url'].rsplit('/', 1)[-1]
if not s3_auth_details['check_signature'](user['key']):
return None
env['PATH_INFO'] = env['PATH_INFO'].replace(
str_to_wsgi(account_user), wsgi_unquote(account_id), 1)
return self._get_user_groups(account, account_user, account_id)
def account_acls(self, req):
"""
@ -718,6 +767,26 @@ class TempAuth(object):
def _create_new_token(self, memcache_client,
account, account_user, account_id):
if self.active_fernet_key_id:
expires = time() + self.token_life
token_prefix = 'ftk' # nosec: B105
groups = self._get_user_groups(
account,
account_user,
account_id,
).encode('utf8')
compressed = zlib.compress(groups)
if len(compressed) < len(groups):
token_prefix = 'zftk' # nosec: B105
groups = compressed
token = ''.join([
self.reseller_prefix,
token_prefix,
self.fernet_keys[self.active_fernet_key_id].encrypt(
groups).decode('ascii'),
])
return token, expires
# Generate new token
token = '%stk%s' % (self.reseller_prefix, uuid4().hex)
expires = time() + self.token_life
@ -817,27 +886,29 @@ class TempAuth(object):
self.logger.increment('token_denied')
return HTTPUnauthorized(request=req, headers=unauthed_headers)
account_id = self.users[account_user]['url'].rsplit('/', 1)[-1]
# Get memcache client
# Try to get memcache client
memcache_client = cache_from_env(req.environ)
if not memcache_client:
if not (memcache_client or self.active_fernet_key_id):
raise Exception('Memcache required')
# See if a token already exists and hasn't expired
token = None
memcache_user_key = '%s/user/%s' % (self.reseller_prefix, account_user)
candidate_token = memcache_client.get(memcache_user_key)
if candidate_token:
memcache_token_key = \
'%s/token/%s' % (self.reseller_prefix, candidate_token)
cached_auth_data = memcache_client.get(memcache_token_key)
if cached_auth_data:
expires, old_groups = cached_auth_data
old_groups = [group for group in old_groups.split(',')]
new_groups = self._get_user_groups(account, account_user,
account_id)
if memcache_client:
memcache_user_key = '%s/user/%s' % (
self.reseller_prefix, account_user)
candidate_token = memcache_client.get(memcache_user_key)
if candidate_token:
memcache_token_key = \
'%s/token/%s' % (self.reseller_prefix, candidate_token)
cached_auth_data = memcache_client.get(memcache_token_key)
if cached_auth_data:
expires, old_groups = cached_auth_data
old_groups = [group for group in old_groups.split(',')]
new_groups = self._get_user_groups(account, account_user,
account_id)
if expires > time() and \
set(old_groups) == set(new_groups.split(',')):
token = candidate_token
if expires > time() and \
set(old_groups) == set(new_groups.split(',')):
token = candidate_token
# Create a new token if one didn't exist
if not token:
try:

View File

@ -1501,6 +1501,32 @@ def cache_from_env(env, allow_none=False):
return item_from_env(env, 'swift.cache', allow_none)
def load_multikey_opts(conf, prefix, allow_none_key=False):
"""
Read multi-key options of the form "<prefix>_<key> = <value>"
:param conf: a config dict
:param prefix: the prefix for which to search
:param allow_none_key: if True, also parse "<prefix> = <value>" and
include it in the result as ``(None, value)``
:returns: a sorted list of (<key>, <value>) tuples
:raises ValueError: if an option starts with prefix but cannot be parsed
"""
result = []
for k, v in conf.items():
if not k.startswith(prefix):
continue
suffix = k[len(prefix):]
if not suffix and allow_none_key:
result.append((k, None, v))
continue
if len(suffix) >= 2 and suffix[0] == '_':
result.append((k, suffix[1:], v))
continue
raise ValueError('Malformed multi-key option name %s' % k)
return sorted(result)
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
"""
Ensure that a pickle file gets written to disk. The file

View File

@ -399,7 +399,7 @@ class TestKeymaster(unittest.TestCase):
with self.assertRaises(ValueError) as err:
keymaster.KeyMaster(self.swift, conf)
self.assertEqual(
'Malformed root secret option name %s' % bad_option,
'Malformed multi-key option name %s' % bad_option,
str(err.exception))
do_test('encryption_root_secret1')
do_test('encryption_root_secret123')

View File

@ -22,10 +22,11 @@ from time import time
from urllib.parse import quote, urlparse
from swift.common.middleware import tempauth as auth
from swift.common.middleware.acl import format_acl
from swift.common.swob import Request, Response, bytes_to_wsgi
from swift.common.swob import Request, Response, bytes_to_wsgi, HTTPOk
from swift.common.statsd_client import StatsdClient
from swift.common.utils import split_path
from test.unit import FakeMemcache
from test.unit.common.middleware.helpers import FakeSwift
NO_CONTENT_RESP = (('204 No Content', {}, ''),) # mock server response
@ -537,8 +538,8 @@ class TestAuth(unittest.TestCase):
def test_detect_reseller_request(self):
req = self._make_request('/v1/AUTH_admin',
headers={'X-Auth-Token': 'AUTH_t'})
cache_key = 'AUTH_/token/AUTH_t'
headers={'X-Auth-Token': 'AUTH_tk'})
cache_key = 'AUTH_/token/AUTH_tk'
cache_entry = (time() + 3600, '.reseller_admin')
req.environ['swift.cache'].set(cache_key, cache_entry)
req.get_response(self.test_auth)
@ -670,8 +671,8 @@ class TestAuth(unittest.TestCase):
test_auth = auth.filter_factory({'user_acct_user': 'testing'})(
FakeApp(iter(NO_CONTENT_RESP * 1)))
req = self._make_request('/v1/AUTH_acct',
headers={'X-Auth-Token': 'AUTH_t'})
cache_key = 'AUTH_/token/AUTH_t'
headers={'X-Auth-Token': 'AUTH_tk'})
cache_key = 'AUTH_/token/AUTH_tk'
cache_entry = (time() + 3600, 'AUTH_acct')
req.environ['swift.cache'].set(cache_key, cache_entry)
resp = req.get_response(test_auth)
@ -724,12 +725,96 @@ class TestAuth(unittest.TestCase):
self.assertEqual(resp.headers.get('Www-Authenticate'),
'Swift realm="act"')
def test_fernet_token_no_memcache(self):
swift = FakeSwift()
swift.register('GET', '/v1/AUTH_ac', HTTPOk, {})
test_auth = auth.filter_factory({
'user_ac_user': 'testing .admin',
'fernet_key_2024': 'esipv1wC03xLGPb3cydid0uPINl6g8sydhlPh6iwJxk=',
'active_fernet_key_id': '2024',
})(swift)
req = Request.blank(
'/auth/v1.0',
headers={'X-Auth-User': 'ac:user', 'X-Auth-Key': 'testing'})
# no memcache!
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
token = resp.headers['X-Auth-Token']
self.assertEqual(token[:8], 'AUTH_ftk')
req = Request.blank('/v1/AUTH_ac', headers={'X-Auth-Token': token})
# again, no memcache!
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
# key rotation time
test_auth = auth.filter_factory({
'user_ac_user': 'testing .admin',
'fernet_key_2024': 'esipv1wC03xLGPb3cydid0uPINl6g8sydhlPh6iwJxk=',
'fernet_key_2025': 'gRXHeKlt5h1nMDZL_QA7UfVIJ5z3ZP3v351cvmiRZD4=',
'active_fernet_key_id': '2025',
})(swift)
# old token still good
req = Request.blank('/v1/AUTH_ac', headers={'X-Auth-Token': token})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
req = Request.blank(
'/auth/v1.0',
headers={'X-Auth-User': 'ac:user', 'X-Auth-Key': 'testing'})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
new_token = resp.headers['X-Auth-Token']
self.assertEqual(new_token[:8], 'AUTH_ftk')
# drop old key
test_auth = auth.filter_factory({
'user_ac_user': 'testing .admin',
'fernet_key_2025': 'gRXHeKlt5h1nMDZL_QA7UfVIJ5z3ZP3v351cvmiRZD4=',
'active_fernet_key_id': '2025',
})(swift)
# old token now bad
req = Request.blank('/v1/AUTH_ac', headers={'X-Auth-Token': token})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 401)
# new token still good
req = Request.blank('/v1/AUTH_ac', headers={'X-Auth-Token': new_token})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
def test_compressed_fernet_token_no_memcache(self):
swift = FakeSwift()
swift.register('GET', '/v1/AUTH_ac', HTTPOk, {})
test_auth = auth.filter_factory({
'user_ac_user': 'testing .admin ' + ' '.join(
'similar-group-name-%d' % i for i in range(20)),
'fernet_key_2024': 'esipv1wC03xLGPb3cydid0uPINl6g8sydhlPh6iwJxk=',
'active_fernet_key_id': '2024',
})(swift)
req = Request.blank(
'/auth/v1.0',
headers={'X-Auth-User': 'ac:user', 'X-Auth-Key': 'testing'})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
token = resp.headers['X-Auth-Token']
self.assertEqual(token[:9], 'AUTH_zftk')
# token's good
req = Request.blank('/v1/AUTH_ac', headers={'X-Auth-Token': token})
resp = req.get_response(test_auth)
self.assertEqual(resp.status_int, 200)
def test_object_name_containing_slash(self):
test_auth = auth.filter_factory({'user_acct_user': 'testing'})(
FakeApp(iter(NO_CONTENT_RESP * 1)))
req = self._make_request('/v1/AUTH_acct/cont/obj/name/with/slash',
headers={'X-Auth-Token': 'AUTH_t'})
cache_key = 'AUTH_/token/AUTH_t'
headers={'X-Auth-Token': 'AUTH_tk'})
cache_key = 'AUTH_/token/AUTH_tk'
cache_entry = (time() + 3600, 'AUTH_acct')
req.environ['swift.cache'].set(cache_key, cache_entry)
resp = req.get_response(test_auth)
@ -1284,7 +1369,7 @@ class TestAccountAcls(unittest.TestCase):
def _make_request(self, path, **kwargs):
# Our TestAccountAcls default request will have a valid auth token
version, acct, _ = split_path(path, 1, 3, True)
headers = kwargs.pop('headers', {'X-Auth-Token': 'AUTH_t'})
headers = kwargs.pop('headers', {'X-Auth-Token': 'AUTH_tk'})
user_groups = kwargs.pop('user_groups', 'AUTH_firstacct')
# The account being accessed will have account ACLs
@ -1298,7 +1383,7 @@ class TestAccountAcls(unittest.TestCase):
# Authorize the token by populating the request's cache
req.environ['swift.cache'] = FakeMemcache()
cache_key = 'AUTH_/token/AUTH_t'
cache_key = 'AUTH_/token/AUTH_tk'
cache_entry = (time() + 3600, user_groups)
req.environ['swift.cache'].set(cache_key, cache_entry)
@ -1451,7 +1536,7 @@ class TestAccountAcls(unittest.TestCase):
FakeApp(iter(NO_CONTENT_RESP * 5)))
user_groups = test_auth._get_user_groups('admin', 'admin:user',
'AUTH_admin')
good_headers = {'X-Auth-Token': 'AUTH_t'}
good_headers = {'X-Auth-Token': 'AUTH_tk'}
good_acl = json.dumps({"read-only": [u"á", "b"]})
bad_list_types = '{"read-only": ["a", 99]}'
bad_acl = 'syntactically invalid acl -- this does not parse as JSON'
@ -1546,7 +1631,7 @@ class TestAccountAcls(unittest.TestCase):
sysmeta_hdr = 'x-account-sysmeta-core-access-control'
target = '/v1/AUTH_firstacct'
good_headers = {'X-Auth-Token': 'AUTH_t'}
good_headers = {'X-Auth-Token': 'AUTH_tk'}
good_acl = '{"read-only":["a","b"]}'
# no acls -- no problem!
@ -1567,7 +1652,7 @@ class TestAccountAcls(unittest.TestCase):
FakeApp(iter(NO_CONTENT_RESP * 3)))
target = '/v1/AUTH_firstacct'
good_headers = {'X-Auth-Token': 'AUTH_t'}
good_headers = {'X-Auth-Token': 'AUTH_tk'}
bad_acls = (
'syntax error',
'{"bad_key":"should_fail"}',
@ -1824,9 +1909,9 @@ class TestTokenHandling(unittest.TestCase):
self.req = Request.blank(path, headers=headers)
self.req.method = method
self.req.environ['swift.cache'] = FakeMemcache()
self._setup_user_and_token('AUTH_t', 'acct', 'acct:joe',
self._setup_user_and_token('AUTH_tk', 'acct', 'acct:joe',
'.admin')
self._setup_user_and_token('AUTH_s', 'admin', 'admin:glance',
self._setup_user_and_token('AUTH_tks', 'admin', 'admin:glance',
'.service')
resp = self.req.get_response(self.test_auth)
return resp
@ -1852,21 +1937,21 @@ class TestTokenHandling(unittest.TestCase):
def test_tokens_set_remote_user(self):
conf = {} # Default conf
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t'})
{'x-auth-token': 'AUTH_tk'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct')
self.assertEqual(resp.status_int, 200)
# Add x-service-token
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_s'})
{'x-auth-token': 'AUTH_tk',
'x-service-token': 'AUTH_tks'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct,admin,admin:glance,.service')
self.assertEqual(resp.status_int, 200)
# Put x-auth-token value into x-service-token
resp = self._make_request(conf, '/v1/AUTH_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_t'})
{'x-auth-token': 'AUTH_tk',
'x-service-token': 'AUTH_tk'})
self.assertEqual(self.req.environ['REMOTE_USER'],
'acct,acct:joe,AUTH_acct,acct,acct:joe,AUTH_acct')
self.assertEqual(resp.status_int, 200)
@ -1875,15 +1960,15 @@ class TestTokenHandling(unittest.TestCase):
conf = {'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'}
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t',
'x-service-token': 'AUTH_s'})
{'x-auth-token': 'AUTH_tk',
'x-service-token': 'AUTH_tks'})
self.assertEqual(resp.status_int, 200)
def test_service_token_omitted(self):
conf = {'reseller_prefix': 'AUTH, PRE2',
'PRE2_require_group': '.service'}
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t'})
{'x-auth-token': 'AUTH_tk'})
self.assertEqual(resp.status_int, 403)
def test_invalid_tokens(self):
@ -1893,12 +1978,12 @@ class TestTokenHandling(unittest.TestCase):
{'x-auth-token': 'AUTH_junk'})
self.assertEqual(resp.status_int, 401)
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_t',
{'x-auth-token': 'AUTH_tk',
'x-service-token': 'AUTH_junk'})
self.assertEqual(resp.status_int, 403)
resp = self._make_request(conf, '/v1/PRE2_acct',
{'x-auth-token': 'AUTH_junk',
'x-service-token': 'AUTH_s'})
'x-service-token': 'AUTH_tks'})
self.assertEqual(resp.status_int, 401)