diff --git a/fenix/utils/time.py b/fenix/utils/time.py new file mode 100644 index 0000000..9005ea8 --- /dev/null +++ b/fenix/utils/time.py @@ -0,0 +1,48 @@ +# Copyright (c) 2018 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime + + +def str_to_datetime(dt_str): + mdate, mtime = dt_str.split() + year, month, day = map(int, mdate.split('-')) + hours, minutes, seconds = map(int, mtime.split(':')) + return datetime.datetime(year, month, day, hours, minutes, seconds) + + +def reply_time_str(wait): + now = datetime.datetime.utcnow() + reply = now - datetime.timedelta( + seconds=wait) + return (reply.strftime('%Y-%m-%d %H:%M:%S')) + + +def time_now_str(): + return datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + + +def is_time_after_time(before, after): + if type(before) == str: + time_before = str_to_datetime(before) + else: + time_before = before + if type(after) == str: + time_after = str_to_datetime(after) + else: + time_after = after + if time_before > time_after: + return True + else: + return False diff --git a/fenix/workflow/workflow.py b/fenix/workflow/workflow.py index c6c559e..18a9a8a 100644 --- a/fenix/workflow/workflow.py +++ b/fenix/workflow/workflow.py @@ -12,12 +12,16 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - +import aodhclient.client as aodhclient from oslo_log import log as logging +import oslo_messaging as messaging from oslo_service import threadgroup from threading import Thread import time +from fenix.utils.identity_auth import get_identity_auth +from fenix.utils.identity_auth import get_session + LOG = logging.getLogger(__name__) @@ -175,6 +179,23 @@ class BaseWorkflow(Thread): 'MAINTENANCE_COMPLETE': 'maintenance_complete', 'MAINTENANCE_DONE': 'maintenance_done', 'FAILED': 'maintenance_failed'} + self.url = "http://%s:%s" % (conf.host, conf.port) + self.auth = get_identity_auth(conf.workflow_user, + conf.workflow_password, + conf.workflow_project) + self.session = get_session(auth=self.auth) + self.aodh = aodhclient.Client('2', self.session) + transport = messaging.get_transport(self.conf) + self.notif_proj = messaging.Notifier(transport, + 'maintenance.planned', + driver='messaging', + topics=['notifications']) + self.notif_proj = self.notif_proj.prepare(publisher_id='fenix') + self.notif_admin = messaging.Notifier(transport, + 'maintenance.host', + driver='messaging', + topics=['notifications']) + self.notif_admin = self.notif_admin.prepare(publisher_id='fenix') def _timer_expired(self, name): LOG.info("%s: timer expired %s" % (self.session_id, name)) @@ -230,3 +251,102 @@ class BaseWorkflow(Thread): time.sleep(1) # IDLE while session removed LOG.info("%s: done" % self.session_id) + + def projects_listen_alarm(self, match_event): + match_projects = ([str(alarm['project_id']) for alarm in + self.aodh.alarm.list() if + str(alarm['event_rule']['event_type']) == + match_event]) + all_projects_match = True + for project in self.session_data.project_names(): + if project not in match_projects: + LOG.error('%s: project %s not ' + 'listening to %s' % + (self.session_id, project, match_event)) + all_projects_match = False + return all_projects_match + + def _project_notify(self, project_id, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata): + reply_url = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project_id) + + payload = dict(project_id=project_id, + instance_ids=instance_ids, + allowed_actions=allowed_actions, + state=state, + actions_at=actions_at, + reply_at=reply_at, + session_id=self.session_id, + metadata=metadata, + reply_url=reply_url) + + LOG.info('Sending "maintenance.planned" to project: %s' % payload) + + self.notif_proj.info({'some': 'context'}, 'maintenance.scheduled', + payload) + + def _admin_notify(self, project, host, state, session_id): + payload = dict(project_id=project, host=host, state=state, + session_id=session_id) + + LOG.info('Sending "maintenance.host": %s' % payload) + + self.notif_admin.info({'some': 'context'}, 'maintenance.host', payload) + + def projects_answer(self, state, projects): + state_ack = 'ACK_%s' % state + state_nack = 'NACK_%s' % state + for project in projects: + pstate = project.state + if pstate == state: + break + elif pstate == state_ack: + continue + elif pstate == state_nack: + LOG.error('%s: %s from %s' % + (self.session_id, pstate, project.name)) + break + else: + LOG.error('%s: Project %s in invalid state %s' % + (self.session_id, project.name, pstate)) + break + return pstate + + def _project_names_in_state(self, projects, state): + return ([project.name for project in projects if + project.state == state]) + + def wait_projects_state(self, state, timer_name): + state_ack = 'ACK_%s' % state + state_nack = 'NACK_%s' % state + projects = self.session_data.get_projects_with_state() + if not projects: + LOG.error('%s: wait_projects_state %s. Emtpy project list' % + (self.session_id, state)) + while not self.is_timer_expired(timer_name): + answer = self.projects_answer(state, projects) + if answer == state: + pass + else: + self.stop_timer(timer_name) + if answer == state_ack: + LOG.info('all projects in: %s' % state_ack) + return True + elif answer == state_nack: + pnames = self._projects_in_state(projects, answer) + LOG.error('%s: projects rejected with %s: %s' % + (self.session_id, answer, pnames)) + return False + else: + pnames = self._projects_in_state(projects, answer) + LOG.error('%s: projects with invalid state %s: %s' % + (self.session_id, answer, pnames)) + return False + time.sleep(1) + LOG.error('%s: timer %s expired waiting answer to state %s' % + (self.session_id, timer_name, state)) + pnames = self._projects_in_state(projects, state) + LOG.error('%s: projects not answered: %s' % (self.session_id, pnames)) + return False diff --git a/fenix/workflow/workflows/default.py b/fenix/workflow/workflows/default.py index f2e0a1d..55961af 100644 --- a/fenix/workflow/workflows/default.py +++ b/fenix/workflow/workflows/default.py @@ -12,18 +12,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import datetime +import novaclient.client as novaclient from novaclient.exceptions import BadRequest + from oslo_log import log as logging -import oslo_messaging as messaging import time -from fenix.utils.identity_auth import get_identity_auth -from fenix.utils.identity_auth import get_session - -import aodhclient.client as aodhclient -import novaclient.client as novaclient +from fenix.utils.time import is_time_after_time +from fenix.utils.time import reply_time_str +from fenix.utils.time import str_to_datetime +from fenix.utils.time import time_now_str from fenix.workflow.workflow import BaseWorkflow @@ -35,24 +34,7 @@ class Workflow(BaseWorkflow): def __init__(self, conf, session_id, data): super(Workflow, self).__init__(conf, session_id, data) - self.url = "http://%s:%s" % (conf.host, conf.port) - self.auth = get_identity_auth(conf.workflow_user, - conf.workflow_password, - conf.workflow_project) - self.session = get_session(auth=self.auth) - self.aodh = aodhclient.Client('2', self.session) self.nova = novaclient.Client(version='2.34', session=self.session) - transport = messaging.get_transport(self.conf) - self.notif_proj = messaging.Notifier(transport, - 'maintenance.planned', - driver='messaging', - topics=['notifications']) - self.notif_proj = self.notif_proj.prepare(publisher_id='fenix') - self.notif_admin = messaging.Notifier(transport, - 'maintenance.host', - driver='messaging', - topics=['notifications']) - self.notif_admin = self.notif_admin.prepare(publisher_id='fenix') LOG.info("%s: initialized" % self.session_id) def cleanup(self): @@ -112,120 +94,6 @@ class Workflow(BaseWorkflow): ha) LOG.info(str(self.session_data)) - def projects_listen_alarm(self, match_event): - match_projects = ([str(alarm['project_id']) for alarm in - self.aodh.alarm.list() if - str(alarm['event_rule']['event_type']) == - match_event]) - all_projects_match = True - for project in self.session_data.project_names(): - if project not in match_projects: - LOG.error('%s: project %s not ' - 'listening to %s' % - (self.session_id, project, match_event)) - all_projects_match = False - return all_projects_match - - def str_to_datetime(self, dt_str): - mdate, mtime = dt_str.split() - year, month, day = map(int, mdate.split('-')) - hours, minutes, seconds = map(int, mtime.split(':')) - return datetime.datetime(year, month, day, hours, minutes, seconds) - - def reply_time_str(self, wait): - now = datetime.datetime.utcnow() - reply = now - datetime.timedelta( - seconds=wait) - return (reply.strftime('%Y-%m-%d %H:%M:%S')) - - def is_time_after_time(self, before, after): - if type(before) == str: - time_before = self.str_to_datetime(before) - else: - time_before = before - if type(after) == str: - time_after = self.str_to_datetime(after) - else: - time_after = after - if time_before > time_after: - return True - else: - return False - - def _project_notify(self, project_id, instance_ids, allowed_actions, - actions_at, reply_at, state, metadata): - reply_url = '%s/v1/maintenance/%s/%s' % (self.url, - self.session_id, - project_id) - - payload = dict(project_id=project_id, - instance_ids=instance_ids, - allowed_actions=allowed_actions, - state=state, - actions_at=actions_at, - reply_at=reply_at, - session_id=self.session_id, - metadata=metadata, - reply_url=reply_url) - - LOG.info('Sending "maintenance.planned" to project: %s' % payload) - - self.notif_proj.info({'some': 'context'}, 'maintenance.scheduled', - payload) - - def _admin_notify(self, project, host, state, session_id): - payload = dict(project_id=project, host=host, state=state, - session_id=session_id) - - LOG.info('Sending "maintenance.host": %s' % payload) - - self.notif_admin.info({'some': 'context'}, 'maintenance.host', payload) - - def projects_answer(self, state, projects): - state_ack = 'ACK_%s' % state - state_nack = 'NACK_%s' % state - for project in projects: - pstate = project.state - if pstate == state: - break - elif pstate == state_ack: - continue - elif pstate == state_nack: - LOG.error('%s: %s from %s' % - (self.session_id, pstate, project.name)) - break - else: - LOG.error('%s: Project %s in invalid state %s' % - (self.session_id, project.name, pstate)) - break - return pstate - - def wait_projects_state(self, state, timer_name): - state_ack = 'ACK_%s' % state - state_nack = 'NACK_%s' % state - projects = self.session_data.get_projects_with_state() - if not projects: - LOG.error('%s: wait_projects_state %s. Emtpy project list' % - (self.session_id, state)) - while not self.is_timer_expired(timer_name): - answer = self.projects_answer(state, projects) - if answer == state: - pass - else: - self.stop_timer(timer_name) - if answer == state_ack: - LOG.info('all projects in: %s' % state_ack) - return True - elif answer == state_nack: - return False - else: - return False - time.sleep(1) - LOG.error('%s: timer %s expired waiting answer to state %s' % - (self.session_id, timer_name, state)) - LOG.error('%s: project states' % self.session_id) - return False - def confirm_maintenance(self): allowed_actions = [] actions_at = self.session_data.maintenance_at @@ -236,8 +104,8 @@ class Workflow(BaseWorkflow): instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, self.session_id, project) - reply_at = self.reply_time_str(self.conf.project_maintenance_reply) - if self.is_time_after_time(reply_at, actions_at): + reply_at = reply_time_str(self.conf.project_maintenance_reply) + if is_time_after_time(reply_at, actions_at): raise Exception('%s: No time for project to' ' answer in state: %s' % (self.session_id, state)) @@ -250,7 +118,7 @@ class Workflow(BaseWorkflow): def confirm_scale_in(self): allowed_actions = [] - actions_at = self.reply_time_str(self.conf.project_scale_in_reply) + actions_at = reply_time_str(self.conf.project_scale_in_reply) reply_at = actions_at state = 'SCALE_IN' self.session_data.set_projets_state(state) @@ -351,7 +219,7 @@ class Workflow(BaseWorkflow): def confirm_host_to_be_emptied(self, host, state): allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] - actions_at = self.reply_time_str(self.conf.project_maintenance_reply) + actions_at = reply_time_str(self.conf.project_maintenance_reply) reply_at = actions_at self.session_data.set_projects_state_and_host_instances(state, host) for project in self.session_data.project_names(): @@ -372,7 +240,7 @@ class Workflow(BaseWorkflow): def confirm_maintenance_complete(self): state = 'MAINTENANCE_COMPLETE' metadata = self.session_data.metadata - actions_at = self.reply_time_str(self.conf.project_scale_in_reply) + actions_at = reply_time_str(self.conf.project_scale_in_reply) reply_at = actions_at self.session_data.set_projets_state(state) for project in self.session_data.project_names(): @@ -527,11 +395,9 @@ class Workflow(BaseWorkflow): LOG.info('Empty host found') self.state = 'START_MAINTENANCE' - maint_at = self.str_to_datetime( - self.session_data.maintenance_at) + maint_at = str_to_datetime(self.session_data.maintenance_at) if maint_at > datetime.datetime.utcnow(): - time_now = (datetime.datetime.utcnow().strftime( - '%Y-%m-%d %H:%M:%S')) + time_now = time_now_str() LOG.info('Time now: %s maintenance starts: %s....' % (time_now, self.session_data.maintenance_at)) td = maint_at - datetime.datetime.utcnow() @@ -539,7 +405,7 @@ class Workflow(BaseWorkflow): while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'): time.sleep(1) - time_now = (datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')) + time_now = time_now_str() LOG.info('Time to start maintenance: %s' % time_now) def scale_in(self):