swift/swift/proxy/server.py

1430 lines
60 KiB
Python

# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import mimetypes
import os
import time
import traceback
from ConfigParser import ConfigParser
from urllib import unquote, quote
import uuid
import functools
from eventlet.timeout import Timeout
from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \
HTTPRequestTimeout, HTTPServiceUnavailable, \
HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \
status_map
from webob import Request, Response
from swift.common.ring import Ring
from swift.common.utils import get_logger, normalize_timestamp, split_path, \
cache_from_env
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
MAX_FILE_SIZE
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout
def update_headers(response, headers):
    """
    Helper function to update headers in the response.

    Header names are matched case-insensitively, so ``ETag`` is treated the
    same as ``etag`` and ``Content-Length`` the same as ``content-length``.
    Values are stored under the name exactly as it was supplied.

    :param response: webob.Response object
    :param headers: dictionary headers, or an iterable of (name, value) pairs
    """
    # headers that are never copied onto the response
    skipped = ('date', 'content-length', 'content-type',
               'connection', 'x-timestamp', 'x-put-timestamp')
    if hasattr(headers, 'items'):
        headers = headers.items()
    for name, value in headers:
        lowered = name.lower()
        if lowered == 'etag':
            # etags are stored without their surrounding quotes
            response.headers[name] = value.replace('"', '')
        elif lowered not in skipped:
            response.headers[name] = value
def public(func):
    """
    Mark a handler as reachable through HTTP requests.

    The function itself is flagged with ``publicly_accessible = True`` and a
    pass-through wrapper carrying the same metadata is returned.

    :param func: function to make public
    :returns: wrapper that forwards every call to func
    """
    setattr(func, 'publicly_accessible', True)

    def passthrough(*args, **kwargs):
        return func(*args, **kwargs)

    # wraps() copies the name, docstring and attribute dict (including the
    # flag set above) onto the wrapper
    return functools.wraps(func)(passthrough)
def delay_denial(func):
    """
    Mark a handler whose swift.authorize call should be delayed.

    Delaying lets the handler load the Request object with additional
    information that the authorization system may need. The function itself
    is flagged with ``delay_denial = True`` and a pass-through wrapper
    carrying the same metadata is returned.

    :param func: function to delay authorization on
    :returns: wrapper that forwards every call to func
    """
    setattr(func, 'delay_denial', True)

    def passthrough(*args, **kwargs):
        return func(*args, **kwargs)

    # wraps() copies the name, docstring and attribute dict (including the
    # flag set above) onto the wrapper
    return functools.wraps(func)(passthrough)
def get_container_memcache_key(account, container):
    """
    Build the memcache key under which a container's info is stored.

    :param account: account name
    :param container: container name
    :returns: key of the form ``container/<account>/<container>``
    """
    return 'container%s' % ('/%s/%s' % (account, container))
class Controller(object):
"""Base WSGI controller class for the proxy"""
def __init__(self, app):
    """
    Set up the state shared by every controller.

    :param app: application object the controller works on behalf of
    """
    self.app = app
    # defaults; this constructor does not fill them in
    self.trans_id = '-'
    self.account_name = None
def error_increment(self, node):
    """
    Record one more failure against a node.

    The node's ``errors`` counter is bumped (starting from zero when it is
    absent) and ``last_error`` is stamped with the current time.

    :param node: dictionary of node to increment the error count for
    """
    failures_so_far = node.get('errors', 0)
    node['errors'] = failures_so_far + 1
    node['last_error'] = time.time()
def error_occurred(self, node, msg):
    """
    Count an error against a node and write it to the error log.

    :param node: dictionary of node to handle errors for
    :param msg: error message; the node's ip:port is appended to it
    """
    self.error_increment(node)
    log_fields = (msg, node['ip'], node['port'])
    self.app.logger.error('%s %s:%s' % log_fields)
def exception_occurred(self, node, typ, additional_info):
    """
    Log an exception raised while talking to a node.

    The message names the server type, the node's ip, port and device, and
    the current transaction id. It is sent to ``logger.exception``.

    :param node: dictionary of node to log the error for
    :param typ: server type
    :param additional_info: additional information to log
    """
    log_fields = (typ, node['ip'], node['port'], node['device'],
                  self.trans_id, additional_info)
    self.app.logger.exception(
        'ERROR with %s server %s:%s/%s transaction %s re: %s' % log_fields)
def error_limited(self, node):
    """
    Check if the node is currently error limited.

    A node with no recorded errors is never limited. When the node's last
    error is older than ``app.error_suppression_interval`` seconds, its error
    bookkeeping is cleared and it is not limited. Otherwise it is limited
    once its error count exceeds ``app.error_suppression_limit``.

    :param node: dictionary of node to check
    :returns: True if error limited, False otherwise
    """
    now = time.time()
    if 'errors' not in node:
        return False
    if 'last_error' in node:
        expiry_cutoff = now - self.app.error_suppression_interval
        if node['last_error'] < expiry_cutoff:
            # the last error is old enough: forget the node's history
            del node['last_error']
            node.pop('errors', None)
            return False
    if node['errors'] > self.app.error_suppression_limit:
        self.app.logger.debug(
            'Node error limited %s:%s (%s)' % (
                node['ip'], node['port'], node['device']))
        return True
    return False
def error_limit(self, node):
    """
    Mark a node as error limited.

    The node's error count is set to one more than
    ``app.error_suppression_limit`` and ``last_error`` is stamped with the
    current time.

    :param node: dictionary of node to error limit
    """
    just_over_limit = self.app.error_suppression_limit + 1
    node['errors'] = just_over_limit
    node['last_error'] = time.time()
def account_info(self, account):
    """
    Get account information, and also verify that the account exists.

    The outcome of each lookup is stored in memcache as a status code:
    0 = no responses, 200 = found, 404 = not found, -1 = mixed responses.
    Only a cached 200 short-circuits the lookup. Any other cached value
    (404 and -1 are truthy and must not be mistaken for "found") causes the
    account servers to be asked again.

    :param account: name of the account to get the info for
    :returns: tuple of (account partition, account nodes) or (None, None)
              if it does not exist
    """
    partition, nodes = self.app.account_ring.get_nodes(account)
    path = '/%s' % account
    cache_key = 'account%s' % path
    if self.app.memcache.get(cache_key) == 200:
        return partition, nodes
    result_code = 0
    attempts_left = self.app.account_ring.replica_count
    headers = {'x-cf-trans-id': self.trans_id}
    for node in self.iter_nodes(partition, nodes, self.app.account_ring):
        if self.error_limited(node):
            continue
        try:
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                        node['device'], partition, 'HEAD', path, headers)
            with Timeout(self.app.node_timeout):
                resp = conn.getresponse()
                # the body is read and discarded; only the status matters
                resp.read()
                if 200 <= resp.status <= 299:
                    result_code = 200
                    break
                elif resp.status == 404:
                    result_code = 404 if not result_code else -1
                elif resp.status == 507:
                    # does not count as an attempt
                    self.error_limit(node)
                    continue
                else:
                    result_code = -1
                attempts_left -= 1
                if attempts_left <= 0:
                    break
        except (Exception, Timeout):
            # Timeout is listed explicitly because eventlet's Timeout does
            # not derive from Exception; interpreter-exit signals such as
            # KeyboardInterrupt/SystemExit are left to propagate.
            self.exception_occurred(node, 'Account',
                'Trying to get account info for %s' % path)
    if result_code == 200:
        cache_timeout = self.app.recheck_account_existence
    else:
        # anything but "found" is cached for a tenth of the normal time
        cache_timeout = self.app.recheck_account_existence * 0.1
    self.app.memcache.set(cache_key, result_code, timeout=cache_timeout)
    if result_code == 200:
        return partition, nodes
    return (None, None)
