From 3637b1abd9240f7feeebc7f299d42fe8815d7dcd Mon Sep 17 00:00:00 2001
From: Anish Kachinthaya <akachinthaya@nvidia.com>
Date: Mon, 11 Mar 2024 19:02:10 -0400
Subject: [PATCH] add bytes of expiring objects to queue entry

The size in bytes from object metadata of expiring objects are stored in
expirey queue entries under the content_type field.

The x-content-type-timestamp take from object metadata is provided along
with the x-content-type update so the container replicator resolves the
latest content-type and ensures eventual consistency.

UpgradeImpact: During rolling upgrades you should expect expirer queue
entries to continue lacking swift_expirer_bytes= annotations until ALL
object servers replicas have been upgraded to new code.

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: Ie4b25f1bd16def4069878983049b83de06f68e54
---
 swift/obj/expirer.py                |  35 ++-
 swift/obj/server.py                 |  63 +++--
 test/probe/test_object_expirer.py   | 139 ++++++++++
 test/unit/container/test_backend.py | 110 ++++++++
 test/unit/obj/test_expirer.py       | 109 +++++++-
 test/unit/obj/test_server.py        | 383 +++++++++++++++++++++++++++-
 6 files changed, 804 insertions(+), 35 deletions(-)

diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py
index 122ee80cf5..180aa2302b 100644
--- a/swift/obj/expirer.py
+++ b/swift/obj/expirer.py
@@ -29,7 +29,8 @@ from swift.common.daemon import Daemon
 from swift.common.internal_client import InternalClient, UnexpectedResponse
 from swift.common.utils import get_logger, dump_recon_cache, split_path, \
     Timestamp, config_true_value, normalize_delete_at_timestamp, \
-    RateLimitedIterator, md5, non_negative_float, non_negative_int
+    RateLimitedIterator, md5, non_negative_float, non_negative_int, \
+    parse_content_type
 from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
     HTTP_PRECONDITION_FAILED
 from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
@@ -37,6 +38,7 @@ from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
 from swift.container.reconciler import direct_delete_container_entry
 
 MAX_OBJECTS_TO_CACHE = 100000
+X_DELETE_TYPE = 'text/plain'
 ASYNC_DELETE_TYPE = 'application/async-deleted'
 
 
@@ -67,6 +69,37 @@ def parse_task_obj(task_obj):
     return timestamp, target_account, target_container, target_obj
 
 
+def extract_expirer_bytes_from_ctype(content_type):
+    """
+    Parse a content-type and return the number of bytes.
+
+    :param content_type: a content-type string
+    :return: int or None
+    """
+    content_type, params = parse_content_type(content_type)
+    bytes_size = None
+    for k, v in params:
+        if k == 'swift_expirer_bytes':
+            bytes_size = int(v)
+    return bytes_size
+
+
+def embed_expirer_bytes_in_ctype(content_type, metadata):
+    """
+    Embed number of bytes into content-type.  The bytes should come from
+    content-length on regular objects, but future extensions to "bytes in
+    expirer queue" monitoring may want to more closely consider expiration of
+    large multipart object manifests.
+
+    :param content_type: a content-type string
+    :param metadata: a dict, from Diskfile metadata
+    :return: str
+    """
+    # as best I can tell this key is required by df.open
+    report_bytes = metadata['Content-Length']
+    return "%s;swift_expirer_bytes=%d" % (content_type, int(report_bytes))
+
+
 def read_conf_for_delay_reaping_times(conf):
     delay_reaping_times = {}
     for conf_key in conf:
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 57651066ae..956db55dfd 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -59,7 +59,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
     HTTPInsufficientStorage, HTTPForbidden, HTTPException, HTTPConflict, \
     HTTPServerError, bytes_to_wsgi, wsgi_to_bytes, wsgi_to_str, normalize_etag
 from swift.obj.diskfile import RESERVED_DATAFILE_META, DiskFileRouter
-from swift.obj.expirer import build_task_obj
+from swift.obj.expirer import build_task_obj, embed_expirer_bytes_in_ctype, \
+    X_DELETE_TYPE
 
 
 def iter_mime_headers_and_bodies(wsgi_input, mime_boundary, read_chunk_size):
@@ -437,7 +438,7 @@ class ObjectController(BaseStorageServer):
                 self.container_update_timeout, updates)
 
     def delete_at_update(self, op, delete_at, account, container, obj,
-                         request, objdevice, policy):
+                         request, objdevice, policy, extra_headers=None):
         """
         Update the expiring objects container when objects are updated.
 
@@ -449,6 +450,7 @@ class ObjectController(BaseStorageServer):
         :param request: the original request driving the update
         :param objdevice: device name that the object is in
         :param policy: the BaseStoragePolicy instance (used for tmp dir)
+        :param extra_headers: dict of additional headers for the update
         """
         if config_true_value(
                 request.headers.get('x-backend-replication', 'f')):
@@ -494,8 +496,10 @@ class ObjectController(BaseStorageServer):
             if not updates:
                 updates = [(None, None)]
             headers_out['x-size'] = '0'
-            headers_out['x-content-type'] = 'text/plain'
+            headers_out['x-content-type'] = X_DELETE_TYPE
             headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
