swift/swift/container/server.py

599 lines
27 KiB
Python

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import traceback
from swift import gettext_ as _
from xml.etree.cElementTree import Element, SubElement, tostring
from eventlet import Timeout
import swift.common.db
from swift.container.backend import ContainerBroker, DATADIR
from swift.container.replicator import ContainerReplicatorRpc
from swift.common.db import DatabaseAlreadyExists
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path, is_sys_or_user_meta
from swift.common.utils import get_logger, hash_path, public, \
Timestamp, storage_directory, validate_sync_to, \
config_true_value, json, timing_stats, replication, \
override_bytes_from_content_type, get_log_line
from swift.common.constraints import check_mount, valid_timestamp, check_utf8
from swift.common import constraints
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPException, HeaderKeyDict
def gen_resp_headers(info, is_deleted=False):
    """
    Build the response headers that describe a container.

    :param info: container info dict; any field that is missing is
                 treated as 0
    :param is_deleted: when True only the X-Backend-* headers are returned
    :returns: dict mapping header names to values
    """
    created = Timestamp(info.get('created_at', 0))
    put = Timestamp(info.get('put_timestamp', 0))
    deleted = Timestamp(info.get('delete_timestamp', 0))
    status_changed = Timestamp(info.get('status_changed_at', 0))

    # backend headers are always included
    headers = {
        'X-Backend-Timestamp': created.internal,
        'X-Backend-PUT-Timestamp': put.internal,
        'X-Backend-DELETE-Timestamp': deleted.internal,
        'X-Backend-Status-Changed-At': status_changed.internal,
        'X-Backend-Storage-Policy-Index': info.get('storage_policy_index', 0),
    }
    if is_deleted:
        # base container info on deleted containers is not exposed to client
        return headers

    headers['X-Container-Object-Count'] = info.get('object_count', 0)
    headers['X-Container-Bytes-Used'] = info.get('bytes_used', 0)
    headers['X-Timestamp'] = created.normal
    headers['X-PUT-Timestamp'] = put.normal
    return headers
class ContainerController(object):
    """
    WSGI Controller for the container server.

    Instances are WSGI callables: ``__call__`` builds a request from the
    WSGI environment and dispatches it to the method named after the HTTP
    verb (DELETE, PUT, HEAD, GET, REPLICATE, POST).
    """
    # Names of the non-"meta" request headers that PUT and POST persist as
    # container metadata and that HEAD and GET hand back to the caller.
    # Lookups compare against ``key.lower()``, so ensure these are all
    # lowercase. 'x-versions-location' is added at construction time when
    # the allow_versions option is enabled.
    save_headers = ['x-container-read', 'x-container-write',
                    'x-container-sync-key', 'x-container-sync-to']
def __init__(self, conf, logger=None):
    """
    Set up the controller from the container-server configuration.

    :param conf: dict-like configuration. Options read here are
                 log_requests, devices, mount_check, node_timeout,
                 conn_timeout, replication_server, swift_dir,
                 allowed_sync_hosts, auto_create_account_prefix,
                 allow_versions and db_preallocation.
    :param logger: optional logger; when omitted one is created with the
                   'container-server' log route
    """
    self.logger = logger or get_logger(conf, log_route='container-server')
    self.log_requests = config_true_value(conf.get('log_requests', 'true'))
    self.root = conf.get('devices', '/srv/node')
    self.mount_check = config_true_value(conf.get('mount_check', 'true'))
    self.node_timeout = int(conf.get('node_timeout', 3))
    self.conn_timeout = float(conf.get('conn_timeout', 0.5))
    # replication_server stays None when the option is absent; __call__
    # only restricts methods when it is not None.
    replication_server = conf.get('replication_server', None)
    if replication_server is not None:
        replication_server = config_true_value(replication_server)
    self.replication_server = replication_server
    #: ContainerSyncCluster instance for validating sync-to values.
    self.realms_conf = ContainerSyncRealms(
        os.path.join(
            conf.get('swift_dir', '/etc/swift'),
            'container-sync-realms.conf'),
        self.logger)
    #: The list of hosts we're allowed to send syncs to. This can be
    #: overridden by data in self.realms_conf
    self.allowed_sync_hosts = [
        h.strip()
        for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
        if h.strip()]
    self.replicator_rpc = ContainerReplicatorRpc(
        self.root, DATADIR, ContainerBroker, self.mount_check,
        logger=self.logger)
    self.auto_create_account_prefix = \
        conf.get('auto_create_account_prefix') or '.'
    if config_true_value(conf.get('allow_versions', 'f')):
        # Extend a per-instance copy. Appending in place would mutate the
        # list stored on the class, so the extra header would leak into
        # every other controller instance and be duplicated each time a
        # controller is constructed.
        self.save_headers = \
            list(self.save_headers) + ['x-versions-location']
    # NOTE: process-wide setting on the swift.common.db module.
    swift.common.db.DB_PREALLOCATION = \
        config_true_value(conf.get('db_preallocation', 'f'))