def container_info(self, account, container):
    """
    Get container information and thus verify container existence.

    A cached entry with a 2xx status is answered straight from memcache.
    Otherwise account_info is called to verify that the account exists and
    the container servers are asked with a HEAD request. The result is
    cached, for a tenth of the normal time when the container was not found.

    :param account: account name for the container
    :param container: container name to look up
    :returns: tuple of (container partition, container nodes, container
              read acl, container write acl) or (None, None, None, None) if
              the container does not exist
    """
    partition, nodes = self.app.container_ring.get_nodes(
            account, container)
    path = '/%s/%s' % (account, container)
    cache_key = get_container_memcache_key(account, container)
    cache_value = self.app.memcache.get(cache_key)
    if isinstance(cache_value, dict):
        status = cache_value['status']
        read_acl = cache_value['read_acl']
        write_acl = cache_value['write_acl']
        if status // 100 == 2:
            return partition, nodes, read_acl, write_acl
    if not self.account_info(account)[1]:
        return (None, None, None, None)
    # 0 = no responses, 200 = found, 404 = not found, -1 = mixed responses
    result_code = 0
    read_acl = None
    write_acl = None
    container_size = None
    attempts_left = self.app.container_ring.replica_count
    headers = {'x-cf-trans-id': self.trans_id}
    for node in self.iter_nodes(partition, nodes, self.app.container_ring):
        if self.error_limited(node):
            continue
        try:
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                        node['device'], partition, 'HEAD', path, headers)
            with Timeout(self.app.node_timeout):
                resp = conn.getresponse()
                # the body is read and discarded; only status and headers
                # are used
                resp.read()
                if 200 <= resp.status <= 299:
                    result_code = 200
                    read_acl = resp.getheader('x-container-read')
                    write_acl = resp.getheader('x-container-write')
                    container_size = \
                        resp.getheader('X-Container-Object-Count')
                    break
                elif resp.status == 404:
                    result_code = 404 if not result_code else -1
                elif resp.status == 507:
                    # does not count as an attempt
                    self.error_limit(node)
                    continue
                else:
                    result_code = -1
                attempts_left -= 1
                if attempts_left <= 0:
                    break
        except (Exception, Timeout):
            # Timeout is listed explicitly because eventlet's Timeout does
            # not derive from Exception; interpreter-exit signals such as
            # KeyboardInterrupt/SystemExit are left to propagate.
            self.exception_occurred(node, 'Container',
                'Trying to get container info for %s' % path)
    if result_code == 200:
        cache_timeout = self.app.recheck_container_existence
    else:
        cache_timeout = self.app.recheck_container_existence * 0.1
    self.app.memcache.set(cache_key, {'status': result_code,
                                      'read_acl': read_acl,
                                      'write_acl': write_acl,
                                      'container_size': container_size},
                          timeout=cache_timeout)
    if result_code == 200:
        return partition, nodes, read_acl, write_acl
    return (None, None, None, None)
def iter_nodes(self, partition, nodes, ring):
    """
    Yield the given nodes for a partition, then the ring's extra nodes.

    ``ring.get_more_nodes`` is not called until every entry of ``nodes`` has
    been yielded.

    :param partition: partition to iterate nodes for
    :param nodes: list of node dicts from the ring
    :param ring: ring to get handoff nodes from
    """
    # each source is fetched lazily, only when the previous one is exhausted
    sources = (lambda: nodes,
               lambda: ring.get_more_nodes(partition))
    for fetch in sources:
        for node in fetch():
            yield node
def get_update_nodes(self, partition, nodes, ring):
    """
    Attempt to get a non error limited list of nodes.

    Returns ring.replica_count nodes; the nodes will not be error limited,
    if possible. When too few healthy nodes are available the list is topped
    up with the given nodes, error limited or not. Fewer than
    ring.replica_count nodes are returned only when even that is not enough.

    :param partition: partition for the nodes
    :param nodes: list of node dicts for the partition
    :param ring: ring to get handoff nodes from
    :returns: list of node dicts that are not error limited (if possible)
    """
    # make a copy so we don't modify caller's list
    nodes = list(nodes)
    update_nodes = []
    for node in self.iter_nodes(partition, nodes, ring):
        if self.error_limited(node):
            continue
        update_nodes.append(node)
        if len(update_nodes) >= ring.replica_count:
            break
    # stop when the copy runs dry instead of raising IndexError from pop()
    while len(update_nodes) < ring.replica_count and nodes:
        node = nodes.pop()
        if node not in update_nodes:
            update_nodes.append(node)
    return update_nodes
def best_response(self, req, statuses, reasons, bodies, server_type,
                  etag=None):
    """
    Given a list of responses from several servers, choose the best to
    return to the API.

    The 2xx, 3xx and 4xx classes are tried in that order. The first class
    that makes up a strict majority of ``statuses`` wins, and its highest
    status (with the matching reason and body) becomes the response. When no
    class has a majority, or ``statuses`` is empty, a 503 is returned.

    :param req: webob.Request object
    :param statuses: list of statuses returned
    :param reasons: list of reasons for each status
    :param bodies: bodies of each response
    :param server_type: type of server the responses came from
    :param etag: etag
    :returns: webob.Response object with the correct status, body, etc. set
    """
    resp = Response(request=req)
    if statuses:
        for hundred in (200, 300, 400):
            hstatuses = \
                [s for s in statuses if hundred <= s < hundred + 100]
            # strict majority, written without division so the result does
            # not depend on integer vs. true division
            if len(hstatuses) * 2 > len(statuses):
                status = max(hstatuses)
                status_index = statuses.index(status)
                resp.status = '%s %s' % (status, reasons[status_index])
                resp.body = bodies[status_index]
                resp.content_type = 'text/plain'
                if etag:
                    resp.headers['etag'] = etag.strip('"')
                return resp
    self.app.logger.error('%s returning 503 for %s, transaction %s' %
                          (server_type, statuses, self.trans_id))
    # 503's standard reason phrase is "Service Unavailable"
    resp.status = '503 Service Unavailable'
    return resp
