EC - eliminate .durable files

Instead of using a separate .durable file to indicate
the durable status of a .data file, rename the .data
to include a durable marker in the filename. This saves
one inode for every EC fragment archive.

An EC policy PUT will, as before, first rename a temp
file to:

   <timestamp>#<frag_index>.data

but now, when the object is committed, that file will be
renamed:

   <timestamp>#<frag_index>#d.data

with the '#d' suffix marking the data file as durable.
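
As a minimal sketch of the naming scheme (illustrative only; the helper
name is hypothetical and the real logic lives in the diskfile module):

   def ec_data_filename(timestamp, frag_index, durable=False):
       # <timestamp>#<frag_index>[#d].data
       name = '%s#%s' % (timestamp, frag_index)
       if durable:
           name += '#d'  # marker added by the commit-phase rename
       return name + '.data'

   ec_data_filename('1418673556.92690', 5)                # 1418673556.92690#5.data
   ec_data_filename('1418673556.92690', 5, durable=True)  # 1418673556.92690#5#d.data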

Diskfile suffix hashing returns the same result when the
new durable-data filename or the legacy durable file is
found in an object directory. A fragment archive that has
been created on an upgraded object server will therefore
appear to be in the same state, as far as the consistency
engine is concerned, as the same fragment archive created
on an older object server.

Since legacy .durable files will still exist in deployed
clusters, many of the unit test scenarios have been
duplicated for both new durable-data filenames and legacy
durable files.

Change-Id: I6f1f62d47be0b0ac7919888c77480a636f11f607
Author: Alistair Coles 2016-08-09 16:09:38 +01:00
parent be1cd1ba40
commit b13b49a27c
13 changed files with 1737 additions and 988 deletions


@ -317,35 +317,44 @@ EC archives are stored on disk in their respective objects-N directory based on
their policy index. See :doc:`overview_policies` for details on per policy
directory information.
The actual names on disk of EC archives also have one additional piece of data
encoded in the filename, the fragment archive index.
In addition to the object timestamp, the filenames of EC archives encode other
information related to the archive:
Each storage policy now must include a transformation function that diskfile
will use to build the filename to store on disk. The functions are implemented
in the diskfile module as policy specific sub classes ``DiskFileManager``.
* The fragment archive index. This is required for a few reasons. For one, it
allows us to store fragment archives of different indexes on the same storage
node which is not typical however it is possible in many circumstances.
Without unique filenames for the different EC archive files in a set, we
would be at risk of overwriting one archive of index `n` with another of
index `m` in some scenarios.
This is required for a few reasons. For one, it allows us to store fragment
archives of different indexes on the same storage node which is not typical
however it is possible in many circumstances. Without unique filenames for the
different EC archive files in a set, we would be at risk of overwriting one
archive of index n with another of index m in some scenarios.
The transformation function for the replication policy is simply a NOP. For
reconstruction, the index is appended to the filename just before the .data
extension. An example filename for a fragment archive storing the 5th fragment
would like this::
The index is appended to the filename just before the ``.data`` extension.
For example, the filename for a fragment archive storing the 5th fragment
would be::
1418673556.92690#5.data
An additional file is also included for Erasure Code policies called the
``.durable`` file. Its meaning will be covered in detail later, however, its on-
disk format does not require the name transformation function that was just
covered. The .durable for the example above would simply look like this::
* The durable state of the archive. The meaning of this will be described in
more detail later, but a fragment archive that is considered durable has an
additional ``#d`` string included in its filename immediately before the
``.data`` extension. For example::
1418673556.92690.durable
1418673556.92690#5#d.data
A policy-specific transformation function is therefore used to build the
archive filename. These functions are implemented in the diskfile module as
methods of policy-specific subclasses of ``BaseDiskFileManager``.
The transformation function for the replication policy is simply a NOP.
.. note::
In older versions the durable state of an archive was represented by an
additional file called the ``.durable`` file instead of the ``#d``
substring in the ``.data`` filename. The ``.durable`` for the example above
would be::
1418673556.92690.durable
And it would be found alongside every fragment specific .data file following a
100% successful PUT operation.
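As an illustration of how both representations convey the same state, the
durable timestamp of an object directory could be derived roughly as follows
(a sketch only; the real logic lives in ``ECDiskFileManager``)::

    import os

    def durable_timestamp(datadir):
        candidates = []
        for fname in os.listdir(datadir):
            base, ext = os.path.splitext(fname)
            if ext == '.durable':                      # legacy marker file
                candidates.append(base)
            elif ext == '.data' and base.endswith('#d'):
                candidates.append(base.split('#')[0])  # <ts>#<frag>#d
        # internal timestamps are fixed width, so string max() picks the
        # newest durable timestamp
        return max(candidates) if candidates else None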
Proxy Server
------------
@ -393,21 +402,31 @@ communicate back to the storage nodes once it has confirmation that a quorum of
fragment archives in the set have been written.
For the first phase of the conversation the proxy requires a quorum of
`ec_ndata + 1` fragment archives to be successfully put to storage nodes.
This ensures that the object could still be reconstructed even if one of the
fragment archives becomes unavailable. During the second phase of the
conversation the proxy communicates a confirmation to storage nodes that the
fragment archive quorum has been achieved. This causes the storage node to
create a `ts.durable` file at timestamp `ts` which acts as an indicator of
the last known durable set of fragment archives for a given object. The
presence of a `ts.durable` file means, to the object server, `there is a set
of ts.data files that are durable at timestamp ts`.
`ec_ndata + 1` fragment archives to be successfully put to storage nodes. This
ensures that the object could still be reconstructed even if one of the
fragment archives becomes unavailable. As described above, each fragment
archive file is named::
<ts>#<frag_index>.data
where ``ts`` is the timestamp and ``frag_index`` is the fragment archive index.
During the second phase of the conversation the proxy communicates a
confirmation to storage nodes that the fragment archive quorum has been
achieved. This causes each storage node to rename the fragment archive written
in the first phase of the conversation to include the substring ``#d`` in its
name::
<ts>#<frag_index>#d.data
This indicates to the object server that this fragment archive is `durable` and
that there is a set of data files that are durable at timestamp ``ts``.
For the second phase of the conversation the proxy requires a quorum of
`ec_ndata + 1` successful commits on storage nodes. This ensures that there are
sufficient committed fragment archives for the object to be reconstructed even
if one becomes unavailable. The reconstructor ensures that `.durable` files are
replicated on storage nodes where they may be missing.
if one becomes unavailable. The reconstructor ensures that the durable state is
replicated on storage nodes where it may be missing.
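The commit therefore reduces to an atomic rename within the object's hash
directory, along the lines of this simplified sketch of the actual
``ECDiskFileWriter`` behaviour::

    import os

    def commit(datadir, ts, frag_index):
        # rename <ts>#<frag_index>.data -> <ts>#<frag_index>#d.data
        src = os.path.join(datadir, '%s#%s.data' % (ts, frag_index))
        dst = os.path.join(datadir, '%s#%s#d.data' % (ts, frag_index))
        os.rename(src, dst)  # atomic on POSIX filesystems
        # the real code also fsyncs the directory to persist the rename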
Note that the completion of the commit phase of the conversation
is also a signal for the object server to go ahead and immediately delete older
@ -423,9 +442,9 @@ The basic flow looks like this:
data/metadata write, send a 1st-phase response to proxy.
* Upon quorum of storage nodes responses, the proxy initiates 2nd-phase by
sending commit confirmations to object servers.
* Upon receipt of commit message, object servers store a 0-byte data file as
`<timestamp>.durable` indicating successful PUT, and send a final response to
the proxy server.
* Upon receipt of commit message, object servers rename ``.data`` files to
include the ``#d`` substring, indicating successful PUT, and send a final
response to the proxy server.
* The proxy waits for `ec_ndata + 1` object servers to respond with a
success (2xx) status before responding to the client with a successful
status.
@ -446,24 +465,25 @@ Here is a high level example of what the conversation looks like::
Content-MD5: <footer_meta_cksum>
<footer_meta>
--MIMEboundary
<object server writes data, metadata>
<object server writes data, metadata to <ts>#<frag_index>.data file>
obj: 100 Continue
<quorum>
proxy: X-Document: put commit
commit_confirmation
--MIMEboundary--
<object server writes ts.durable state>
<object server renames <ts>#<frag_index>.data to <ts>#<frag_index>#d.data>
obj: 20x
<proxy waits to receive >=2 2xx responses>
proxy: 2xx -> client
A few key points on the .durable file:
A few key points on the durable state of a fragment archive:
* The .durable file means "the matching .data file for this has sufficient
fragment archives somewhere, committed, to reconstruct the object".
* A durable fragment archive means that there exist sufficient other fragment
archives elsewhere in the cluster (durable and/or non-durable) to reconstruct
the object.
* When a proxy does a GET, it will require at least one object server to
respond with a fragment archive that has a matching `.durable` file before
reconstructing and returning the object to the client.
respond with a fragment archive that is durable before reconstructing and
returning the object to the client (a sketch of this check follows below).
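
Assuming timestamps, fragment indexes and durable flags have already been
extracted from the backend responses, that check might look like this (the
real bookkeeping lives in the proxy's EC GET path)::

    def can_serve(frag_responses, ec_ndata):
        # frag_responses: list of (timestamp, frag_index, is_durable) tuples
        durable_timestamps = set(
            ts for ts, _, durable in frag_responses if durable)
        for ts in durable_timestamps:
            frags = set(fi for t, fi, _ in frag_responses if t == ts)
            if len(frags) >= ec_ndata:
                # enough distinct fragments at a durable timestamp
                return True
        return False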
Partial PUT Failures
====================
@ -471,10 +491,9 @@ Partial PUT Failures
A partial PUT failure has a few different modes. In one scenario the Proxy
Server is alive through the entire PUT conversation. This is a very
straightforward case. The client will receive a good response if and only if a
quorum of fragment archives were successfully landed on their storage nodes. In
this case the Reconstructor will discover the missing fragment archives, perform
a reconstruction and deliver fragment archives and their matching .durable files
to the nodes.
quorum of fragment archives were successfully landed on their storage nodes.
In this case the Reconstructor will discover the missing fragment archives,
perform a reconstruction and deliver those fragment archives to their nodes.
The more interesting case is what happens if the proxy dies in the middle of a
conversation. If it turns out that a quorum had been met and the commit phase
@ -499,7 +518,7 @@ implement the high level steps described earlier:
#. The proxy server makes simultaneous requests to `ec_ndata` primary object
server nodes with goal of finding a set of `ec_ndata` distinct EC archives
at the same timestamp, and an indication from at least one object server
that a `<timestamp>.durable` file exists for that timestamp. If this goal is
that a durable fragment archive exists for that timestamp. If this goal is
not achieved with the first `ec_ndata` requests then the proxy server
continues to issue requests to the remaining primary nodes and then handoff
nodes.
@ -510,12 +529,12 @@ implement the high level steps described earlier:
response since each EC archive's metadata is valid only for that archive.
#. The proxy streams the decoded data it has back to the client.
Note that the proxy does not require all objects servers to have a `.durable`
file for the EC archive that they return in response to a GET. The proxy
will be satisfied if just one object server has a `.durable` file at the same
timestamp as EC archives returned from other object servers. This means
that the proxy can successfully GET an object that had missing `.durable` files
when it was PUT (i.e. a partial PUT failure occurred).
Note that the proxy does not require all object servers to have a durable
fragment archive to return in response to a GET. The proxy will be satisfied if
just one object server has a durable fragment archive at the same timestamp as
EC archives returned from other object servers. This means that the proxy can
successfully GET an object that had missing durable state on some nodes when it
was PUT (i.e. a partial PUT failure occurred).
Note also that an object server may inform the proxy server that it has more
than one EC archive for different timestamps and/or fragment indexes, which may
@ -541,12 +560,11 @@ which includes things like the entire object etag.
DiskFile
========
Erasure code uses subclassed ``ECDiskFile``, ``ECDiskFileWriter``,
Erasure code policies use subclassed ``ECDiskFile``, ``ECDiskFileWriter``,
``ECDiskFileReader`` and ``ECDiskFileManager`` to implement EC specific
handling of on disk files. This includes things like file name manipulation to
include the fragment index in the filename, determination of valid .data files
based on .durable presence, construction of EC specific hashes.pkl file to
include fragment index information, etc., etc.
include the fragment index and durable state in the filename, construction of
EC specific ``hashes.pkl`` file to include fragment index information, etc.
Metadata
--------


@ -998,7 +998,7 @@ class BaseDiskFileManager(object):
def _get_hashes(self, partition_path, recalculate=None, do_listdir=False,
reclaim_age=None):
"""
Get a list of hashes for the suffix dir. do_listdir causes it to
Get hashes for each suffix dir in a partition. do_listdir causes it to
mistrust the hash cache for suffix existence at the (unexpectedly high)
cost of a listdir. reclaim_age is just passed on to hash_suffix.
@ -2572,48 +2572,58 @@ class ECDiskFileReader(BaseDiskFileReader):
class ECDiskFileWriter(BaseDiskFileWriter):
def _finalize_durable(self, durable_file_path):
def _finalize_durable(self, data_file_path, durable_data_file_path):
exc = None
try:
try:
with open(durable_file_path, 'wb') as _fp:
fsync(_fp.fileno())
os.rename(data_file_path, durable_data_file_path)
fsync_dir(self._datadir)
except (OSError, IOError) as err:
if err.errno not in (errno.ENOSPC, errno.EDQUOT):
# re-raise to catch all handler
raise
msg = (_('No space left on device for %(file)s (%(err)s)') %
{'file': durable_file_path, 'err': err})
self.manager.logger.error(msg)
exc = DiskFileNoSpace(str(err))
params = {'file': durable_data_file_path, 'err': err}
self.manager.logger.exception(
_('No space left on device for %(file)s (%(err)s)'),
params)
exc = DiskFileNoSpace(
'No space left on device for %(file)s (%(err)s)' % params)
else:
try:
self.manager.cleanup_ondisk_files(self._datadir)['files']
except OSError as os_err:
self.manager.logger.exception(
_('Problem cleaning up %(datadir)s (%(err)s)') %
_('Problem cleaning up %(datadir)s (%(err)s)'),
{'datadir': self._datadir, 'err': os_err})
except Exception as err:
msg = (_('Problem writing durable state file %(file)s (%(err)s)') %
{'file': durable_file_path, 'err': err})
self.manager.logger.exception(msg)
exc = DiskFileError(msg)
params = {'file': durable_data_file_path, 'err': err}
self.manager.logger.exception(
_('Problem making data file durable %(file)s (%(err)s)'),
params)
exc = DiskFileError(
'Problem making data file durable %(file)s (%(err)s)' % params)
if exc:
raise exc
def commit(self, timestamp):
"""
Finalize put by writing a timestamp.durable file for the object. We
do this for EC policy because it requires a 2-phase put commit
confirmation.
Finalize put by renaming the object data file to include a durable
marker. We do this for EC policy because it requires a 2-phase put
commit confirmation.
:param timestamp: object put timestamp, an instance of
:class:`~swift.common.utils.Timestamp`
:raises DiskFileError: if the diskfile frag_index has not been set
(either during initialisation or a call to put())
"""
durable_file_path = os.path.join(
self._datadir, timestamp.internal + '.durable')
tpool_reraise(self._finalize_durable, durable_file_path)
data_file_path = join(
self._datadir, self.manager.make_on_disk_filename(
timestamp, '.data', self._diskfile._frag_index))
durable_data_file_path = os.path.join(
self._datadir, self.manager.make_on_disk_filename(
timestamp, '.data', self._diskfile._frag_index, durable=True))
tpool_reraise(
self._finalize_durable, data_file_path, durable_data_file_path)
def put(self, metadata):
"""
@ -2631,7 +2641,9 @@ class ECDiskFileWriter(BaseDiskFileWriter):
# sure that the fragment index is included in object sysmeta.
fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
self._diskfile._frag_index)
# defer cleanup until commit() writes .durable
fi = self.manager.validate_fragment_index(fi)
self._diskfile._frag_index = fi
# defer cleanup until commit() makes diskfile durable
cleanup = False
super(ECDiskFileWriter, self)._put(metadata, cleanup, frag_index=fi)
@ -2746,13 +2758,17 @@ class ECDiskFile(BaseDiskFile):
:param frag_index: fragment archive index, must be
a whole number or None.
"""
exts = ['.ts']
# when frag_index is None it's not possible to build a data file name
purge_file = self.manager.make_on_disk_filename(
timestamp, ext='.ts')
remove_file(os.path.join(self._datadir, purge_file))
if frag_index is not None:
exts.append('.data')
for ext in exts:
# data file may or may not be durable so try removing both filename
# possibilities
purge_file = self.manager.make_on_disk_filename(
timestamp, ext=ext, frag_index=frag_index)
timestamp, ext='.data', frag_index=frag_index)
remove_file(os.path.join(self._datadir, purge_file))
purge_file = self.manager.make_on_disk_filename(
timestamp, ext='.data', frag_index=frag_index, durable=True)
remove_file(os.path.join(self._datadir, purge_file))
self.manager.invalidate_hash(dirname(self._datadir))
@ -2779,7 +2795,7 @@ class ECDiskFileManager(BaseDiskFileManager):
return frag_index
def make_on_disk_filename(self, timestamp, ext=None, frag_index=None,
ctype_timestamp=None, *a, **kw):
ctype_timestamp=None, durable=False, *a, **kw):
"""
Returns the EC specific filename for given timestamp.
@ -2791,6 +2807,7 @@ class ECDiskFileManager(BaseDiskFileManager):
only, must be a whole number.
:param ctype_timestamp: an optional content-type timestamp, an instance
of :class:`~swift.common.utils.Timestamp`
:param durable: if True then include a durable marker in data filename.
:returns: a file name
:raises DiskFileError: if ext=='.data' and the kwarg frag_index is not
a whole number
@ -2801,7 +2818,9 @@ class ECDiskFileManager(BaseDiskFileManager):
# on the same node in certain situations
frag_index = self.validate_fragment_index(frag_index)
rv = timestamp.internal + '#' + str(frag_index)
return '%s%s' % (rv, ext or '')
if durable:
rv += '#d'
return '%s%s' % (rv, ext)
return super(ECDiskFileManager, self).make_on_disk_filename(
timestamp, ext, ctype_timestamp, *a, **kw)
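
Illustrative usage of the new keyword (assuming ``mgr`` is an
``ECDiskFileManager`` and ``ts = Timestamp('1418673556.92690')``)::

    mgr.make_on_disk_filename(ts, '.data', frag_index=5)
    # -> '1418673556.92690#5.data'
    mgr.make_on_disk_filename(ts, '.data', frag_index=5, durable=True)
    # -> '1418673556.92690#5#d.data'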
@ -2809,10 +2828,11 @@ class ECDiskFileManager(BaseDiskFileManager):
"""
Returns timestamp(s) and other info extracted from a policy specific
file name. For EC policy the data file name includes a fragment index
which must be stripped off to retrieve the timestamp.
and possibly a durable marker, both of which must be stripped off
to retrieve the timestamp.
:param filename: the file name including extension
:returns: a dict, with keys for timestamp, frag_index, ext and
:returns: a dict, with keys for timestamp, frag_index, durable, ext and
ctype_timestamp:
* timestamp is a :class:`~swift.common.utils.Timestamp`
@ -2820,7 +2840,9 @@ class ECDiskFileManager(BaseDiskFileManager):
* ctype_timestamp is a :class:`~swift.common.utils.Timestamp` or
None for .meta files, otherwise None
* ext is a string, the file extension including the leading dot or
the empty string if the filename has no extension.
the empty string if the filename has no extension
* durable is a boolean that is True if the filename is a data file
that includes a durable marker
:raises DiskFileError: if any part of the filename is not able to be
validated.
@ -2828,7 +2850,7 @@ class ECDiskFileManager(BaseDiskFileManager):
frag_index = None
float_frag, ext = splitext(filename)
if ext == '.data':
parts = float_frag.split('#', 1)
parts = float_frag.split('#')
try:
timestamp = Timestamp(parts[0])
except ValueError:
@ -2842,11 +2864,16 @@ class ECDiskFileManager(BaseDiskFileManager):
# expect validate_fragment_index raise DiskFileError
pass
frag_index = self.validate_fragment_index(frag_index)
try:
durable = parts[2] == 'd'
except IndexError:
durable = False
return {
'timestamp': timestamp,
'frag_index': frag_index,
'ext': ext,
'ctype_timestamp': None
'ctype_timestamp': None,
'durable': durable
}
rv = super(ECDiskFileManager, self).parse_on_disk_filename(filename)
rv['frag_index'] = None
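
For example, parsing a durable data filename (illustrative)::

    mgr.parse_on_disk_filename('1418673556.92690#5#d.data')
    # -> {'timestamp': Timestamp('1418673556.92690'), 'frag_index': 5,
    #     'ext': '.data', 'ctype_timestamp': None, 'durable': True}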
@ -2855,7 +2882,8 @@ class ECDiskFileManager(BaseDiskFileManager):
def _process_ondisk_files(self, exts, results, frag_index=None,
frag_prefs=None, **kwargs):
"""
Implement EC policy specific handling of .data and .durable files.
Implement EC policy specific handling of .data and legacy .durable
files.
If a frag_prefs keyword arg is provided then its value may determine
which fragment index at which timestamp is used to construct the
@ -2898,13 +2926,9 @@ class ECDiskFileManager(BaseDiskFileManager):
"""
durable_info = None
if exts.get('.durable'):
# in older versions, separate .durable files were used to indicate
# the durability of data files having the same timestamp
durable_info = exts['.durable'][0]
# Mark everything older than most recent .durable as obsolete
# and remove from the exts dict.
for ext in exts.keys():
exts[ext], older = self._split_gte_timestamp(
exts[ext], durable_info['timestamp'])
results.setdefault('obsolete', []).extend(older)
# Split the list of .data files into sets of frags having the same
# timestamp, identifying the durable and newest sets (if any) as we go.
@ -2921,8 +2945,18 @@ class ECDiskFileManager(BaseDiskFileManager):
frag_set.sort(key=lambda info: info['frag_index'])
timestamp = frag_set[0]['timestamp']
frag_sets[timestamp] = frag_set
for frag in frag_set:
# a data file marked as durable may supersede a legacy durable
# file if it is newer
if frag['durable']:
if (not durable_info or
durable_info['timestamp'] < timestamp):
# this frag defines the durable timestamp
durable_info = frag
break
if durable_info and durable_info['timestamp'] == timestamp:
durable_frag_set = frag_set
break # ignore frags that are older than durable timestamp
# Choose which frag set to use
chosen_frag_set = None
@ -2986,7 +3020,15 @@ class ECDiskFileManager(BaseDiskFileManager):
exts['.meta'], chosen_frag['timestamp'])
results['frag_sets'] = frag_sets
# Mark any isolated .durable as obsolete
# Mark everything older than most recent durable data as obsolete
# and remove from the exts dict.
if durable_info:
for ext in exts.keys():
exts[ext], older = self._split_gte_timestamp(
exts[ext], durable_info['timestamp'])
results.setdefault('obsolete', []).extend(older)
# Mark any isolated legacy .durable as obsolete
if exts.get('.durable') and not durable_frag_set:
results.setdefault('obsolete', []).extend(exts['.durable'])
exts.pop('.durable')
@ -3042,6 +3084,15 @@ class ECDiskFileManager(BaseDiskFileManager):
fi = file_info['frag_index']
hashes[fi].update(file_info['timestamp'].internal)
if 'durable_frag_set' in ondisk_info:
# The durable_frag_set may be indicated by a legacy
# <timestamp>.durable file or by a durable <timestamp>#fi#d.data
# file. Either way we update hashes[None] with the string
# <timestamp>.durable which is a consistent representation of the
# abstract state of the object regardless of the actual file set.
# That way if we use a local combination of a legacy t1.durable and
# t1#0.data to reconstruct a remote t1#0#d.data then, when next
# hashed, the local and remote will make identical updates to their
# suffix hashes.
file_info = ondisk_info['durable_frag_set'][0]
hashes[None].update(file_info['timestamp'].internal + '.durable')
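
A minimal sketch of that invariant (hypothetical helper; the real update
happens inside the diskfile hashing code)::

    import hashlib

    def durable_hash_contribution(timestamp_internal):
        # a legacy <ts>.durable file and a durable <ts>#<fi>#d.data file
        # both contribute the same '<ts>.durable' string to the suffix hash
        h = hashlib.md5()
        h.update(timestamp_internal + '.durable')
        return h.hexdigest()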


@ -818,9 +818,9 @@ class ObjectController(BaseStorageServer):
send_hundred_continue_response()
if not self._read_put_commit_message(mime_documents_iter):
return HTTPServerError(request=request)
# got 2nd phase confirmation, write a timestamp.durable
# state file to indicate a successful PUT
# got 2nd phase confirmation (when required), call commit to
# indicate a successful PUT
writer.commit(request.timestamp)
# Drain any remaining MIME docs from the socket. There


@ -277,8 +277,8 @@ class Receiver(object):
self.frag_index in df.fragments[remote['ts_data']] and
(df.durable_timestamp is None or
df.durable_timestamp < remote['ts_data'])):
# We have the frag, just missing a .durable, so try to create the
# .durable now. Try this just once to avoid looping if it fails.
# We have the frag, just missing durable state, so make the frag
# durable now. Try this just once to avoid looping if it fails.
try:
with df.create() as writer:
writer.commit(remote['ts_data'])


@ -1983,7 +1983,7 @@ class ECGetResponseCollection(object):
# durable. Note that this may be a different bucket than the one this
# response got added to, and that we may never go and get a durable
# frag from this node; it is sufficient that we have been told that a
# .durable exists, somewhere, at t_durable.
# durable frag exists, somewhere, at t_durable.
t_durable = headers.get('X-Backend-Durable-Timestamp')
if not t_durable and not t_data_file:
# obj server not upgraded so assume this response's frag is durable
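
Sketched as a standalone helper (hypothetical; only
X-Backend-Durable-Timestamp is taken from the code above, the rest is an
assumption of the sketch)::

    def frag_assumed_durable(headers, t_data_file):
        # a durable timestamp header marks the response durable; when
        # neither it nor a data timestamp is present the server is taken
        # to be not yet upgraded and its frag is assumed durable
        t_durable = headers.get('X-Backend-Durable-Timestamp')
        return bool(t_durable) or not t_data_file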
@ -2619,8 +2619,9 @@ class ECObjectController(BaseObjectController):
self._transfer_data(req, policy, data_source, putters,
nodes, min_conns, etag_hasher)
# The .durable file will propagate in a replicated fashion; if
# one exists, the reconstructor will spread it around.
# The durable state will propagate in a replicated fashion; if
# one fragment is durable then the reconstructor will spread the
# durable status around.
# In order to avoid successfully writing an object, but refusing
# to serve it on a subsequent GET because don't have enough
# durable data fragments - we require the same number of durable


@ -13,7 +13,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from hashlib import md5
import unittest
import uuid
@ -75,32 +75,44 @@ class TestReconstructorPropDurable(ECProbeTest):
hasher = md5()
for chunk in data:
hasher.update(chunk)
return hasher.hexdigest()
return headers, hasher.hexdigest()
def _check_node(self, node, part, etag, headers_post):
# get fragment archive etag
fragment_archive_etag = self.direct_get(node, part)
headers, fragment_archive_etag = self.direct_get(node, part)
self.assertIn('X-Backend-Durable-Timestamp', headers) # sanity check
durable_timestamp = headers['X-Backend-Durable-Timestamp']
# remove the .durable from the selected node
# make the data file non-durable on the selected node
part_dir = self.storage_dir('object', node, part=part)
for dirs, subdirs, files in os.walk(part_dir):
for fname in files:
if fname.endswith('.durable'):
durable = os.path.join(dirs, fname)
os.remove(durable)
break
if fname.endswith('.data'):
non_durable_fname = fname.replace('#d', '')
os.rename(os.path.join(dirs, fname),
os.path.join(dirs, non_durable_fname))
try:
os.remove(os.path.join(part_dir, 'hashes.pkl'))
except OSError as e:
if e.errno != errno.ENOENT:
raise
# fire up reconstructor to propagate the .durable
# sanity check that fragment is no longer durable
headers = direct_client.direct_head_object(
node, part, self.account, self.container_name, self.object_name,
headers={'X-Backend-Storage-Policy-Index': int(self.policy),
'X-Backend-Fragment-Preferences': json.dumps([])})
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
# fire up reconstructor to propagate durable state
self.reconstructor.once()
# fragment is still exactly as it was before!
self.assertEqual(fragment_archive_etag,
self.direct_get(node, part))
headers, fragment_archive_etag_2 = self.direct_get(node, part)
self.assertEqual(fragment_archive_etag, fragment_archive_etag_2)
self.assertIn('X-Backend-Durable-Timestamp', headers)
self.assertEqual(durable_timestamp,
headers['X-Backend-Durable-Timestamp'])
# check meta
meta = client.head_object(self.url, self.token,
@ -132,7 +144,7 @@ class TestReconstructorPropDurable(ECProbeTest):
self.object_name, headers=headers_post)
del headers_post['X-Auth-Token'] # WTF, where did this come from?
# built up a list of node lists to kill a .durable from,
# build up a list of node lists to make non-durable,
# first try a single node
# then adjacent nodes and then nodes >1 node apart
opart, onodes = self.object_ring.get_nodes(


@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import shutil
import tempfile
import unittest
@ -42,6 +43,35 @@ class FakeReplicator(object):
self._diskfile_mgr = self._diskfile_router[policy]
def write_diskfile(df, timestamp, data='test data', frag_index=None,
commit=True, legacy_durable=False, extra_metadata=None):
# Helper method to write some data and metadata to a diskfile.
# Optionally do not commit the diskfile, or commit but using a legacy
# durable file
with df.create() as writer:
writer.write(data)
metadata = {
'ETag': hashlib.md5(data).hexdigest(),
'X-Timestamp': timestamp.internal,
'Content-Length': str(len(data)),
}
if extra_metadata:
metadata.update(extra_metadata)
if frag_index is not None:
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
writer.put(metadata)
if commit and legacy_durable:
# simulate legacy .durable file creation
durable_file = os.path.join(df._datadir,
timestamp.internal + '.durable')
with open(durable_file, 'wb'):
pass
elif commit:
writer.commit(timestamp)
# else: don't make it durable
return metadata
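# Illustrative usage of write_diskfile (hypothetical values): with
# commit=True and legacy_durable=True it writes <ts>#<fi>.data plus a
# zero-byte <ts>.durable file; with the default commit path it writes
# <ts>#<fi>#d.data instead.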
class BaseTest(unittest.TestCase):
def setUp(self):
# daemon will be set in subclass setUp
@ -64,20 +94,8 @@ class BaseTest(unittest.TestCase):
df = df_mgr.get_diskfile(
device, partition, *object_parts, policy=policy,
frag_index=frag_index)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
writer.write(body)
metadata = {
'X-Timestamp': timestamp.internal,
'Content-Length': str(content_length),
'ETag': etag,
}
if extra_metadata:
metadata.update(extra_metadata)
writer.put(metadata)
if commit:
writer.commit(timestamp)
write_diskfile(df, timestamp, data=body, extra_metadata=extra_metadata,
commit=commit)
return df
def _make_open_diskfile(self, device='dev', partition='9',

File diff suppressed because it is too large.


@ -41,6 +41,7 @@ from swift.obj.reconstructor import REVERT
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
FabricatedRing, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
from test.unit.obj.common import write_diskfile
@contextmanager
@ -136,6 +137,8 @@ def get_header_frag_index(self, body):
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1)])
class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# Tests for reconstructor using real objects in test partition directories.
legacy_durable = False
def setUp(self):
self.testdir = tempfile.mkdtemp()
@ -174,22 +177,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# most of the reconstructor test methods require that there be
# real objects in place, not just part dirs, so we'll create them
# all here....
# part 0: 3C1/hash/xxx-1.data <-- job: sync_only - parnters (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# 061/hash/xxx-1.data <-- included in earlier job (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# /xxx-2.data <-- job: sync_revert to index 2
# part 0: 3C1/hash/xxx#1#d.data <-- job: sync_only - partners (FI 1)
# 061/hash/xxx#1#d.data <-- included in earlier job (FI 1)
# /xxx#2#d.data <-- job: sync_revert to index 2
# part 1: 3C1/hash/xxx-0.data <-- job: sync_only - parnters (FI 0)
# /xxx-1.data <-- job: sync_revert to index 1
# /xxx.durable <-- included in earlier jobs (FI 0, 1)
# 061/hash/xxx-1.data <-- included in earlier job (FI 1)
# /xxx.durable <-- included in earlier job (FI 1)
# part 1: 3C1/hash/xxx#0#d.data <-- job: sync_only - partners (FI 0)
# /xxx#1#d.data <-- job: sync_revert to index 1
# 061/hash/xxx#1#d.data <-- included in earlier job (FI 1)
# part 2: 3C1/hash/xxx-2.data <-- job: sync_revert to index 2
# /xxx.durable <-- included in earlier job (FI 2)
# 061/hash/xxx-0.data <-- job: sync_revert to index 0
# /xxx.durable <-- included in earlier job (FI 0)
# part 2: 3C1/hash/xxx#2#d.data <-- job: sync_revert to index 2
# 061/hash/xxx#0#d.data <-- job: sync_revert to index 0
def _create_frag_archives(policy, obj_path, local_id, obj_set):
# we'll create 2 sets of objects in different suffix dirs
@ -202,7 +199,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# just the local
return local_id
else:
# onde local and all of another
# one local and all of another
if obj_num == 0:
return local_id
else:
@ -239,7 +236,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
timestamp=utils.Timestamp(t))
for part_num in self.part_nums:
# create 3 unique objcets per part, each part
# create 3 unique objects per part, each part
# will then have a unique mix of FIs for the
# possible scenarios
for obj_num in range(0, 3):
@ -285,18 +282,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
df_mgr = self.reconstructor._df_router[policy]
df = df_mgr.get_diskfile('sda1', part, 'a', 'c', object_name,
policy=policy)
with df.create() as writer:
timestamp = timestamp or utils.Timestamp(time.time())
test_data = test_data or 'test data'
writer.write(test_data)
metadata = {
'X-Timestamp': timestamp.internal,
'Content-Length': len(test_data),
'Etag': md5(test_data).hexdigest(),
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
}
writer.put(metadata)
writer.commit(timestamp)
timestamp = timestamp or utils.Timestamp(time.time())
test_data = test_data or 'test data'
write_diskfile(df, timestamp, data=test_data, frag_index=frag_index,
legacy_durable=self.legacy_durable)
return df
def assert_expected_jobs(self, part_num, jobs):
@ -1003,7 +992,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
hash_gen = self.reconstructor._df_router[policy].yield_hashes(
'sda1', '2', policy)
for path, hash_, ts in hash_gen:
self.fail('found %s with %s in %s', (hash_, ts, path))
self.fail('found %s with %s in %s' % (hash_, ts, path))
# but the partition directory and hashes pkl still exist
self.assertTrue(os.access(part_path, os.F_OK))
hashes_path = os.path.join(self.objects_1, '2', diskfile.HASH_FILE)
@ -1117,6 +1106,12 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertEqual(len(found_jobs), 6)
class TestGlobalSetupObjectReconstructorLegacyDurable(
TestGlobalSetupObjectReconstructor):
# Tests for reconstructor using real objects in test partition directories.
legacy_durable = True
@patch_policies(with_ec_default=True)
class TestObjectReconstructor(unittest.TestCase):
@ -2444,10 +2439,9 @@ class TestObjectReconstructor(unittest.TestCase):
], [
(r['ip'], r['path']) for r in request_log.requests
])
# hashpath is still there, but only the durable remains
# hashpath is still there, but all files have been purged
files = os.listdir(df._datadir)
self.assertEqual(1, len(files))
self.assertTrue(files[0].endswith('.durable'))
self.assertFalse(files)
# and more to the point, the next suffix recalc will clean it up
df_mgr = self.reconstructor._df_router[self.policy]


@ -2368,7 +2368,7 @@ class TestObjectController(unittest.TestCase):
timestamp = utils.Timestamp(time()).internal
def put_with_index(expected_rsp, frag_index, node_index=None):
data_file_tail = '#%d.data' % frag_index
data_file_tail = '#%d#d.data' % frag_index
headers = {'X-Timestamp': timestamp,
'Content-Length': '6',
'Content-Type': 'application/octet-stream',
@ -2420,7 +2420,7 @@ class TestObjectController(unittest.TestCase):
# disk file
put_with_index(201, 7, 6)
def test_PUT_durable_files(self):
def test_PUT_commits_data(self):
for policy in POLICIES:
timestamp = utils.Timestamp(int(time())).internal
data_file_tail = '.data'
@ -2429,8 +2429,9 @@ class TestObjectController(unittest.TestCase):
'Content-Type': 'application/octet-stream',
'X-Backend-Storage-Policy-Index': int(policy)}
if policy.policy_type == EC_POLICY:
# commit renames data file
headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
data_file_tail = '#2.data'
data_file_tail = '#2#d.data'
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
@ -2446,12 +2447,6 @@ class TestObjectController(unittest.TestCase):
self.assertTrue(os.path.isfile(data_file),
'Expected file %r not found in %r for policy %r'
% (data_file, os.listdir(obj_dir), int(policy)))
durable_file = os.path.join(obj_dir, timestamp) + '.durable'
if policy.policy_type == EC_POLICY:
self.assertTrue(os.path.isfile(durable_file))
self.assertFalse(os.path.getsize(durable_file))
else:
self.assertFalse(os.path.isfile(durable_file))
rmtree(obj_dir)
def test_HEAD(self):
@ -3237,7 +3232,7 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
# PUT again at ts_2 but without a .durable file
# PUT again at ts_2 but without making the data file durable
ts_2 = next(ts_iter)
headers = {'X-Timestamp': ts_2.internal,
'Content-Length': '5',
@ -3249,8 +3244,7 @@ class TestObjectController(unittest.TestCase):
environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
req.body = 'NEWER'
# patch the commit method to do nothing so EC object gets
# no .durable file
# patch the commit method to do nothing so EC object is non-durable
with mock.patch('swift.obj.diskfile.ECDiskFileWriter.commit'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
@ -6853,15 +6847,17 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual(len(log_lines), 1)
self.assertIn(' 499 ', log_lines[0])
# verify successful object data and durable state file write
# verify successful object data file write
found_files = self.find_files()
# .data file is there
# non durable .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.durable'], [])
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And no container update
self.assertFalse(_container_update.called)
@ -6891,15 +6887,17 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual(len(log_lines), 1)
self.assertIn(' 499 ', log_lines[0])
# verify successful object data and durable state file write
# verify successful object data file write
found_files = self.find_files()
# .data file is there
# non durable .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.durable'], [])
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And no container update
self.assertFalse(_container_update.called)
@ -6948,7 +6946,7 @@ class TestObjectServer(unittest.TestCase):
resp.read()
resp.close()
# verify successful object data and durable state file write
# verify successful object data file write
put_timestamp = context['put_timestamp']
found_files = self.find_files()
# .data file is there
@ -6956,8 +6954,10 @@ class TestObjectServer(unittest.TestCase):
obj_datafile = found_files['.data'][0]
self.assertEqual("%s.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# replicated objects do not have a .durable file
self.assertEqual(found_files['.durable'], [])
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And container update was called
self.assertTrue(context['mock_container_update'].called)
@ -6991,13 +6991,12 @@ class TestObjectServer(unittest.TestCase):
# .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
self.assertEqual("%s#2#d.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# .durable file is there
self.assertEqual(len(found_files['.durable']), 1)
durable_file = found_files['.durable'][0]
self.assertEqual("%s.durable" % put_timestamp.internal,
os.path.basename(durable_file))
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And container update was called
self.assertTrue(context['mock_container_update'].called)
@ -7047,8 +7046,7 @@ class TestObjectServer(unittest.TestCase):
# no artifacts left on disk
found_files = self.find_files()
self.assertEqual(len(found_files['.data']), 0)
self.assertEqual(len(found_files['.durable']), 0)
self.assertFalse(found_files)
# ... and no container update
_container_update = context['mock_container_update']
self.assertFalse(_container_update.called)
@ -7112,13 +7110,12 @@ class TestObjectServer(unittest.TestCase):
# .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
self.assertEqual("%s#2#d.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# .durable file is there
self.assertEqual(len(found_files['.durable']), 1)
durable_file = found_files['.durable'][0]
self.assertEqual("%s.durable" % put_timestamp.internal,
os.path.basename(durable_file))
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And container update was called
self.assertTrue(context['mock_container_update'].called)
@ -7139,15 +7136,17 @@ class TestObjectServer(unittest.TestCase):
resp.close()
put_timestamp = context['put_timestamp']
_container_update = context['mock_container_update']
# verify that durable file was NOT created
# verify that durable data file was NOT created
found_files = self.find_files()
# .data file is there
# non durable .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.durable'], [])
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And no container update
self.assertFalse(_container_update.called)
@ -7196,13 +7195,12 @@ class TestObjectServer(unittest.TestCase):
# .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
self.assertEqual("%s#2#d.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# .durable file is there
self.assertEqual(len(found_files['.durable']), 1)
durable_file = found_files['.durable'][0]
self.assertEqual("%s.durable" % put_timestamp.internal,
os.path.basename(durable_file))
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# And container update was called
self.assertTrue(context['mock_container_update'].called)
@ -7246,13 +7244,12 @@ class TestObjectServer(unittest.TestCase):
# .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
self.assertEqual("%s#2#d.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# ... and .durable is there
self.assertEqual(len(found_files['.durable']), 1)
durable_file = found_files['.durable'][0]
self.assertEqual("%s.durable" % put_timestamp.internal,
os.path.basename(durable_file))
# but no other files
self.assertFalse(found_files['.data'][1:])
found_files.pop('.data')
self.assertFalse(found_files)
# but no container update
self.assertFalse(context['mock_container_update'].called)


@ -390,9 +390,9 @@ class TestSsyncEC(TestBaseSsync):
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
def test_handoff_fragment_only_missing_durable(self):
def test_handoff_fragment_only_missing_durable_state(self):
# test that a sync_revert type job does not PUT when the rx is only
# missing a durable file
# missing durable state
policy = POLICIES.default
rx_node_index = frag_index = 0
tx_node_index = 1
@ -405,10 +405,10 @@ class TestSsyncEC(TestBaseSsync):
expected_subreqs = defaultdict(list)
# o1 in sync on rx but rx missing .durable - no PUT required
t1a = next(self.ts_iter) # older rx .data with .durable
# o1 in sync on rx but rx missing durable state - no PUT required
t1a = next(self.ts_iter) # older durable rx .data
t1b = next(self.ts_iter) # rx .meta
t1c = next(self.ts_iter) # tx .data with .durable, rx missing .durable
t1c = next(self.ts_iter) # durable tx .data, non-durable rx .data
obj_name = 'o1'
tx_objs[obj_name] = self._create_ondisk_files(
tx_df_mgr, obj_name, policy, t1c, (tx_node_index, rx_node_index,))
@ -419,7 +419,7 @@ class TestSsyncEC(TestBaseSsync):
rx_objs[obj_name] = self._create_ondisk_files(
rx_df_mgr, obj_name, policy, t1c, (rx_node_index, 9), commit=False)
# o2 on rx has wrong frag_indexes and missing .durable - PUT required
# o2 on rx has wrong frag_indexes and is non-durable - PUT required
t2 = next(self.ts_iter)
obj_name = 'o2'
tx_objs[obj_name] = self._create_ondisk_files(
@ -428,7 +428,7 @@ class TestSsyncEC(TestBaseSsync):
rx_df_mgr, obj_name, policy, t2, (13, 14), commit=False)
expected_subreqs['PUT'].append(obj_name)
# o3 on rx has frag at other time missing .durable - PUT required
# o3 on rx has frag at other time and non-durable - PUT required
t3 = next(self.ts_iter)
obj_name = 'o3'
tx_objs[obj_name] = self._create_ondisk_files(
@ -656,6 +656,79 @@ class TestSsyncEC(TestBaseSsync):
self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'Not a number'",
error_msg)
def test_revert_job_with_legacy_durable(self):
# test a sync_revert type job using a sender object with a legacy
# durable file, that will create a receiver object with durable data
policy = POLICIES.default
rx_node_index = 0
# for a revert job we iterate over frag index that belongs on
# remote node
frag_index = rx_node_index
# create non durable tx obj by not committing, then create a legacy
# .durable file
tx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', policy, t1, (rx_node_index,), commit=False)
tx_datadir = tx_objs['o1'][0]._datadir
durable_file = os.path.join(tx_datadir, t1.internal + '.durable')
with open(durable_file, 'wb'):
pass
self.assertEqual(2, len(os.listdir(tx_datadir))) # sanity check
suffixes = [os.path.basename(os.path.dirname(tx_datadir))]
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(1, len(results['tx_missing']))
self.assertEqual(1, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
# sanity check - rx diskfile is durable
expected_rx_file = '%s#%s#d.data' % (t1.internal, rx_node_index)
rx_df = self._open_rx_diskfile('o1', policy, rx_node_index)
self.assertEqual([expected_rx_file], os.listdir(rx_df._datadir))
# verify on disk files...
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
# verify that tx and rx both generate the same suffix hashes...
tx_hashes = tx_df_mgr.get_hashes(
self.device, self.partition, suffixes, policy)
rx_hashes = rx_df_mgr.get_hashes(
self.device, self.partition, suffixes, policy)
self.assertEqual(suffixes, tx_hashes.keys()) # sanity
self.assertEqual(tx_hashes, rx_hashes)
# sanity check - run ssync again and expect no sync activity
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
sender.connect, trace = self.make_connect_wrapper(sender)
sender()
results = self._analyze_trace(trace)
self.assertEqual(1, len(results['tx_missing']))
self.assertFalse(results['rx_missing'])
self.assertFalse(results['tx_updates'])
self.assertFalse(results['rx_updates'])
@patch_policies
class TestSsyncReplication(TestBaseSsync):


@ -671,7 +671,7 @@ class TestReceiver(unittest.TestCase):
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
# make rx disk file but don't commit it, so .durable is missing
# make rx disk file but don't commit it, so durable state is missing
ts1 = next(make_timestamp_iter()).internal
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',
@ -714,7 +714,7 @@ class TestReceiver(unittest.TestCase):
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
# make rx disk file but don't commit it, so .durable is missing
# make rx disk file but don't commit it, so durable state is missing
ts1 = next(make_timestamp_iter()).internal
object_dir = utils.storage_directory(
os.path.join(self.testdir, 'sda1',


@ -1825,16 +1825,6 @@ class TestObjectController(unittest.TestCase):
contents = ''.join(df.reader())
got_pieces.add(contents)
# check presence for a .durable file for the timestamp
durable_file = os.path.join(
_testdir, node['device'], storage_directory(
diskfile.get_data_dir(policy),
partition, hash_path('a', 'ec-con', 'o1')),
utils.Timestamp(df.timestamp).internal + '.durable')
if os.path.isfile(durable_file):
got_durable.append(True)
lmeta = dict((k.lower(), v) for k, v in meta.items())
got_indices.add(
lmeta['x-object-sysmeta-ec-frag-index'])
@ -1855,11 +1845,24 @@ class TestObjectController(unittest.TestCase):
lmeta['etag'],
md5(contents).hexdigest())
# check presence for a durable data file for the timestamp
durable_file = (
utils.Timestamp(df.timestamp).internal +
'#%s' % lmeta['x-object-sysmeta-ec-frag-index'] +
'#d.data')
durable_file = os.path.join(
_testdir, node['device'], storage_directory(
diskfile.get_data_dir(policy),
partition, hash_path('a', 'ec-con', 'o1')),
durable_file)
if os.path.isfile(durable_file):
got_durable.append(True)
self.assertEqual(expected_pieces, got_pieces)
self.assertEqual(set(('0', '1', '2')), got_indices)
# verify at least 2 puts made it all the way to the end of 2nd
# phase, ie at least 2 .durable statuses were written
# phase, ie at least 2 durable statuses were written
num_durable_puts = sum(d is True for d in got_durable)
self.assertGreaterEqual(num_durable_puts, 2)
@ -1908,16 +1911,21 @@ class TestObjectController(unittest.TestCase):
node['device'], partition, 'a',
'ec-con', 'o2', policy=ec_policy)
with df.open():
meta = df.get_metadata()
contents = ''.join(df.reader())
fragment_archives.append(contents)
self.assertEqual(len(contents), expected_length)
# check presence for a .durable file for the timestamp
# check presence for a durable data file for the timestamp
durable_file = (
utils.Timestamp(df.timestamp).internal +
'#%s' % meta['X-Object-Sysmeta-Ec-Frag-Index'] +
'#d.data')
durable_file = os.path.join(
_testdir, node['device'], storage_directory(
diskfile.get_data_dir(ec_policy),
partition, hash_path('a', 'ec-con', 'o2')),
utils.Timestamp(df.timestamp).internal + '.durable')
durable_file)
if os.path.isfile(durable_file):
got_durable.append(True)
@ -1947,7 +1955,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(seg, obj[segment_start:segment_end])
# verify at least 2 puts made it all the way to the end of 2nd
# phase, ie at least 2 .durable statuses were written
# phase, ie at least 2 durable statuses were written
num_durable_puts = sum(d is True for d in got_durable)
self.assertGreaterEqual(num_durable_puts, 2)
@ -5618,8 +5626,8 @@ class TestECGets(unittest.TestCase):
:param node_state: a dict that maps a node index to the desired state
for that node. Each desired state is a list of
dicts, with each dict describing object reference,
frag_index and file extensions to be moved to the
node's hash_dir.
frag_index and whether the file moved to the node's
hash_dir should be marked as durable or not.
"""
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
obj2srv, obj3srv) = _test_servers
@ -5682,19 +5690,19 @@ class TestECGets(unittest.TestCase):
# node state is in form:
# {node_index: [{ref: object reference,
# frag_index: index,
# exts: ['.data' etc]}, ...],
# durable: True or False}, ...],
# node_index: ...}
for node_index, state in node_state.items():
dest = node_hash_dirs[node_index]
for frag_info in state:
src = node_tmp_dirs[frag_info['frag_index']][frag_info['ref']]
src_files = [f for f in os.listdir(src)
if f.endswith(frag_info['exts'])]
self.assertEqual(len(frag_info['exts']), len(src_files),
'Bad test setup for node %s, obj %s'
% (node_index, frag_info['ref']))
for f in src_files:
move(os.path.join(src, f), os.path.join(dest, f))
src_files = os.listdir(src)
# sanity check, expect just a single .data file
self.assertFalse(src_files[1:])
dest_file = src_files[0].replace(
'#d', '#d' if frag_info['durable'] else '')
move(os.path.join(src, src_files[0]),
os.path.join(dest, dest_file))
# do an object GET
get_req = Request.blank(obj_path, method='GET')
@ -5707,9 +5715,9 @@ class TestECGets(unittest.TestCase):
# durable missing from 2/3 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
0: [dict(ref='obj1', frag_index=0, durable=True)],
1: [dict(ref='obj1', frag_index=1, durable=False)],
2: [dict(ref='obj1', frag_index=2, durable=False)]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5719,9 +5727,9 @@ class TestECGets(unittest.TestCase):
# all files missing on 1 node, durable missing from 1/2 other nodes
# durable missing from 2/3 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
0: [dict(ref='obj1', frag_index=0, durable=True)],
1: [],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
2: [dict(ref='obj1', frag_index=2, durable=False)]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5730,9 +5738,9 @@ class TestECGets(unittest.TestCase):
# durable missing from all 3 nodes
node_state = {
0: [dict(ref='obj1', frag_index=0, exts=('.data',))],
1: [dict(ref='obj1', frag_index=1, exts=('.data',))],
2: [dict(ref='obj1', frag_index=2, exts=('.data',))]
0: [dict(ref='obj1', frag_index=0, durable=False)],
1: [dict(ref='obj1', frag_index=1, durable=False)],
2: [dict(ref='obj1', frag_index=2, durable=False)]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5746,8 +5754,8 @@ class TestECGets(unittest.TestCase):
# scenario: only two frags, both on same node
node_state = {
0: [],
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
dict(ref='obj1', frag_index=1, exts=('.data',))],
1: [dict(ref='obj1', frag_index=0, durable=True),
dict(ref='obj1', frag_index=1, durable=False)],
2: []
}
@ -5758,9 +5766,9 @@ class TestECGets(unittest.TestCase):
# scenario: all 3 frags on same node
node_state = {
0: [],
1: [dict(ref='obj1', frag_index=0, exts=('.data', '.durable')),
dict(ref='obj1', frag_index=1, exts=('.data',)),
dict(ref='obj1', frag_index=2, exts=('.data',))],
1: [dict(ref='obj1', frag_index=0, durable=True),
dict(ref='obj1', frag_index=1, durable=False),
dict(ref='obj1', frag_index=2, durable=False)],
2: []
}
@ -5778,32 +5786,32 @@ class TestECGets(unittest.TestCase):
# newer non-durable frags do not prevent proxy getting the durable obj1
node_state = {
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
dict(ref='obj2', frag_index=0, exts=('.data',)),
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
dict(ref='obj2', frag_index=1, exts=('.data',)),
dict(ref='obj1', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
dict(ref='obj2', frag_index=2, exts=('.data',)),
dict(ref='obj1', frag_index=2, exts=('.data', '.durable'))],
0: [dict(ref='obj3', frag_index=0, durable=False),
dict(ref='obj2', frag_index=0, durable=False),
dict(ref='obj1', frag_index=0, durable=True)],
1: [dict(ref='obj3', frag_index=1, durable=False),
dict(ref='obj2', frag_index=1, durable=False),
dict(ref='obj1', frag_index=1, durable=True)],
2: [dict(ref='obj3', frag_index=2, durable=False),
dict(ref='obj2', frag_index=2, durable=False),
dict(ref='obj1', frag_index=2, durable=True)],
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, objs['obj1']['body'])
# .durables at two timestamps: in this scenario proxy is guaranteed
# durable frags at two timestamps: in this scenario proxy is guaranteed
# to see the durable at ts_2 with one of the first 2 responses, so will
# then prefer that when requesting from third obj server
node_state = {
0: [dict(ref='obj3', frag_index=0, exts=('.data',)),
dict(ref='obj2', frag_index=0, exts=('.data',)),
dict(ref='obj1', frag_index=0, exts=('.data', '.durable'))],
1: [dict(ref='obj3', frag_index=1, exts=('.data',)),
dict(ref='obj2', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj3', frag_index=2, exts=('.data',)),
dict(ref='obj2', frag_index=2, exts=('.data', '.durable'))],
0: [dict(ref='obj3', frag_index=0, durable=False),
dict(ref='obj2', frag_index=0, durable=False),
dict(ref='obj1', frag_index=0, durable=True)],
1: [dict(ref='obj3', frag_index=1, durable=False),
dict(ref='obj2', frag_index=1, durable=True)],
2: [dict(ref='obj3', frag_index=2, durable=False),
dict(ref='obj2', frag_index=2, durable=True)],
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5826,10 +5834,10 @@ class TestECGets(unittest.TestCase):
# back two responses with frag index 1, and will then return to node 0
# for frag_index 0.
node_state = {
0: [dict(ref='obj1a', frag_index=0, exts=('.data',)),
dict(ref='obj1a', frag_index=1, exts=('.data',))],
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj1c', frag_index=1, exts=('.data', '.durable'))]
0: [dict(ref='obj1a', frag_index=0, durable=False),
dict(ref='obj1a', frag_index=1, durable=False)],
1: [dict(ref='obj1b', frag_index=1, durable=True)],
2: [dict(ref='obj1c', frag_index=1, durable=True)]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5840,9 +5848,9 @@ class TestECGets(unittest.TestCase):
# 404 (the third, 'extra', obj server GET will return 404 because it
# will be sent frag prefs that exclude frag_index 1)
node_state = {
0: [dict(ref='obj1a', frag_index=1, exts=('.data',))],
1: [dict(ref='obj1b', frag_index=1, exts=('.data', '.durable'))],
2: [dict(ref='obj1c', frag_index=1, exts=('.data',))]
0: [dict(ref='obj1a', frag_index=1, durable=False)],
1: [dict(ref='obj1b', frag_index=1, durable=True)],
2: [dict(ref='obj1c', frag_index=1, durable=False)]
}
resp = self._setup_nodes_and_do_GET(objs, node_state)
@ -5947,7 +5955,6 @@ class TestObjectDisconnectCleanup(unittest.TestCase):
def test_ec_disconnect_cleans_up(self):
self._check_disconnect_cleans_up('ec')
found_files = self.find_files()
self.assertEqual(found_files['.durable'], [])
self.assertEqual(found_files['.data'], [])
def test_repl_chunked_transfer_disconnect_cleans_up(self):
@ -5958,7 +5965,6 @@ class TestObjectDisconnectCleanup(unittest.TestCase):
def test_ec_chunked_transfer_disconnect_cleans_up(self):
self._check_disconnect_cleans_up('ec', is_chunked=True)
found_files = self.find_files()
self.assertEqual(found_files['.durable'], [])
self.assertEqual(found_files['.data'], [])