def _get_container_broker(self, drive, part, account, container, **kwargs):
    """
    Build a ContainerBroker pointing at the container's DB file.

    :param drive: drive that holds the container
    :param part: partition the container is in
    :param account: account name
    :param container: container name
    :param kwargs: extra keyword arguments passed to ContainerBroker;
                   'account', 'container' and 'logger' are filled in
                   when the caller did not supply them
    :returns: ContainerBroker object
    """
    name_hash = hash_path(account, container)
    db_path = os.path.join(
        self.root, drive,
        storage_directory(DATADIR, part, name_hash),
        name_hash + '.db')
    defaults = (('account', account),
                ('container', container),
                ('logger', self.logger))
    for option, value in defaults:
        kwargs.setdefault(option, value)
    return ContainerBroker(db_path, **kwargs)
def get_and_validate_policy_index(self, req):
    """
    Read X-Backend-Storage-Policy-Index from the request and check that
    it maps to a known storage policy.

    :param req: the swob request object
    :returns: policy index from request, or None if not present
    :raises: HTTPBadRequest if the supplied index is not an integer or
             does not map to a policy
    """
    def invalid(value):
        # same 400 for "not an integer" and "no such policy"
        return HTTPBadRequest(
            request=req, content_type="text/plain",
            body=("Invalid X-Storage-Policy-Index %r" % value))

    raw_index = req.headers.get('X-Backend-Storage-Policy-Index', None)
    if raw_index is None:
        return None
    try:
        index = int(raw_index)
    except ValueError:
        raise invalid(raw_index)
    policy = POLICIES.get_by_index(index)
    if policy is None:
        raise invalid(index)
    return int(policy)