@public
def GET(self, req):
    """Serve an HTTP GET through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
@public
def HEAD(self, req):
    """Serve an HTTP HEAD through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
                   attempts):
    """
    Base handler for HTTP GET or HEAD requests.

    Walks ``nodes`` in order, skipping error limited ones, and returns the
    first 2xx/3xx response it gets. For a GET answered with 200 or 206 the
    body is streamed from the backend in chunks. Responses that are not
    returned directly are collected and handed to best_response once
    ``attempts`` of them have been gathered or the nodes run out.

    :param req: webob.Request object
    :param server_type: server type
    :param partition: partition
    :param nodes: nodes
    :param path: path for the request
    :param attempts: number of attempts to try
    :returns: webob.Response object
    """
    statuses = []
    reasons = []
    bodies = []
    for node in nodes:
        # only recorded responses count towards the attempt limit
        if len(statuses) >= attempts:
            break
        if self.error_limited(node):
            continue
        try:
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                    node['device'], partition, req.method, path,
                    headers=req.headers,
                    query_string=req.query_string)
            with Timeout(self.app.node_timeout):
                source = conn.getresponse()
        except:
            # any failure while connecting or waiting for the response is
            # logged and the next node is tried; nothing is recorded
            # NOTE(review): presumably this bare except is what traps the
            # timeouts armed by the two with-blocks above -- confirm
            self.exception_occurred(node, server_type,
                'Trying to %s %s' % (req.method, req.path))
            continue
        if source.status == 507:
            # the node is marked error limited; nothing is recorded
            self.error_limit(node)
            continue
        if 200 <= source.status <= 399:
            # 404 if we know we don't have a synced copy
            # (a missing X-PUT-Timestamp header defaults to '1' and so
            # passes this check)
            if not float(source.getheader('X-PUT-Timestamp', '1')):
                statuses.append(404)
                reasons.append('')
                bodies.append('')
                source.read()
                continue
        if req.method == 'GET' and source.status in (200, 206):
            res = Response(request=req, conditional_response=True)
            res.bytes_transferred = 0

            def file_iter():
                # Generator that relays the backend body in chunks of
                # app.object_chunk_size, each read guarded by
                # ChunkReadTimeout(app.node_timeout).
                try:
                    while True:
                        with ChunkReadTimeout(self.app.node_timeout):
                            chunk = source.read(self.app.object_chunk_size)
                        if not chunk:
                            break
                        yield chunk
                        # counted only after the consumer asks for more
                        res.bytes_transferred += len(chunk)
                except GeneratorExit:
                    # the generator was closed before it was exhausted;
                    # recorded as a client disconnect
                    res.client_disconnect = True
                    self.app.logger.info(
                        'Client disconnected on read transaction %s' %
                        self.trans_id)
                except:
                    # log against the node, then let the error propagate
                    self.exception_occurred(node, 'Object',
                        'Trying to read during GET of %s' % req.path)
                    raise
            res.app_iter = file_iter()
            update_headers(res, source.getheaders())
            res.status = source.status
            res.content_length = source.getheader('Content-Length')
            if source.getheader('Content-Type'):
                res.charset = None
                res.content_type = source.getheader('Content-Type')
            return res
        elif 200 <= source.status <= 399:
            # every other 2xx/3xx answer (including all successful HEADs):
            # build the response from the class registered in status_map
            res = status_map[source.status](request=req)
            update_headers(res, source.getheaders())
            if req.method == 'HEAD':
                res.content_length = source.getheader('Content-Length')
                if source.getheader('Content-Type'):
                    res.charset = None
                    res.content_type = source.getheader('Content-Type')
            return res
        # not returned directly: record it for best_response
        statuses.append(source.status)
        reasons.append(source.reason)
        bodies.append(source.read())
        if source.status >= 500:
            # counts an error against the node; at most the first 1024
            # characters of the body are logged
            self.error_occurred(node, 'ERROR %d %s From %s Server' %
                (source.status, bodies[-1][:1024], server_type))
    return self.best_response(req, statuses, reasons, bodies,
                              '%s %s' % (server_type, req.method))
class ObjectController(Controller):
"""WSGI controller for object requests."""
def __init__(self, app, account_name, container_name, object_name,
             **kwargs):
    """
    Set up a controller for one object.

    The three names are stored URL-unquoted.

    :param app: application object, passed on to Controller.__init__
    :param account_name: quoted account name
    :param container_name: quoted container name
    :param object_name: quoted object name
    :param kwargs: accepted and ignored
    """
    Controller.__init__(self, app)
    self.account_name, self.container_name, self.object_name = \
        map(unquote, (account_name, container_name, object_name))
def node_post_or_delete(self, req, partition, node, path):
    """
    Handle common POST/DELETE functionality.

    Sends the request's method and headers to a single object node. An error
    limited node is not contacted at all. A 507 answer marks the node as
    error limited, and any other 5xx counts an error against it.

    :param req: webob.Request object
    :param partition: partition for the object
    :param node: node dictionary for the object
    :param path: path to send for the request
    :returns: tuple of (status, reason, body); (500, '', '') when the node is
              error limited or talking to it failed
    """
    if self.error_limited(node):
        return 500, '', ''
    try:
        with ConnectionTimeout(self.app.conn_timeout):
            conn = http_connect(node['ip'], node['port'], node['device'],
                    partition, req.method, path, req.headers)
        with Timeout(self.app.node_timeout):
            response = conn.getresponse()
            body = response.read()
            if response.status == 507:
                self.error_limit(node)
            elif response.status >= 500:
                # at most the first 1024 characters of the body are logged
                self.error_occurred(node,
                    'ERROR %d %s From Object Server' %
                    (response.status, body[:1024]))
            return response.status, response.reason, body
    except (Exception, Timeout):
        # Timeout is listed explicitly because eventlet's Timeout does not
        # derive from Exception; interpreter-exit signals such as
        # KeyboardInterrupt/SystemExit are left to propagate.
        self.exception_occurred(node, 'Object',
            'Trying to %s %s' % (req.method, req.path))
    return 500, '', ''
def GETorHEAD(self, req):
    """
    Handle HTTP GET or HEAD requests for an object.

    When the environ carries a ``swift.authorize`` hook, the container's
    read ACL is put on the request and the hook is consulted; a truthy
    answer from the hook is returned as the response. Otherwise the request
    goes to GETorHEAD_base with the object's nodes.

    :param req: webob.Request object
    :returns: response object
    """
    environ = req.environ
    if 'swift.authorize' in environ:
        info = self.container_info(self.account_name, self.container_name)
        req.acl = info[2]
        denial = environ['swift.authorize'](req)
        if denial:
            return denial
    ring = self.app.object_ring
    partition, nodes = ring.get_nodes(
        self.account_name, self.container_name, self.object_name)
    candidates = self.iter_nodes(partition, nodes, ring)
    return self.GETorHEAD_base(req, 'Object', partition, candidates,
                               req.path_info, ring.replica_count)
@public
@delay_denial
def GET(self, req):
    """Serve an HTTP GET through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
@public
@delay_denial
def HEAD(self, req):
    """Serve an HTTP HEAD through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
