Introduce reprocessing API

Depends-on: https://review.opendev.org/c/openstack/cloudkitty/+/777442
Depends-on: https://review.opendev.org/c/openstack/requirements/+/799315

Implements: https://review.opendev.org/c/openstack/cloudkitty-specs/+/791245

Change-Id: Idb0032eba17d83409344ab58153097ac70814e86
Rafael Weingärtner 2021-07-02 10:26:11 -03:00 committed by Pierre Riteau
parent 249ea00192
commit 65af204a1c
25 changed files with 2353 additions and 113 deletions
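
For context, a minimal usage sketch of the API added by this change. The endpoint paths, request fields, and response shapes are taken from the diff below; the service URL and token are hypothetical placeholders, and an admin token is assumed since the new policies default to the admin role.

import requests

CLOUDKITTY_URL = "http://cloudkitty.example.com:8889"  # hypothetical endpoint
TOKEN = "gAAAA..."                                     # hypothetical admin token
HEADERS = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}

# Schedule a reprocessing for one scope (fields from the POST input schema).
response = requests.post(
    CLOUDKITTY_URL + "/v2/task/reprocesses",
    headers=HEADERS,
    json={
        "scope_ids": "5e56cb64-4980-4466-9fce-d0133c0c221e",
        "start_reprocess_time": "2021-06-01T00:00:00+00:00",
        "end_reprocess_time": "2021-06-02T00:00:00+00:00",
        "reason": "Rating rules were wrong for this period.",
    },
)
print(response.status_code)  # 202 when the schedules are persisted

# List reprocessing schedules; filtering by a single scope is also possible
# via /v2/task/reprocesses/<scope_id>.
response = requests.get(CLOUDKITTY_URL + "/v2/task/reprocesses",
                        headers=HEADERS)
print(response.json())  # {"results": [{"scope_id": ..., "reason": ..., ...}]}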

.gitignore
View File

@ -22,6 +22,7 @@ var/
*.egg
cloudkitty.egg-info
.idea/
.python-version
# Configuration file
etc/cloudkitty/cloudkitty.conf.sample

View File

@ -34,6 +34,7 @@ API_MODULES = [
'cloudkitty.api.v2.scope',
'cloudkitty.api.v2.dataframes',
'cloudkitty.api.v2.summary',
'cloudkitty.api.v2.task'
]

View File

@ -196,7 +196,7 @@ class ScopeState(base.BaseResource):
'scope:patch_state',
{'tenant_id': scope_id or flask.request.context.project_id}
)
results = self._storage_state.get_all(identifier=scope_id)
results = self._storage_state.get_all(identifier=scope_id, active=None)
if len(results) < 1:
raise http_exceptions.NotFound(
@ -217,7 +217,8 @@ class ScopeState(base.BaseResource):
collector=collector,
active=active)
storage_scopes = self._storage_state.get_all(identifier=scope_id)
storage_scopes = self._storage_state.get_all(
identifier=scope_id, active=active)
update_storage_scope = storage_scopes[0]
return {
'scope_id': update_storage_scope.identifier,

View File

@ -0,0 +1,35 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from cloudkitty.api.v2 import utils as api_utils
def init(app):
api_utils.do_init(app, 'task', [
{
'module': __name__ + '.' + 'reprocess',
'resource_class': 'ReprocessSchedulerPostApi',
'url': '/reprocesses',
},
{
'module': __name__ + '.' + 'reprocess',
'resource_class': 'ReprocessSchedulerGetApi',
'url': '/reprocesses/<path_scope_id>',
},
{
'module': __name__ + '.' + 'reprocess',
'resource_class': 'ReprocessesSchedulerGetApi',
'url': '/reprocesses',
},
])
return app

View File

@ -0,0 +1,280 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from datetimerange import DateTimeRange
import flask
from oslo_log import log
import voluptuous
from werkzeug import exceptions as http_exceptions
from cloudkitty.api.v2 import base
from cloudkitty.api.v2 import utils as api_utils
from cloudkitty.common import policy
from cloudkitty import storage_state
from cloudkitty.storage_state.models import ReprocessingScheduler
from cloudkitty.utils import tz as tzutils
from cloudkitty.utils import validation as validation_utils
LOG = log.getLogger(__name__)
ALL_SCOPES_OPTION = 'ALL'
def dt_from_iso_as_utc(date_string):
return tzutils.dt_from_iso(date_string, as_utc=True)
class ReprocessSchedulerPostApi(base.BaseResource):
def __init__(self, *args, **kwargs):
super(ReprocessSchedulerPostApi, self).__init__(*args, **kwargs)
self.storage_state_manager = storage_state.StateManager()
self.schedule_reprocessing_db = storage_state.ReprocessingSchedulerDb()
@api_utils.add_input_schema('body', {
voluptuous.Required('scope_ids'): api_utils.MultiQueryParam(str),
voluptuous.Required('start_reprocess_time'):
voluptuous.Coerce(dt_from_iso_as_utc),
voluptuous.Required('end_reprocess_time'):
voluptuous.Coerce(dt_from_iso_as_utc),
voluptuous.Required('reason'): api_utils.SingleQueryParam(str),
})
def post(self, scope_ids=[], start_reprocess_time=None,
end_reprocess_time=None, reason=None):
policy.authorize(
flask.request.context,
'schedule:task_reprocesses',
{'tenant_id': flask.request.context.project_id or scope_ids}
)
ReprocessSchedulerPostApi.validate_inputs(
end_reprocess_time, reason, scope_ids, start_reprocess_time)
if ALL_SCOPES_OPTION in scope_ids:
scope_ids = []
if not isinstance(scope_ids, list):
scope_ids = [scope_ids]
all_scopes_to_reprocess = self.storage_state_manager.get_all(
identifier=scope_ids, offset=None, limit=None)
ReprocessSchedulerPostApi.check_if_there_are_invalid_scopes(
all_scopes_to_reprocess, end_reprocess_time, scope_ids,
start_reprocess_time)
ReprocessSchedulerPostApi.validate_start_end_for_reprocessing(
all_scopes_to_reprocess, end_reprocess_time, start_reprocess_time)
self.validate_reprocessing_schedules_overlaps(
all_scopes_to_reprocess, end_reprocess_time, start_reprocess_time)
for scope in all_scopes_to_reprocess:
schedule = ReprocessingScheduler(
identifier=scope.identifier, reason=reason,
start_reprocess_time=start_reprocess_time,
end_reprocess_time=end_reprocess_time)
LOG.debug("Persisting scope reprocessing schedule [%s].", schedule)
self.schedule_reprocessing_db.persist(schedule)
return {}, 202
@staticmethod
def validate_inputs(
end_reprocess_time, reason, scope_ids, start_reprocess_time):
ReprocessSchedulerPostApi.validate_scope_ids(scope_ids)
if not reason.strip():
raise http_exceptions.BadRequest(
"Empty or blank reason text is not allowed. Please, do "
"inform/register the reason for the reprocessing of a "
"previously processed timestamp.")
if end_reprocess_time < start_reprocess_time:
raise http_exceptions.BadRequest(
"End reprocessing timestamp [%s] cannot be less than "
"start reprocessing timestamp [%s]."
% (start_reprocess_time, end_reprocess_time))
@staticmethod
def validate_scope_ids(scope_ids):
option_all_selected = False
for s in scope_ids:
if s == ALL_SCOPES_OPTION:
option_all_selected = True
continue
if option_all_selected and len(scope_ids) != 1:
raise http_exceptions.BadRequest(
"Cannot use 'ALL' with scope ID [%s]. Either schedule a "
"reprocessing for all active scopes using 'ALL' option, "
"or inform only the scopes you desire to schedule a "
"reprocessing." % scope_ids)
@staticmethod
def check_if_there_are_invalid_scopes(
all_scopes_to_reprocess, end_reprocess_time, scope_ids,
start_reprocess_time):
invalid_scopes = []
for s in scope_ids:
scope_exist_in_db = False
for scope_to_reprocess in all_scopes_to_reprocess:
if s == scope_to_reprocess.identifier:
scope_exist_in_db = True
break
if not scope_exist_in_db:
invalid_scopes.append(s)
if invalid_scopes:
raise http_exceptions.BadRequest(
"Scopes %s scheduled to reprocess [start=%s, end=%s] "
"do not exist."
% (invalid_scopes, start_reprocess_time, end_reprocess_time))
@staticmethod
def validate_start_end_for_reprocessing(all_scopes_to_reprocess,
end_reprocess_time,
start_reprocess_time):
for scope in all_scopes_to_reprocess:
last_processed_timestamp = scope.last_processed_timestamp
if start_reprocess_time > last_processed_timestamp:
raise http_exceptions.BadRequest(
"Cannot execute a reprocessing [start=%s] for scope [%s] "
"starting after the last possible timestamp [%s]."
% (start_reprocess_time, scope, last_processed_timestamp))
if end_reprocess_time > scope.last_processed_timestamp:
raise http_exceptions.BadRequest(
"Cannot execute a reprocessing [end=%s] for scope [%s] "
"ending after the last possible timestamp [%s]."
% (end_reprocess_time, scope, last_processed_timestamp))
def validate_reprocessing_schedules_overlaps(
self, all_scopes_to_reprocess, end_reprocess_time,
start_reprocess_time):
scheduling_range = DateTimeRange(
start_reprocess_time, end_reprocess_time)
for scope_to_reprocess in all_scopes_to_reprocess:
all_reprocessing_schedules = self.schedule_reprocessing_db.get_all(
identifier=[scope_to_reprocess.identifier])
LOG.debug("All schedules [%s] for reprocessing found for scope "
"[%s]", all_reprocessing_schedules, scope_to_reprocess)
if not all_reprocessing_schedules:
LOG.debug(
"No need to validate possible collision of reprocessing "
"for scope [%s] because it does not have active "
"reprocessing schedules." % scope_to_reprocess)
continue
for schedule in all_reprocessing_schedules:
scheduled_range = DateTimeRange(
tzutils.local_to_utc(schedule.start_reprocess_time),
tzutils.local_to_utc(schedule.end_reprocess_time))
try:
if scheduling_range.is_intersection(scheduled_range):
raise http_exceptions.BadRequest(
self.generate_overlap_error_message(
scheduled_range, scheduling_range,
scope_to_reprocess))
except ValueError as e:
raise http_exceptions.BadRequest(
self.generate_overlap_error_message(
scheduled_range, scheduling_range,
scope_to_reprocess) + "Error: [%s]." % e)
@staticmethod
def generate_overlap_error_message(scheduled_range, scheduling_range,
scope_to_reprocess):
return "Cannot schedule a reprocessing for scope [%s] for " \
"reprocessing time [%s], because it already has a schedule " \
"for a similar time range [%s]." % (scope_to_reprocess,
scheduling_range,
scheduled_range)
ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS = ['asc', 'desc']
class ReprocessSchedulerGetApi(base.BaseResource):
def __init__(self, *args, **kwargs):
super(ReprocessSchedulerGetApi, self).__init__(*args, **kwargs)
self.schedule_reprocessing_db = storage_state.ReprocessingSchedulerDb()
@api_utils.paginated
@api_utils.add_input_schema('query', {
voluptuous.Optional('scope_ids'): api_utils.MultiQueryParam(str),
voluptuous.Optional('order', default="desc"):
api_utils.SingleQueryParam(str)
})
@api_utils.add_output_schema({'results': [{
voluptuous.Required('reason'): validation_utils.get_string_type(),
voluptuous.Required('scope_id'): validation_utils.get_string_type(),
voluptuous.Required('start_reprocess_time'):
validation_utils.get_string_type(),
voluptuous.Required('end_reprocess_time'):
validation_utils.get_string_type(),
voluptuous.Required('current_reprocess_time'):
validation_utils.get_string_type(),
}]})
def get(self, scope_ids=[], path_scope_id=None, offset=0, limit=100,
order="desc"):
if path_scope_id and scope_ids:
LOG.warning("Filtering by scope IDs [%s] and path scope ID [%s] "
"does not make sense. You should use only one of "
"them. We will use only the path scope ID for this "
"request.", scope_ids, path_scope_id)
if path_scope_id:
scope_ids = [path_scope_id]
policy.authorize(
flask.request.context,
'schedule:get_task_reprocesses',
{'tenant_id': flask.request.context.project_id or scope_ids}
)
if not isinstance(scope_ids, list):
scope_ids = [scope_ids]
if order not in ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS:
raise http_exceptions.BadRequest(
"The order [%s] is not valid. Accepted values are %s.",
order, ACCEPTED_GET_REPROCESSING_REQUEST_ORDERS)
schedules = self.schedule_reprocessing_db.get_all(
identifier=scope_ids, remove_finished=False,
offset=offset, limit=limit, order=order)
return {
'results': [{
'scope_id': s.identifier,
'reason': s.reason,
'start_reprocess_time': s.start_reprocess_time.isoformat(),
'end_reprocess_time': s.end_reprocess_time.isoformat(),
'current_reprocess_time':
s.current_reprocess_time.isoformat() if
s.current_reprocess_time else "",
} for s in schedules]}
class ReprocessesSchedulerGetApi(ReprocessSchedulerGetApi):
def __init__(self, *args, **kwargs):
super(ReprocessesSchedulerGetApi, self).__init__(*args, **kwargs)
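
For reference, the overlap validation above relies on DateTimeRange.is_intersection from the datetimerange library; a small self-contained sketch of that check follows (the timestamps are arbitrary examples, not values used by CloudKitty).

import datetime
from datetimerange import DateTimeRange

existing = DateTimeRange(datetime.datetime(2021, 6, 1, 0, 0),
                         datetime.datetime(2021, 6, 2, 0, 0))

# A request overlapping the existing schedule is rejected with BadRequest.
requested = DateTimeRange(datetime.datetime(2021, 6, 1, 12, 0),
                          datetime.datetime(2021, 6, 3, 0, 0))
print(existing.is_intersection(requested))  # True -> rejected

# A disjoint request is accepted.
disjoint = DateTimeRange(datetime.datetime(2021, 6, 3, 0, 0),
                         datetime.datetime(2021, 6, 4, 0, 0))
print(existing.is_intersection(disjoint))   # False -> accepted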

View File

@ -25,7 +25,7 @@ def main():
# before the prepare_service(), making cfg.CONF returning default values
# systematically.
from cloudkitty import orchestrator
orchestrator.OrchestratorServiceManager().run()
orchestrator.CloudKittyServiceManager().run()
if __name__ == '__main__':

View File

@ -24,6 +24,7 @@ from cloudkitty.common.policies.v1 import storage as v1_storage
from cloudkitty.common.policies.v2 import dataframes as v2_dataframes
from cloudkitty.common.policies.v2 import scope as v2_scope
from cloudkitty.common.policies.v2 import summary as v2_summary
from cloudkitty.common.policies.v2 import tasks as v2_tasks
def list_rules():
@ -37,4 +38,5 @@ def list_rules():
v2_dataframes.list_rules(),
v2_scope.list_rules(),
v2_summary.list_rules(),
v2_tasks.list_rules()
)

View File

@ -0,0 +1,36 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_policy import policy
from cloudkitty.common.policies import base
schedule_policies = [
policy.DocumentedRuleDefault(
name='schedule:task_reprocesses',
check_str=base.ROLE_ADMIN,
description='Schedule a scope for reprocessing',
operations=[{'path': '/v2/task/reprocesses',
'method': 'POST'}]),
policy.DocumentedRuleDefault(
name='schedule:get_task_reprocesses',
check_str=base.ROLE_ADMIN,
description='Get reprocessing schedule tasks for scopes.',
operations=[{'path': '/v2/task/reprocesses',
'method': 'GET'}]),
]
def list_rules():
return schedule_policies

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from datetime import timedelta
import decimal
import functools
@ -57,9 +59,15 @@ orchestrator_opts = [
'max_workers',
default=multiprocessing.cpu_count(),
sample_default=4,
min=1,
help='Maximal number of workers to run. Defaults to the number of '
'available CPUs'),
min=0,
help='Max number of workers to execute the rating process. Defaults '
'to the number of available CPU cores.'),
cfg.IntOpt(
'max_workers_reprocessing',
default=multiprocessing.cpu_count(),
min=0,
help='Max number of workers to execute the reprocessing. Defaults to '
'the number of available CPU cores.'),
cfg.IntOpt('max_threads',
# NOTE(peschk_l): This is the futurist default
default=multiprocessing.cpu_count() * 5,
@ -272,17 +280,18 @@ def _check_state(obj, period, tenant_id):
class Worker(BaseWorker):
def __init__(self, collector, storage, tenant_id, worker_id):
super(Worker, self).__init__(tenant_id)
self._collector = collector
self._storage = storage
self._period = CONF.collect.period
self._wait_time = CONF.collect.wait_periods * self._period
self._tenant_id = tenant_id
self._worker_id = worker_id
self._log_prefix = '[scope: {scope}, worker: {worker}] '.format(
scope=self._tenant_id, worker=self._worker_id)
self._conf = ck_utils.load_conf(CONF.collect.metrics_conf)
self._state = state.StateManager()
self._check_state = functools.partial(
self.next_timestamp_to_process = functools.partial(
_check_state, self, self._period, self._tenant_id)
super(Worker, self).__init__(self._tenant_id)
@ -295,10 +304,10 @@ class Worker(BaseWorker):
metric,
start_timestamp,
next_timestamp,
self._tenant_id,
self._tenant_id
)
if not data:
raise collector.NoDataCollected
raise collector.NoDataCollected(self._collector, metric)
return name, data
@ -366,63 +375,179 @@ class Worker(BaseWorker):
return dict(filter(lambda x: x[1] is not None, results))
def run(self):
while True:
timestamp = self._check_state()
LOG.debug("Processing timestamp [%s] for storage scope [%s].",
timestamp, self._tenant_id)
if not timestamp:
break
if self._state.get_state(self._tenant_id):
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] "
"because it is marked as inactive.",
self._tenant_id)
break
else:
LOG.debug("No need to check if [%s] is de-activated. "
"We have never processed it before.")
should_continue_processing = self.execute_worker_processing()
while should_continue_processing:
should_continue_processing = self.execute_worker_processing()
def execute_worker_processing(self):
timestamp = self.next_timestamp_to_process()
LOG.debug("Processing timestamp [%s] for storage scope [%s].",
timestamp, self._tenant_id)
if not timestamp:
LOG.debug("Worker [%s] finished processing storage scope [%s].",
self._worker_id, self._tenant_id)
return False
if self._state.get_state(self._tenant_id):
if not self._state.is_storage_scope_active(self._tenant_id):
LOG.debug("Skipping processing for storage scope [%s] because "
"it is marked as inactive.", self._tenant_id)
break
LOG.debug("Skipping processing for storage scope [%s] "
"because it is marked as inactive.",
self._tenant_id)
return False
else:
LOG.debug("No need to check if [%s] is de-activated. "
"We have never processed it before.")
self.do_execute_scope_processing(timestamp)
return True
metrics = list(self._conf['metrics'].keys())
def do_execute_scope_processing(self, timestamp):
metrics = list(self._conf['metrics'].keys())
# Collection
metrics = sorted(metrics)
usage_data = self._do_collection(metrics, timestamp)
# Collection
usage_data = self._do_collection(metrics, timestamp)
LOG.debug("Usage data [%s] found for storage scope [%s] in "
"timestamp [%s].", usage_data, self._tenant_id,
timestamp)
LOG.debug("Usage data [%s] found for storage scope [%s] in "
"timestamp [%s].", usage_data, self._tenant_id,
timestamp)
start_time = timestamp
end_time = tzutils.add_delta(timestamp,
timedelta(seconds=self._period))
# No usage records found in
if not usage_data:
LOG.warning("No usage data for storage scope [%s] on "
"timestamp [%s]. You might want to consider "
"de-activating it.", self._tenant_id, timestamp)
start_time = timestamp
end_time = tzutils.add_delta(timestamp,
timedelta(seconds=self._period))
else:
frame = self.execute_measurements_rating(end_time, start_time,
usage_data)
self.persist_rating_data(end_time, frame, start_time)
frame = dataframe.DataFrame(
start=start_time,
end=end_time,
usage=usage_data,
)
# Rating
for processor in self._processors:
frame = processor.obj.process(frame)
self.update_scope_processing_state_db(timestamp)
# Writing
LOG.debug("Persisting processed frames [%s] for tenant [%s] and "
"time [start=%s,end=%s]", frame, self._tenant_id,
start_time, end_time)
def persist_rating_data(self, end_time, frame, start_time):
LOG.debug("Persisting processed frames [%s] for scope [%s] and time "
"[start=%s,end=%s]", frame, self._tenant_id, start_time,
end_time)
self._storage.push([frame], self._tenant_id)
self._state.set_state(self._tenant_id, timestamp)
self._storage.push([frame], self._tenant_id)
def execute_measurements_rating(self, end_time, start_time, usage_data):
frame = dataframe.DataFrame(
start=start_time,
end=end_time,
usage=usage_data,
)
for processor in self._processors:
original_data = copy.deepcopy(frame)
frame = processor.obj.process(frame)
LOG.debug("Results [%s] for processing [%s] of data points [%s].",
frame, processor.obj.process, original_data)
return frame
def update_scope_processing_state_db(self, timestamp):
self._state.set_state(self._tenant_id, timestamp)
class Orchestrator(cotyledon.Service):
class ReprocessingWorker(Worker):
def __init__(self, collector, storage, tenant_id, worker_id):
self.scope = tenant_id
self.scope_key = None
super(ReprocessingWorker, self).__init__(
collector, storage, self.scope.identifier, worker_id)
self.reprocessing_scheduler_db = state.ReprocessingSchedulerDb()
self.next_timestamp_to_process = self._next_timestamp_to_process
self.load_scope_key()
def load_scope_key(self):
scope_from_db = self._state.get_all(self._tenant_id)
if len(scope_from_db) < 1:
raise Exception("Scope [%s] scheduled for reprocessing does not "
"seem to exist anymore." % self.scope)
if len(scope_from_db) > 1:
raise Exception("Unexpected number of storage state entries found "
"for scope [%s]." % self.scope)
self.scope_key = scope_from_db[0].scope_key
def _next_timestamp_to_process(self):
db_item = self.reprocessing_scheduler_db.get_from_db(
identifier=self.scope.identifier,
start_reprocess_time=self.scope.start_reprocess_time,
end_reprocess_time=self.scope.end_reprocess_time)
if not db_item:
LOG.info("It seems that the processing for schedule [%s] was "
"finished by other worker.", self.scope)
return None
return ReprocessingWorker.generate_next_timestamp(
db_item, self._period)
@staticmethod
def generate_next_timestamp(db_item, processing_period_interval):
new_timestamp = db_item.start_reprocess_time
if db_item.current_reprocess_time:
period_delta = timedelta(seconds=processing_period_interval)
new_timestamp = db_item.current_reprocess_time + period_delta
LOG.debug("Current reprocessed time is [%s], therefore, the next "
"one to process is [%s] based on the processing "
"interval [%s].", db_item.start_reprocess_time,
new_timestamp, processing_period_interval)
else:
LOG.debug("There is no reprocessing for the schedule [%s]. "
"Therefore, we use the start time [%s] as the first "
"time to process.", db_item, new_timestamp)
if new_timestamp <= db_item.end_reprocess_time:
return tzutils.local_to_utc(new_timestamp)
else:
LOG.debug("No need to keep reprocessing schedule [%s] as we "
"processed all requested timestamps.", db_item)
return None
def do_execute_scope_processing(self, timestamp):
end_of_this_processing = timestamp + timedelta(seconds=self._period)
end_of_this_processing = tzutils.local_to_utc(end_of_this_processing)
LOG.debug("Cleaning backend [%s] data for reprocessing scope [%s] "
"for timeframe[start=%s, end=%s].",
self._storage, self.scope, timestamp, end_of_this_processing)
self._storage.delete(
begin=timestamp, end=end_of_this_processing,
filters={self.scope_key: self._tenant_id})
LOG.debug("Executing the reprocessing of scope [%s] for "
"timeframe[start=%s, end=%s].", self.scope, timestamp,
end_of_this_processing)
super(ReprocessingWorker, self).do_execute_scope_processing(timestamp)
def update_scope_processing_state_db(self, timestamp):
LOG.debug("After data is persisted in the storage backend [%s], we "
"will update the scope [%s] current processing time to "
"[%s].", self._storage, self.scope, timestamp)
self.reprocessing_scheduler_db.update_reprocessing_time(
identifier=self.scope.identifier,
start_reprocess_time=self.scope.start_reprocess_time,
end_reprocess_time=self.scope.end_reprocess_time,
new_current_time_stamp=timestamp)
class CloudKittyProcessor(cotyledon.Service):
def __init__(self, worker_id):
self._worker_id = worker_id
super(Orchestrator, self).__init__(self._worker_id)
super(CloudKittyProcessor, self).__init__(self._worker_id)
self.tenants = []
self.fetcher = driver.DriverManager(
FETCHERS_NAMESPACE,
@ -445,9 +570,16 @@ class Orchestrator(cotyledon.Service):
CONF.orchestrator.coordination_url,
uuidutils.generate_uuid().encode('ascii'))
self.coord.start(start_heart=True)
self._check_state = functools.partial(
self.next_timestamp_to_process = functools.partial(
_check_state, self, CONF.collect.period)
self.worker_class = Worker
self.log_worker_initiated()
def log_worker_initiated(self):
LOG.info("Processor worker ID [%s] is initiated as CloudKitty "
"rating processor.", self._worker_id)
def _init_messaging(self):
target = oslo_messaging.Target(topic='cloudkitty',
server=CONF.host,
@ -469,51 +601,125 @@ class Orchestrator(cotyledon.Service):
def run(self):
LOG.debug('Started worker {}.'.format(self._worker_id))
while True:
self.tenants = self.fetcher.get_tenants()
random.shuffle(self.tenants)
LOG.info('[Worker: {w}] {n} tenants loaded for fetcher {f}'.format(
w=self._worker_id, n=len(self.tenants), f=self.fetcher.name))
for tenant_id in self.tenants:
lock_name, lock = get_lock(self.coord, tenant_id)
LOG.debug(
'[Worker: {w}] Trying to acquire lock "{lck}" ...'.format(
w=self._worker_id, lck=lock_name)
)
if lock.acquire(blocking=False):
LOG.debug(
'[Worker: {w}] Acquired lock "{lck}" ...'.format(
w=self._worker_id, lck=lock_name)
)
state = self._check_state(tenant_id)
LOG.debug("Next timestamp [%s] found for processing for "
"storage scope [%s].", state, tenant_id)
if state:
worker = Worker(
self.collector,
self.storage,
tenant_id,
self._worker_id,
)
worker.run()
lock.release()
LOG.debug("Finished processing all storage scopes.")
# FIXME(sheeprine): We may cause a drift here
time.sleep(CONF.collect.period)
self.internal_run()
def terminate(self):
LOG.debug('Terminating worker {}...'.format(self._worker_id))
LOG.debug('Terminating worker {}.'.format(self._worker_id))
self.coord.stop()
LOG.debug('Terminated worker {}.'.format(self._worker_id))
def internal_run(self):
self.load_scopes_to_process()
for tenant_id in self.tenants:
lock_name, lock = get_lock(
self.coord, self.generate_lock_base_name(tenant_id))
class OrchestratorServiceManager(cotyledon.ServiceManager):
LOG.debug('[Worker: {w}] Trying to acquire lock "{lock_name}".'
.format(w=self._worker_id, lock_name=lock_name))
lock_acquired = lock.acquire(blocking=False)
if lock_acquired:
LOG.debug('[Worker: {w}] Acquired lock "{lock_name}".'.format(
w=self._worker_id, lock_name=lock_name))
try:
self.process_scope(tenant_id)
finally:
lock.release()
LOG.debug("Finished processing scopes [%s].", tenant_id)
else:
LOG.debug("Could not acquire lock [%s] for processing "
"scope [%s] with worker [%s].", lock_name,
tenant_id, self.worker_class)
LOG.debug("Finished processing all storage scopes with worker "
"[worker_id=%s, class=%s].",
self._worker_id, self.worker_class)
# FIXME(sheeprine): We may cause a drift here
time.sleep(CONF.collect.period)
def process_scope(self, scope_to_process):
timestamp = self.next_timestamp_to_process(scope_to_process)
LOG.debug("Next timestamp [%s] found for processing for "
"storage scope [%s].", state, scope_to_process)
if not timestamp:
LOG.debug("There is no next timestamp to process for scope [%s]",
scope_to_process)
return
worker = self.worker_class(
self.collector,
self.storage,
scope_to_process,
self._worker_id,
)
worker.run()
def generate_lock_base_name(self, tenant_id):
return tenant_id
def load_scopes_to_process(self):
self.tenants = self.fetcher.get_tenants()
random.shuffle(self.tenants)
LOG.info('[Worker: {w}] Tenants loaded for fetcher {f}'.format(
w=self._worker_id, f=self.fetcher.name))
class CloudKittyReprocessor(CloudKittyProcessor):
def __init__(self, worker_id):
super(CloudKittyReprocessor, self).__init__(worker_id)
self.next_timestamp_to_process = self._next_timestamp_to_process
self.worker_class = ReprocessingWorker
self.reprocessing_scheduler_db = state.ReprocessingSchedulerDb()
def log_worker_initiated(self):
LOG.info("Processor worker ID [%s] is initiated as CloudKitty "
"rating reprocessor.", self._worker_id)
def _next_timestamp_to_process(self, scope):
scope_db = self.reprocessing_scheduler_db.get_from_db(
identifier=scope.identifier,
start_reprocess_time=scope.start_reprocess_time,
end_reprocess_time=scope.end_reprocess_time)
if scope_db:
return ReprocessingWorker.generate_next_timestamp(
scope_db, CONF.collect.period)
else:
LOG.debug("It seems that the processing for schedule [%s] was "
"finished by other CloudKitty reprocessor.", scope)
return None
def load_scopes_to_process(self):
self.tenants = self.reprocessing_scheduler_db.get_all()
random.shuffle(self.tenants)
LOG.info('Reprocessing worker [%s] loaded [%s] schedules to process.',
self._worker_id, len(self.tenants))
def generate_lock_base_name(self, scope):
return "%s-id=%s-start=%s-end=%s-current=%s" % (
self.worker_class, scope.identifier, scope.start_reprocess_time,
scope.end_reprocess_time, scope.current_reprocess_time)
class CloudKittyServiceManager(cotyledon.ServiceManager):
def __init__(self):
super(OrchestratorServiceManager, self).__init__()
self.service_id = self.add(Orchestrator,
workers=CONF.orchestrator.max_workers)
super(CloudKittyServiceManager, self).__init__()
if CONF.orchestrator.max_workers:
self.cloudkitty_processor_service_id = self.add(
CloudKittyProcessor, workers=CONF.orchestrator.max_workers)
else:
LOG.info("No worker configured for CloudKitty processing.")
if CONF.orchestrator.max_workers_reprocessing:
self.cloudkitty_reprocessor_service_id = self.add(
CloudKittyReprocessor,
workers=CONF.orchestrator.max_workers_reprocessing)
else:
LOG.info("No worker configured for CloudKitty reprocessing.")

View File

@ -275,6 +275,10 @@ class InfluxClient(object):
else:
query += filter_query
query += ';'
LOG.debug("InfluxDB query to delete elements filtering by [%s] and "
"with [begin=%s, end=%s]: [%].", filters, begin, end, query)
self._conn.query(query)

View File

@ -16,6 +16,8 @@
from oslo_config import cfg
from oslo_db.sqlalchemy import utils
from oslo_log import log
from sqlalchemy import or_ as or_operation
from sqlalchemy import sql
from cloudkitty import db
from cloudkitty.storage_state import migration
@ -303,3 +305,140 @@ class StateManager(object):
session.close()
return r.active
class ReprocessingSchedulerDb(object):
"""Class to access and operator the reprocessing scheduler in the DB"""
model = models.ReprocessingScheduler
def get_all(self, identifier=None, remove_finished=True,
limit=100, offset=0, order="desc"):
"""Returns all schedules for reprocessing for a given resource
:param identifier: Identifiers of the scopes
:type identifier: str
:param remove_finished: option to remove from the projection the
reprocessing schedules that have already finished.
:type remove_finished: bool
:param limit: optional parameter to restrict the projection
:type limit: int
:param offset: optional parameter to shift the projection
:type offset: int
:param order: optional parameter to indicate the order of the
projection. The ordering field will be the `id`.
:type order: str
"""
session = db.get_session()
session.begin()
query = utils.model_query(self.model, session)
if identifier:
query = query.filter(self.model.identifier.in_(identifier))
if remove_finished:
query = self.remove_finished_processing_schedules(query)
if order:
query = query.order_by(sql.text("id %s" % order))
query = apply_offset_and_limit(limit, offset, query)
result_set = query.all()
session.close()
return result_set
def remove_finished_processing_schedules(self, query):
return query.filter(or_operation(
self.model.current_reprocess_time.is_(None),
self.model.current_reprocess_time < self.model.end_reprocess_time
))
def persist(self, reprocessing_scheduler):
"""Persists the reprocessing_schedule
:param reprocessing_scheduler: reprocessing schedule that we want to
persist in the database.
:type reprocessing_scheduler: models.ReprocessingScheduler
"""
session = db.get_session()
session.begin()
session.add(reprocessing_scheduler)
session.commit()
session.close()
def get_from_db(self, identifier=None, start_reprocess_time=None,
end_reprocess_time=None):
"""Get the reprocessing schedule from DB
:param identifier: Identifier of the scope
:type identifier: str
:param start_reprocess_time: the start time used in the
reprocessing schedule
:type start_reprocess_time: datetime.datetime
:param end_reprocess_time: the end time used in the
reprocessing schedule
:type end_reprocess_time: datetime.datetime
"""
session = db.get_session()
session.begin()
result_set = self._get_db_item(
end_reprocess_time, identifier, session, start_reprocess_time)
session.close()
return result_set
def _get_db_item(self, end_reprocess_time, identifier, session,
start_reprocess_time):
query = utils.model_query(self.model, session)
query = query.filter(self.model.identifier == identifier)
query = query.filter(
self.model.start_reprocess_time == start_reprocess_time)
query = query.filter(
self.model.end_reprocess_time == end_reprocess_time)
query = self.remove_finished_processing_schedules(query)
return query.first()
def update_reprocessing_time(self, identifier=None,
start_reprocess_time=None,
end_reprocess_time=None,
new_current_time_stamp=None):
"""Update current processing time for a reprocessing schedule
:param identifier: Identifier of the scope
:type identifier: str
:param start_reprocess_time: the start time used in the
reprocessing schedule
:type start_reprocess_time: datetime.datetime
:param end_reprocess_time: the end time used in the
reprocessing schedule
:type end_reprocess_time: datetime.datetime
:param new_current_time_stamp: the new current timestamp to set
:type new_current_time_stamp: datetime.datetime
"""
session = db.get_session()
session.begin()
result_set = self._get_db_item(
end_reprocess_time, identifier, session, start_reprocess_time)
if not result_set:
LOG.warning("Trying to update current time to [%s] for identifier "
"[%s] and reprocessing range [start=%, end=%s], but "
"we could not find a this task in the database.",
new_current_time_stamp, identifier,
start_reprocess_time, end_reprocess_time)
return
new_current_time_stamp = tzutils.local_to_utc(
new_current_time_stamp, naive=True)
result_set.current_reprocess_time = new_current_time_stamp
session.commit()
session.close()
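
A minimal usage sketch of the new ReprocessingSchedulerDb, assuming an already configured CloudKitty database; the scope identifier, reason, and timestamps are arbitrary examples.

import datetime

from cloudkitty import storage_state
from cloudkitty.storage_state.models import ReprocessingScheduler

scheduler_db = storage_state.ReprocessingSchedulerDb()

start = datetime.datetime(2021, 6, 1, tzinfo=datetime.timezone.utc)
end = datetime.datetime(2021, 6, 2, tzinfo=datetime.timezone.utc)

# Persist a new reprocessing schedule for a scope.
scheduler_db.persist(ReprocessingScheduler(
    identifier="5e56cb64-4980-4466-9fce-d0133c0c221e",
    reason="Rating rules were wrong for this period.",
    start_reprocess_time=start, end_reprocess_time=end))

# List unfinished schedules for that scope.
schedules = scheduler_db.get_all(
    identifier=["5e56cb64-4980-4466-9fce-d0133c0c221e"])

# Record that the first period of the schedule was reprocessed.
scheduler_db.update_reprocessing_time(
    identifier="5e56cb64-4980-4466-9fce-d0133c0c221e",
    start_reprocess_time=start, end_reprocess_time=end,
    new_current_time_stamp=start + datetime.timedelta(hours=1))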

View File

@ -38,7 +38,6 @@ def upgrade():
with op.batch_alter_table(name,
copy_from=table,
recreate='always') as batch_op:
batch_op.alter_column('identifier')
batch_op.add_column(
sqlalchemy.Column('scope_activation_toggle_date',
sqlalchemy.DateTime, nullable=False,

View File

@ -0,0 +1,45 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Create reprocessing scheduler table
Revision ID: 9feccd32
Revises: 4d69395f
Create Date: 2021-06-04 16:27:00.595274
"""
from alembic import op
import sqlalchemy
# revision identifiers, used by Alembic.
revision = '9feccd32'
down_revision = '4d69395f'
def upgrade():
op.create_table(
'storage_scope_reprocessing_schedule',
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('identifier', sqlalchemy.String(length=256),
nullable=False, unique=False),
sqlalchemy.Column('start_reprocess_time', sqlalchemy.DateTime,
nullable=False),
sqlalchemy.Column('end_reprocess_time', sqlalchemy.DateTime,
nullable=False),
sqlalchemy.Column('current_reprocess_time', sqlalchemy.DateTime,
nullable=True),
sqlalchemy.Column('reason', sqlalchemy.Text, nullable=False),
sqlalchemy.PrimaryKeyConstraint('id'),
mysql_charset='utf8', mysql_engine='InnoDB'
)

View File

@ -20,6 +20,14 @@ from sqlalchemy.ext import declarative
Base = declarative.declarative_base()
def to_string_selected_fields(object_to_print, fields=[]):
object_to_return = {}
if object_to_print:
object_to_return = {
a: y for a, y in object_to_print.items() if a in fields}
return str(object_to_return)
class IdentifierState(Base, models.ModelBase):
"""Represents the state of a given identifier."""
@ -57,3 +65,39 @@ class IdentifierState(Base, models.ModelBase):
server_default=sqlalchemy.sql.func.now())
active = sqlalchemy.Column('active', sqlalchemy.Boolean, nullable=False,
default=True)
def __str__(self):
return to_string_selected_fields(
self, ['id', 'identifier', 'state', 'active'])
class ReprocessingScheduler(Base, models.ModelBase):
"""Represents the reprocessing scheduler table."""
@declarative.declared_attr
def __table_args__(cls):
return (
sqlalchemy.schema.PrimaryKeyConstraint('id'),
)
__tablename__ = 'storage_scope_reprocessing_schedule'
id = sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True)
reason = sqlalchemy.Column("reason", sqlalchemy.Text, nullable=False)
identifier = sqlalchemy.Column("identifier", sqlalchemy.String(256),
nullable=False, unique=False)
start_reprocess_time = sqlalchemy.Column("start_reprocess_time",
sqlalchemy.DateTime,
nullable=False)
end_reprocess_time = sqlalchemy.Column("end_reprocess_time",
sqlalchemy.DateTime,
nullable=False)
current_reprocess_time = sqlalchemy.Column("current_reprocess_time",
sqlalchemy.DateTime,
nullable=True)
def __str__(self):
return to_string_selected_fields(
self, ['id', 'identifier', 'start_reprocess_time',
'end_reprocess_time', 'current_reprocess_time'])

View File

View File

@ -0,0 +1,381 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime
import re
from unittest import mock
from datetimerange import DateTimeRange
from werkzeug import exceptions as http_exceptions
from cloudkitty.api.v2.task import reprocess
from cloudkitty import tests
from cloudkitty.utils import tz as tzutils
class TestReprocessSchedulerPostApi(tests.TestCase):
def setUp(self):
super(TestReprocessSchedulerPostApi, self).setUp()
self.endpoint = reprocess.ReprocessSchedulerPostApi()
self.scope_ids = ["some-other-scope-id",
"5e56cb64-4980-4466-9fce-d0133c0c221e"]
self.start_reprocess_time = tzutils.localized_now()
self.end_reprocess_time =\
self.start_reprocess_time + datetime.timedelta(hours=1)
self.reason = "We are testing the reprocess API."
def test_validate_scope_ids_all_option_with_scope_ids(self):
self.scope_ids.append('ALL')
expected_message = \
"400 Bad Request: Cannot use 'ALL' with scope ID [['some-other-" \
"scope-id', '5e56cb64-4980-4466-9fce-d0133c0c221e', 'ALL']]. " \
"Either schedule a reprocessing for all active scopes using " \
"'ALL' option, or inform only the scopes you desire to schedule " \
"a reprocessing."
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message,
self.endpoint.validate_scope_ids,
self.scope_ids)
self.scope_ids.remove('ALL')
self.endpoint.validate_scope_ids(self.scope_ids)
def test_validate_inputs_blank_reason(self):
expected_message = \
"400 Bad Request: Empty or blank reason text is not allowed. " \
"Please, do inform/register the reason for the reprocessing of " \
"a previously processed timestamp."
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message,
self.endpoint.validate_inputs,
self.end_reprocess_time, "", self.scope_ids,
self.start_reprocess_time)
self.assertRaisesRegexp(
http_exceptions.BadRequest, expected_message,
self.endpoint.validate_inputs, self.end_reprocess_time,
" ", self.scope_ids, self.start_reprocess_time)
self.endpoint.validate_inputs(
self.end_reprocess_time, self.reason, self.scope_ids,
self.start_reprocess_time)
def test_validate_inputs_end_date_less_than_start_date(self):
original_end_reprocess_time = self.end_reprocess_time
self.end_reprocess_time =\
self.start_reprocess_time - datetime.timedelta(hours=1)
expected_message = \
"400 Bad Request: End reprocessing timestamp [%s] cannot be " \
"less than start reprocessing timestamp [%s]." % (
self.start_reprocess_time, self.end_reprocess_time)
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(http_exceptions.BadRequest, expected_message,
self.endpoint.validate_inputs,
self.end_reprocess_time, self.reason,
self.scope_ids, self.start_reprocess_time)
self.end_reprocess_time = original_end_reprocess_time
self.endpoint.validate_inputs(
self.end_reprocess_time, self.reason, self.scope_ids,
self.start_reprocess_time)
def test_check_if_there_are_invalid_scopes(self):
all_scopes = self.generate_all_scopes_object()
element_removed = all_scopes.pop(0)
expected_message = \
"400 Bad Request: Scopes [\'%s\'] scheduled to reprocess "\
"[start=%s, end=%s] do not exist."\
% (element_removed.identifier, self.start_reprocess_time,
self.end_reprocess_time)
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(
http_exceptions.BadRequest, expected_message,
self.endpoint.check_if_there_are_invalid_scopes, all_scopes,
self.end_reprocess_time, self.scope_ids, self.start_reprocess_time)
all_scopes.append(element_removed)
self.endpoint.check_if_there_are_invalid_scopes(
all_scopes, self.end_reprocess_time, self.scope_ids,
self.start_reprocess_time)
def generate_all_scopes_object(self, last_processed_time=None):
all_scopes = []
def mock_to_string(self):
return "toStringMock"
for s in self.scope_ids:
scope = mock.Mock()
scope.identifier = s
scope.last_processed_timestamp = last_processed_time
scope.__str__ = mock_to_string
all_scopes.append(scope)
return all_scopes
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all")
def test_validate_reprocessing_schedules_overlaps(
self, schedule_get_all_mock):
self.configure_and_execute_overlap_test(schedule_get_all_mock,
self.start_reprocess_time,
self.end_reprocess_time)
self.configure_and_execute_overlap_test(schedule_get_all_mock,
self.end_reprocess_time,
self.start_reprocess_time)
end_reprocess_time =\
self.end_reprocess_time + datetime.timedelta(hours=5)
self.configure_and_execute_overlap_test(schedule_get_all_mock,
self.start_reprocess_time,
end_reprocess_time)
start_reprocess_time =\
self.start_reprocess_time + datetime.timedelta(hours=1)
self.configure_and_execute_overlap_test(schedule_get_all_mock,
start_reprocess_time,
end_reprocess_time)
start_reprocess_time =\
self.start_reprocess_time - datetime.timedelta(hours=1)
self.configure_and_execute_overlap_test(schedule_get_all_mock,
start_reprocess_time,
end_reprocess_time)
start_reprocess_time =\
self.end_reprocess_time + datetime.timedelta(hours=1)
self.configure_schedules_mock(schedule_get_all_mock,
start_reprocess_time,
end_reprocess_time)
self.endpoint.validate_reprocessing_schedules_overlaps(
self.generate_all_scopes_object(), self.end_reprocess_time,
self.start_reprocess_time)
schedule_get_all_mock.assert_has_calls([
mock.call(identifier=[self.scope_ids[0]]),
mock.call(identifier=[self.scope_ids[1]])])
def configure_and_execute_overlap_test(self, schedule_get_all_mock,
start_reprocess_time,
end_reprocess_time):
self.configure_schedules_mock(
schedule_get_all_mock, start_reprocess_time, end_reprocess_time)
scheduling_range = DateTimeRange(
tzutils.utc_to_local(self.start_reprocess_time),
tzutils.utc_to_local(self.end_reprocess_time))
scheduled_range = DateTimeRange(
tzutils.local_to_utc(start_reprocess_time),
tzutils.local_to_utc(end_reprocess_time))
expected_message = \
"400 Bad Request: Cannot schedule a reprocessing for scope " \
"[toStringMock] for reprocessing time [%s], because it already " \
"has a schedule for a similar time range [%s]." \
% (scheduling_range, scheduled_range)
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(
http_exceptions.BadRequest, expected_message,
self.endpoint.validate_reprocessing_schedules_overlaps,
self.generate_all_scopes_object(),
self.end_reprocess_time, self.start_reprocess_time)
schedule_get_all_mock.assert_called_with(
identifier=[self.scope_ids[0]])
def configure_schedules_mock(self, schedule_get_all_mock,
start_reprocess_time, end_reprocess_time):
schedules = []
schedule_get_all_mock.return_value = schedules
all_scopes = self.generate_all_scopes_object()
for s in all_scopes:
schedule_mock = mock.Mock()
schedule_mock.identifier = s.identifier
schedule_mock.start_reprocess_time = start_reprocess_time
schedule_mock.end_reprocess_time = end_reprocess_time
schedules.append(schedule_mock)
def test_validate_start_end_for_reprocessing(self):
all_scopes = self.generate_all_scopes_object(
last_processed_time=self.start_reprocess_time)
base_error_message = "400 Bad Request: Cannot execute a " \
"reprocessing [%s=%s] for scope [toStringMock] " \
"%s after the last possible timestamp [%s]."
start_reprocess_time =\
self.start_reprocess_time + datetime.timedelta(hours=1)
expected_message = base_error_message % ("start",
start_reprocess_time,
"starting",
self.start_reprocess_time)
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(
http_exceptions.BadRequest, expected_message,
self.endpoint.validate_start_end_for_reprocessing, all_scopes,
self.end_reprocess_time, start_reprocess_time)
all_scopes = self.generate_all_scopes_object(
last_processed_time=self.end_reprocess_time)
end_processing_time =\
self.end_reprocess_time + datetime.timedelta(hours=1)
expected_message = base_error_message % ("end",
end_processing_time,
"ending",
self.end_reprocess_time)
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(
http_exceptions.BadRequest, expected_message,
self.endpoint.validate_start_end_for_reprocessing, all_scopes,
end_processing_time, self.start_reprocess_time)
self.endpoint.validate_start_end_for_reprocessing(
all_scopes, self.end_reprocess_time,
self.start_reprocess_time)
all_scopes = self.generate_all_scopes_object(
last_processed_time=self.start_reprocess_time)
self.endpoint.validate_start_end_for_reprocessing(
all_scopes, self.start_reprocess_time,
self.start_reprocess_time)
@mock.patch("flask.request")
@mock.patch("cloudkitty.common.policy.authorize")
@mock.patch("cloudkitty.api.v2.task.reprocess"
".ReprocessSchedulerPostApi.validate_inputs")
@mock.patch("cloudkitty.api.v2.task.reprocess"
".ReprocessSchedulerPostApi"
".check_if_there_are_invalid_scopes")
@mock.patch("cloudkitty.api.v2.task.reprocess."
"ReprocessSchedulerPostApi."
"validate_start_end_for_reprocessing")
@mock.patch("cloudkitty.api.v2.task.reprocess"
".ReprocessSchedulerPostApi"
".validate_reprocessing_schedules_overlaps")
@mock.patch("cloudkitty.storage_state.StateManager.get_all")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.persist")
def test_post(self, reprocessing_scheduler_db_persist_mock,
state_manager_get_all_mock,
validate_reprocessing_schedules_overlaps_mock,
validate_start_end_for_reprocessing_mock,
check_if_there_are_invalid_scopes_mock, validate_inputs_mock,
policy_mock, request_mock):
state_manager_get_all_mock.return_value =\
self.generate_all_scopes_object()
request_mock.context = mock.Mock()
request_mock.context.project_id = "project_id_mock"
def get_json_mock():
return {"scope_ids": self.scope_ids[0],
"start_reprocess_time": str(self.start_reprocess_time),
"end_reprocess_time": str(self.end_reprocess_time),
"reason": self.reason}
request_mock.get_json = get_json_mock
self.endpoint.post()
self.assertEqual(reprocessing_scheduler_db_persist_mock.call_count, 2)
state_manager_get_all_mock.assert_called_once()
validate_reprocessing_schedules_overlaps_mock.assert_called_once()
validate_start_end_for_reprocessing_mock.assert_called_once()
check_if_there_are_invalid_scopes_mock.assert_called_once()
validate_inputs_mock.assert_called_once()
policy_mock.assert_called_once()
class TestReprocessingSchedulerGetApi(tests.TestCase):
def setUp(self):
super(TestReprocessingSchedulerGetApi, self).setUp()
self.endpoint = reprocess.ReprocessSchedulerGetApi()
@mock.patch("flask.request")
@mock.patch("cloudkitty.common.policy.authorize")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all")
def test_get(self, reprocessing_db_get_all_mock,
policy_mock, request_mock):
time_now = tzutils.localized_now()
schedule_mock = mock.Mock()
schedule_mock.id = 1
schedule_mock.identifier = "scope_identifier"
schedule_mock.reason = "reason to process"
schedule_mock.current_reprocess_time = time_now
schedule_mock.start_reprocess_time =\
time_now - datetime.timedelta(hours=10)
schedule_mock.end_reprocess_time =\
time_now + datetime.timedelta(hours=10)
reprocessing_db_get_all_mock.return_value = [schedule_mock]
request_mock.context = mock.Mock()
request_mock.args = mock.Mock()
request_mock.args.lists = mock.Mock()
request_mock.args.lists.return_value = []
list_all_return = self.endpoint.get()
self.assertTrue("results" in list_all_return)
self.assertTrue("id" not in list_all_return['results'][0])
self.assertTrue("scope_id" in list_all_return['results'][0])
self.assertTrue("reason" in list_all_return['results'][0])
self.assertTrue(
"current_reprocess_time" in list_all_return['results'][0])
self.assertTrue(
"start_reprocess_time" in list_all_return['results'][0])
self.assertTrue(
"end_reprocess_time" in list_all_return['results'][0])
self.assertEqual("scope_identifier",
list_all_return['results'][0]['scope_id'])
self.assertEqual("reason to process",
list_all_return['results'][0]['reason'])
self.assertEqual(time_now.isoformat(), list_all_return['results'][0][
'current_reprocess_time'])
self.assertEqual((time_now - datetime.timedelta(hours=10)).isoformat(),
list_all_return['results'][0]['start_reprocess_time'])
self.assertEqual((time_now + datetime.timedelta(hours=10)).isoformat(),
list_all_return['results'][0]['end_reprocess_time'])
reprocessing_db_get_all_mock.assert_called_once()
policy_mock.assert_called_once()

View File

@ -14,6 +14,8 @@
# under the License.
#
import datetime
import re
from unittest import mock
from oslo_messaging import conffixture
@ -168,22 +170,96 @@ class OrchestratorTest(tests.TestCase):
self.assertEqual('fake2', worker._processors[2].name)
self.assertEqual(1, worker._processors[2].obj.priority)
@mock.patch("cotyledon.ServiceManager.add")
@mock.patch("cotyledon._service_manager.ServiceManager.__init__")
def test_cloudkitty_service_manager_only_processing(
self, service_manager_init_mock, cotyledon_add_mock):
OrchestratorTest.execute_cloudkitty_service_manager_test(
cotyledon_add_mock=cotyledon_add_mock, max_workers_reprocessing=0,
max_workers=1)
self.assertTrue(service_manager_init_mock.called)
@mock.patch("cotyledon.ServiceManager.add")
@mock.patch("cotyledon._service_manager.ServiceManager.__init__")
def test_cloudkitty_service_manager_only_reprocessing(
self, service_manager_init_mock, cotyledon_add_mock):
OrchestratorTest.execute_cloudkitty_service_manager_test(
cotyledon_add_mock=cotyledon_add_mock, max_workers_reprocessing=1,
max_workers=0)
self.assertTrue(service_manager_init_mock.called)
@mock.patch("cotyledon.ServiceManager.add")
@mock.patch("cotyledon._service_manager.ServiceManager.__init__")
def test_cloudkitty_service_manager_both_processings(
self, service_manager_init_mock, cotyledon_add_mock):
OrchestratorTest.execute_cloudkitty_service_manager_test(
cotyledon_add_mock=cotyledon_add_mock)
self.assertTrue(service_manager_init_mock.called)
@staticmethod
def execute_cloudkitty_service_manager_test(cotyledon_add_mock=None,
max_workers=1,
max_workers_reprocessing=1):
original_conf = orchestrator.CONF
try:
orchestrator.CONF = mock.Mock()
orchestrator.CONF.orchestrator = mock.Mock()
orchestrator.CONF.orchestrator.max_workers = max_workers
orchestrator.CONF.orchestrator.max_workers_reprocessing = \
max_workers_reprocessing
orchestrator.CloudKittyServiceManager()
expected_calls = []
if max_workers:
expected_calls.append(
mock.call(orchestrator.CloudKittyProcessor,
workers=max_workers))
if max_workers_reprocessing:
expected_calls.append(
mock.call(orchestrator.CloudKittyReprocessor,
workers=max_workers_reprocessing))
cotyledon_add_mock.assert_has_calls(expected_calls)
finally:
orchestrator.CONF = original_conf
class WorkerTest(tests.TestCase):
def setUp(self):
super(WorkerTest, self).setUp()
class FakeWorker(orchestrator.Worker):
def __init__(self):
self._tenant_id = 'a'
self._worker_id = '0'
self._log_prefix = '[IGNORE THIS MESSAGE]'
patcher_state_manager_set_state = mock.patch(
"cloudkitty.storage_state.StateManager.set_state")
self.addCleanup(patcher_state_manager_set_state.stop)
self.state_manager_set_state_mock = \
patcher_state_manager_set_state.start()
self.worker = FakeWorker()
self.worker._collect = mock.MagicMock()
self._tenant_id = 'a'
self._worker_id = '0'
self.collector_mock = mock.MagicMock()
self.storage_mock = mock.MagicMock()
self.collector_mock.__str__.return_value = "toString"
load_conf_manager = mock.patch("cloudkitty.utils.load_conf")
self.addCleanup(load_conf_manager.stop)
self.load_conf_mock = load_conf_manager.start()
self.worker = orchestrator.Worker(self.collector_mock,
self.storage_mock, self._tenant_id,
self._worker_id)
def test_do_collection_all_valid(self):
timestamp_now = tzutils.localized_now()
metrics = ['metric{}'.format(i) for i in range(5)]
side_effect = [(
metrics[i],
@ -191,12 +267,15 @@ class WorkerTest(tests.TestCase):
'end': 3600},
'usage': i},
) for i in range(5)]
self.worker._collect.side_effect = side_effect
output = sorted(self.worker._do_collection(metrics, 0).items(),
self.collector_mock.retrieve.side_effect = side_effect
output = sorted(self.worker._do_collection(metrics,
timestamp_now).items(),
key=lambda x: x[1]['usage'])
self.assertEqual(side_effect, output)
def test_do_collection_some_empty(self):
timestamp_now = tzutils.localized_now()
metrics = ['metric{}'.format(i) for i in range(7)]
side_effect = [(
metrics[i],
@ -206,10 +285,772 @@ class WorkerTest(tests.TestCase):
) for i in range(5)]
side_effect.insert(2, collector.NoDataCollected('a', 'b'))
side_effect.insert(4, collector.NoDataCollected('a', 'b'))
self.worker._collect.side_effect = side_effect
output = sorted(self.worker._do_collection(metrics, 0).items(),
self.collector_mock.retrieve.side_effect = side_effect
output = sorted(self.worker._do_collection(
metrics, timestamp_now).items(),
key=lambda x: x[1]['usage'])
self.assertEqual([
i for i in side_effect
if not isinstance(i, collector.NoDataCollected)
], output)
def test_update_scope_processing_state_db(self):
timestamp = tzutils.localized_now()
self.worker.update_scope_processing_state_db(timestamp)
self.state_manager_set_state_mock.assert_has_calls([
mock.call(self.worker._tenant_id, timestamp)
])
@mock.patch("cloudkitty.dataframe.DataFrame")
def test_execute_measurements_rating(self, dataframe_mock):
new_data_frame_mock = mock.Mock()
dataframe_mock.return_value = new_data_frame_mock
processor_mock_1 = mock.Mock()
return_processor_1 = mock.Mock()
processor_mock_1.obj.process.return_value = return_processor_1
processor_mock_2 = mock.Mock()
return_processor_2 = mock.Mock()
processor_mock_2.obj.process.return_value = return_processor_2
self.worker._processors = [processor_mock_1, processor_mock_2]
start_time = tzutils.localized_now()
end_time = start_time + datetime.timedelta(hours=1)
return_of_method = self.worker.execute_measurements_rating(
end_time, start_time, {})
self.assertEqual(return_processor_2, return_of_method)
processor_mock_1.obj.process.assert_has_calls([
mock.call(new_data_frame_mock)
])
processor_mock_2.obj.process.assert_has_calls([
mock.call(return_processor_1)
])
dataframe_mock.assert_has_calls([
mock.call(start=start_time, end=end_time, usage={})
])
def test_persist_rating_data(self):
start_time = tzutils.localized_now()
end_time = start_time + datetime.timedelta(hours=1)
frame = {"id": "sd"}
self.worker.persist_rating_data(end_time, frame, start_time)
self.storage_mock.push.assert_has_calls([
mock.call([frame], self.worker._tenant_id)
])
@mock.patch("cloudkitty.orchestrator.Worker._do_collection")
@mock.patch("cloudkitty.orchestrator.Worker.execute_measurements_rating")
@mock.patch("cloudkitty.orchestrator.Worker.persist_rating_data")
@mock.patch("cloudkitty.orchestrator.Worker"
".update_scope_processing_state_db")
def test_do_execute_scope_processing_with_no_usage_data(
self, update_scope_processing_state_db_mock,
persist_rating_data_mock, execute_measurements_rating_mock,
do_collection_mock):
self.worker._conf = {"metrics": {"metric1": "s", "metric2": "d"}}
do_collection_mock.return_value = None
timestamp_now = tzutils.localized_now()
self.worker.do_execute_scope_processing(timestamp_now)
do_collection_mock.assert_has_calls([
mock.call(["metric1", "metric2"], timestamp_now)
])
self.assertFalse(execute_measurements_rating_mock.called)
self.assertFalse(persist_rating_data_mock.called)
self.assertTrue(update_scope_processing_state_db_mock.called)
@mock.patch("cloudkitty.orchestrator.Worker._do_collection")
@mock.patch("cloudkitty.orchestrator.Worker.execute_measurements_rating")
@mock.patch("cloudkitty.orchestrator.Worker.persist_rating_data")
@mock.patch("cloudkitty.orchestrator.Worker"
".update_scope_processing_state_db")
def test_do_execute_scope_processing_with_usage_data(
self, update_scope_processing_state_db_mock,
persist_rating_data_mock, execute_measurements_rating_mock,
do_collection_mock):
self.worker._conf = {"metrics": {"metric1": "s", "metric2": "d"}}
usage_data_mock = {"some_usage_data": 2}
do_collection_mock.return_value = usage_data_mock
execute_measurements_rating_mock_return = mock.Mock()
execute_measurements_rating_mock.return_value =\
execute_measurements_rating_mock_return
timestamp_now = tzutils.localized_now()
self.worker.do_execute_scope_processing(timestamp_now)
do_collection_mock.assert_has_calls([
mock.call(["metric1", "metric2"], timestamp_now)
])
end_time = tzutils.add_delta(
timestamp_now, datetime.timedelta(seconds=self.worker._period))
execute_measurements_rating_mock.assert_has_calls([
mock.call(end_time, timestamp_now, usage_data_mock)
])
persist_rating_data_mock.assert_has_calls([
mock.call(end_time, execute_measurements_rating_mock_return,
timestamp_now)
])
self.assertTrue(update_scope_processing_state_db_mock.called)
@mock.patch("cloudkitty.storage_state.StateManager.get_state")
@mock.patch("cloudkitty.storage_state.StateManager"
".is_storage_scope_active")
@mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing")
def test_execute_worker_processing_no_next_timestamp(
self, do_execute_scope_processing_mock,
state_manager_is_storage_scope_active_mock,
state_manager_get_stage_mock):
next_timestamp_to_process_mock = mock.Mock()
next_timestamp_to_process_mock.return_value = None
self.worker.next_timestamp_to_process = next_timestamp_to_process_mock
return_method_value = self.worker.execute_worker_processing()
self.assertFalse(return_method_value)
self.assertFalse(state_manager_get_stage_mock.called)
self.assertFalse(state_manager_is_storage_scope_active_mock.called)
self.assertFalse(do_execute_scope_processing_mock.called)
self.assertTrue(next_timestamp_to_process_mock.called)
@mock.patch("cloudkitty.storage_state.StateManager.get_state")
@mock.patch("cloudkitty.storage_state.StateManager"
".is_storage_scope_active")
@mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing")
def test_execute_worker_processing_scope_not_processed_yet(
self, do_execute_scope_processing_mock,
state_manager_is_storage_scope_active_mock,
state_manager_get_stage_mock):
timestamp_now = tzutils.localized_now()
next_timestamp_to_process_mock = mock.Mock()
next_timestamp_to_process_mock.return_value = timestamp_now
self.worker.next_timestamp_to_process = next_timestamp_to_process_mock
state_manager_get_stage_mock.return_value = None
return_method_value = self.worker.execute_worker_processing()
self.assertTrue(return_method_value)
state_manager_get_stage_mock.assert_has_calls([
mock.call(self.worker._tenant_id)
])
do_execute_scope_processing_mock.assert_has_calls([
mock.call(timestamp_now)
])
self.assertFalse(state_manager_is_storage_scope_active_mock.called)
self.assertTrue(next_timestamp_to_process_mock.called)
@mock.patch("cloudkitty.storage_state.StateManager.get_state")
@mock.patch("cloudkitty.storage_state.StateManager"
".is_storage_scope_active")
@mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing")
def test_execute_worker_processing_scope_already_processed_active(
self, do_execute_scope_processing_mock,
state_manager_is_storage_scope_active_mock,
state_manager_get_stage_mock):
timestamp_now = tzutils.localized_now()
next_timestamp_to_process_mock = mock.Mock()
next_timestamp_to_process_mock.return_value = timestamp_now
self.worker.next_timestamp_to_process = next_timestamp_to_process_mock
state_manager_get_stage_mock.return_value = mock.Mock()
state_manager_is_storage_scope_active_mock.return_value = True
return_method_value = self.worker.execute_worker_processing()
self.assertTrue(return_method_value)
state_manager_get_stage_mock.assert_has_calls([
mock.call(self.worker._tenant_id)
])
do_execute_scope_processing_mock.assert_has_calls([
mock.call(timestamp_now)
])
state_manager_is_storage_scope_active_mock.assert_has_calls([
mock.call(self.worker._tenant_id)
])
self.assertTrue(next_timestamp_to_process_mock.called)
@mock.patch("cloudkitty.storage_state.StateManager.get_state")
@mock.patch("cloudkitty.storage_state.StateManager"
".is_storage_scope_active")
@mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing")
def test_execute_worker_processing_scope_already_processed_inactive(
self, do_execute_scope_processing_mock,
state_manager_is_storage_scope_active_mock,
state_manager_get_stage_mock):
timestamp_now = tzutils.localized_now()
next_timestamp_to_process_mock = mock.Mock()
next_timestamp_to_process_mock.return_value = timestamp_now
self.worker.next_timestamp_to_process = next_timestamp_to_process_mock
state_manager_get_stage_mock.return_value = mock.Mock()
state_manager_is_storage_scope_active_mock.return_value = False
return_method_value = self.worker.execute_worker_processing()
self.assertFalse(return_method_value)
state_manager_get_stage_mock.assert_has_calls([
mock.call(self.worker._tenant_id)
])
state_manager_is_storage_scope_active_mock.assert_has_calls([
mock.call(self.worker._tenant_id)
])
self.assertTrue(next_timestamp_to_process_mock.called)
self.assertFalse(do_execute_scope_processing_mock.called)
@mock.patch("cloudkitty.orchestrator.Worker.execute_worker_processing")
def test_run(self, execute_worker_processing_mock):
execute_worker_processing_mock.side_effect = [True, True, False, True]
self.worker.run()
self.assertEqual(execute_worker_processing_mock.call_count, 3)
def test_collect_no_data(self):
metric = "metric1"
timestamp_now = tzutils.localized_now()
self.collector_mock.retrieve.return_value = (metric, None)
expected_message = "Collector 'toString' returned no data for " \
"resource 'metric1'"
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(
collector.NoDataCollected, expected_message, self.worker._collect,
metric, timestamp_now)
next_timestamp = tzutils.add_delta(
timestamp_now, datetime.timedelta(seconds=self.worker._period))
self.collector_mock.retrieve.assert_has_calls([
mock.call(metric, timestamp_now, next_timestamp,
self.worker._tenant_id)])
def test_collect_with_data(self):
metric = "metric1"
timestamp_now = tzutils.localized_now()
usage_data = {"some_usage_data": 3}
self.collector_mock.retrieve.return_value = (metric, usage_data)
return_of_method = self.worker._collect(metric, timestamp_now)
next_timestamp = tzutils.add_delta(
timestamp_now, datetime.timedelta(seconds=self.worker._period))
self.collector_mock.retrieve.assert_has_calls([
mock.call(metric, timestamp_now, next_timestamp,
self.worker._tenant_id)])
self.assertEqual((metric, usage_data), return_of_method)
@mock.patch("cloudkitty.utils.check_time_state")
def test_check_state(self, check_time_state_mock):
state_mock = mock.Mock()
timestamp_now = tzutils.localized_now()
state_mock._state.get_state.return_value = timestamp_now
expected_time = timestamp_now + datetime.timedelta(hours=1)
check_time_state_mock.return_value = \
expected_time
return_of_method = orchestrator._check_state(
state_mock, 3600, self._tenant_id)
self.assertEqual(expected_time, return_of_method)
state_mock._state.get_state.assert_has_calls([
mock.call(self._tenant_id)])
check_time_state_mock.assert_has_calls([
mock.call(timestamp_now, 3600, 2)])
class CloudKittyReprocessorTest(tests.TestCase):
def setUp(self):
super(CloudKittyReprocessorTest, self).setUp()
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb")
def test_generate_lock_base_name(self, reprocessing_scheduler_db_mock,
cloudkitty_processor_init_mock):
scope_mock = mock.Mock()
scope_mock.identifier = "scope_identifier"
return_generate_lock_name = orchestrator.CloudKittyReprocessor(
1).generate_lock_base_name(scope_mock)
expected_lock_name = "<class 'cloudkitty.orchestrator." \
"ReprocessingWorker'>-id=scope_identifier-" \
"start=%s-end=%s-current=%s" % (
scope_mock.start_reprocess_time,
scope_mock.end_reprocess_time,
scope_mock.current_reprocess_time)
self.assertEqual(expected_lock_name, return_generate_lock_name)
cloudkitty_processor_init_mock.assert_called_once()
reprocessing_scheduler_db_mock.assert_called_once()
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_all")
def test_load_scopes_to_process(self, scheduler_db_mock_get_all_mock,
cloudkitty_processor_init_mock):
scheduler_db_mock_get_all_mock.return_value = ["teste"]
reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor()
reprocessor.load_scopes_to_process()
self.assertEqual(["teste"], reprocessor.tenants)
cloudkitty_processor_init_mock.assert_called_once()
scheduler_db_mock_get_all_mock.assert_called_once()
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db")
def test_next_timestamp_to_process_processing_finished(
self, scheduler_db_mock_get_from_db_mock,
cloudkitty_processor_init_mock):
start_time = tzutils.localized_now()
scope = CloudKittyReprocessorTest.create_scope_mock(start_time)
scheduler_db_mock_get_from_db_mock.return_value = None
reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor()
next_timestamp = reprocessor._next_timestamp_to_process(scope)
expected_calls = [
mock.call(identifier=scope.identifier,
start_reprocess_time=scope.start_reprocess_time,
end_reprocess_time=scope.end_reprocess_time)]
self.assertIsNone(next_timestamp)
cloudkitty_processor_init_mock.assert_called_once()
scheduler_db_mock_get_from_db_mock.assert_has_calls(expected_calls)
@staticmethod
def create_scope_mock(start_time):
scope = mock.Mock()
scope.identifier = "scope_identifier"
scope.start_reprocess_time = start_time
scope.current_reprocess_time = None
scope.end_reprocess_time = start_time + datetime.timedelta(hours=1)
return scope
@staticmethod
def create_cloudkitty_reprocessor():
reprocessor = orchestrator.CloudKittyReprocessor(1)
reprocessor._worker_id = 1
return reprocessor
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor.__init__")
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db")
def test_next_timestamp_to_process(
self, scheduler_db_mock_get_from_db_mock,
cloudkitty_processor_init_mock):
start_time = tzutils.localized_now()
scope = CloudKittyReprocessorTest.create_scope_mock(start_time)
scheduler_db_mock_get_from_db_mock.return_value = scope
reprocessor = CloudKittyReprocessorTest.create_cloudkitty_reprocessor()
next_timestamp = reprocessor._next_timestamp_to_process(scope)
expected_calls = [
mock.call(identifier=scope.identifier,
start_reprocess_time=scope.start_reprocess_time,
end_reprocess_time=scope.end_reprocess_time)]
# There is no current timestamp in the mock object.
# Therefore, the next to process is the start timestamp
expected_next_timestamp = start_time
self.assertEqual(expected_next_timestamp, next_timestamp)
cloudkitty_processor_init_mock.assert_called_once()
scheduler_db_mock_get_from_db_mock.assert_has_calls(expected_calls)
class CloudKittyProcessorTest(tests.TestCase):
def setUp(self):
super(CloudKittyProcessorTest, self).setUp()
patcher_oslo_messaging_target = mock.patch("oslo_messaging.Target")
self.addCleanup(patcher_oslo_messaging_target.stop)
self.oslo_messaging_target_mock = patcher_oslo_messaging_target.start()
patcher_messaging_get_server = mock.patch(
"cloudkitty.messaging.get_server")
self.addCleanup(patcher_messaging_get_server.stop)
self.messaging_get_server_mock = patcher_messaging_get_server.start()
patcher_driver_manager = mock.patch("stevedore.driver.DriverManager")
self.addCleanup(patcher_driver_manager.stop)
self.driver_manager_mock = patcher_driver_manager.start()
get_collector_manager = mock.patch(
"cloudkitty.collector.get_collector")
self.addCleanup(get_collector_manager.stop)
self.get_collector_mock = get_collector_manager.start()
self.worker_id = 1
self.cloudkitty_processor = orchestrator.CloudKittyProcessor(
self.worker_id)
def test_init_messaging(self):
server_mock = mock.Mock()
self.messaging_get_server_mock.return_value = server_mock
target_object_mock = mock.Mock()
self.oslo_messaging_target_mock.return_value = target_object_mock
self.cloudkitty_processor._init_messaging()
server_mock.start.assert_called_once()
self.oslo_messaging_target_mock.assert_has_calls([
mock.call(topic='cloudkitty', server=orchestrator.CONF.host,
version='1.0')])
self.messaging_get_server_mock.assert_has_calls([
mock.call(target_object_mock, [
self.cloudkitty_processor._rating_endpoint,
self.cloudkitty_processor._scope_endpoint])])
@mock.patch("time.sleep")
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor."
"load_scopes_to_process")
@mock.patch("cloudkitty.orchestrator.CloudKittyProcessor."
"process_scope")
@mock.patch("cloudkitty.orchestrator.get_lock")
def test_internal_run(self, get_lock_mock, process_scope_mock,
load_scopes_to_process_mock, sleep_mock):
lock_mock = mock.Mock()
lock_mock.acquire.return_value = True
get_lock_mock.return_value = ("lock_name", lock_mock)
self.cloudkitty_processor.tenants = ["tenant1"]
self.cloudkitty_processor.internal_run()
lock_mock.acquire.assert_has_calls([mock.call(blocking=False)])
lock_mock.release.assert_called_once()
get_lock_mock.assert_has_calls(
[mock.call(self.cloudkitty_processor.coord, "tenant1")])
sleep_mock.assert_called_once()
process_scope_mock.assert_called_once()
load_scopes_to_process_mock.assert_called_once()
@mock.patch("cloudkitty.orchestrator.Worker")
def test_process_scope_no_next_timestamp(self, worker_class_mock):
original_next_timestamp_method = \
self.cloudkitty_processor.next_timestamp_to_process
next_timestamp_mock_method = mock.Mock()
try:
self.cloudkitty_processor.next_timestamp_to_process =\
next_timestamp_mock_method
scope_mock = mock.Mock()
next_timestamp_mock_method.return_value = None
self.cloudkitty_processor.process_scope(scope_mock)
next_timestamp_mock_method.assert_has_calls(
[mock.call(scope_mock)])
self.assertFalse(worker_class_mock.called)
finally:
self.cloudkitty_processor.next_timestamp_to_process =\
original_next_timestamp_method
@mock.patch("cloudkitty.orchestrator.Worker")
def test_process_scope(self, worker_class_mock):
original_next_timestamp_method =\
self.cloudkitty_processor.next_timestamp_to_process
next_timestamp_mock_method = mock.Mock()
worker_mock = mock.Mock()
worker_class_mock.return_value = worker_mock
original_worker_class = self.cloudkitty_processor.worker_class
self.cloudkitty_processor.worker_class = worker_class_mock
try:
self.cloudkitty_processor.next_timestamp_to_process =\
next_timestamp_mock_method
scope_mock = mock.Mock()
next_timestamp_mock_method.return_value = tzutils.localized_now()
self.cloudkitty_processor.process_scope(scope_mock)
next_timestamp_mock_method.assert_has_calls(
[mock.call(scope_mock)])
worker_class_mock.assert_has_calls(
[mock.call(self.cloudkitty_processor.collector,
self.cloudkitty_processor.storage, scope_mock,
self.cloudkitty_processor._worker_id)])
worker_mock.run.assert_called_once()
finally:
self.cloudkitty_processor.next_timestamp_to_process =\
original_next_timestamp_method
self.cloudkitty_processor.worker_class = original_worker_class
def test_generate_lock_base_name(self):
generated_lock_name = self.cloudkitty_processor.\
generate_lock_base_name("scope_id")
self.assertEqual("scope_id", generated_lock_name)
def test_load_scopes_to_process(self):
fetcher_mock = mock.Mock()
self.cloudkitty_processor.fetcher = fetcher_mock
fetcher_mock.get_tenants.return_value = ["scope_1"]
self.cloudkitty_processor.load_scopes_to_process()
fetcher_mock.get_tenants.assert_called_once()
self.assertEqual(["scope_1"], self.cloudkitty_processor.tenants)
def test_terminate(self):
coordinator_mock = mock.Mock()
self.cloudkitty_processor.coord = coordinator_mock
self.cloudkitty_processor.terminate()
coordinator_mock.stop.assert_called_once()
class ReprocessingWorkerTest(tests.TestCase):
def setUp(self):
super(ReprocessingWorkerTest, self).setUp()
patcher_reprocessing_scheduler_db_get_from_db = mock.patch(
"cloudkitty.storage_state.ReprocessingSchedulerDb.get_from_db")
self.addCleanup(patcher_reprocessing_scheduler_db_get_from_db.stop)
self.reprocessing_scheduler_db_get_from_db_mock =\
patcher_reprocessing_scheduler_db_get_from_db.start()
patcher_state_manager_get_all = mock.patch(
"cloudkitty.storage_state.StateManager.get_all")
self.addCleanup(patcher_state_manager_get_all.stop)
self.state_manager_get_all_mock = patcher_state_manager_get_all.start()
self.collector_mock = mock.Mock()
self.storage_mock = mock.Mock()
self.scope_key_mock = "key_mock"
self.worker_id = 1
self.scope_id = "scope_id1"
self.scope_mock = mock.Mock()
self.scope_mock.identifier = self.scope_id
load_conf_manager = mock.patch("cloudkitty.utils.load_conf")
self.addCleanup(load_conf_manager.stop)
self.load_conf_mock = load_conf_manager.start()
def to_string_scope_mock(self):
return "toStringMock"
self.scope_mock.__str__ = to_string_scope_mock
self.scope_mock.scope_key = self.scope_key_mock
self.state_manager_get_all_mock.return_value = [self.scope_mock]
self.reprocessing_worker = self.create_reprocessing_worker()
self.mock_scheduler = mock.Mock()
self.mock_scheduler.identifier = self.scope_id
self.start_schedule_mock = tzutils.localized_now()
self.mock_scheduler.start_reprocess_time = self.start_schedule_mock
self.mock_scheduler.current_reprocess_time = None
self.mock_scheduler.end_reprocess_time =\
self.start_schedule_mock + datetime.timedelta(hours=1)
def create_reprocessing_worker(self):
return orchestrator.ReprocessingWorker(
self.collector_mock, self.storage_mock, self.scope_mock,
self.worker_id)
def test_load_scope_key_scope_not_found(self):
self.state_manager_get_all_mock.return_value = []
expected_message = "Scope [toStringMock] scheduled for reprocessing " \
"does not seem to exist anymore."
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(Exception, expected_message,
self.reprocessing_worker.load_scope_key)
self.state_manager_get_all_mock.assert_has_calls([
mock.call(self.reprocessing_worker._tenant_id)])
def test_load_scope_key_more_than_one_scope_found(self):
self.state_manager_get_all_mock.return_value = [
self.scope_mock, self.scope_mock]
expected_message = "Unexpected number of storage state entries " \
"found for scope [toStringMock]."
expected_message = re.escape(expected_message)
self.assertRaisesRegexp(Exception, expected_message,
self.reprocessing_worker.load_scope_key)
self.state_manager_get_all_mock.assert_has_calls([
mock.call(self.reprocessing_worker._tenant_id)])
def test_load_scope_key(self):
self.reprocessing_worker.load_scope_key()
self.state_manager_get_all_mock.assert_has_calls([
mock.call(self.reprocessing_worker._tenant_id)])
self.assertEqual(self.scope_key_mock,
self.reprocessing_worker.scope_key)
@mock.patch("cloudkitty.orchestrator.ReprocessingWorker"
".generate_next_timestamp")
def test_next_timestamp_to_process_no_db_item(
self, generate_next_timestamp_mock):
self.reprocessing_scheduler_db_get_from_db_mock.return_value = []
self.reprocessing_worker._next_timestamp_to_process()
self.reprocessing_scheduler_db_get_from_db_mock.assert_has_calls([
mock.call(
identifier=self.scope_mock.identifier,
start_reprocess_time=self.scope_mock.start_reprocess_time,
end_reprocess_time=self.scope_mock.end_reprocess_time)])
self.assertFalse(generate_next_timestamp_mock.called)
@mock.patch("cloudkitty.orchestrator.ReprocessingWorker"
".generate_next_timestamp")
def test_next_timestamp_to_process(self, generate_next_timestamp_mock):
self.reprocessing_scheduler_db_get_from_db_mock.\
return_value = self.scope_mock
self.reprocessing_worker._next_timestamp_to_process()
self.reprocessing_scheduler_db_get_from_db_mock.assert_has_calls([
mock.call(
identifier=self.scope_mock.identifier,
start_reprocess_time=self.scope_mock.start_reprocess_time,
end_reprocess_time=self.scope_mock.end_reprocess_time)])
generate_next_timestamp_mock.assert_has_calls([
mock.call(self.scope_mock, self.reprocessing_worker._period)])
def test_generate_next_timestamp_no_current_processing(self):
next_timestamp = self.reprocessing_worker.generate_next_timestamp(
self.mock_scheduler, 300)
self.assertEqual(self.start_schedule_mock, next_timestamp)
self.mock_scheduler.start_reprocess_time += datetime.timedelta(hours=2)
next_timestamp = self.reprocessing_worker.generate_next_timestamp(
self.mock_scheduler, 300)
self.assertIsNone(next_timestamp)
def test_generate_next_timestamp_with_current_processing(self):
period = 300
self.mock_scheduler.current_reprocess_time =\
self.start_schedule_mock + datetime.timedelta(seconds=period)
expected_next_time_stamp =\
self.mock_scheduler.current_reprocess_time + datetime.timedelta(
seconds=period)
next_timestamp = self.reprocessing_worker.generate_next_timestamp(
self.mock_scheduler, period)
self.assertEqual(expected_next_time_stamp, next_timestamp)
self.mock_scheduler.current_reprocess_time +=\
datetime.timedelta(hours=2)
next_timestamp = self.reprocessing_worker.generate_next_timestamp(
self.mock_scheduler, period)
self.assertIsNone(next_timestamp)
@mock.patch("cloudkitty.orchestrator.Worker.do_execute_scope_processing")
def test_do_execute_scope_processing(
self, do_execute_scope_processing_mock_from_worker):
now_timestamp = tzutils.localized_now()
self.reprocessing_worker.do_execute_scope_processing(now_timestamp)
expected_end = tzutils.localized_now() + datetime.timedelta(
seconds=self.reprocessing_worker._period)
self.storage_mock.delete.assert_has_calls([
mock.call(begin=now_timestamp, end=expected_end,
filters={self.reprocessing_worker.scope_key:
self.reprocessing_worker._tenant_id})])
do_execute_scope_processing_mock_from_worker.assert_has_calls([
mock.call(now_timestamp)])
@mock.patch("cloudkitty.storage_state.ReprocessingSchedulerDb"
".update_reprocessing_time")
def test_update_scope_processing_state_db(
self, update_reprocessing_time_mock):
timestamp_now = tzutils.localized_now()
self.reprocessing_worker.update_scope_processing_state_db(
timestamp_now)
start_time = self.reprocessing_worker.scope.start_reprocess_time
end_time = self.reprocessing_worker.scope.end_reprocess_time
update_reprocessing_time_mock.assert_has_calls([
mock.call(
identifier=self.reprocessing_worker.scope.identifier,
start_reprocess_time=start_time, end_reprocess_time=end_time,
new_current_time_stamp=timestamp_now)])

View File

@ -753,15 +753,21 @@
# Coordination backend URL (string value)
#coordination_url = file:///var/lib/cloudkitty/locks
# Maximal number of workers to run. Defaults to the number of
# available CPUs (integer value)
# Minimum value: 1
# Max number of workers to execute the rating process. Defaults to the
# number of available CPU cores. (integer value)
# Minimum value: 0
#
# This option has a sample default set, which means that
# its actual default value may vary from the one documented
# below.
#max_workers = 4
# Max number of workers to execute the reprocessing. Defaults to the
# number of available CPU cores. (integer value)
# Minimum value: 0
#max_workers_reprocessing = 8
# Maximal number of threads to use per worker. Defaults to 5 times the
# number of available CPUs (integer value)
# Minimum value: 1

View File

@ -105,3 +105,11 @@
# GET /v2/summary
#"summary:get_summary": "rule:admin_or_owner"
# Schedule a scope for reprocessing
# POST /v2/task/reprocesses
#"schedule:task_reprocesses": "role:admin"
# Get reprocessing schedule tasks for scopes.
# GET /v2/task/reprocesses
#"schedule:get_task_reprocesses": "role:admin"

View File

@ -3,3 +3,4 @@
.. include:: dataframes/dataframes.inc
.. include:: scope/scope.inc
.. include:: summary/summary.inc
.. include:: task/reprocessing.inc

View File

@ -0,0 +1,147 @@
======================
Task schedule endpoint
======================
CloudKitty has a task endpoint `/v2/task/<type_of_task>`, which allows
operators to schedule administrative tasks, such as reprocessing.
Currently, the only task type available is reprocessing, which is exposed
via the following endpoints.
- POST `/v2/task/reprocesses` -- to create a reprocessing task.
- GET `/v2/task/reprocesses/<path_scope_id>` -- to retrieve a reprocessing task.
- GET `/v2/task/reprocesses` -- to retrieve all reprocessing tasks.
Create a reprocessing task
==========================
This endpoint schedules a reprocessing task. Scheduled tasks are loaded for
execution once every processing cycle, as defined by the CloudKitty `period`
configuration option.
.. rest_method:: POST `/v2/task/reprocesses`
.. rest_parameters:: task/reprocessing_parameters.yml
- scope_ids: scope_ids
- start_reprocess_time: start_reprocess_time
- end_reprocess_time: end_reprocess_time
- reason: reason
Status codes
------------
.. rest_status_code:: success http_status.yml
- 200
.. rest_status_code:: error http_status.yml
- 400
- 403
- 405
Response
--------
An empty object is returned as the response on success:
.. code-block:: javascript
{}
Example
-------
.. code-block:: shell
curl -s -X POST "https://<cloudkitty_server_and_port_here>/v2/task/reprocesses" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}" -H "Content-Type: application/json" -d '{"reason": "Reprocessing test", "scope_ids": "<Some scope ID>", "start_reprocess_time": "2021-06-01 00:00:00+00:00", "end_reprocess_time": "2021-06-01 23:00:00+00:00"}'
Scope IDs can be retrieved via the `/v2/scope` API, which lists all scopes and their status.
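For operators scripting the scheduling step, the same request can be issued from Python. The snippet below is a minimal sketch, assuming the third-party ``requests`` library and a Keystone token obtained beforehand; the host, token, and scope ID values are placeholders.
.. code-block:: python

    import requests

    CLOUDKITTY_URL = "https://cloudkitty.example.org:8889"  # placeholder endpoint
    TOKEN = "<keystone token>"  # placeholder token obtained beforehand

    payload = {
        "scope_ids": "<some scope ID>",  # comma-separated to schedule several scopes
        "start_reprocess_time": "2021-06-01 00:00:00+00:00",
        "end_reprocess_time": "2021-06-01 23:00:00+00:00",
        "reason": "Reprocessing test",
    }

    response = requests.post(
        f"{CLOUDKITTY_URL}/v2/task/reprocesses",
        json=payload,
        headers={"X-Auth-Token": TOKEN, "Accept": "application/json"},
    )
    response.raise_for_status()
    print(response.json())  # an empty object ({}) is returned on success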
Retrieve a reprocessing task
============================
This endpoint retrieves a single reprocessing task. It can be used, for
instance, to check the progress of a scope's reprocessing.
.. rest_method:: GET `/v2/task/reprocesses/<path_scope_id>`
.. rest_parameters:: task/reprocessing_parameters.yml
- path_scope_id: path_scope_id
Status codes
------------
.. rest_status_code:: success http_status.yml
- 200
.. rest_status_code:: error http_status.yml
- 400
- 403
- 405
Response
--------
The reprocessing task data for the scope is returned when a valid scope ID is given:
.. code-block:: javascript
{"scope_id": "scope ID goes here",
"reason": "The reason for this reprocessing for this scope",
"start_reprocess_time": "2021-06-01 00:00:00+00:00",
"end_reprocess_time": "2021-07-01 00:00:00+00:00",
"current_reprocess_time": "2021-06-06 00:00:00+00:00"}
Example
-------
.. code-block:: shell
curl -s -X GET "https://<cloudkitty_server_and_port_here>/v2/task/reprocesses/<scope ID goes here>" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}"
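Because the response carries the start, end, and current reprocessing timestamps, a rough progress estimate can be computed on the client side. The sketch below assumes ``requests``, a Keystone token, and the timestamp format shown in the response example above; host, token, and scope ID are placeholders.
.. code-block:: python

    from datetime import datetime

    import requests

    CLOUDKITTY_URL = "https://cloudkitty.example.org:8889"  # placeholder endpoint
    TOKEN = "<keystone token>"  # placeholder token
    SCOPE_ID = "<scope ID goes here>"  # placeholder scope ID

    resp = requests.get(
        f"{CLOUDKITTY_URL}/v2/task/reprocesses/{SCOPE_ID}",
        headers={"X-Auth-Token": TOKEN, "Accept": "application/json"},
    )
    resp.raise_for_status()
    task = resp.json()

    start = datetime.fromisoformat(task["start_reprocess_time"])
    end = datetime.fromisoformat(task["end_reprocess_time"])
    current = task.get("current_reprocess_time")

    if current:
        # Fraction of the requested timeframe already reprocessed.
        done = (datetime.fromisoformat(current) - start) / (end - start)
        print(f"Reprocessing roughly {done:.0%} complete")
    else:
        print("Reprocessing has not started yet")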
Retrieve all reprocessing tasks
===============================
This endpoint retrieves all reprocessing tasks. It can also be used to
retrieve only the reprocessing tasks scheduled for specific scopes.
.. rest_method:: GET `/v2/task/reprocesses`
.. rest_parameters:: task/reprocessing_parameters.yml
- scope_ids: scope_ids_query
Status codes
------------
.. rest_status_code:: success http_status.yml
- 200
.. rest_status_code:: error http_status.yml
- 400
- 403
- 405
Response
--------
A list of reprocessing tasks is returned on success:
.. code-block:: javascript
[{"scope_id": "scope ID goes here",
"reason": "The reason for this reprocessing for this scope",
"start_reprocess_time": "2021-06-01 00:00:00+00:00",
"end_reprocess_time": "2021-07-01 00:00:00+00:00",
"current_reprocess_time": "2021-06-06 00:00:00+00:00"}]
Example
-------
.. code-block:: shell
curl -s -X GET "https://<cloudkitty_server_and_port_here>/v2/task/reprocesses" -H "Accept: application/json" -H "User-Agent: python-keystoneclient" -H "X-Auth-Token: ${ACCESS_TOKEN_KEYSTONE}"
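The optional ``scope_ids`` query parameter restricts the listing to specific scopes. A minimal Python sketch under the same assumptions as the previous examples (placeholder host, token, and scope IDs), following the list-shaped response shown above:
.. code-block:: python

    import requests

    CLOUDKITTY_URL = "https://cloudkitty.example.org:8889"  # placeholder endpoint
    TOKEN = "<keystone token>"  # placeholder token

    resp = requests.get(
        f"{CLOUDKITTY_URL}/v2/task/reprocesses",
        params={"scope_ids": "<scope ID 1>,<scope ID 2>"},  # omit to list every scope
        headers={"X-Auth-Token": TOKEN, "Accept": "application/json"},
    )
    resp.raise_for_status()
    for task in resp.json():
        print(task["scope_id"], task.get("current_reprocess_time"))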

View File

@ -0,0 +1,56 @@
path_scope_id:
in: path
description: |
    The scope ID for which the reprocessing task should be retrieved.
type: string
required: true
limit:
in: query
description: |
For pagination. The maximum number of results to return.
type: int
required: false
offset:
in: query
description: |
For pagination. The index of the first element that should be returned.
type: int
required: false
scope_ids_query: &scope_ids_query
in: query
description: |
    The scope IDs for which reprocessing tasks should be retrieved. If not
    informed, reprocessing tasks for all scopes are retrieved.
required: false
type: string
end_reprocess_time:
in: body
description: |
The end date for the reprocessing task.
type: iso8601 timestamp
required: true
reason:
in: body
description: |
The reason for the reprocessing to take place.
type: string
required: true
scope_ids:
<<: *scope_ids_query
in: body
description: |
The scope IDs to reprocess. Must be comma-separated to schedule more than one.
required: true
start_reprocess_time:
in: body
description: |
The start date for the reprocessing task.
type: iso8601 timestamp
required: true

View File

@ -21,7 +21,7 @@ oslo.middleware==4.1.1 # Apache-2.0
oslo.policy==3.6.0 # Apache-2.0
oslo.utils==4.7.0 # Apache-2.0
oslo.upgradecheck==1.3.0 # Apache-2.0
python-dateutil==2.7.0 # BSD
python-dateutil==2.8.0 # BSD
SQLAlchemy==1.3.20 # MIT
stevedore==3.2.2 # Apache-2.0
tooz==2.7.1 # Apache-2.0
@ -32,6 +32,7 @@ Flask-RESTful==0.3.9 # BSD
cotyledon==1.7.3 # Apache-2.0
futurist==2.3.0 # Apache-2.0
bandit>=1.6.0 # Apache-2.0
datetimerange==0.6.1 # MIT
# test-requirements
coverage==5.3 # Apache-2.0

View File

@ -0,0 +1,5 @@
---
features:
- |
    Introduce the reprocessing schedule API, which allows operators to
    schedule the reprocessing of scopes over given timeframes.

View File

@ -23,7 +23,7 @@ oslo.middleware>=4.1.1 # Apache-2.0
oslo.policy>=3.6.0 # Apache-2.0
oslo.utils>=4.7.0 # Apache-2.0
oslo.upgradecheck>=1.3.0 # Apache-2.0
python-dateutil>=2.7.0 # BSD
python-dateutil>=2.8.0 # BSD
SQLAlchemy>=1.3.20 # MIT
stevedore>=3.2.2 # Apache-2.0
tooz>=2.7.1 # Apache-2.0
@ -33,3 +33,4 @@ Flask>=2.0.0 # BSD
Flask-RESTful>=0.3.9 # BSD
cotyledon>=1.7.3 # Apache-2.0
futurist>=2.3.0 # Apache-2.0
datetimerange>=0.6.1 # MIT