def account_update(self, req, account, container, broker):
    """
    Update the account server(s) with latest container info.

    One PUT is sent per (host, device) pair taken from the
    X-Account-Host / X-Account-Device request headers. Nothing is sent
    when X-Account-Partition is missing or empty. Failures of individual
    updates are logged and otherwise ignored.

    :param req: swob.Request object
    :param account: account name
    :param container: container name
    :param broker: container DB broker object
    :returns: if all the account requests return a 404 error code,
              HTTPNotFound response object,
              if the account cannot be updated due to a malformed header,
              an HTTPBadRequest response object,
              otherwise None.
    """
    host_header = req.headers.get('X-Account-Host', '')
    device_header = req.headers.get('X-Account-Device', '')
    account_hosts = [h.strip() for h in host_header.split(',')]
    account_devices = [d.strip() for d in device_header.split(',')]
    account_partition = req.headers.get('X-Account-Partition', '')

    if len(account_hosts) != len(account_devices):
        # This shouldn't happen unless there's a bug in the proxy,
        # but if there is, we want to know about it.
        self.logger.error(_('ERROR Account update failed: different '
                            'numbers of hosts and devices in request: '
                            '"%s" vs "%s"') %
                          (host_header, device_header))
        # 'request=' matches every other response built in this module
        return HTTPBadRequest(request=req)

    if account_partition:
        # materialise the pairs: they are iterated, truth-tested and
        # passed to len() below
        updates = list(zip(account_hosts, account_devices))
    else:
        updates = []

    # Neither of these depends on the individual account node.
    new_path = '/' + '/'.join([account, container])
    override_deleted = req.headers.get(
        'x-account-override-deleted', 'no').lower() == 'yes'

    account_404s = 0
    for account_host, account_device in updates:
        account_ip, account_port = account_host.rsplit(':', 1)
        # read inside the loop so every update carries the broker's
        # info as of the moment it is sent
        info = broker.get_info()
        account_headers = HeaderKeyDict({
            'x-put-timestamp': info['put_timestamp'],
            'x-delete-timestamp': info['delete_timestamp'],
            'x-object-count': info['object_count'],
            'x-bytes-used': info['bytes_used'],
            'x-trans-id': req.headers.get('x-trans-id', '-'),
            'X-Backend-Storage-Policy-Index': info['storage_policy_index'],
            'user-agent': 'container-server %s' % os.getpid(),
            'referer': req.as_referer()})
        if override_deleted:
            account_headers['x-account-override-deleted'] = 'yes'
        try:
            with ConnectionTimeout(self.conn_timeout):
                conn = http_connect(
                    account_ip, account_port, account_device,
                    account_partition, 'PUT', new_path, account_headers)
            with Timeout(self.node_timeout):
                account_response = conn.getresponse()
                account_response.read()
                if account_response.status == HTTP_NOT_FOUND:
                    account_404s += 1
                elif not is_success(account_response.status):
                    self.logger.error(_(
                        'ERROR Account update failed '
                        'with %(ip)s:%(port)s/%(device)s (will retry '
                        'later): Response %(status)s %(reason)s'),
                        {'ip': account_ip, 'port': account_port,
                         'device': account_device,
                         'status': account_response.status,
                         'reason': account_response.reason})
        except (Exception, Timeout):
            # best effort: log and carry on with the next account node
            self.logger.exception(_(
                'ERROR account update failed with '
                '%(ip)s:%(port)s/%(device)s (will retry later)'),
                {'ip': account_ip, 'port': account_port,
                 'device': account_device})

    if updates and account_404s == len(updates):
        return HTTPNotFound(request=req)
    return None
@public
@timing_stats()
def DELETE(self, req):
    """
    Handle HTTP DELETE request.

    With an object in the path the object row is deleted from the
    container DB; otherwise the container itself is deleted and the
    account server(s) are updated.
    """
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    req_timestamp = valid_timestamp(req)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    # policy index is only relevant for delete_obj (and transitively for
    # auto create accounts)
    obj_policy_index = self.get_and_validate_policy_index(req) or 0
    broker = self._get_container_broker(drive, part, account, container)

    auto_create = obj and \
        account.startswith(self.auto_create_account_prefix)
    if auto_create and not os.path.exists(broker.db_file):
        try:
            broker.initialize(req_timestamp.internal, obj_policy_index)
        except DatabaseAlreadyExists:
            pass
    if not os.path.exists(broker.db_file):
        return HTTPNotFound()

    if obj:
        # delete object
        broker.delete_object(obj, req.headers.get('x-timestamp'),
                             obj_policy_index)
        return HTTPNoContent(request=req)

    # delete container
    if not broker.empty():
        return HTTPConflict(request=req)
    put_timestamp = Timestamp(broker.get_info()['put_timestamp'])
    existed = put_timestamp and not broker.is_deleted()
    broker.delete_db(req_timestamp.internal)
    if not broker.is_deleted():
        return HTTPConflict(request=req)
    update_resp = self.account_update(req, account, container, broker)
    if update_resp:
        return update_resp
    if not existed:
        return HTTPNotFound()
    return HTTPNoContent(request=req)
