Merge "Add Functional Tests for PUT Object Copy"

This commit is contained in:
Jenkins
2015-03-17 04:23:47 +00:00
committed by Gerrit Code Review

View File

@@ -19,6 +19,7 @@ from swift3.test.functional.s3_test_client import Connection
from swift3.test.functional.utils import get_error_code,\
assert_common_response_headers
from swift3.test.functional import Swift3FunctionalTestCase
from swift3.etree import fromstring
class TestSwift3Object(Swift3FunctionalTestCase):
@@ -40,6 +41,21 @@ class TestSwift3Object(Swift3FunctionalTestCase):
self.assertTrue(headers['etag'] is not None)
self.assertEquals(headers['content-length'], '0')
# PUT Object Copy
self.conn.make_request('PUT', 'dst_bucket')
headers = {'x-amz-copy-source': '/%s/%s' % (self.bucket, obj)}
status, headers, body = \
self.conn.make_request('PUT', 'dst_bucket', 'dst_obj',
headers=headers)
self.assertEquals(status, 200)
assert_common_response_headers(self, headers)
self.assertEquals(headers['content-length'], str(len(body)))
elem = fromstring(body, 'CopyObjectResult')
self.assertTrue(elem.find('LastModified').text is not None)
self.assertTrue(elem.find('ETag').text is not None)
# GET Object
status, headers, body = \
self.conn.make_request('GET', self.bucket, obj)
@@ -79,6 +95,38 @@ class TestSwift3Object(Swift3FunctionalTestCase):
self.conn.make_request('PUT', 'bucket2', 'object')
self.assertEquals(get_error_code(body), 'NoSuchBucket')
def test_put_object_copy_error(self):
obj = 'object'
self.conn.make_request('PUT', self.bucket, obj)
dst_bucket = 'dst_bucket'
self.conn.make_request('PUT', dst_bucket)
dst_obj = 'dst_object'
headers = {'x-amz-copy-source': '/%s/%s' % (self.bucket, obj)}
auth_error_conn = Connection(aws_secret_key='invalid')
status, headers, body = \
auth_error_conn.make_request('PUT', dst_bucket, dst_obj, headers)
self.assertEquals(get_error_code(body), 'SignatureDoesNotMatch')
# /src/nothing -> /dst/dst
headers = {'X-Amz-Copy-Source': '/%s/%s' % (self.bucket, 'nothing')}
status, headers, body = \
self.conn.make_request('PUT', dst_bucket, dst_obj, headers)
self.assertEquals(get_error_code(body), 'NoSuchKey')
# /nothing/src -> /dst/dst
headers = {'X-Amz-Copy-Source': '/%s/%s' % ('nothing', obj)}
status, headers, body = \
self.conn.make_request('PUT', dst_bucket, dst_obj, headers)
# TODO: source bucket is not check.
# self.assertEquals(get_error_code(body), 'NoSuchBucket')
# /src/src -> /nothing/dst
headers = {'X-Amz-Copy-Source': '/%s/%s' % (self.bucket, obj)}
status, headers, body = \
self.conn.make_request('PUT', 'nothing', dst_obj, headers)
self.assertEquals(get_error_code(body), 'NoSuchBucket')
def test_get_object_error(self):
obj = 'object'
self.conn.make_request('PUT', self.bucket, obj)