+            if extra_headers:
+                headers_out.update(extra_headers)
         else:
             if not config_true_value(
                 request.headers.get(
@@ -620,6 +624,24 @@ class ObjectController(BaseStorageServer):
                     override = key.lower().replace(override_prefix, 'x-')
                     update_headers[override] = val
 
+    def _conditional_delete_at_update(self, request, device, account,
+                                      container, obj, policy, metadata,
+                                      orig_delete_at, new_delete_at):
+        if new_delete_at:
+            extra_headers = {
+                'x-content-type': embed_expirer_bytes_in_ctype(
+                    X_DELETE_TYPE, metadata),
+                'x-content-type-timestamp':
+                metadata.get('X-Timestamp'),
+            }
+            self.delete_at_update(
+                'PUT', new_delete_at, account, container, obj, request,
+                device, policy, extra_headers)
+        if orig_delete_at and orig_delete_at != new_delete_at:
+            self.delete_at_update(
+                'DELETE', orig_delete_at, account, container, obj,
+                request, device, policy)
+
     @public
     @timing_stats()
     def POST(self, request):
@@ -675,15 +697,11 @@ class ObjectController(BaseStorageServer):
                         wsgi_to_bytes(header_key).title())
                     metadata[header_caps] = request.headers[header_key]
             orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
-            if orig_delete_at != new_delete_at:
-                if new_delete_at:
-                    self.delete_at_update(
-                        'PUT', new_delete_at, account, container, obj, request,
-                        device, policy)
-                if orig_delete_at:
-                    self.delete_at_update('DELETE', orig_delete_at, account,
-                                          container, obj, request, device,
-                                          policy)
+            disk_file_metadata = disk_file.get_datafile_metadata()
+            self._conditional_delete_at_update(
+                request, device, account, container, obj, policy,
+                disk_file_metadata, orig_delete_at, new_delete_at
+            )
         else:
             # preserve existing metadata, only content-type may be updated
             metadata = dict(disk_file.get_metafile_metadata())
@@ -993,15 +1011,10 @@ class ObjectController(BaseStorageServer):
                              orig_metadata, footers_metadata, metadata):
         orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
         new_delete_at = int(request.headers.get('X-Delete-At') or 0)