def _update_or_create(self, req, broker, timestamp, new_container_policy,
                      requested_policy_index):
    """
    Initialize the container database if it is missing, otherwise bring
    the timestamps of the existing database up to date.

    :param req: the swob request object
    :param broker: the broker instance for the container
    :param timestamp: internalized timestamp
    :param new_container_policy: the storage policy index to use
                                 when creating the container
    :param requested_policy_index: the storage policy index sent in the
                                   request, may be None
    :returns: created, a bool, if database did not previously exist
    :raises: HTTPConflict if the requested policy differs from that of
             the existing container, or if the container is still
             deleted after its put timestamp has been updated
    """
    db_missing = not os.path.exists(broker.db_file)
    if db_missing:
        try:
            broker.initialize(timestamp, new_container_policy)
            return True  # created
        except DatabaseAlreadyExists:
            # the file appeared in the meantime; treat it as existing
            pass

    was_deleted = broker.is_deleted()
    if was_deleted:
        # only set storage policy on deleted containers
        broker.set_storage_policy_index(new_container_policy,
                                        timestamp=timestamp)
    elif requested_policy_index is not None and \
            requested_policy_index != broker.storage_policy_index:
        # validate requested policy with existing container
        raise HTTPConflict(request=req)

    broker.update_put_timestamp(timestamp)
    if broker.is_deleted():
        raise HTTPConflict(request=req)
    if was_deleted:
        broker.update_status_changed_at(timestamp)
    return was_deleted
@public
@timing_stats()
def PUT(self, req):
    """
    Handle HTTP PUT request.

    With an object in the path an object row is recorded in the
    container DB; otherwise the container is created or updated, its
    metadata is stored and the account server(s) are updated.
    """
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    req_timestamp = valid_timestamp(req)
    if 'x-container-sync-to' in req.headers:
        err, _sync_to, _realm, _realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    requested_policy_index = self.get_and_validate_policy_index(req)
    broker = self._get_container_broker(drive, part, account, container)
    put_time = req_timestamp.internal

    if obj:
        # put container object
        # obj put expects the policy_index header, default is for
        # legacy support during upgrade.
        obj_policy_index = requested_policy_index or 0
        if account.startswith(self.auto_create_account_prefix) and \
                not os.path.exists(broker.db_file):
            try:
                broker.initialize(put_time, obj_policy_index)
            except DatabaseAlreadyExists:
                pass
        if not os.path.exists(broker.db_file):
            return HTTPNotFound()
        broker.put_object(obj, put_time,
                          int(req.headers['x-size']),
                          req.headers['x-content-type'],
                          req.headers['x-etag'], 0,
                          obj_policy_index)
        return HTTPCreated(request=req)

    # put container
    if requested_policy_index is not None:
        new_container_policy = requested_policy_index
    else:
        # use the default index sent by the proxy if available
        new_container_policy = req.headers.get(
            'X-Backend-Storage-Policy-Default', int(POLICIES.default))
    created = self._update_or_create(req, broker, put_time,
                                     new_container_policy,
                                     requested_policy_index)
    metadata = dict(
        (key, (value, put_time))
        for key, value in req.headers.iteritems()
        if key.lower() in self.save_headers or
        is_sys_or_user_meta('container', key))
    sync_key = 'X-Container-Sync-To'
    if sync_key in metadata and (
            sync_key not in broker.metadata or
            metadata[sync_key][0] != broker.metadata[sync_key][0]):
        # the sync target is new or has changed: reset the sync points
        broker.set_x_container_sync_points(-1, -1)
    broker.update_metadata(metadata, validate_metadata=True)
    update_resp = self.account_update(req, account, container, broker)
    if update_resp:
        return update_resp
    if not created:
        return HTTPAccepted(request=req)
    return HTTPCreated(request=req)
@public
@timing_stats(sample_rate=0.1)
def HEAD(self, req):
    """
    Handle HTTP HEAD request.

    Answers 204 with the container info and stored metadata as headers,
    or 404 (carrying only the backend headers) for a deleted container.
    """
    drive, part, account, container, _obj = split_and_validate_path(
        req, 4, 5, True)
    out_content_type = get_listing_content_type(req)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)
    info, is_deleted = broker.get_info_is_deleted()
    headers = gen_resp_headers(info, is_deleted=is_deleted)
    if is_deleted:
        return HTTPNotFound(request=req, headers=headers)
    for key, (value, _timestamp) in broker.metadata.iteritems():
        if value == '':
            # entries with an empty value are not reported
            continue
        if key.lower() in self.save_headers or \
                is_sys_or_user_meta('container', key):
            headers[key] = value
    headers['Content-Type'] = out_content_type
    return HTTPNoContent(request=req, headers=headers, charset='utf-8')
