Bugfixes. Rest of tests.

This commit is contained in:
gholt 2010-09-11 17:03:09 -07:00
parent b56bf3a0f3
commit 2b995be46c
5 changed files with 502 additions and 72 deletions

View File

@ -336,25 +336,27 @@ class AuthController(object):
(rv, time() - begin))
return rv
def authorize_reseller_admin(self, request):
def is_account_admin(self, request, for_account):
if request.headers.get('X-Auth-Admin-User') == '.super_admin' and \
request.headers.get('X-Auth-Admin-Key') == self.super_admin_key:
return None
return True
try:
account, user = \
request.headers.get('X-Auth-Admin-User').split(':', 1)
except ValueError:
return HTTPForbidden(request=request)
except (AttributeError, ValueError):
return False
with self.get_conn() as conn:
row = conn.execute('''
SELECT user FROM account
WHERE account = ? AND user = ? AND password = ? AND
reseller_admin = 't' ''',
SELECT reseller_admin, admin FROM account
WHERE account = ? AND user = ? AND password = ?''',
(account, user,
request.headers.get('X-Auth-Admin-Key'))).fetchone()
if row:
return None
return HTTPForbidden(request=request)
if row[0] == 't':
return True
if row[1] == 't' and account == for_account:
return True
return False
def handle_token(self, request):
"""
@ -416,17 +418,18 @@ class AuthController(object):
request.headers.get('X-Auth-Admin-User') != '.super_admin' or
request.headers.get('X-Auth-Admin-Key') != self.super_admin_key):
return HTTPForbidden(request=request)
resp = self.authorize_reseller_admin(request)
if resp:
return resp
create_account_admin = \
request.headers.get('x-auth-user-admin') == 'true'
if create_account_admin and \
not self.is_account_admin(request, account_name):
return HTTPForbidden(request=request)
if 'X-Auth-User-Key' not in request.headers:
return HTTPBadRequest('X-Auth-User-Key is required')
return HTTPBadRequest(body='X-Auth-User-Key is required')
password = request.headers['x-auth-user-key']
storage_url = self.create_user(account_name, user_name, password,
request.headers.get('x-auth-user-admin') == 'true',
create_reseller_admin)
create_account_admin, create_reseller_admin)
if storage_url == 'already exists':
return HTTPBadRequest(storage_url)
return HTTPBadRequest(body=storage_url)
if not storage_url:
return HTTPServiceUnavailable()
return HTTPNoContent(headers={'x-storage-url': storage_url})

View File

@ -13,16 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from os import kill
from os import environ, kill
from signal import SIGTERM
from subprocess import call, Popen
from time import sleep
from ConfigParser import ConfigParser
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.client import get_auth
from swift.common.ring import Ring
AUTH_SERVER_CONF_FILE = environ.get('SWIFT_AUTH_SERVER_CONF_FILE',
'/etc/swift/auth-server.conf')
c = ConfigParser()
if not c.read(AUTH_SERVER_CONF_FILE):
exit('Unable to read config file: %s' % AUTH_SERVER_CONF_FILE)
conf = dict(c.items('app:auth-server'))
SUPER_ADMIN_KEY = conf.get('super_admin_key', 'devauth')
def kill_pids(pids):
for pid in pids.values():
try:
@ -50,7 +60,9 @@ def reset_environment():
container_ring = Ring('/etc/swift/container.ring.gz')
object_ring = Ring('/etc/swift/object.ring.gz')
sleep(5)
conn = http_connect('127.0.0.1', '11000', 'POST', '/recreate_accounts')
conn = http_connect('127.0.0.1', '11000', 'POST', '/recreate_accounts',
headers={'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': SUPER_ADMIN_KEY})
resp = conn.getresponse()
if resp.status != 200:
raise Exception('Recreating accounts failed. (%d)' % resp.status)

View File

@ -53,12 +53,9 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name):
return self.getheaders().get(name.lower())
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
if 'give_content_type' in kwargs:
if len(args) >= 7 and 'content_type' in args[6]:
kwargs['give_content_type'](args[6]['content-type'])
else:
kwargs['give_content_type']('')
def connect(*args, **kwargs):
connect.last_args = args
connect.last_kwargs = kwargs
return FakeConn(code_iter.next())
return connect
@ -66,6 +63,7 @@ def fake_http_connect(*code_iter, **kwargs):
class TestAuthServer(unittest.TestCase):
def setUp(self):
self.ohttp_connect = auth_server.http_connect
self.testdir = os.path.join(os.path.dirname(__file__),
'auth_server')
rmtree(self.testdir, ignore_errors=1)
@ -75,6 +73,7 @@ class TestAuthServer(unittest.TestCase):
self.controller = auth_server.AuthController(self.conf)
def tearDown(self):
auth_server.http_connect = self.ohttp_connect
rmtree(self.testdir, ignore_errors=1)
def test_get_conn(self):
@ -120,7 +119,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Storage-User': 'tester',
'X-Storage-Pass': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
def test_validate_token_expired(self):
@ -135,7 +134,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Storage-User': 'tester',
'X-Storage-Pass': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
auth_server.time = lambda: 1 + self.controller.token_life
self.assertEquals(self.controller.validate_token(token), False)
@ -318,7 +317,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Storage-User': 'tester',
'X-Storage-Pass': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
def test_auth_SOSO_good_Mosso_headers(self):
@ -330,7 +329,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Auth-User': 'test:tester',
'X-Auth-Key': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
def test_auth_SOSO_bad_Mosso_headers(self):
@ -438,7 +437,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Auth-User': 'test:tester',
'X-Auth-Key': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
def test_auth_Mosso_good_SOSO_header_names(self):
@ -450,7 +449,7 @@ class TestAuthServer(unittest.TestCase):
headers={'X-Storage-User': 'test:tester',
'X-Storage-Pass': 'testing'}))
token = res.headers['x-storage-token']
ttl = self.controller.validate_token(token)
ttl, _, _, _ = self.controller.validate_token(token)
self.assert_(ttl > 0, repr(ttl))
def test_basic_logging(self):
@ -600,6 +599,56 @@ class TestAuthServer(unittest.TestCase):
finally:
rmtree(swift_dir)
def test_upgrading_from_db2(self):
swift_dir = '/tmp/swift_test_auth_%s' % uuid4().hex
os.mkdir(swift_dir)
try:
# Create db1
db_file = os.path.join(swift_dir, 'auth.db')
conn = get_db_connection(db_file, okay_to_create=True)
conn.execute('''CREATE TABLE IF NOT EXISTS account (
account TEXT, url TEXT, cfaccount TEXT,
user TEXT, password TEXT, admin TEXT)''')
conn.execute('''CREATE INDEX IF NOT EXISTS ix_account_account
ON account (account)''')
conn.execute('''CREATE TABLE IF NOT EXISTS token (
token TEXT, created FLOAT,
account TEXT, user TEXT, cfaccount TEXT)''')
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_token
ON token (token)''')
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_created
ON token (created)''')
conn.execute('''CREATE INDEX IF NOT EXISTS ix_token_account
ON token (account)''')
conn.execute('''INSERT INTO account
(account, url, cfaccount, user, password, admin)
VALUES ('act', 'url', 'cfa', 'us1', 'pas', '')''')
conn.execute('''INSERT INTO account
(account, url, cfaccount, user, password, admin)
VALUES ('act', 'url', 'cfa', 'us2', 'pas', 't')''')
conn.execute('''INSERT INTO token
(token, created, account, user, cfaccount)
VALUES ('tok', '1', 'act', 'us1', 'cfa')''')
conn.commit()
conn.close()
# Upgrade to current db
conf = {'swift_dir': swift_dir, 'super_admin_key': 'testkey'}
controller = auth_server.AuthController(conf)
# Check new items exist and are correct
conn = get_db_connection(db_file)
row = conn.execute('''SELECT admin, reseller_admin
FROM account WHERE user = 'us1' ''').fetchone()
self.assert_(not row[0], row[0])
self.assert_(not row[1], row[1])
row = conn.execute('''SELECT admin, reseller_admin
FROM account WHERE user = 'us2' ''').fetchone()
self.assertEquals(row[0], 't')
self.assert_(not row[1], row[1])
row = conn.execute('SELECT user FROM token').fetchone()
self.assert_(row)
finally:
rmtree(swift_dir)
def test_create_user_twice(self):
auth_server.http_connect = fake_http_connect(201)
self.controller.create_user('test', 'tester', 'testing')
@ -615,6 +664,295 @@ class TestAuthServer(unittest.TestCase):
url2 = self.controller.create_user('test', 'tester2', 'testing2')
self.assertEquals(url, url2)
def test_no_super_admin_key(self):
conf = {'swift_dir': self.testdir, 'log_name': 'auth'}
self.assertRaises(ValueError, auth_server.AuthController, conf)
conf['super_admin_key'] = 'testkey'
auth_server.AuthController(conf)
def test_add_storage_account(self):
auth_server.http_connect = fake_http_connect(201)
stgact = self.controller.add_storage_account()
self.assert_(stgact.startswith(self.controller.reseller_prefix),
stgact)
# Make sure token given is the expected single use token
token = auth_server.http_connect.last_args[-1]['X-Auth-Token']
self.assert_(self.controller.validate_token(token))
self.assert_(not self.controller.validate_token(token))
auth_server.http_connect = fake_http_connect(201)
stgact = self.controller.add_storage_account('bob')
self.assertEquals(stgact, 'bob')
# Make sure token given is the expected single use token
token = auth_server.http_connect.last_args[-1]['X-Auth-Token']
self.assert_(self.controller.validate_token(token))
self.assert_(not self.controller.validate_token(token))
def test_regular_user(self):
auth_server.http_connect = fake_http_connect(201)
self.controller.create_user('act', 'usr', 'pas').split('/')[-1]
res = self.controller.handle_auth(Request.blank('/v1.0',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
_, _, _, stgact = \
self.controller.validate_token(res.headers['x-auth-token'])
self.assertEquals(stgact, '')
def test_account_admin(self):
auth_server.http_connect = fake_http_connect(201)
stgact = self.controller.create_user(
'act', 'usr', 'pas', admin=True).split('/')[-1]
res = self.controller.handle_auth(Request.blank('/v1.0',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
_, _, _, vstgact = \
self.controller.validate_token(res.headers['x-auth-token'])
self.assertEquals(stgact, vstgact)
def test_reseller_admin(self):
auth_server.http_connect = fake_http_connect(201)
self.controller.create_user(
'act', 'usr', 'pas', reseller_admin=True).split('/')[-1]
res = self.controller.handle_auth(Request.blank('/v1.0',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Auth-User': 'act:usr', 'X-Auth-Key': 'pas'}))
_, _, _, stgact = \
self.controller.validate_token(res.headers['x-auth-token'])
self.assertEquals(stgact, '.reseller_admin')
def test_is_account_admin(self):
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey'})
self.assert_(self.controller.is_account_admin(req, 'any'))
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey2'})
self.assert_(not self.controller.is_account_admin(req, 'any'))
req = Request.blank('/', headers={'X-Auth-Admin-User': '.super_admi',
'X-Auth-Admin-Key': 'testkey'})
self.assert_(not self.controller.is_account_admin(req, 'any'))
auth_server.http_connect = fake_http_connect(201, 201)
self.controller.create_user(
'act1', 'resadmin', 'pas', reseller_admin=True).split('/')[-1]
self.controller.create_user('act1', 'usr', 'pas').split('/')[-1]
self.controller.create_user(
'act2', 'actadmin', 'pas', admin=True).split('/')[-1]
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act1:resadmin',
'X-Auth-Admin-Key': 'pas'})
self.assert_(self.controller.is_account_admin(req, 'any'))
self.assert_(self.controller.is_account_admin(req, 'act1'))
self.assert_(self.controller.is_account_admin(req, 'act2'))
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act1:usr',
'X-Auth-Admin-Key': 'pas'})
self.assert_(not self.controller.is_account_admin(req, 'any'))
self.assert_(not self.controller.is_account_admin(req, 'act1'))
self.assert_(not self.controller.is_account_admin(req, 'act2'))
req = Request.blank('/', headers={'X-Auth-Admin-User': 'act2:actadmin',
'X-Auth-Admin-Key': 'pas'})
self.assert_(not self.controller.is_account_admin(req, 'any'))
self.assert_(not self.controller.is_account_admin(req, 'act1'))
self.assert_(self.controller.is_account_admin(req, 'act2'))
def test_handle_add_user_create_reseller_admin(self):
auth_server.http_connect = fake_http_connect(201)
self.controller.create_user('act', 'usr', 'pas')
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
self.controller.create_user('act', 'resadmin', 'pas',
reseller_admin=True)
req = Request.blank('/account/act/resadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Reseller-Admin': 'true'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/resadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Reseller-Admin': 'true',
'X-Auth-Admin-User': 'act:usr',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/resadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Reseller-Admin': 'true',
'X-Auth-Admin-User': 'act:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/resadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Reseller-Admin': 'true',
'X-Auth-Admin-User': 'act:resadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/resadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Reseller-Admin': 'true',
'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
def test_handle_add_user_create_account_admin(self):
auth_server.http_connect = fake_http_connect(201, 201)
self.controller.create_user('act', 'usr', 'pas')
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
self.controller.create_user('act2', 'actadmin', 'pas', admin=True)
self.controller.create_user('act2', 'resadmin', 'pas',
reseller_admin=True)
req = Request.blank('/account/act/actadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/actadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:usr',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/actadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act2:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/actadmin2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
req = Request.blank('/account/act/actadmin3',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act2:resadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
req = Request.blank('/account/act/actadmin4',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
def test_handle_add_user_create_normal_user(self):
auth_server.http_connect = fake_http_connect(201, 201)
self.controller.create_user('act', 'usr', 'pas')
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
self.controller.create_user('act2', 'actadmin', 'pas', admin=True)
self.controller.create_user('act2', 'resadmin', 'pas',
reseller_admin=True)
req = Request.blank('/account/act/usr2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/usr2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:usr',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/usr2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act2:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/account/act/usr2',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
req = Request.blank('/account/act/usr3',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act2:resadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
req = Request.blank('/account/act/usr4',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey'})
resp = self.controller.handle_add_user(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
def test_handle_account_recreate_permissions(self):
auth_server.http_connect = fake_http_connect(201, 201)
self.controller.create_user('act', 'usr', 'pas')
self.controller.create_user('act', 'actadmin', 'pas', admin=True)
self.controller.create_user('act', 'resadmin', 'pas',
reseller_admin=True)
req = Request.blank('/recreate_accounts',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true'})
resp = self.controller.handle_account_recreate(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/recreate_accounts',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:usr',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_account_recreate(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/recreate_accounts',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:actadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_account_recreate(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/recreate_accounts',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': 'act:resadmin',
'X-Auth-Admin-Key': 'pas'})
resp = self.controller.handle_account_recreate(req)
self.assert_(resp.status_int // 100 == 4, resp.status_int)
req = Request.blank('/recreate_accounts',
headers={'X-Auth-User-Key': 'pas',
'X-Auth-User-Admin': 'true',
'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'testkey'})
resp = self.controller.handle_account_recreate(req)
self.assert_(resp.status_int // 100 == 2, resp.status_int)
if __name__ == '__main__':
unittest.main()

View File

@ -289,6 +289,37 @@ class TestAuth(unittest.TestCase):
req.acl = '.r:.example.com'
self.assertEquals(self.test_auth.authorize(req), None)
def test_account_put_permissions(self):
req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act'
resp = str(self.test_auth.authorize(req))
self.assert_(resp.startswith('403'), resp)
req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,AUTH_other'
resp = str(self.test_auth.authorize(req))
self.assert_(resp.startswith('403'), resp)
# Even PUTs to your own account as account admin should fail
req = Request.blank('/v1/AUTH_old', environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,AUTH_old'
resp = str(self.test_auth.authorize(req))
self.assert_(resp.startswith('403'), resp)
req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,.reseller_admin'
resp = self.test_auth.authorize(req)
self.assertEquals(resp, None)
# .super_admin is not something the middleware should ever see or care
# about
req = Request.blank('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,.super_admin'
resp = self.test_auth.authorize(req)
resp = str(self.test_auth.authorize(req))
self.assert_(resp.startswith('403'), resp)
if __name__ == '__main__':
unittest.main()

View File

@ -2153,90 +2153,136 @@ class TestAccountController(unittest.TestCase):
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_PUT(self):
with save_globals():
controller = proxy_server.AccountController(self.app, 'account')
def test_status_map(statuses, expected, **kwargs):
proxy_server.http_connect = \
fake_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a', {})
req.content_length = 0
self.app.update_request(req)
res = controller.PUT(req)
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((201, 201, 201), 201)
test_status_map((201, 201, 500), 201)
test_status_map((201, 500, 500), 503)
test_status_map((204, 500, 404), 503)
def test_PUT_max_account_name_length(self):
with save_globals():
controller = proxy_server.AccountController(self.app, '1'*256)
self.assert_status_map(controller.PUT, (201, 201, 201), 201)
controller = proxy_server.AccountController(self.app, '2'*257)
self.assert_status_map(controller.PUT, (201, 201, 201), 400)
def test_PUT_connect_exceptions(self):
with save_globals():
controller = proxy_server.AccountController(self.app, 'account')
self.assert_status_map(controller.PUT, (201, 201, -1), 201)
self.assert_status_map(controller.PUT, (201, -1, -1), 503)
self.assert_status_map(controller.PUT, (503, 503, -1), 503)
def test_PUT_metadata(self):
self.metadata_helper('PUT')
def test_POST_metadata(self):
self.metadata_helper('POST')
def metadata_helper(self, method):
for test_header, test_value in (
('X-Account-Meta-TestHeader', 'TestValue'),
('X-Account-Meta-TestHeader', '')):
test_errors = []
def test_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
for k, v in headers.iteritems():
if k.lower() == test_header.lower() and \
v == test_value:
break
else:
test_errors.append('%s: %s not in %s' %
(test_header, test_value, headers))
if path == '/a':
for k, v in headers.iteritems():
if k.lower() == test_header.lower() and \
v == test_value:
break
else:
test_errors.append('%s: %s not in %s' %
(test_header, test_value, headers))
with save_globals():
controller = \
proxy_server.AccountController(self.app, 'a')
proxy_server.http_connect = fake_http_connect(201, 201, 201,
give_connect=test_connect)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
give_connect=test_connect)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={test_header: test_value})
self.app.update_request(req)
res = controller.POST(req)
res = getattr(controller, method)(req)
self.assertEquals(test_errors, [])
def test_PUT_bad_metadata(self):
self.bad_metadata_helper('PUT')
def test_POST_bad_metadata(self):
self.bad_metadata_helper('POST')
def bad_metadata_helper(self, method):
with save_globals():
controller = proxy_server.AccountController(self.app, 'a')
proxy_server.http_connect = fake_http_connect(204, 204, 204)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'})
proxy_server.http_connect = fake_http_connect(200, 201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method})
self.app.update_request(req)
resp = controller.POST(req)
self.assertEquals(resp.status_int, 204)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
proxy_server.http_connect = fake_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-' +
('a' * MAX_META_NAME_LENGTH): 'v'})
self.app.update_request(req)
resp = controller.POST(req)
self.assertEquals(resp.status_int, 204)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-' +
('a' * (MAX_META_NAME_LENGTH + 1)): 'v'})
self.app.update_request(req)
resp = controller.POST(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
proxy_server.http_connect = fake_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-Too-Long':
'a' * MAX_META_VALUE_LENGTH})
self.app.update_request(req)
resp = controller.POST(req)
self.assertEquals(resp.status_int, 204)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers={'X-Account-Meta-Too-Long':
'a' * (MAX_META_VALUE_LENGTH + 1)})
self.app.update_request(req)
resp = controller.POST(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers=headers)
self.app.update_request(req)
resp = controller.POST(req)
self.assertEquals(resp.status_int, 204)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers=headers)
self.app.update_request(req)
resp = controller.POST(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
headers = {}
header_value = 'a' * MAX_META_VALUE_LENGTH
size = 0
@ -2248,18 +2294,18 @@ class TestAccountController(unittest.TestCase):
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Account-Meta-a'] = \
'a' * (MAX_META_OVERALL_SIZE - size - 1)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers=headers)
self.app.update_request(req)
resp = controller.POST(req)
self.assertEquals(resp.status_int, 204)
proxy_server.http_connect = fake_http_connect(204, 204, 204)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 201)
proxy_server.http_connect = fake_http_connect(201, 201, 201)
headers['X-Account-Meta-a'] = \
'a' * (MAX_META_OVERALL_SIZE - size)
req = Request.blank('/a', environ={'REQUEST_METHOD': 'POST'},
req = Request.blank('/a/c', environ={'REQUEST_METHOD': method},
headers=headers)
self.app.update_request(req)
resp = controller.POST(req)
resp = getattr(controller, method)(req)
self.assertEquals(resp.status_int, 400)