@public
@delay_denial
def POST(self, req):
    """
    HTTP POST request handler for an object.

    Validates the request metadata, looks up the container (its write ACL is
    put on the request before the optional ``swift.authorize`` hook runs)
    and sends the POST to the object nodes. Each node request carries
    X-Container-* headers naming one container node.

    :param req: webob.Request object
    :returns: response object
    """
    invalid = check_metadata(req, 'object')
    if invalid:
        return invalid
    info = self.container_info(self.account_name, self.container_name)
    container_partition, containers, _, req.acl = info
    environ = req.environ
    if 'swift.authorize' in environ:
        denial = environ['swift.authorize'](req)
        if denial:
            return denial
    if not containers:
        return HTTPNotFound(request=req)
    containers = self.get_update_nodes(
        container_partition, containers, self.app.container_ring)
    object_ring = self.app.object_ring
    partition, nodes = object_ring.get_nodes(
        self.account_name, self.container_name, self.object_name)
    req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    statuses, reasons, bodies = [], [], []
    for node in self.iter_nodes(partition, nodes, object_ring):
        container = containers.pop()
        req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
        req.headers['X-Container-Partition'] = container_partition
        req.headers['X-Container-Device'] = container['device']
        status, reason, body = self.node_post_or_delete(
            req, partition, node, req.path_info)
        answered = 200 <= status < 300 or 400 <= status < 500
        if not answered:
            # put the container node back so another object node uses it
            containers.insert(0, container)
        else:
            statuses.append(status)
            reasons.append(reason)
            bodies.append(body)
        if not containers:
            break
    # responses that never arrived are counted as 503s
    shortfall = len(nodes) - len(statuses)
    if shortfall > 0:
        statuses.extend([503] * shortfall)
        reasons.extend([''] * shortfall)
        bodies.extend([''] * shortfall)
    return self.best_response(req, statuses, reasons,
                              bodies, 'Object POST')
@public
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
container_partition, containers, _, req.acl = \
self.container_info(self.account_name, self.container_name)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not containers:
return HTTPNotFound(request=req)
containers = self.get_update_nodes(container_partition, containers,
self.app.container_ring)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# this is a temporary hook for migrations to set PUT timestamps
if '!Migration-Timestamp!' in req.headers:
req.headers['X-Timestamp'] = \
normalize_timestamp(req.headers['!Migration-Timestamp!'])
# Sometimes the 'content-type' header exists, but is set to None.
if not req.headers.get('content-type'):
guessed_type, _ = mimetypes.guess_type(req.path_info)
if not guessed_type:
req.headers['Content-Type'] = 'application/octet-stream'
else:
req.headers['Content-Type'] = guessed_type
error_response = check_object_creation(req, self.object_name)
if error_response:
return error_response
conns = []
data_source = \
iter(lambda: req.body_file.read(self.app.client_chunk_size), '')
source_header = req.headers.get('X-Copy-From')
if source_header:
source_header = unquote(source_header)
acct = req.path_info.split('/', 2)[1]
if not source_header.startswith('/'):
source_header = '/' + source_header
source_header = '/' + acct + source_header
try:
src_container_name, src_obj_name = \
source_header.split('/', 3)[2:]
except ValueError:
return HTTPPreconditionFailed(request=req,
body='X-Copy-From header must be of the form'
'<container name>/<object name>')
source_req = req.copy_get()
source_req.path_info = source_header
orig_obj_name = self.object_name
orig_container_name = self.container_name
self.object_name = src_obj_name
self.container_name = src_container_name
source_resp = self.GET(source_req)
if source_resp.status_int >= 300:
return source_resp
self.object_name = orig_obj_name
self.container_name = orig_container_name
data_source = source_resp.app_iter
new_req = Request.blank(req.path_info,
environ=req.environ, headers=req.headers)
new_req.content_length = source_resp.content_length
new_req.etag = source_resp.etag
# we no longer need the X-Copy-From header
del new_req.headers['X-Copy-From']
for k, v in source_resp.headers.items():
if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v
for k, v in req.headers.items():
if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v
req = new_req
for node in self.iter_nodes(partition, nodes, self.app.object_ring):
container = containers.pop()
req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
req.headers['X-Container-Partition'] = container_partition
req.headers['X-Container-Device'] = container['device']
req.headers['Expect'] = '100-continue'
resp = conn = None
if not self.error_limited(node):
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(node['ip'], node['port'],
node['device'], partition, 'PUT',
req.path_info, req.headers)
conn.node = node
with Timeout(self.app.node_timeout):
resp = conn.getexpect()
except:
self.exception_occurred(node, 'Object',
'Expect: 100-continue on %s' % req.path)
if conn and resp:
if resp.status == 100:
conns.append(conn)
if not containers:
break
continue
elif resp.status == 507:
self.error_limit(node)
containers.insert(0, container)
if len(conns) <= len(nodes) / 2:
self.app.logger.error(
'Object PUT returning 503, %s/%s required connections, '
'transaction %s' %
(len(conns), len(nodes) / 2 + 1, self.trans_id))
return HTTPServiceUnavailable(request=req)
try:
req.bytes_transferred = 0
while True:
with ChunkReadTimeout(self.app.client_timeout):
try:
chunk = data_source.next()
except StopIteration:
if req.headers.get('transfer-encoding'):
chunk = ''
else:
break
len_chunk = len(chunk)
req.bytes_transferred += len_chunk
if req.bytes_transferred > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(request=req)
for conn in list(conns):
try:
with ChunkWriteTimeout(self.app.node_timeout):
if req.headers.get('transfer-encoding'):
conn.send('%x\r\n%s\r\n' % (len_chunk, chunk))
else:
conn.send(chunk)
except:
self.exception_occurred(conn.node, 'Object',
'Trying to write to %s' % req.path)
conns.remove(conn)
if len(conns) <= len(nodes) / 2:
self.app.logger.error(
'Object PUT exceptions during send, %s/%s '
'required connections, transaction %s' %
(len(conns), len(nodes) // 2 + 1,
self.trans_id))
return HTTPServiceUnavailable(request=req)
if req.headers.get('transfer-encoding') and chunk == '':
break
except ChunkReadTimeout, err:
self.app.logger.info(
'ERROR Client read timeout (%ss)' % err.seconds)
return HTTPRequestTimeout(request=req)
except:
req.client_disconnect = True
self.app.logger.exception(
'ERROR Exception causing client disconnect')
return Response(status='499 Client Disconnect')
if req.content_length and req.bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.info(
'Client disconnected without sending enough data %s' %
self.trans_id)
return Response(status='499 Client Disconnect')
statuses = []
reasons = []
bodies = []
etags = set()
for conn in conns:
try:
with Timeout(self.app.node_timeout):
response = conn.getresponse()
statuses.append(response.status)
reasons.append(response.reason)
bodies.append(response.read())
if response.status >= 500:
self.error_occurred(conn.node,
'ERROR %d %s From Object Server re: %s' %
(response.status, bodies[-1][:1024], req.path))
elif 200 <= response.status < 300:
etags.add(response.getheader('etag').strip('"'))
except:
self.exception_occurred(conn.node, 'Object',
'Trying to get final status of PUT to %s' % req.path)
if len(etags) > 1:
self.app.logger.error(
'Object servers returned %s mismatched etags' % len(etags))
return HTTPServerError(request=req)
etag = len(etags) and etags.pop() or None
while len(statuses) < len(nodes):
statuses.append(503)
reasons.append('')
bodies.append('')
resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT',
etag=etag)
if source_header:
resp.headers['X-Copied-From'] = quote(
source_header.split('/', 2)[2])
for k, v in req.headers.items():
if k.lower().startswith('x-object-meta-'):
resp.headers[k] = v
resp.last_modified = float(req.headers['X-Timestamp'])
return resp
@public
@delay_denial
def DELETE(self, req):
    """
    HTTP DELETE request handler for an object.

    Looks up the container (its write ACL is put on the request before the
    optional ``swift.authorize`` hook runs) and sends the DELETE to the
    object nodes. Each node request carries X-Container-* headers naming one
    container node.

    :param req: webob.Request object
    :returns: response object
    """
    info = self.container_info(self.account_name, self.container_name)
    container_partition, containers, _, req.acl = info
    environ = req.environ
    if 'swift.authorize' in environ:
        denial = environ['swift.authorize'](req)
        if denial:
            return denial
    if not containers:
        return HTTPNotFound(request=req)
    containers = self.get_update_nodes(
        container_partition, containers, self.app.container_ring)
    object_ring = self.app.object_ring
    partition, nodes = object_ring.get_nodes(
        self.account_name, self.container_name, self.object_name)
    req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    statuses, reasons, bodies = [], [], []
    for node in self.iter_nodes(partition, nodes, object_ring):
        container = containers.pop()
        req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
        req.headers['X-Container-Partition'] = container_partition
        req.headers['X-Container-Device'] = container['device']
        status, reason, body = self.node_post_or_delete(
            req, partition, node, req.path_info)
        answered = 200 <= status < 300 or 400 <= status < 500
        if not answered:
            # put the container node back so another object node uses it
            containers.insert(0, container)
        else:
            statuses.append(status)
            reasons.append(reason)
            bodies.append(body)
        if not containers:
            break
    # responses that never arrived are counted as 503s
    shortfall = len(nodes) - len(statuses)
    if shortfall > 0:
        statuses.extend([503] * shortfall)
        reasons.extend([''] * shortfall)
        bodies.extend([''] * shortfall)
    return self.best_response(req, statuses, reasons, bodies,
                              'Object DELETE')
