From 8fe4bfefaa13d087763f948151b9a82790ef89f0 Mon Sep 17 00:00:00 2001
From: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp>
Date: Thu, 2 Mar 2017 03:31:28 -0800
Subject: [PATCH] TestObjectController refactoring

From the related change of ECDuplication, Swift have a couple of Test
classes for EC policy, normal EC and EC Duplication, in the
test/unit/proxy/test_server.py. To enable the classes, the related change
abstracts the EC test cases as the ECTestMixin class to gather test
methods into one place but it was worse because TestObjectController did
still have both test cases for replication and for ec that may be hard
to understand the test class structure.

Hence, this patch attempts to refactor the structure as

From:

     ECTestMixin
            |
    -------------------------------------
    |                                   |
TestObjectController           TestObjectControllerECDuplication
(for replication and EC)       (for EC Duplication Policy)

To:

    BaseTestObjectController
            |
    --------------------------------------
    |                                    |
TestReplicatedObjectController  BaseTestECObjectController
(for replication)                        |
                          ---------------------------------
                          |                               |
                TestECObjectController    TestECDuplicationObjectController
                (for EC policy)           (for EC Duplication Policy)

Some more cleanups are in follow up patches because this patch shows a lot
of moving code chunks which could be hard to compare the diff. To make
the review easy, this patch forcus on ONLY the structure changes as
possible.

Related-Change: Idd155401982a2c48110c30b480966a863f6bd305
Related-Change: I25a3f8fc837706d78dca226fe282d9e5ead65a0d
Change-Id: Ifd3d0fa66773e640bb61cc528f7a1b2358e97d91
---
 test/unit/proxy/test_mem_server.py |    5 +-
 test/unit/proxy/test_server.py     | 1795 ++++++++++++++--------------
 2 files changed, 906 insertions(+), 894 deletions(-)

diff --git a/test/unit/proxy/test_mem_server.py b/test/unit/proxy/test_mem_server.py
index 2221ee926e..336b98b766 100644
--- a/test/unit/proxy/test_mem_server.py
+++ b/test/unit/proxy/test_mem_server.py
@@ -33,7 +33,8 @@ class TestProxyServer(test_server.TestProxyServer):
     pass
 
 
-class TestObjectController(test_server.TestObjectController):
+class TestReplicatedObjectController(
+        test_server.TestReplicatedObjectController):
     def test_PUT_no_etag_fallocate(self):
         # mem server doesn't call fallocate(), believe it or not
         pass
@@ -42,6 +43,8 @@ class TestObjectController(test_server.TestObjectController):
     def test_policy_IO(self):
         pass
 
+
+class TestECObjectController(test_server.TestECObjectController):
     def test_PUT_ec(self):
         pass
 
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 6136ee96f7..8839d7e92b 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -1143,7 +1143,63 @@ class TestProxyServerLoading(unittest.TestCase):
             self.assertTrue(policy.object_ring)
 
 
-class ECTestMixin(object):
+class BaseTestObjectController(object):
+    """
+    A root of TestObjController that implements helper methods for child
+    TestObjControllers.
+    """
+    def setUp(self):
+        self.app = proxy_server.Application(
+            None, FakeMemcache(),
+            logger=debug_logger('proxy-ut'),
+            account_ring=FakeRing(),
+            container_ring=FakeRing())
+        # clear proxy logger result for each test
+        _test_servers[0].logger._clear()
+
+    def tearDown(self):
+        self.app.account_ring.set_replicas(3)
+        self.app.container_ring.set_replicas(3)
+        for policy in POLICIES:
+            policy.object_ring = FakeRing(base_port=3000)
+
+    def assert_status_map(self, method, statuses, expected, raise_exc=False):
+        with save_globals():
+            kwargs = {}
+            if raise_exc:
+                kwargs['raise_exc'] = raise_exc
+
+            set_http_connect(*statuses, **kwargs)
+            self.app.memcache.store = {}
+            req = Request.blank('/v1/a/c/o',
+                                headers={'Content-Length': '0',
+                                         'Content-Type': 'text/plain'})
+            self.app.update_request(req)
+            try:
+                res = method(req)
+            except HTTPException as res:
+                pass
+            self.assertEqual(res.status_int, expected)
+
+            # repeat test
+            set_http_connect(*statuses, **kwargs)
+            self.app.memcache.store = {}
+            req = Request.blank('/v1/a/c/o',
+                                headers={'Content-Length': '0',
+                                         'Content-Type': 'text/plain'})
+            self.app.update_request(req)
+            try:
+                res = method(req)
+            except HTTPException as res:
+                pass
+            self.assertEqual(res.status_int, expected)
+
+    def _sleep_enough(self, condition):
+        for sleeptime in (0.1, 1.0):
+            sleep(sleeptime)
+            if condition():
+                break
+
     def put_container(self, policy_name, container_name):
         # Note: only works if called with unpatched policies
         prolis = _test_sockets[0]
@@ -1161,494 +1217,6 @@ class ECTestMixin(object):
         exp = 'HTTP/1.1 2'
         self.assertEqual(headers[:len(exp)], exp)
 