def update_data_record(self, record):
    """
    Turn a container listing record into the dict used by every
    serialization format.

    A record whose content type is None becomes ``{'subdir': name}``.
    Any other record becomes a dict with 'name', 'bytes', 'hash',
    'content_type' and 'last_modified' (the created time as an iso
    timestamp), which is then passed to
    override_bytes_from_content_type.

    :params record: object entry record; only its first five fields
                    (name, created, size, content_type, etag) are used
    :returns: modified record
    """
    name, created, size, content_type, etag = record[:5]
    if content_type is None:
        return {'subdir': name}
    response = {
        'name': name,
        'bytes': size,
        'hash': etag,
        'content_type': content_type,
        'last_modified': Timestamp(created).isoformat,
    }
    override_bytes_from_content_type(response, logger=self.logger)
    return response
@public
@timing_stats()
def GET(self, req):
    """
    Handle HTTP GET request.

    Reads the listing parameters (path, prefix, delimiter, marker,
    end_marker, limit) from the request and returns the matching object
    listing in the negotiated content type.
    """
    drive, part, account, container, _obj = split_and_validate_path(
        req, 4, 5, True)
    path = get_param(req, 'path')
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    if delimiter:
        # delimiters can be made more flexible later
        if len(delimiter) > 1 or ord(delimiter) > 254:
            return HTTPPreconditionFailed(body='Bad delimiter')
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    max_limit = constraints.CONTAINER_LISTING_LIMIT
    limit = max_limit
    given_limit = get_param(req, 'limit')
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > max_limit:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d'
                % constraints.CONTAINER_LISTING_LIMIT)
    out_content_type = get_listing_content_type(req)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)
    info, is_deleted = broker.get_info_is_deleted()
    resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
    if is_deleted:
        return HTTPNotFound(request=req, headers=resp_headers)
    listing = broker.list_objects_iter(
        limit, marker, end_marker, prefix, delimiter, path,
        storage_policy_index=info['storage_policy_index'])
    return self.create_listing(req, out_content_type, info, resp_headers,
                               broker.metadata, listing, container)
def create_listing(self, req, out_content_type, info, resp_headers,
                   metadata, container_list, container):
    """
    Build the response for a container listing.

    :param req: the swob request object
    :param out_content_type: content type chosen for the listing;
                             'application/json' and types ending in
                             '/xml' are serialized accordingly, anything
                             else gets one object name per line
    :param info: container info dict (not used by this method)
    :param resp_headers: dict of response headers; stored metadata is
                         added to it in place
    :param metadata: mapping of metadata key to (value, timestamp)
    :param container_list: listing records for the container
    :param container: container name
    :returns: the listing response, or an HTTPNoContent response for an
              empty plain-text listing
    """
    resp_headers.update(
        (key, value)
        for key, (value, _timestamp) in metadata.iteritems()
        if value and (key.lower() in self.save_headers or
                      is_sys_or_user_meta('container', key)))
    ret = Response(request=req, headers=resp_headers,
                   content_type=out_content_type, charset='utf-8')

    if out_content_type == 'application/json':
        records = [self.update_data_record(entry)
                   for entry in container_list]
        ret.body = json.dumps(records)
        return ret

    if out_content_type.endswith('/xml'):
        required_fields = ["name", "hash", "bytes", "content_type",
                           "last_modified"]
        doc = Element('container', name=container.decode('utf-8'))
        for entry in container_list:
            record = self.update_data_record(entry)
            if 'subdir' in record:
                subdir_name = record['subdir'].decode('utf-8')
                sub = SubElement(doc, 'subdir', name=subdir_name)
                SubElement(sub, 'name').text = subdir_name
                continue
            obj_element = SubElement(doc, 'object')
            # fixed fields first, in a fixed order ...
            for field in required_fields:
                text = str(record.pop(field)).decode('utf-8')
                SubElement(obj_element, field).text = text
            # ... then whatever else the record holds, sorted by name
            for field in sorted(record):
                text = str(record[field]).decode('utf-8')
                SubElement(obj_element, field).text = text
        xml_body = tostring(doc, encoding='UTF-8')
        # use double quotes in the XML declaration
        ret.body = xml_body.replace(
            "<?xml version='1.0' encoding='UTF-8'?>",
            '<?xml version="1.0" encoding="UTF-8"?>', 1)
        return ret

    # plain text: one object name per line
    if not container_list:
        return HTTPNoContent(request=req, headers=resp_headers)
    names = '\n'.join(rec[0] for rec in container_list)
    ret.body = names + '\n'
    return ret
