Merge "Container-Sync to perform HEAD before PUT object on remote"

This commit is contained in:
Jenkins 2016-03-23 20:44:37 +00:00 committed by Gerrit Code Review
commit deded2e7a1
5 changed files with 326 additions and 90 deletions

View File

@ -769,6 +769,7 @@ class SimpleClient(object):
req.get_method = lambda: method
conn = urllib2.urlopen(req, timeout=timeout)
body = conn.read()
info = conn.info()
try:
body_data = json.loads(body)
except ValueError:
@ -792,13 +793,13 @@ class SimpleClient(object):
url,
conn.getcode(),
sent_content_length,
conn.info()['content-length'],
info['content-length'],
trans_start,
trans_stop,
trans_stop - trans_start,
additional_info
)))
return [None, body_data]
return [info, body_data]
def retry_request(self, method, **kwargs):
retries = kwargs.pop('retries', self.retries)
@ -837,6 +838,12 @@ class SimpleClient(object):
contents=contents.read(), **kwargs)
def head_object(url, **kwargs):
"""For usage with container sync """
client = SimpleClient(url=url)
return client.retry_request('HEAD', **kwargs)
def put_object(url, **kwargs):
"""For usage with container sync """
client = SimpleClient(url=url)

View File

@ -29,7 +29,8 @@ from swift.container.backend import ContainerBroker
from swift.container.sync_store import ContainerSyncStore
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.internal_client import (
delete_object, put_object, InternalClient, UnexpectedResponse)
delete_object, put_object, head_object,
InternalClient, UnexpectedResponse)
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
@ -389,10 +390,84 @@ class ContainerSync(Daemon):
self.logger.exception(_('ERROR Syncing %s'),
broker if broker else path)
def _update_sync_to_headers(self, name, sync_to, user_key,
realm, realm_key, method, headers):
"""
Updates container sync headers
:param name: The name of the object
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:param method: HTTP method to create sig with
:param headers: headers to update with container sync headers
"""
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(name)
sig = self.realms_conf.get_sig(method, path,
headers.get('x-timestamp', 0),
nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (realm,
nonce,
sig)
else:
headers['x-container-sync-key'] = user_key
def _object_in_remote_container(self, name, sync_to, user_key,
realm, realm_key, timestamp):
"""
Performs head object on remote to eliminate extra remote put and
local get object calls
:param name: The name of the object in the updated row in the local
database triggering the sync update.
:param sync_to: The URL to the remote container.
:param user_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param realm: The realm from self.realms_conf, if there is one.
If None, fallback to using the older allowed_sync_hosts
way of syncing.
:param realm_key: The realm key from self.realms_conf, if there
is one. If None, fallback to using the older
allowed_sync_hosts way of syncing.
:param timestamp: last modified date of local object
:returns: True if object already exists in remote
"""
headers = {'x-timestamp': timestamp.internal}
self._update_sync_to_headers(name, sync_to, user_key, realm,
realm_key, 'HEAD', headers)
try:
metadata, _ = head_object(sync_to, name=name,
headers=headers,
proxy=self.select_http_proxy(),
logger=self.logger,
retries=0)
remote_ts = Timestamp(metadata.get('x-timestamp', 0))
self.logger.debug("remote obj timestamp %s local obj %s" %
(timestamp.internal, remote_ts.internal))
if timestamp <= remote_ts:
return True
# Object in remote should be updated
return False
except ClientException as http_err:
# Object not in remote
if http_err.http_status == 404:
return False
raise http_err
def container_sync_row(self, row, sync_to, user_key, broker, info,
realm, realm_key):
"""
Sends the update the row indicates to the sync_to container.
Update can be either delete or put.
:param row: The updated row in the local database triggering the sync
update.
@ -420,17 +495,9 @@ class ContainerSync(Daemon):
# timestamp of the source tombstone
try:
headers = {'x-timestamp': ts_data.internal}
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(
row['name'])
sig = self.realms_conf.get_sig(
'DELETE', path, headers['x-timestamp'], nonce,
realm_key, user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
self._update_sync_to_headers(row['name'], sync_to,
user_key, realm, realm_key,
'DELETE', headers)
delete_object(sync_to, name=row['name'], headers=headers,
proxy=self.select_http_proxy(),
logger=self.logger,
@ -444,6 +511,10 @@ class ContainerSync(Daemon):
else:
# when sync'ing a live object, use ts_meta - this is the time
# at which the source object was last modified by a PUT or POST
if self._object_in_remote_container(row['name'],
sync_to, user_key, realm,
realm_key, ts_meta):
return True
exc = None
# look up for the newest one
headers_out = {'X-Newest': True,
@ -478,16 +549,8 @@ class ContainerSync(Daemon):
if 'content-type' in headers:
headers['content-type'] = clean_content_type(
headers['content-type'])
if realm and realm_key:
nonce = uuid.uuid4().hex
path = urlparse(sync_to).path + '/' + quote(row['name'])
sig = self.realms_conf.get_sig(
'PUT', path, headers['x-timestamp'], nonce, realm_key,
user_key)
headers['x-container-sync-auth'] = '%s %s %s' % (
realm, nonce, sig)
else:
headers['x-container-sync-key'] = user_key
self._update_sync_to_headers(row['name'], sync_to, user_key,
realm, realm_key, 'PUT', headers)
put_object(sync_to, name=row['name'], headers=headers,
contents=FileLikeIter(body),
proxy=self.select_http_proxy(), logger=self.logger,

View File

@ -266,6 +266,26 @@ class TestContainerSync(ReplProbeTest):
% item) for item in mismatched_headers])
self.fail(msg)
def test_sync_newer_remote(self):
source_container, dest_container = self._setup_synced_containers()
# upload to source
object_name = 'object-%s' % uuid.uuid4()
client.put_object(self.url, self.token, source_container, object_name,
'old-source-body')
# upload to dest with same name
client.put_object(self.url, self.token, dest_container, object_name,
'new-test-body')
# cycle container-sync
Manager(['container-sync']).once()
# verify that the remote object did not change
resp_headers, body = client.get_object(self.url, self.token,
dest_container, object_name)
self.assertEqual(body, 'new-test-body')
if __name__ == "__main__":
unittest.main()

View File

@ -343,6 +343,9 @@ class TestInternalClient(unittest.TestCase):
def read(self):
return json.dumps(body)
def info(self):
return {}
for timeout in (0.0, 42.0, None):
mocked_func = 'swift.common.internal_client.urllib2.urlopen'
with mock.patch(mocked_func) as mock_urlopen:
@ -1181,76 +1184,84 @@ class TestGetAuth(unittest.TestCase):
'http://127.0.0.1', 'user', 'key', auth_version=2.0)
mock_time_value = 1401224049.98
def mock_time():
global mock_time_value
mock_time_value += 1
return mock_time_value
class TestSimpleClient(unittest.TestCase):
def _test_get_head(self, request, urlopen, method):
mock_time_value = [1401224049.98]
def mock_time():
# global mock_time_value
mock_time_value[0] += 1
return mock_time_value[0]
with mock.patch('swift.common.internal_client.time', mock_time):
# basic request, only url as kwarg
request.return_value.get_type.return_value = "http"
urlopen.return_value.read.return_value = ''
urlopen.return_value.getcode.return_value = 200
urlopen.return_value.info.return_value = {'content-length': '345'}
sc = internal_client.SimpleClient(url='http://127.0.0.1')
logger = FakeLogger()
retval = sc.retry_request(
method, headers={'content-length': '123'}, logger=logger)
self.assertEqual(urlopen.call_count, 1)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'content-length': '123'},
data=None)
self.assertEqual([{'content-length': '345'}, None], retval)
self.assertEqual(method, request.return_value.get_method())
self.assertEqual(logger.log_dict['debug'], [(
('-> 2014-05-27T20:54:11 ' + method +
' http://127.0.0.1%3Fformat%3Djson 200 '
'123 345 1401224050.98 1401224051.98 1.0 -',), {})])
# Check if JSON is decoded
urlopen.return_value.read.return_value = '{}'
retval = sc.retry_request(method)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with token
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request(method)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with prefix
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request(method, prefix="pre_")
request.assert_called_with(
'http://127.0.0.1?format=json&prefix=pre_',
headers={'X-Auth-Token': 'token'}, data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with container name
retval = sc.retry_request(method, container='cont')
request.assert_called_with('http://127.0.0.1/cont?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
# same as above, now with object name
retval = sc.retry_request(method, container='cont', name='obj')
request.assert_called_with('http://127.0.0.1/cont/obj',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([{'content-length': '345'}, {}], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
@mock.patch('swift.common.internal_client.time', mock_time)
def test_get(self, request, urlopen):
# basic GET request, only url as kwarg
request.return_value.get_type.return_value = "http"
urlopen.return_value.read.return_value = ''
urlopen.return_value.getcode.return_value = 200
urlopen.return_value.info.return_value = {'content-length': '345'}
sc = internal_client.SimpleClient(url='http://127.0.0.1')
logger = FakeLogger()
retval = sc.retry_request(
'GET', headers={'content-length': '123'}, logger=logger)
self.assertEqual(urlopen.call_count, 1)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'content-length': '123'},
data=None)
self.assertEqual([None, None], retval)
self.assertEqual('GET', request.return_value.get_method())
self.assertEqual(logger.log_dict['debug'], [(
('-> 2014-05-27T20:54:11 GET http://127.0.0.1%3Fformat%3Djson 200 '
'123 345 1401224050.98 1401224051.98 1.0 -',), {})])
self._test_get_head(request, urlopen, 'GET')
# Check if JSON is decoded
urlopen.return_value.read.return_value = '{}'
retval = sc.retry_request('GET')
self.assertEqual([None, {}], retval)
# same as above, now with token
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request('GET')
request.assert_called_with('http://127.0.0.1?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with prefix
sc = internal_client.SimpleClient(url='http://127.0.0.1',
token='token')
retval = sc.retry_request('GET', prefix="pre_")
request.assert_called_with('http://127.0.0.1?format=json&prefix=pre_',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with container name
retval = sc.retry_request('GET', container='cont')
request.assert_called_with('http://127.0.0.1/cont?format=json',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
# same as above, now with object name
retval = sc.retry_request('GET', container='cont', name='obj')
request.assert_called_with('http://127.0.0.1/cont/obj',
headers={'X-Auth-Token': 'token'},
data=None)
self.assertEqual([None, {}], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
def test_head(self, request, urlopen):
self._test_get_head(request, urlopen, 'HEAD')
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
@ -1272,6 +1283,7 @@ class TestSimpleClient(unittest.TestCase):
request.return_value.get_type.return_value = "http"
mock_resp = mock.MagicMock()
mock_resp.read.return_value = ''
mock_resp.info.return_value = {}
urlopen.side_effect = [urllib2.URLError(''), mock_resp]
sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1,
token='token')
@ -1283,13 +1295,14 @@ class TestSimpleClient(unittest.TestCase):
self.assertEqual(urlopen.call_count, 2)
request.assert_called_with('http://127.0.0.1?format=json', data=None,
headers={'X-Auth-Token': 'token'})
self.assertEqual([None, None], retval)
self.assertEqual([{}, None], retval)
self.assertEqual(sc.attempts, 2)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_get_with_retries_param(self, mock_urlopen):
mock_response = mock.MagicMock()
mock_response.read.return_value = ''
mock_response.info.return_value = {}
mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('')
c = internal_client.SimpleClient(url='http://127.0.0.1', token='token')
self.assertEqual(c.retries, 5)
@ -1315,7 +1328,7 @@ class TestSimpleClient(unittest.TestCase):
retval = c.retry_request('GET', retries=1)
self.assertEqual(mock_sleep.call_count, 1)
self.assertEqual(mock_urlopen.call_count, 2)
self.assertEqual([None, None], retval)
self.assertEqual([{}, None], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_request_with_retries_with_HTTPError(self, mock_urlopen):
@ -1380,9 +1393,13 @@ class TestSimpleClient(unittest.TestCase):
url = 'https://127.0.0.1:1/a'
class FakeConn(object):
def read(self):
return 'irrelevant'
def info(self):
return {}
mocked = 'swift.common.internal_client.urllib2.urlopen'
# module level methods

View File

@ -855,6 +855,8 @@ class TestContainerSync(unittest.TestCase):
def _test_container_sync_row_put(self, realm, realm_key):
orig_uuid = sync.uuid
orig_put_object = sync.put_object
orig_head_object = sync.head_object
try:
class FakeUUID(object):
class uuid4(object):
@ -891,6 +893,7 @@ class TestContainerSync(unittest.TestCase):
sync.put_object = fake_put_object
expected_put_count = 0
excepted_failure_count = 0
with mock.patch('swift.container.sync.InternalClient'):
cs = sync.ContainerSync({}, container_ring=FakeRing(),
@ -911,6 +914,14 @@ class TestContainerSync(unittest.TestCase):
# Success as everything says it worked.
# simulate a row with data at 1.1 and later ctype, meta times
created_at = ts_data.internal + '+1388+1388' # last modified = 1.2
def fake_object_in_rcontainer(row, sync_to, user_key,
broker, realm, realm_key):
return False
orig_object_in_rcontainer = cs._object_in_remote_container
cs._object_in_remote_container = fake_object_in_rcontainer
self.assertTrue(cs.container_sync_row(
{'deleted': False,
'name': 'object',
@ -935,6 +946,7 @@ class TestContainerSync(unittest.TestCase):
iter('contents'))
cs.swift.get_object = fake_get_object
# Success as everything says it worked, also checks 'date' and
# 'last-modified' headers are removed and that 'etag' header is
# stripped of double quotes.
@ -980,6 +992,7 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test exception')
@ -1003,6 +1016,7 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(len(exc), 1)
self.assertEqual(str(exc[-1]), 'test client exception')
@ -1029,6 +1043,8 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Unauth')
def fake_put_object(*args, **kwargs):
@ -1044,6 +1060,8 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('info', 'Not found', 1)
def fake_put_object(*args, **kwargs):
@ -1059,10 +1077,121 @@ class TestContainerSync(unittest.TestCase):
{'account': 'a', 'container': 'c', 'storage_policy_index': 0},
realm, realm_key))
self.assertEqual(cs.container_puts, expected_put_count)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
self.assertLogMessage('error', 'ERROR Syncing')
# Test the following cases:
# remote has the same date and a put doesn't take place
# remote has more up to date copy and a put doesn't take place
# head_object returns ClientException(404) and a put takes place
# head_object returns other ClientException put doesn't take place
# and we get failure
# head_object returns other Exception put does not take place
# and we get failure
# remote returns old copy and a put takes place
test_row = {'deleted': False,
'name': 'object',
'created_at': timestamp.internal,
'etag': '1111'}
test_info = {'account': 'a',
'container': 'c',
'storage_policy_index': 0}
actual_puts = []
def fake_put_object(*args, **kwargs):
actual_puts.append((args, kwargs))
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.2'}, '')
sync.put_object = fake_put_object
sync.head_object = fake_head_object
cs._object_in_remote_container = orig_object_in_rcontainer
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info,
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.3'}, '')
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info,
realm, realm_key))
# No additional put has taken place
self.assertEqual(len(actual_puts), 0)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
actual_puts = []
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=404)
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 1)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise ClientException('test client exception', http_status=401)
sync.head_object = fake_head_object
self.assertFalse(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
raise Exception()
sync.head_object = fake_head_object
self.assertFalse(cs.container_sync_row(
test_row,
'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# No additional put has taken place, failures increased
self.assertEqual(len(actual_puts), 1)
excepted_failure_count += 1
self.assertEqual(cs.container_failures, excepted_failure_count)
def fake_head_object(*args, **kwargs):
return ({'x-timestamp': '1.1'}, '')
sync.head_object = fake_head_object
self.assertTrue(cs.container_sync_row(
test_row, 'http://sync/to/path',
'key', FakeContainerBroker('broker'),
test_info, realm, realm_key))
# Additional put has taken place
self.assertEqual(len(actual_puts), 2)
# No additional errors
self.assertEqual(cs.container_failures, excepted_failure_count)
finally:
sync.uuid = orig_uuid
sync.put_object = orig_put_object
sync.head_object = orig_head_object
def test_select_http_proxy_None(self):