@public
@delay_denial
def COPY(self, req):
    """
    HTTP COPY request handler.

    Turns the COPY into a PUT on the object named by the Destination header.
    The controller is re-pointed at the destination container/object and the
    PUT carries an X-Copy-From header naming the original object.

    :param req: webob.Request object
    :returns: response object
    """
    destination = req.headers.get('Destination')
    if not destination:
        return HTTPPreconditionFailed(request=req,
                                      body='Destination header required')
    destination = unquote(destination)
    if not destination.startswith('/'):
        destination = '/' + destination
    # expected shape: ['', <container>, <object>]
    pieces = destination.split('/', 2)
    if len(pieces) != 3:
        return HTTPPreconditionFailed(request=req,
            body='Destination header must be of the form '
                 '<container name>/<object name>')
    dest_container, dest_object = pieces[1:]
    copy_from = '/' + self.container_name + '/' + self.object_name
    self.container_name = dest_container
    self.object_name = dest_object
    put_headers = dict(req.headers.items())
    put_headers['X-Copy-From'] = copy_from
    put_headers['Content-Length'] = 0
    del put_headers['Destination']
    put_path = '/' + self.account_name + destination
    put_req = Request.blank(put_path, environ=req.environ,
                            headers=put_headers)
    return self.PUT(put_req)
class ContainerController(Controller):
"""WSGI controller for container requests"""
# Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write']
def __init__(self, app, account_name, container_name, **kwargs):
    """
    Set up a controller for one container.

    Both names are stored URL-unquoted.

    :param app: application object, passed on to Controller.__init__
    :param account_name: quoted account name
    :param container_name: quoted container name
    :param kwargs: accepted and ignored
    """
    Controller.__init__(self, app)
    self.account_name, self.container_name = \
        map(unquote, (account_name, container_name))
def clean_acls(self, req):
if 'swift.clean_acl' in req.environ:
for header in ('x-container-read', 'x-container-write'):
if header in req.headers:
try:
req.headers[header] = \
req.environ['swift.clean_acl'](header,
req.headers[header])
except ValueError, err:
return HTTPBadRequest(request=req, body=str(err))
return None
def GETorHEAD(self, req):
    """
    Handler for HTTP GET/HEAD requests on a container.

    Answers 404 when the account does not exist. Otherwise the request goes
    to GETorHEAD_base and the status, ACLs and object count of the answer
    are stored in memcache. Authorization runs only afterwards, with the
    container's read ACL put on the request.

    :param req: webob.Request object
    :returns: response object
    """
    account_nodes = self.account_info(self.account_name)[1]
    if not account_nodes:
        return HTTPNotFound(request=req)
    ring = self.app.container_ring
    part, nodes = ring.get_nodes(self.account_name, self.container_name)
    resp = self.GETorHEAD_base(req, 'Container', part, nodes,
                               req.path_info, ring.replica_count)
    # set the memcache container size for ratelimiting
    cache_key = get_container_memcache_key(self.account_name,
                                           self.container_name)
    read_acl = resp.headers.get('x-container-read')
    container_entry = {
        'status': resp.status_int,
        'read_acl': read_acl,
        'write_acl': resp.headers.get('x-container-write'),
        'container_size': resp.headers.get('x-container-object-count')}
    self.app.memcache.set(cache_key, container_entry,
                          timeout=self.app.recheck_container_existence)
    if 'swift.authorize' not in req.environ:
        return resp
    req.acl = read_acl
    denial = req.environ['swift.authorize'](req)
    if denial:
        return denial
    return resp
