Delete objects.
- Todo containers next.
This commit is contained in:
@@ -19,7 +19,7 @@ import logging
|
|||||||
import swiftclient
|
import swiftclient
|
||||||
import eventlet
|
import eventlet
|
||||||
|
|
||||||
from swsync.objects import sync_object
|
from swsync.objects import sync_object, delete_object
|
||||||
from swsync.utils import get_config
|
from swsync.utils import get_config
|
||||||
|
|
||||||
|
|
||||||
@@ -27,7 +27,8 @@ class Containers(object):
|
|||||||
"""Containers syncornization."""
|
"""Containers syncornization."""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.max_gthreads = int(get_config("sync", "max_gthreads"))
|
self.max_gthreads = int(get_config("sync", "max_gthreads"))
|
||||||
self.objects_cls = sync_object
|
self.sync_object = sync_object
|
||||||
|
self.delete_object = delete_object
|
||||||
|
|
||||||
def sync(self, orig_storage_cnx, orig_storage_url,
|
def sync(self, orig_storage_cnx, orig_storage_url,
|
||||||
orig_token, dest_storage_cnx, dest_storage_url, dest_token,
|
orig_token, dest_storage_cnx, dest_storage_url, dest_token,
|
||||||
@@ -58,18 +59,27 @@ class Containers(object):
|
|||||||
set1 = set((x['last_modified'], x['name']) for x in orig_objects)
|
set1 = set((x['last_modified'], x['name']) for x in orig_objects)
|
||||||
set2 = set((x['last_modified'], x['name']) for x in dest_objects)
|
set2 = set((x['last_modified'], x['name']) for x in dest_objects)
|
||||||
diff = set1 - set2
|
diff = set1 - set2
|
||||||
|
delete_diff = set2 - set1
|
||||||
|
|
||||||
if not diff:
|
if not diff and not delete_diff:
|
||||||
return
|
return
|
||||||
|
|
||||||
pool = eventlet.GreenPool(size=self.max_gthreads)
|
pool = eventlet.GreenPool(size=self.max_gthreads)
|
||||||
pile = eventlet.GreenPile(pool)
|
pile = eventlet.GreenPile(pool)
|
||||||
|
|
||||||
for obj in diff:
|
for obj in diff:
|
||||||
logging.info("sending: %s ts:%s", obj[1], obj[0])
|
logging.info("sending: %s ts:%s", obj[1], obj[0])
|
||||||
pile.spawn(self.objects_cls,
|
pile.spawn(self.sync_object,
|
||||||
orig_storage_url,
|
orig_storage_url,
|
||||||
orig_token,
|
orig_token,
|
||||||
dest_storage_url,
|
dest_storage_url,
|
||||||
dest_token, container_name,
|
dest_token, container_name,
|
||||||
obj)
|
obj)
|
||||||
|
|
||||||
|
for obj in delete_diff:
|
||||||
|
logging.info("deleting: %s ts:%s", obj[1], obj[0])
|
||||||
|
pile.spawn(self.delete_object,
|
||||||
|
dest_storage_url,
|
||||||
|
dest_token, container_name,
|
||||||
|
obj)
|
||||||
pool.waitall()
|
pool.waitall()
|
||||||
|
|||||||
@@ -77,6 +77,15 @@ def get_object(storage_url, token,
|
|||||||
return (resp_headers, object_body)
|
return (resp_headers, object_body)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_object(dest_storage_url, dest_token,
|
||||||
|
container_name, object_name_etag):
|
||||||
|
|
||||||
|
delete_object = "%s/%s/%s" % (dest_storage_url,
|
||||||
|
container_name, object_name_etag[1])
|
||||||
|
delete_headers = {'x-auth-token': dest_token}
|
||||||
|
swiftclient.delete_object(delete_object, headers=delete_headers)
|
||||||
|
|
||||||
|
|
||||||
def sync_object(orig_storage_url, orig_token, dest_storage_url,
|
def sync_object(orig_storage_url, orig_token, dest_storage_url,
|
||||||
dest_token, container_name, object_name_etag):
|
dest_token, container_name, object_name_etag):
|
||||||
orig_headers, orig_body = get_object(orig_storage_url,
|
orig_headers, orig_body = get_object(orig_storage_url,
|
||||||
|
|||||||
@@ -32,10 +32,10 @@ class TestContainers(test_base.TestCase):
|
|||||||
|
|
||||||
self.tenant_name = 'foo1'
|
self.tenant_name = 'foo1'
|
||||||
self.tenant_id = TENANTS_LIST[self.tenant_name]['id']
|
self.tenant_id = TENANTS_LIST[self.tenant_name]['id']
|
||||||
self.orig_storage_url = "%s/AUTH_%s" % (STORAGE_ORIG, self.tenant_id)
|
self.orig_storage_url = '%s/AUTH_%s' % (STORAGE_ORIG, self.tenant_id)
|
||||||
self.orig_storage_cnx = (urlparse.urlparse(self.orig_storage_url),
|
self.orig_storage_cnx = (urlparse.urlparse(self.orig_storage_url),
|
||||||
None)
|
None)
|
||||||
self.dest_storage_url = "%s/AUTH_%s" % (STORAGE_DEST, self.tenant_id)
|
self.dest_storage_url = '%s/AUTH_%s' % (STORAGE_DEST, self.tenant_id)
|
||||||
self.dest_storage_cnx = (urlparse.urlparse(self.dest_storage_url),
|
self.dest_storage_cnx = (urlparse.urlparse(self.dest_storage_url),
|
||||||
None)
|
None)
|
||||||
|
|
||||||
@@ -46,7 +46,7 @@ class TestContainers(test_base.TestCase):
|
|||||||
get_cnt_called.append(args)
|
get_cnt_called.append(args)
|
||||||
|
|
||||||
def head_container(*args, **kwargs):
|
def head_container(*args, **kwargs):
|
||||||
raise swiftclient.client.ClientException("Not Here")
|
raise swiftclient.client.ClientException('Not Here')
|
||||||
|
|
||||||
def get_container(_, token, name, **kwargs):
|
def get_container(_, token, name, **kwargs):
|
||||||
for clist in CONTAINERS_LIST:
|
for clist in CONTAINERS_LIST:
|
||||||
@@ -58,18 +58,26 @@ class TestContainers(test_base.TestCase):
|
|||||||
self.stubs.Set(swiftclient, 'head_container', head_container)
|
self.stubs.Set(swiftclient, 'head_container', head_container)
|
||||||
|
|
||||||
self.container_cls.sync(
|
self.container_cls.sync(
|
||||||
self.orig_storage_cnx, self.orig_storage_url, "token",
|
self.orig_storage_cnx, self.orig_storage_url, 'token',
|
||||||
self.dest_storage_cnx, self.dest_storage_url, "token",
|
self.dest_storage_cnx, self.dest_storage_url, 'token',
|
||||||
"cont1"
|
'cont1'
|
||||||
)
|
)
|
||||||
self.assertEqual(len(get_cnt_called), 1)
|
self.assertEqual(len(get_cnt_called), 1)
|
||||||
|
|
||||||
def test_dont_sync_dest(self):
|
def test_delete_dest(self):
|
||||||
|
# probably need to change that to mox properly
|
||||||
get_cnt_called = []
|
get_cnt_called = []
|
||||||
sync_object_called = []
|
sync_object_called = []
|
||||||
|
delete_object_called = []
|
||||||
|
|
||||||
|
def delete_object(*args, **kwargs):
|
||||||
|
delete_object_called.append((args, kwargs))
|
||||||
|
self.stubs.Set(swsync.objects.swiftclient,
|
||||||
|
'delete_object', delete_object)
|
||||||
|
|
||||||
def head_container(*args, **kwargs):
|
def head_container(*args, **kwargs):
|
||||||
pass
|
return True
|
||||||
|
self.stubs.Set(swiftclient, 'head_container', head_container)
|
||||||
|
|
||||||
def get_container(*args, **kwargs):
|
def get_container(*args, **kwargs):
|
||||||
# MASTER
|
# MASTER
|
||||||
@@ -89,20 +97,21 @@ class TestContainers(test_base.TestCase):
|
|||||||
def sync_object(*args, **kwargs):
|
def sync_object(*args, **kwargs):
|
||||||
sync_object_called.append(args)
|
sync_object_called.append(args)
|
||||||
|
|
||||||
self.stubs.Set(swiftclient, 'head_container', head_container)
|
|
||||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||||
self.container_cls.objects_cls = sync_object
|
|
||||||
|
self.container_cls.sync_object = sync_object
|
||||||
|
|
||||||
self.container_cls.sync(
|
self.container_cls.sync(
|
||||||
self.orig_storage_cnx,
|
self.orig_storage_cnx,
|
||||||
self.orig_storage_url,
|
self.orig_storage_url,
|
||||||
"token",
|
'token',
|
||||||
self.dest_storage_cnx,
|
self.dest_storage_cnx,
|
||||||
self.dest_storage_url,
|
self.dest_storage_url,
|
||||||
"token",
|
'token',
|
||||||
"cont1")
|
'cont1')
|
||||||
|
|
||||||
self.assertEqual(len(sync_object_called), 0)
|
self.assertEqual(len(sync_object_called), 0)
|
||||||
|
self.assertEqual(len(delete_object_called), 1)
|
||||||
|
|
||||||
def test_sync(self):
|
def test_sync(self):
|
||||||
get_cnt_called = []
|
get_cnt_called = []
|
||||||
@@ -130,15 +139,15 @@ class TestContainers(test_base.TestCase):
|
|||||||
|
|
||||||
self.stubs.Set(swiftclient, 'head_container', head_container)
|
self.stubs.Set(swiftclient, 'head_container', head_container)
|
||||||
self.stubs.Set(swiftclient, 'get_container', get_container)
|
self.stubs.Set(swiftclient, 'get_container', get_container)
|
||||||
self.container_cls.objects_cls = sync_object
|
self.container_cls.sync_object = sync_object
|
||||||
|
|
||||||
self.container_cls.sync(
|
self.container_cls.sync(
|
||||||
self.orig_storage_cnx,
|
self.orig_storage_cnx,
|
||||||
self.orig_storage_url,
|
self.orig_storage_url,
|
||||||
"token",
|
'token',
|
||||||
self.dest_storage_cnx,
|
self.dest_storage_cnx,
|
||||||
self.dest_storage_url,
|
self.dest_storage_url,
|
||||||
"token",
|
'token',
|
||||||
"cont1")
|
'cont1')
|
||||||
|
|
||||||
self.assertEqual(sync_object_called[0][-1][1], 'NEWOBJ')
|
self.assertEqual(sync_object_called[0][-1][1], 'NEWOBJ')
|
||||||
|
|||||||
Reference in New Issue
Block a user