ac524832e9
While the existing named kwargs (marker, limit, prefix and etc) are common parameters among different use cases of direct_get_container, a new 'extra_params' kwarg is added to support additional private parameters for each individual use case. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: If1d355fbc5d292cee4f44c0e5dc52e5df9ae990b
734 lines
29 KiB
Python
734 lines
29 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Internal client library for making calls directly to the servers rather than
|
|
through the proxy.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
|
|
from eventlet import sleep, Timeout
|
|
import six
|
|
import six.moves.cPickle as pickle
|
|
from six.moves.http_client import HTTPException
|
|
|
|
from swift.common.bufferedhttp import http_connect, http_connect_raw
|
|
from swift.common.exceptions import ClientException
|
|
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER, \
|
|
get_ip_port
|
|
from swift.common.swob import normalize_etag
|
|
from swift.common.utils import Timestamp, FileLikeIter, quote
|
|
from swift.common.http import HTTP_NO_CONTENT, HTTP_INSUFFICIENT_STORAGE, \
|
|
is_success, is_server_error
|
|
from swift.common.header_key_dict import HeaderKeyDict
|
|
|
|
|
|
class DirectClientException(ClientException):
|
|
|
|
def __init__(self, stype, method, node, part, path, resp, host=None):
|
|
# host can be used to override the node ip and port reported in
|
|
# the exception
|
|
host = host if host is not None else node
|
|
if not isinstance(path, six.text_type):
|
|
path = path.decode("utf-8")
|
|
full_path = quote('/%s/%s%s' % (node['device'], part, path))
|
|
msg = '%s server %s:%s direct %s %r gave status %s' % (
|
|
stype, host['ip'], host['port'], method, full_path, resp.status)
|
|
headers = HeaderKeyDict(resp.getheaders())
|
|
super(DirectClientException, self).__init__(
|
|
msg, http_host=host['ip'], http_port=host['port'],
|
|
http_device=node['device'], http_status=resp.status,
|
|
http_reason=resp.reason, http_headers=headers)
|
|
|
|
|
|
class DirectClientReconException(ClientException):
|
|
|
|
def __init__(self, method, node, path, resp):
|
|
if not isinstance(path, six.text_type):
|
|
path = path.decode("utf-8")
|
|
msg = 'server %s:%s direct %s %r gave status %s' % (
|
|
node['ip'], node['port'], method, path, resp.status)
|
|
headers = HeaderKeyDict(resp.getheaders())
|
|
super(DirectClientReconException, self).__init__(
|
|
msg, http_host=node['ip'], http_port=node['port'],
|
|
http_status=resp.status, http_reason=resp.reason,
|
|
http_headers=headers)
|
|
|
|
|
|
def _make_path(*components):
|
|
return u'/' + u'/'.join(
|
|
x.decode('utf-8') if isinstance(x, six.binary_type) else x
|
|
for x in components)
|
|
|
|
|
|
def _make_req(node, part, method, path, headers, stype,
|
|
conn_timeout=5, response_timeout=15, send_timeout=15,
|
|
contents=None, content_length=None, chunk_size=65535):
|
|
"""
|
|
Make request to backend storage node.
|
|
(i.e. 'Account', 'Container', 'Object')
|
|
:param node: a node dict from a ring
|
|
:param part: an integer, the partition number
|
|
:param method: a string, the HTTP method (e.g. 'PUT', 'DELETE', etc)
|
|
:param path: a string, the request path
|
|
:param headers: a dict, header name => value
|
|
:param stype: a string, describing the type of service
|
|
:param conn_timeout: timeout while waiting for connection; default is 5
|
|
seconds
|
|
:param response_timeout: timeout while waiting for response; default is 15
|
|
seconds
|
|
:param send_timeout: timeout for sending request body; default is 15
|
|
seconds
|
|
:param contents: an iterable or string to read object data from
|
|
:param content_length: value to send as content-length header
|
|
:param chunk_size: if defined, chunk size of data to send
|
|
:returns: an HTTPResponse object
|
|
:raises DirectClientException: if the response status is not 2xx
|
|
:raises eventlet.Timeout: if either conn_timeout or response_timeout is
|
|
exceeded
|
|
"""
|
|
if contents is not None:
|
|
if content_length is not None:
|
|
headers['Content-Length'] = str(content_length)
|
|
else:
|
|
for n, v in headers.items():
|
|
if n.lower() == 'content-length':
|
|
content_length = int(v)
|
|
if not contents:
|
|
headers['Content-Length'] = '0'
|
|
if isinstance(contents, six.string_types):
|
|
contents = [contents]
|
|
if content_length is None:
|
|
headers['Transfer-Encoding'] = 'chunked'
|
|
|
|
ip, port = get_ip_port(node, headers)
|
|
headers.setdefault('X-Backend-Allow-Reserved-Names', 'true')
|
|
with Timeout(conn_timeout):
|
|
conn = http_connect(ip, port, node['device'], part,
|
|
method, path, headers=headers)
|
|
|
|
if contents is not None:
|
|
contents_f = FileLikeIter(contents)
|
|
|
|
with Timeout(send_timeout):
|
|
if content_length is None:
|
|
chunk = contents_f.read(chunk_size)
|
|
while chunk:
|
|
conn.send(b'%x\r\n%s\r\n' % (len(chunk), chunk))
|
|
chunk = contents_f.read(chunk_size)
|
|
conn.send(b'0\r\n\r\n')
|
|
else:
|
|
left = content_length
|
|
while left > 0:
|
|
size = chunk_size
|
|
if size > left:
|
|
size = left
|
|
chunk = contents_f.read(size)
|
|
if not chunk:
|
|
break
|
|
conn.send(chunk)
|
|
left -= len(chunk)
|
|
|
|
with Timeout(response_timeout):
|
|
resp = conn.getresponse()
|
|
resp.read()
|
|
if not is_success(resp.status):
|
|
raise DirectClientException(stype, method, node, part, path, resp)
|
|
return resp
|
|
|
|
|
|
def _get_direct_account_container(path, stype, node, part,
|
|
marker=None, limit=None,
|
|
prefix=None, delimiter=None,
|
|
conn_timeout=5, response_timeout=15,
|
|
end_marker=None, reverse=None, headers=None,
|
|
extra_params=None):
|
|
"""Base function for get direct account and container.
|
|
|
|
Do not use directly use the direct_get_account or
|
|
direct_get_container instead.
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
params = {'format': 'json'}
|
|
if extra_params:
|
|
for key, value in extra_params.items():
|
|
if value is not None:
|
|
params[key] = value
|
|
if marker:
|
|
if 'marker' in params:
|
|
raise TypeError('duplicate values for keyword arg: marker')
|
|
params['marker'] = quote(marker)
|
|
if limit:
|
|
if 'limit' in params:
|
|
raise TypeError('duplicate values for keyword arg: limit')
|
|
params['limit'] = '%d' % limit
|
|
if prefix:
|
|
if 'prefix' in params:
|
|
raise TypeError('duplicate values for keyword arg: prefix')
|
|
params['prefix'] = quote(prefix)
|
|
if delimiter:
|
|
if 'delimiter' in params:
|
|
raise TypeError('duplicate values for keyword arg: delimiter')
|
|
params['delimiter'] = quote(delimiter)
|
|
if end_marker:
|
|
if 'end_marker' in params:
|
|
raise TypeError('duplicate values for keyword arg: end_marker')
|
|
params['end_marker'] = quote(end_marker)
|
|
if reverse:
|
|
if 'reverse' in params:
|
|
raise TypeError('duplicate values for keyword arg: reverse')
|
|
params['reverse'] = quote(reverse)
|
|
qs = '&'.join('%s=%s' % (k, v) for k, v in params.items())
|
|
|
|
ip, port = get_ip_port(node, headers)
|
|
with Timeout(conn_timeout):
|
|
conn = http_connect(ip, port, node['device'], part,
|
|
'GET', path, query_string=qs,
|
|
headers=gen_headers(hdrs_in=headers))
|
|
with Timeout(response_timeout):
|
|
resp = conn.getresponse()
|
|
if not is_success(resp.status):
|
|
resp.read()
|
|
raise DirectClientException(stype, 'GET', node, part, path, resp)
|
|
|
|
resp_headers = HeaderKeyDict()
|
|
for header, value in resp.getheaders():
|
|
resp_headers[header] = value
|
|
if resp.status == HTTP_NO_CONTENT:
|
|
resp.read()
|
|
return resp_headers, []
|
|
return resp_headers, json.loads(resp.read())
|
|
|
|
|
|
def gen_headers(hdrs_in=None, add_ts=True):
|
|
"""
|
|
Get the headers ready for a request. All requests should have a User-Agent
|
|
string, but if one is passed in don't over-write it. Not all requests will
|
|
need an X-Timestamp, but if one is passed in do not over-write it.
|
|
|
|
:param headers: dict or None, base for HTTP headers
|
|
:param add_ts: boolean, should be True for any "unsafe" HTTP request
|
|
|
|
:returns: HeaderKeyDict based on headers and ready for the request
|
|
"""
|
|
hdrs_out = HeaderKeyDict(hdrs_in) if hdrs_in else HeaderKeyDict()
|
|
if add_ts and 'X-Timestamp' not in hdrs_out:
|
|
hdrs_out['X-Timestamp'] = Timestamp.now().internal
|
|
if 'user-agent' not in hdrs_out:
|
|
hdrs_out['User-Agent'] = 'direct-client %s' % os.getpid()
|
|
hdrs_out.setdefault('X-Backend-Allow-Reserved-Names', 'true')
|
|
return hdrs_out
|
|
|
|
|
|
def direct_get_account(node, part, account, marker=None, limit=None,
|
|
prefix=None, delimiter=None, conn_timeout=5,
|
|
response_timeout=15, end_marker=None, reverse=None,
|
|
headers=None):
|
|
"""
|
|
Get listings directly from the account server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the account is on
|
|
:param account: account name
|
|
:param marker: marker query
|
|
:param limit: query limit
|
|
:param prefix: prefix query
|
|
:param delimiter: delimiter for the query
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param end_marker: end_marker query
|
|
:param reverse: reverse the returned listing
|
|
:returns: a tuple of (response headers, a list of containers) The response
|
|
headers will HeaderKeyDict.
|
|
"""
|
|
path = _make_path(account)
|
|
return _get_direct_account_container(path, "Account", node, part,
|
|
headers=headers,
|
|
marker=marker,
|
|
limit=limit, prefix=prefix,
|
|
delimiter=delimiter,
|
|
end_marker=end_marker,
|
|
reverse=reverse,
|
|
conn_timeout=conn_timeout,
|
|
response_timeout=response_timeout)
|
|
|
|
|
|
def direct_delete_account(node, part, account, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
path = _make_path(account)
|
|
_make_req(node, part, 'DELETE', path, gen_headers(headers, True),
|
|
'Account', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_head_container(node, part, account, container, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
"""
|
|
Request container information directly from the container server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:returns: a dict containing the response's headers in a HeaderKeyDict
|
|
:raises ClientException: HTTP HEAD request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
path = _make_path(account, container)
|
|
resp = _make_req(node, part, 'HEAD', path, gen_headers(headers),
|
|
'Container', conn_timeout, response_timeout)
|
|
|
|
resp_headers = HeaderKeyDict()
|
|
for header, value in resp.getheaders():
|
|
resp_headers[header] = value
|
|
return resp_headers
|
|
|
|
|
|
def direct_get_container(node, part, account, container, marker=None,
|
|
limit=None, prefix=None, delimiter=None,
|
|
conn_timeout=5, response_timeout=15, end_marker=None,
|
|
reverse=None, headers=None, extra_params=None):
|
|
"""
|
|
Get container listings directly from the container server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param marker: marker query
|
|
:param limit: query limit
|
|
:param prefix: prefix query
|
|
:param delimiter: delimiter for the query
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param end_marker: end_marker query
|
|
:param reverse: reverse the returned listing
|
|
:param headers: headers to be included in the request
|
|
:param extra_params: a dict of extra parameters to be included in the
|
|
request. It can be used to pass additional parameters, e.g,
|
|
{'states':'updating'} can be used with shard_range/namespace listing.
|
|
It can also be used to pass the existing keyword args, like 'marker' or
|
|
'limit', but if the same parameter appears twice in both keyword arg
|
|
(not None) and extra_params, this function will raise TypeError.
|
|
:returns: a tuple of (response headers, a list of objects) The response
|
|
headers will be a HeaderKeyDict.
|
|
"""
|
|
path = _make_path(account, container)
|
|
return _get_direct_account_container(path, "Container", node,
|
|
part, marker=marker,
|
|
limit=limit, prefix=prefix,
|
|
delimiter=delimiter,
|
|
end_marker=end_marker,
|
|
reverse=reverse,
|
|
conn_timeout=conn_timeout,
|
|
response_timeout=response_timeout,
|
|
headers=headers,
|
|
extra_params=extra_params)
|
|
|
|
|
|
def direct_delete_container(node, part, account, container, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
"""
|
|
Delete container directly from the container server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: dict to be passed into HTTPConnection headers
|
|
:raises ClientException: HTTP DELETE request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
path = _make_path(account, container)
|
|
add_timestamp = 'x-timestamp' not in (k.lower() for k in headers)
|
|
_make_req(node, part, 'DELETE', path, gen_headers(headers, add_timestamp),
|
|
'Container', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_put_container(node, part, account, container, conn_timeout=5,
|
|
response_timeout=15, headers=None, contents=None,
|
|
content_length=None, chunk_size=65535):
|
|
"""
|
|
Make a PUT request to a container server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: additional headers to include in the request
|
|
:param contents: an iterable or string to send in request body (optional)
|
|
:param content_length: value to send as content-length header (optional)
|
|
:param chunk_size: chunk size of data to send (optional)
|
|
:raises ClientException: HTTP PUT request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
lower_headers = set(k.lower() for k in headers)
|
|
headers_out = gen_headers(headers,
|
|
add_ts='x-timestamp' not in lower_headers)
|
|
path = _make_path(account, container)
|
|
_make_req(node, part, 'PUT', path, headers_out, 'Container', conn_timeout,
|
|
response_timeout, contents=contents,
|
|
content_length=content_length, chunk_size=chunk_size)
|
|
|
|
|
|
def direct_post_container(node, part, account, container, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
"""
|
|
Make a POST request to a container server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: additional headers to include in the request
|
|
:raises ClientException: HTTP PUT request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
lower_headers = set(k.lower() for k in headers)
|
|
headers_out = gen_headers(headers,
|
|
add_ts='x-timestamp' not in lower_headers)
|
|
path = _make_path(account, container)
|
|
return _make_req(node, part, 'POST', path, headers_out, 'Container',
|
|
conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_put_container_object(node, part, account, container, obj,
|
|
conn_timeout=5, response_timeout=15,
|
|
headers=None):
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
have_x_timestamp = 'x-timestamp' in (k.lower() for k in headers)
|
|
|
|
path = _make_path(account, container, obj)
|
|
_make_req(node, part, 'PUT', path,
|
|
gen_headers(headers, add_ts=(not have_x_timestamp)),
|
|
'Container', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_delete_container_object(node, part, account, container, obj,
|
|
conn_timeout=5, response_timeout=15,
|
|
headers=None):
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
headers = gen_headers(headers, add_ts='x-timestamp' not in (
|
|
k.lower() for k in headers))
|
|
|
|
path = _make_path(account, container, obj)
|
|
_make_req(node, part, 'DELETE', path, headers,
|
|
'Container', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_head_object(node, part, account, container, obj, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
"""
|
|
Request object information directly from the object server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param obj: object name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: dict to be passed into HTTPConnection headers
|
|
:returns: a dict containing the response's headers in a HeaderKeyDict
|
|
:raises ClientException: HTTP HEAD request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
headers = gen_headers(headers)
|
|
|
|
path = _make_path(account, container, obj)
|
|
resp = _make_req(node, part, 'HEAD', path, headers,
|
|
'Object', conn_timeout, response_timeout)
|
|
|
|
resp_headers = HeaderKeyDict()
|
|
for header, value in resp.getheaders():
|
|
resp_headers[header] = value
|
|
return resp_headers
|
|
|
|
|
|
def direct_get_object(node, part, account, container, obj, conn_timeout=5,
|
|
response_timeout=15, resp_chunk_size=None, headers=None):
|
|
"""
|
|
Get object directly from the object server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param obj: object name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param resp_chunk_size: if defined, chunk size of data to read.
|
|
:param headers: dict to be passed into HTTPConnection headers
|
|
:returns: a tuple of (response headers, the object's contents) The response
|
|
headers will be a HeaderKeyDict.
|
|
:raises ClientException: HTTP GET request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
ip, port = get_ip_port(node, headers)
|
|
path = _make_path(account, container, obj)
|
|
with Timeout(conn_timeout):
|
|
conn = http_connect(ip, port, node['device'], part,
|
|
'GET', path, headers=gen_headers(headers))
|
|
with Timeout(response_timeout):
|
|
resp = conn.getresponse()
|
|
if not is_success(resp.status):
|
|
resp.read()
|
|
raise DirectClientException('Object', 'GET', node, part, path, resp)
|
|
|
|
if resp_chunk_size:
|
|
|
|
def _object_body():
|
|
buf = resp.read(resp_chunk_size)
|
|
while buf:
|
|
yield buf
|
|
buf = resp.read(resp_chunk_size)
|
|
object_body = _object_body()
|
|
else:
|
|
object_body = resp.read()
|
|
resp_headers = HeaderKeyDict()
|
|
for header, value in resp.getheaders():
|
|
resp_headers[header] = value
|
|
return resp_headers, object_body
|
|
|
|
|
|
def direct_put_object(node, part, account, container, name, contents,
|
|
content_length=None, etag=None, content_type=None,
|
|
headers=None, conn_timeout=5, response_timeout=15,
|
|
chunk_size=65535):
|
|
"""
|
|
Put object directly from the object server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param name: object name
|
|
:param contents: an iterable or string to read object data from
|
|
:param content_length: value to send as content-length header
|
|
:param etag: etag of contents
|
|
:param content_type: value to send as content-type header
|
|
:param headers: additional headers to include in the request
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param chunk_size: if defined, chunk size of data to send.
|
|
:returns: etag from the server response
|
|
:raises ClientException: HTTP PUT request failed
|
|
"""
|
|
|
|
path = _make_path(account, container, name)
|
|
if headers is None:
|
|
headers = {}
|
|
if etag:
|
|
headers['ETag'] = normalize_etag(etag)
|
|
if content_type is not None:
|
|
headers['Content-Type'] = content_type
|
|
else:
|
|
headers['Content-Type'] = 'application/octet-stream'
|
|
# Incase the caller want to insert an object with specific age
|
|
add_ts = 'X-Timestamp' not in headers
|
|
|
|
resp = _make_req(
|
|
node, part, 'PUT', path, gen_headers(headers, add_ts=add_ts),
|
|
'Object', conn_timeout, response_timeout, contents=contents,
|
|
content_length=content_length, chunk_size=chunk_size)
|
|
|
|
return normalize_etag(resp.getheader('etag'))
|
|
|
|
|
|
def direct_post_object(node, part, account, container, name, headers,
|
|
conn_timeout=5, response_timeout=15):
|
|
"""
|
|
Direct update to object metadata on object server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param name: object name
|
|
:param headers: headers to store as metadata
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:raises ClientException: HTTP POST request failed
|
|
"""
|
|
path = _make_path(account, container, name)
|
|
_make_req(node, part, 'POST', path, gen_headers(headers, True),
|
|
'Object', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_delete_object(node, part, account, container, obj,
|
|
conn_timeout=5, response_timeout=15, headers=None):
|
|
"""
|
|
Delete object directly from the object server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param account: account name
|
|
:param container: container name
|
|
:param obj: object name
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:raises ClientException: HTTP DELETE request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
headers = gen_headers(headers, add_ts='x-timestamp' not in (
|
|
k.lower() for k in headers))
|
|
|
|
path = _make_path(account, container, obj)
|
|
_make_req(node, part, 'DELETE', path, headers,
|
|
'Object', conn_timeout, response_timeout)
|
|
|
|
|
|
def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
|
|
response_timeout=15, headers=None):
|
|
"""
|
|
Get suffix hashes directly from the object server.
|
|
|
|
Note that unlike other ``direct_client`` functions, this one defaults
|
|
to using the replication network to make requests.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param part: partition the container is on
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: dict to be passed into HTTPConnection headers
|
|
:returns: dict of suffix hashes
|
|
:raises ClientException: HTTP REPLICATE request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true')
|
|
ip, port = get_ip_port(node, headers)
|
|
path = '/%s' % '-'.join(suffixes)
|
|
with Timeout(conn_timeout):
|
|
conn = http_connect(ip, port,
|
|
node['device'], part, 'REPLICATE', path,
|
|
headers=gen_headers(headers))
|
|
with Timeout(response_timeout):
|
|
resp = conn.getresponse()
|
|
if not is_success(resp.status):
|
|
raise DirectClientException('Object', 'REPLICATE',
|
|
node, part, path, resp,
|
|
host={'ip': node['replication_ip'],
|
|
'port': node['replication_port']}
|
|
)
|
|
return pickle.loads(resp.read())
|
|
|
|
|
|
def retry(func, *args, **kwargs):
|
|
"""
|
|
Helper function to retry a given function a number of times.
|
|
|
|
:param func: callable to be called
|
|
:param retries: number of retries
|
|
:param error_log: logger for errors
|
|
:param args: arguments to send to func
|
|
:param kwargs: keyward arguments to send to func (if retries or
|
|
error_log are sent, they will be deleted from kwargs
|
|
before sending on to func)
|
|
:returns: result of func
|
|
:raises ClientException: all retries failed
|
|
"""
|
|
retries = kwargs.pop('retries', 5)
|
|
error_log = kwargs.pop('error_log', None)
|
|
attempts = 0
|
|
backoff = 1
|
|
while attempts <= retries:
|
|
attempts += 1
|
|
try:
|
|
return attempts, func(*args, **kwargs)
|
|
except (socket.error, HTTPException, Timeout) as err:
|
|
if error_log:
|
|
error_log(err)
|
|
if attempts > retries:
|
|
raise
|
|
except ClientException as err:
|
|
if error_log:
|
|
error_log(err)
|
|
if attempts > retries or not is_server_error(err.http_status) or \
|
|
err.http_status == HTTP_INSUFFICIENT_STORAGE:
|
|
raise
|
|
sleep(backoff)
|
|
backoff *= 2
|
|
# Shouldn't actually get down here, but just in case.
|
|
if args and 'ip' in args[0]:
|
|
raise ClientException('Raise too many retries',
|
|
http_host=args[0]['ip'],
|
|
http_port=args[0]['port'],
|
|
http_device=args[0]['device'])
|
|
else:
|
|
raise ClientException('Raise too many retries')
|
|
|
|
|
|
def direct_get_recon(node, recon_command, conn_timeout=5, response_timeout=15,
|
|
headers=None):
|
|
"""
|
|
Get recon json directly from the storage server.
|
|
|
|
:param node: node dictionary from the ring
|
|
:param recon_command: recon string (post /recon/)
|
|
:param conn_timeout: timeout in seconds for establishing the connection
|
|
:param response_timeout: timeout in seconds for getting the response
|
|
:param headers: dict to be passed into HTTPConnection headers
|
|
:returns: deserialized json response
|
|
:raises DirectClientReconException: HTTP GET request failed
|
|
"""
|
|
if headers is None:
|
|
headers = {}
|
|
|
|
ip, port = get_ip_port(node, headers)
|
|
path = '/recon/%s' % recon_command
|
|
with Timeout(conn_timeout):
|
|
conn = http_connect_raw(ip, port, 'GET', path,
|
|
headers=gen_headers(headers))
|
|
with Timeout(response_timeout):
|
|
resp = conn.getresponse()
|
|
if not is_success(resp.status):
|
|
raise DirectClientReconException('GET', node, path, resp)
|
|
return json.loads(resp.read())
|