-    @unpatch_policies
-    def test_PUT_ec(self):
-        policy = POLICIES[self.ec_policy_index]
-        self.put_container(policy.name, policy.name)
-
-        obj = 'abCD' * 10  # small, so we don't get multiple EC stripes
-        prolis = _test_sockets[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/o1 HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Etag: "%s"\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (policy.name, md5(obj).hexdigest(),
-                             len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        ecd = policy.pyeclib_driver
-        expected_pieces = set(ecd.encode(obj))
-
-        # go to disk to make sure it's there and all erasure-coded
-        partition, nodes = policy.object_ring.get_nodes('a', policy.name, 'o1')
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[policy]
-
-        got_pieces = set()
-        got_indices = set()
-        got_durable = []
-        for node_index, node in enumerate(nodes):
-            df = df_mgr.get_diskfile(node['device'], partition,
-                                     'a', policy.name, 'o1',
-                                     policy=policy)
-            with df.open():
-                meta = df.get_metadata()
-                contents = ''.join(df.reader())
-                got_pieces.add(contents)
-
-                lmeta = dict((k.lower(), v) for k, v in meta.items())
-                got_indices.add(
-                    lmeta['x-object-sysmeta-ec-frag-index'])
-
-                self.assertEqual(
-                    lmeta['x-object-sysmeta-ec-etag'],
-                    md5(obj).hexdigest())
-                self.assertEqual(
-                    lmeta['x-object-sysmeta-ec-content-length'],
-                    str(len(obj)))
-                self.assertEqual(
-                    lmeta['x-object-sysmeta-ec-segment-size'],
-                    '4096')
-                self.assertEqual(
-                    lmeta['x-object-sysmeta-ec-scheme'],
-                    '%s 2+1' % DEFAULT_TEST_EC_TYPE)
-                self.assertEqual(
-                    lmeta['etag'],
-                    md5(contents).hexdigest())
-
-                # check presence for a durable data file for the timestamp
-                durable_file = (
-                    utils.Timestamp(df.timestamp).internal +
-                    '#%s' % lmeta['x-object-sysmeta-ec-frag-index'] +
-                    '#d.data')
-                durable_file = os.path.join(
-                    _testdir, node['device'], storage_directory(
-                        diskfile.get_data_dir(policy),
-                        partition, hash_path('a', policy.name, 'o1')),
-                    durable_file)
-                if os.path.isfile(durable_file):
-                    got_durable.append(True)
-
-        self.assertEqual(expected_pieces, got_pieces)
-        self.assertEqual(set(('0', '1', '2')), got_indices)
-
-        # verify at least 2 puts made it all the way to the end of 2nd
-        # phase, ie at least 2 durable statuses were written
-        num_durable_puts = sum(d is True for d in got_durable)
-        self.assertGreaterEqual(num_durable_puts, 2)
-
-    @unpatch_policies
-    def test_PUT_ec_multiple_segments(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        pyeclib_header_size = len(ec_policy.pyeclib_driver.encode("")[0])
-        segment_size = ec_policy.ec_segment_size
-
-        # Big enough to have multiple segments. Also a multiple of the
-        # segment size to get coverage of that path too.
-        obj = 'ABC' * segment_size
-
-        prolis = _test_sockets[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/o2 HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        # it's a 2+1 erasure code, so each fragment archive should be half
-        # the length of the object, plus three inline pyeclib metadata
-        # things (one per segment)
-        expected_length = (len(obj) / 2 + pyeclib_header_size * 3)
-
-        partition, nodes = ec_policy.object_ring.get_nodes(
-            'a', ec_policy.name, 'o2')
-
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
-
-        got_durable = []
-        fragment_archives = []
-        for node in nodes:
-            df = df_mgr.get_diskfile(
-                node['device'], partition, 'a',
-                ec_policy.name, 'o2', policy=ec_policy)
-            with df.open():
-                meta = df.get_metadata()
-                contents = ''.join(df.reader())
-                fragment_archives.append(contents)
-                self.assertEqual(len(contents), expected_length)
-
-                durable_file = (
-                    utils.Timestamp(df.timestamp).internal +
-                    '#%s' % meta['X-Object-Sysmeta-Ec-Frag-Index'] +
-                    '#d.data')
-                durable_file = os.path.join(
-                    _testdir, node['device'], storage_directory(
-                        diskfile.get_data_dir(ec_policy),
-                        partition, hash_path('a', ec_policy.name, 'o2')),
-                    durable_file)
-                if os.path.isfile(durable_file):
-                    got_durable.append(True)
-
-        # Verify that we can decode each individual fragment and that they
-        # are all the correct size
-        fragment_size = ec_policy.fragment_size
-        nfragments = int(
-            math.ceil(float(len(fragment_archives[0])) / fragment_size))
-
-        for fragment_index in range(nfragments):
-            fragment_start = fragment_index * fragment_size
-            fragment_end = (fragment_index + 1) * fragment_size
-
-            try:
-                frags = [fa[fragment_start:fragment_end]
-                         for fa in fragment_archives]
-                seg = ec_policy.pyeclib_driver.decode(frags)
-            except ECDriverError:
-                self.fail("Failed to decode fragments %d; this probably "
-                          "means the fragments are not the sizes they "
-                          "should be" % fragment_index)
-
-            segment_start = fragment_index * segment_size
-            segment_end = (fragment_index + 1) * segment_size
-
-            self.assertEqual(seg, obj[segment_start:segment_end])
-
-        # verify at least 2 puts made it all the way to the end of 2nd
-        # phase, ie at least 2 .durable statuses were written
-        num_durable_puts = sum(d is True for d in got_durable)
-        self.assertGreaterEqual(num_durable_puts, 2)
-
-    @unpatch_policies
-    def test_PUT_ec_object_etag_mismatch(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        obj = '90:6A:02:60:B1:08-96da3e706025537fc42464916427727e'
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/o3 HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Etag: %s\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name,
-                             md5('something else').hexdigest(),
-                             len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 422'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        # nothing should have made it to disk on the object servers
-        partition, nodes = prosrv.get_object_ring(
-            int(ec_policy)).get_nodes('a', ec_policy.name, 'o3')
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
-
-        for node in nodes:
-            df = df_mgr.get_diskfile(node['device'], partition,
-                                     'a', ec_policy.name, 'o3',
-                                     policy=ec_policy)
-            self.assertRaises(DiskFileNotExist, df.open)
-
-    @unpatch_policies
-    def test_PUT_ec_fragment_archive_etag_mismatch(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        # Cause a hash mismatch by feeding one particular MD5 hasher some
-        # extra data. The goal here is to get exactly more than one of the
-        # hashers in an object server.
-        count = (ec_policy.object_ring.replica_count - ec_policy.ec_ndata)
-        countdown = [count]
-
-        def busted_md5_constructor(initial_str=""):
-            hasher = md5(initial_str)
-            if countdown[0] > 0:
-                hasher.update('wrong')
-            countdown[0] -= 1
-            return hasher
-
-        obj = 'uvarovite-esurience-cerated-symphysic'
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        with mock.patch('swift.obj.server.md5', busted_md5_constructor):
-            fd = sock.makefile()
-            fd.write('PUT /v1/a/%s/pimento HTTP/1.1\r\n'
-                     'Host: localhost\r\n'
-                     'Connection: close\r\n'
-                     'Etag: %s\r\n'
-                     'Content-Length: %d\r\n'
-                     'X-Storage-Token: t\r\n'
-                     'Content-Type: application/octet-stream\r\n'
-                     '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
-                                 len(obj), obj))
-            fd.flush()
-            headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 503'  # no quorum
-        self.assertEqual(headers[:len(exp)], exp)
-
-        # replica count - 1 of the fragment archives should have
-        # landed on disk
-        partition, nodes = prosrv.get_object_ring(
-            int(ec_policy)).get_nodes('a', ec_policy.name, 'pimento')
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
-
-        found = 0
-        for node in nodes:
-            df = df_mgr.get_diskfile(node['device'], partition,
-                                     'a', ec_policy.name, 'pimento',
-                                     policy=ec_policy)
-            try:
-                # diskfile open won't succeed because no durable was written,
-                # so look under the hood for data files.
-                files = os.listdir(df._datadir)
-                if len(files) > 0:
-                    # Although the third fragment archive hasn't landed on
-                    # disk, the directory df._datadir is pre-maturely created
-                    # and is empty when we use O_TMPFILE + linkat()
-                    num_data_files = \
-                        len([f for f in files if f.endswith('.data')])
-                    self.assertEqual(1, num_data_files)
-                    found += 1
-            except OSError:
-                pass
-        self.assertEqual(found, ec_policy.ec_ndata)
-
-    @unpatch_policies
-    def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container("ec", "ec-con")
-
-        def busted_md5_constructor(initial_str=""):
-            hasher = md5(initial_str)
-            hasher.update('wrong')
-            return hasher
-
-        obj = 'uvarovite-esurience-cerated-symphysic'
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-
-        call_count = [0]
-
-        def mock_committer(self):
-            call_count[0] += 1
-
-        commit_confirmation = \
-            'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
-
-        with mock.patch('swift.obj.server.md5', busted_md5_constructor), \
-                mock.patch(commit_confirmation, mock_committer):
-            fd = sock.makefile()
-            fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
-                     'Host: localhost\r\n'
-                     'Connection: close\r\n'
-                     'Etag: %s\r\n'
-                     'Content-Length: %d\r\n'
-                     'X-Storage-Token: t\r\n'
-                     'Content-Type: application/octet-stream\r\n'
-                     '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
-            fd.flush()
-            headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 503'  # no quorum
-        self.assertEqual(headers[:len(exp)], exp)
-        # Don't send commit to object-server if quorum responses consist of 4xx
-        self.assertEqual(0, call_count[0])
-
-        # no fragment archives should have landed on disk
-        partition, nodes = prosrv.get_object_ring(3).get_nodes(
-            'a', 'ec-con', 'quorum')
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
-
-        for node in nodes:
-            df = df_mgr.get_diskfile(node['device'], partition,
-                                     'a', 'ec-con', 'quorum',
-                                     policy=POLICIES[3])
-            if os.path.exists(df._datadir):
-                self.assertFalse(os.listdir(df._datadir))  # should be empty
-
-    @unpatch_policies
-    def test_PUT_ec_fragment_quorum_bad_request(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container("ec", "ec-con")
-
-        obj = 'uvarovite-esurience-cerated-symphysic'
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-
-        call_count = [0]
-
-        def mock_committer(self):
-            call_count[0] += 1
-
-        read_footer = \
-            'swift.obj.server.ObjectController._read_metadata_footer'
-        commit_confirmation = \
-            'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
-
-        with mock.patch(read_footer) as read_footer_call, \
-                mock.patch(commit_confirmation, mock_committer):
-            # Emulate missing footer MIME doc in all object-servers
-            read_footer_call.side_effect = HTTPBadRequest(
-                body="couldn't find footer MIME doc")
-
-            fd = sock.makefile()
-            fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
-                     'Host: localhost\r\n'
-                     'Connection: close\r\n'
-                     'Etag: %s\r\n'
-                     'Content-Length: %d\r\n'
-                     'X-Storage-Token: t\r\n'
-                     'Content-Type: application/octet-stream\r\n'
-                     '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
-            fd.flush()
-            headers = readuntil2crlfs(fd)
-
-        # Don't show a result of the bad conversation between proxy-server
-        # and object-server
-        exp = 'HTTP/1.1 503'
-        self.assertEqual(headers[:len(exp)], exp)
-        # Don't send commit to object-server if quorum responses consist of 4xx
-        self.assertEqual(0, call_count[0])
-
-        # no fragment archives should have landed on disk
-        partition, nodes = prosrv.get_object_ring(3).get_nodes(
-            'a', 'ec-con', 'quorum')
-        conf = {'devices': _testdir, 'mount_check': 'false'}
-
-        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
-
-        for node in nodes:
-            df = df_mgr.get_diskfile(node['device'], partition,
-                                     'a', 'ec-con', 'quorum',
-                                     policy=POLICIES[3])
-            if os.path.exists(df._datadir):
-                self.assertFalse(os.listdir(df._datadir))  # should be empty
-
-    @unpatch_policies
-    def test_PUT_ec_if_none_match(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        obj = 'ananepionic-lepidophyllous-ropewalker-neglectful'
-        prolis = _test_sockets[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Etag: "%s"\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
-                             len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'If-None-Match: *\r\n'
-                 'Etag: "%s"\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
-                             len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 412'
-        self.assertEqual(headers[:len(exp)], exp)
-
-    @unpatch_policies
-    def test_GET_ec(self):
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        obj = '0123456' * 11 * 17
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/go-get-it HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'X-Object-Meta-Color: chartreuse\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('GET /v1/a/%s/go-get-it HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'X-Storage-Token: t\r\n'
-                 '\r\n' % ec_policy.name)
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 200'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        headers = parse_headers_string(headers)
-        self.assertEqual(str(len(obj)), headers['Content-Length'])
-        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
-        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])
-
-        gotten_obj = ''
-        while True:
-            buf = fd.read(64)
-            if not buf:
-                break
-            gotten_obj += buf
-        self.assertEqual(gotten_obj, obj)
-        error_lines = prosrv.logger.get_lines_for_level('error')
-        warn_lines = prosrv.logger.get_lines_for_level('warning')
-        self.assertEqual(len(error_lines), 0)  # sanity
-        self.assertEqual(len(warn_lines), 0)  # sanity
-
     def _test_conditional_GET(self, policy):
         container_name = uuid.uuid4().hex
         object_path = '/v1/a/%s/conditionals' % container_name
@@ -1738,403 +1306,14 @@ class ECTestMixin(object):
         self.assertEqual(len(error_lines), 0)  # sanity
         self.assertEqual(len(warn_lines), 0)  # sanity
 
-    @unpatch_policies
-    def test_conditional_GET_ec(self):
-        policy = POLICIES[self.ec_policy_index]
-        self.assertEqual('erasure_coding', policy.policy_type)  # sanity
-        self._test_conditional_GET(policy)
-
-    @unpatch_policies
-    def test_GET_ec_big(self):
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        # our EC segment size is 4 KiB, so this is multiple (3) segments;
-        # we'll verify that with a sanity check
-        obj = 'a moose once bit my sister' * 400
-        self.assertGreater(
-            len(obj), ec_policy.ec_segment_size * 2,
-            "object is too small for proper testing")
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/big-obj-get HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('GET /v1/a/%s/big-obj-get HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'X-Storage-Token: t\r\n'
-                 '\r\n' % ec_policy.name)
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 200'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        headers = parse_headers_string(headers)
-        self.assertEqual(str(len(obj)), headers['Content-Length'])
-        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
-
-        gotten_obj = ''
-        while True:
-            buf = fd.read(64)
-            if not buf:
-                break
-            gotten_obj += buf
-        # This may look like a redundant test, but when things fail, this
-        # has a useful failure message while the subsequent one spews piles
-        # of garbage and demolishes your terminal's scrollback buffer.
-        self.assertEqual(len(gotten_obj), len(obj))
-        self.assertEqual(gotten_obj, obj)
-        error_lines = prosrv.logger.get_lines_for_level('error')
-        warn_lines = prosrv.logger.get_lines_for_level('warning')
-        self.assertEqual(len(error_lines), 0)  # sanity
-        self.assertEqual(len(warn_lines), 0)  # sanity
-
-    @unpatch_policies
-    def test_GET_ec_failure_handling(self):
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        obj = 'look at this object; it is simply amazing ' * 500
-        prolis = _test_sockets[0]
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/crash-test-dummy HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        def explodey_iter(inner_iter):
-            yield next(inner_iter)
-            raise Exception("doom ba doom")
-
-        def explodey_doc_parts_iter(inner_iter_iter):
-            try:
-                for item in inner_iter_iter:
-                    item = item.copy()  # paranoia about mutable data
-                    item['part_iter'] = explodey_iter(item['part_iter'])
-                    yield item
-            except GeneratorExit:
-                inner_iter_iter.close()
-                raise
-
-        real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter
-
-        def explodey_ec_app_iter(path, policy, iterators, *a, **kw):
-            # Each thing in `iterators` here is a document-parts iterator,
-            # and we want to fail after getting a little into each part.
-            #
-            # That way, we ensure we've started streaming the response to
-            # the client when things go wrong.
-            return real_ec_app_iter(
-                path, policy,
-                [explodey_doc_parts_iter(i) for i in iterators],
-                *a, **kw)
-
-        with mock.patch("swift.proxy.controllers.obj.ECAppIter",
-                        explodey_ec_app_iter):
-            sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-            fd = sock.makefile()
-            fd.write('GET /v1/a/%s/crash-test-dummy HTTP/1.1\r\n'
-                     'Host: localhost\r\n'
-                     'Connection: close\r\n'
-                     'X-Storage-Token: t\r\n'
-                     '\r\n' % ec_policy.name)
-            fd.flush()
-            headers = readuntil2crlfs(fd)
-            exp = 'HTTP/1.1 200'
-            self.assertEqual(headers[:len(exp)], exp)
-
-            headers = parse_headers_string(headers)
-            self.assertEqual(str(len(obj)), headers['Content-Length'])
-            self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
-
-            gotten_obj = ''
-            try:
-                # don't hang the test run when this fails
-                with Timeout(300):
-                    while True:
-                        buf = fd.read(64)
-                        if not buf:
-                            break
-                        gotten_obj += buf
-            except Timeout:
-                self.fail("GET hung when connection failed")
-
-            # Ensure we failed partway through, otherwise the mocks could
-            # get out of date without anyone noticing
-            self.assertTrue(0 < len(gotten_obj) < len(obj))
-
-    @unpatch_policies
-    def test_HEAD_ec(self):
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        obj = '0123456' * 11 * 17
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('PUT /v1/a/%s/go-head-it HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'Content-Length: %d\r\n'
-                 'X-Storage-Token: t\r\n'
-                 'X-Object-Meta-Color: chartreuse\r\n'
-                 'Content-Type: application/octet-stream\r\n'
-                 '\r\n%s' % (ec_policy.name, len(obj), obj))
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 201'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('HEAD /v1/a/%s/go-head-it HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'X-Storage-Token: t\r\n'
-                 '\r\n' % ec_policy.name)
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 200'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        headers = parse_headers_string(headers)
-        self.assertEqual(str(len(obj)), headers['Content-Length'])
-        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
-        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])
-
-        error_lines = prosrv.logger.get_lines_for_level('error')
-        warn_lines = prosrv.logger.get_lines_for_level('warning')
-        self.assertEqual(len(error_lines), 0)  # sanity
-        self.assertEqual(len(warn_lines), 0)  # sanity
-
-    @unpatch_policies
-    def test_GET_ec_404(self):
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('GET /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'X-Storage-Token: t\r\n'
-                 '\r\n' % ec_policy.name)
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 404'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        error_lines = prosrv.logger.get_lines_for_level('error')
-        warn_lines = prosrv.logger.get_lines_for_level('warning')
-        self.assertEqual(len(error_lines), 0)  # sanity
-        self.assertEqual(len(warn_lines), 0)  # sanity
-
-    @unpatch_policies
-    def test_HEAD_ec_404(self):
-        prolis = _test_sockets[0]
-        prosrv = _test_servers[0]
-
-        ec_policy = POLICIES[self.ec_policy_index]
-        self.put_container(ec_policy.name, ec_policy.name)
-
-        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-        fd = sock.makefile()
-        fd.write('HEAD /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n'
-                 'Host: localhost\r\n'
-                 'Connection: close\r\n'
-                 'X-Storage-Token: t\r\n'
-                 '\r\n' % ec_policy.name)
-        fd.flush()
-        headers = readuntil2crlfs(fd)
-        exp = 'HTTP/1.1 404'
-        self.assertEqual(headers[:len(exp)], exp)
-
-        error_lines = prosrv.logger.get_lines_for_level('error')
-        warn_lines = prosrv.logger.get_lines_for_level('warning')
-        self.assertEqual(len(error_lines), 0)  # sanity
-        self.assertEqual(len(warn_lines), 0)  # sanity
-
-    @unpatch_policies
-    def test_reload_ring_ec(self):
-        policy = POLICIES[self.ec_policy_index]
-        self.put_container("ec", "ec-con")
-
-        orig_rtime = policy.object_ring._rtime
-        orig_replica_count = policy.object_ring.replica_count
-        # save original file as back up
-        copyfile(policy.object_ring.serialized_path,
-                 policy.object_ring.serialized_path + '.bak')
-
-        try:
-            # overwrite with 2 replica, 2 devices ring
-            obj_devs = []
-            obj_devs.append(
-                {'port': _test_sockets[-3].getsockname()[1],
-                 'device': 'sdg1'})
-            obj_devs.append(
-                {'port': _test_sockets[-2].getsockname()[1],
-                 'device': 'sdh1'})
-            write_fake_ring(policy.object_ring.serialized_path,
-                            *obj_devs)
-
-            def get_ring_reloaded_response(method):
-                # force to reload at the request
-                policy.object_ring._rtime = 0
-
-                trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method,
-                              'Host: localhost\r\n',
-                              'Connection: close\r\n',
-                              'X-Storage-Token: t\r\n']
-
-                if method == 'PUT':
-                    # small, so we don't get multiple EC stripes
-                    obj = 'abCD' * 10
-
-                    extra_trans_data = [
-                        'Etag: "%s"\r\n' % md5(obj).hexdigest(),
-                        'Content-Length: %d\r\n' % len(obj),
-                        'Content-Type: application/octet-stream\r\n',
-                        '\r\n%s' % obj
-                    ]
-                    trans_data.extend(extra_trans_data)
-                else:
-                    trans_data.append('\r\n')
-
-                prolis = _test_sockets[0]
-                sock = connect_tcp(('localhost', prolis.getsockname()[1]))
-                fd = sock.makefile()
-                fd.write(''.join(trans_data))
-                fd.flush()
-                headers = readuntil2crlfs(fd)
-
-                # use older ring with rollbacking
-                return headers
-
-            for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'):
-                headers = get_ring_reloaded_response(method)
-                exp = 'HTTP/1.1 20'
-                self.assertEqual(headers[:len(exp)], exp)
-
-                # proxy didn't load newest ring, use older one
-                self.assertEqual(orig_replica_count,
-                                 policy.object_ring.replica_count)
-
-                if method == 'POST':
-                    # Take care fast post here!
-                    orig_post_as_copy = getattr(
-                        _test_servers[0], 'object_post_as_copy', None)
-                    try:
-                        _test_servers[0].object_post_as_copy = False
-                        with mock.patch.object(
-                                _test_servers[0],
-                                'object_post_as_copy', False):
-                            headers = get_ring_reloaded_response(method)
-                    finally:
-                        if orig_post_as_copy is None:
-                            del _test_servers[0].object_post_as_copy
-                        else:
-                            _test_servers[0].object_post_as_copy = \
-                                orig_post_as_copy
-
-                    exp = 'HTTP/1.1 20'
-                    self.assertEqual(headers[:len(exp)], exp)
-                    # sanity
-                    self.assertEqual(orig_replica_count,
-                                     policy.object_ring.replica_count)
-
-        finally:
-            policy.object_ring._rtime = orig_rtime
-            os.rename(policy.object_ring.serialized_path + '.bak',
-                      policy.object_ring.serialized_path)
-
 
 @patch_policies([StoragePolicy(0, 'zero', True,
                                object_ring=FakeRing(base_port=3000))])
-class TestObjectController(ECTestMixin, unittest.TestCase):
-    ec_policy_index = 3
-
-    def setUp(self):
-        self.app = proxy_server.Application(
-            None, FakeMemcache(),
-            logger=debug_logger('proxy-ut'),
-            account_ring=FakeRing(),
-            container_ring=FakeRing())
-        # clear proxy logger result for each test
-        _test_servers[0].logger._clear()
-
-    def tearDown(self):
-        self.app.account_ring.set_replicas(3)
-        self.app.container_ring.set_replicas(3)
-        for policy in POLICIES:
-            policy.object_ring = FakeRing(base_port=3000)
-
-    def assert_status_map(self, method, statuses, expected, raise_exc=False):
-        with save_globals():
-            kwargs = {}
-            if raise_exc:
-                kwargs['raise_exc'] = raise_exc
-
-            set_http_connect(*statuses, **kwargs)
-            self.app.memcache.store = {}
-            req = Request.blank('/v1/a/c/o',
-                                headers={'Content-Length': '0',
-                                         'Content-Type': 'text/plain'})
-            self.app.update_request(req)
-            try:
-                res = method(req)
-            except HTTPException as res:
-                pass
-            self.assertEqual(res.status_int, expected)
-
-            # repeat test
-            set_http_connect(*statuses, **kwargs)
-            self.app.memcache.store = {}
-            req = Request.blank('/v1/a/c/o',
-                                headers={'Content-Length': '0',
-                                         'Content-Type': 'text/plain'})
-            self.app.update_request(req)
-            try:
-                res = method(req)
-            except HTTPException as res:
-                pass
-            self.assertEqual(res.status_int, expected)
-
-    def _sleep_enough(self, condition):
-        for sleeptime in (0.1, 1.0):
-            sleep(sleeptime)
-            if condition():
-                break
-
+class TestReplicatedObjectController(
+        BaseTestObjectController, unittest.TestCase):
+    """
+    Test suite for replication policy
+    """
     @unpatch_policies
     def test_policy_IO(self):
         def check_file(policy, cont, devs, check_val):
@@ -5632,17 +4811,847 @@ class TestObjectController(ECTestMixin, unittest.TestCase):
         ])
 
 
