diff --git a/swiftclient/client.py b/swiftclient/client.py
index d0ff52ea..985cad89 100644
--- a/swiftclient/client.py
+++ b/swiftclient/client.py
@@ -36,7 +36,7 @@ import six
 
 from swiftclient import version as swiftclient_version
 from swiftclient.exceptions import ClientException
-from swiftclient.utils import LengthWrapper
+from swiftclient.utils import LengthWrapper, ReadableToIterable
 
 AUTH_VERSIONS_V1 = ('1.0', '1', 1)
 AUTH_VERSIONS_V2 = ('2.0', '2', 2)
@@ -333,8 +333,8 @@ def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
     except exceptions.Unauthorized:
         msg = 'Unauthorized. Check username, password and tenant name/id.'
         if auth_version in AUTH_VERSIONS_V3:
-            msg = 'Unauthorized. Check username/id, password, ' \
-                  + 'tenant name/id and user/tenant domain name/id.'
+            msg = ('Unauthorized. Check username/id, password, '
+                   'tenant name/id and user/tenant domain name/id.')
         raise ClientException(msg)
     except exceptions.AuthorizationFailure as err:
         raise ClientException('Authorization Failure. %s' % err)
@@ -388,8 +388,7 @@ def get_auth(auth_url, user, key, **kwargs):
         # We are handling a special use case here where the user argument
         # specifies both the user name and tenant name in the form tenant:user
         if user and not kwargs.get('tenant_name') and ':' in user:
-            (os_options['tenant_name'],
-             user) = user.split(':')
+            os_options['tenant_name'], user = user.split(':')
 
         # We are allowing to have an tenant_name argument in get_auth
         # directly without having os_options
