From a623aa5be57b103b41bc20afb954a6a5f0effc78 Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 17 Jan 2011 14:36:28 -0800 Subject: [PATCH 1/2] client.py: Reset of streams during upload retries --- swift/common/client.py | 53 ++++++++++++------ test/unit/common/test_client.py | 95 +++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 16 deletions(-) diff --git a/swift/common/client.py b/swift/common/client.py index e3536e894f..b12742e4ae 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -688,7 +688,7 @@ class Connection(object): """Convenience class to make requests that will also retry the request""" def __init__(self, authurl, user, key, retries=5, preauthurl=None, - preauthtoken=None, snet=False): + preauthtoken=None, snet=False, starting_backoff=1): """ :param authurl: authenitcation URL :param user: user name to authenticate as @@ -708,6 +708,7 @@ class Connection(object): self.token = preauthtoken self.attempts = 0 self.snet = snet + self.starting_backoff = starting_backoff def get_auth(self): return get_auth(self.authurl, self.user, self.key, snet=self.snet) @@ -715,9 +716,9 @@ class Connection(object): def http_connection(self): return http_connection(self.url) - def _retry(self, func, *args, **kwargs): + def _retry(self, reset_func, func, *args, **kwargs): self.attempts = 0 - backoff = 1 + backoff = self.starting_backoff while self.attempts <= self.retries: self.attempts += 1 try: @@ -746,10 +747,12 @@ class Connection(object): raise sleep(backoff) backoff *= 2 + if reset_func: + reset_func(func, *args, **kwargs) def head_account(self): """Wrapper for :func:`head_account`""" - return self._retry(head_account) + return self._retry(None, head_account) def get_account(self, marker=None, limit=None, prefix=None, full_listing=False): @@ -757,16 +760,16 @@ class Connection(object): # TODO(unknown): With full_listing=True this will restart the entire # listing with each retry. Need to make a better version that just # retries where it left off. - return self._retry(get_account, marker=marker, limit=limit, + return self._retry(None, get_account, marker=marker, limit=limit, prefix=prefix, full_listing=full_listing) def post_account(self, headers): """Wrapper for :func:`post_account`""" - return self._retry(post_account, headers) + return self._retry(None, post_account, headers) def head_container(self, container): """Wrapper for :func:`head_container`""" - return self._retry(head_container, container) + return self._retry(None, head_container, container) def get_container(self, container, marker=None, limit=None, prefix=None, delimiter=None, full_listing=False): @@ -774,43 +777,61 @@ class Connection(object): # TODO(unknown): With full_listing=True this will restart the entire # listing with each retry. Need to make a better version that just # retries where it left off. - return self._retry(get_container, container, marker=marker, + return self._retry(None, get_container, container, marker=marker, limit=limit, prefix=prefix, delimiter=delimiter, full_listing=full_listing) def put_container(self, container, headers=None): """Wrapper for :func:`put_container`""" - return self._retry(put_container, container, headers=headers) + return self._retry(None, put_container, container, headers=headers) def post_container(self, container, headers): """Wrapper for :func:`post_container`""" - return self._retry(post_container, container, headers) + return self._retry(None, post_container, container, headers) def delete_container(self, container): """Wrapper for :func:`delete_container`""" - return self._retry(delete_container, container) + return self._retry(None, delete_container, container) def head_object(self, container, obj): """Wrapper for :func:`head_object`""" - return self._retry(head_object, container, obj) + return self._retry(None, head_object, container, obj) def get_object(self, container, obj, resp_chunk_size=None): """Wrapper for :func:`get_object`""" - return self._retry(get_object, container, obj, + return self._retry(None, get_object, container, obj, resp_chunk_size=resp_chunk_size) + def _put_object_reset(self, func, container, obj, contents, *args, + **kwargs): + seek = getattr(contents, 'seek', None) + if seek: + seek(0) + def put_object(self, container, obj, contents, content_length=None, etag=None, chunk_size=65536, content_type=None, headers=None): """Wrapper for :func:`put_object`""" - return self._retry(put_object, container, obj, contents, + + def _default_reset(*args, **kwargs): + raise ClientException('put_object(%r, %r, ...) failure and no ' + 'ability to reset contents for reupload.' % (container, obj)) + + reset_func = _default_reset + tell = getattr(contents, 'tell', None) + seek = getattr(contents, 'seek', None) + if tell and seek: + orig_pos = tell() + reset_func = lambda *a, **k: seek(orig_pos) + + return self._retry(reset_func, put_object, container, obj, contents, content_length=content_length, etag=etag, chunk_size=chunk_size, content_type=content_type, headers=headers) def post_object(self, container, obj, headers): """Wrapper for :func:`post_object`""" - return self._retry(post_object, container, obj, headers) + return self._retry(None, post_object, container, obj, headers) def delete_object(self, container, obj): """Wrapper for :func:`delete_object`""" - return self._retry(delete_object, container, obj) + return self._retry(None, delete_object, container, obj) diff --git a/test/unit/common/test_client.py b/test/unit/common/test_client.py index 739cba75e3..233ec429f7 100644 --- a/test/unit/common/test_client.py +++ b/test/unit/common/test_client.py @@ -14,7 +14,10 @@ # limitations under the License. # TODO: More tests +import socket import unittest +from StringIO import StringIO +from urlparse import urlparse # TODO: mock http connection class with more control over headers from test.unit.proxy.test_server import fake_http_connect @@ -377,5 +380,97 @@ class TestConnection(MockHttpTest): self.assertEquals(conn.url, 'http://www.new.com') self.assertEquals(conn.token, 'new') + def test_reset_stream(self): + + class LocalContents(object): + + def __init__(self, tell_value=0): + self.already_read = False + self.seeks = [] + self.tell_value = tell_value + + def tell(self): + return self.tell_value + + def seek(self, position): + self.seeks.append(position) + self.already_read = False + + def read(self, size=-1): + if self.already_read: + return '' + else: + self.already_read = True + return 'abcdef' + + class LocalConnection(object): + + def putrequest(self, *args, **kwargs): + return + + def putheader(self, *args, **kwargs): + return + + def endheaders(self, *args, **kwargs): + return + + def send(self, *args, **kwargs): + raise socket.error('oops') + + def request(self, *args, **kwargs): + return + + def getresponse(self, *args, **kwargs): + self.status = 200 + return self + + def getheader(self, *args, **kwargs): + return '' + + def read(self, *args, **kwargs): + return '' + + def local_http_connection(url): + parsed = urlparse(url) + return parsed, LocalConnection() + + orig_conn = c.http_connection + try: + c.http_connection = local_http_connection + conn = c.Connection('http://www.example.com', 'asdf', 'asdf', + retries=1, starting_backoff=.0001) + + contents = LocalContents() + exc = None + try: + conn.put_object('c', 'o', contents) + except socket.error, err: + exc = err + self.assertEquals(contents.seeks, [0]) + self.assertEquals(str(exc), 'oops') + + contents = LocalContents(tell_value=123) + exc = None + try: + conn.put_object('c', 'o', contents) + except socket.error, err: + exc = err + self.assertEquals(contents.seeks, [123]) + self.assertEquals(str(exc), 'oops') + + contents = LocalContents() + contents.tell = None + exc = None + try: + conn.put_object('c', 'o', contents) + except c.ClientException, err: + exc = err + self.assertEquals(contents.seeks, []) + self.assertEquals(str(exc), "put_object('c', 'o', ...) failure " + "and no ability to reset contents for reupload.") + finally: + c.http_connection = orig_conn + + if __name__ == '__main__': unittest.main() From 56738b161fe024c8f5725cb382dd61a29d77fa64 Mon Sep 17 00:00:00 2001 From: gholt Date: Mon, 17 Jan 2011 14:44:55 -0800 Subject: [PATCH 2/2] client.py: Dead code removal --- swift/common/client.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/swift/common/client.py b/swift/common/client.py index bc9dfa1ed5..8d144735ee 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -810,12 +810,6 @@ class Connection(object): return self._retry(None, get_object, container, obj, resp_chunk_size=resp_chunk_size) - def _put_object_reset(self, func, container, obj, contents, *args, - **kwargs): - seek = getattr(contents, 'seek', None) - if seek: - seek(0) - def put_object(self, container, obj, contents, content_length=None, etag=None, chunk_size=65536, content_type=None, headers=None):