-        if orig_delete_at != new_delete_at:
-            if new_delete_at:
-                self.delete_at_update(
-                    'PUT', new_delete_at, account, container, obj, request,
-                    device, policy)
-            if orig_delete_at:
-                self.delete_at_update(
-                    'DELETE', orig_delete_at, account, container, obj,
-                    request, device, policy)
+
+        self._conditional_delete_at_update(request, device, account, container,
+                                           obj, policy, metadata,
+                                           orig_delete_at, new_delete_at)
 
         update_headers = HeaderKeyDict({
             'x-size': metadata['Content-Length'],
@@ -1262,10 +1275,10 @@ class ObjectController(BaseStorageServer):
             else:
                 # differentiate success from no object at all
                 response_class = HTTPNoContent
-        if orig_delete_at:
-            self.delete_at_update('DELETE', orig_delete_at, account,
-                                  container, obj, request, device,
-                                  policy)
+        self._conditional_delete_at_update(
+            request, device, account, container, obj, policy, {},
+            orig_delete_at, 0
+        )
         if orig_timestamp < req_timestamp:
             try:
                 disk_file.delete(req_timestamp)
diff --git a/test/probe/test_object_expirer.py b/test/probe/test_object_expirer.py
index b08f427ec9..d0cbebf098 100644
--- a/test/probe/test_object_expirer.py
+++ b/test/probe/test_object_expirer.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import Counter
 import json
 import random
 import time
@@ -22,6 +23,8 @@ from io import BytesIO
 from swift.common.internal_client import InternalClient, UnexpectedResponse
 from swift.common.manager import Manager
 from swift.common.utils import Timestamp, config_true_value
+from swift.common import direct_client
+from swift.obj.expirer import extract_expirer_bytes_from_ctype
 
 from test.probe.common import ReplProbeTest, ENABLED_POLICIES
 from test.probe.brain import BrainSplitter
@@ -472,6 +475,142 @@ class TestObjectExpirer(ReplProbeTest):
                 headers={'X-Backend-Open-Expired': True})
         self.assertEqual(e.exception.resp.status_int, 404)
 
+    def test_expirer_object_bytes_eventual_consistency(self):
+        obj_brain = BrainSplitter(self.url, self.token, self.container_name,
+                                  self.object_name, 'object', self.policy)
+
+        obj_brain.put_container()
+
+        def put_object(content_length=0):
+            try:
+                self.client.upload_object(BytesIO(bytes(content_length)),
+                                          self.account, self.container_name,
+                                          self.object_name)
+            except UnexpectedResponse as e:
+                self.fail(
+                    'Expected 201 for PUT object but got %s' % e.resp.status)
+
+        t0_content_length = 24
+        put_object(content_length=t0_content_length)
+
+        try:
+            metadata = self.client.get_object_metadata(
+                self.account, self.container_name, self.object_name)
+        except UnexpectedResponse as e:
+            self.fail(
+                'Expected 200 for HEAD object but got %s' % e.resp.status)
+
+        assert metadata['content-length'] == str(t0_content_length)
+        t0 = metadata['x-timestamp']
+
+        obj_brain.stop_primary_half()
+
+        t1_content_length = 32
+        put_object(content_length=t1_content_length)
+
+        try:
+            metadata = self.client.get_object_metadata(
+                self.account, self.container_name, self.object_name)
+        except UnexpectedResponse as e:
+            self.fail(
+                'Expected 200 for HEAD object but got %s' % e.resp.status)
+
+        assert metadata['content-length'] == str(t1_content_length)
+        t1 = metadata['x-timestamp']
+
+        # some object servers recovered
+        obj_brain.start_primary_half()
+
+        head_responses = []
+
+        for node in obj_brain.ring.devs:
+            metadata = direct_client.direct_head_object(
+                node, obj_brain.part, self.account, self.container_name,
+                self.object_name)
+            head_responses.append(metadata)
+
+        timestamp_counts = Counter([
+            resp['X-Timestamp'] for resp in head_responses
+        ])
+        expected_counts = {t0: 2, t1: 2}
+        self.assertEqual(expected_counts, timestamp_counts)
+
+        # Do a POST to update object metadata (timestamp x-delete-at)
+        # POST will create an expiry queue entry with 2 landing on t0, 1 on t1
+        self.client.set_object_metadata(
+            self.account, self.container_name, self.object_name,
+            metadata={'X-Delete-After': '5'}, acceptable_statuses=(2,)
+        )
+
+        # Run the container updater once to register new container containing
+        # expirey queue entry
+        Manager(['container-updater']).once()
+
+        # Find the name of the container containing the expiring object
+        expiring_containers = list(
+            self.client.iter_containers('.expiring_objects')
+        )
+        self.assertEqual(1, len(expiring_containers))
+
+        expiring_container = expiring_containers[0]
+        expiring_container_name = expiring_container['name']
+
+        # Verify that there is one expiring object
+        expiring_objects = list(
+            self.client.iter_objects('.expiring_objects',
+                                     expiring_container_name)
+        )
+        self.assertEqual(1, len(expiring_objects))
+
+        # Get the nodes of the expiring container
+        expiring_container_part_num, expiring_container_nodes = \
+            self.client.container_ring.get_nodes('.expiring_objects',
+                                                 expiring_container_name)
+
+        # Verify that there are only 3 such nodes
+        self.assertEqual(3, len(expiring_container_nodes))
+
+        listing_records = []
+        for node in expiring_container_nodes:
+            metadata, container_data = direct_client.direct_get_container(
+                node, expiring_container_part_num, '.expiring_objects',
+                expiring_container_name)
+            # Verify there is metadata for only one object
+            self.assertEqual(1, len(container_data))
+            listing_records.append(container_data[0])
+
+        # Check for inconsistent metadata
+        byte_size_counts = Counter([
+            extract_expirer_bytes_from_ctype(resp['content_type'])
+            for resp in listing_records
+        ])
+        expected_byte_size_counts = {
+            t0_content_length: 2,
+            t1_content_length: 1
+        }
+
+        self.assertEqual(expected_byte_size_counts, byte_size_counts)
+
+        # Run the replicator to update expirey queue entries
+        Manager(['container-replicator']).once()
+
+        listing_records = []
+        for node in expiring_container_nodes:
+            metadata, container_data = direct_client.direct_get_container(
+                node, expiring_container_part_num, '.expiring_objects',
+                expiring_container_name)
+            self.assertEqual(1, len(container_data))
+            listing_records.append(container_data[0])
+
+        # Ensure that metadata is now consistent
+        byte_size_counts = Counter([
+            extract_expirer_bytes_from_ctype(resp['content_type'])
+            for resp in listing_records
+        ])
+        expected_byte_size_counts = {t1_content_length: 3}
+
+        self.assertEqual(expected_byte_size_counts, byte_size_counts)
+
     def _test_expirer_delete_outdated_object_version(self, object_exists):
         # This test simulates a case where the expirer tries to delete
         # an outdated version of an object.
diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py
index 097f711dec..bfb49a3d7c 100644
--- a/test/unit/container/test_backend.py
+++ b/test/unit/container/test_backend.py
@@ -7179,3 +7179,113 @@ class TestModuleFunctions(unittest.TestCase):
         self.assertIn(sr1, to_add)
         self.assertIn(sr2, to_add)
         self.assertEqual({'a/o'}, to_delete)
+
+
+class TestExpirerBytesCtypeTimestamp(test_db.TestDbBase):
+
+    def setUp(self):
+        super(TestExpirerBytesCtypeTimestamp, self).setUp()
+        self.ts = make_timestamp_iter()
+        self.policy = POLICIES.default
+
+    def _get_broker(self):
+        broker = ContainerBroker(self.db_path,
+                                 account='.expiring_objects',
+                                 container='1234')
+        broker.initialize(next(self.ts).internal, self.policy.idx)
+        return broker
+
+    def test_in_order_expirer_bytes_ctype(self):
+        broker = self._get_broker()
+
+        put1_ts = next(self.ts)
+        put2_ts = next(self.ts)
+        post_ts = next(self.ts)
+
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain;swift_expirer_bytes=1',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx,
+            ctype_timestamp=put1_ts.internal)
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain;swift_expirer_bytes=2',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx,
+            ctype_timestamp=put2_ts.internal)
+
+        self.assertEqual([{
+            'content_type': 'text/plain;swift_expirer_bytes=2',
+            'created_at': encode_timestamps(post_ts, put2_ts, put2_ts),
+            'deleted': 0,
+            'etag': 'd41d8cd98f00b204e9800998ecf8427e',
+            'name': '1234-a/c/o',
+            'size': 0,
+            'storage_policy_index': self.policy.idx,
+        }], broker.get_objects())
+
+    def test_out_of_order_expirer_bytes_ctype(self):
+        broker = self._get_broker()
+
+        put1_ts = next(self.ts)
+        put2_ts = next(self.ts)
+        post_ts = next(self.ts)
+
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain;swift_expirer_bytes=2',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx,
+            ctype_timestamp=put2_ts.internal)
+        # order doesn't matter, more recent put2_ts ctype_timestamp wins
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain;swift_expirer_bytes=1',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx,
+            ctype_timestamp=put1_ts.internal)
+
+        self.assertEqual([{
+            'content_type': 'text/plain;swift_expirer_bytes=2',
+            'created_at': encode_timestamps(post_ts, put2_ts, put2_ts),
+            'deleted': 0,
+            'etag': 'd41d8cd98f00b204e9800998ecf8427e',
+            'name': '1234-a/c/o',
+            'size': 0,
+            'storage_policy_index': self.policy.idx,
+        }], broker.get_objects())
+
+    def test_unupgraded_expirer_bytes_ctype(self):
+        broker = self._get_broker()
+
+        put1_ts = next(self.ts)
+        post_ts = next(self.ts)
+
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx)
+        # since the un-upgraded server's task creation request arrived w/o a
+        # ctype_timestamp, the row treats it's ctype timestamp as being the
+        # same as the x-timestamp that created the row (the post_ts) - which is
+        # more recent than the put1_ts used as the ctype_timestamp from the
+        # already-upgraded server
+        broker.put_object(
+            '1234-a/c/o', post_ts.internal, 0,
+            'text/plain;swift_expirer_bytes=1',
+            'd41d8cd98f00b204e9800998ecf8427e',
+            storage_policy_index=self.policy.idx,
+            ctype_timestamp=put1_ts.internal)
+
+        # so the un-upgraded row wins
+        self.assertEqual([{
+            'content_type': 'text/plain',
+            'created_at': post_ts,
+            'deleted': 0,
+            'etag': 'd41d8cd98f00b204e9800998ecf8427e',
+            'name': '1234-a/c/o',
+            'size': 0,
+            'storage_policy_index': self.policy.idx,
+        }], broker.get_objects())
diff --git a/test/unit/obj/test_expirer.py b/test/unit/obj/test_expirer.py
index a406939443..0fe134d3d7 100644
--- a/test/unit/obj/test_expirer.py
+++ b/test/unit/obj/test_expirer.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 from time import time
 from unittest import main, TestCase
 from test.debug_logger import debug_logger