@public
@delay_denial
def GET(self, req):
    """Serve an HTTP GET through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
@public
@delay_denial
def HEAD(self, req):
    """Serve an HTTP HEAD through the shared GET/HEAD handler."""
    response = self.GETorHEAD(req)
    return response
@public
def PUT(self, req):
    """
    HTTP PUT request handler for a container.

    Cleans the ACL headers, validates the metadata and the container name
    length, and checks that the account exists. The PUT is then sent to the
    container nodes. Each node request carries X-Account-* headers naming
    one account node. Only the ACL headers and x-container-meta-* headers
    are passed through from the client request. The container's memcache
    entry is deleted afterwards.

    :param req: webob.Request object
    :returns: response object
    """
    error_response = \
        self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response
    if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        resp.body = 'Container name length of %d longer than %d' % \
                    (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
        return resp
    account_partition, accounts = self.account_info(self.account_name)
    if not accounts:
        return HTTPNotFound(request=req)
    accounts = self.get_update_nodes(account_partition, accounts,
                                     self.app.account_ring)
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    headers = {'X-Timestamp': normalize_timestamp(time.time()),
               'x-cf-trans-id': self.trans_id}
    headers.update(value for value in req.headers.iteritems()
        if value[0].lower() in self.pass_through_headers or
           value[0].lower().startswith('x-container-meta-'))
    statuses = []
    reasons = []
    bodies = []
    for node in self.iter_nodes(container_partition, containers,
                                self.app.container_ring):
        if self.error_limited(node):
            continue
        try:
            account = accounts.pop()
            headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
            headers['X-Account-Partition'] = account_partition
            headers['X-Account-Device'] = account['device']
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                        node['device'], container_partition, 'PUT',
                        req.path_info, headers)
            with Timeout(self.app.node_timeout):
                source = conn.getresponse()
                body = source.read()
                if 200 <= source.status < 300 \
                        or 400 <= source.status < 500:
                    statuses.append(source.status)
                    reasons.append(source.reason)
                    bodies.append(body)
                else:
                    if source.status == 507:
                        self.error_limit(node)
                    # put the account node back so another container node
                    # uses it
                    accounts.insert(0, account)
        except (Exception, Timeout):
            # Timeout is listed explicitly because eventlet's Timeout does
            # not derive from Exception; interpreter-exit signals such as
            # KeyboardInterrupt/SystemExit are left to propagate.
            accounts.insert(0, account)
            self.exception_occurred(node, 'Container',
                'Trying to PUT to %s' % req.path)
        if not accounts:
            break
    # responses that never arrived are counted as 503s
    while len(statuses) < len(containers):
        statuses.append(503)
        reasons.append('')
        bodies.append('')
    cache_key = get_container_memcache_key(self.account_name,
                                           self.container_name)
    self.app.memcache.delete(cache_key)
    return self.best_response(req, statuses, reasons, bodies,
                              'Container PUT')
@public
def POST(self, req):
    """
    HTTP POST request handler for a container.

    Cleans the ACL headers, validates the metadata and checks that the
    account exists. The POST is then sent to container nodes until as many
    answers as there are primary container nodes have been collected. Only
    the ACL headers and x-container-meta-* headers are passed through from
    the client request. The container's memcache entry is deleted
    afterwards.

    :param req: webob.Request object
    :returns: response object
    """
    error_response = \
        self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response
    account_partition, accounts = self.account_info(self.account_name)
    if not accounts:
        return HTTPNotFound(request=req)
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    headers = {'X-Timestamp': normalize_timestamp(time.time()),
               'x-cf-trans-id': self.trans_id}
    headers.update(value for value in req.headers.iteritems()
        if value[0].lower() in self.pass_through_headers or
           value[0].lower().startswith('x-container-meta-'))
    statuses = []
    reasons = []
    bodies = []
    for node in self.iter_nodes(container_partition, containers,
                                self.app.container_ring):
        if self.error_limited(node):
            continue
        try:
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                        node['device'], container_partition, 'POST',
                        req.path_info, headers)
            with Timeout(self.app.node_timeout):
                source = conn.getresponse()
                body = source.read()
                if 200 <= source.status < 300 \
                        or 400 <= source.status < 500:
                    statuses.append(source.status)
                    reasons.append(source.reason)
                    bodies.append(body)
                elif source.status == 507:
                    self.error_limit(node)
        except (Exception, Timeout):
            # Timeout is listed explicitly because eventlet's Timeout does
            # not derive from Exception; interpreter-exit signals such as
            # KeyboardInterrupt/SystemExit are left to propagate.
            self.exception_occurred(node, 'Container',
                'Trying to POST %s' % req.path)
        if len(statuses) >= len(containers):
            break
    # responses that never arrived are counted as 503s
    while len(statuses) < len(containers):
        statuses.append(503)
        reasons.append('')
        bodies.append('')
    cache_key = get_container_memcache_key(self.account_name,
                                           self.container_name)
    self.app.memcache.delete(cache_key)
    return self.best_response(req, statuses, reasons, bodies,
                              'Container POST')
@public
def DELETE(self, req):
    """
    HTTP DELETE request handler.

    Sends the DELETE to the container's nodes, pairing each request with
    one account node (passed in the X-Account-* headers).

    :param req: webob.Request object
    :returns: HTTPNotFound if account_info yields no accounts or if the
              best response is a 202; HTTPServiceUnavailable if the best
              response is 2xx but any recorded node status is not 2xx;
              otherwise the result of best_response
    """
    account_partition, accounts = self.account_info(self.account_name)
    if not accounts:
        return HTTPNotFound(request=req)
    accounts = self.get_update_nodes(account_partition, accounts,
                                     self.app.account_ring)
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    headers = {'X-Timestamp': normalize_timestamp(time.time()),
               'x-cf-trans-id': self.trans_id}
    statuses = []
    reasons = []
    bodies = []
    for node in self.iter_nodes(container_partition, containers,
                                self.app.container_ring):
        if self.error_limited(node):
            continue
        if not accounts:
            # Nothing left to pair with; only reachable when the update
            # node list was empty to begin with, since the loop also
            # stops below as soon as the list runs out.
            break
        # Taken outside the try block so the except handler below always
        # re-queues the account that belongs to *this* attempt.
        account = accounts.pop()
        try:
            headers['X-Account-Host'] = '%(ip)s:%(port)s' % account
            headers['X-Account-Partition'] = account_partition
            headers['X-Account-Device'] = account['device']
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                                    node['device'], container_partition,
                                    'DELETE', req.path_info, headers)
            with Timeout(self.app.node_timeout):
                source = conn.getresponse()
                body = source.read()
                if 200 <= source.status < 300 \
                        or 400 <= source.status < 500:
                    statuses.append(source.status)
                    reasons.append(source.reason)
                    bodies.append(body)
                else:
                    if source.status == 507:
                        self.error_limit(node)
                    # This node did not count; put the account back so
                    # the next node can be paired with it.
                    accounts.insert(0, account)
        except (Exception, Timeout):
            # Timeout is named explicitly in case eventlet's Timeout does
            # not derive from Exception. A bare except here would also
            # swallow KeyboardInterrupt/SystemExit and keep looping.
            accounts.insert(0, account)
            self.exception_occurred(node, 'Container',
                                    'Trying to DELETE %s' % req.path)
        if not accounts:
            break
    # Nodes that never produced a countable response are recorded as 503.
    while len(statuses) < len(containers):
        statuses.append(503)
        reasons.append('')
        bodies.append('')
    # Drop the cached container entry regardless of the outcome.
    cache_key = get_container_memcache_key(self.account_name,
                                           self.container_name)
    self.app.memcache.delete(cache_key)
    resp = self.best_response(req, statuses, reasons, bodies,
                              'Container DELETE')
    if 200 <= resp.status_int <= 299:
        for status in statuses:
            if status < 200 or status > 299:
                # If even one node doesn't do the delete, we can't be sure
                # what the outcome will be once everything is in sync; so
                # we 503.
                self.app.logger.error('Returning 503 because not all '
                    'container nodes confirmed DELETE, transaction %s' %
                    self.trans_id)
                return HTTPServiceUnavailable(request=req)
    if resp.status_int == 202:  # Indicates no server had the container
        return HTTPNotFound(request=req)
    return resp
class AccountController(Controller):
    """WSGI controller for account requests"""

    def __init__(self, app, account_name, **kwargs):
        """
        :param app: the proxy application, handed to Controller.__init__
        :param account_name: URL-quoted account name; stored unquoted
        :param kwargs: remaining path parts; accepted and ignored
        """
        Controller.__init__(self, app)
        self.account_name = unquote(account_name)

    def GETorHEAD(self, req):
        """Handler for HTTP GET/HEAD requests."""
        partition, nodes = self.app.account_ring.get_nodes(self.account_name)
        return self.GETorHEAD_base(req, 'Account', partition, nodes,
                                   req.path_info.rstrip('/'),
                                   self.app.account_ring.replica_count)

    def _account_request_to_nodes(self, req, method, partition, nodes,
                                  headers, error_template):
        """
        Send one request per account node and return the best response.

        Shared by PUT and POST, which differ only in the HTTP method, the
        headers they build, and the wording of the logged error.

        :param req: webob.Request object
        :param method: HTTP method sent to each node ('PUT' or 'POST')
        :param partition: account ring partition for this account
        :param nodes: node list returned with that partition by the ring
        :param headers: headers to send to every node
        :param error_template: '%s'-style message, formatted with req.path
                               and passed to exception_occurred on failure
        :returns: the result of best_response over the per-node statuses
        """
        statuses = []
        reasons = []
        bodies = []
        for node in self.iter_nodes(partition, nodes,
                                    self.app.account_ring):
            if self.error_limited(node):
                continue
            try:
                with ConnectionTimeout(self.app.conn_timeout):
                    conn = http_connect(node['ip'], node['port'],
                                        node['device'], partition, method,
                                        req.path_info, headers)
                with Timeout(self.app.node_timeout):
                    source = conn.getresponse()
                    body = source.read()
                    # Only 2xx and 4xx answers are counted; anything else
                    # leaves the slot open for another node.
                    if 200 <= source.status < 300 \
                            or 400 <= source.status < 500:
                        statuses.append(source.status)
                        reasons.append(source.reason)
                        bodies.append(body)
                    elif source.status == 507:
                        self.error_limit(node)
            except (Exception, Timeout):
                # Timeout is named explicitly in case eventlet's Timeout
                # does not derive from Exception. A bare except here would
                # also swallow KeyboardInterrupt/SystemExit.
                self.exception_occurred(node, 'Account',
                                        error_template % req.path)
            if len(statuses) >= len(nodes):
                break
        # Nodes that never produced a countable response count as 503.
        while len(statuses) < len(nodes):
            statuses.append(503)
            reasons.append('')
            bodies.append('')
        # Drop the cached account entry regardless of the outcome.
        self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
        return self.best_response(req, statuses, reasons, bodies,
                                  'Account %s' % method)

    @public
    def PUT(self, req):
        """
        HTTP PUT request handler.

        :param req: webob.Request object
        :returns: the check_metadata error response if there is one;
                  HTTPBadRequest if the account name is longer than
                  MAX_ACCOUNT_NAME_LENGTH; otherwise the best response
                  from the account nodes
        """
        error_response = check_metadata(req, 'account')
        if error_response:
            return error_response
        if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH:
            resp = HTTPBadRequest(request=req)
            resp.body = 'Account name length of %d longer than %d' % \
                        (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
            return resp
        account_partition, accounts = \
            self.app.account_ring.get_nodes(self.account_name)
        headers = {'X-Timestamp': normalize_timestamp(time.time()),
                   'x-cf-trans-id': self.trans_id}
        # Only x-account-meta-* headers are forwarded from the client.
        headers.update(value for value in req.headers.iteritems()
                       if value[0].lower().startswith('x-account-meta-'))
        return self._account_request_to_nodes(
            req, 'PUT', account_partition, accounts, headers,
            'Trying to PUT to %s')

    @public
    def POST(self, req):
        """
        HTTP POST request handler.

        :param req: webob.Request object
        :returns: the check_metadata error response if there is one;
                  otherwise the best response from the account nodes
        """
        error_response = check_metadata(req, 'account')
        if error_response:
            return error_response
        account_partition, accounts = \
            self.app.account_ring.get_nodes(self.account_name)
        headers = {'X-Timestamp': normalize_timestamp(time.time()),
                   'X-CF-Trans-Id': self.trans_id}
        # Only x-account-meta-* headers are forwarded from the client.
        headers.update(value for value in req.headers.iteritems()
                       if value[0].lower().startswith('x-account-meta-'))
        return self._account_request_to_nodes(
            req, 'POST', account_partition, accounts, headers,
            'Trying to POST %s')
class BaseApplication(object):
    """Base WSGI application for the proxy server"""

    def __init__(self, conf, memcache=None, logger=None, account_ring=None,
                 container_ring=None, object_ring=None):
        """
        :param conf: configuration mapping; None is treated as empty for
                     every setting read below
        :param memcache: cache client; when None it is taken from the WSGI
                         environment on the first request
        :param logger: logger to use; when None one is built by get_logger
        :param account_ring: account ring; loaded from swift_dir if falsy
        :param container_ring: container ring; loaded from swift_dir if
                               falsy
        :param object_ring: object ring; loaded from swift_dir if falsy
        """
        # NOTE(review): get_logger receives conf before the None check
        # below, so it may be handed None -- confirm it tolerates that.
        if logger is None:
            self.logger = get_logger(conf)
        else:
            self.logger = logger
        if conf is None:
            conf = {}
        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        self.client_timeout = int(conf.get('client_timeout', 60))
        self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
        self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
        # Only the exact string 'True' enables header logging.
        self.log_headers = conf.get('log_headers') == 'True'
        self.error_suppression_interval = \
            int(conf.get('error_suppression_interval', 60))
        self.error_suppression_limit = \
            int(conf.get('error_suppression_limit', 10))
        self.recheck_container_existence = \
            int(conf.get('recheck_container_existence', 60))
        self.recheck_account_existence = \
            int(conf.get('recheck_account_existence', 60))
        self.resellers_conf = ConfigParser()
        self.resellers_conf.read(os.path.join(swift_dir, 'resellers.conf'))
        self.object_ring = object_ring or \
            Ring(os.path.join(swift_dir, 'object.ring.gz'))
        self.container_ring = container_ring or \
            Ring(os.path.join(swift_dir, 'container.ring.gz'))
        self.account_ring = account_ring or \
            Ring(os.path.join(swift_dir, 'account.ring.gz'))
        self.memcache = memcache

    def get_controller(self, path):
        """
        Get the controller to handle a request.

        :param path: path from request
        :returns: tuple of (controller class, path dictionary); the
                  controller class is None when the path parts match no
                  controller
        :raises: ValueError (thrown by split_path) if given invalid path
        """
        version, account, container, obj = split_path(path, 1, 4, True)
        d = dict(version=version,
                 account_name=account,
                 container_name=container,
                 object_name=obj)
        if obj and container and account:
            return ObjectController, d
        elif container and account:
            return ContainerController, d
        elif account and not container and not obj:
            return AccountController, d
        return None, d

    def __call__(self, env, start_response):
        """
        WSGI entry point.
        Wraps env in webob.Request object and passes it down.

        :param env: WSGI environment dictionary
        :param start_response: WSGI callable
        :returns: the WSGI response iterable; on an unexpected error a
                  plain-text 500 response is started and returned
        """
        try:
            if self.memcache is None:
                self.memcache = cache_from_env(env)
            req = self.update_request(Request(env))
            if 'eventlet.posthooks' in env:
                env['eventlet.posthooks'].append(
                    (self.posthooklogger, (req,), {}))
                return self.handle_request(req)(env, start_response)
            else:
                # Lack of posthook support means that we have to log on the
                # start of the response, rather than after all the data has
                # been sent. This prevents logging client disconnects
                # differently than full transmissions.
                response = self.handle_request(req)(env, start_response)
                self.posthooklogger(env, req)
                return response
        except (Exception, Timeout):
            # Timeout is named explicitly in case eventlet's Timeout does
            # not derive from Exception. KeyboardInterrupt/SystemExit are
            # deliberately left to propagate instead of becoming a 500.
            print("EXCEPTION IN __call__: %s: %s" %
                  (traceback.format_exc(), env))
            start_response('500 Server Error',
                           [('Content-Type', 'text/plain')])
            return ['Internal server error.\n']

    def posthooklogger(self, env, req):
        """
        Hook invoked for each request from __call__; does nothing here.

        :param env: WSGI environment dictionary
        :param req: webob.Request object
        """
        pass

    def update_request(self, req):
        """
        Prepare a freshly wrapped request for handling.

        Resets the transfer bookkeeping attributes, assigns a new
        transaction id header, and copies x-storage-token into
        x-auth-token when only the former is present.

        :param req: webob.Request object
        :returns: the same request object
        """
        req.bytes_transferred = '-'
        req.client_disconnect = False
        req.headers['x-cf-trans-id'] = 'tx' + str(uuid.uuid4())
        if 'x-storage-token' in req.headers and \
                'x-auth-token' not in req.headers:
            req.headers['x-auth-token'] = req.headers['x-storage-token']
        return req

    def handle_request(self, req):
        """
        Entry point for proxy server.
        Should return a WSGI-style callable (such as webob.Response).

        :param req: webob.Request object
        """
        try:
            try:
                controller, path_parts = self.get_controller(req.path)
            except ValueError:
                return HTTPNotFound(request=req)
            if not check_utf8(req.path_info):
                return HTTPPreconditionFailed(request=req, body='Invalid UTF8')
            if not controller:
                return HTTPPreconditionFailed(request=req, body='Bad URL')
            controller = controller(self, **path_parts)
            controller.trans_id = req.headers.get('x-cf-trans-id', '-')
            # Only methods carrying a truthy publicly_accessible attribute
            # may be reached from an HTTP request.
            try:
                handler = getattr(controller, req.method)
                if not getattr(handler, 'publicly_accessible'):
                    handler = None
            except AttributeError:
                handler = None
            if not handler:
                return HTTPMethodNotAllowed(request=req)
            if path_parts['version']:
                req.path_info_pop()
            if 'swift.authorize' in req.environ:
                # We call authorize before the handler, always. If authorized,
                # we remove the swift.authorize hook so it isn't ever called
                # again. If not authorized, we return the denial unless the
                # controller's method indicates it'd like to gather more
                # information and try again later.
                resp = req.environ['swift.authorize'](req)
                if not resp:
                    # No resp means authorized, no delayed recheck required.
                    del req.environ['swift.authorize']
                else:
                    # Response indicates denial, but we might delay the denial
                    # and recheck later. If not delayed, return the error now.
                    if not getattr(handler, 'delay_denial', None):
                        return resp
            return handler(req)
        except Exception:
            self.logger.exception('ERROR Unhandled exception in request')
            return HTTPServerError(request=req)
class Application(BaseApplication):
    """Proxy server WSGI application that also logs each request."""

    def handle_request(self, req):
        """
        Record the start time, delegate to BaseApplication.handle_request,
        and keep the response on the request so it can be logged later.

        :param req: webob.Request object
        :returns: the response stored on the request
        """
        req.start_time = time.time()
        req.response = super(Application, self).handle_request(req)
        return req.response

    def posthooklogger(self, env, req):
        """
        Write one log line for the request.

        Nothing is logged when the request carries no response.

        :param env: WSGI environment dictionary
        :param req: webob.Request object
        """
        response = getattr(req, 'response', None)
        if not response:
            return
        elapsed = '%.4f' % (time.time() - req.start_time)

        request_line = quote(unquote(req.path))
        if req.query_string:
            request_line = request_line + '?' + req.query_string

        # remote user for zeus
        remote_client = req.headers.get('x-cluster-client-ip')
        if not remote_client and 'x-forwarded-for' in req.headers:
            # remote user for other lbs
            forwarded_for = req.headers['x-forwarded-for']
            remote_client = forwarded_for.split(',')[0].strip()

        header_dump = None
        if self.log_headers:
            header_dump = '\n'.join(
                '%s: %s' % (name, value)
                for name, value in req.headers.items())

        logged_status = response.status_int
        # A disconnect flagged on either the request or the response is
        # logged as status 499 instead of the response's own status.
        if getattr(req, 'client_disconnect', False) or \
                getattr(response, 'client_disconnect', False):
            logged_status = 499

        fields = (
            remote_client or '-',
            req.remote_addr or '-',
            time.strftime('%d/%b/%Y/%H/%M/%S', time.gmtime()),
            req.method,
            request_line,
            req.environ['SERVER_PROTOCOL'],
            logged_status,
            req.referer or '-',
            req.user_agent or '-',
            req.headers.get('x-auth-token', '-'),
            getattr(req, 'bytes_transferred', 0) or '-',
            getattr(response, 'bytes_transferred', 0) or '-',
            req.headers.get('etag', '-'),
            req.headers.get('x-cf-trans-id', '-'),
            header_dump or '-',
            elapsed,
        )
        # Each field is URL-quoted and the fields are joined with spaces.
        self.logger.info(' '.join(quote(str(field)) for field in fields))
def app_factory(global_conf, **local_conf):
    """
    paste.deploy app factory for creating WSGI proxy apps.

    :param global_conf: global configuration mapping; copied, not modified
    :param local_conf: app-specific settings, which override global ones
    :returns: an Application built from the merged configuration
    """
    settings = global_conf.copy()
    settings.update(local_conf)
    return Application(settings)