@@ -929,7 +928,8 @@ def put_object(url, token=None, container=None, name=None, contents=None,
                       container name is expected to be part of the url
     :param name: object name to put; if None, the object name is expected to be
                  part of the url
-    :param contents: a string or a file like object to read object data from;
+    :param contents: a string, a file like object or an iterable
+                     to read object data from;
                      if None, a zero-byte put will be done
     :param content_length: value to send as content-length header; also limits
                            the amount read from contents; if None, it will be
@@ -983,27 +983,26 @@ def put_object(url, token=None, container=None, name=None, contents=None,
         headers['Content-Type'] = ''
     if not contents:
         headers['Content-Length'] = '0'
-    if hasattr(contents, 'read'):
+
+    if isinstance(contents, (ReadableToIterable, LengthWrapper)):
+        conn.putrequest(path, headers=headers, data=contents)
+    elif hasattr(contents, 'read'):
         if chunk_size is None:
             chunk_size = 65536
+
         if content_length is None:
-            def chunk_reader():
-                while True:
-                    data = contents.read(chunk_size)
-                    if not data:
-                        break
-                    yield data
-            conn.putrequest(path, headers=headers, data=chunk_reader())
+            data = ReadableToIterable(contents, chunk_size, md5=False)
         else:
-            # Fixes https://github.com/kennethreitz/requests/issues/1648
-            data = LengthWrapper(contents, content_length)
-            conn.putrequest(path, headers=headers, data=data)
+            data = LengthWrapper(contents, content_length, md5=False)
+
+        conn.putrequest(path, headers=headers, data=data)
     else:
         if chunk_size is not None:
-            warn_msg = '%s object has no \"read\" method, ignoring chunk_size'\
-                % type(contents).__name__
+            warn_msg = ('%s object has no "read" method, ignoring chunk_size'
+                        % type(contents).__name__)
             warnings.warn(warn_msg, stacklevel=2)
         conn.request('PUT', path, contents, headers)
+
     resp = conn.getresponse()
     body = resp.read()
     headers = {'X-Auth-Token': token}
@@ -1018,7 +1017,8 @@ def put_object(url, token=None, container=None, name=None, contents=None,
                               http_status=resp.status, http_reason=resp.reason,
                               http_response_content=body)
 
-    return resp.getheader('etag', '').strip('"')
+    etag = resp.getheader('etag', '').strip('"')
+    return etag
 
 
 def post_object(url, token, container, name, headers, http_conn=None,
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 2980fd87..f24d4300 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -39,7 +39,9 @@ from swiftclient import Connection
 from swiftclient.command_helpers import (
     stat_account, stat_container, stat_object
 )
-from swiftclient.utils import config_true_value
+from swiftclient.utils import (
+    config_true_value, ReadableToIterable, LengthWrapper
+)
 from swiftclient.exceptions import ClientException
 from swiftclient.multithreading import MultiThreadingManager
 
@@ -1465,11 +1467,18 @@ class SwiftService(object):
             fp = open(path, 'rb')
             fp.seek(segment_start)
 
+            contents = LengthWrapper(fp, segment_size, md5=True)
             etag = conn.put_object(segment_container,
-                                   segment_name, fp,
+                                   segment_name, contents,
                                    content_length=segment_size,
                                    response_dict=results_dict)
 
+            if etag and etag != contents.get_md5sum():
+                raise SwiftError('Segment upload failed: remote and local '
+                                 'object md5 did not match, {0} != {1}\n'
+                                 'remote segment has not been removed.'
+                                 .format(etag, contents.get_md5sum()))
+
             res.update({
                 'success': True,
                 'response_dict': results_dict,
@@ -1695,21 +1704,28 @@ class SwiftService(object):
                     res['manifest_response_dict'] = mr
             else:
                 res['large_object'] = False
+                obr = {}
                 if path is not None:
-                    obr = {}
-                    conn.put_object(
-                        container, obj, open(path, 'rb'),
-                        content_length=getsize(path), headers=put_headers,
-                        response_dict=obr
-                    )
-                    res['response_dict'] = obr
+                    content_length = getsize(path)
+                    contents = LengthWrapper(open(path, 'rb'), content_length,
+                                             md5=True)
                 else:
-                    obr = {}
-                    conn.put_object(
-                        container, obj, stream, headers=put_headers,
-                        response_dict=obr
-                    )
-                    res['response_dict'] = obr
+                    content_length = None
+                    contents = ReadableToIterable(stream, md5=True)
+
+                etag = conn.put_object(
+                    container, obj, contents,
+                    content_length=content_length, headers=put_headers,
+                    response_dict=obr
+                )
+                res['response_dict'] = obr
+
+                if etag and etag != contents.get_md5sum():
+                    raise SwiftError('Object upload failed: remote and local '
+                                     'object md5 did not match, {0} != {1}\n'
+                                     'remote object has not been removed.'
+                                     .format(etag, contents.get_md5sum()))
+
             if old_manifest or old_slo_manifest_paths:
                 drs = []
                 if old_manifest:
diff --git a/swiftclient/utils.py b/swiftclient/utils.py
index 0f442b39..f0fcc019 100644
--- a/swiftclient/utils.py
+++ b/swiftclient/utils.py
@@ -44,7 +44,7 @@ def prt_bytes(bytes, human_flag):
         mods = list('KMGTPEZY')
         temp = float(bytes)
         if temp > 0:
-            while (temp > 1023):
+            while temp > 1023:
                 try:
                     suffix = mods.pop(0)
                 except IndexError:
@@ -60,7 +60,7 @@ def prt_bytes(bytes, human_flag):
     else:
         bytes = '%12s' % bytes
 
-    return(bytes)
+    return bytes
 
 
 def generate_temp_url(path, seconds, key, method):
@@ -104,23 +104,105 @@ def generate_temp_url(path, seconds, key, method):
             '{sig}&temp_url_expires={exp}'.format(
                 path=path,
                 sig=sig,
-                exp=expiration)
-            )
+                exp=expiration))
+
+
+class NoopMD5(object):
+    def __init__(self, *a, **kw):
+        pass
+
+    def update(self, *a, **kw):
+        pass
+
+    def hexdigest(self, *a, **kw):
+        return ''
+
+
+class ReadableToIterable(object):
+    """
+    Wrap a filelike object and act as an iterator.
+
+    It is recommended to use this class only on files opened in binary mode.
+    Due to the Unicode changes in python 3 files are now opened using an
+    encoding not suitable for use with the md5 class and because of this
+    hit the exception on every call to next. This could cause problems,
+    especially with large files and small chunk sizes.
+    """
+
+    def __init__(self, content, chunk_size=65536, md5=False):
+        """
+        :param content: The filelike object that is yielded from.
+        :param chunk_size: The max size of each yielded item.
+        :param md5: Flag to enable calculating the MD5 of the content
+                    as it is yielded.
+        """
+        self.md5sum = hashlib.md5() if md5 else NoopMD5()
+        self.content = content
+        self.chunk_size = chunk_size
+
+    def get_md5sum(self):
+        return self.md5sum.hexdigest()
+
+    def __next__(self):
+        """
+        Both ``__next__`` and ``next`` are provided to allow compatibility
+        with python 2 and python 3 and their use of ``iterable.next()``
+        and ``next(iterable)`` respectively.
+        """
+        chunk = self.content.read(self.chunk_size)
+        if not chunk:
+            raise StopIteration
+
+        try:
+            self.md5sum.update(chunk)
+        except TypeError:
+            self.md5sum.update(chunk.encode())
+
+        return chunk
+
+    def next(self):
+        return self.__next__()
+
+    def __iter__(self):
+        return self
 
 
 class LengthWrapper(object):
+    """
+    Wrap a filelike object with a maximum length.
 
-    def __init__(self, readable, length):
+    Fix for https://github.com/kennethreitz/requests/issues/1648
+    It is recommended to use this class only on files opened in binary mode.
+    """
+    def __init__(self, readable, length, md5=False):
+        """
+        :param readable: The filelike object to read from.
+        :param length: The maximum amount of content to that can be read from
+                       the filelike object before it is simulated to be
+                       empty.
+        :param md5: Flag to enable calculating the MD5 of the content
+                    as it is read.
+        """
+        self.md5sum = hashlib.md5() if md5 else NoopMD5()
         self._length = self._remaining = length
         self._readable = readable
 
     def __len__(self):
         return self._length
 
+    def get_md5sum(self):
+        return self.md5sum.hexdigest()
+
     def read(self, *args, **kwargs):
         if self._remaining <= 0:
             return ''
-        chunk = self._readable.read(
-            *args, **kwargs)[:self._remaining]
+
+        chunk = self._readable.read(*args, **kwargs)[:self._remaining]
         self._remaining -= len(chunk)
+
+        try:
+            self.md5sum.update(chunk)
+        except TypeError:
+            self.md5sum.update(chunk.encode())
+
         return chunk
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 0a0af89b..3309813f 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -16,14 +16,16 @@ import mock
 import os
 import tempfile
 import testtools
+import time
 from hashlib import md5
 from mock import Mock, PropertyMock
 from six.moves.queue import Queue, Empty as QueueEmptyError
 from six import BytesIO
 
 import swiftclient
-from swiftclient.service import SwiftService, SwiftError
+import swiftclient.utils as utils
 from swiftclient.client import Connection
+from swiftclient.service import SwiftService, SwiftError
 
 
 clean_os_environ = {}
@@ -548,3 +550,270 @@ class TestService(testtools.TestCase):
             except SwiftError as exc:
                 self.assertEqual('Segment size should be an integer value',
                                  exc.value)
+
+
+class TestServiceUpload(testtools.TestCase):
+
+    def _assertDictEqual(self, a, b, m=None):
+        # assertDictEqual is not available in py2.6 so use a shallow check
+        # instead
+        if not m:
+            m = '{0} != {1}'.format(a, b)
+
+        if hasattr(self, 'assertDictEqual'):
+            self.assertDictEqual(a, b, m)
+        else:
+            self.assertTrue(isinstance(a, dict), m)
+            self.assertTrue(isinstance(b, dict), m)
+            self.assertEqual(len(a), len(b), m)
+            for k, v in a.items():
+                self.assertIn(k, b, m)
+                self.assertEqual(b[k], v, m)
+
+    def test_upload_segment_job(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 10)
+            f.write(b'b' * 10)
+            f.write(b'c' * 10)
+            f.flush()
+
+            # Mock the connection to return an empty etag. This
+            # skips etag validation which would fail as the LengthWrapper
+            # isnt read from.
+            mock_conn = mock.Mock()
+            mock_conn.put_object.return_value = ''
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+            expected_r = {
+                'action': 'upload_segment',
+                'for_object': 'test_o',
+                'segment_index': 2,
+                'segment_size': 10,
+                'segment_location': '/test_c_segments/test_s_1',
+                'log_line': 'test_o segment 2',
+                'success': True,
+                'response_dict': {},
+                'segment_etag': '',
+                'attempts': 2,
+            }
+
+            s = SwiftService()
+            r = s._upload_segment_job(conn=mock_conn,
+                                      path=f.name,
+                                      container='test_c',
+                                      segment_name='test_s_1',
+                                      segment_start=10,
+                                      segment_size=10,
+                                      segment_index=2,
+                                      obj_name='test_o',
+                                      options={'segment_container': None})
+
+            self._assertDictEqual(r, expected_r)
+
+            self.assertEqual(mock_conn.put_object.call_count, 1)
+            mock_conn.put_object.assert_called_with('test_c_segments',
+                                                    'test_s_1',
+                                                    mock.ANY,
+                                                    content_length=10,
+                                                    response_dict={})
+            contents = mock_conn.put_object.call_args[0][2]
+            self.assertIsInstance(contents, utils.LengthWrapper)
+            self.assertEqual(len(contents), 10)
+            # This read forces the LengthWrapper to calculate the md5
+            # for the read content.
+            self.assertEqual(contents.read(), b'b' * 10)
+            self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())
+
+    def test_upload_segment_job_etag_mismatch(self):
+        def _consuming_conn(*a, **kw):
+            contents = a[2]
+            contents.read()  # Force md5 calculation
+            return 'badresponseetag'
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 10)
+            f.write(b'b' * 10)
+            f.write(b'c' * 10)
+            f.flush()
+
+            mock_conn = mock.Mock()
+            mock_conn.put_object.side_effect = _consuming_conn
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_segment_job(conn=mock_conn,
+                                      path=f.name,
+                                      container='test_c',
+                                      segment_name='test_s_1',
+                                      segment_start=10,
+                                      segment_size=10,
+                                      segment_index=2,
+                                      obj_name='test_o',
+                                      options={'segment_container': None})
+
+            self.assertIn('error', r)
+            self.assertTrue(r['error'].value.find('md5 did not match') >= 0)
+
+            self.assertEqual(mock_conn.put_object.call_count, 1)
+            mock_conn.put_object.assert_called_with('test_c_segments',
+                                                    'test_s_1',
+                                                    mock.ANY,
+                                                    content_length=10,
+                                                    response_dict={})
+            contents = mock_conn.put_object.call_args[0][2]
+            self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())
+
+    def test_upload_object_job_file(self):
+        # Uploading a file results in the file object being wrapped in a
+        # LengthWrapper. This test sets the options is such a way that much
+        # of _upload_object_job is skipped bringing the critical path down
+        # to around 60 lines to ease testing.
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            expected_r = {
+                'action': 'upload_object',
+                'attempts': 2,
+                'container': 'test_c',
+                'headers': {},
+                'large_object': False,
+                'object': 'test_o',
+                'response_dict': {},
+                'status': 'uploaded',
+                'success': True,
+            }
+            expected_mtime = float(os.path.getmtime(f.name))
+
+            mock_conn = mock.Mock()
+            mock_conn.put_object.return_value = ''
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': False,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 0})
+
+            # Check for mtime and path separately as they are calculated
+            # from the temp file and will be different each time.
+            mtime = float(r['headers']['x-object-meta-mtime'])
+            self.assertAlmostEqual(mtime, expected_mtime, delta=1)
+            del r['headers']['x-object-meta-mtime']
+
+            self.assertEqual(r['path'], f.name)
+            del r['path']
+
+            self._assertDictEqual(r, expected_r)
+            self.assertEqual(mock_conn.put_object.call_count, 1)
+            mock_conn.put_object.assert_called_with('test_c', 'test_o',
+                                                    mock.ANY,
+                                                    content_length=30,
+                                                    headers={},
+                                                    response_dict={})
+            contents = mock_conn.put_object.call_args[0][2]
+            self.assertIsInstance(contents, utils.LengthWrapper)
+            self.assertEqual(len(contents), 30)
+            # This read forces the LengthWrapper to calculate the md5
+            # for the read content.
+            self.assertEqual(contents.read(), b'a' * 30)
+            self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+    def test_upload_object_job_stream(self):
+        # Streams are wrapped as ReadableToIterable
+        with tempfile.TemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+            f.seek(0)
+            expected_r = {
+                'action': 'upload_object',
+                'attempts': 2,
+                'container': 'test_c',
+                'headers': {},
+                'large_object': False,
+                'object': 'test_o',
+                'response_dict': {},
+                'status': 'uploaded',
+                'success': True,
+                'path': None,
+            }
+            expected_mtime = round(time.time())
+
+            mock_conn = mock.Mock()
+            mock_conn.put_object.return_value = ''
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': False,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 0})
+
+            mtime = float(r['headers']['x-object-meta-mtime'])
+            self.assertAlmostEqual(mtime, expected_mtime, delta=10)
+            del r['headers']['x-object-meta-mtime']
+
+            self._assertDictEqual(r, expected_r)
+            self.assertEqual(mock_conn.put_object.call_count, 1)
+            mock_conn.put_object.assert_called_with('test_c', 'test_o',
+                                                    mock.ANY,
+                                                    content_length=None,
+                                                    headers={},
+                                                    response_dict={})
+            contents = mock_conn.put_object.call_args[0][2]
+            self.assertIsInstance(contents, utils.ReadableToIterable)
+            self.assertEqual(contents.chunk_size, 65536)
+            # next retreives the first chunk of the stream or len(chunk_size)
+            # or less, it also forces the md5 to be calculated.
+            self.assertEqual(next(contents), b'a' * 30)
+            self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+    def test_upload_object_job_etag_mismatch(self):
+        # The etag test for both streams and files use the same code
+        # so only one test should be needed.
+        def _consuming_conn(*a, **kw):
+            contents = a[2]
+            contents.read()  # Force md5 calculation
+            return 'badresponseetag'
+
+        with tempfile.NamedTemporaryFile() as f:
+            f.write(b'a' * 30)
+            f.flush()
+
+            mock_conn = mock.Mock()
+            mock_conn.put_object.side_effect = _consuming_conn
+            type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+            s = SwiftService()
+            r = s._upload_object_job(conn=mock_conn,
+                                     container='test_c',
+                                     source=f.name,
+                                     obj='test_o',
+                                     options={'changed': False,
+                                              'skip_identical': False,
+                                              'leave_segments': True,
+                                              'header': '',
+                                              'segment_size': 0})
+
+            self.assertEqual(r['success'], False)
+            self.assertIn('error', r)
+            self.assertTrue(r['error'].value.find('md5 did not match') >= 0)
+
+            self.assertEqual(mock_conn.put_object.call_count, 1)
+            expected_headers = {'x-object-meta-mtime': mock.ANY}
+            mock_conn.put_object.assert_called_with('test_c', 'test_o',
+                                                    mock.ANY,
+                                                    content_length=30,
+                                                    headers=expected_headers,
+                                                    response_dict={})
+
+            contents = mock_conn.put_object.call_args[0][2]
+            self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index c5a47b86..28aea7d6 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -400,6 +400,8 @@ class TestShell(unittest.TestCase):
     def test_upload(self, connection, walk):
         connection.return_value.head_object.return_value = {
             'content-length': '0'}