@@ -29,7 +30,7 @@ from six.moves import urllib
 
 from swift.common import internal_client, utils, swob
 from swift.common.utils import Timestamp
-from swift.obj import expirer
+from swift.obj import expirer, diskfile
 
 
 def not_random():
@@ -95,6 +96,112 @@ class FakeInternalClient(object):
         pass
 
 
+class TestExpirerHelpers(TestCase):
+
+    def test_add_expirer_bytes_to_ctype(self):
+        self.assertEqual(
+            'text/plain;swift_expirer_bytes=10',
+            expirer.embed_expirer_bytes_in_ctype(
+                'text/plain', {'Content-Length': 10}))
+        self.assertEqual(
+            'text/plain;some_foo=bar;swift_expirer_bytes=10',
+            expirer.embed_expirer_bytes_in_ctype(
+                'text/plain;some_foo=bar', {'Content-Length': '10'}))
+        # you could probably make a case it'd be better to replace an existing
+        # value if the swift_expirer_bytes key already exists in the content
+        # type; but in the only case we use this function currently the content
+        # type is hard coded to text/plain
+        self.assertEqual(
+            'text/plain;some_foo=bar;swift_expirer_bytes=10;'
+            'swift_expirer_bytes=11',
+            expirer.embed_expirer_bytes_in_ctype(
+                'text/plain;some_foo=bar;swift_expirer_bytes=10',
+                {'Content-Length': '11'}))
+
+    def test_extract_expirer_bytes_from_ctype(self):
+        self.assertEqual(10, expirer.extract_expirer_bytes_from_ctype(
+            'text/plain;swift_expirer_bytes=10'))
+        self.assertEqual(10, expirer.extract_expirer_bytes_from_ctype(
+            'text/plain;swift_expirer_bytes=10;some_foo=bar'))
+
+    def test_inverse_add_extract_bytes_from_ctype(self):
+        ctype_bytes = [
+            ('null', 0),
+            ('text/plain', 10),
+            ('application/octet-stream', 42),
+            ('application/json', 512),
+            ('gzip', 1000044),
+        ]
+        for ctype, expirer_bytes in ctype_bytes:
+            embedded_ctype = expirer.embed_expirer_bytes_in_ctype(
+                ctype, {'Content-Length': expirer_bytes})
+            found_bytes = expirer.extract_expirer_bytes_from_ctype(
+                embedded_ctype)
+            self.assertEqual(expirer_bytes, found_bytes)
+
+    def test_add_invalid_expirer_bytes_to_ctype(self):
+        self.assertRaises(TypeError,
+                          expirer.embed_expirer_bytes_in_ctype, 'nill', None)
+        self.assertRaises(TypeError,
+                          expirer.embed_expirer_bytes_in_ctype, 'bar', 'foo')
+        self.assertRaises(KeyError,
+                          expirer.embed_expirer_bytes_in_ctype, 'nill', {})
+        self.assertRaises(TypeError,
+                          expirer.embed_expirer_bytes_in_ctype, 'nill',
+                          {'Content-Length': None})
+        self.assertRaises(ValueError,
+                          expirer.embed_expirer_bytes_in_ctype, 'nill',
+                          {'Content-Length': 'foo'})
+        # perhaps could be an error
+        self.assertEqual(
+            'weird/float;swift_expirer_bytes=15',
+            expirer.embed_expirer_bytes_in_ctype('weird/float',
+                                                 {'Content-Length': 15.9}))
+
+    def test_embed_expirer_bytes_from_diskfile_metadata(self):
+        self.logger = debug_logger('test-expirer')
+        self.ts = make_timestamp_iter()
+        self.devices = mkdtemp()
+        self.conf = {
+            'mount_check': 'false',
+            'devices': self.devices,
+        }
+        self.df_mgr = diskfile.DiskFileManager(self.conf, logger=self.logger)
+        utils.mkdirs(os.path.join(self.devices, 'sda1'))
+        df = self.df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o', policy=0)
+
+        ts = next(self.ts)
+        with df.create() as writer:
+            writer.write(b'test')
+            writer.put({
+                # wrong key/case here would KeyError
+                'X-Timestamp': ts.internal,
+                # wrong key/case here would cause quarantine on read
+                'Content-Length': '4',
+            })
+
+        metadata = df.read_metadata()
+        # the Content-Type in the metadata is irrelevant; this method is used
+        # to create the content_type of an expirer queue task object
+        embeded_ctype_entry = expirer.embed_expirer_bytes_in_ctype(
+            'text/plain', metadata)
+        self.assertEqual('text/plain;swift_expirer_bytes=4',
+                         embeded_ctype_entry)
+
+    def test_extract_missing_bytes_from_ctype(self):
+        self.assertEqual(
+            None, expirer.extract_expirer_bytes_from_ctype('text/plain'))
+        self.assertEqual(
+            None, expirer.extract_expirer_bytes_from_ctype(
+                'text/plain;swift_bytes=10'))
+        self.assertEqual(
+            None, expirer.extract_expirer_bytes_from_ctype(
+                'text/plain;bytes=21'))
+        self.assertEqual(
+            None, expirer.extract_expirer_bytes_from_ctype(
+                'text/plain;some_foo=bar;other-baz=buz'))
+
+
 class TestObjectExpirer(TestCase):
     maxDiff = None
     internal_client = None
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 02e1d6a0ba..d07486292d 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -133,6 +133,15 @@ class TestTpoolSize(unittest.TestCase):
         self.assertEqual([], mock_snt.mock_calls)
 
 
+class SameReqEnv(object):
+
+    def __init__(self, req):
+        self.environ = req.environ
+
+    def __eq__(self, other):
+        return self.environ == other.environ
+
+
 @patch_policies(test_policies)
 class TestObjectController(BaseTestCase):
     """Test swift.obj.server.ObjectController"""
@@ -5704,7 +5713,8 @@ class TestObjectController(BaseTestCase):
              'method': 'PUT',
              'ssl': False,
              'headers': HeaderKeyDict({
-                 'x-content-type': 'text/plain',
+                 'x-content-type': 'text/plain;swift_expirer_bytes=0',
+                 'x-content-type-timestamp': utils.Timestamp('12345').internal,
                  'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
                  'x-size': '0',
                  'x-timestamp': utils.Timestamp('12345').internal,
@@ -5724,7 +5734,8 @@ class TestObjectController(BaseTestCase):
              'method': 'PUT',
              'ssl': False,
              'headers': HeaderKeyDict({
-                 'x-content-type': 'text/plain',
+                 'x-content-type': 'text/plain;swift_expirer_bytes=0',
+                 'x-content-type-timestamp': utils.Timestamp('12345').internal,
                  'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
                  'x-size': '0',
                  'x-timestamp': utils.Timestamp('12345').internal,
@@ -6867,9 +6878,10 @@ class TestObjectController(BaseTestCase):
 
         self.object_controller.delete_at_update = fake_delete_at_update
 
+        timestamp0 = normalize_timestamp(time())
         req = Request.blank(
             '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
-            headers={'X-Timestamp': normalize_timestamp(time()),
+            headers={'X-Timestamp': timestamp0,
                      'Content-Length': '4',
                      'Content-Type': 'application/octet-stream',
                      'X-Backend-Storage-Policy-Index': int(policy),
@@ -6905,7 +6917,10 @@ class TestObjectController(BaseTestCase):
         self.assertEqual(
             given_args, [
                 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
-                given_args[5], 'sda1', policy])
+                given_args[5], 'sda1', policy, {
+                    'x-content-type': 'text/plain;swift_expirer_bytes=4',
+                    'x-content-type-timestamp': timestamp0
+                }])
 
         while given_args:
             given_args.pop()
@@ -6925,7 +6940,10 @@ class TestObjectController(BaseTestCase):
         self.assertEqual(
             given_args, [
                 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
-                given_args[5], 'sda1', policy,
+                given_args[5], 'sda1', policy, {
+                    'x-content-type': 'text/plain;swift_expirer_bytes=4',
+                    'x-content-type-timestamp': timestamp0
+                },
                 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
                 given_args[5], 'sda1', policy])
 
@@ -6967,7 +6985,10 @@ class TestObjectController(BaseTestCase):
         self.assertEqual(
             given_args, [
                 'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
-                given_args[5], 'sda1', policy])
+                given_args[5], 'sda1', policy, {
+                    'x-content-type': 'text/plain;swift_expirer_bytes=4',
+                    'x-content-type-timestamp': timestamp1
+                }])
 
         while given_args:
             given_args.pop()
@@ -6987,10 +7008,14 @@ class TestObjectController(BaseTestCase):
         req.body = 'TEST'
         resp = req.get_response(self.object_controller)
         self.assertEqual(resp.status_int, 201)
+        self.maxDiff = None
         self.assertEqual(
             given_args, [
                 'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
-                given_args[5], 'sda1', policy,
+                given_args[5], 'sda1', policy, {
+                    'x-content-type': 'text/plain;swift_expirer_bytes=4',
+                    'x-content-type-timestamp': timestamp2
+                },
                 'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
                 given_args[5], 'sda1', policy])
 
@@ -7590,6 +7615,345 @@ class TestObjectController(BaseTestCase):
         resp = req.get_response(self.object_controller)
         self.assertEqual(resp.status_int, 400)
 
+    def test_extra_headers_contain_object_bytes(self):
+        timestamp1 = next(self.ts).normal
+        delete_at_timestamp1 = int(time() + 1000)
+        delete_at_container1 = str(
+            delete_at_timestamp1 /
+            self.object_controller.expiring_objects_container_divisor *
+            self.object_controller.expiring_objects_container_divisor)
+        req = Request.blank(
+            '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+            headers={'X-Timestamp': timestamp1,
+                     'Content-Length': '4096',
+                     'Content-Type': 'application/octet-stream',
+                     'X-Delete-At': str(delete_at_timestamp1),
+                     'X-Delete-At-Container': delete_at_container1})
+        req.body = '\x00' * 4096
+        with mock.patch.object(self.object_controller, 'delete_at_update') \
+                as fake_delete_at_update:
+            resp = req.get_response(self.object_controller)
+        self.assertEqual(resp.status_int, 201)
+        self.assertEqual(fake_delete_at_update.call_args_list, [mock.call(
+            'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
+            SameReqEnv(req), 'sda1', POLICIES[0], {
+                'x-content-type': 'text/plain;swift_expirer_bytes=4096',
+                'x-content-type-timestamp': timestamp1
+            })])
+
+        timestamp2 = next(self.ts).normal
+        req = Request.blank(
+            '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
+            headers={'X-Timestamp': timestamp2,
+                     'Content-Length': '5120',
+                     'Content-Type': 'application/octet-stream',
+                     'X-Delete-At': str(delete_at_timestamp1),
+                     'X-Delete-At-Container': delete_at_container1})
+        req.body = '\x00' * 5120
+        with mock.patch.object(self.object_controller, 'delete_at_update') \
+                as fake_delete_at_update:
+            resp = req.get_response(self.object_controller)
+        self.assertEqual(resp.status_int, 201)
+        self.assertEqual(fake_delete_at_update.call_args_list, [mock.call(
+            'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
+            SameReqEnv(req), 'sda1', POLICIES[0], {
+                'x-content-type': 'text/plain;swift_expirer_bytes=5120',
+                'x-content-type-timestamp': timestamp2
+            }
+        )])
+
+        timestamp3 = next(self.ts).normal
+        delete_at_timestamp2 = str(int(next(self.ts)) + 2000)
+        req = Request.blank(
+            '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
+            headers={'X-Timestamp': timestamp3,
+                     'X-Delete-At': delete_at_timestamp2})
+        with mock.patch.object(self.object_controller, 'delete_at_update') \
+                as fake_delete_at_update:
+            resp = req.get_response(self.object_controller)
+        self.assertEqual(resp.status_int, 202)
+        self.assertEqual(fake_delete_at_update.call_args_list, [mock.call(
+            'PUT', int(delete_at_timestamp2), 'a', 'c', 'o',
+            SameReqEnv(req), 'sda1', POLICIES[0], {
+                'x-content-type': 'text/plain;swift_expirer_bytes=5120',
+                'x-content-type-timestamp': timestamp2
+            },
+        ), mock.call(
+            'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
+            SameReqEnv(req), 'sda1', POLICIES[0]
+        )])
+
+        timestamp4 = next(self.ts).normal
+        req = Request.blank(
+            '/sda1/p/a/c/o',
+            environ={'REQUEST_METHOD': 'DELETE'},
+            headers={'X-Timestamp': timestamp4,
+                     'Content-Type': 'application/octet-stream'})
+        with mock.patch.object(self.object_controller, 'delete_at_update') \
+                as fake_delete_at_update:
+            resp = req.get_response(self.object_controller)
+        self.assertEqual(resp.status_int, 204)
+        self.assertEqual(fake_delete_at_update.call_args_list, [mock.call(
+            'DELETE', int(delete_at_timestamp2), 'a', 'c', 'o',
+            SameReqEnv(req), 'sda1', POLICIES[0]
+        )])
+
+    def test_delete_at_overwrite_same_expiration_different_bytes(self):
+        container_updates = []
+
+        def capture_updates(ip, port, method, path, headers, *args, **kwargs):
+            container_updates.append((ip, port, method, path, headers))
+
+        policy = random.choice(list(POLICIES))
+        delete_at = int(next(self.ts)) + 30
+        delete_at_container = utils.get_expirer_container(delete_at, 86400,
+                                                          'a', 'c', 'o')
+        base_headers = {
+            'X-Backend-Storage-Policy-Index': int(policy),
+            'Content-Type': 'application/octet-stream',
+            # we exclude the user container listing updates for brevity
+            # 'X-Container-Partition': '20',
+            # 'X-Container-Host': '1.2.3.4:5105',
+            # 'X-Container-Device': 'sdb1',
+            'X-Delete-At': str(delete_at),
+            'X-Delete-At-Container': delete_at_container,
+            'X-Delete-At-Host': "10.1.1.1:6201",
+            'X-Delete-At-Partition': '6237',
+            'X-Delete-At-Device': 'sdp',
+        }
+        if policy.policy_type == EC_POLICY:
+            base_headers['X-Object-Sysmeta-Ec-Frag-Index'] = '2'
+
+        put1_ts = next(self.ts)
+        put1_size = 4042
+        req1 = Request.blank(
+            '/sda1/p/a/c/o', method='PUT', body='\x01' * put1_size,
+            headers=dict(base_headers, **{
+                'X-Timestamp': put1_ts.normal,
+                'Content-Length': str(put1_size),
+                'X-Trans-Id': 'txn1',
+            }))
+        put2_ts = next(self.ts)
+        put2_size = 2044
+        req2 = Request.blank(
+            '/sda1/p/a/c/o', method='PUT', body='\x02' * put2_size,
+            headers=dict(base_headers, **{
+                'X-Timestamp': put2_ts.normal,
+                'Content-Length': str(put2_size),
+                'X-Trans-Id': 'txn2',
+            }))
+        with fake_spawn(), mocked_http_conn(
+                200, 200, give_connect=capture_updates):
+            resp1 = req1.get_response(self.object_controller)
+            resp2 = req2.get_response(self.object_controller)
+        self.assertEqual(resp1.status_int, 201)
+        self.assertEqual(resp2.status_int, 201)
+
+        self.assertEqual([(
+            '10.1.1.1', '6201', 'PUT',
+            '/sdp/6237/.expiring_objects/%s/%s-a/c/o' % (
+                delete_at_container, delete_at
+            ), {
+                'X-Backend-Storage-Policy-Index': '0',
+                'X-Timestamp': put1_ts.normal,
+                'X-Trans-Id': 'txn1',
+                'Referer': 'PUT http://localhost/sda1/p/a/c/o',
+                'X-Size': '0',
+                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'X-Content-Type':
+                'text/plain;swift_expirer_bytes=%s' % put1_size,
+                'X-Content-Type-Timestamp': put1_ts.normal,
+                'User-Agent': 'object-server %s' % os.getpid(),
+            }
+        ), (
+            '10.1.1.1', '6201', 'PUT',
+            '/sdp/6237/.expiring_objects/%s/%s-a/c/o' % (
+                delete_at_container, delete_at
+            ), {
+                'X-Backend-Storage-Policy-Index': '0',
+                'X-Timestamp': put2_ts.normal,
+                'X-Trans-Id': 'txn2',
+                'Referer': 'PUT http://localhost/sda1/p/a/c/o',
+                'X-Size': '0',
+                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'X-Content-Type':
+                'text/plain;swift_expirer_bytes=%s' % put2_size,
+                'X-Content-Type-Timestamp': put2_ts.normal,
+                'User-Agent': 'object-server %s' % os.getpid(),
+            }
+        )], container_updates)
+
+        async_pendings = []
+        async_pending_dir = os.path.join(
+            self.testdir, 'sda1', diskfile.get_async_dir(policy))
+        for dirpath, _, filenames in os.walk(async_pending_dir):
+            for filename in filenames:
+                async_pendings.append(os.path.join(dirpath, filename))
+
+        self.assertEqual(len(async_pendings), 0)
+
+    def test_delete_at_POST_update_same_expiration(self):
+        container_updates = []
+
+        def capture_updates(ip, port, method, path, headers, *args, **kwargs):
+            container_updates.append((ip, port, method, path, headers))
+
+        policy = random.choice(list(POLICIES))
+        put_ts = next(self.ts)
+        put_size = 1548
+        put_delete_at = int(next(self.ts)) + 30
+        put_delete_at_container = utils.get_expirer_container(
+            put_delete_at, 86400, 'a', 'c', 'o')
+        put_req = Request.blank(
+            '/sda1/p/a/c/o', method='PUT', body='\x01' * put_size,
+            headers={
+                'X-Backend-Storage-Policy-Index': int(policy),
+                'X-Timestamp': put_ts.normal,
+                'Content-Length': str(put_size),
+                'X-Trans-Id': 'txn1',
+                'Content-Type': 'application/octet-stream',
+                # we exclude the user container listing updates for brevity
+                # 'X-Container-Partition': '20',
+                # 'X-Container-Host': '1.2.3.4:5105',
+                # 'X-Container-Device': 'sdb1',
+                'X-Delete-At': str(put_delete_at),
+                'X-Delete-At-Container': put_delete_at_container,
+                'X-Delete-At-Host': "10.1.1.1:6201",
+                'X-Delete-At-Partition': '6237',
+                'X-Delete-At-Device': 'sdp',
+            })
+        if policy.policy_type == EC_POLICY:
+            put_req.headers['X-Object-Sysmeta-Ec-Frag-Index'] = '3'
+
+        with fake_spawn(), mocked_http_conn(
+                200, give_connect=capture_updates):
+            put_resp = put_req.get_response(self.object_controller)
+        self.assertEqual(put_resp.status_int, 201)
+
+        self.assertEqual([(
+            '10.1.1.1', '6201', 'PUT',
+            '/sdp/6237/.expiring_objects/%s/%s-a/c/o' % (
+                put_delete_at_container, put_delete_at
+            ), {
+                'X-Backend-Storage-Policy-Index': '0',
+                'X-Timestamp': put_ts.normal,
+                'X-Trans-Id': 'txn1',
+                'Referer': 'PUT http://localhost/sda1/p/a/c/o',
+                'X-Size': '0',
+                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'X-Content-Type':
+                'text/plain;swift_expirer_bytes=%s' % put_size,
+                'X-Content-Type-Timestamp': put_ts.normal,
+                'User-Agent': 'object-server %s' % os.getpid(),
+            }
+        )], container_updates)
+
+        # reset container updates
+        container_updates = []
+
+        delete_at = int(next(self.ts)) + 100
+        self.assertNotEqual(delete_at, put_delete_at)  # sanity
+        delete_at_container = utils.get_expirer_container(
+            delete_at, 86400, 'a', 'c', 'o')
+
+        base_headers = {
+            'X-Backend-Storage-Policy-Index': int(policy),
+            # we exclude the user container listing updates for brevity
+            # 'X-Container-Partition': '20',
+            # 'X-Container-Host': '1.2.3.4:5105',
+            # 'X-Container-Device': 'sdb1',
+            'X-Delete-At': str(delete_at),
+            'X-Delete-At-Container': delete_at_container,
+            'X-Delete-At-Host': "10.2.2.2:6202",
+            'X-Delete-At-Partition': '592',
+            'X-Delete-At-Device': 'sdm',
+        }
+
+        post1_ts = next(self.ts)
+        req1 = Request.blank(
+            '/sda1/p/a/c/o', method='POST', headers=dict(base_headers, **{
+                'X-Timestamp': post1_ts.normal,
+                'X-Trans-Id': 'txn2',
+            }))
+        post2_ts = next(self.ts)
+        req2 = Request.blank(
+            '/sda1/p/a/c/o', method='POST', headers=dict(base_headers, **{
+                'X-Timestamp': post2_ts.normal,
+                'X-Trans-Id': 'txn3',
+            }))
+
+        with fake_spawn(), mocked_http_conn(
+                200, 200, give_connect=capture_updates):
+            resp1 = req1.get_response(self.object_controller)
+            resp2 = req2.get_response(self.object_controller)
+        self.assertEqual(resp1.status_int, 202)
+        self.assertEqual(resp2.status_int, 202)
+
+        self.assertEqual([(
+            '10.2.2.2', '6202', 'PUT',
+            '/sdm/592/.expiring_objects/%s/%s-a/c/o' % (
+                delete_at_container, delete_at
+            ), {
+                'X-Backend-Storage-Policy-Index': '0',
+                # this the PUT from the POST-1
+                'X-Timestamp': post1_ts.normal,
+                'X-Trans-Id': 'txn2',
+                'Referer': 'POST http://localhost/sda1/p/a/c/o',
+                'X-Size': '0',
+                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'X-Content-Type':
+                'text/plain;swift_expirer_bytes=%s' % put_size,
+                'X-Content-Type-Timestamp': put_ts.normal,
+                'User-Agent': 'object-server %s' % os.getpid(),
+            }
+        ), (
+            '10.2.2.2', '6202', 'PUT',
+            '/sdm/592/.expiring_objects/%s/%s-a/c/o' % (
+                delete_at_container, delete_at
+            ), {
+                'X-Backend-Storage-Policy-Index': '0',
+                # this the PUT from POST-2
+                'X-Timestamp': post2_ts.normal,
+                'X-Trans-Id': 'txn3',
+                'Referer': 'POST http://localhost/sda1/p/a/c/o',
+                'X-Size': '0',
+                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e',
+                'X-Content-Type':
+                'text/plain;swift_expirer_bytes=%s' % put_size,
+                'X-Content-Type-Timestamp': put_ts.normal,
+                'User-Agent': 'object-server %s' % os.getpid(),
+            }
+        )], container_updates)
+
+        async_pendings = []
+        async_pending_dir = os.path.join(
+            self.testdir, 'sda1', diskfile.get_async_dir(policy))
+        for dirpath, _, filenames in os.walk(async_pending_dir):
+            for filename in filenames:
+                async_pendings.append(os.path.join(dirpath, filename))
+
+        self.assertEqual(len(async_pendings), 1)
+
+        async_updates = []
+        for pending_file in async_pendings:
+            with open(pending_file, 'rb') as fh:
+                async_pending = pickle.load(fh)
+                async_updates.append(async_pending)
+        self.assertEqual([{
+            'op': 'DELETE',
+            'account': '.expiring_objects',
+            'container': delete_at_container,
+            'obj': '%s-a/c/o' % put_delete_at,
+            'headers': {
+                'X-Backend-Storage-Policy-Index': '0',
+                # only POST-1 has to clear the orig PUT delete-at
+                'X-Timestamp': post1_ts.normal,
+                'X-Trans-Id': 'txn2',
+                'Referer': 'POST http://localhost/sda1/p/a/c/o',
+                'User-Agent': 'object-server %s' % os.getpid(),
+            },
+        }], async_updates)
+
     def test_DELETE_calls_delete_at(self):
         given_args = []
 
@@ -7615,7 +7979,10 @@ class TestObjectController(BaseTestCase):
         self.assertEqual(resp.status_int, 201)
         self.assertEqual(given_args, [
             'PUT', int(delete_at_timestamp1), 'a', 'c', 'o',
-            given_args[5], 'sda1', POLICIES[0]])
+            given_args[5], 'sda1', POLICIES[0], {
+                'x-content-type': 'text/plain;swift_expirer_bytes=4',
+                'x-content-type-timestamp': timestamp1
+            }])
 
         while given_args:
             given_args.pop()