Extend direct_client

Rework header handling and add some methods needed by the reconciler.

 * response headers are case insensitive HeaderKeyDicts
 * add direct client container obj put and delete
 * add headers param to direct head object
 * add headers to DirectClientException

DirectClientException is a subclass of ClientException with a convience
constructor.  ClientException now supports an http_headers kwarg.

Exceptions raised from direct_client will include headers.

DocImpact
Implements: blueprint storage-policies
Change-Id: Ia484d569619df0bf85f973e4e916de2ac6401d5e
This commit is contained in:
Clay Gerrard 2014-05-27 15:33:55 -07:00
parent 8bec50838c
commit d495d3ec72
3 changed files with 596 additions and 307 deletions

View File

@ -39,6 +39,19 @@ except ImportError:
import json import json
class DirectClientException(ClientException):
def __init__(self, stype, method, node, part, path, resp):
full_path = quote('/%s/%s%s' % (node['device'], part, path))
msg = '%s server %s:%s direct %s %r gave status %s' % (
stype, node['ip'], node['port'], method, full_path, resp.status)
headers = HeaderKeyDict(resp.getheaders())
super(DirectClientException, self).__init__(
msg, http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason, http_headers=headers)
def _get_direct_account_container(path, stype, node, part, def _get_direct_account_container(path, stype, node, part,
account, marker=None, limit=None, account, marker=None, limit=None,
prefix=None, delimiter=None, conn_timeout=5, prefix=None, delimiter=None, conn_timeout=5,
@ -65,17 +78,10 @@ def _get_direct_account_container(path, stype, node, part,
resp = conn.getresponse() resp = conn.getresponse()
if not is_success(resp.status): if not is_success(resp.status):
resp.read() resp.read()
raise ClientException( raise DirectClientException(stype, 'GET', node, part, path, resp)
'%s server %s:%s direct GET %s gave stats %s' % resp_headers = HeaderKeyDict()
(stype, node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
resp_headers = {}
for header, value in resp.getheaders(): for header, value in resp.getheaders():
resp_headers[header.lower()] = value resp_headers[header] = value
if resp.status == HTTP_NO_CONTENT: if resp.status == HTTP_NO_CONTENT:
resp.read() resp.read()
return resp_headers, [] return resp_headers, []
@ -106,7 +112,7 @@ def direct_get_account(node, part, account, marker=None, limit=None,
:param conn_timeout: timeout in seconds for establishing the connection :param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response :param response_timeout: timeout in seconds for getting the response
:returns: a tuple of (response headers, a list of containers) The response :returns: a tuple of (response headers, a list of containers) The response
headers will be a dict and all header names will be lowercase. headers will HeaderKeyDict.
""" """
path = '/' + account path = '/' + account
return _get_direct_account_container(path, "Account", node, part, return _get_direct_account_container(path, "Account", node, part,
@ -117,6 +123,24 @@ def direct_get_account(node, part, account, marker=None, limit=None,
response_timeout=15) response_timeout=15)
def direct_delete_account(node, part, account, conn_timeout=5,
response_timeout=15, headers=None):
if headers is None:
headers = {}
path = '/%s' % account
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path,
headers=gen_headers(headers, True))
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise DirectClientException('Account', 'DELETE',
node, part, path, resp)
def direct_head_container(node, part, account, container, conn_timeout=5, def direct_head_container(node, part, account, container, conn_timeout=5,
response_timeout=15): response_timeout=15):
""" """
@ -128,8 +152,7 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
:param container: container name :param container: container name
:param conn_timeout: timeout in seconds for establishing the connection :param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response :param response_timeout: timeout in seconds for getting the response
:returns: a dict containing the response's headers (all header names will :returns: a dict containing the response's headers in a HeaderKeyDict
be lowercase)
""" """
path = '/%s/%s' % (account, container) path = '/%s/%s' % (account, container)
with Timeout(conn_timeout): with Timeout(conn_timeout):
@ -139,17 +162,11 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Container', 'HEAD',
'Container server %s:%s direct HEAD %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'], resp_headers = HeaderKeyDict()
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
resp_headers = {}
for header, value in resp.getheaders(): for header, value in resp.getheaders():
resp_headers[header.lower()] = value resp_headers[header] = value
return resp_headers return resp_headers
@ -170,7 +187,7 @@ def direct_get_container(node, part, account, container, marker=None,
:param conn_timeout: timeout in seconds for establishing the connection :param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response :param response_timeout: timeout in seconds for getting the response
:returns: a tuple of (response headers, a list of objects) The response :returns: a tuple of (response headers, a list of objects) The response
headers will be a dict and all header names will be lowercase. headers will be a HeaderKeyDict.
""" """
path = '/%s/%s' % (account, container) path = '/%s/%s' % (account, container)
return _get_direct_account_container(path, "Container", node, return _get_direct_account_container(path, "Container", node,
@ -195,17 +212,56 @@ def direct_delete_container(node, part, account, container, conn_timeout=5,
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Container', 'DELETE',
'Container server %s:%s direct DELETE %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
http_host=node['ip'], http_port=node['port'], def direct_put_container_object(node, part, account, container, obj,
http_device=node['device'], http_status=resp.status, conn_timeout=5, response_timeout=15,
http_reason=resp.reason) headers=None):
if headers is None:
headers = {}
have_x_timestamp = 'x-timestamp' in (k.lower() for k in headers)
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'PUT', path,
headers=gen_headers(headers,
add_ts=(not have_x_timestamp)))
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise DirectClientException('Container', 'PUT',
node, part, path, resp)
def direct_delete_container_object(node, part, account, container, obj,
conn_timeout=5, response_timeout=15,
headers=None):
if headers is None:
headers = {}
headers = gen_headers(headers, add_ts='x-timestamp' not in (
k.lower() for k in headers))
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers=headers)
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if not is_success(resp.status):
raise DirectClientException('Container', 'DELETE',
node, part, path, resp)
def direct_head_object(node, part, account, container, obj, conn_timeout=5, def direct_head_object(node, part, account, container, obj, conn_timeout=5,
response_timeout=15): response_timeout=15, headers=None):
""" """
Request object information directly from the object server. Request object information directly from the object server.
@ -216,28 +272,27 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
:param obj: object name :param obj: object name
:param conn_timeout: timeout in seconds for establishing the connection :param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response :param response_timeout: timeout in seconds for getting the response
:returns: a dict containing the response's headers (all header names will :param headers: dict to be passed into HTTPConnection headers
be lowercase) :returns: a dict containing the response's headers in a HeaderKeyDict
""" """
if headers is None:
headers = {}
headers = gen_headers(headers)
path = '/%s/%s/%s' % (account, container, obj) path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout): with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part, conn = http_connect(node['ip'], node['port'], node['device'], part,
'HEAD', path, headers=gen_headers()) 'HEAD', path, headers=headers)
with Timeout(response_timeout): with Timeout(response_timeout):
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Object', 'HEAD',
'Object server %s:%s direct HEAD %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'], resp_headers = HeaderKeyDict()
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
resp_headers = {}
for header, value in resp.getheaders(): for header, value in resp.getheaders():
resp_headers[header.lower()] = value resp_headers[header] = value
return resp_headers return resp_headers
@ -256,7 +311,7 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
:param resp_chunk_size: if defined, chunk size of data to read. :param resp_chunk_size: if defined, chunk size of data to read.
:param headers: dict to be passed into HTTPConnection headers :param headers: dict to be passed into HTTPConnection headers
:returns: a tuple of (response headers, the object's contents) The response :returns: a tuple of (response headers, the object's contents) The response
headers will be a dict and all header names will be lowercase. headers will be a HeaderKeyDict.
""" """
if headers is None: if headers is None:
headers = {} headers = {}
@ -269,13 +324,8 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
resp = conn.getresponse() resp = conn.getresponse()
if not is_success(resp.status): if not is_success(resp.status):
resp.read() resp.read()
raise ClientException( raise DirectClientException('Object', 'GET',
'Object server %s:%s direct GET %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
if resp_chunk_size: if resp_chunk_size:
def _object_body(): def _object_body():
@ -286,9 +336,9 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
object_body = _object_body() object_body = _object_body()
else: else:
object_body = resp.read() object_body = resp.read()
resp_headers = {} resp_headers = HeaderKeyDict()
for header, value in resp.getheaders(): for header, value in resp.getheaders():
resp_headers[header.lower()] = value resp_headers[header] = value
return resp_headers, object_body return resp_headers, object_body
@ -368,14 +418,8 @@ def direct_put_object(node, part, account, container, name, contents,
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Object', 'PUT',
'Object server %s:%s direct PUT %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
return resp.getheader('etag').strip('"') return resp.getheader('etag').strip('"')
@ -402,14 +446,8 @@ def direct_post_object(node, part, account, container, name, headers,
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Object', 'POST',
'Object server %s:%s direct POST %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
def direct_delete_object(node, part, account, container, obj, def direct_delete_object(node, part, account, container, obj,
@ -429,22 +467,19 @@ def direct_delete_object(node, part, account, container, obj,
if headers is None: if headers is None:
headers = {} headers = {}
headers = gen_headers(headers, add_ts='x-timestamp' not in (
k.lower() for k in headers))
path = '/%s/%s/%s' % (account, container, obj) path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout): with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part, conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers=gen_headers(headers, True)) 'DELETE', path, headers=headers)
with Timeout(response_timeout): with Timeout(response_timeout):
resp = conn.getresponse() resp = conn.getresponse()
resp.read() resp.read()
if not is_success(resp.status): if not is_success(resp.status):
raise ClientException( raise DirectClientException('Object', 'DELETE',
'Object server %s:%s direct DELETE %s gave status %s' % node, part, path, resp)
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
def retry(func, *args, **kwargs): def retry(func, *args, **kwargs):

View File

@ -139,7 +139,7 @@ class ClientException(Exception):
def __init__(self, msg, http_scheme='', http_host='', http_port='', def __init__(self, msg, http_scheme='', http_host='', http_port='',
http_path='', http_query='', http_status=0, http_reason='', http_path='', http_query='', http_status=0, http_reason='',
http_device='', http_response_content=''): http_device='', http_response_content='', http_headers=None):
Exception.__init__(self, msg) Exception.__init__(self, msg)
self.msg = msg self.msg = msg
self.http_scheme = http_scheme self.http_scheme = http_scheme
@ -151,6 +151,7 @@ class ClientException(Exception):
self.http_reason = http_reason self.http_reason = http_reason
self.http_device = http_device self.http_device = http_device
self.http_response_content = http_response_content self.http_response_content = http_response_content
self.http_headers = http_headers or {}
def __str__(self): def __str__(self):
a = self.msg a = self.msg

View File

@ -15,325 +15,578 @@
import unittest import unittest
import os import os
import urllib
from contextlib import contextmanager
import StringIO import StringIO
from hashlib import md5 from hashlib import md5
import time
import mock
from swift.common import direct_client from swift.common import direct_client
from swift.common.exceptions import ClientException from swift.common.exceptions import ClientException
from swift.common.utils import json from swift.common.utils import json, normalize_timestamp
from swift.common.swob import HeaderKeyDict, RESPONSE_REASONS
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from test.unit import patch_policies
def mock_http_connect(status, fake_headers=None, body=None): class FakeConn(object):
class FakeConn(object): def __init__(self, status, headers=None, body='', **kwargs):
self.status = status
def __init__(self, status, fake_headers, body, *args, **kwargs): try:
self.status = status self.reason = RESPONSE_REASONS[self.status][0]
except Exception:
self.reason = 'Fake' self.reason = 'Fake'
self.body = body self.body = body
self.host = args[0] self.resp_headers = HeaderKeyDict()
self.port = args[1] if headers:
self.method = args[4] self.resp_headers.update(headers)
self.path = args[5] self.with_exc = False
self.with_exc = False self.etag = None
self.headers = kwargs.get('headers', {})
self.fake_headers = fake_headers def _update_raw_call_args(self, *args, **kwargs):
capture_attrs = ('host', 'port', 'method', 'path', 'req_headers',
'query_string')
for attr, value in zip(capture_attrs, args[:len(capture_attrs)]):
setattr(self, attr, value)
return self
def getresponse(self):
if self.etag:
self.resp_headers['etag'] = str(self.etag.hexdigest())
if self.with_exc:
raise Exception('test')
return self
def getheader(self, header, default=None):
return self.resp_headers.get(header, default)
def getheaders(self):
return self.resp_headers.items()
def read(self):
return self.body
def send(self, data):
if not self.etag:
self.etag = md5() self.etag = md5()
self.etag.update(data)
def getresponse(self):
if self.with_exc:
raise Exception('test')
if self.fake_headers is not None and self.method == 'POST':
self.fake_headers.append(self.headers)
return self
def getheader(self, header, default=None):
return self.headers.get(header.lower(), default)
def getheaders(self):
if self.fake_headers is not None:
for key in self.fake_headers:
self.headers.update({key: self.fake_headers[key]})
return self.headers.items()
def read(self):
return self.body
def send(self, data):
self.etag.update(data)
self.headers['etag'] = str(self.etag.hexdigest())
def close(self):
return
return lambda *args, **kwargs: FakeConn(status, fake_headers, body,
*args, **kwargs)
@contextmanager
def mocked_http_conn(*args, **kwargs):
fake_conn = FakeConn(*args, **kwargs)
mock_http_conn = lambda *args, **kwargs: \
fake_conn._update_raw_call_args(*args, **kwargs)
with mock.patch('swift.common.bufferedhttp.http_connect_raw',
new=mock_http_conn):
yield fake_conn
@patch_policies
class TestDirectClient(unittest.TestCase): class TestDirectClient(unittest.TestCase):
def setUp(self):
self.node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
self.part = '0'
self.account = u'\u062a account'
self.container = u'\u062a container'
self.obj = u'\u062a obj/name'
self.account_path = '/sda/0/%s' % urllib.quote(
self.account.encode('utf-8'))
self.container_path = '/sda/0/%s/%s' % tuple(
urllib.quote(p.encode('utf-8')) for p in (
self.account, self.container))
self.obj_path = '/sda/0/%s/%s/%s' % tuple(
urllib.quote(p.encode('utf-8')) for p in (
self.account, self.container, self.obj))
self.user_agent = 'direct-client %s' % os.getpid()
def test_gen_headers(self): def test_gen_headers(self):
hdrs = direct_client.gen_headers() stub_user_agent = 'direct-client %s' % os.getpid()
assert 'user-agent' in hdrs
assert hdrs['user-agent'] == 'direct-client %s' % os.getpid()
assert len(hdrs.keys()) == 1
hdrs = direct_client.gen_headers(add_ts=True) headers = direct_client.gen_headers()
assert 'user-agent' in hdrs self.assertEqual(headers['user-agent'], stub_user_agent)
assert 'x-timestamp' in hdrs self.assertEqual(1, len(headers))
assert len(hdrs.keys()) == 2
hdrs = direct_client.gen_headers(hdrs_in={'foo-bar': '47'}) now = time.time()
assert 'user-agent' in hdrs headers = direct_client.gen_headers(add_ts=True)
assert 'foo-bar' in hdrs self.assertEqual(headers['user-agent'], stub_user_agent)
assert hdrs['foo-bar'] == '47' self.assert_(now - 1 < float(headers['x-timestamp']) < now + 1)
assert len(hdrs.keys()) == 2 self.assertEqual(headers['x-timestamp'],
normalize_timestamp(float(headers['x-timestamp'])))
self.assertEqual(2, len(headers))
hdrs = direct_client.gen_headers(hdrs_in={'user-agent': '47'}) headers = direct_client.gen_headers(hdrs_in={'foo-bar': '47'})
assert 'user-agent' in hdrs self.assertEqual(headers['user-agent'], stub_user_agent)
assert hdrs['user-agent'] == 'direct-client %s' % os.getpid() self.assertEqual(headers['foo-bar'], '47')
assert len(hdrs.keys()) == 1 self.assertEqual(2, len(headers))
headers = direct_client.gen_headers(hdrs_in={'user-agent': '47'})
self.assertEqual(headers['user-agent'], stub_user_agent)
self.assertEqual(1, len(headers))
for policy in POLICIES:
for add_ts in (True, False):
now = time.time()
headers = direct_client.gen_headers(
{POLICY_INDEX: policy.idx}, add_ts=add_ts)
self.assertEqual(headers['user-agent'], stub_user_agent)
self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
expected_header_count = 2
if add_ts:
expected_header_count += 1
self.assertEqual(
headers['x-timestamp'],
normalize_timestamp(float(headers['x-timestamp'])))
self.assert_(
now - 1 < float(headers['x-timestamp']) < now + 1)
self.assertEqual(expected_header_count, len(headers))
def test_direct_get_account(self): def test_direct_get_account(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} stub_headers = HeaderKeyDict({
part = '0' 'X-Account-Container-Count': '1',
account = 'a' 'X-Account-Object-Count': '1',
'X-Account-Bytes-Used': '1',
'X-Timestamp': '1234567890',
'X-PUT-Timestamp': '1234567890'})
body = '[{"count": 1, "bytes": 20971520, "name": "c1"}]'
with mocked_http_conn(200, stub_headers, body) as conn:
resp_headers, resp = direct_client.direct_get_account(
self.node, self.part, self.account)
self.assertEqual(conn.method, 'GET')
self.assertEqual(conn.path, self.account_path)
self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assertEqual(resp_headers, stub_headers)
self.assertEqual(json.loads(body), resp)
def test_direct_client_exception(self):
stub_headers = {'X-Trans-Id': 'txb5f59485c578460f8be9e-0053478d09'}
body = 'a server error has occurred'
with mocked_http_conn(500, stub_headers, body):
try:
direct_client.direct_get_account(self.node, self.part,
self.account)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(err.http_status, 500)
expected_err_msg_parts = (
'Account server %s:%s' % (self.node['ip'], self.node['port']),
'GET %r' % self.account_path,
'status 500',
)
for item in expected_err_msg_parts:
self.assert_(item in str(err), '%r was not in "%s"' % (item, err))
self.assertEqual(err.http_host, self.node['ip'])
self.assertEqual(err.http_port, self.node['port'])
self.assertEqual(err.http_device, self.node['device'])
self.assertEqual(err.http_status, 500)
self.assertEqual(err.http_reason, 'Internal Error')
self.assertEqual(err.http_headers, stub_headers)
def test_direct_get_account_no_content_does_not_parse_body(self):
headers = { headers = {
'X-Account-Container-Count': '1', 'X-Account-Container-Count': '1',
'X-Account-Object-Count': '1', 'X-Account-Object-Count': '1',
'X-Account-Bytes-Used': '1', 'X-Account-Bytes-Used': '1',
'X-Timestamp': '1234567890', 'X-Timestamp': '1234567890',
'X-PUT-Timestamp': '1234567890'} 'X-PUT-Timestamp': '1234567890'}
with mocked_http_conn(204, headers) as conn:
resp_headers, resp = direct_client.direct_get_account(
self.node, self.part, self.account)
self.assertEqual(conn.method, 'GET')
self.assertEqual(conn.path, self.account_path)
body = '[{"count": 1, "bytes": 20971520, "name": "c1"}]' self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assertEqual(resp_headers, resp_headers)
fake_headers = {}
for header, value in headers.items():
fake_headers[header.lower()] = value
was_http_connector = direct_client.http_connect
direct_client.http_connect = mock_http_connect(200, fake_headers, body)
resp_headers, resp = direct_client.direct_get_account(node, part,
account)
fake_headers.update({'user-agent': 'direct-client %s' % os.getpid()})
self.assertEqual(fake_headers, resp_headers)
self.assertEqual(json.loads(body), resp)
direct_client.http_connect = mock_http_connect(204, fake_headers, body)
resp_headers, resp = direct_client.direct_get_account(node, part,
account)
fake_headers.update({'user-agent': 'direct-client %s' % os.getpid()})
self.assertEqual(fake_headers, resp_headers)
self.assertEqual([], resp) self.assertEqual([], resp)
direct_client.http_connect = was_http_connector def test_direct_get_account_error(self):
with mocked_http_conn(500) as conn:
try:
direct_client.direct_get_account(
self.node, self.part, self.account)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'GET')
self.assertEqual(conn.path, self.account_path)
self.assertEqual(err.http_status, 500)
self.assert_('GET' in str(err))
def test_direct_delete_account(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
mock_path = 'swift.common.bufferedhttp.http_connect_raw'
with mock.patch(mock_path) as fake_connect:
fake_connect.return_value.getresponse.return_value.status = 200
direct_client.direct_delete_account(node, part, account)
args, kwargs = fake_connect.call_args
method = args[2]
self.assertEqual('DELETE', method)
path = args[3]
self.assertEqual('/sda/0/a', path)
headers = args[4]
self.assert_('X-Timestamp' in headers)
def test_direct_head_container(self): def test_direct_head_container(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} headers = HeaderKeyDict(key='value')
part = '0'
account = 'a'
container = 'c'
headers = {'key': 'value'}
was_http_connector = direct_client.http_connect with mocked_http_conn(200, headers) as conn:
direct_client.http_connect = mock_http_connect(200, headers) resp = direct_client.direct_head_container(
self.node, self.part, self.account, self.container)
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.container_path)
resp = direct_client.direct_head_container(node, part, account, self.assertEqual(conn.req_headers['user-agent'],
container) self.user_agent)
headers.update({'user-agent': 'direct-client %s' % os.getpid()})
self.assertEqual(headers, resp) self.assertEqual(headers, resp)
direct_client.http_connect = was_http_connector def test_direct_head_container_error(self):
headers = HeaderKeyDict(key='value')
with mocked_http_conn(503, headers) as conn:
try:
direct_client.direct_head_container(
self.node, self.part, self.account, self.container)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
# check request
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.container_path)
self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assertEqual(err.http_status, 503)
self.assertEqual(err.http_headers, headers)
self.assert_('HEAD' in str(err))
def test_direct_head_container_deleted(self):
important_timestamp = normalize_timestamp(time.time())
headers = HeaderKeyDict({'X-Backend-Important-Timestamp':
important_timestamp})
with mocked_http_conn(404, headers) as conn:
try:
direct_client.direct_head_container(
self.node, self.part, self.account, self.container)
except Exception as err:
self.assert_(isinstance(err, ClientException))
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.container_path)
self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assertEqual(err.http_status, 404)
self.assertEqual(err.http_headers, headers)
def test_direct_get_container(self): def test_direct_get_container(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} headers = HeaderKeyDict({'key': 'value'})
part = '0'
account = 'a'
container = 'c'
headers = {'key': 'value'}
body = '[{"hash": "8f4e3", "last_modified": "317260", "bytes": 209}]' body = '[{"hash": "8f4e3", "last_modified": "317260", "bytes": 209}]'
was_http_connector = direct_client.http_connect with mocked_http_conn(200, headers, body) as conn:
direct_client.http_connect = mock_http_connect(200, headers, body) resp_headers, resp = direct_client.direct_get_container(
self.node, self.part, self.account, self.container)
resp_headers, resp = ( self.assertEqual(conn.req_headers['user-agent'],
direct_client.direct_get_container(node, part, account, container)) 'direct-client %s' % os.getpid())
headers.update({'user-agent': 'direct-client %s' % os.getpid()})
self.assertEqual(headers, resp_headers) self.assertEqual(headers, resp_headers)
self.assertEqual(json.loads(body), resp) self.assertEqual(json.loads(body), resp)
direct_client.http_connect = mock_http_connect(204, headers, body) def test_direct_get_container_no_content_does_not_decode_body(self):
headers = {}
body = ''
with mocked_http_conn(204, headers, body) as conn:
resp_headers, resp = direct_client.direct_get_container(
self.node, self.part, self.account, self.container)
resp_headers, resp = ( self.assertEqual(conn.req_headers['user-agent'],
direct_client.direct_get_container(node, part, account, container)) 'direct-client %s' % os.getpid())
headers.update({'user-agent': 'direct-client %s' % os.getpid()})
self.assertEqual(headers, resp_headers) self.assertEqual(headers, resp_headers)
self.assertEqual([], resp) self.assertEqual([], resp)
direct_client.http_connect = was_http_connector
def test_direct_delete_container(self): def test_direct_delete_container(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} with mocked_http_conn(200) as conn:
part = '0' direct_client.direct_delete_container(
account = 'a' self.node, self.part, self.account, self.container)
container = 'c' self.assertEqual(conn.method, 'DELETE')
self.assertEqual(conn.path, self.container_path)
was_http_connector = direct_client.http_connect def test_direct_delete_container_error(self):
direct_client.http_connect = mock_http_connect(200) with mocked_http_conn(500) as conn:
try:
direct_client.direct_delete_container(
self.node, self.part, self.account, self.container)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
direct_client.direct_delete_container(node, part, account, container) self.assertEqual(conn.method, 'DELETE')
self.assertEqual(conn.path, self.container_path)
direct_client.http_connect = was_http_connector self.assertEqual(err.http_status, 500)
self.assert_('DELETE' in str(err))
def test_direct_put_container_object(self):
headers = {'x-foo': 'bar'}
with mocked_http_conn(204) as conn:
rv = direct_client.direct_put_container_object(
self.node, self.part, self.account, self.container, self.obj,
headers=headers)
self.assertEqual(conn.method, 'PUT')
self.assertEqual(conn.path, self.obj_path)
self.assert_('x-timestamp' in conn.req_headers)
self.assertEqual('bar', conn.req_headers.get('x-foo'))
self.assertEqual(rv, None)
def test_direct_put_container_object_error(self):
with mocked_http_conn(500) as conn:
try:
direct_client.direct_put_container_object(
self.node, self.part, self.account, self.container,
self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'PUT')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 500)
self.assert_('PUT' in str(err))
def test_direct_delete_container_object(self):
with mocked_http_conn(204) as conn:
rv = direct_client.direct_delete_container_object(
self.node, self.part, self.account, self.container, self.obj)
self.assertEqual(conn.method, 'DELETE')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(rv, None)
def test_direct_delete_container_obj_error(self):
with mocked_http_conn(500) as conn:
try:
direct_client.direct_delete_container_object(
self.node, self.part, self.account, self.container,
self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'DELETE')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 500)
self.assert_('DELETE' in str(err))
def test_direct_head_object(self): def test_direct_head_object(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} headers = HeaderKeyDict({'x-foo': 'bar'})
part = '0'
account = 'a'
container = 'c'
name = 'o'
headers = {'key': 'value'}
was_http_connector = direct_client.http_connect with mocked_http_conn(200, headers) as conn:
direct_client.http_connect = mock_http_connect(200, headers) resp = direct_client.direct_head_object(
self.node, self.part, self.account, self.container,
self.obj, headers=headers)
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.obj_path)
resp = direct_client.direct_head_object(node, part, account, self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
container, name) self.assertEqual('bar', conn.req_headers.get('x-foo'))
headers.update({'user-agent': 'direct-client %s' % os.getpid()}) self.assert_('x-timestamp' not in conn.req_headers,
'x-timestamp was in HEAD request headers')
self.assertEqual(headers, resp) self.assertEqual(headers, resp)
direct_client.http_connect = was_http_connector def test_direct_head_object_error(self):
with mocked_http_conn(500) as conn:
try:
direct_client.direct_head_object(
self.node, self.part, self.account, self.container,
self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 500)
self.assert_('HEAD' in str(err))
def test_direct_head_object_not_found(self):
important_timestamp = normalize_timestamp(time.time())
stub_headers = {'X-Backend-Important-Timestamp': important_timestamp}
with mocked_http_conn(404, headers=stub_headers) as conn:
try:
direct_client.direct_head_object(
self.node, self.part, self.account, self.container,
self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'HEAD')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 404)
self.assertEqual(err.http_headers['x-backend-important-timestamp'],
important_timestamp)
def test_direct_get_object(self): def test_direct_get_object(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
container = 'c'
name = 'o'
contents = StringIO.StringIO('123456') contents = StringIO.StringIO('123456')
was_http_connector = direct_client.http_connect with mocked_http_conn(200, body=contents) as conn:
direct_client.http_connect = mock_http_connect(200, body=contents) resp_header, obj_body = direct_client.direct_get_object(
self.node, self.part, self.account, self.container, self.obj)
resp_header, obj_body = ( self.assertEqual(conn.method, 'GET')
direct_client.direct_get_object(node, part, account, container, self.assertEqual(conn.path, self.obj_path)
name))
self.assertEqual(obj_body, contents) self.assertEqual(obj_body, contents)
direct_client.http_connect = was_http_connector def test_direct_get_object_error(self):
with mocked_http_conn(500) as conn:
try:
direct_client.direct_get_object(
self.node, self.part,
self.account, self.container, self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'GET')
self.assertEqual(conn.path, self.obj_path)
pass self.assertEqual(err.http_status, 500)
self.assert_('GET' in str(err))
def test_direct_post_object(self): def test_direct_post_object(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
container = 'c'
name = 'o'
headers = {'Key': 'value'} headers = {'Key': 'value'}
fake_headers = [] resp_headers = []
was_http_connector = direct_client.http_connect with mocked_http_conn(200, resp_headers) as conn:
direct_client.http_connect = mock_http_connect(200, fake_headers) direct_client.direct_post_object(
self.node, self.part, self.account, self.container, self.obj,
headers)
self.assertEqual(conn.method, 'POST')
self.assertEqual(conn.path, self.obj_path)
direct_client.direct_post_object(node, part, account, for header in headers:
container, name, headers) self.assertEqual(conn.req_headers[header], headers[header])
self.assertEqual(headers['Key'], fake_headers[0].get('Key'))
direct_client.http_connect = was_http_connector def test_direct_post_object_error(self):
headers = {'Key': 'value'}
with mocked_http_conn(500) as conn:
try:
direct_client.direct_post_object(
self.node, self.part, self.account, self.container,
self.obj, headers)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'POST')
self.assertEqual(conn.path, self.obj_path)
for header in headers:
self.assertEqual(conn.req_headers[header], headers[header])
self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assert_('x-timestamp' in conn.req_headers)
self.assertEqual(err.http_status, 500)
self.assert_('POST' in str(err))
def test_direct_delete_object(self): def test_direct_delete_object(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} with mocked_http_conn(200) as conn:
part = '0' resp = direct_client.direct_delete_object(
account = 'a' self.node, self.part, self.account, self.container, self.obj)
container = 'c' self.assertEqual(conn.method, 'DELETE')
name = 'o' self.assertEqual(conn.path, self.obj_path)
self.assertEqual(resp, None)
was_http_connector = direct_client.http_connect def test_direct_delete_object_error(self):
direct_client.http_connect = mock_http_connect(200) with mocked_http_conn(503) as conn:
try:
direct_client.direct_delete_object(
self.node, self.part, self.account, self.container,
self.obj)
except ClientException as err:
pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'DELETE')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 503)
self.assert_('DELETE' in str(err))
direct_client.direct_delete_object(node, part, account, container, def test_direct_put_object_with_content_length(self):
name)
direct_client.http_connect = was_http_connector
def test_direct_put_object(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
container = 'c'
name = 'o'
contents = StringIO.StringIO('123456') contents = StringIO.StringIO('123456')
was_http_connector = direct_client.http_connect with mocked_http_conn(200) as conn:
direct_client.http_connect = mock_http_connect(200) resp = direct_client.direct_put_object(
self.node, self.part, self.account, self.container, self.obj,
resp = direct_client.direct_put_object(node, part, account, contents, 6)
container, name, contents, 6) self.assertEqual(conn.method, 'PUT')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(md5('123456').hexdigest(), resp) self.assertEqual(md5('123456').hexdigest(), resp)
direct_client.http_connect = was_http_connector
def test_direct_put_object_fail(self): def test_direct_put_object_fail(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
container = 'c'
name = 'o'
contents = StringIO.StringIO('123456') contents = StringIO.StringIO('123456')
was_http_connector = direct_client.http_connect with mocked_http_conn(500) as conn:
direct_client.http_connect = mock_http_connect(500) try:
direct_client.direct_put_object(
self.assertRaises(ClientException, direct_client.direct_put_object, self.node, self.part, self.account, self.container,
node, part, account, container, name, contents) self.obj, contents)
except ClientException as err:
direct_client.http_connect = was_http_connector pass
else:
self.fail('ClientException not raised')
self.assertEqual(conn.method, 'PUT')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(err.http_status, 500)
def test_direct_put_object_chunked(self): def test_direct_put_object_chunked(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'}
part = '0'
account = 'a'
container = 'c'
name = 'o'
contents = StringIO.StringIO('123456') contents = StringIO.StringIO('123456')
was_http_connector = direct_client.http_connect with mocked_http_conn(200) as conn:
direct_client.http_connect = mock_http_connect(200) resp = direct_client.direct_put_object(
self.node, self.part, self.account, self.container, self.obj,
resp = direct_client.direct_put_object(node, part, account, contents)
container, name, contents) self.assertEqual(conn.method, 'PUT')
self.assertEqual(conn.path, self.obj_path)
self.assertEqual(md5('6\r\n123456\r\n0\r\n\r\n').hexdigest(), resp) self.assertEqual(md5('6\r\n123456\r\n0\r\n\r\n').hexdigest(), resp)
direct_client.http_connect = was_http_connector
def test_retry(self): def test_retry(self):
node = {'ip': '1.2.3.4', 'port': '6000', 'device': 'sda'} headers = HeaderKeyDict({'key': 'value'})
part = '0'
account = 'a'
container = 'c'
name = 'o'
headers = {'key': 'value'}
was_http_connector = direct_client.http_connect with mocked_http_conn(200, headers) as conn:
direct_client.http_connect = mock_http_connect(200, headers) attempts, resp = direct_client.retry(
direct_client.direct_head_object, self.node, self.part,
attempts, resp = direct_client.retry(direct_client.direct_head_object, self.account, self.container, self.obj)
node, part, account, container, self.assertEqual(conn.method, 'HEAD')
name) self.assertEqual(conn.path, self.obj_path)
headers.update({'user-agent': 'direct-client %s' % os.getpid()}) self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
self.assertEqual(headers, resp) self.assertEqual(headers, resp)
self.assertEqual(attempts, 1) self.assertEqual(attempts, 1)
direct_client.http_connect = was_http_connector
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()