merged with trunk

John Dickinson 2010-10-18 15:47:59 +00:00
commit c53f49ce98
21 changed files with 1047 additions and 211 deletions

View File

@ -1,4 +1,4 @@
#!/bin/bash
python test/functional/tests.py
nosetests test/functional --exe
nosetests test/functionalnosetests --exe

View File

@ -470,16 +470,6 @@ error_suppression_interval 60 Time in seconds that must
no longer error limited
error_suppression_limit 10 Error count to consider a
node error limited
rate_limit 20000.0 Max container level ops per
second
account_rate_limit 200.0 Max account level ops per
second
rate_limit_account_whitelist Comma separated list of
account name hashes to not
rate limit
rate_limit_account_blacklist Comma separated list of
account name hashes to block
completely
============================ =============== =============================
[auth]

View File

@ -199,6 +199,12 @@ virtual machine will emulate running a four node Swift cluster.
[filter:cache]
use = egg:swift#memcache
#. Create `/etc/swift/swift.conf`::
[swift-hash]
# random unique string that can never change (DO NOT LOSE)
swift_hash_path_suffix = changeme
#. Create `/etc/swift/account-server/1.conf`::
[DEFAULT]

View File

@ -25,6 +25,7 @@ Overview:
overview_auth
overview_replication
overview_stats
ratelimit
Development:

View File

@ -106,3 +106,10 @@ MemCacheD
.. automodule:: swift.common.memcached
:members:
:show-inheritance:
Ratelimit
=========
.. automodule:: swift.common.middleware.ratelimit
:members:
:show-inheritance:

67 doc/source/ratelimit.rst Normal file
View File

@ -0,0 +1,67 @@
=============
Rate Limiting
=============
Rate limiting in swift is implemented as a pluggable middleware. Rate
limiting is performed on requests that result in database writes to the
account and container sqlite dbs. It uses memcached and is dependent on
the proxy servers having highly synchronized time. The accuracy of the
rate limiting is limited by the accuracy of the proxy server clocks.
--------------
Configuration
--------------
All configuration is optional. If no account or container limits are provided
there will be no rate limiting. Configuration available:
======================== ========= ===========================================
Option Default Description
------------------------ --------- -------------------------------------------
clock_accuracy 1000 Represents how accurate the proxy servers'
system clocks are with each other. 1000
means that all the proxies' clocks are
accurate to each other within 1
millisecond. No ratelimit should be
higher than the clock accuracy.
max_sleep_time_seconds 60 App will immediately return a 498 response
if the necessary sleep time ever exceeds
the given max_sleep_time_seconds.
account_ratelimit 0 If set, will limit all requests to
/account_name and PUTs to
/account_name/container_name. Number is in
requests per second
account_whitelist '' Comma separated list of account names that
will not be rate limited.
account_blacklist '' Comma separated list of account names that
will not be allowed. Returns a 497 response.
container_ratelimit_size '' When set with container_ratelimit_x = r:
for containers of size x, limit requests
per second to r. Will limit GET and HEAD
requests to /account_name/container_name
and PUTs and DELETEs to
/account_name/container_name/object_name
======================== ========= ===========================================
The container rate limits are linearly interpolated from the values given. A
sample set of container rate limits could be:
container_ratelimit_100 = 100
container_ratelimit_200 = 50
container_ratelimit_500 = 20
This would result in
================ ============
Container Size Rate Limit
---------------- ------------
0-99 No limiting
100 100
150 75
500 20
1000 20
================ ============
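
The interpolation above can be checked with a few lines of Python. The sketch
below is illustrative only (it is not part of the middleware) and reproduces
the table for the example limits::

    # Illustrative sketch, not part of Swift: linear interpolation between
    # the configured (container size, rate) points, flat beyond the last one.
    limits = [(100, 100.0), (200, 50.0), (500, 20.0)]

    def interpolated_rate(container_size):
        if container_size < limits[0][0]:
            return None                      # below the first point: no limiting
        for (size1, rate1), (size2, rate2) in zip(limits, limits[1:]):
            if container_size < size2:
                slope = (rate2 - rate1) / (size2 - size1)
                return rate1 + slope * (container_size - size1)
        return limits[-1][1]                 # flat beyond the last point

    for size in (50, 100, 150, 500, 1000):
        print size, interpolated_rate(size)  # None, 100.0, 75.0, 20.0, 20.0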

View File

@ -1,6 +1,7 @@
[DEFAULT]
# bind_ip = 0.0.0.0
# bind_port = 6002
# backlog = 4096
# workers = 1
# user = swift
# swift_dir = /etc/swift

View File

@ -1,6 +1,7 @@
[DEFAULT]
# bind_ip = 0.0.0.0
# bind_port = 6001
# backlog = 4096
# workers = 1
# user = swift
# swift_dir = /etc/swift

View File

@ -1,6 +1,7 @@
[DEFAULT]
# bind_ip = 0.0.0.0
# bind_port = 6000
# backlog = 4096
# workers = 1
# user = swift
# swift_dir = /etc/swift
@ -22,6 +23,8 @@ use = egg:swift#object
# disk_chunk_size = 65536
# max_upload_time = 86400
# slow = 1
# on PUTs, sync data every n MB
# mb_per_sync = 512
[object-replicator]
# log_name = object-replicator

View File

@ -1,6 +1,7 @@
[DEFAULT]
# bind_ip = 0.0.0.0
# bind_port = 80
# backlog = 4096
# swift_dir = /etc/swift
# workers = 1
# user = swift
@ -8,7 +9,7 @@
# key_file = /etc/swift/proxy.key
[pipeline:main]
pipeline = healthcheck cache auth proxy-server
pipeline = healthcheck cache ratelimit auth proxy-server
[app:proxy-server]
use = egg:swift#proxy
@ -28,12 +29,6 @@ use = egg:swift#proxy
# error_suppression_interval = 60
# How many errors can accumulate before a node is temporarily ignored.
# error_suppression_limit = 10
# How many ops per second to one container (as a float)
# rate_limit = 20000.0
# How many ops per second for account-level operations
# account_rate_limit = 200.0
# rate_limit_account_whitelist = acct1,acct2,etc
# rate_limit_account_blacklist = acct3,acct4,etc
[filter:auth]
use = egg:swift#auth
@ -56,3 +51,27 @@ use = egg:swift#memcache
# Default for memcache_servers is below, but you can specify multiple servers
# with the format: 10.1.2.3:11211,10.1.2.4:11211
# memcache_servers = 127.0.0.1:11211
[filter:ratelimit]
use = egg:swift#ratelimit
# clock_accuracy should represent how accurate the proxy servers' system clocks
# are with each other. 1000 means that all the proxies' clocks are accurate to
# each other within 1 millisecond. No ratelimit should be higher than the
# clock accuracy.
# clock_accuracy = 1000
# max_sleep_time_seconds = 60
# account_ratelimit of 0 means disabled
# account_ratelimit = 0
# these are comma separated lists of account names
# account_whitelist = a,b
# account_blacklist = c,d
# with container_ratelimit_x = r
# for containers of size x limit requests per second to r. The container
# rate will be linearly interpolated from the values given. With the values
# below, a container of size 5 will get a rate of 75.
# container_ratelimit_0 = 100
# container_ratelimit_10 = 50
# container_ratelimit_50 = 20

3 etc/swift.conf-sample Normal file
View File

@ -0,0 +1,3 @@
[swift-hash]
swift_hash_path_suffix = changeme

View File

@ -92,6 +92,7 @@ setup(
'auth=swift.common.middleware.auth:filter_factory',
'healthcheck=swift.common.middleware.healthcheck:filter_factory',
'memcache=swift.common.middleware.memcache:filter_factory',
'ratelimit=swift.common.middleware.ratelimit:filter_factory',
],
},
)

View File

@ -45,6 +45,7 @@ class Daemon(object):
sys.stderr = utils.LoggerFileObject(self.logger)
utils.drop_privileges(self.conf.get('user', 'swift'))
utils.validate_configuration()
try:
os.setsid()

View File

@ -0,0 +1,217 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import eventlet
from webob import Request, Response
from swift.common.utils import split_path, cache_from_env, get_logger
from swift.proxy.server import get_container_memcache_key
class MaxSleepTimeHit(Exception):
pass
class RateLimitMiddleware(object):
"""
Rate limiting middleware
Rate limits requests on both an Account and Container level. Limits are
configurable.
"""
def __init__(self, app, conf, logger=None):
self.app = app
if logger:
self.logger = logger
else:
self.logger = get_logger(conf)
self.account_ratelimit = float(conf.get('account_ratelimit', 0))
self.max_sleep_time_seconds = float(conf.get('max_sleep_time_seconds',
60))
self.clock_accuracy = int(conf.get('clock_accuracy', 1000))
self.ratelimit_whitelist = [acc.strip() for acc in
conf.get('account_whitelist', '').split(',')
if acc.strip()]
self.ratelimit_blacklist = [acc.strip() for acc in
conf.get('account_blacklist', '').split(',')
if acc.strip()]
self.memcache_client = None
conf_limits = []
for conf_key in conf.keys():
if conf_key.startswith('container_ratelimit_'):
cont_size = int(conf_key[len('container_ratelimit_'):])
rate = float(conf[conf_key])
conf_limits.append((cont_size, rate))
conf_limits.sort()
self.container_ratelimits = []
while conf_limits:
cur_size, cur_rate = conf_limits.pop(0)
if conf_limits:
next_size, next_rate = conf_limits[0]
slope = (float(next_rate) - float(cur_rate)) \
/ (next_size - cur_size)
def new_scope(cur_size, slope, cur_rate):
# create a new scope so the lambda captures the current loop values
return lambda x: (x - cur_size) * slope + cur_rate
line_func = new_scope(cur_size, slope, cur_rate)
else:
line_func = lambda x: cur_rate
self.container_ratelimits.append((cur_size, cur_rate, line_func))
def get_container_maxrate(self, container_size):
"""
Returns number of requests allowed per second for given container size.
"""
last_func = None
if container_size:
container_size = int(container_size)
for size, rate, func in self.container_ratelimits:
if container_size < size:
break
last_func = func
if last_func:
return last_func(container_size)
return None
def get_ratelimitable_key_tuples(self, req_method, account_name,
container_name=None,
obj_name=None):
"""
Returns a list of (memcache key, ratelimit) tuples. Keys
should be checked in order.
:param req_method: HTTP method
:param account_name: account name from path
:param container_name: container name from path
:param obj_name: object name from path
"""
keys = []
if self.account_ratelimit and account_name and (
not (container_name or obj_name) or
(container_name and not obj_name and req_method == 'PUT')):
keys.append(("ratelimit/%s" % account_name,
self.account_ratelimit))
if account_name and container_name and (
(not obj_name and req_method in ('GET', 'HEAD')) or
(obj_name and req_method in ('PUT', 'DELETE'))):
container_size = None
memcache_key = get_container_memcache_key(account_name,
container_name)
container_info = self.memcache_client.get(memcache_key)
if type(container_info) == dict:
container_size = container_info.get('container_size', 0)
container_rate = self.get_container_maxrate(container_size)
if container_rate:
keys.append(("ratelimit/%s/%s" % (account_name,
container_name),
container_rate))
return keys
def _get_sleep_time(self, key, max_rate):
'''
Returns the amount of time (a float in seconds) that the app
should sleep. Throws a MaxSleepTimeHit exception if maximum
sleep time is exceeded.
:param key: a memcache key
:param max_rate: maximum rate allowed in requests per second
'''
now_m = int(round(time.time() * self.clock_accuracy))
time_per_request_m = int(round(self.clock_accuracy / max_rate))
running_time_m = self.memcache_client.incr(key,
delta=time_per_request_m)
need_to_sleep_m = 0
request_time_limit = now_m + (time_per_request_m * max_rate)
if running_time_m < now_m:
next_avail_time = int(now_m + time_per_request_m)
self.memcache_client.set(key, str(next_avail_time),
serialize=False)
elif running_time_m - now_m - time_per_request_m > 0:
need_to_sleep_m = running_time_m - now_m - time_per_request_m
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
# treat as a no-op and roll the increment back
self.memcache_client.incr(key, delta=-time_per_request_m)
raise MaxSleepTimeHit("Max Sleep Time Exceeded: %s" %
need_to_sleep_m)
return float(need_to_sleep_m) / self.clock_accuracy
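# Worked example (an illustrative note added for this write-up, not part of
# the original module): with clock_accuracy = 1000 and max_rate = 10, each
# request costs time_per_request_m = 100 ticks (0.1 s). If now_m is 1,000,000
# and the key does not exist yet, the first incr() returns 100, which is less
# than now_m, so the key is reset to 1,000,100 and the caller does not sleep.
# A second request in the same millisecond bumps the key to 1,000,200, giving
# 1,000,200 - 1,000,000 - 100 = 100 ticks, i.e. a 0.1 second sleep. Once the
# computed sleep comes within clock_accuracy * 0.01 ticks of the
# max_sleep_time_seconds limit, the increment is rolled back and
# MaxSleepTimeHit is raised, which handle_ratelimit() below turns into a 498.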
def handle_ratelimit(self, req, account_name, container_name, obj_name):
'''
Performs rate limiting and account white/black listing. Sleeps
if necessary.
:param account_name: account name from path
:param container_name: container name from path
:param obj_name: object name from path
'''
if account_name in self.ratelimit_blacklist:
self.logger.error('Returning 497 because of blacklisting')
return Response(status='497 Blacklisted',
body='Your account has been blacklisted', request=req)
if account_name in self.ratelimit_whitelist:
return None
for key, max_rate in self.get_ratelimitable_key_tuples(
req.method,
account_name,
container_name=container_name,
obj_name=obj_name):
try:
need_to_sleep = self._get_sleep_time(key, max_rate)
if need_to_sleep > 0:
eventlet.sleep(need_to_sleep)
except MaxSleepTimeHit, e:
self.logger.error('Returning 498 because of ops ' + \
'rate limiting (Max Sleep) %s' % e)
error_resp = Response(status='498 Rate Limited',
body='Slow down', request=req)
return error_resp
return None
def __call__(self, env, start_response):
"""
WSGI entry point.
Wraps env in webob.Request object and passes it down.
:param env: WSGI environment dictionary
:param start_response: WSGI callable
"""
req = Request(env)
if self.memcache_client is None:
self.memcache_client = cache_from_env(env)
version, account, container, obj = split_path(req.path, 1, 4, True)
ratelimit_resp = self.handle_ratelimit(req, account, container, obj)
if ratelimit_resp is None:
return self.app(env, start_response)
else:
return ratelimit_resp(env, start_response)
def filter_factory(global_conf, **local_conf):
"""
paste.deploy filter factory for the ratelimit middleware.
"""
conf = global_conf.copy()
conf.update(local_conf)
def limit_filter(app):
return RateLimitMiddleware(app, conf)
return limit_filter
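
As a quick usage sketch (not part of the commit), the interpolation claim in
the proxy-server.conf-sample above, that a container of size 5 gets a rate of
75 with container_ratelimit_0 = 100, container_ratelimit_10 = 50 and
container_ratelimit_50 = 20, can be reproduced directly against the
middleware. StubLogger is a stand-in, similar to the FakeLogger used by the
unit tests added later in this commit:

from swift.common.middleware.ratelimit import RateLimitMiddleware

class StubLogger(object):
    # minimal stand-in so the middleware does not configure a real logger
    def error(self, msg):
        pass

conf = {'container_ratelimit_0': 100,
        'container_ratelimit_10': 50,
        'container_ratelimit_50': 20}
rl = RateLimitMiddleware(None, conf, logger=StubLogger())
print rl.get_container_maxrate(5)    # 75.0
print rl.get_container_maxrate(30)   # 35.0
print rl.get_container_maxrate(500)  # 20.0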

View File

@ -31,7 +31,7 @@ import ctypes
import ctypes.util
import fcntl
import struct
from ConfigParser import ConfigParser
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from tempfile import mkstemp
import cPickle as pickle
@ -56,12 +56,25 @@ _posix_fadvise = None
# Used by hash_path to offer a bit more security when generating hashes for
# paths. It simply appends this value to all paths; guessing the hash a path
# will end up with would also require knowing this suffix.
HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap')
hash_conf = ConfigParser()
HASH_PATH_SUFFIX = ''
if hash_conf.read('/etc/swift/swift.conf'):
try:
HASH_PATH_SUFFIX = hash_conf.get('swift-hash',
'swift_hash_path_suffix')
except (NoSectionError, NoOptionError):
pass
# Used when reading config values
TRUE_VALUES = set(('true', '1', 'yes', 'True', 'Yes', 'on', 'On'))
def validate_configuration():
if HASH_PATH_SUFFIX == '':
sys.exit("Error: [swift-hash]: swift_hash_path_suffix missing "
"from /etc/swift/swift.conf")
def load_libc_function(func_name):
"""
Attempt to find the function in libc, otherwise return a no-op func.

View File

@ -34,7 +34,7 @@ wsgi.ACCEPT_ERRNO.add(ECONNRESET)
from eventlet.green import socket, ssl
from swift.common.utils import get_logger, drop_privileges, \
LoggerFileObject, NullLogger
validate_configuration, LoggerFileObject, NullLogger
def monkey_patch_mimetools():
@ -112,6 +112,7 @@ def run_wsgi(conf_file, app_section, *args, **kwargs): # pragma: no cover
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
worker_count = int(conf.get('workers', '1'))
drop_privileges(conf.get('user', 'swift'))
validate_configuration()
def run_server():
wsgi.HttpProtocol.default_request_version = "HTTP/1.0"

View File

@ -259,7 +259,7 @@ class ObjectController(object):
self.log_requests = conf.get('log_requests', 't')[:1].lower() == 't'
self.max_upload_time = int(conf.get('max_upload_time', 86400))
self.slow = int(conf.get('slow', 0))
self.chunks_per_sync = int(conf.get('chunks_per_sync', 8000))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
def container_update(self, op, account, container, obj, headers_in,
headers_out, objdevice):
@ -359,11 +359,10 @@ class ObjectController(object):
upload_expiration = time.time() + self.max_upload_time
etag = md5()
upload_size = 0
last_sync = 0
with file.mkstemp() as (fd, tmppath):
if 'content-length' in request.headers:
fallocate(fd, int(request.headers['content-length']))
chunk_count = 0
dropped_cache = 0
for chunk in iter(lambda: request.body_file.read(
self.network_chunk_size), ''):
upload_size += len(chunk)
@ -373,13 +372,11 @@ class ObjectController(object):
while chunk:
written = os.write(fd, chunk)
chunk = chunk[written:]
chunk_count += 1
# For large files sync every 512MB (by default) written
if chunk_count % self.chunks_per_sync == 0:
if upload_size - last_sync >= self.bytes_per_sync:
os.fdatasync(fd)
drop_buffer_cache(fd, dropped_cache,
upload_size - dropped_cache)
dropped_cache = upload_size
drop_buffer_cache(fd, last_sync, upload_size - last_sync)
last_sync = upload_size
if 'content-length' in request.headers and \
int(request.headers['content-length']) != upload_size:
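
The change above replaces the chunk-count heuristic with a byte-count one, so
the sync interval no longer depends on the network chunk size. A standalone
sketch of the pattern (illustrative only; write_chunks and its arguments are
hypothetical, and the drop_buffer_cache call is shown as a comment):

import os

def write_chunks(fd, chunks, bytes_per_sync=512 * 1024 * 1024):
    # Write chunks to fd, calling fdatasync() every bytes_per_sync bytes so
    # large uploads do not pile up dirty pages before one giant sync at close.
    upload_size = 0
    last_sync = 0
    for chunk in chunks:
        while chunk:
            written = os.write(fd, chunk)
            upload_size += written
            chunk = chunk[written:]
        if upload_size - last_sync >= bytes_per_sync:
            os.fdatasync(fd)
            # Swift also calls drop_buffer_cache(fd, last_sync,
            # upload_size - last_sync) here to release the page cache.
            last_sync = upload_size
    return upload_size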

View File

@ -89,6 +89,11 @@ def delay_denial(func):
return wrapped
def get_container_memcache_key(account, container):
path = '/%s/%s' % (account, container)
return 'container%s' % path
class Controller(object):
"""Base WSGI controller class for the proxy"""
@ -229,21 +234,20 @@ class Controller(object):
partition, nodes = self.app.container_ring.get_nodes(
account, container)
path = '/%s/%s' % (account, container)
cache_key = 'container%s' % path
# Older memcache values (should be treated as if they aren't there):
# 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
# Newer memcache values:
# [older status value from above, read acl, write acl]
cache_key = get_container_memcache_key(account, container)
cache_value = self.app.memcache.get(cache_key)
if hasattr(cache_value, '__iter__'):
status, read_acl, write_acl = cache_value
if status == 200:
if isinstance(cache_value, dict):
status = cache_value['status']
read_acl = cache_value['read_acl']
write_acl = cache_value['write_acl']
if status // 100 == 2:
return partition, nodes, read_acl, write_acl
if not self.account_info(account)[1]:
return (None, None, None, None)
result_code = 0
read_acl = None
write_acl = None
container_size = None
attempts_left = self.app.container_ring.replica_count
headers = {'x-cf-trans-id': self.trans_id}
for node in self.iter_nodes(partition, nodes, self.app.container_ring):
@ -260,6 +264,8 @@ class Controller(object):
result_code = 200
read_acl = resp.getheader('x-container-read')
write_acl = resp.getheader('x-container-write')
container_size = \
resp.getheader('X-Container-Object-Count')
break
elif resp.status == 404:
result_code = 404 if not result_code else -1
@ -278,7 +284,10 @@ class Controller(object):
cache_timeout = self.app.recheck_container_existence
else:
cache_timeout = self.app.recheck_container_existence * 0.1
self.app.memcache.set(cache_key, (result_code, read_acl, write_acl),
self.app.memcache.set(cache_key, {'status': result_code,
'read_acl': read_acl,
'write_acl': write_acl,
'container_size': container_size},
timeout=cache_timeout)
if result_code == 200:
return partition, nodes, read_acl, write_acl
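
The hunk above switches the cached container info from the old
(status, read_acl, write_acl) tuple to a dict, which is what lets the
ratelimit middleware read container_size from the same memcache entry. A
minimal sketch with illustrative values:

from swift.proxy.server import get_container_memcache_key

cache_key = get_container_memcache_key('a', 'c')
# -> 'container/a/c'
container_info = {'status': 200,
                  'read_acl': None,
                  'write_acl': None,
                  'container_size': 42}
# app.memcache.set(cache_key, container_info,
#                  timeout=app.recheck_container_existence)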
@ -415,6 +424,7 @@ class Controller(object):
if req.method == 'GET' and source.status in (200, 206):
res = Response(request=req, conditional_response=True)
res.bytes_transferred = 0
def file_iter():
try:
while True:
@ -691,7 +701,7 @@ class ObjectController(Controller):
req.bytes_transferred += len_chunk
if req.bytes_transferred > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
for conn in conns:
for conn in list(conns):
try:
with ChunkWriteTimeout(self.app.node_timeout):
if req.headers.get('transfer-encoding'):
@ -702,6 +712,13 @@ class ObjectController(Controller):
self.exception_occurred(conn.node, 'Object',
'Trying to write to %s' % req.path)
conns.remove(conn)
if len(conns) <= len(nodes) / 2:
self.app.logger.error(
'Object PUT exceptions during send, %s/%s '
'required connections, transaction %s' %
(len(conns), len(nodes) // 2 + 1,
self.trans_id))
return HTTPServiceUnavailable(request=req)
if req.headers.get('transfer-encoding') and chunk == '':
break
except ChunkReadTimeout, err:
@ -740,7 +757,9 @@ class ObjectController(Controller):
self.exception_occurred(conn.node, 'Object',
'Trying to get final status of PUT to %s' % req.path)
if len(etags) > 1:
return HTTPUnprocessableEntity(request=req)
self.app.logger.error(
'Object servers returned %s mismatched etags' % len(etags))
return HTTPServerError(request=req)
etag = len(etags) and etags.pop() or None
while len(statuses) < len(nodes):
statuses.append(503)
@ -860,6 +879,17 @@ class ContainerController(Controller):
self.account_name, self.container_name)
resp = self.GETorHEAD_base(req, 'Container', part, nodes,
req.path_info, self.app.container_ring.replica_count)
# set the memcache container size for ratelimiting
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.set(cache_key,
{'status': resp.status_int,
'read_acl': resp.headers.get('x-container-read'),
'write_acl': resp.headers.get('x-container-write'),
'container_size': resp.headers.get('x-container-object-count')},
timeout=self.app.recheck_container_existence)
if 'swift.authorize' in req.environ:
req.acl = resp.headers.get('x-container-read')
aresp = req.environ['swift.authorize'](req)
@ -941,7 +971,9 @@ class ContainerController(Controller):
statuses.append(503)
reasons.append('')
bodies.append('')
self.app.memcache.delete('container%s' % req.path_info.rstrip('/'))
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies,
'Container PUT')
@ -993,7 +1025,9 @@ class ContainerController(Controller):
statuses.append(503)
reasons.append('')
bodies.append('')
self.app.memcache.delete('container%s' % req.path_info.rstrip('/'))
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
return self.best_response(req, statuses, reasons, bodies,
'Container POST')
@ -1047,7 +1081,9 @@ class ContainerController(Controller):
statuses.append(503)
reasons.append('')
bodies.append('')
self.app.memcache.delete('container%s' % req.path_info.rstrip('/'))
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
self.app.memcache.delete(cache_key)
resp = self.best_response(req, statuses, reasons, bodies,
'Container DELETE')
if 200 <= resp.status_int <= 299:
@ -1214,14 +1250,6 @@ class BaseApplication(object):
self.account_ring = account_ring or \
Ring(os.path.join(swift_dir, 'account.ring.gz'))
self.memcache = memcache
self.rate_limit = float(conf.get('rate_limit', 20000.0))
self.account_rate_limit = float(conf.get('account_rate_limit', 200.0))
self.rate_limit_whitelist = [x.strip() for x in
conf.get('rate_limit_account_whitelist', '').split(',')
if x.strip()]
self.rate_limit_blacklist = [x.strip() for x in
conf.get('rate_limit_account_blacklist', '').split(',')
if x.strip()]
def get_controller(self, path):
"""
@ -1302,10 +1330,6 @@ class BaseApplication(object):
return HTTPPreconditionFailed(request=req, body='Invalid UTF8')
if not controller:
return HTTPPreconditionFailed(request=req, body='Bad URL')
rate_limit_allowed_err_resp = \
self.check_rate_limit(req, path_parts)
if rate_limit_allowed_err_resp is not None:
return rate_limit_allowed_err_resp
controller = controller(self, **path_parts)
controller.trans_id = req.headers.get('x-cf-trans-id', '-')
@ -1339,10 +1363,6 @@ class BaseApplication(object):
self.logger.exception('ERROR Unhandled exception in request')
return HTTPServerError(request=req)
def check_rate_limit(self, req, path_parts):
"""Check for rate limiting."""
return None
class Application(BaseApplication):
"""WSGI application for the proxy server."""
@ -1395,45 +1415,6 @@ class Application(BaseApplication):
trans_time,
)))
def check_rate_limit(self, req, path_parts):
"""
Check for rate limiting.
:param req: webob.Request object
:param path_parts: parsed path dictionary
"""
if path_parts['account_name'] in self.rate_limit_blacklist:
self.logger.error('Returning 497 because of blacklisting')
return Response(status='497 Blacklisted',
body='Your account has been blacklisted', request=req)
if path_parts['account_name'] not in self.rate_limit_whitelist:
current_second = time.strftime('%x%H%M%S')
general_rate_limit_key = '%s%s' % (path_parts['account_name'],
current_second)
ops_count = self.memcache.incr(general_rate_limit_key, timeout=2)
if ops_count > self.rate_limit:
self.logger.error(
'Returning 498 because of ops rate limiting')
return Response(status='498 Rate Limited',
body='Slow down', request=req)
elif (path_parts['container_name']
and not path_parts['object_name']) \
or \
(path_parts['account_name']
and not path_parts['container_name']):
# further limit operations on a single account or container
rate_limit_key = '%s%s%s' % (path_parts['account_name'],
path_parts['container_name'] or '-',
current_second)
ops_count = self.memcache.incr(rate_limit_key, timeout=2)
if ops_count > self.account_rate_limit:
self.logger.error(
'Returning 498 because of account and container'
' rate limiting')
return Response(status='498 Rate Limited',
body='Slow down', request=req)
return None
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI proxy apps."""

View File

@ -0,0 +1,363 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import time
from contextlib import contextmanager
from threading import Thread
from webob import Request
from swift.common.middleware import ratelimit
from swift.proxy.server import get_container_memcache_key
class FakeMemcache(object):
def __init__(self):
self.store = {}
def get(self, key):
return self.store.get(key)
def set(self, key, value, serialize=False, timeout=0):
self.store[key] = value
return True
def incr(self, key, delta=1, timeout=0):
self.store[key] = int(self.store.setdefault(key, 0)) + delta
return int(self.store[key])
@contextmanager
def soft_lock(self, key, timeout=0, retries=5):
yield True
def delete(self, key):
try:
del self.store[key]
except:
pass
return True
def mock_http_connect(response, headers=None, with_exc=False):
class FakeConn(object):
def __init__(self, status, headers, with_exc):
self.status = status
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
self.with_exc = with_exc
self.headers = headers
if self.headers is None:
self.headers = {}
def getresponse(self):
if self.with_exc:
raise Exception('test')
return self
def getheader(self, header):
return self.headers[header]
def read(self, amt=None):
return ''
def close(self):
return
return lambda *args, **kwargs: FakeConn(response, headers, with_exc)
class FakeApp(object):
def __call__(self, env, start_response):
return ['204 No Content']
class FakeLogger(object):
def error(self, msg):
# a thread-safe no-op logger for the tests
pass
def start_response(*args):
pass
def dummy_filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
def limit_filter(app):
return ratelimit.RateLimitMiddleware(app, conf, logger=FakeLogger())
return limit_filter
class TestRateLimit(unittest.TestCase):
def _run(self, callable_func, num, rate, extra_sleep=0,
total_time=None, check_time=True):
begin = time.time()
for x in range(0, num):
result = callable_func()
# Extra sleep is here to test with different call intervals.
time.sleep(extra_sleep)
end = time.time()
if total_time is None:
total_time = num / rate
# Allow for one second of variation in the total time.
time_diff = abs(total_time - (end - begin))
if check_time:
self.assertTrue(time_diff < 1)
return time_diff
def test_get_container_maxrate(self):
conf_dict = {'container_ratelimit_10': 200,
'container_ratelimit_50': 100,
'container_ratelimit_75': 30}
test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
self.assertEquals(test_ratelimit.get_container_maxrate(0), None)
self.assertEquals(test_ratelimit.get_container_maxrate(5), None)
self.assertEquals(test_ratelimit.get_container_maxrate(10), 200)
self.assertEquals(test_ratelimit.get_container_maxrate(60), 72)
self.assertEquals(test_ratelimit.get_container_maxrate(160), 30)
def test_get_ratelimitable_key_tuples(self):
current_rate = 13
conf_dict = {'account_ratelimit': current_rate,
'container_ratelimit_3': 200}
fake_memcache = FakeMemcache()
fake_memcache.store[get_container_memcache_key('a', 'c')] = \
{'container_size': 5}
the_app = ratelimit.RateLimitMiddleware(None, conf_dict,
logger=FakeLogger())
the_app.memcache_client = fake_memcache
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', None, None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'POST', 'a', 'c', None)), 0)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'PUT', 'a', 'c', None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', 'c', None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', 'c', 'o')), 0)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'PUT', 'a', 'c', 'o')), 1)
def test_ratelimit(self):
current_rate = 13
num_calls = 5
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req.environ['swift.cache'] = FakeMemcache()
make_app_call = lambda: self.test_ratelimit(req.environ,
start_response)
self._run(make_app_call, num_calls, current_rate)
def test_ratelimit_whitelist(self):
current_rate = 2
conf_dict = {'account_ratelimit': current_rate,
'max_sleep_time_seconds': 2,
'account_whitelist': 'a',
'account_blacklist': 'b'}
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a/c')
req.environ['swift.cache'] = FakeMemcache()
class rate_caller(Thread):
def __init__(self, parent):
Thread.__init__(self)
self.parent = parent
def run(self):
self.result = self.parent.test_ratelimit(req.environ,
start_response)
nt = 5
begin = time.time()
threads = []
for i in range(nt):
rc = rate_caller(self)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
the_498s = [t for t in threads if \
''.join(t.result).startswith('Slow down')]
self.assertEquals(len(the_498s), 0)
time_took = time.time() - begin
self.assert_(time_took < 1)
def test_ratelimit_blacklist(self):
current_rate = 2
conf_dict = {'account_ratelimit': current_rate,
'max_sleep_time_seconds': 2,
'account_whitelist': 'a',
'account_blacklist': 'b'}
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/b/c')
req.environ['swift.cache'] = FakeMemcache()
class rate_caller(Thread):
def __init__(self, parent):
Thread.__init__(self)
self.parent = parent
def run(self):
self.result = self.parent.test_ratelimit(req.environ,
start_response)
nt = 5
begin = time.time()
threads = []
for i in range(nt):
rc = rate_caller(self)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
the_497s = [t for t in threads if \
''.join(t.result).startswith('Your account')]
self.assertEquals(len(the_497s), 5)
time_took = time.time() - begin
self.assert_(round(time_took, 1) == 0)
def test_ratelimit_max_rate_double(self):
current_rate = 2
conf_dict = {'account_ratelimit': current_rate,
'clock_accuracy': 100,
'max_sleep_time_seconds': 1}
# make the clock less accurate in case nosetests runs slowly
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req.environ['swift.cache'] = FakeMemcache()
begin = time.time()
class rate_caller(Thread):
def __init__(self, parent, name):
Thread.__init__(self)
self.parent = parent
self.name = name
def run(self):
self.result1 = self.parent.test_ratelimit(req.environ,
start_response)
time.sleep(.1)
self.result2 = self.parent.test_ratelimit(req.environ,
start_response)
nt = 3
threads = []
for i in range(nt):
rc = rate_caller(self, "thread %s" % i)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
all_results = [''.join(t.result1) for t in threads]
all_results += [''.join(t.result2) for t in threads]
the_498s = [t for t in all_results if t.startswith('Slow down')]
self.assertEquals(len(the_498s), 2)
time_took = time.time() - begin
self.assert_(1.5 <= round(time_took,1) < 1.7, time_took)
def test_ratelimit_max_rate_multiple_acc(self):
num_calls = 4
current_rate = 2
conf_dict = {'account_ratelimit': current_rate,
'max_sleep_time_seconds': 2}
fake_memcache = FakeMemcache()
the_app = ratelimit.RateLimitMiddleware(None, conf_dict,
logger=FakeLogger())
the_app.memcache_client = fake_memcache
req = lambda: None
req.method = 'GET'
class rate_caller(Thread):
def __init__(self, name):
self.myname = name
Thread.__init__(self)
def run(self):
for j in range(num_calls):
self.result = the_app.handle_ratelimit(req, self.myname,
None, None)
nt = 15
begin = time.time()
threads = []
for i in range(nt):
rc = rate_caller('a%s' % i)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
time_took = time.time() - begin
# all 15 threads together still take about 1.5 secs
self.assert_(1.5 <= round(time_took,1) < 1.7)
def test_ratelimit_acc_vrs_container(self):
conf_dict = {'clock_accuracy': 1000,
'account_ratelimit': 10,
'max_sleep_time_seconds': 4,
'container_ratelimit_10': 6,
'container_ratelimit_50': 2,
'container_ratelimit_75': 1}
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a/c')
req.environ['swift.cache'] = FakeMemcache()
cont_key = get_container_memcache_key('a', 'c')
class rate_caller(Thread):
def __init__(self, parent, name):
Thread.__init__(self)
self.parent = parent
self.name = name
def run(self):
self.result = self.parent.test_ratelimit(req.environ,
start_response)
def runthreads(threads, nt):
for i in range(nt):
rc = rate_caller(self, "thread %s" % i)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
begin = time.time()
req.environ['swift.cache'].set(cont_key, {'container_size': 20})
begin = time.time()
threads = []
runthreads(threads, 3)
time_took = time.time() - begin
self.assert_(round(time_took, 1) == .4)
if __name__ == '__main__':
unittest.main()

View File

@ -56,7 +56,7 @@ class TestObjectController(unittest.TestCase):
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.object_controller = object_server.ObjectController(conf)
self.object_controller.chunks_per_sync = 1
self.object_controller.bytes_per_sync = 1
def tearDown(self):
""" Tear down for testing swift.object_server.ObjectController """

View File

@ -28,6 +28,7 @@ from httplib import HTTPException
from shutil import rmtree
from time import time
from urllib import unquote, quote
from hashlib import md5
import eventlet
from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
@ -50,6 +51,7 @@ from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
# mocks
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body=''):
@ -61,12 +63,15 @@ def fake_http_connect(*code_iter, **kwargs):
self.received = 0
self.etag = etag
self.body = body
def getresponse(self):
if 'raise_exc' in kwargs:
raise Exception('test')
return self
def getexpect(self):
return FakeConn(100)
def getheaders(self):
headers = {'content-length': len(self.body),
'content-type': 'x-application/test',
@ -84,6 +89,7 @@ def fake_http_connect(*code_iter, **kwargs):
if 'slow' in kwargs:
headers['content-length'] = '4'
return headers.items()
def read(self, amt=None):
if 'slow' in kwargs:
if self.sent < 4:
@ -93,19 +99,23 @@ def fake_http_connect(*code_iter, **kwargs):
rv = self.body[:amt]
self.body = self.body[amt:]
return rv
def send(self, amt=None):
if 'slow' in kwargs:
if self.received < 4:
self.received += 1
sleep(0.1)
def getheader(self, name, default=None):
return dict(self.getheaders()).get(name.lower(), default)
etag_iter = iter(kwargs.get('etags') or [None] * len(code_iter))
x = kwargs.get('missing_container', [False] * len(code_iter))
if not isinstance(x, (tuple, list)):
x = [x] * len(code_iter)
container_ts_iter = iter(x)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
if 'give_content_type' in kwargs:
if len(args) >= 7 and 'content_type' in args[6]:
@ -119,6 +129,7 @@ def fake_http_connect(*code_iter, **kwargs):
if status == -1:
raise HTTPException()
return FakeConn(status, etag, body=kwargs.get('body', ''))
return connect
@ -180,11 +191,13 @@ class FakeMemcacheReturnsNone(FakeMemcache):
# using the FakeMemcache for container existence checks.
return None
class NullLoggingHandler(logging.Handler):
def emit(self, record):
pass
@contextmanager
def save_globals():
orig_http_connect = getattr(proxy_server, 'http_connect', None)
@ -211,6 +224,7 @@ class TestProxyServer(unittest.TestCase):
def test_calls_authorize_allow(self):
called = [False]
def authorize(req):
called[0] = True
with save_globals():
@ -226,6 +240,7 @@ class TestProxyServer(unittest.TestCase):
def test_calls_authorize_deny(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@ -251,6 +266,7 @@ class TestObjectController(unittest.TestCase):
kwargs = {}
if raise_exc:
kwargs['raise_exc'] = raise_exc
proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', headers={'Content-Length': '0',
@ -258,6 +274,8 @@ class TestObjectController(unittest.TestCase):
self.app.update_request(req)
res = method(req)
self.assertEquals(res.status_int, expected)
# repeat test
proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c/o', headers={'Content-Length': '0',
@ -270,6 +288,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_content_type(filename, expected):
proxy_server.http_connect = fake_http_connect(201, 201, 201,
give_content_type=lambda content_type:
@ -277,17 +296,18 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/%s' % filename, {})
self.app.update_request(req)
res = controller.PUT(req)
test_content_type('test.jpg',
iter(['', '', '', 'image/jpeg', 'image/jpeg', 'image/jpeg']))
test_content_type('test.html',
iter(['', '', '', 'text/html', 'text/html', 'text/html']))
test_content_type('test.css',
iter(['', '', '', 'text/css', 'text/css', 'text/css']))
test_content_type('test.jpg', iter(['', '', '', 'image/jpeg',
'image/jpeg', 'image/jpeg']))
test_content_type('test.html', iter(['', '', '', 'text/html',
'text/html', 'text/html']))
test_content_type('test.css', iter(['', '', '', 'text/css',
'text/css', 'text/css']))
def test_PUT(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
proxy_server.http_connect = fake_http_connect(*statuses)
req = Request.blank('/a/c/o.jpg', {})
@ -308,11 +328,24 @@ class TestObjectController(unittest.TestCase):
def __init__(self, status):
self.status = status
self.reason = 'Fake'
def getresponse(self): return self
def read(self, amt=None): return ''
def getheader(self, name): return ''
def getexpect(self): return FakeConn(100)
def getresponse(self):
return self
def read(self, amt=None):
return ''
def getheader(self, name):
return ''
def getexpect(self):
if self.status == -2:
raise HTTPException()
if self.status == -3:
return FakeConn(507)
return FakeConn(100)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
status = code_iter.next()
if status == -1:
@ -322,6 +355,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
proxy_server.http_connect = mock_http_connect(*statuses)
self.app.memcache.store = {}
@ -332,6 +366,8 @@ class TestObjectController(unittest.TestCase):
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201)
test_status_map((200, 200, 201, 201, -2), 201) # expect timeout
test_status_map((200, 200, 201, 201, -3), 201) # error limited
test_status_map((200, 200, 201, -1, -1), 503)
test_status_map((200, 200, 503, 503, -1), 503)
@ -343,20 +379,38 @@ class TestObjectController(unittest.TestCase):
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = 1024
def getresponse(self): return self
def read(self, amt=None): return ''
self.etag = md5()
def getresponse(self):
self.etag = self.etag.hexdigest()
self.headers = {
'etag': self.etag,
}
return self
def read(self, amt=None):
return ''
def send(self, amt=None):
if self.status == -1:
raise HTTPException()
def getheader(self, name): return ''
def getexpect(self): return FakeConn(100)
else:
self.etag.update(amt)
def getheader(self, name):
return self.headers.get(name, '')
def getexpect(self):
return FakeConn(100)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
return FakeConn(code_iter.next())
return connect
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
self.app.memcache.store = {}
proxy_server.http_connect = mock_http_connect(*statuses)
@ -366,7 +420,7 @@ class TestObjectController(unittest.TestCase):
res = controller.PUT(req)
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201)
test_status_map((200, 200, 201, -1, 201), 201)
test_status_map((200, 200, 201, -1, -1), 503)
test_status_map((200, 200, 503, 503, -1), 503)
@ -390,21 +444,32 @@ class TestObjectController(unittest.TestCase):
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = 1024
def getresponse(self):
if self.status == -1:
raise HTTPException()
return self
def read(self, amt=None): return ''
def send(self, amt=None): pass
def getheader(self, name): return ''
def getexpect(self): return FakeConn(100)
def read(self, amt=None):
return ''
def send(self, amt=None):
pass
def getheader(self, name):
return ''
def getexpect(self):
return FakeConn(100)
code_iter = iter(code_iter)
def connect(*args, **ckwargs):
return FakeConn(code_iter.next())
return connect
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
self.app.memcache.store = {}
proxy_server.http_connect = mock_http_connect(*statuses)
@ -423,6 +488,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
proxy_server.http_connect = fake_http_connect(*statuses)
self.app.memcache.store = {}
@ -444,6 +510,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
proxy_server.http_connect = fake_http_connect(*statuses)
self.app.memcache.store = {}
@ -463,6 +530,7 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
def test_status_map(statuses, expected):
proxy_server.http_connect = fake_http_connect(*statuses)
self.app.memcache.store = {}
@ -490,14 +558,14 @@ class TestObjectController(unittest.TestCase):
# acct cont obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x'*256})
'X-Object-Meta-Foo': 'x' * 256})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
proxy_server.http_connect = fake_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
'X-Object-Meta-Foo': 'x'*257})
'X-Object-Meta-Foo': 'x' * 257})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 400)
@ -510,15 +578,15 @@ class TestObjectController(unittest.TestCase):
fake_http_connect(200, 200, 202, 202, 202)
# acct cont obj obj obj
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-'+'x'*128): 'x'})
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 128): 'x'})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 202)
proxy_server.http_connect = fake_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers={
'Content-Type': 'foo/bar',
('X-Object-Meta-'+'x'*129): 'x'})
'Content-Type': 'foo/bar',
('X-Object-Meta-' + 'x' * 129): 'x'})
self.app.update_request(req)
res = controller.POST(req)
self.assertEquals(res.status_int, 400)
@ -527,7 +595,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
headers = dict((('X-Object-Meta-'+str(i), 'a') for i in xrange(91)))
headers = dict(
(('X-Object-Meta-' + str(i), 'a') for i in xrange(91)))
headers.update({'Content-Type': 'foo/bar'})
proxy_server.http_connect = fake_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers=headers)
@ -539,7 +608,8 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
headers = dict((('X-Object-Meta-'+str(i), 'a'*256) for i in xrange(1000)))
headers = dict(
(('X-Object-Meta-' + str(i), 'a' * 256) for i in xrange(1000)))
headers.update({'Content-Type': 'foo/bar'})
proxy_server.http_connect = fake_http_connect(202, 202, 202)
req = Request.blank('/a/c/o', {}, headers=headers)
@ -561,9 +631,11 @@ class TestObjectController(unittest.TestCase):
for dev in self.app.object_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
if self.sent < 4:
sleep(0.1)
@ -606,9 +678,11 @@ class TestObjectController(unittest.TestCase):
for dev in self.app.object_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1
class SlowBody():
def __init__(self):
self.sent = 0
def read(self, size=-1):
raise Exception('Disconnected')
req = Request.blank('/a/c/o',
@ -651,7 +725,7 @@ class TestObjectController(unittest.TestCase):
except proxy_server.ChunkReadTimeout:
got_exc = True
self.assert_(not got_exc)
self.app.node_timeout=0.1
self.app.node_timeout = 0.1
proxy_server.http_connect = \
fake_http_connect(200, 200, 200, slow=True)
resp = controller.GET(req)
@ -687,7 +761,7 @@ class TestObjectController(unittest.TestCase):
fake_http_connect(200, 200, 201, 201, 201, slow=True)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.app.node_timeout=0.1
self.app.node_timeout = 0.1
proxy_server.http_connect = \
fake_http_connect(201, 201, 201, slow=True)
req = Request.blank('/a/c/o',
@ -787,7 +861,8 @@ class TestObjectController(unittest.TestCase):
self.assert_('last_error' in controller.app.object_ring.devs[0])
self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503)
self.assert_status_map(controller.POST, (200, 202, 202, 202), 503)
self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 503)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 503)
self.app.error_suppression_interval = -300
self.assert_status_map(controller.HEAD, (200, 200, 200), 200)
self.assertRaises(BaseException,
@ -913,7 +988,7 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-' + ('a' *
MAX_META_NAME_LENGTH) : 'v'})
MAX_META_NAME_LENGTH): 'v'})
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
@ -921,7 +996,7 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Object-Meta-' + ('a' *
(MAX_META_NAME_LENGTH + 1)) : 'v'})
(MAX_META_NAME_LENGTH + 1)): 'v'})
self.app.update_request(req)
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 400)
@ -1026,6 +1101,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
# repeat tests with leading /
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
@ -1050,6 +1126,18 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o/o2')
# negative tests
# invalid x-copy-from path
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c'})
self.app.update_request(req)
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int // 100, 4) # client error
# server error
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
@ -1061,6 +1149,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 503)
# not found
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
@ -1072,6 +1161,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 404)
# some missing containers
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o'})
@ -1083,6 +1173,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
# test object meta data
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '0',
'X-Copy-From': '/c/o',
@ -1094,7 +1185,8 @@ class TestObjectController(unittest.TestCase):
self.app.memcache.store = {}
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'), 'testing')
self.assertEquals(resp.headers.get('x-object-meta-test'),
'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
def test_COPY(self):
@ -1120,7 +1212,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/a/c/o/o2', environ={'REQUEST_METHOD': 'COPY'},
req = Request.blank('/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
@ -1144,7 +1237,8 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers['x-copied-from'], 'c/o')
req = Request.blank('/a/c/o/o2', environ={'REQUEST_METHOD': 'COPY'},
req = Request.blank('/a/c/o/o2',
environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
@ -1211,16 +1305,67 @@ class TestObjectController(unittest.TestCase):
self.app.memcache.store = {}
resp = controller.COPY(req)
self.assertEquals(resp.status_int, 201)
self.assertEquals(resp.headers.get('x-object-meta-test'), 'testing')
self.assertEquals(resp.headers.get('x-object-meta-test'),
'testing')
self.assertEquals(resp.headers.get('x-object-meta-ours'), 'okay')
def test_chunked_put(self):
# quick test of chunked put w/o PATH_TO_TEST_XFS
class ChunkedFile():
def __init__(self, bytes):
self.bytes = bytes
self.read_bytes = 0
@property
def bytes_left(self):
return self.bytes - self.read_bytes
def read(self, amt=None):
if self.read_bytes >= self.bytes:
raise StopIteration()
if not amt:
amt = self.bytes_left
data = 'a' * min(amt, self.bytes_left)
self.read_bytes += len(data)
return data
with save_globals():
proxy_server.http_connect = fake_http_connect(201, 201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', {}, headers={
'Transfer-Encoding': 'chunked',
'Content-Type': 'foo/bar'})
req.body_file = ChunkedFile(10)
self.app.memcache.store = {}
self.app.update_request(req)
res = controller.PUT(req)
self.assertEquals(res.status_int // 100, 2) # success
# test 413 Request Entity Too Large
from swift.proxy import server
proxy_server.http_connect = fake_http_connect(201, 201, 201, 201)
req = Request.blank('/a/c/o', {}, headers={
'Transfer-Encoding': 'chunked',
'Content-Type': 'foo/bar'})
req.body_file = ChunkedFile(11)
self.app.memcache.store = {}
self.app.update_request(req)
try:
server.MAX_FILE_SIZE = 10
res = controller.PUT(req)
self.assertEquals(res.status_int, 413)
finally:
server.MAX_FILE_SIZE = MAX_FILE_SIZE
def test_chunked_put_and_a_bit_more(self):
# Since we're starting up a lot here, we're going to test more than
# just chunked puts; we're also going to test parts of
# proxy_server.Application we couldn't get to easily otherwise.
path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
if not path_to_test_xfs or not os.path.exists(path_to_test_xfs):
print >>sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
'pointing to a valid directory.\n' \
'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
'system for testing.'
@ -1295,17 +1440,6 @@ class TestObjectController(unittest.TestCase):
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 404'
self.assertEquals(headers[:len(exp)], exp)
# Check blacklist
prosrv.rate_limit_blacklist = ['a']
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nContent-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 497'
self.assertEquals(headers[:len(exp)], exp)
prosrv.rate_limit_blacklist = []
# Check invalid utf-8
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1326,31 +1460,6 @@ class TestObjectController(unittest.TestCase):
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 412'
self.assertEquals(headers[:len(exp)], exp)
# Check rate limiting
orig_rate_limit = prosrv.rate_limit
prosrv.rate_limit = 0
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 498'
self.assertEquals(headers[:len(exp)], exp)
prosrv.rate_limit = orig_rate_limit
orig_rate_limit = prosrv.account_rate_limit
prosrv.account_rate_limit = 0
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 498'
self.assertEquals(headers[:len(exp)], exp)
prosrv.account_rate_limit = orig_rate_limit
# Check bad method
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1362,8 +1471,10 @@ class TestObjectController(unittest.TestCase):
exp = 'HTTP/1.1 405'
self.assertEquals(headers[:len(exp)], exp)
# Check unhandled exception
orig_rate_limit = prosrv.rate_limit
del prosrv.rate_limit
orig_update_request = prosrv.update_request
def broken_update_request(env, req):
raise Exception('fake')
prosrv.update_request = broken_update_request
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('HEAD /v1/a HTTP/1.1\r\nHost: localhost\r\n'
@ -1373,7 +1484,7 @@ class TestObjectController(unittest.TestCase):
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 500'
self.assertEquals(headers[:len(exp)], exp)
prosrv.rate_limit = orig_rate_limit
prosrv.update_request = orig_update_request
# Okay, back to chunked put testing; Create account
ts = normalize_timestamp(time())
partition, nodes = prosrv.account_ring.get_nodes('a')
@ -1409,6 +1520,7 @@ class TestObjectController(unittest.TestCase):
# GET account with a query string to test that
# Application.log_request logs the query string. Also, throws
# in a test for logging x-forwarded-for (first entry only).
class Logger(object):
def info(self, msg):
self.msg = msg
@@ -1416,7 +1528,8 @@ class TestObjectController(unittest.TestCase):
prosrv.logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('GET /v1/a?format=json HTTP/1.1\r\nHost: localhost\r\n'
fd.write(
'GET /v1/a?format=json HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\nX-Forwarded-For: host1, host2\r\n'
'\r\n')
@@ -1430,6 +1543,7 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(prosrv.logger.msg[:len(exp)], exp)
prosrv.logger = orig_logger
# Turn on header logging.
class Logger(object):
def info(self, msg):
self.msg = msg
@@ -1602,6 +1716,7 @@ class TestObjectController(unittest.TestCase):
def test_mismatched_etags(self):
with save_globals():
# no etag supplied, object servers return success w/ diff values
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@@ -1613,7 +1728,22 @@ class TestObjectController(unittest.TestCase):
'68b329da9893e34099c7d8ad5cb9c940',
'68b329da9893e34099c7d8ad5cb9c941'])
resp = controller.PUT(req)
self.assertEquals(resp.status_int, 422)
self.assertEquals(resp.status_int // 100, 5) # server error
# req supplies etag, object servers return 422 - mismatch
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
'Content-Length': '0',
'ETag': '68b329da9893e34099c7d8ad5cb9c940',
})
self.app.update_request(req)
proxy_server.http_connect = fake_http_connect(200, 422, 422, 503,
etags=['68b329da9893e34099c7d8ad5cb9c940',
'68b329da9893e34099c7d8ad5cb9c941',
None,
None])
resp = controller.PUT(req)
self.assertEquals(resp.status_int // 100, 4) # client error
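            # Contrast with the first case: ETag disagreement among backends
            # with no client-supplied ETag is reported as a server error (5xx),
            # while 422s against a client-supplied ETag map to a client error
            # (4xx).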
def test_request_bytes_transferred_attr(self):
with save_globals():
@@ -1678,11 +1808,12 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(res.bytes_transferred, 5)
self.assert_(hasattr(res, 'client_disconnect'))
self.assert_(res.client_disconnect)
finally:
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_GET_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -1699,6 +1830,7 @@ class TestObjectController(unittest.TestCase):
def test_HEAD_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -1715,6 +1847,7 @@ class TestObjectController(unittest.TestCase):
def test_POST_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -1732,6 +1865,7 @@ class TestObjectController(unittest.TestCase):
def test_PUT_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -1748,7 +1882,6 @@ class TestObjectController(unittest.TestCase):
self.assert_(called[0])
class TestContainerController(unittest.TestCase):
"Test swift.proxy_server.ContainerController"
@@ -1757,7 +1890,8 @@ class TestContainerController(unittest.TestCase):
account_ring=FakeRing(), container_ring=FakeRing(),
object_ring=FakeRing())
def assert_status_map(self, method, statuses, expected, raise_exc=False, missing_container=False):
def assert_status_map(self, method, statuses, expected,
raise_exc=False, missing_container=False):
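        # Helper: feeds the given backend statuses to fake_http_connect and
        # asserts that calling the controller method yields the expected
        # status.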
with save_globals():
kwargs = {}
if raise_exc:
@@ -1782,8 +1916,10 @@ class TestContainerController(unittest.TestCase):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
def test_status_map(statuses, expected, **kwargs):
proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
proxy_server.http_connect = fake_http_connect(*statuses,
**kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c', {})
self.app.update_request(req)
@@ -1804,8 +1940,10 @@ class TestContainerController(unittest.TestCase):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
def test_status_map(statuses, expected, **kwargs):
proxy_server.http_connect = fake_http_connect(*statuses, **kwargs)
proxy_server.http_connect = fake_http_connect(*statuses,
**kwargs)
self.app.memcache.store = {}
req = Request.blank('/a/c', {})
req.content_length = 0
@@ -1821,19 +1959,25 @@ class TestContainerController(unittest.TestCase):
def test_PUT_max_container_name_length(self):
with save_globals():
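            # A 256-character container name should be accepted; one more
            # character should be rejected with a 400.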
controller = proxy_server.ContainerController(self.app, 'account',
'1'*256)
self.assert_status_map(controller.PUT, (200, 200, 200, 201, 201, 201), 201, missing_container=True)
'1' * 256)
self.assert_status_map(controller.PUT,
(200, 200, 200, 201, 201, 201), 201,
missing_container=True)
controller = proxy_server.ContainerController(self.app, 'account',
'2'*257)
self.assert_status_map(controller.PUT, (201, 201, 201), 400, missing_container=True)
'2' * 257)
self.assert_status_map(controller.PUT, (201, 201, 201), 400,
missing_container=True)
def test_PUT_connect_exceptions(self):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
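            # The -1 entries are the connect exceptions under test: one failed
            # backend connection still yields a 201, two failures drop the
            # result to a 503.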
self.assert_status_map(controller.PUT, (200, 201, 201, -1), 201, missing_container=True)
self.assert_status_map(controller.PUT, (200, 201, -1, -1), 503, missing_container=True)
self.assert_status_map(controller.PUT, (200, 503, 503, -1), 503, missing_container=True)
self.assert_status_map(controller.PUT, (200, 201, 201, -1), 201,
missing_container=True)
self.assert_status_map(controller.PUT, (200, 201, -1, -1), 503,
missing_container=True)
self.assert_status_map(controller.PUT, (200, 503, 503, -1), 503,
missing_container=True)
def test_acc_missing_returns_404(self):
for meth in ('DELETE', 'PUT'):
@@ -1846,7 +1990,8 @@ class TestContainerController(unittest.TestCase):
'account', 'container')
if meth == 'PUT':
proxy_server.http_connect = \
fake_http_connect(200, 200, 200, 200, 200, 200, missing_container=True)
fake_http_connect(200, 200, 200, 200, 200, 200,
missing_container=True)
else:
proxy_server.http_connect = \
fake_http_connect(200, 200, 200, 200)
@@ -1884,6 +2029,7 @@ class TestContainerController(unittest.TestCase):
def __init__(self, allow_lock=None):
self.allow_lock = allow_lock
super(MockMemcache, self).__init__()
@contextmanager
def soft_lock(self, key, timeout=0, retries=5):
if self.allow_lock:
@@ -1894,7 +2040,8 @@ class TestContainerController(unittest.TestCase):
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.app.memcache = MockMemcache(allow_lock=True)
proxy_server.http_connect = fake_http_connect(200, 200, 200, 201, 201, 201, missing_container=True)
proxy_server.http_connect = fake_http_connect(
200, 200, 200, 201, 201, 201, missing_container=True)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)
res = controller.PUT(req)
@@ -1904,37 +2051,48 @@ class TestContainerController(unittest.TestCase):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200, missing_container=False)
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
missing_container=False)
self.assertEquals(
controller.app.container_ring.devs[0]['errors'], 2)
self.assert_('last_error' in controller.app.container_ring.devs[0])
for _ in xrange(self.app.error_suppression_limit):
self.assert_status_map(controller.HEAD, (200, 503, 503, 503), 503)
self.assert_status_map(controller.HEAD,
(200, 503, 503, 503), 503)
self.assertEquals(controller.app.container_ring.devs[0]['errors'],
self.app.error_suppression_limit + 1)
self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 503)
self.assert_('last_error' in controller.app.container_ring.devs[0])
self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503, missing_container=True)
self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 503)
self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503,
missing_container=True)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 503)
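            # A negative error_suppression_interval makes the recorded errors
            # immediately stale, so the error-limited node is usable again and
            # the following HEAD succeeds.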
self.app.error_suppression_interval = -300
self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 200)
self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 404,
raise_exc=True)
self.assert_status_map(controller.DELETE, (200, 204, 204, 204),
404, raise_exc=True)
def test_DELETE(self):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 204)
self.assert_status_map(controller.DELETE, (200, 204, 204, 503), 503)
self.assert_status_map(controller.DELETE, (200, 204, 503, 503), 503)
self.assert_status_map(controller.DELETE, (200, 204, 404, 404), 404)
self.assert_status_map(controller.DELETE, (200, 404, 404, 404), 404)
self.assert_status_map(controller.DELETE, (200, 204, 503, 404), 503)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 204)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 503), 503)
self.assert_status_map(controller.DELETE,
(200, 204, 503, 503), 503)
self.assert_status_map(controller.DELETE,
(200, 204, 404, 404), 404)
self.assert_status_map(controller.DELETE,
(200, 404, 404, 404), 404)
self.assert_status_map(controller.DELETE,
(200, 204, 503, 404), 503)
self.app.memcache = FakeMemcacheReturnsNone()
# 200: Account check, 404x3: Container check
self.assert_status_map(controller.DELETE, (200, 404, 404, 404), 404)
self.assert_status_map(controller.DELETE,
(200, 404, 404, 404), 404)
def test_response_bytes_transferred_attr(self):
with save_globals():
@@ -1968,7 +2126,7 @@ class TestContainerController(unittest.TestCase):
self.assertEquals(res.bytes_transferred, 1)
self.assert_(hasattr(res, 'client_disconnect'))
self.assert_(res.client_disconnect)
finally:
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_PUT_metadata(self):
@@ -1982,6 +2140,7 @@ class TestContainerController(unittest.TestCase):
('X-Container-Meta-TestHeader', 'TestValue'),
('X-Container-Meta-TestHeader', '')):
test_errors = []
def test_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
if path == '/a/c':
@@ -2095,6 +2254,7 @@ class TestContainerController(unittest.TestCase):
def test_POST_calls_clean_acl(self):
called = [False]
def clean_acl(header, value):
called[0] = True
raise ValueError('fake error')
@@ -2122,6 +2282,7 @@ class TestContainerController(unittest.TestCase):
def test_PUT_calls_clean_acl(self):
called = [False]
def clean_acl(header, value):
called[0] = True
raise ValueError('fake error')
@@ -2149,6 +2310,7 @@ class TestContainerController(unittest.TestCase):
def test_GET_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -2165,6 +2327,7 @@ class TestContainerController(unittest.TestCase):
def test_HEAD_calls_authorize(self):
called = [False]
def authorize(req):
called[0] = True
return HTTPUnauthorized(request=req)
@@ -2243,7 +2406,7 @@ class TestAccountController(unittest.TestCase):
self.app.account_ring.get_nodes('account')
for dev in self.app.account_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1 ## can't connect on this port
dev['port'] = 1 # can't connect on this port
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
self.app.update_request(req)
@@ -2254,7 +2417,7 @@ class TestAccountController(unittest.TestCase):
self.app.account_ring.get_nodes('account')
for dev in self.app.account_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = -1 ## invalid port number
dev['port'] = -1 # invalid port number
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
self.app.update_request(req)
@@ -2291,12 +2454,13 @@ class TestAccountController(unittest.TestCase):
self.assertEquals(res.bytes_transferred, 1)
self.assert_(hasattr(res, 'client_disconnect'))
self.assert_(res.client_disconnect)
finally:
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_PUT(self):
with save_globals():
controller = proxy_server.AccountController(self.app, 'account')
def test_status_map(statuses, expected, **kwargs):
proxy_server.http_connect = \
fake_http_connect(*statuses, **kwargs)
@@ -2314,9 +2478,9 @@ class TestAccountController(unittest.TestCase):
def test_PUT_max_account_name_length(self):
with save_globals():
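            # Same boundary as the container test: a 256-character account
            # name is accepted, 257 characters gets a 400.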
controller = proxy_server.AccountController(self.app, '1'*256)
controller = proxy_server.AccountController(self.app, '1' * 256)
self.assert_status_map(controller.PUT, (201, 201, 201), 201)
controller = proxy_server.AccountController(self.app, '2'*257)
controller = proxy_server.AccountController(self.app, '2' * 257)
self.assert_status_map(controller.PUT, (201, 201, 201), 400)
def test_PUT_connect_exceptions(self):
@@ -2337,6 +2501,7 @@ class TestAccountController(unittest.TestCase):
('X-Account-Meta-TestHeader', 'TestValue'),
('X-Account-Meta-TestHeader', '')):
test_errors = []
def test_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
if path == '/a':
@@ -2358,7 +2523,6 @@ class TestAccountController(unittest.TestCase):
res = getattr(controller, method)(req)
self.assertEquals(test_errors, [])
def test_PUT_bad_metadata(self):
self.bad_metadata_helper('PUT')