-class TestObjectControllerECDuplication(ECTestMixin, unittest.TestCase):
-    ec_policy_index = 4
+class BaseTestECObjectController(BaseTestObjectController):
 
-    def setUp(self):
-        self.app = proxy_server.Application(
-            None, FakeMemcache(),
-            logger=debug_logger('proxy-ut'),
-            account_ring=FakeRing(),
-            container_ring=FakeRing())
-        # clear proxy logger result for each test
-        _test_servers[0].logger._clear()
+    @unpatch_policies
+    def test_PUT_ec(self):
+        policy = POLICIES[self.ec_policy_index]
+        self.put_container(policy.name, policy.name)
+
+        obj = 'abCD' * 10  # small, so we don't get multiple EC stripes
+        prolis = _test_sockets[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/o1 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Etag: "%s"\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (policy.name, md5(obj).hexdigest(),
+                             len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        ecd = policy.pyeclib_driver
+        expected_pieces = set(ecd.encode(obj))
+
+        # go to disk to make sure it's there and all erasure-coded
+        partition, nodes = policy.object_ring.get_nodes('a', policy.name, 'o1')
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[policy]
+
+        got_pieces = set()
+        got_indices = set()
+        got_durable = []
+        for node_index, node in enumerate(nodes):
+            df = df_mgr.get_diskfile(node['device'], partition,
+                                     'a', policy.name, 'o1',
+                                     policy=policy)
+            with df.open():
+                meta = df.get_metadata()
+                contents = ''.join(df.reader())
+                got_pieces.add(contents)
+
+                lmeta = dict((k.lower(), v) for k, v in meta.items())
+                got_indices.add(
+                    lmeta['x-object-sysmeta-ec-frag-index'])
+
+                self.assertEqual(
+                    lmeta['x-object-sysmeta-ec-etag'],
+                    md5(obj).hexdigest())
+                self.assertEqual(
+                    lmeta['x-object-sysmeta-ec-content-length'],
+                    str(len(obj)))
+                self.assertEqual(
+                    lmeta['x-object-sysmeta-ec-segment-size'],
+                    '4096')
+                self.assertEqual(
+                    lmeta['x-object-sysmeta-ec-scheme'],
+                    '%s 2+1' % DEFAULT_TEST_EC_TYPE)
+                self.assertEqual(
+                    lmeta['etag'],
+                    md5(contents).hexdigest())
+
+                # check presence for a durable data file for the timestamp
+                durable_file = (
+                    utils.Timestamp(df.timestamp).internal +
+                    '#%s' % lmeta['x-object-sysmeta-ec-frag-index'] +
+                    '#d.data')
+                durable_file = os.path.join(
+                    _testdir, node['device'], storage_directory(
+                        diskfile.get_data_dir(policy),
+                        partition, hash_path('a', policy.name, 'o1')),
+                    durable_file)
+                if os.path.isfile(durable_file):
+                    got_durable.append(True)
+
+        self.assertEqual(expected_pieces, got_pieces)
+        self.assertEqual(set(('0', '1', '2')), got_indices)
+
+        # verify at least 2 puts made it all the way to the end of 2nd
+        # phase, ie at least 2 durable statuses were written
+        num_durable_puts = sum(d is True for d in got_durable)
+        self.assertGreaterEqual(num_durable_puts, 2)
+
+    @unpatch_policies
+    def test_PUT_ec_multiple_segments(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        pyeclib_header_size = len(ec_policy.pyeclib_driver.encode("")[0])
+        segment_size = ec_policy.ec_segment_size
+
+        # Big enough to have multiple segments. Also a multiple of the
+        # segment size to get coverage of that path too.
+        obj = 'ABC' * segment_size
+
+        prolis = _test_sockets[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/o2 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        # it's a 2+1 erasure code, so each fragment archive should be half
+        # the length of the object, plus three inline pyeclib metadata
+        # things (one per segment)
+        expected_length = (len(obj) / 2 + pyeclib_header_size * 3)
+
+        partition, nodes = ec_policy.object_ring.get_nodes(
+            'a', ec_policy.name, 'o2')
+
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
+
+        got_durable = []
+        fragment_archives = []
+        for node in nodes:
+            df = df_mgr.get_diskfile(
+                node['device'], partition, 'a',
+                ec_policy.name, 'o2', policy=ec_policy)
+            with df.open():
+                meta = df.get_metadata()
+                contents = ''.join(df.reader())
+                fragment_archives.append(contents)
+                self.assertEqual(len(contents), expected_length)
+
+                durable_file = (
+                    utils.Timestamp(df.timestamp).internal +
+                    '#%s' % meta['X-Object-Sysmeta-Ec-Frag-Index'] +
+                    '#d.data')
+                durable_file = os.path.join(
+                    _testdir, node['device'], storage_directory(
+                        diskfile.get_data_dir(ec_policy),
+                        partition, hash_path('a', ec_policy.name, 'o2')),
+                    durable_file)
+                if os.path.isfile(durable_file):
+                    got_durable.append(True)
+
+        # Verify that we can decode each individual fragment and that they
+        # are all the correct size
+        fragment_size = ec_policy.fragment_size
+        nfragments = int(
+            math.ceil(float(len(fragment_archives[0])) / fragment_size))
+
+        for fragment_index in range(nfragments):
+            fragment_start = fragment_index * fragment_size
+            fragment_end = (fragment_index + 1) * fragment_size
+
+            try:
+                frags = [fa[fragment_start:fragment_end]
+                         for fa in fragment_archives]
+                seg = ec_policy.pyeclib_driver.decode(frags)
+            except ECDriverError:
+                self.fail("Failed to decode fragments %d; this probably "
+                          "means the fragments are not the sizes they "
+                          "should be" % fragment_index)
+
+            segment_start = fragment_index * segment_size
+            segment_end = (fragment_index + 1) * segment_size
+
+            self.assertEqual(seg, obj[segment_start:segment_end])
+
+        # verify at least 2 puts made it all the way to the end of 2nd
+        # phase, ie at least 2 .durable statuses were written
+        num_durable_puts = sum(d is True for d in got_durable)
+        self.assertGreaterEqual(num_durable_puts, 2)
+
+    @unpatch_policies
+    def test_PUT_ec_object_etag_mismatch(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        obj = '90:6A:02:60:B1:08-96da3e706025537fc42464916427727e'
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/o3 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Etag: %s\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name,
+                             md5('something else').hexdigest(),
+                             len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 422'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        # nothing should have made it to disk on the object servers
+        partition, nodes = prosrv.get_object_ring(
+            int(ec_policy)).get_nodes('a', ec_policy.name, 'o3')
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
+
+        for node in nodes:
+            df = df_mgr.get_diskfile(node['device'], partition,
+                                     'a', ec_policy.name, 'o3',
+                                     policy=ec_policy)
+            self.assertRaises(DiskFileNotExist, df.open)
+
+    @unpatch_policies
+    def test_PUT_ec_fragment_archive_etag_mismatch(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        # Cause a hash mismatch by feeding one particular MD5 hasher some
+        # extra data. The goal here is to get exactly more than one of the
+        # hashers in an object server.
+        count = (ec_policy.object_ring.replica_count - ec_policy.ec_ndata)
+        countdown = [count]
+
+        def busted_md5_constructor(initial_str=""):
+            hasher = md5(initial_str)
+            if countdown[0] > 0:
+                hasher.update('wrong')
+            countdown[0] -= 1
+            return hasher
+
+        obj = 'uvarovite-esurience-cerated-symphysic'
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        with mock.patch('swift.obj.server.md5', busted_md5_constructor):
+            fd = sock.makefile()
+            fd.write('PUT /v1/a/%s/pimento HTTP/1.1\r\n'
+                     'Host: localhost\r\n'
+                     'Connection: close\r\n'
+                     'Etag: %s\r\n'
+                     'Content-Length: %d\r\n'
+                     'X-Storage-Token: t\r\n'
+                     'Content-Type: application/octet-stream\r\n'
+                     '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
+                                 len(obj), obj))
+            fd.flush()
+            headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 503'  # no quorum
+        self.assertEqual(headers[:len(exp)], exp)
+
+        # replica count - 1 of the fragment archives should have
+        # landed on disk
+        partition, nodes = prosrv.get_object_ring(
+            int(ec_policy)).get_nodes('a', ec_policy.name, 'pimento')
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
+
+        found = 0
+        for node in nodes:
+            df = df_mgr.get_diskfile(node['device'], partition,
+                                     'a', ec_policy.name, 'pimento',
+                                     policy=ec_policy)
+            try:
+                # diskfile open won't succeed because no durable was written,
+                # so look under the hood for data files.
+                files = os.listdir(df._datadir)
+                if len(files) > 0:
+                    # Although the third fragment archive hasn't landed on
+                    # disk, the directory df._datadir is pre-maturely created
+                    # and is empty when we use O_TMPFILE + linkat()
+                    num_data_files = \
+                        len([f for f in files if f.endswith('.data')])
+                    self.assertEqual(1, num_data_files)
+                    found += 1
+            except OSError:
+                pass
+        self.assertEqual(found, ec_policy.ec_ndata)
+
+    @unpatch_policies
+    def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container("ec", "ec-con")
+
+        def busted_md5_constructor(initial_str=""):
+            hasher = md5(initial_str)
+            hasher.update('wrong')
+            return hasher
+
+        obj = 'uvarovite-esurience-cerated-symphysic'
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+
+        call_count = [0]
+
+        def mock_committer(self):
+            call_count[0] += 1
+
+        commit_confirmation = \
+            'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
+
+        with mock.patch('swift.obj.server.md5', busted_md5_constructor), \
+                mock.patch(commit_confirmation, mock_committer):
+            fd = sock.makefile()
+            fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
+                     'Host: localhost\r\n'
+                     'Connection: close\r\n'
+                     'Etag: %s\r\n'
+                     'Content-Length: %d\r\n'
+                     'X-Storage-Token: t\r\n'
+                     'Content-Type: application/octet-stream\r\n'
+                     '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
+            fd.flush()
+            headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 503'  # no quorum
+        self.assertEqual(headers[:len(exp)], exp)
+        # Don't send commit to object-server if quorum responses consist of 4xx
+        self.assertEqual(0, call_count[0])
+
+        # no fragment archives should have landed on disk
+        partition, nodes = prosrv.get_object_ring(3).get_nodes(
+            'a', 'ec-con', 'quorum')
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
+
+        for node in nodes:
+            df = df_mgr.get_diskfile(node['device'], partition,
+                                     'a', 'ec-con', 'quorum',
+                                     policy=ec_policy)
+            if os.path.exists(df._datadir):
+                self.assertFalse(os.listdir(df._datadir))  # should be empty
+
+    @unpatch_policies
+    def test_PUT_ec_fragment_quorum_bad_request(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container("ec", "ec-con")
+
+        obj = 'uvarovite-esurience-cerated-symphysic'
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+
+        call_count = [0]
+
+        def mock_committer(self):
+            call_count[0] += 1
+
+        read_footer = \
+            'swift.obj.server.ObjectController._read_metadata_footer'
+        commit_confirmation = \
+            'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation'
+
+        with mock.patch(read_footer) as read_footer_call, \
+                mock.patch(commit_confirmation, mock_committer):
+            # Emulate missing footer MIME doc in all object-servers
+            read_footer_call.side_effect = HTTPBadRequest(
+                body="couldn't find footer MIME doc")
+
+            fd = sock.makefile()
+            fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
+                     'Host: localhost\r\n'
+                     'Connection: close\r\n'
+                     'Etag: %s\r\n'
+                     'Content-Length: %d\r\n'
+                     'X-Storage-Token: t\r\n'
+                     'Content-Type: application/octet-stream\r\n'
+                     '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj))
+            fd.flush()
+            headers = readuntil2crlfs(fd)
+
+        # Don't show a result of the bad conversation between proxy-server
+        # and object-server
+        exp = 'HTTP/1.1 503'
+        self.assertEqual(headers[:len(exp)], exp)
+        # Don't send commit to object-server if quorum responses consist of 4xx
+        self.assertEqual(0, call_count[0])
+
+        # no fragment archives should have landed on disk
+        partition, nodes = prosrv.get_object_ring(3).get_nodes(
+            'a', 'ec-con', 'quorum')
+        conf = {'devices': _testdir, 'mount_check': 'false'}
+
+        df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy]
+
+        for node in nodes:
+            df = df_mgr.get_diskfile(node['device'], partition,
+                                     'a', 'ec-con', 'quorum',
+                                     policy=ec_policy)
+            if os.path.exists(df._datadir):
+                self.assertFalse(os.listdir(df._datadir))  # should be empty
+
+    @unpatch_policies
+    def test_PUT_ec_if_none_match(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        obj = 'ananepionic-lepidophyllous-ropewalker-neglectful'
+        prolis = _test_sockets[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Etag: "%s"\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
+                             len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'If-None-Match: *\r\n'
+                 'Etag: "%s"\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(),
+                             len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 412'
+        self.assertEqual(headers[:len(exp)], exp)
+
+    @unpatch_policies
+    def test_GET_ec(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        obj = '0123456' * 11 * 17
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/go-get-it HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'X-Object-Meta-Color: chartreuse\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('GET /v1/a/%s/go-get-it HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 '\r\n' % ec_policy.name)
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 200'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        headers = parse_headers_string(headers)
+        self.assertEqual(str(len(obj)), headers['Content-Length'])
+        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
+        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])
+
+        gotten_obj = ''
+        while True:
+            buf = fd.read(64)
+            if not buf:
+                break
+            gotten_obj += buf
+        self.assertEqual(gotten_obj, obj)
+        error_lines = prosrv.logger.get_lines_for_level('error')
+        warn_lines = prosrv.logger.get_lines_for_level('warning')
+        self.assertEqual(len(error_lines), 0)  # sanity
+        self.assertEqual(len(warn_lines), 0)  # sanity
+
+    @unpatch_policies
+    def test_conditional_GET_ec(self):
+        policy = POLICIES[self.ec_policy_index]
+        self.assertEqual('erasure_coding', policy.policy_type)  # sanity
+        self._test_conditional_GET(policy)
+
+    @unpatch_policies
+    def test_GET_ec_big(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        # our EC segment size is 4 KiB, so this is multiple (3) segments;
+        # we'll verify that with a sanity check
+        obj = 'a moose once bit my sister' * 400
+        self.assertGreater(
+            len(obj), ec_policy.ec_segment_size * 2,
+            "object is too small for proper testing")
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/big-obj-get HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('GET /v1/a/%s/big-obj-get HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 '\r\n' % ec_policy.name)
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 200'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        headers = parse_headers_string(headers)
+        self.assertEqual(str(len(obj)), headers['Content-Length'])
+        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
+
+        gotten_obj = ''
+        while True:
+            buf = fd.read(64)
+            if not buf:
+                break
+            gotten_obj += buf
+        # This may look like a redundant test, but when things fail, this
+        # has a useful failure message while the subsequent one spews piles
+        # of garbage and demolishes your terminal's scrollback buffer.
+        self.assertEqual(len(gotten_obj), len(obj))
+        self.assertEqual(gotten_obj, obj)
+        error_lines = prosrv.logger.get_lines_for_level('error')
+        warn_lines = prosrv.logger.get_lines_for_level('warning')
+        self.assertEqual(len(error_lines), 0)  # sanity
+        self.assertEqual(len(warn_lines), 0)  # sanity
+
+    @unpatch_policies
+    def test_GET_ec_failure_handling(self):
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        obj = 'look at this object; it is simply amazing ' * 500
+        prolis = _test_sockets[0]
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/crash-test-dummy HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        def explodey_iter(inner_iter):
+            yield next(inner_iter)
+            raise Exception("doom ba doom")
+
+        def explodey_doc_parts_iter(inner_iter_iter):
+            try:
+                for item in inner_iter_iter:
+                    item = item.copy()  # paranoia about mutable data
+                    item['part_iter'] = explodey_iter(item['part_iter'])
+                    yield item
+            except GeneratorExit:
+                inner_iter_iter.close()
+                raise
+
+        real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter
+
+        def explodey_ec_app_iter(path, policy, iterators, *a, **kw):
+            # Each thing in `iterators` here is a document-parts iterator,
+            # and we want to fail after getting a little into each part.
+            #
+            # That way, we ensure we've started streaming the response to
+            # the client when things go wrong.
+            return real_ec_app_iter(
+                path, policy,
+                [explodey_doc_parts_iter(i) for i in iterators],
+                *a, **kw)
+
+        with mock.patch("swift.proxy.controllers.obj.ECAppIter",
+                        explodey_ec_app_iter):
+            sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+            fd = sock.makefile()
+            fd.write('GET /v1/a/%s/crash-test-dummy HTTP/1.1\r\n'
+                     'Host: localhost\r\n'
+                     'Connection: close\r\n'
+                     'X-Storage-Token: t\r\n'
+                     '\r\n' % ec_policy.name)
+            fd.flush()
+            headers = readuntil2crlfs(fd)
+            exp = 'HTTP/1.1 200'
+            self.assertEqual(headers[:len(exp)], exp)
+
+            headers = parse_headers_string(headers)
+            self.assertEqual(str(len(obj)), headers['Content-Length'])
+            self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
+
+            gotten_obj = ''
+            try:
+                # don't hang the test run when this fails
+                with Timeout(300):
+                    while True:
+                        buf = fd.read(64)
+                        if not buf:
+                            break
+                        gotten_obj += buf
+            except Timeout:
+                self.fail("GET hung when connection failed")
+
+            # Ensure we failed partway through, otherwise the mocks could
+            # get out of date without anyone noticing
+            self.assertTrue(0 < len(gotten_obj) < len(obj))
+
+    @unpatch_policies
+    def test_HEAD_ec(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        obj = '0123456' * 11 * 17
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/%s/go-head-it HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'Content-Length: %d\r\n'
+                 'X-Storage-Token: t\r\n'
+                 'X-Object-Meta-Color: chartreuse\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (ec_policy.name, len(obj), obj))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('HEAD /v1/a/%s/go-head-it HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 '\r\n' % ec_policy.name)
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 200'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        headers = parse_headers_string(headers)
+        self.assertEqual(str(len(obj)), headers['Content-Length'])
+        self.assertEqual(md5(obj).hexdigest(), headers['Etag'])
+        self.assertEqual('chartreuse', headers['X-Object-Meta-Color'])
+
+        error_lines = prosrv.logger.get_lines_for_level('error')
+        warn_lines = prosrv.logger.get_lines_for_level('warning')
+        self.assertEqual(len(error_lines), 0)  # sanity
+        self.assertEqual(len(warn_lines), 0)  # sanity
+
+    @unpatch_policies
+    def test_GET_ec_404(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('GET /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 '\r\n' % ec_policy.name)
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 404'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        error_lines = prosrv.logger.get_lines_for_level('error')
+        warn_lines = prosrv.logger.get_lines_for_level('warning')
+        self.assertEqual(len(error_lines), 0)  # sanity
+        self.assertEqual(len(warn_lines), 0)  # sanity
+
+    @unpatch_policies
+    def test_HEAD_ec_404(self):
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+
+        ec_policy = POLICIES[self.ec_policy_index]
+        self.put_container(ec_policy.name, ec_policy.name)
+
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('HEAD /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Storage-Token: t\r\n'
+                 '\r\n' % ec_policy.name)
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 404'
+        self.assertEqual(headers[:len(exp)], exp)
+
+        error_lines = prosrv.logger.get_lines_for_level('error')
+        warn_lines = prosrv.logger.get_lines_for_level('warning')
+        self.assertEqual(len(error_lines), 0)  # sanity
+        self.assertEqual(len(warn_lines), 0)  # sanity
+
+    @unpatch_policies
+    def test_reload_ring_ec(self):
+        policy = POLICIES[self.ec_policy_index]
+        self.put_container("ec", "ec-con")
+
+        orig_rtime = policy.object_ring._rtime
+        orig_replica_count = policy.object_ring.replica_count
+        # save original file as back up
+        copyfile(policy.object_ring.serialized_path,
+                 policy.object_ring.serialized_path + '.bak')
+
+        try:
+            # overwrite with 2 replica, 2 devices ring
+            obj_devs = []
+            obj_devs.append(
+                {'port': _test_sockets[-3].getsockname()[1],
+                 'device': 'sdg1'})
+            obj_devs.append(
+                {'port': _test_sockets[-2].getsockname()[1],
+                 'device': 'sdh1'})
+            write_fake_ring(policy.object_ring.serialized_path,
+                            *obj_devs)
+
+            def get_ring_reloaded_response(method):
+                # force to reload at the request
+                policy.object_ring._rtime = 0
+
+                trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method,
+                              'Host: localhost\r\n',
+                              'Connection: close\r\n',
+                              'X-Storage-Token: t\r\n']
+
+                if method == 'PUT':
+                    # small, so we don't get multiple EC stripes
+                    obj = 'abCD' * 10
+
+                    extra_trans_data = [
+                        'Etag: "%s"\r\n' % md5(obj).hexdigest(),
+                        'Content-Length: %d\r\n' % len(obj),
+                        'Content-Type: application/octet-stream\r\n',
+                        '\r\n%s' % obj
+                    ]
+                    trans_data.extend(extra_trans_data)
+                else:
+                    trans_data.append('\r\n')
+
+                prolis = _test_sockets[0]
+                sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+                fd = sock.makefile()
+                fd.write(''.join(trans_data))
+                fd.flush()
+                headers = readuntil2crlfs(fd)
+
+                # use older ring with rollbacking
+                return headers
+
+            for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'):
+                headers = get_ring_reloaded_response(method)
+                exp = 'HTTP/1.1 20'
+                self.assertEqual(headers[:len(exp)], exp)
+
+                # proxy didn't load newest ring, use older one
+                self.assertEqual(orig_replica_count,
+                                 policy.object_ring.replica_count)
+
+                if method == 'POST':
+                    # Take care fast post here!
+                    orig_post_as_copy = getattr(
+                        _test_servers[0], 'object_post_as_copy', None)
+                    try:
+                        _test_servers[0].object_post_as_copy = False
+                        with mock.patch.object(
+                                _test_servers[0],
+                                'object_post_as_copy', False):
+                            headers = get_ring_reloaded_response(method)
+                    finally:
+                        if orig_post_as_copy is None:
+                            del _test_servers[0].object_post_as_copy
+                        else:
+                            _test_servers[0].object_post_as_copy = \
+                                orig_post_as_copy
+
+                    exp = 'HTTP/1.1 20'
+                    self.assertEqual(headers[:len(exp)], exp)
+                    # sanity
+                    self.assertEqual(orig_replica_count,
+                                     policy.object_ring.replica_count)
+
+        finally:
+            policy.object_ring._rtime = orig_rtime
+            os.rename(policy.object_ring.serialized_path + '.bak',
+                      policy.object_ring.serialized_path)
+
+
+@patch_policies([StoragePolicy(0, 'zero', True,
+                               object_ring=FakeRing(base_port=3000))])
+class TestECObjectController(BaseTestECObjectController, unittest.TestCase):
+    ec_policy_index = 3
+
+
+@patch_policies([StoragePolicy(0, 'zero', True,
+                               object_ring=FakeRing(base_port=3000))])
+class TestECDuplicationObjectController(
+        BaseTestECObjectController, unittest.TestCase):
+    ec_policy_index = 4
 
 
 class TestECMismatchedFA(unittest.TestCase):