From 1dceafa7d5999ad7366b2df49887954bb5695992 Mon Sep 17 00:00:00 2001
From: Alistair Coles
Date: Fri, 8 Jan 2021 20:23:37 +0000
Subject: [PATCH] ssync: sync non-durable fragments from handoffs

Previously, ssync would neither sync nor clean up non-durable data
fragments on handoffs. When the reconstructor is syncing objects from a
handoff node (a 'revert' reconstructor job) it may be useful, and is not
harmful, to also send non-durable fragments if the receiver has older or
no fragment data.

Several changes are made to enable this. On the sending side:

- For handoff (revert) jobs, the reconstructor instantiates SsyncSender
  with a new 'include_non_durable' option.
- If configured with the include_non_durable option, the SsyncSender
  calls the diskfile yield_hashes function with options that allow
  non-durable fragments to be yielded.
- The diskfile yield_hashes function is enhanced to include a 'durable'
  flag in the data structure yielded for each object.
- The SsyncSender includes the 'durable' flag in the metadata sent
  during the missing_check exchange with the receiver.
- If the receiver requests the non-durable object, the SsyncSender
  includes a new 'X-Backend-No-Commit' header when sending the PUT
  subrequest for the object.
- The SsyncSender includes the non-durable object in the collection of
  synced objects returned to the reconstructor so that the non-durable
  fragment is removed from the handoff node.

On the receiving side:

- The object server includes a new 'X-Backend-Accept-No-Commit' header
  in its response to SSYNC requests. This indicates to the sender that
  the receiver has been upgraded to understand the 'X-Backend-No-Commit'
  header.
- The SsyncReceiver is enhanced to consider non-durable data when
  determining if the sender's data is wanted or not.
- The object server PUT method is enhanced to check for an
  'X-Backend-No-Commit' header before committing a diskfile.

If a handoff sender has both a durable and newer non-durable fragment
for the same object and frag-index, only the newer non-durable fragment
will be synced and removed on the first reconstructor pass. The durable
fragment will be synced and removed on the next reconstructor pass.
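For illustration only (not part of the change itself): after this change
the missing_check line exchanged by sender and receiver carries a
durability flag, which is sent only in the less common non-durable case.
A minimal sketch of the resulting line format, using made-up hash and
timestamp values (the real encoder is encode_missing in
swift/obj/ssync_sender.py):

    def encode_missing_line(object_hash, ts_data, durable=True):
        # simplified: the real encoder also appends m:/t: timestamp
        # deltas and url-quotes the hash and timestamp values
        line = '%s %s' % (object_hash, ts_data)
        if durable is False:
            # only sent in the less common non-durable case
            line += ' durable:False'
        return line.encode('ascii')

    encode_missing_line('9373a92d072897b136b3fc06595b4abc',
                        '1383180000.12345', durable=False)
    # -> b'9373a92d072897b136b3fc06595b4abc 1383180000.12345 durable:False'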
Change-Id: I1d47b865e0a621f35d323bbed472a6cfd2a5971b Closes-Bug: 1778002 --- swift/obj/diskfile.py | 31 ++- swift/obj/reconstructor.py | 5 +- swift/obj/server.py | 13 +- swift/obj/ssync_receiver.py | 68 ++++-- swift/obj/ssync_sender.py | 60 +++-- test/probe/common.py | 30 ++- test/probe/test_reconstructor_revert.py | 131 +++++++++++ test/unit/obj/common.py | 11 +- test/unit/obj/test_diskfile.py | 229 ++++++++++++++----- test/unit/obj/test_reconstructor.py | 68 ++++-- test/unit/obj/test_server.py | 60 ++++- test/unit/obj/test_ssync.py | 103 ++++++++- test/unit/obj/test_ssync_receiver.py | 288 +++++++++++++++++++++++- test/unit/obj/test_ssync_sender.py | 267 +++++++++++++++++++++- 14 files changed, 1195 insertions(+), 169 deletions(-) diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 893f1c5047..6c4fcd8a32 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -1590,6 +1590,7 @@ class BaseDiskFileManager(object): - ts_meta -> timestamp of meta file, if one exists - ts_ctype -> timestamp of meta file containing most recent content-type value, if one exists + - durable -> True if data file at ts_data is durable, False otherwise where timestamps are instances of :class:`~swift.common.utils.Timestamp` @@ -1611,11 +1612,15 @@ class BaseDiskFileManager(object): (os.path.join(partition_path, suffix), suffix) for suffix in suffixes) - key_preference = ( + # define keys that we need to extract the result from the on disk info + # data: + # (x, y, z) -> result[x] should take the value of y[z] + key_map = ( ('ts_meta', 'meta_info', 'timestamp'), ('ts_data', 'data_info', 'timestamp'), ('ts_data', 'ts_info', 'timestamp'), ('ts_ctype', 'ctype_info', 'ctype_timestamp'), + ('durable', 'data_info', 'durable'), ) # cleanup_ondisk_files() will remove empty hash dirs, and we'll @@ -1626,21 +1631,24 @@ class BaseDiskFileManager(object): for object_hash in self._listdir(suffix_path): object_path = os.path.join(suffix_path, object_hash) try: - results = self.cleanup_ondisk_files( + diskfile_info = self.cleanup_ondisk_files( object_path, **kwargs) - if results['files']: + if diskfile_info['files']: found_files = True - timestamps = {} - for ts_key, info_key, info_ts_key in key_preference: - if info_key not in results: + result = {} + for result_key, diskfile_info_key, info_key in key_map: + if diskfile_info_key not in diskfile_info: continue - timestamps[ts_key] = results[info_key][info_ts_key] - if 'ts_data' not in timestamps: + info = diskfile_info[diskfile_info_key] + if info_key in info: + # durable key not returned from replicated Diskfile + result[result_key] = info[info_key] + if 'ts_data' not in result: # file sets that do not include a .data or .ts # file cannot be opened and therefore cannot # be ssync'd continue - yield (object_hash, timestamps) + yield object_hash, result except AssertionError as err: self.logger.debug('Invalid file set in %s (%s)' % ( object_path, err)) @@ -3489,6 +3497,11 @@ class ECDiskFileManager(BaseDiskFileManager): break if durable_info and durable_info['timestamp'] == timestamp: durable_frag_set = frag_set + # a data frag filename may not have the #d part if durability + # is defined by a legacy .durable, so always mark all data + # frags as durable here + for frag in frag_set: + frag['durable'] = True break # ignore frags that are older than durable timestamp # Choose which frag set to use diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index a05c08d7eb..2b3db0217b 100644 --- a/swift/obj/reconstructor.py +++ 
b/swift/obj/reconstructor.py @@ -864,7 +864,7 @@ class ObjectReconstructor(Daemon): # ssync any out-of-sync suffixes with the remote node success, _ = ssync_sender( - self, node, job, suffixes)() + self, node, job, suffixes, include_non_durable=False)() # update stats for this attempt self.suffix_sync += len(suffixes) self.logger.update_stats('suffix.syncs', len(suffixes)) @@ -891,7 +891,8 @@ class ObjectReconstructor(Daemon): node['backend_index'] = job['policy'].get_backend_index( node['index']) success, in_sync_objs = ssync_sender( - self, node, job, job['suffixes'])() + self, node, job, job['suffixes'], + include_non_durable=True)() if success: syncd_with += 1 reverted_objs.update(in_sync_objs) diff --git a/swift/obj/server.py b/swift/obj/server.py index 52d6b139e9..a49ea1d185 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -1048,7 +1048,9 @@ class ObjectController(BaseStorageServer): if multi_stage_mime_state: self._send_multi_stage_continue_headers( request, **multi_stage_mime_state) - writer.commit(request.timestamp) + if not config_true_value( + request.headers.get('X-Backend-No-Commit', False)): + writer.commit(request.timestamp) if multi_stage_mime_state: self._drain_mime_request(**multi_stage_mime_state) except (DiskFileXattrNotSupported, DiskFileNoSpace): @@ -1310,7 +1312,14 @@ class ObjectController(BaseStorageServer): @replication @timing_stats(sample_rate=0.1) def SSYNC(self, request): - return Response(app_iter=ssync_receiver.Receiver(self, request)()) + # the ssync sender may want to send PUT subrequests for non-durable + # data that should not be committed; legacy behaviour has been to + # commit all PUTs (subject to EC footer metadata), so we need to + # indicate to the sender that this object server has been upgraded to + # understand the X-Backend-No-Commit header. + headers = {'X-Backend-Accept-No-Commit': True} + return Response(app_iter=ssync_receiver.Receiver(self, request)(), + headers=headers) def __call__(self, env, start_response): """WSGI Application entry point for the Swift Object Server.""" diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 78bd87f229..762556b82c 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -35,7 +35,8 @@ def decode_missing(line): """ Parse a string of the form generated by :py:func:`~swift.obj.ssync_sender.encode_missing` and return a dict - with keys ``object_hash``, ``ts_data``, ``ts_meta``, ``ts_ctype``. + with keys ``object_hash``, ``ts_data``, ``ts_meta``, ``ts_ctype``, + ``durable``. The encoder for this line is :py:func:`~swift.obj.ssync_sender.encode_missing` @@ -46,6 +47,7 @@ def decode_missing(line): t_data = urllib.parse.unquote(parts[1]) result['ts_data'] = Timestamp(t_data) result['ts_meta'] = result['ts_ctype'] = result['ts_data'] + result['durable'] = True # default to True in case this key isn't sent if len(parts) > 2: # allow for a comma separated list of k:v pairs to future-proof subparts = urllib.parse.unquote(parts[2]).split(',') @@ -55,6 +57,8 @@ def decode_missing(line): result['ts_meta'] = Timestamp(t_data, delta=int(v, 16)) elif k == 't': result['ts_ctype'] = Timestamp(t_data, delta=int(v, 16)) + elif k == 'durable': + result['durable'] = utils.config_true_value(v) return result @@ -279,6 +283,7 @@ class Receiver(object): except exceptions.DiskFileDeleted as err: result = {'ts_data': err.timestamp} except exceptions.DiskFileError: + # e.g. 
a non-durable EC frag result = {} else: result = { @@ -286,25 +291,35 @@ class Receiver(object): 'ts_meta': df.timestamp, 'ts_ctype': df.content_type_timestamp, } - if (make_durable and df.fragments and - remote['ts_data'] in df.fragments and - self.frag_index in df.fragments[remote['ts_data']] and - (df.durable_timestamp is None or - df.durable_timestamp < remote['ts_data'])): - # We have the frag, just missing durable state, so make the frag - # durable now. Try this just once to avoid looping if it fails. - try: - with df.create() as writer: - writer.commit(remote['ts_data']) - return self._check_local(remote, make_durable=False) - except Exception: - # if commit fails then log exception and fall back to wanting - # a full update - self.app.logger.exception( - '%s/%s/%s EXCEPTION in ssync.Receiver while ' - 'attempting commit of %s' - % (self.request.remote_addr, self.device, self.partition, - df._datadir)) + if ((df.durable_timestamp is None or + df.durable_timestamp < remote['ts_data']) and + df.fragments and + remote['ts_data'] in df.fragments and + self.frag_index in df.fragments[remote['ts_data']]): + # The remote is offering a fragment that we already have but is + # *newer* than anything *durable* that we have + if remote['durable']: + # We have the frag, just missing durable state, so make the + # frag durable now. Try this just once to avoid looping if + # it fails. + if make_durable: + try: + with df.create() as writer: + writer.commit(remote['ts_data']) + return self._check_local(remote, make_durable=False) + except Exception: + # if commit fails then log exception and fall back to + # wanting a full update + self.app.logger.exception( + '%s/%s/%s EXCEPTION in ssync.Receiver while ' + 'attempting commit of %s' + % (self.request.remote_addr, self.device, + self.partition, df._datadir)) + else: + # We have the non-durable frag that is on offer, but our + # ts_data may currently be set to an older durable frag, so + # bump our ts_data to prevent the remote frag being wanted. 
+ result['ts_data'] = remote['ts_data'] return result def _check_missing(self, line): @@ -454,10 +469,15 @@ class Receiver(object): header = header.strip().lower() value = value.strip() subreq.headers[header] = value - if header != 'etag': - # make sure ssync doesn't cause 'Etag' to be added to - # obj metadata in addition to 'ETag' which object server - # sets (note capitalization) + if header not in ('etag', 'x-backend-no-commit'): + # we'll use X-Backend-Replication-Headers to force the + # object server to write all sync'd metadata, but with some + # exceptions: + # - make sure ssync doesn't cause 'Etag' to be added to + # obj metadata in addition to 'ETag' which object server + # sets (note capitalization) + # - filter out x-backend-no-commit which ssync sender may + # have added to the subrequest replication_headers.append(header) if header == 'content-length': content_length = int(value) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index bcb16fb0e6..2a42fbf01e 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -19,14 +19,17 @@ from six.moves import urllib from swift.common import bufferedhttp from swift.common import exceptions from swift.common import http +from swift.common.utils import config_true_value -def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None): +def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None, + **kwargs): """ - Returns a string representing the object hash, its data file timestamp - and the delta forwards to its metafile and content-type timestamps, if - non-zero, in the form: - `` [m:[,t:]]`` + Returns a string representing the object hash, its data file timestamp, + the delta forwards to its metafile and content-type timestamps, if + non-zero, and its durability, in the form: + `` [m:[,t:] + [,durable:False]`` The decoder for this line is :py:func:`~swift.obj.ssync_receiver.decode_missing` @@ -34,12 +37,18 @@ def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None): msg = ('%s %s' % (urllib.parse.quote(object_hash), urllib.parse.quote(ts_data.internal))) + extra_parts = [] if ts_meta and ts_meta != ts_data: delta = ts_meta.raw - ts_data.raw - msg = '%s m:%x' % (msg, delta) + extra_parts.append('m:%x' % delta) if ts_ctype and ts_ctype != ts_data: delta = ts_ctype.raw - ts_data.raw - msg = '%s,t:%x' % (msg, delta) + extra_parts.append('t:%x' % delta) + if 'durable' in kwargs and kwargs['durable'] is False: + # only send durable in the less common case that it is False + extra_parts.append('durable:%s' % kwargs['durable']) + if extra_parts: + msg = '%s %s' % (msg, ','.join(extra_parts)) return msg.encode('ascii') @@ -133,7 +142,8 @@ class Sender(object): process is there. """ - def __init__(self, daemon, node, job, suffixes, remote_check_objs=None): + def __init__(self, daemon, node, job, suffixes, remote_check_objs=None, + include_non_durable=False): self.daemon = daemon self.df_mgr = self.daemon._df_router[job['policy']] self.node = node @@ -142,6 +152,7 @@ class Sender(object): # When remote_check_objs is given in job, ssync_sender trys only to # make sure those objects exist or not in remote. self.remote_check_objs = remote_check_objs + self.include_non_durable = include_non_durable def __call__(self): """ @@ -221,11 +232,11 @@ class Sender(object): with the object server. 
""" connection = response = None + node_addr = '%s:%s' % (self.node['replication_ip'], + self.node['replication_port']) with exceptions.MessageTimeout( self.daemon.conn_timeout, 'connect send'): - connection = SsyncBufferedHTTPConnection( - '%s:%s' % (self.node['replication_ip'], - self.node['replication_port'])) + connection = SsyncBufferedHTTPConnection(node_addr) connection.putrequest('SSYNC', '/%s/%s' % ( self.node['device'], self.job['partition'])) connection.putheader('Transfer-Encoding', 'chunked') @@ -248,6 +259,14 @@ class Sender(object): raise exceptions.ReplicationException( 'Expected status %s; got %s (%s)' % (http.HTTP_OK, response.status, err_msg)) + if self.include_non_durable and not config_true_value( + response.getheader('x-backend-accept-no-commit', False)): + # fall back to legacy behaviour if receiver does not understand + # X-Backend-Commit + self.daemon.logger.warning( + 'ssync receiver %s does not accept non-durable fragments' % + node_addr) + self.include_non_durable = False return connection, response def missing_check(self, connection, response): @@ -265,10 +284,14 @@ class Sender(object): self.daemon.node_timeout, 'missing_check start'): msg = b':MISSING_CHECK: START\r\n' connection.send(b'%x\r\n%s\r\n' % (len(msg), msg)) + # an empty frag_prefs list is sufficient to get non-durable frags + # yielded, in which case an older durable frag will not be yielded + frag_prefs = [] if self.include_non_durable else None hash_gen = self.df_mgr.yield_hashes( self.job['device'], self.job['partition'], self.job['policy'], self.suffixes, - frag_index=self.job.get('frag_index')) + frag_index=self.job.get('frag_index'), + frag_prefs=frag_prefs) if self.remote_check_objs is not None: hash_gen = six.moves.filter( lambda objhash_timestamps: @@ -330,13 +353,14 @@ class Sender(object): self.daemon.node_timeout, 'updates start'): msg = b':UPDATES: START\r\n' connection.send(b'%x\r\n%s\r\n' % (len(msg), msg)) + frag_prefs = [] if self.include_non_durable else None for object_hash, want in send_map.items(): object_hash = urllib.parse.unquote(object_hash) try: df = self.df_mgr.get_diskfile_from_hash( self.job['device'], self.job['partition'], object_hash, self.job['policy'], frag_index=self.job.get('frag_index'), - open_expired=True) + open_expired=True, frag_prefs=frag_prefs) except exceptions.DiskFileNotExist: continue url_path = urllib.parse.quote( @@ -344,13 +368,15 @@ class Sender(object): try: df.open() if want.get('data'): + is_durable = (df.durable_timestamp == df.data_timestamp) # EC reconstructor may have passed a callback to build an # alternative diskfile - construct it using the metadata # from the data file only. df_alt = self.job.get( 'sync_diskfile_builder', lambda *args: df)( self.job, self.node, df.get_datafile_metadata()) - self.send_put(connection, url_path, df_alt) + self.send_put(connection, url_path, df_alt, + durable=is_durable) if want.get('meta') and df.data_timestamp != df.timestamp: self.send_post(connection, url_path, df) except exceptions.DiskFileDeleted as err: @@ -443,12 +469,16 @@ class Sender(object): headers = {'X-Timestamp': timestamp.internal} self.send_subrequest(connection, 'DELETE', url_path, headers, None) - def send_put(self, connection, url_path, df): + def send_put(self, connection, url_path, df, durable=True): """ Sends a PUT subrequest for the url_path using the source df (DiskFile) and content_length. 
""" headers = {'Content-Length': str(df.content_length)} + if not durable: + # only send this header for the less common case; without this + # header object servers assume default commit behaviour + headers['X-Backend-No-Commit'] = 'True' for key, value in df.get_datafile_metadata().items(): if key not in ('name', 'Content-Length'): headers[key] = value diff --git a/test/probe/common.py b/test/probe/common.py index 782d364c4d..667e0245bb 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -677,8 +677,9 @@ class ECProbeTest(ProbeTest): def assert_direct_get_succeeds(self, onode, opart, require_durable=True, extra_headers=None): try: - self.direct_get(onode, opart, require_durable=require_durable, - extra_headers=extra_headers) + return self.direct_get(onode, opart, + require_durable=require_durable, + extra_headers=extra_headers) except direct_client.DirectClientException as err: self.fail('Node data on %r was not available: %s' % (onode, err)) @@ -715,6 +716,31 @@ class ECProbeTest(ProbeTest): raise return made_non_durable + def make_durable(self, nodes, opart): + # ensure all data files on the specified nodes are durable + made_durable = 0 + for i, node in enumerate(nodes): + part_dir = self.storage_dir(node, part=opart) + for dirs, subdirs, files in os.walk(part_dir): + for fname in sorted(files, reverse=True): + # make the newest non-durable be durable + if (fname.endswith('.data') and + not fname.endswith('#d.data')): + made_durable += 1 + non_durable_fname = fname.replace('.data', '#d.data') + os.rename(os.path.join(dirs, fname), + os.path.join(dirs, non_durable_fname)) + + break + headers, etag = self.assert_direct_get_succeeds(node, opart) + self.assertIn('X-Backend-Durable-Timestamp', headers) + try: + os.remove(os.path.join(part_dir, 'hashes.pkl')) + except OSError as e: + if e.errno != errno.ENOENT: + raise + return made_durable + if __name__ == "__main__": for server in ('account', 'container'): diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py index 8692d8b5b7..c90de6b8a6 100644 --- a/test/probe/test_reconstructor_revert.py +++ b/test/probe/test_reconstructor_revert.py @@ -316,6 +316,137 @@ class TestReconstructorRevert(ECProbeTest): else: self.fail('Did not find rebuilt fragment on partner node') + def test_handoff_non_durable(self): + # verify that reconstructor reverts non-durable frags from handoff to + # primary (and also durable frag of same object on same handoff) and + # cleans up non-durable data files on handoffs after revert + headers = {'X-Storage-Policy': self.policy.name} + client.put_container(self.url, self.token, self.container_name, + headers=headers) + + # get our node lists + opart, onodes = self.object_ring.get_nodes( + self.account, self.container_name, self.object_name) + pdevs = [self.device_dir(onode) for onode in onodes] + hnodes = list(itertools.islice( + self.object_ring.get_more_nodes(opart), 2)) + + # kill a primary nodes so we can force data onto a handoff + self.kill_drive(pdevs[0]) + + # PUT object at t1 + contents = Body(total=3.5 * 2 ** 20) + headers = {'x-object-meta-foo': 'meta-foo'} + headers_post = {'x-object-meta-bar': 'meta-bar'} + client.put_object(self.url, self.token, self.container_name, + self.object_name, contents=contents, + headers=headers) + client.post_object(self.url, self.token, self.container_name, + self.object_name, headers=headers_post) + # (Some versions of?) 
swiftclient will mutate the headers dict on post + headers_post.pop('X-Auth-Token', None) + + # this primary can't serve the data; we expect 507 here and not 404 + # because we're using mount_check to kill nodes + self.assert_direct_get_fails(onodes[0], opart, 507) + # these primaries and first handoff do have the data + for onode in (onodes[1:]): + self.assert_direct_get_succeeds(onode, opart) + _hdrs, older_frag_etag = self.assert_direct_get_succeeds(hnodes[0], + opart) + self.assert_direct_get_fails(hnodes[1], opart, 404) + + # make sure we can GET the object; there's 5 primaries and 1 handoff + headers, older_obj_etag = self.proxy_get() + self.assertEqual(contents.etag, older_obj_etag) + self.assertEqual('meta-bar', headers.get('x-object-meta-bar')) + + # PUT object at t2; make all frags non-durable so that the previous + # durable frags at t1 remain on object server; use InternalClient so + # that x-backend-no-commit is passed through + internal_client = self.make_internal_client() + contents2 = Body(total=2.5 * 2 ** 20) # different content + self.assertNotEqual(contents2.etag, older_obj_etag) # sanity check + headers = {'x-backend-no-commit': 'True', + 'x-object-meta-bar': 'meta-bar-new'} + internal_client.upload_object(contents2, self.account, + self.container_name.decode('utf8'), + self.object_name.decode('utf8'), + headers) + # GET should still return the older durable object + headers, obj_etag = self.proxy_get() + self.assertEqual(older_obj_etag, obj_etag) + self.assertEqual('meta-bar', headers.get('x-object-meta-bar')) + # on handoff we have older durable and newer non-durable + _hdrs, frag_etag = self.assert_direct_get_succeeds(hnodes[0], opart) + self.assertEqual(older_frag_etag, frag_etag) + _hdrs, newer_frag_etag = self.assert_direct_get_succeeds( + hnodes[0], opart, require_durable=False) + self.assertNotEqual(older_frag_etag, newer_frag_etag) + + # now make all the newer frags durable only on the 5 primaries + self.assertEqual(5, self.make_durable(onodes[1:], opart)) + # now GET will return the newer object + headers, newer_obj_etag = self.proxy_get() + self.assertEqual(contents2.etag, newer_obj_etag) + self.assertNotEqual(older_obj_etag, newer_obj_etag) + self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar')) + + # fix the 507'ing primary + self.revive_drive(pdevs[0]) + + # fire up reconstructor on handoff node only + hnode_id = (hnodes[0]['port'] % 100) // 10 + self.reconstructor.once(number=hnode_id) + + # primary now has only the newer non-durable frag + self.assert_direct_get_fails(onodes[0], opart, 404) + _hdrs, frag_etag = self.assert_direct_get_succeeds( + onodes[0], opart, require_durable=False) + self.assertEqual(newer_frag_etag, frag_etag) + + # handoff has only the older durable + _hdrs, frag_etag = self.assert_direct_get_succeeds(hnodes[0], opart) + self.assertEqual(older_frag_etag, frag_etag) + headers, frag_etag = self.assert_direct_get_succeeds( + hnodes[0], opart, require_durable=False) + self.assertEqual(older_frag_etag, frag_etag) + self.assertEqual('meta-bar', headers.get('x-object-meta-bar')) + + # fire up reconstructor on handoff node only, again + self.reconstructor.once(number=hnode_id) + + # primary now has the newer non-durable frag and the older durable frag + headers, frag_etag = self.assert_direct_get_succeeds(onodes[0], opart) + self.assertEqual(older_frag_etag, frag_etag) + self.assertEqual('meta-bar', headers.get('x-object-meta-bar')) + headers, frag_etag = self.assert_direct_get_succeeds( + onodes[0], opart, 
require_durable=False) + self.assertEqual(newer_frag_etag, frag_etag) + self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar')) + + # handoff has nothing + self.assert_direct_get_fails(hnodes[0], opart, 404, + require_durable=False) + + # kill all but first two primaries + for pdev in pdevs[2:]: + self.kill_drive(pdev) + # fire up reconstructor on the remaining primary[1]; without the + # other primaries, primary[1] cannot rebuild the frag but it can let + # primary[0] know that its non-durable frag can be made durable + self.reconstructor.once(number=self.config_number(onodes[1])) + + # first primary now has a *durable* *newer* frag - it *was* useful to + # sync the non-durable! + headers, frag_etag = self.assert_direct_get_succeeds(onodes[0], opart) + self.assertEqual(newer_frag_etag, frag_etag) + self.assertEqual('meta-bar-new', headers.get('x-object-meta-bar')) + + # revive primaries (in case we want to debug) + for pdev in pdevs[2:]: + self.revive_drive(pdev) + if __name__ == "__main__": unittest.main() diff --git a/test/unit/obj/common.py b/test/unit/obj/common.py index fd83427da7..90c6922be3 100644 --- a/test/unit/obj/common.py +++ b/test/unit/obj/common.py @@ -70,10 +70,10 @@ class BaseTest(unittest.TestCase): shutil.rmtree(self.tmpdir, ignore_errors=True) def _make_diskfile(self, device='dev', partition='9', - account='a', container='c', obj='o', body='test', + account='a', container='c', obj='o', body=b'test', extra_metadata=None, policy=None, frag_index=None, timestamp=None, df_mgr=None, - commit=True, verify=True): + commit=True, verify=True, **kwargs): policy = policy or POLICIES.legacy object_parts = account, container, obj timestamp = Timestamp.now() if timestamp is None else timestamp @@ -81,7 +81,7 @@ class BaseTest(unittest.TestCase): df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( device, partition, *object_parts, policy=policy, - frag_index=frag_index) + frag_index=frag_index, **kwargs) write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata, commit=commit) if commit and verify: @@ -99,9 +99,10 @@ class BaseTest(unittest.TestCase): def _make_open_diskfile(self, device='dev', partition='9', account='a', container='c', obj='o', body=b'test', extra_metadata=None, policy=None, - frag_index=None, timestamp=None, df_mgr=None): + frag_index=None, timestamp=None, df_mgr=None, + commit=True, **kwargs): df = self._make_diskfile(device, partition, account, container, obj, body, extra_metadata, policy, frag_index, - timestamp, df_mgr) + timestamp, df_mgr, commit, **kwargs) df.open() return df diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index a1d1c8c25e..53e56f0a01 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -1539,8 +1539,9 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): invalidations_file = os.path.join( part_dir, diskfile.HASH_INVALIDATIONS_FILE) with open(invalidations_file) as f: - self.assertEqual('%s\n%s' % (df1_suffix, df2_suffix), - f.read().strip('\n')) # sanity + invalids = f.read().splitlines() + self.assertEqual(sorted((df1_suffix, df2_suffix)), + sorted(invalids)) # sanity # next time get hashes runs with mock.patch('time.time', mock_time): @@ -2768,55 +2769,59 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): expected) def test_yield_hashes_legacy_durable(self): - old_ts = '1383180000.12345' - fresh_ts = Timestamp(time() - 10).internal - fresher_ts = Timestamp(time() - 1).internal + old_ts = Timestamp('1383180000.12345') + 
fresh_ts = Timestamp(time() - 10) + fresher_ts = Timestamp(time() - 1) suffix_map = { 'abc': { '9373a92d072897b136b3fc06595b4abc': [ - fresh_ts + '.ts'], + fresh_ts.internal + '.ts'], }, '456': { '9373a92d072897b136b3fc06595b0456': [ - old_ts + '#2.data', - old_ts + '.durable'], + old_ts.internal + '#2.data', + old_ts.internal + '.durable'], '9373a92d072897b136b3fc06595b7456': [ - fresh_ts + '.ts', - fresher_ts + '#2.data', - fresher_ts + '.durable'], + fresh_ts.internal + '.ts', + fresher_ts.internal + '#2.data', + fresher_ts.internal + '.durable'], }, 'def': {}, } expected = { '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts}, - '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) def test_yield_hashes(self): - old_ts = '1383180000.12345' - fresh_ts = Timestamp(time() - 10).internal - fresher_ts = Timestamp(time() - 1).internal + old_ts = Timestamp('1383180000.12345') + fresh_ts = Timestamp(time() - 10) + fresher_ts = Timestamp(time() - 1) suffix_map = { 'abc': { '9373a92d072897b136b3fc06595b4abc': [ - fresh_ts + '.ts'], + fresh_ts.internal + '.ts'], }, '456': { '9373a92d072897b136b3fc06595b0456': [ - old_ts + '#2#d.data'], + old_ts.internal + '#2#d.data'], '9373a92d072897b136b3fc06595b7456': [ - fresh_ts + '.ts', - fresher_ts + '#2#d.data'], + fresh_ts.internal + '.ts', + fresher_ts.internal + '#2#d.data'], }, 'def': {}, } expected = { '9373a92d072897b136b3fc06595b4abc': {'ts_data': fresh_ts}, - '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -2847,9 +2852,11 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): expected = { '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1}, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, - 'ts_meta': ts3}, + 'ts_meta': ts3, + 'durable': True}, '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1, - 'ts_meta': ts2}, + 'ts_meta': ts2, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected) @@ -2885,9 +2892,11 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): expected = { '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1}, '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, - 'ts_meta': ts3}, + 'ts_meta': ts3, + 'durable': True}, '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1, - 'ts_meta': ts2}, + 'ts_meta': ts2, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected) @@ -2921,8 +2930,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): 'def': {}, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, suffixes=['456'], frag_index=2) @@ -2947,8 +2958,10 @@ class 
TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): 'def': {}, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': old_ts, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': fresher_ts, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, suffixes=['456'], frag_index=2) @@ -2965,7 +2978,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -2974,12 +2988,62 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): suffix_map['456']['9373a92d072897b136b3fc06595b7456'] = [ ts1.internal + '#2#d.data'] expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) + def test_yield_hashes_optionally_yields_non_durable_data(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + ts1.internal + '#2#d.data', + ts2.internal + '#2.data'], # newer non-durable + '9373a92d072897b136b3fc06595b0abc': [ + ts1.internal + '#2.data', # older non-durable + ts2.internal + '#2#d.data'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '#2#d.data'], + '9373a92d072897b136b3fc06595b7456': [ + ts2.internal + '#2.data'], + }, + } + + # sanity check non-durables not yielded + expected = { + '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts1, + 'durable': True}, + '9373a92d072897b136b3fc06595b0abc': {'ts_data': ts2, + 'durable': True}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2, frag_prefs=None) + + # an empty frag_prefs list is sufficient to get non-durables yielded + # (in preference over *older* durable) + expected = { + '9373a92d072897b136b3fc06595b4abc': {'ts_data': ts2, + 'durable': False}, + '9373a92d072897b136b3fc06595b0abc': {'ts_data': ts2, + 'durable': True}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': ts2, + 'durable': False}, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2, frag_prefs=[]) + def test_yield_hashes_skips_missing_legacy_durable(self): ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) ts1 = next(ts_iter) @@ -2993,7 +3057,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -3002,8 +3067,10 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append( ts1.internal + '.durable') expected = { - '9373a92d072897b136b3fc06595b0456': 
{'ts_data': ts1}, - '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, + '9373a92d072897b136b3fc06595b7456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -3023,7 +3090,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=None) @@ -3034,7 +3102,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append( ts2.internal + '.durable') expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=None) @@ -3055,7 +3124,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=None) @@ -3072,7 +3142,8 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2}, + '9373a92d072897b136b3fc06595b0456': {'ts_data': ts2, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=None) @@ -3130,12 +3201,16 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1}, + '9333a92d072897b136b3fc06595b0456': {'ts_data': ts1, + 'durable': True}, '9999a92d072897b136b3fc06595bb456': {'ts_data': ts1, - 'ts_meta': ts2}, - '9333a92d072897b136b3fc06595b1456': {'ts_data': ts1}, + 'ts_meta': ts2, + 'durable': True}, + '9333a92d072897b136b3fc06595b1456': {'ts_data': ts1, + 'durable': True}, '9999a92d072897b136b3fc06595bc456': {'ts_data': ts1, - 'ts_meta': ts2}, + 'ts_meta': ts2, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -3170,9 +3245,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '1111111111111111111111111111127e': {'ts_data': ts1}, - '2222222222222222222222222222227e': {'ts_data': ts2}, - '3333333333333333333333333333300b': {'ts_data': ts3}, + '1111111111111111111111111111127e': {'ts_data': ts1, + 'durable': True}, + '2222222222222222222222222222227e': {'ts_data': ts2, + 'durable': True}, + '3333333333333333333333333333300b': {'ts_data': ts3, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -3212,9 +3290,12 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): }, } expected = { - '1111111111111111111111111111127e': {'ts_data': ts1}, - '2222222222222222222222222222227e': {'ts_data': ts2}, - '3333333333333333333333333333300b': {'ts_data': ts3}, + '1111111111111111111111111111127e': {'ts_data': ts1, + 'durable': True}, + '2222222222222222222222222222227e': {'ts_data': ts2, + 'durable': True}, + '3333333333333333333333333333300b': {'ts_data': ts3, + 'durable': True}, } self._check_yield_hashes(POLICIES.default, suffix_map, expected, frag_index=2) @@ -3271,7 +3352,7 
@@ class DiskFileMixin(BaseDiskFileTestMixin): def _create_ondisk_file(self, df, data, timestamp, metadata=None, ctype_timestamp=None, - ext='.data', legacy_durable=False): + ext='.data', legacy_durable=False, commit=True): mkdirs(df._datadir) if timestamp is None: timestamp = time() @@ -3292,12 +3373,15 @@ class DiskFileMixin(BaseDiskFileTestMixin): if ext == '.data' and df.policy.policy_type == EC_POLICY: if legacy_durable: filename = '%s#%s' % (timestamp.internal, df._frag_index) - durable_file = os.path.join(df._datadir, - '%s.durable' % timestamp.internal) - with open(durable_file, 'wb') as f: - pass - else: + if commit: + durable_file = os.path.join( + df._datadir, '%s.durable' % timestamp.internal) + with open(durable_file, 'wb') as f: + pass + elif commit: filename = '%s#%s#d' % (timestamp.internal, df._frag_index) + else: + filename = '%s#%s' % (timestamp.internal, df._frag_index) if ctype_timestamp: metadata.update( {'Content-Type-Timestamp': @@ -6300,6 +6384,35 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase): df.open() # not quarantined + def test_ondisk_data_info_has_durable_key(self): + # non-durable; use frag_prefs=[] to allow it to be opened + df = self._simple_get_diskfile(obj='o1', frag_prefs=[]) + self._create_ondisk_file(df, b'', ext='.data', timestamp=10, + metadata={'name': '/a/c/o1'}, commit=False) + with df.open(): + self.assertIn('durable', df._ondisk_info['data_info']) + self.assertFalse(df._ondisk_info['data_info']['durable']) + + # durable + df = self._simple_get_diskfile(obj='o2') + self._create_ondisk_file(df, b'', ext='.data', timestamp=10, + metadata={'name': '/a/c/o2'}) + with df.open(): + self.assertIn('durable', df._ondisk_info['data_info']) + self.assertTrue(df._ondisk_info['data_info']['durable']) + + # legacy durable + df = self._simple_get_diskfile(obj='o3') + self._create_ondisk_file(df, b'', ext='.data', timestamp=10, + metadata={'name': '/a/c/o3'}, + legacy_durable=True) + with df.open(): + data_info = df._ondisk_info['data_info'] + # sanity check it is legacy with no #d part in filename + self.assertEqual(data_info['filename'], '0000000010.00000#2.data') + self.assertIn('durable', data_info) + self.assertTrue(data_info['durable']) + @patch_policies(with_ec_default=True) class TestSuffixHashes(unittest.TestCase): @@ -7066,7 +7179,9 @@ class TestSuffixHashes(unittest.TestCase): df2.delete(self.ts()) # suffix2 should be in invalidations file with open(invalidations_file, 'r') as f: - self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read()) + invalids = f.read().splitlines() + self.assertEqual(sorted((suffix2, suffix2)), + sorted(invalids)) # sanity # hashes file is not yet changed with open(hashes_file, 'rb') as f: found_hashes = pickle.load(f) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index c9682395a3..215d83ce46 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -52,10 +52,11 @@ from test.unit.obj.common import write_diskfile @contextmanager def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs): - def fake_ssync(daemon, node, job, suffixes): + def fake_ssync(daemon, node, job, suffixes, **kwargs): if ssync_calls is not None: - ssync_calls.append( - {'node': node, 'job': job, 'suffixes': suffixes}) + call_args = {'node': node, 'job': job, 'suffixes': suffixes} + call_args.update(kwargs) + ssync_calls.append(call_args) def fake_call(): if response_callback: @@ -1136,6 +1137,7 @@ class 
TestGlobalSetupObjectReconstructor(unittest.TestCase): self.success = False break context['success'] = self.success + context.update(kwargs) def __call__(self, *args, **kwargs): return self.success, self.available_map if self.success else {} @@ -1168,6 +1170,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): expected_calls = [] for context in ssync_calls: if context['job']['job_type'] == REVERT: + self.assertTrue(context.get('include_non_durable')) for dirpath, files in visit_obj_dirs(context): # sanity check - expect some files to be in dir, # may not be for the reverted frag index @@ -1176,6 +1179,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): expected_calls.append(mock.call(context['job'], context['available_map'], context['node']['index'])) + else: + self.assertFalse(context.get('include_non_durable')) + mock_delete.assert_has_calls(expected_calls, any_order=True) # N.B. in this next test sequence we acctually delete files after @@ -1193,12 +1199,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.reconstruct() for context in ssync_calls: if context['job']['job_type'] == REVERT: + self.assertTrue(True, context.get('include_non_durable')) data_file_tail = ('#%s.data' % context['node']['index']) for dirpath, files in visit_obj_dirs(context): n_files_after += len(files) for filename in files: self.assertFalse(filename.endswith(data_file_tail)) + else: + self.assertFalse(context.get('include_non_durable')) # sanity check that some files should were deleted self.assertGreater(n_files, n_files_after) @@ -1225,13 +1234,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(len(captured_ssync), 2) expected_ssync_calls = { # device, part, frag_index: expected_occurrences - ('sda1', 2, 2): 1, - ('sda1', 2, 0): 1, + ('sda1', 2, 2, True): 1, + ('sda1', 2, 0, True): 1, } self.assertEqual(expected_ssync_calls, dict(collections.Counter( (context['job']['device'], context['job']['partition'], - context['job']['frag_index']) + context['job']['frag_index'], + context['include_non_durable']) for context in captured_ssync ))) @@ -1296,14 +1306,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.reconstruct(override_partitions=[2]) expected_ssync_calls = sorted([ - (u'10.0.0.0', REVERT, 2, [u'3c1']), - (u'10.0.0.2', REVERT, 2, [u'061']), + (u'10.0.0.0', REVERT, 2, [u'3c1'], True), + (u'10.0.0.2', REVERT, 2, [u'061'], True), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], c['job']['job_type'], c['job']['partition'], c['suffixes'], + c.get('include_non_durable') ) for c in ssync_calls)) expected_stats = { @@ -3797,14 +3808,15 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): [(r['ip'], r['path']) for r in request_log.requests]) expected_ssync_calls = sorted([ - (sync_to[0]['ip'], 0, set(['123', 'abc'])), - (sync_to[1]['ip'], 0, set(['123', 'abc'])), - (sync_to[2]['ip'], 0, set(['123', 'abc'])), + (sync_to[0]['ip'], 0, set(['123', 'abc']), False), + (sync_to[1]['ip'], 0, set(['123', 'abc']), False), + (sync_to[2]['ip'], 0, set(['123', 'abc']), False), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], c['job']['partition'], set(c['suffixes']), + c.get('include_non_durable'), ) for c in ssync_calls)) def test_sync_duplicates_to_remote_region(self): @@ -3966,12 +3978,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): for r in request_log.requests)) expected_ssync_calls = sorted([ - (sync_to[1]['ip'], 0, ['abc']), + 
(sync_to[1]['ip'], 0, ['abc'], False), ]) self.assertEqual(expected_ssync_calls, sorted(( c['node']['ip'], c['job']['partition'], c['suffixes'], + c.get('include_non_durable') ) for c in ssync_calls)) def test_process_job_primary_some_in_sync(self): @@ -4038,11 +4051,12 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( dict(collections.Counter( - (c['node']['index'], tuple(sorted(c['suffixes']))) + (c['node']['index'], tuple(sorted(c['suffixes'])), + c.get('include_non_durable')) for c in ssync_calls)), - {(sync_to[0]['index'], ('123',)): 1, - (sync_to[1]['index'], ('abc',)): 1, - (sync_to[2]['index'], ('123', 'abc')): 1, + {(sync_to[0]['index'], ('123',), False): 1, + (sync_to[1]['index'], ('abc',), False): 1, + (sync_to[2]['index'], ('123', 'abc'), False): 1, }) def test_process_job_primary_down(self): @@ -4102,14 +4116,15 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual(expected_suffix_calls, found_suffix_calls) expected_ssync_calls = sorted([ - ('10.0.0.0', 0, set(['123', 'abc'])), - ('10.0.0.1', 0, set(['123', 'abc'])), - ('10.0.0.2', 0, set(['123', 'abc'])), + ('10.0.0.0', 0, set(['123', 'abc']), False), + ('10.0.0.1', 0, set(['123', 'abc']), False), + ('10.0.0.2', 0, set(['123', 'abc']), False), ]) found_ssync_calls = sorted(( c['node']['ip'], c['job']['partition'], set(c['suffixes']), + c.get('include_non_durable') ) for c in ssync_calls) self.assertEqual(expected_ssync_calls, found_ssync_calls) @@ -4276,10 +4291,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( sorted(collections.Counter( (c['node']['ip'], c['node']['port'], c['node']['device'], - tuple(sorted(c['suffixes']))) + tuple(sorted(c['suffixes'])), + c.get('include_non_durable')) for c in ssync_calls).items()), [((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'], - ('123', 'abc')), 1)]) + ('123', 'abc'), True), 1)]) def test_process_job_will_not_revert_to_handoff(self): frag_index = random.randint( @@ -4331,10 +4347,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( sorted(collections.Counter( (c['node']['ip'], c['node']['port'], c['node']['device'], - tuple(sorted(c['suffixes']))) + tuple(sorted(c['suffixes'])), + c.get('include_non_durable')) for c in ssync_calls).items()), [((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'], - ('123', 'abc')), 1)]) + ('123', 'abc'), True), 1)]) def test_process_job_revert_is_handoff_fails(self): frag_index = random.randint( @@ -4385,10 +4402,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): self.assertEqual( sorted(collections.Counter( (c['node']['ip'], c['node']['port'], c['node']['device'], - tuple(sorted(c['suffixes']))) + tuple(sorted(c['suffixes'])), + c.get('include_non_durable')) for c in ssync_calls).items()), [((sync_to[0]['ip'], sync_to[0]['port'], sync_to[0]['device'], - ('123', 'abc')), 1)]) + ('123', 'abc'), True), 1)]) self.assertEqual(self.reconstructor.handoffs_remaining, 1) def test_process_job_revert_cleanup(self): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index f04e9523c0..a9f87dacb1 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -2629,14 +2629,15 @@ class TestObjectController(unittest.TestCase): resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 201) - def test_EC_GET_PUT_data(self): + def test_EC_PUT_GET_data(self): for policy in self.ec_policies: + ts = next(self.ts) raw_data = (b'VERIFY' * 
policy.ec_segment_size)[:-432] frag_archives = encode_frag_archive_bodies(policy, raw_data) frag_index = random.randint(0, len(frag_archives) - 1) # put EC frag archive req = Request.blank('/sda1/p/a/c/o', method='PUT', headers={ - 'X-Timestamp': next(self.ts).internal, + 'X-Timestamp': ts.internal, 'Content-Type': 'application/verify', 'Content-Length': len(frag_archives[frag_index]), 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, @@ -2654,6 +2655,59 @@ class TestObjectController(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertEqual(resp.body, frag_archives[frag_index]) + # check the diskfile is durable + df_mgr = diskfile.ECDiskFileManager(self.conf, + self.object_controller.logger) + df = df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o', policy, + frag_prefs=[]) + with df.open(): + self.assertEqual(ts, df.data_timestamp) + self.assertEqual(df.data_timestamp, df.durable_timestamp) + + def test_EC_PUT_GET_data_no_commit(self): + for policy in self.ec_policies: + ts = next(self.ts) + raw_data = (b'VERIFY' * policy.ec_segment_size)[:-432] + frag_archives = encode_frag_archive_bodies(policy, raw_data) + frag_index = random.randint(0, len(frag_archives) - 1) + # put EC frag archive + req = Request.blank('/sda1/p/a/c/o', method='PUT', headers={ + 'X-Timestamp': ts.internal, + 'Content-Type': 'application/verify', + 'Content-Length': len(frag_archives[frag_index]), + 'X-Backend-No-Commit': 'true', + 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, + 'X-Backend-Storage-Policy-Index': int(policy), + }) + req.body = frag_archives[frag_index] + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 201) + + # get EC frag archive will 404 - nothing durable... + req = Request.blank('/sda1/p/a/c/o', headers={ + 'X-Backend-Storage-Policy-Index': int(policy), + }) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 404) + + # ...unless we explicitly request *any* fragment... 
+ req = Request.blank('/sda1/p/a/c/o', headers={ + 'X-Backend-Storage-Policy-Index': int(policy), + 'X-Backend-Fragment-Preferences': '[]', + }) + resp = req.get_response(self.object_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, frag_archives[frag_index]) + + # check the diskfile is not durable + df_mgr = diskfile.ECDiskFileManager(self.conf, + self.object_controller.logger) + df = df_mgr.get_diskfile('sda1', 'p', 'a', 'c', 'o', policy, + frag_prefs=[]) + with df.open(): + self.assertEqual(ts, df.data_timestamp) + self.assertIsNone(df.durable_timestamp) + def test_EC_GET_quarantine_invalid_frag_archive(self): policy = random.choice(self.ec_policies) raw_data = (b'VERIFY' * policy.ec_segment_size)[:-432] @@ -7109,6 +7163,8 @@ class TestObjectController(unittest.TestCase): headers={}) resp = req.get_response(self.object_controller) self.assertEqual(resp.status_int, 200) + self.assertEqual('True', + resp.headers.get('X-Backend-Accept-No-Commit')) def test_PUT_with_full_drive(self): diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index 19be0d4d72..1c29d378be 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -123,7 +123,7 @@ class TestBaseSsync(BaseTest): return self.obj_data[path] def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp, - frag_indexes=None, commit=True): + frag_indexes=None, commit=True, **kwargs): frag_indexes = frag_indexes or [None] metadata = {'Content-Type': 'plain/text'} diskfiles = [] @@ -136,22 +136,22 @@ class TestBaseSsync(BaseTest): device=self.device, partition=self.partition, account='a', container='c', obj=obj_name, body=object_data, extra_metadata=metadata, timestamp=timestamp, policy=policy, - frag_index=frag_index, df_mgr=df_mgr, commit=commit) + frag_index=frag_index, df_mgr=df_mgr, commit=commit, **kwargs) diskfiles.append(df) return diskfiles - def _open_tx_diskfile(self, obj_name, policy, frag_index=None): + def _open_tx_diskfile(self, obj_name, policy, frag_index=None, **kwargs): df_mgr = self.daemon._df_router[policy] df = df_mgr.get_diskfile( self.device, self.partition, account='a', container='c', - obj=obj_name, policy=policy, frag_index=frag_index) + obj=obj_name, policy=policy, frag_index=frag_index, **kwargs) df.open() return df - def _open_rx_diskfile(self, obj_name, policy, frag_index=None): + def _open_rx_diskfile(self, obj_name, policy, frag_index=None, **kwargs): df = self.rx_controller.get_diskfile( self.device, self.partition, 'a', 'c', obj_name, policy=policy, - frag_index=frag_index, open_expired=True) + frag_index=frag_index, open_expired=True, **kwargs) df.open() return df @@ -261,7 +261,7 @@ class TestBaseSsync(BaseTest): return results def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None, - rx_frag_index=None): + rx_frag_index=None, **kwargs): """ Verify tx and rx files that should be in sync. 
:param tx_objs: sender diskfiles @@ -278,7 +278,7 @@ class TestBaseSsync(BaseTest): # this diskfile should have been sync'd, # check rx file is ok rx_df = self._open_rx_diskfile( - o_name, policy, rx_frag_index) + o_name, policy, rx_frag_index, **kwargs) # for EC revert job or replication etags should match match_etag = (tx_frag_index == rx_frag_index) self._verify_diskfile_sync( @@ -453,7 +453,7 @@ class TestSsyncEC(TestBaseSsyncEC): rx_df_mgr, obj_name, policy, t2, (12, 13), commit=False) expected_subreqs['PUT'].append(obj_name) - # o3 on rx has frag at other time and non-durable - PUT required + # o3 on rx has frag at newer time and non-durable - PUT required t3 = next(self.ts_iter) obj_name = 'o3' tx_objs[obj_name] = self._create_ondisk_files( @@ -520,6 +520,91 @@ class TestSsyncEC(TestBaseSsyncEC): self._verify_ondisk_files( tx_objs, policy, frag_index, rx_node_index) + def test_handoff_non_durable_fragment(self): + # test that a sync_revert type job does PUT when the tx is non-durable + policy = POLICIES.default + rx_node_index = frag_index = 0 + tx_node_index = 1 + + # create sender side diskfiles... + tx_objs = {} + rx_objs = {} + tx_df_mgr = self.daemon._df_router[policy] + rx_df_mgr = self.rx_controller._diskfile_router[policy] + + expected_subreqs = defaultdict(list) + + # o1 non-durable on tx and missing on rx + t1 = next(self.ts_iter) # newer non-durable tx .data + obj_name = 'o1' + tx_objs[obj_name] = self._create_ondisk_files( + tx_df_mgr, obj_name, policy, t1, (tx_node_index, rx_node_index,), + commit=False, frag_prefs=[]) + expected_subreqs['PUT'].append(obj_name) + + # o2 non-durable on tx and rx + t2 = next(self.ts_iter) + obj_name = 'o2' + tx_objs[obj_name] = self._create_ondisk_files( + tx_df_mgr, obj_name, policy, t2, (tx_node_index, rx_node_index,), + commit=False, frag_prefs=[]) + rx_objs[obj_name] = self._create_ondisk_files( + rx_df_mgr, obj_name, policy, t2, (rx_node_index,), commit=False, + frag_prefs=[]) + + # o3 durable on tx and missing on rx, to check the include_non_durable + # does not exclude durables + t3 = next(self.ts_iter) + obj_name = 'o3' + tx_objs[obj_name] = self._create_ondisk_files( + tx_df_mgr, obj_name, policy, t3, (tx_node_index, rx_node_index,)) + expected_subreqs['PUT'].append(obj_name) + + suffixes = set() + for diskfiles in tx_objs.values(): + for df in diskfiles: + suffixes.add(os.path.basename(os.path.dirname(df._datadir))) + + # create ssync sender instance...with include_non_durable + job = {'device': self.device, + 'partition': self.partition, + 'policy': policy, + 'frag_index': frag_index} + node = dict(self.rx_node) + sender = ssync_sender.Sender(self.daemon, node, job, suffixes, + include_non_durable=True) + # wrap connection from tx to rx to capture ssync messages... + sender.connect, trace = self.make_connect_wrapper(sender) + + # run the sync protocol... 
+ sender() + + # verify protocol + results = self._analyze_trace(trace) + self.assertEqual(3, len(results['tx_missing'])) + self.assertEqual(2, len(results['rx_missing'])) + self.assertEqual(2, len(results['tx_updates'])) + self.assertFalse(results['rx_updates']) + for subreq in results.get('tx_updates'): + obj = subreq['path'].split('/')[3] + method = subreq['method'] + self.assertTrue(obj in expected_subreqs[method], + 'Unexpected %s subreq for object %s, expected %s' + % (method, obj, expected_subreqs[method])) + expected_subreqs[method].remove(obj) + if method == 'PUT': + expected_body = self._get_object_data( + subreq['path'], frag_index=rx_node_index) + self.assertEqual(expected_body, subreq['body']) + # verify all expected subreqs consumed + for _method, expected in expected_subreqs.items(): + self.assertFalse(expected) + + # verify on disk files... + # tx_objs.pop('o4') # o4 should not have been sync'd + self._verify_ondisk_files( + tx_objs, policy, frag_index, rx_node_index, frag_prefs=[]) + def test_fragment_sync(self): # check that a sync_only type job does call reconstructor to build a # diskfile to send, and continues making progress despite an error diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 674b040b7b..1adf1cac01 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -772,6 +772,8 @@ class TestReceiver(unittest.TestCase): @patch_policies(with_ec_default=True) def test_MISSING_CHECK_missing_durable(self): + # check that local non-durable frag is made durable if remote sends + # same ts for same frag, but only if remote is durable self.controller.logger = mock.MagicMock() self.controller._diskfile_router = diskfile.DiskFileRouter( self.conf, self.controller.logger) @@ -791,8 +793,31 @@ class TestReceiver(unittest.TestCase): 'X-Timestamp': ts1, 'Content-Length': '1'} diskfile.write_metadata(fp, metadata1) + self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir)) # sanity - # make a request - expect no data to be wanted + # offer same non-durable frag - expect no data to be wanted + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + ts1 + ' durable:no\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [b':MISSING_CHECK: START', + b':MISSING_CHECK: END', + b':UPDATES: START', b':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + # the local frag is still not durable... + self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir)) + + # offer same frag but durable - expect no data to be wanted req = swob.Request.blank( '/sda1/1', environ={'REQUEST_METHOD': 'SSYNC', @@ -811,6 +836,8 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertFalse(self.controller.logger.error.called) self.assertFalse(self.controller.logger.exception.called) + # the local frag is now durable... 
+ self.assertEqual([ts1 + '#2#d.data'], os.listdir(object_dir)) @patch_policies(with_ec_default=True) @mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit') @@ -834,6 +861,7 @@ class TestReceiver(unittest.TestCase): 'X-Timestamp': ts1, 'Content-Length': '1'} diskfile.write_metadata(fp, metadata1) + self.assertEqual([ts1 + '#2.data'], os.listdir(object_dir)) # sanity # make a request with commit disabled - expect data to be wanted req = swob.Request.blank( @@ -881,6 +909,198 @@ class TestReceiver(unittest.TestCase): 'EXCEPTION in ssync.Receiver while attempting commit of', self.controller.logger.exception.call_args[0][0]) + @patch_policies(with_ec_default=True) + def test_MISSING_CHECK_local_non_durable(self): + # check that local non-durable fragment does not prevent other frags + # being wanted from the sender + self.controller.logger = mock.MagicMock() + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + + ts_iter = make_timestamp_iter() + ts1 = next(ts_iter).internal + ts2 = next(ts_iter).internal + ts3 = next(ts_iter).internal + # make non-durable rx disk file at ts2 + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), + '1', self.hash1) + utils.mkdirs(object_dir) + fp = open(os.path.join(object_dir, ts2 + '#2.data'), 'w+') + fp.write('1') + fp.flush() + metadata1 = { + 'name': self.name1, + 'X-Timestamp': ts2, + 'Content-Length': '1'} + diskfile.write_metadata(fp, metadata1) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) # sanity + + def do_check(tx_missing_line, expected_rx_missing_lines): + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'}, + body=':MISSING_CHECK: START\r\n' + + tx_missing_line + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [b':MISSING_CHECK: START'] + + [l.encode('ascii') for l in expected_rx_missing_lines] + + [b':MISSING_CHECK: END', + b':UPDATES: START', b':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + # check remote frag is always wanted - older, newer, durable or not... + do_check(self.hash1 + ' ' + ts1 + ' durable:no', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts1 + ' durable:yes', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts1, [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3 + ' durable:no', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3 + ' durable:yes', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3, [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + # ... except when at same timestamp + do_check(self.hash1 + ' ' + ts2 + ' durable:no', []) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + # durable remote frag at ts2 will make the local durable.. 
+ do_check(self.hash1 + ' ' + ts2 + ' durable:yes', []) + self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir)) + + @patch_policies(with_ec_default=True) + def test_MISSING_CHECK_local_durable(self): + # check that local durable fragment does not prevent newer non-durable + # frags being wanted from the sender + self.controller.logger = mock.MagicMock() + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + + ts_iter = make_timestamp_iter() + ts1 = next(ts_iter).internal + ts2 = next(ts_iter).internal + ts3 = next(ts_iter).internal + # make non-durable rx disk file at ts2 + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), + '1', self.hash1) + utils.mkdirs(object_dir) + fp = open(os.path.join(object_dir, ts2 + '#2.data'), 'w+') + fp.write('1') + fp.flush() + metadata1 = { + 'name': self.name1, + 'X-Timestamp': ts2, + 'Content-Length': '1'} + diskfile.write_metadata(fp, metadata1) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) # sanity + + def do_check(tx_missing_line, expected_rx_missing_lines): + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'}, + body=':MISSING_CHECK: START\r\n' + + tx_missing_line + '\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [b':MISSING_CHECK: START'] + + [l.encode('ascii') for l in expected_rx_missing_lines] + + [b':MISSING_CHECK: END', + b':UPDATES: START', b':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + + # check remote frag is always wanted - older, newer, durable or not... + do_check(self.hash1 + ' ' + ts1 + ' durable:no', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts1 + ' durable:yes', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts1, [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3 + ' durable:no', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3 + ' durable:yes', + [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + do_check(self.hash1 + ' ' + ts3, [self.hash1 + ' dm']) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + # ... except when at same timestamp + do_check(self.hash1 + ' ' + ts2 + ' durable:no', []) + self.assertEqual([ts2 + '#2.data'], os.listdir(object_dir)) + # durable remote frag at ts2 will make the local durable.. 
+ do_check(self.hash1 + ' ' + ts2 + ' durable:yes', []) + self.assertEqual([ts2 + '#2#d.data'], os.listdir(object_dir)) + + @patch_policies(with_ec_default=True) + def test_MISSING_CHECK_local_durable_older_than_remote_non_durable(self): + # check that newer non-durable fragment is wanted + self.controller.logger = mock.MagicMock() + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + + ts_iter = make_timestamp_iter() + ts1 = next(ts_iter).internal + ts2 = next(ts_iter).internal + # make durable rx disk file at ts2 + object_dir = utils.storage_directory( + os.path.join(self.testdir, 'sda1', + diskfile.get_data_dir(POLICIES[0])), + '1', self.hash1) + utils.mkdirs(object_dir) + fp = open(os.path.join(object_dir, ts1 + '#2#d.data'), 'w+') + fp.write('1') + fp.flush() + metadata1 = { + 'name': self.name1, + 'X-Timestamp': ts1, + 'Content-Length': '1'} + diskfile.write_metadata(fp, metadata1) + + # make a request offering non-durable at ts2 + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '0', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '2'}, + body=':MISSING_CHECK: START\r\n' + + self.hash1 + ' ' + ts2 + ' durable:no\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual( + self.body_lines(resp.body), + [b':MISSING_CHECK: START', + (self.hash1 + ' dm').encode('ascii'), + b':MISSING_CHECK: END', + b':UPDATES: START', b':UPDATES: END']) + self.assertEqual(resp.status_int, 200) + self.assertFalse(self.controller.logger.error.called) + self.assertFalse(self.controller.logger.exception.called) + def test_MISSING_CHECK_storage_policy(self): # update router post policy patch self.controller._diskfile_router = diskfile.DiskFileRouter( @@ -1499,6 +1719,7 @@ class TestReceiver(unittest.TestCase): 'X-Object-Meta-Test1: one\r\n' 'Content-Encoding: gzip\r\n' 'Specialty-Header: value\r\n' + 'X-Backend-No-Commit: True\r\n' '\r\n' '1') resp = req.get_response(self.controller) @@ -1520,9 +1741,11 @@ class TestReceiver(unittest.TestCase): 'X-Object-Meta-Test1': 'one', 'Content-Encoding': 'gzip', 'Specialty-Header': 'value', + 'X-Backend-No-Commit': 'True', 'Host': 'localhost:80', 'X-Backend-Storage-Policy-Index': '0', 'X-Backend-Replication': 'True', + # note: Etag and X-Backend-No-Commit not in replication-headers 'X-Backend-Replication-Headers': ( 'content-length x-timestamp x-object-meta-test1 ' 'content-encoding specialty-header')}) @@ -1530,7 +1753,8 @@ class TestReceiver(unittest.TestCase): def test_UPDATES_PUT_replication_headers(self): self.controller.logger = mock.MagicMock() - # sanity check - regular PUT will not persist Specialty-Header + # sanity check - regular PUT will not persist Specialty-Header or + # X-Backend-No-Commit req = swob.Request.blank( '/sda1/0/a/c/o1', body='1', environ={'REQUEST_METHOD': 'PUT'}, @@ -1540,6 +1764,7 @@ class TestReceiver(unittest.TestCase): 'X-Timestamp': '1364456113.12344', 'X-Object-Meta-Test1': 'one', 'Content-Encoding': 'gzip', + 'X-Backend-No-Commit': 'False', 'Specialty-Header': 'value'}) resp = req.get_response(self.controller) self.assertEqual(resp.status_int, 201) @@ -1547,6 +1772,7 @@ class TestReceiver(unittest.TestCase): 'sda1', '0', 'a', 'c', 'o1', POLICIES.default) df.open() self.assertFalse('Specialty-Header' in df.get_metadata()) + self.assertFalse('X-Backend-No-Commit' in df.get_metadata()) # an SSYNC request can override PUT header filtering... 
req = swob.Request.blank( @@ -1561,6 +1787,7 @@ class TestReceiver(unittest.TestCase): 'X-Timestamp: 1364456113.12344\r\n' 'X-Object-Meta-Test1: one\r\n' 'Content-Encoding: gzip\r\n' + 'X-Backend-No-Commit: False\r\n' 'Specialty-Header: value\r\n' '\r\n' '1') @@ -1572,7 +1799,7 @@ class TestReceiver(unittest.TestCase): self.assertEqual(resp.status_int, 200) # verify diskfile has metadata permitted by replication headers - # including Specialty-Header + # including Specialty-Header, but not Etag or X-Backend-No-Commit df = self.controller.get_diskfile( 'sda1', '0', 'a', 'c', 'o2', POLICIES.default) df.open() @@ -2264,7 +2491,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_meta=t_data, ts_data=t_data, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual(expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2273,7 +2501,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_data=t_data, ts_meta=t_meta, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual(expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2283,7 +2512,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_data=t_data, ts_meta=t_meta, - ts_ctype=t_ctype) + ts_ctype=t_ctype, + durable=True) self.assertEqual( expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2298,7 +2528,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_data=t_data, ts_meta=t_meta, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual( expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2307,7 +2538,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_meta=t_data, ts_data=t_data, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual(expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2318,7 +2550,8 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_meta=t_meta, ts_data=t_data, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual( expected, ssync_receiver.decode_missing(msg.encode('ascii'))) @@ -2329,10 +2562,45 @@ class TestModuleMethods(unittest.TestCase): expected = dict(object_hash=object_hash, ts_meta=t_meta, ts_data=t_data, - ts_ctype=t_data) + ts_ctype=t_data, + durable=True) self.assertEqual(expected, ssync_receiver.decode_missing(msg.encode('ascii'))) + # not durable + def check_non_durable(durable_val): + msg = '%s %s m:%x,durable:%s' % (object_hash, + t_data.internal, + d_meta_data, + durable_val) + expected = dict(object_hash=object_hash, + ts_meta=t_meta, + ts_data=t_data, + ts_ctype=t_data, + durable=False) + self.assertEqual( + expected, ssync_receiver.decode_missing(msg.encode('ascii'))) + check_non_durable('no') + check_non_durable('false') + check_non_durable('False') + + # explicit durable (as opposed to True by default) + def check_durable(durable_val): + msg = '%s %s m:%x,durable:%s' % (object_hash, + t_data.internal, + d_meta_data, + durable_val) + expected = dict(object_hash=object_hash, + ts_meta=t_meta, + ts_data=t_data, + ts_ctype=t_data, + durable=True) + self.assertEqual( + expected, ssync_receiver.decode_missing(msg.encode('ascii'))) + check_durable('yes') + check_durable('true') + check_durable('True') + def test_encode_wanted(self): ts_iter = make_timestamp_iter() old_t_data = next(ts_iter) diff --git a/test/unit/obj/test_ssync_sender.py 
b/test/unit/obj/test_ssync_sender.py index 4324101d5e..f34459f42c 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -55,7 +55,7 @@ class NullBufferedHTTPConnection(object): class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse): - def __init__(self, chunk_body=''): + def __init__(self, chunk_body='', headers=None): self.status = 200 self.close_called = False if not six.PY2: @@ -65,6 +65,7 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse): b'%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body)) self.ssync_response_buffer = b'' self.ssync_response_chunk_left = 0 + self.headers = headers or {} def read(self, *args, **kwargs): return b'' @@ -72,6 +73,12 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse): def close(self): self.close_called = True + def getheader(self, header_name, default=None): + return str(self.headers.get(header_name, default)) + + def getheaders(self): + return self.headers.items() + class FakeConnection(object): @@ -380,6 +387,56 @@ class TestSender(BaseTest): method_name, mock_method.mock_calls, expected_calls)) + def _do_test_connect_include_non_durable(self, + include_non_durable, + resp_headers): + # construct sender and make connect call + node = dict(replication_ip='1.2.3.4', replication_port=5678, + device='sda1', backend_index=0) + job = dict(partition='9', policy=POLICIES[1]) + sender = ssync_sender.Sender(self.daemon, node, job, None, + include_non_durable=include_non_durable) + self.assertEqual(include_non_durable, sender.include_non_durable) + with mock.patch( + 'swift.obj.ssync_sender.SsyncBufferedHTTPConnection' + ) as mock_conn_class: + mock_conn = mock_conn_class.return_value + mock_conn.getresponse.return_value = FakeResponse('', resp_headers) + sender.connect() + mock_conn_class.assert_called_once_with('1.2.3.4:5678') + return sender + + def test_connect_legacy_receiver(self): + sender = self._do_test_connect_include_non_durable(False, {}) + self.assertFalse(sender.include_non_durable) + warnings = self.daemon_logger.get_lines_for_level('warning') + self.assertEqual([], warnings) + + def test_connect_upgraded_receiver(self): + resp_hdrs = {'x-backend-accept-no-commit': 'True'} + sender = self._do_test_connect_include_non_durable(False, resp_hdrs) + # 'x-backend-accept-no-commit' in response does not override + # sender.include_non_durable + self.assertFalse(sender.include_non_durable) + warnings = self.daemon_logger.get_lines_for_level('warning') + self.assertEqual([], warnings) + + def test_connect_legacy_receiver_include_non_durable(self): + sender = self._do_test_connect_include_non_durable(True, {}) + # no 'x-backend-accept-no-commit' in response, + # sender.include_non_durable has been overridden + self.assertFalse(sender.include_non_durable) + warnings = self.daemon_logger.get_lines_for_level('warning') + self.assertEqual(['ssync receiver 1.2.3.4:5678 does not accept ' + 'non-durable fragments'], warnings) + + def test_connect_upgraded_receiver_include_non_durable(self): + resp_hdrs = {'x-backend-accept-no-commit': 'True'} + sender = self._do_test_connect_include_non_durable(True, resp_hdrs) + self.assertTrue(sender.include_non_durable) + warnings = self.daemon_logger.get_lines_for_level('warning') + self.assertEqual([], warnings) + def test_call(self): def patch_sender(sender, available_map, send_map): connection = FakeConnection() @@ -1465,7 +1522,7 @@ class TestSender(BaseTest): exc = err self.assertEqual(str(exc), '0.01 seconds: send_put chunk') - def _check_send_put(self, 
obj_name, meta_value): + def _check_send_put(self, obj_name, meta_value, durable=True): ts_iter = make_timestamp_iter() t1 = next(ts_iter) body = b'test' @@ -1473,7 +1530,8 @@ class TestSender(BaseTest): u'Unicode-Meta-Name': meta_value} df = self._make_open_diskfile(obj=obj_name, body=body, timestamp=t1, - extra_metadata=extra_metadata) + extra_metadata=extra_metadata, + commit=durable) expected = dict(df.get_metadata()) expected['body'] = body if six.PY2 else body.decode('ascii') expected['chunk_size'] = len(body) @@ -1481,14 +1539,17 @@ class TestSender(BaseTest): wire_meta = meta_value if six.PY2 else meta_value.encode('utf8') path = six.moves.urllib.parse.quote(expected['name']) expected['path'] = path - expected['length'] = format(145 + len(path) + len(wire_meta), 'x') + no_commit = '' if durable else 'X-Backend-No-Commit: True\r\n' + expected['no_commit'] = no_commit + length = 145 + len(path) + len(wire_meta) + len(no_commit) + expected['length'] = format(length, 'x') # .meta file metadata is not included in expected for data only PUT t2 = next(ts_iter) metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'} df.write_metadata(metadata) df.open() connection = FakeConnection() - self.sender.send_put(connection, path, df) + self.sender.send_put(connection, path, df, durable=durable) expected = ( '%(length)s\r\n' 'PUT %(path)s\r\n' @@ -1496,6 +1557,7 @@ class TestSender(BaseTest): 'ETag: %(ETag)s\r\n' 'Some-Other-Header: value\r\n' 'Unicode-Meta-Name: %(meta)s\r\n' + '%(no_commit)s' 'X-Timestamp: %(X-Timestamp)s\r\n' '\r\n' '\r\n' @@ -1508,6 +1570,9 @@ class TestSender(BaseTest): def test_send_put(self): self._check_send_put('o', 'meta') + def test_send_put_non_durable(self): + self._check_send_put('o', 'meta', durable=False) + def test_send_put_unicode(self): if six.PY2: self._check_send_put( @@ -1575,6 +1640,174 @@ class TestSender(BaseTest): self.assertTrue(connection.closed) +@patch_policies(with_ec_default=True) +class TestSenderEC(BaseTest): + def setUp(self): + skip_if_no_xattrs() + super(TestSenderEC, self).setUp() + self.daemon_logger = debug_logger('test-ssync-sender') + self.daemon = ObjectReplicator(self.daemon_conf, + self.daemon_logger) + job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__ + self.sender = ssync_sender.Sender(self.daemon, None, job, None) + + def test_missing_check_non_durable(self): + # sender has durable and non-durable data files for frag index 2 + ts_iter = make_timestamp_iter() + frag_index = 2 + device = 'dev' + part = '9' + object_parts = ('a', 'c', 'o') + object_hash = utils.hash_path(*object_parts) + + # older durable data file at t1 + t1 = next(ts_iter) + df_durable = self._make_diskfile( + device, part, *object_parts, timestamp=t1, policy=POLICIES.default, + frag_index=frag_index, commit=True, verify=False) + with df_durable.open(): + self.assertEqual(t1, df_durable.durable_timestamp) # sanity + + # newer non-durable data file at t2 + t2 = next(ts_iter) + df_non_durable = self._make_diskfile( + device, part, *object_parts, timestamp=t2, policy=POLICIES.default, + frag_index=frag_index, commit=False, frag_prefs=[]) + with df_non_durable.open(): + self.assertNotEqual(df_non_durable.data_timestamp, + df_non_durable.durable_timestamp) # sanity + + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES.default, + 'frag_index': frag_index, + } + self.sender.node = {} + + # First call missing check with sender in default mode - expect the + # non-durable frag to be ignored + response = 
FakeResponse( + chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n') + connection = FakeConnection() + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'33\r\n' + object_hash.encode('utf8') + + b' ' + t1.internal.encode('utf8') + b'\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual( + available_map, {object_hash: {'ts_data': t1, 'durable': True}}) + + # Now make sender send non-durables and repeat missing_check - this + # time the durable is ignored and the non-durable is included in + # available_map (but NOT sent to receiver) + self.sender.include_non_durable = True + response = FakeResponse( + chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n') + connection = FakeConnection() + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'41\r\n' + object_hash.encode('utf8') + + b' ' + t2.internal.encode('utf8') + b' durable:False\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual( + available_map, {object_hash: {'ts_data': t2, 'durable': False}}) + + # Finally, purge the non-durable frag and repeat missing-check to + # confirm that the durable frag is now found and sent to receiver + df_non_durable.purge(t2, frag_index) + response = FakeResponse( + chunk_body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n') + connection = FakeConnection() + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'33\r\n' + object_hash.encode('utf8') + + b' ' + t1.internal.encode('utf8') + b'\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual( + available_map, {object_hash: {'ts_data': t1, 'durable': True}}) + + def test_updates_put_non_durable(self): + # sender has durable and non-durable data files for frag index 2 and is + # initialised to include non-durables + ts_iter = make_timestamp_iter() + frag_index = 2 + device = 'dev' + part = '9' + object_parts = ('a', 'c', 'o') + object_hash = utils.hash_path(*object_parts) + + # older durable data file + t1 = next(ts_iter) + df_durable = self._make_diskfile( + device, part, *object_parts, timestamp=t1, policy=POLICIES.default, + frag_index=frag_index, commit=True, verify=False) + with df_durable.open(): + self.assertEqual(t1, df_durable.durable_timestamp) # sanity + + # newer non-durable data file + t2 = next(ts_iter) + df_non_durable = self._make_diskfile( + device, part, *object_parts, timestamp=t2, policy=POLICIES.default, + frag_index=frag_index, commit=False, frag_prefs=[]) + with df_non_durable.open(): + self.assertNotEqual(df_non_durable.data_timestamp, + df_non_durable.durable_timestamp) # sanity + + # pretend receiver requested data only + send_map = {object_hash: {'data': True}} + + def check_updates(include_non_durable, expected_durable_kwarg): + # call updates and check that the call to send_put is as expected + self.sender.include_non_durable = include_non_durable + self.sender.job = { + 'device': device, + 'partition': part, + 'policy': POLICIES.default, + 'frag_index': frag_index, + } + self.sender.node = {} + self.sender.send_delete = mock.MagicMock() + self.sender.send_put = mock.MagicMock() + self.sender.send_post = mock.MagicMock() + response = FakeResponse( + chunk_body=':UPDATES: START\r\n:UPDATES: END\r\n') + connection = 
FakeConnection() + + self.sender.updates(connection, response, send_map) + + self.assertEqual(self.sender.send_delete.mock_calls, []) + self.assertEqual(self.sender.send_post.mock_calls, []) + self.assertEqual(1, len(self.sender.send_put.mock_calls)) + args, kwargs = self.sender.send_put.call_args + connection, path, df_non_durable = args + self.assertEqual(path, '/a/c/o') + self.assertEqual({'durable': expected_durable_kwarg}, kwargs) + # note that the put line isn't actually sent since we mock + # send_put; send_put is tested separately. + self.assertEqual( + b''.join(connection.sent), + b'11\r\n:UPDATES: START\r\n\r\n' + b'f\r\n:UPDATES: END\r\n\r\n') + + # note: we never expect the (False, False) case + check_updates(include_non_durable=False, expected_durable_kwarg=True) + # non-durable frag is newer so is sent + check_updates(include_non_durable=True, expected_durable_kwarg=False) + # remove the newer non-durable frag so that the durable frag is sent... + df_non_durable.purge(t2, frag_index) + check_updates(include_non_durable=True, expected_durable_kwarg=True) + + class TestModuleMethods(unittest.TestCase): def test_encode_missing(self): object_hash = '9d41d8cd98f00b204e9800998ecf0abc' @@ -1618,15 +1851,35 @@ class TestModuleMethods(unittest.TestCase): expected.encode('ascii'), ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type)) + # optional durable param + expected = ('%s %s m:%x,t:%x' + % (object_hash, t_data.internal, d_meta_data, d_type_data)) + self.assertEqual( + expected.encode('ascii'), + ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type, + durable=None)) + expected = ('%s %s m:%x,t:%x,durable:False' + % (object_hash, t_data.internal, d_meta_data, d_type_data)) + self.assertEqual( + expected.encode('ascii'), + ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type, + durable=False)) + expected = ('%s %s m:%x,t:%x' + % (object_hash, t_data.internal, d_meta_data, d_type_data)) + self.assertEqual( + expected.encode('ascii'), + ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type, + durable=True)) + # test encode and decode functions invert expected = {'object_hash': object_hash, 'ts_meta': t_meta, - 'ts_data': t_data, 'ts_ctype': t_type} + 'ts_data': t_data, 'ts_ctype': t_type, 'durable': False} msg = ssync_sender.encode_missing(**expected) actual = ssync_receiver.decode_missing(msg) self.assertEqual(expected, actual) expected = {'object_hash': object_hash, 'ts_meta': t_meta, - 'ts_data': t_meta, 'ts_ctype': t_meta} + 'ts_data': t_meta, 'ts_ctype': t_meta, 'durable': True} msg = ssync_sender.encode_missing(**expected) actual = ssync_receiver.decode_missing(msg) self.assertEqual(expected, actual)
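
The durable handling in the missing_check wire format exercised by the encode_missing/decode_missing tests above can be summarised with a small, self-contained sketch. This is not the real swift.obj.ssync_sender / ssync_receiver code (which also encodes m:<hex> and t:<hex> offsets for newer meta and content-type timestamps), and the helper names below are hypothetical; only the durable flag is modelled here: durable=True (the default) is omitted from the line, durable=False is sent explicitly, and an absent flag is decoded as durable.

# hypothetical, simplified sketch of the "durable" sub-parameter on a
# missing_check line: "<hash> <ts_data>[ durable:False]"
def encode_missing_line(object_hash, ts_data, durable=None):
    # the real encode_missing also appends m:/t: hex offsets for meta and
    # content-type timestamps; only non-durable frags are called out, since
    # durable is the default on the wire
    msg = '%s %s' % (object_hash, ts_data)
    if durable is False:
        msg += ' durable:False'
    return msg.encode('ascii')


def decode_missing_line(line):
    # absence of the flag means the sender's frag is durable
    parts = line.decode('ascii').split()
    result = {'object_hash': parts[0], 'ts_data': parts[1], 'durable': True}
    if len(parts) > 2:
        for item in parts[2].split(','):
            key, _, value = item.partition(':')
            if key == 'durable':
                result['durable'] = value.lower() in ('yes', 'true')
    return result


# round-trip check mirroring the durable cases in the tests above
line = encode_missing_line('9d41d8cd98f00b204e9800998ecf0abc',
                           '1609459200.00000', durable=False)
assert decode_missing_line(line) == {
    'object_hash': '9d41d8cd98f00b204e9800998ecf0abc',
    'ts_data': '1609459200.00000', 'durable': False}

line = encode_missing_line('9d41d8cd98f00b204e9800998ecf0abc',
                           '1609459200.00000', durable=True)
assert decode_missing_line(line)['durable'] is True

As the MISSING_CHECK receiver tests above show, the only offer a local non-durable fragment satisfies is one at exactly its own timestamp; older and newer offers, durable or not, are still wanted. When the offer at that same timestamp is durable, the receiver commits its local fragment in place (the tests assert the #d marker appears on the local .data filename) without requesting any data.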