+        connection.return_value.put_object.return_value = (
+            'd41d8cd98f00b204e9800998ecf8427e')
         connection.return_value.attempts = 0
         argv = ["", "upload", "container", self.tmpfile,
                 "-H", "X-Storage-Policy:one"]
@@ -475,6 +477,8 @@ class TestShell(unittest.TestCase):
         connection.return_value.get_object.return_value = ({}, json.dumps(
             [{'name': 'container1/old_seg1'}, {'name': 'container2/old_seg2'}]
         ))
+        connection.return_value.put_object.return_value = (
+            'd41d8cd98f00b204e9800998ecf8427e')
         swiftclient.shell.main(argv)
         connection.return_value.put_object.assert_called_with(
             'container',
@@ -504,6 +508,8 @@ class TestShell(unittest.TestCase):
         connection.return_value.head_object.return_value = {
             'content-length': '0'}
         connection.return_value.attempts = 0
+        connection.return_value.put_object.return_value = (
+            'd41d8cd98f00b204e9800998ecf8427e')
         argv = ["", "upload", "container", self.tmpfile, "-S", "10",
                 "-C", "container"]
         with open(self.tmpfile, "wb") as fh:
diff --git a/tests/unit/test_swiftclient.py b/tests/unit/test_swiftclient.py
index 03600163..9ebcff55 100644
--- a/tests/unit/test_swiftclient.py
+++ b/tests/unit/test_swiftclient.py
@@ -23,9 +23,10 @@ except ImportError:
 
 import six
 import socket
-import types
 import testtools
 import warnings
+import tempfile
+from hashlib import md5
 from six.moves.urllib.parse import urlparse
 from six.moves import reload_module
 
@@ -92,16 +93,22 @@ class TestJsonImport(testtools.TestCase):
         self.assertEqual(c.json_loads, json.loads)
 
 
-class MockHttpResponse():
-    def __init__(self, status=0):
+class MockHttpResponse(object):
+    def __init__(self, status=0, headers=None, verify=False):
         self.status = status
         self.status_code = status
         self.reason = "OK"
         self.buffer = []
         self.requests_params = None
+        self.verify = verify
+        self.md5sum = md5()
+        # zero byte hash
+        self.headers = {'etag': '"d41d8cd98f00b204e9800998ecf8427e"'}
+        if headers:
+            self.headers.update(headers)
 
-        class Raw:
-            def read():
+        class Raw(object):
+            def read(self):
                 pass
         self.raw = Raw()
 
@@ -109,17 +116,21 @@ class MockHttpResponse():
         return ""
 
     def getheader(self, name, default):
-        return ""
+        return self.headers.get(name, default)
 
     def getheaders(self):
         return {"key1": "value1", "key2": "value2"}
 
     def fake_response(self):
-        return MockHttpResponse(self.status)
+        return self
 
     def _fake_request(self, *arg, **kwarg):
         self.status = 200
         self.requests_params = kwarg
+        if self.verify:
+            for chunk in kwarg['data']:
+                self.md5sum.update(chunk)
+
         # This simulate previous httplib implementation that would do a
         # putrequest() and then use putheader() to send header.
         for k, v in kwarg['headers'].items():
@@ -665,7 +676,7 @@ class TestPutObject(MockHttpTest):
         conn = c.http_connection(u'http://www.test.com/')
         mock_file = six.StringIO(u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91')
         args = (u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
-                '\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
+                u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
                 u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
                 u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
                 mock_file)
@@ -732,25 +743,22 @@ class TestPutObject(MockHttpTest):
         resp = MockHttpResponse(status=200)
         conn[1].getresponse = resp.fake_response
         conn[1]._request = resp._fake_request
-        astring = 'asdf'
-        astring_len = len(astring)
-        mock_file = six.StringIO(astring)
+        raw_data = b'asdf' * 256
+        raw_data_len = len(raw_data)
 
-        c.put_object(url='http://www.test.com', http_conn=conn,
-                     contents=mock_file, content_length=astring_len)
-        self.assertTrue(isinstance(resp.requests_params['data'],
-                                   swiftclient.utils.LengthWrapper))
-        self.assertEqual(astring_len,
-                         len(resp.requests_params['data'].read()))
+        for kwarg in ({'headers': {'Content-Length': str(raw_data_len)}},
+                      {'content_length': raw_data_len}):
+            with tempfile.TemporaryFile() as mock_file:
+                mock_file.write(raw_data)
+                mock_file.seek(0)
 
-        mock_file = six.StringIO(astring)
-        c.put_object(url='http://www.test.com', http_conn=conn,
-                     headers={'Content-Length': str(astring_len)},
-                     contents=mock_file)
-        self.assertTrue(isinstance(resp.requests_params['data'],
-                                   swiftclient.utils.LengthWrapper))
-        self.assertEqual(astring_len,
-                         len(resp.requests_params['data'].read()))
+                c.put_object(url='http://www.test.com', http_conn=conn,
+                             contents=mock_file, **kwarg)
+
+                req_data = resp.requests_params['data']
+                self.assertTrue(isinstance(req_data,
+                                           swiftclient.utils.LengthWrapper))
+                self.assertEqual(raw_data_len, len(req_data.read()))
 
     def test_chunk_upload(self):
         # Chunked upload happens when no content_length is passed to put_object
@@ -758,19 +766,71 @@ class TestPutObject(MockHttpTest):
         resp = MockHttpResponse(status=200)
         conn[1].getresponse = resp.fake_response
         conn[1]._request = resp._fake_request
-        raw_data = 'asdf' * 256
+        raw_data = b'asdf' * 256
         chunk_size = 16
-        mock_file = six.StringIO(raw_data)
 
-        c.put_object(url='http://www.test.com', http_conn=conn,
-                     contents=mock_file, chunk_size=chunk_size)
-        request_data = resp.requests_params['data']
-        self.assertTrue(isinstance(request_data, types.GeneratorType))
-        data = ''
-        for chunk in request_data:
-            self.assertEqual(chunk_size, len(chunk))
-            data += chunk
-        self.assertEqual(data, raw_data)
+        with tempfile.TemporaryFile() as mock_file:
+            mock_file.write(raw_data)
+            mock_file.seek(0)
+
+            c.put_object(url='http://www.test.com', http_conn=conn,
+                         contents=mock_file, chunk_size=chunk_size)
+            req_data = resp.requests_params['data']
+            self.assertTrue(hasattr(req_data, '__iter__'))
+            data = b''
+            for chunk in req_data:
+                self.assertEqual(chunk_size, len(chunk))
+                data += chunk
+            self.assertEqual(data, raw_data)
+
+    def test_md5_mismatch(self):
+        conn = c.http_connection('http://www.test.com')
+        resp = MockHttpResponse(status=200, verify=True,
+                                headers={'etag': '"badresponseetag"'})
+        conn[1].getresponse = resp.fake_response
+        conn[1]._request = resp._fake_request
+        raw_data = b'asdf' * 256
+        raw_data_md5 = md5(raw_data).hexdigest()
+        chunk_size = 16
+
+        with tempfile.TemporaryFile() as mock_file:
+            mock_file.write(raw_data)
+            mock_file.seek(0)
+
+            contents = swiftclient.utils.ReadableToIterable(mock_file,
+                                                            md5=True)
+
+            etag = c.put_object(url='http://www.test.com',
+                                http_conn=conn,
+                                contents=contents,
+                                chunk_size=chunk_size)
+
+            self.assertNotEquals(etag, contents.get_md5sum())
+            self.assertEquals(raw_data_md5, contents.get_md5sum())
+
+    def test_md5_match(self):
+        conn = c.http_connection('http://www.test.com')
+        raw_data = b'asdf' * 256
+        raw_data_md5 = md5(raw_data).hexdigest()
+        resp = MockHttpResponse(status=200, verify=True,
+                                headers={'etag': '"' + raw_data_md5 + '"'})
+        conn[1].getresponse = resp.fake_response
+        conn[1]._request = resp._fake_request
+        chunk_size = 16
+
+        with tempfile.TemporaryFile() as mock_file:
+            mock_file.write(raw_data)
+            mock_file.seek(0)
+            contents = swiftclient.utils.ReadableToIterable(mock_file,
+                                                            md5=True)
+
+            etag = c.put_object(url='http://www.test.com',
+                                http_conn=conn,
+                                contents=contents,
+                                chunk_size=chunk_size)
+
+            self.assertEquals(raw_data_md5, contents.get_md5sum())
+            self.assertEquals(etag, contents.get_md5sum())
 
     def test_params(self):
         conn = c.http_connection(u'http://www.test.com/')
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index f072aed1..d82d2b8e 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 import testtools
-
 import mock
 import six
 import tempfile
+from hashlib import md5
 
 from swiftclient import utils as u
 
@@ -161,39 +161,111 @@ class TestTempURL(testtools.TestCase):
                           self.method)
 
 
+class TestReadableToIterable(testtools.TestCase):
+
+    def test_iter(self):
+        chunk_size = 4
+        write_data = tuple(x.encode() for x in ('a', 'b', 'c', 'd'))
+        actual_md5sum = md5()
+
+        with tempfile.TemporaryFile() as f:
+            for x in write_data:
+                f.write(x * chunk_size)
+                actual_md5sum.update(x * chunk_size)
+            f.seek(0)
+            data = u.ReadableToIterable(f, chunk_size, True)
+
+            for i, data_chunk in enumerate(data):
+                self.assertEquals(chunk_size, len(data_chunk))
+                self.assertEquals(data_chunk, write_data[i] * chunk_size)
+
+            self.assertEquals(actual_md5sum.hexdigest(), data.get_md5sum())
+
+    def test_md5_creation(self):
+        # Check creation with a real and noop md5 class
+        data = u.ReadableToIterable(None, None, md5=True)
+        self.assertEquals(md5().hexdigest(), data.get_md5sum())
+        self.assertTrue(isinstance(data.md5sum, type(md5())))
+
+        data = u.ReadableToIterable(None, None, md5=False)
+        self.assertEquals('', data.get_md5sum())
+        self.assertTrue(isinstance(data.md5sum, type(u.NoopMD5())))
+
+    def test_unicode(self):
+        # Check no errors are raised if unicode data is feed in.
+        unicode_data = u'abc'
+        actual_md5sum = md5(unicode_data.encode()).hexdigest()
+        chunk_size = 2
+
+        with tempfile.TemporaryFile(mode='w+') as f:
+            f.write(unicode_data)
+            f.seek(0)
+            data = u.ReadableToIterable(f, chunk_size, True)
+
+            x = next(data)
+            self.assertEquals(2, len(x))
+            self.assertEquals(unicode_data[:2], x)
+
+            x = next(data)
+            self.assertEquals(1, len(x))
+            self.assertEquals(unicode_data[2:], x)
+
+            self.assertEquals(actual_md5sum, data.get_md5sum())
+
+
 class TestLengthWrapper(testtools.TestCase):
 
     def test_stringio(self):
-        contents = six.StringIO('a' * 100)
-        data = u.LengthWrapper(contents, 42)
+        contents = six.StringIO(u'a' * 100)
+        data = u.LengthWrapper(contents, 42, True)
+        s = u'a' * 42
+        read_data = u''.join(iter(data.read, ''))
+
         self.assertEqual(42, len(data))
-        read_data = ''.join(iter(data.read, ''))
         self.assertEqual(42, len(read_data))
-        self.assertEqual('a' * 42, read_data)
+        self.assertEqual(s, read_data)
+        self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum())
+
+    def test_bytesio(self):
+        contents = six.BytesIO(b'a' * 100)
+        data = u.LengthWrapper(contents, 42, True)
+        s = b'a' * 42
+        read_data = b''.join(iter(data.read, ''))
+
+        self.assertEqual(42, len(data))
+        self.assertEqual(42, len(read_data))
+        self.assertEqual(s, read_data)
+        self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
 
     def test_tempfile(self):
