karbor/karbor/tests/fullstack/test_scheduled_operations.py

173 lines
6.5 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import eventlet
from datetime import datetime
from functools import partial
from karbor.common import constants
from karbor.services.operationengine.engine.triggers.timetrigger \
.timeformats import calendar_time
from karbor.tests.fullstack import karbor_base
from karbor.tests.fullstack import karbor_objects as objects
from karbor.tests.fullstack import utils
pattern = "BEGIN:VEVENT\nRRULE:FREQ=WEEKLY;INTERVAL=1;\nEND:VEVENT"
DEFAULT_PROPERTY = {
'pattern': pattern,
'format': 'calendar'
}
class ScheduledOperationsTest(karbor_base.KarborBaseTest):
"""Test Scheduled Operations operation
"""
def setUp(self):
super(ScheduledOperationsTest, self).setUp()
providers = self.provider_list()
self.assertTrue(len(providers))
self.provider_id = self.provider_id_noop
def _create_scheduled_operation(
self,
resources,
trigger_properties=DEFAULT_PROPERTY,
operation_name=None):
plan = self.store(objects.Plan())
plan.create(self.provider_id, resources)
operation_definition = {'plan_id': plan.id,
'provider_id': self.provider_id}
trigger = self.store(objects.Trigger())
trigger.create('time', trigger_properties)
operation = objects.ScheduledOperation()
operation.create('protect', trigger.id,
operation_definition, operation_name)
return operation
def _create_for_volume(self,
trigger_properties=DEFAULT_PROPERTY,
operation_name=None):
volume = self.store(objects.Volume())
volume.create(1)
return self._create_scheduled_operation([volume, ],
trigger_properties,
operation_name)
def _create_for_server(self,
trigger_properties=DEFAULT_PROPERTY,
operation_name=None):
server = self.store(objects.Server())
server.create()
return self._create_scheduled_operation([server, ],
trigger_properties,
operation_name)
def test_scheduled_operations_create_no_scheduled(self):
name = "KarborFullstack-Scheduled-Operation-no-scheduled"
operation = self.store(self._create_for_volume(operation_name=name))
item = self.karbor_client.scheduled_operations.get(operation.id)
self.assertEqual(name, item.name)
items = self.karbor_client.scheduled_operations.list()
ids = [item_.id for item_ in items]
self.assertTrue(operation.id in ids)
@staticmethod
def _wait_timestamp(pattern, start_time, freq):
if not isinstance(freq, int) or freq <= 0:
return 0
cur_time = copy.deepcopy(start_time)
cal_obj = calendar_time.ICal(start_time, pattern)
for i in range(freq):
next_time = cal_obj.compute_next_time(cur_time)
cur_time = next_time
return (next_time - start_time).seconds
def _checkpoint_status(self, checkpoint_id, status):
try:
cp = self.karbor_client.checkpoints.get(self.provider_id,
checkpoint_id)
except Exception:
return False
if status is None or cp.status == status:
return True
else:
return False
def test_scheduled_operations_create_and_scheduled(self):
freq = 2
eventlet_grace = 20
pattern = "BEGIN:VEVENT\nRRULE:FREQ=MINUTELY;INTERVAL=2;\nEND:VEVENT"
cur_property = {'pattern': pattern, 'format': 'calendar'}
operation = self.store(self._create_for_volume(cur_property))
start_time = datetime.now().replace(microsecond=0)
sleep_time = self._wait_timestamp(pattern, start_time, freq)
sleep_time += eventlet_grace
self.assertNotEqual(0, sleep_time)
eventlet.sleep(sleep_time)
items = self.karbor_client.checkpoints.list(self.provider_id)
operation_item = self.karbor_client.scheduled_operations.get(
operation.id)
plan_id = operation_item.operation_definition["plan_id"]
cps = list(filter(lambda x: x.protection_plan["id"] == plan_id, items))
self.assertEqual(freq, len(cps))
for cp in cps:
utils.wait_until_true(
partial(self._checkpoint_status,
cp.id,
constants.CHECKPOINT_STATUS_AVAILABLE),
timeout=objects.LONG_TIMEOUT, sleep=objects.LONG_SLEEP
)
checkpoint = self.store(objects.Checkpoint())
checkpoint._provider_id = self.provider_id
checkpoint.id = cp.id
def test_scheduled_operations_list(self):
operation1 = self.store(self._create_for_volume())
operation2 = self.store(self._create_for_server())
items = self.karbor_client.scheduled_operations.list()
ids = [item.id for item in items]
self.assertTrue(operation1.id in ids)
self.assertTrue(operation2.id in ids)
def test_scheduled_operations_get(self):
name = "KarborFullstack-Scheduled-Operation-Test-Get"
operation = self._create_for_volume(operation_name=name)
self.store(operation)
item = self.karbor_client.scheduled_operations.get(operation.id)
self.assertEqual(item.name, name)
self.assertEqual(item.id, operation.id)
def test_scheduled_operations_delete(self):
name = "KarborFullstack-Scheduled-Operation-Test-Delete"
operation = self._create_for_volume(operation_name=name)
item = self.karbor_client.scheduled_operations.get(operation.id)
self.assertEqual(name, item.name)
operation.close()
items = self.karbor_client.scheduled_operations.list()
ids = [item_.id for item_ in items]
self.assertTrue(operation.id not in ids)