@public
@replication
@timing_stats(sample_rate=0.01)
def REPLICATE(self, req):
    """
    Handle HTTP REPLICATE request (json-encoded RPC calls for replication.)

    The three path components and the JSON-decoded request body are
    handed to the replicator RPC dispatcher, whose response is returned.
    """
    post_args = split_and_validate_path(req, 3)
    drive, _partition, _hash = post_args
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    try:
        rpc_args = json.load(req.environ['wsgi.input'])
    except ValueError as err:
        return HTTPBadRequest(body=str(err), content_type='text/plain')
    resp = self.replicator_rpc.dispatch(post_args, rpc_args)
    resp.request = req
    return resp
@public
@timing_stats()
def POST(self, req):
    """
    Handle HTTP POST request.

    Stores the saved headers and container sys/user metadata found in
    the request on an existing container.
    """
    drive, part, account, container = split_and_validate_path(req, 4)
    req_timestamp = valid_timestamp(req)
    if 'x-container-sync-to' in req.headers:
        err, _sync_to, _realm, _realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_container_broker(drive, part, account, container)
    if broker.is_deleted():
        return HTTPNotFound(request=req)
    metadata = dict(
        (key, (value, req_timestamp.internal))
        for key, value in req.headers.iteritems()
        if key.lower() in self.save_headers or
        is_sys_or_user_meta('container', key))
    if not metadata:
        return HTTPNoContent(request=req)
    sync_key = 'X-Container-Sync-To'
    if sync_key in metadata and (
            sync_key not in broker.metadata or
            metadata[sync_key][0] != broker.metadata[sync_key][0]):
        # the sync target is new or has changed: reset the sync points
        broker.set_x_container_sync_points(-1, -1)
    broker.update_metadata(metadata, validate_metadata=True)
    return HTTPNoContent(request=req)
def __call__(self, env, start_response):
    """
    WSGI entry point: dispatch the request to the method named after
    its HTTP verb and, when log_requests is on, log the outcome.

    :param env: WSGI environment
    :param start_response: WSGI start_response callable
    :returns: the result of calling the response object with env and
              start_response
    """
    began = time.time()
    req = Request(env)
    self.logger.txn_id = req.headers.get('x-trans-id', None)
    if check_utf8(req.path_info):
        try:
            # disallow methods which have not been marked 'public'
            try:
                handler = getattr(self, req.method)
                getattr(handler, 'publicly_accessible')
                is_replication = getattr(handler, 'replication', False)
                if self.replication_server is not None and \
                        self.replication_server != is_replication:
                    raise AttributeError('Not allowed method.')
            except AttributeError:
                res = HTTPMethodNotAllowed()
            else:
                # called outside the inner try so an AttributeError
                # raised by the handler itself is not turned into a 405
                res = handler(req)
        except HTTPException as error_response:
            res = error_response
        except (Exception, Timeout):
            self.logger.exception(_(
                'ERROR __call__ error with %(method)s %(path)s '),
                {'method': req.method, 'path': req.path})
            res = HTTPInternalServerError(body=traceback.format_exc())
    else:
        res = HTTPPreconditionFailed(body='Invalid UTF8 or contains NULL')
    if self.log_requests:
        elapsed = time.time() - began
        log_message = get_log_line(req, res, elapsed, '')
        # REPLICATE requests are logged at debug level, the rest at info
        if req.method.upper() == 'REPLICATE':
            emit = self.logger.debug
        else:
            emit = self.logger.info
        emit(log_message)
    return res(env, start_response)
def app_factory(global_conf, **local_conf):
    """
    paste.deploy app factory for creating WSGI container server apps

    :param global_conf: global configuration mapping; it is copied, not
                        modified
    :param local_conf: app-specific options, which take precedence over
                       entries of the same name in global_conf
    :returns: a ContainerController built from the merged configuration
    """
    merged_conf = global_conf.copy()
    merged_conf.update(local_conf)
    app = ContainerController(merged_conf)
    return app