-        with tempfile.NamedTemporaryFile(mode='w') as f:
-            f.write('a' * 100)
+        with tempfile.NamedTemporaryFile(mode='wb') as f:
+            f.write(b'a' * 100)
             f.flush()
-            contents = open(f.name)
-            data = u.LengthWrapper(contents, 42)
+            contents = open(f.name, 'rb')
+            data = u.LengthWrapper(contents, 42, True)
+            s = b'a' * 42
+            read_data = b''.join(iter(data.read, ''))
+
             self.assertEqual(42, len(data))
-            read_data = ''.join(iter(data.read, ''))
             self.assertEqual(42, len(read_data))
-            self.assertEqual('a' * 42, read_data)
+            self.assertEqual(s, read_data)
+            self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
 
     def test_segmented_file(self):
-        with tempfile.NamedTemporaryFile(mode='w') as f:
+        with tempfile.NamedTemporaryFile(mode='wb') as f:
             segment_length = 1024
             segments = ('a', 'b', 'c', 'd')
             for c in segments:
-                f.write(c * segment_length)
+                f.write((c * segment_length).encode())
             f.flush()
             for i, c in enumerate(segments):
-                contents = open(f.name)
+                contents = open(f.name, 'rb')
                 contents.seek(i * segment_length)
-                data = u.LengthWrapper(contents, segment_length)
+                data = u.LengthWrapper(contents, segment_length, True)
+                read_data = b''.join(iter(data.read, ''))
+                s = (c * segment_length).encode()
+
                 self.assertEqual(segment_length, len(data))
-                read_data = ''.join(iter(data.read, ''))
                 self.assertEqual(segment_length, len(read_data))
-                self.assertEqual(c * segment_length, read_data)
+                self.assertEqual(s, read_data)
+                self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index 2467ca6d..88d6d129 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -127,7 +127,7 @@ def fake_http_connect(*code_iter, **kwargs):
                        'last-modified': self.timestamp,
                        'x-object-meta-test': 'testing',
                        'etag':
-                       self.etag or '"68b329da9893e34099c7d8ad5cb9c940"',
+                       self.etag or '"d41d8cd98f00b204e9800998ecf8427e"',
                        'x-works': 'yes',
                        'x-account-container-count': 12345}
             if not self.timestamp: