Add periodic task to sync up bay status

This patch adds periodic task `sync_bay_status` which will be used for syncing
bay's status from heat stack, we will pull bays which status in
[bay_status.CREATE_IN_PROGRESS,
 bay_status.UPDATE_IN_PROGRESS,
 bay_status.DELETE_IN_PROGRESS]
which are all in a temporary status, and try to sync up the status with heat's
stack.

status changes will be like this:

bay_status              stack_status           sync up bay_status to
----------              ------------           ---------------------
CREATE_IN_PROGRESS      CREATE_COMPLETE        CREATE_COMPLETE
UPDATE_IN_PROGRESS      UPDATE_COMPLETE        UPDATE_COMPLETE
DELETE_IN_PROGRESS      DELETE_COMPLETE        DELETE_COMPLETE

CREATE_IN_PROGRESS      CREATE_FAILED          CREATE_FAILED
UPDATE_IN_PROGRESS      UPDATE_FAILED          UPDATE_FAILED
DELETE_IN_PROGRESS      DELETE_FAILED          DELETE_FAILED

CREATE_IN_PROGRESS      Not Found              CREATE_FAILED
UPDATE_IN_PROGRESS      Not Found              UPDATE_FAILED
DELETE_IN_PROGRESS      Not Found              destroy

Partial-Implements: blueprint add-periodic-task
Co-Authored-By: ShaoHe Feng <shaohe.feng@intel.com>
Change-Id: Ie9cc4d3f03c7938a8d988010604da79c9b8a22fd
This commit is contained in:
Eli Qiao 2015-06-23 11:57:29 +08:00
parent b00a0c4b74
commit fd57cb4372
4 changed files with 233 additions and 1 deletions

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from eventlet.green import threading
from oslo_context import context from oslo_context import context
@ -87,3 +88,26 @@ def make_admin_context(show_deleted=False):
is_admin=True, is_admin=True,
show_deleted=show_deleted) show_deleted=show_deleted)
return context return context
_CTX_STORE = threading.local()
_CTX_KEY = 'current_ctx'
def has_ctx():
return hasattr(_CTX_STORE, _CTX_KEY)
def ctx():
return getattr(_CTX_STORE, _CTX_KEY)
def set_ctx(new_ctx):
if not new_ctx and has_ctx():
delattr(_CTX_STORE, _CTX_KEY)
if hasattr(context._request_store, 'context'):
delattr(context._request_store, 'context')
if new_ctx:
setattr(_CTX_STORE, _CTX_KEY, new_ctx)
setattr(context._request_store, 'context', new_ctx)

View File

@ -12,19 +12,101 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools
import six
from oslo_log import log from oslo_log import log
from oslo_service import periodic_task from oslo_service import periodic_task
from oslo_service import threadgroup from oslo_service import threadgroup
from magnum.common import clients
from magnum.common import context
from magnum.i18n import _LI
from magnum.i18n import _LW
from magnum import objects
from magnum.objects.bay import Status as bay_status
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
def set_context(func):
@functools.wraps(func)
def handler(self, ctx):
ctx = context.make_admin_context()
context.set_ctx(ctx)
func(self, ctx)
context.set_ctx(None)
return handler
class MagnumPeriodicTasks(periodic_task.PeriodicTasks): class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
'''Magnum periodic Task class '''Magnum periodic Task class
Any periodic task job need to be added into this class Any periodic task job need to be added into this class
''' '''
pass
@periodic_task.periodic_task(run_immediately=True)
@set_context
def sync_bay_status(self, ctx):
try:
LOG.debug('Starting to sync up bay status')
osc = clients.OpenStackClients(ctx)
filters = [bay_status.CREATE_IN_PROGRESS,
bay_status.UPDATE_IN_PROGRESS,
bay_status.DELETE_IN_PROGRESS]
bays = objects.Bay.list_all(ctx, filters=filters)
if not bays:
return
sid_to_bay_mapping = {bay.stack_id: bay for bay in bays}
bay_stack_ids = sid_to_bay_mapping.keys()
stacks = osc.heat().stacks.list(global_tenant=True,
filters={'id': bay_stack_ids})
sid_to_stack_mapping = {s.id: s for s in stacks}
for sid in (six.viewkeys(sid_to_bay_mapping) &
six.viewkeys(sid_to_stack_mapping)):
stack = sid_to_stack_mapping[sid]
bay = sid_to_bay_mapping[sid]
if bay.status != stack.stack_status:
old_status = bay.status
bay.status = stack.stack_status
bay.save()
LOG.info(_LI("Sync up bay with id %(id)s from "
"%(old_status)s to %(status)s"),
{'id': bay.id, 'old_status': old_status,
'status': bay.status})
for sid in (six.viewkeys(sid_to_bay_mapping) -
six.viewkeys(sid_to_stack_mapping)):
bay = sid_to_bay_mapping[sid]
if bay.status == bay_status.DELETE_IN_PROGRESS:
bay.destroy()
LOG.info(_LI("Bay with id %(id)s has been deleted due "
"to stack with id %(sid)s not found in "
"heat."),
{'id': bay.id, 'sid': sid})
elif bay.status == bay_status.CREATE_IN_PROGRESS:
bay.status = bay_status.CREATE_FAILED
bay.save()
LOG.info(_LI("Bay with id %(id)s has been set to "
"%(status)s due to stack with id %(sid)s "
"not found in heat."),
{'id': bay.id, 'status': bay.status,
'sid': sid})
elif bay.status == bay_status.UPDATE_IN_PROGRESS:
bay.status = bay_status.UPDATE_FAILED
bay.save()
LOG.info(_LI("Bay with id %(id)s has been set to "
"%(status)s due to stack with id %(sid)s "
"not found in heat."),
{'id': bay.id, 'status': bay.status,
'sid': sid})
except Exception as e:
LOG.warn(_LW("Ignore error [%s] when syncing up bay status."), e,
exc_info=True)
def setup(conf): def setup(conf):

View File

View File

@ -0,0 +1,126 @@
# Copyright 2015 Intel, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from magnum.common import context
from magnum.common.rpc_service import CONF
from magnum.db.sqlalchemy import api as dbapi
from magnum import objects
from magnum.objects.bay import Status as bay_status
from magnum.service import periodic
from magnum.tests import base
from magnum.tests.unit.db import utils
periodic_opts = [
cfg.BoolOpt('periodic_enable',
default=True,
help='Enable periodic tasks.'),
cfg.IntOpt('periodic_interval_max',
default=60,
help='Max interval size between periodic tasks execution in '
'seconds.'),
]
ctx = context.make_admin_context()
bay1 = utils.get_test_bay(id=1, stack_id='11',
status=bay_status.CREATE_IN_PROGRESS)
bay2 = utils.get_test_bay(id=2, stack_id='22',
status=bay_status.DELETE_IN_PROGRESS)
bay3 = utils.get_test_bay(id=3, stack_id='33',
status=bay_status.UPDATE_IN_PROGRESS)
bay1 = objects.Bay(ctx, **bay1)
bay2 = objects.Bay(ctx, **bay2)
bay3 = objects.Bay(ctx, **bay3)
class fake_stack(object):
def __init__(self, **kw):
for key, val in kw.items():
setattr(self, key, val)
class PeriodictTestCase(base.TestCase):
@mock.patch.object(objects.Bay, 'list_all')
@mock.patch('magnum.common.clients.OpenStackClients')
@mock.patch.object(dbapi.Connection, 'destroy_bay')
@mock.patch.object(dbapi.Connection, 'update_bay')
def test_sync_bay_status_changes(self, mock_db_update, mock_db_destroy,
mock_oscc, mock_bay_list):
mock_heat_client = mock.MagicMock()
stack1 = fake_stack(id='11', stack_status=bay_status.CREATE_COMPLETE)
stack3 = fake_stack(id='33', stack_status=bay_status.UPDATE_COMPLETE)
mock_heat_client.stacks.list.return_value = [stack1, stack3]
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
mock_bay_list.return_value = [bay1, bay2, bay3]
mock_keystone_client = mock.MagicMock()
mock_keystone_client.client.project_id = "fake_project"
mock_osc.keystone.return_value = mock_keystone_client
periodic.MagnumPeriodicTasks(CONF).sync_bay_status(None)
self.assertEqual(bay1.status, bay_status.CREATE_COMPLETE)
mock_db_destroy.assert_called_once_with(bay2.uuid)
self.assertEqual(bay3.status, bay_status.UPDATE_COMPLETE)
@mock.patch.object(objects.Bay, 'list_all')
@mock.patch('magnum.common.clients.OpenStackClients')
def test_sync_bay_status_not_changes(self, mock_oscc, mock_bay_list):
mock_heat_client = mock.MagicMock()
stack1 = fake_stack(id='11',
stack_status=bay_status.CREATE_IN_PROGRESS)
stack2 = fake_stack(id='22',
stack_status=bay_status.DELETE_IN_PROGRESS)
stack3 = fake_stack(id='33',
stack_status=bay_status.UPDATE_IN_PROGRESS)
mock_heat_client.stacks.list.return_value = [stack1, stack2, stack3]
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
mock_bay_list.return_value = [bay1, bay2, bay3]
periodic.MagnumPeriodicTasks(CONF).sync_bay_status(None)
self.assertEqual(bay1.status, bay_status.CREATE_IN_PROGRESS)
self.assertEqual(bay2.status, bay_status.DELETE_IN_PROGRESS)
self.assertEqual(bay3.status, bay_status.UPDATE_IN_PROGRESS)
@mock.patch.object(objects.Bay, 'list_all')
@mock.patch('magnum.common.clients.OpenStackClients')
@mock.patch.object(dbapi.Connection, 'destroy_bay')
@mock.patch.object(dbapi.Connection, 'update_bay')
def test_sync_bay_status_heat_not_found(self, mock_db_update,
mock_db_destroy, mock_oscc,
mock_bay_list):
mock_heat_client = mock.MagicMock()
mock_heat_client.stacks.list.return_value = []
mock_osc = mock_oscc.return_value
mock_osc.heat.return_value = mock_heat_client
mock_bay_list.return_value = [bay1, bay2, bay3]
mock_keystone_client = mock.MagicMock()
mock_keystone_client.client.project_id = "fake_project"
mock_osc.keystone.return_value = mock_keystone_client
periodic.MagnumPeriodicTasks(CONF).sync_bay_status(None)
self.assertEqual(bay1.status, bay_status.CREATE_FAILED)
mock_db_destroy.assert_called_once_with(bay2.uuid)
self.assertEqual(bay3.status, bay_status.UPDATE_FAILED)