From a63bf0bccb70d54012173604bc6fbf7d07469b26 Mon Sep 17 00:00:00 2001 From: tengqm Date: Thu, 18 Dec 2014 15:29:51 +0800 Subject: [PATCH] Initial version This tool helps drop database so that it can be recreated. --- senlin/engine/service.py | 148 +----------------------------- senlin/engine/thread_mgr.py | 176 ++++++++++++++++++++++++++++++++++++ tools/senlin-db-setup | 14 +++ 3 files changed, 191 insertions(+), 147 deletions(-) create mode 100644 senlin/engine/thread_mgr.py create mode 100755 tools/senlin-db-setup diff --git a/senlin/engine/service.py b/senlin/engine/service.py index 19c034ffd..726713348 100644 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -40,9 +40,9 @@ from senlin.engine import action from senlin.engine import cluster from senlin.engine import node from senlin.engine import senlin_lock +from senlin.engine.thread_mgr import ThreadGroupManager from senlin.openstack.common import log as logging from senlin.openstack.common import service -from senlin.openstack.common import threadgroup from senlin.openstack.common import uuidutils from senlin.rpc import api as rpc_api @@ -61,152 +61,6 @@ def request_context(func): return wrapped -class ThreadGroupManager(object): - - def __init__(self): - super(ThreadGroupManager, self).__init__() - self.groups = {} - self.events = collections.defaultdict(list) - - # Create dummy service task, because when there is nothing queued - # on self.tg the process exits - self.add_timer(cfg.CONF.periodic_interval, self._service_task) - - def _service_task(self): - """ - This is a dummy task which gets queued on the service.Service - threadgroup. Without this service.Service sees nothing running - i.e has nothing to wait() on, so the process exits.. - This could also be used to trigger periodic non-cluster-specific - housekeeping tasks - """ - pass - - def _serialize_profile_info(self): - prof = profiler.get() - trace_info = None - if prof: - trace_info = { - "hmac_key": prof.hmac_key, - "base_id": prof.get_base_id(), - "parent_id": prof.get_id() - } - return trace_info - - def _start_with_trace(self, trace, func, *args, **kwargs): - if trace: - profiler.init(**trace) - return func(*args, **kwargs) - - def start(self, target_id, func, *args, **kwargs): - """ - Run the given method in a sub-thread. - """ - if target_id not in self.groups: - self.groups[target_id] = threadgroup.ThreadGroup() - return self.groups[target_id].add_thread(self._start_with_trace, - self._serialize_profile_info(), - func, *args, **kwargs) - - def start_with_lock(self, cnxt, target, target_type, engine_id, - func, *args, **kwargs): - """ - Try to acquire a lock for operated target and, if successful, - run the given method in a sub-thread. Release the lock when - the thread finishes. - - :param cnxt: RPC context - :param target: Target to be operated on - :param target_type: Type of operated target, e.g. cluster, node, etc. - :param engine_id: The ID of the engine/worker acquiring the lock - :param func: Callable to be invoked in sub-thread - :type func: function or instancemethod - :param args: Args to be passed to func - :param kwargs: Keyword-args to be passed to func. - """ - # TODO: add more target_type support - # TODO: Reimplement this using __new__ method - if target_type == 'cluster': - lock = senlin_lock.ClusterLock(cnxt, target, engine_id) - elif target_type == 'node': - lock = senlin_lock.NodeLock(cnxt, target, engine_id) - with lock.thread_lock(target.id): - th = self.start_with_acquired_lock(target, lock, - func, *args, **kwargs) - return th - - def start_with_acquired_lock(self, target, lock, func, *args, **kwargs): - """ - Run the given method in a sub-thread and release the provided lock - when the thread finishes. - - :param target: Target to be operated on - :type target: senlin.engine.parser.Stack - :param lock: The acquired target lock - :param func: Callable to be invoked in sub-thread - :type func: function or instancemethod - :param args: Args to be passed to func - :param kwargs: Keyword-args to be passed to func - - """ - def release(gt, *args): - """ - Callback function that will be passed to GreenThread.link(). - """ - lock.release(*args) - - th = self.start(target.id, func, *args, **kwargs) - th.link(release, target.id) - return th - - def add_timer(self, target_id, func, *args, **kwargs): - """ - Define a periodic task, to be run in a separate thread, in the target - threadgroups. Periodicity is cfg.CONF.periodic_interval - """ - if target_id not in self.groups: - self.groups[target_id] = threadgroup.ThreadGroup() - self.groups[target_id].add_timer(cfg.CONF.periodic_interval, - func, *args, **kwargs) - - def add_event(self, target_id, event): - self.events[target_id].append(event) - - def remove_event(self, gt, target_id, event): - for e in self.events.pop(target_id, []): - if e is not event: - self.add_event(target_id, e) - - def stop_timers(self, target_id): - if target_id in self.groups: - self.groups[target_id].stop_timers() - - def stop(self, target_id, graceful=False): - '''Stop any active threads on a target.''' - if target_id in self.groups: - self.events.pop(target_id, None) - threadgroup = self.groups.pop(target_id) - threads = threadgroup.threads[:] - - threadgroup.stop(graceful) - threadgroup.wait() - - # Wait for link()ed functions (i.e. lock release) - links_done = dict((th, False) for th in threads) - - def mark_done(gt, th): - links_done[th] = True - - for th in threads: - th.link(mark_done, th) - while not all(links_done.values()): - eventlet.sleep() - - def send(self, target_id, message): - for event in self.events.pop(target_id, []): - event.send(message) - - @profiler.trace_cls("rpc") class EngineListener(service.Service): ''' diff --git a/senlin/engine/thread_mgr.py b/senlin/engine/thread_mgr.py new file mode 100644 index 000000000..ac3f4f536 --- /dev/null +++ b/senlin/engine/thread_mgr.py @@ -0,0 +1,176 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import os + +import eventlet +from oslo.config import cfg +from osprofiler import profiler + +from senlin.common import exception +from senlin.common.i18n import _ +from senlin.common.i18n import _LE +from senlin.common.i18n import _LI +from senlin.common.i18n import _LW +from senlin.engine import senlin_lock +from senlin.openstack.common import log as logging +from senlin.openstack.common import threadgroup + +LOG = logging.getLogger(__name__) + + +class ThreadGroupManager(object): + + def __init__(self): + super(ThreadGroupManager, self).__init__() + self.groups = {} + self.events = collections.defaultdict(list) + + # Create dummy service task, because when there is nothing queued + # on self.tg the process exits + self.add_timer(cfg.CONF.periodic_interval, self._service_task) + + def _service_task(self): + """ + This is a dummy task which gets queued on the service.Service + threadgroup. Without this service.Service sees nothing running + i.e has nothing to wait() on, so the process exits.. + This could also be used to trigger periodic non-cluster-specific + housekeeping tasks + """ + pass + + def _serialize_profile_info(self): + prof = profiler.get() + trace_info = None + if prof: + trace_info = { + "hmac_key": prof.hmac_key, + "base_id": prof.get_base_id(), + "parent_id": prof.get_id() + } + return trace_info + + def _start_with_trace(self, trace, func, *args, **kwargs): + if trace: + profiler.init(**trace) + return func(*args, **kwargs) + + def start(self, target_id, func, *args, **kwargs): + """ + Run the given method in a sub-thread. + """ + if target_id not in self.groups: + self.groups[target_id] = threadgroup.ThreadGroup() + return self.groups[target_id].add_thread(self._start_with_trace, + self._serialize_profile_info(), + func, *args, **kwargs) + + def start_with_lock(self, cnxt, target, target_type, engine_id, + func, *args, **kwargs): + """ + Try to acquire a lock for operated target and, if successful, + run the given method in a sub-thread. Release the lock when + the thread finishes. + + :param cnxt: RPC context + :param target: Target to be operated on + :param target_type: Type of operated target, e.g. cluster, node, etc. + :param engine_id: The ID of the engine/worker acquiring the lock + :param func: Callable to be invoked in sub-thread + :type func: function or instancemethod + :param args: Args to be passed to func + :param kwargs: Keyword-args to be passed to func. + """ + # TODO: add more target_type support + # TODO: Reimplement this using __new__ method + if target_type == 'cluster': + lock = senlin_lock.ClusterLock(cnxt, target, engine_id) + elif target_type == 'node': + lock = senlin_lock.NodeLock(cnxt, target, engine_id) + with lock.thread_lock(target.id): + th = self.start_with_acquired_lock(target, lock, + func, *args, **kwargs) + return th + + def start_with_acquired_lock(self, target, lock, func, *args, **kwargs): + """ + Run the given method in a sub-thread and release the provided lock + when the thread finishes. + + :param target: Target to be operated on + :type target: senlin.engine.parser.Stack + :param lock: The acquired target lock + :param func: Callable to be invoked in sub-thread + :type func: function or instancemethod + :param args: Args to be passed to func + :param kwargs: Keyword-args to be passed to func + + """ + def release(gt, *args): + """ + Callback function that will be passed to GreenThread.link(). + """ + lock.release(*args) + + th = self.start(target.id, func, *args, **kwargs) + th.link(release, target.id) + return th + + def add_timer(self, target_id, func, *args, **kwargs): + """ + Define a periodic task, to be run in a separate thread, in the target + threadgroups. Periodicity is cfg.CONF.periodic_interval + """ + if target_id not in self.groups: + self.groups[target_id] = threadgroup.ThreadGroup() + self.groups[target_id].add_timer(cfg.CONF.periodic_interval, + func, *args, **kwargs) + + def add_event(self, target_id, event): + self.events[target_id].append(event) + + def remove_event(self, gt, target_id, event): + for e in self.events.pop(target_id, []): + if e is not event: + self.add_event(target_id, e) + + def stop_timers(self, target_id): + if target_id in self.groups: + self.groups[target_id].stop_timers() + + def stop(self, target_id, graceful=False): + '''Stop any active threads on a target.''' + if target_id in self.groups: + self.events.pop(target_id, None) + threadgroup = self.groups.pop(target_id) + threads = threadgroup.threads[:] + + threadgroup.stop(graceful) + threadgroup.wait() + + # Wait for link()ed functions (i.e. lock release) + links_done = dict((th, False) for th in threads) + + def mark_done(gt, th): + links_done[th] = True + + for th in threads: + th.link(mark_done, th) + while not all(links_done.values()): + eventlet.sleep() + + def send(self, target_id, message): + for event in self.events.pop(target_id, []): + event.send(message) diff --git a/tools/senlin-db-setup b/tools/senlin-db-setup new file mode 100755 index 000000000..069abcb26 --- /dev/null +++ b/tools/senlin-db-setup @@ -0,0 +1,14 @@ +#!/bin/bash + +MYSQL_ROOT_PW=cheerman +MYSQL_SENLIN_PW=cheerman + +echo "Creating 'senlin' database." +cat << EOF | mysql -u root --password=${MYSQL_ROOT_PW} +DROP DATABASE senlin; +CREATE DATABASE senlin DEFAULT CHARACTER SET utf8; +GRANT ALL ON senlin.* TO 'root'@'localhost' IDENTIFIED BY '${MYSQL_SENLIN_PW}'; +GRANT ALL ON senlin.* TO 'root'@'%' IDENTIFIED BY '${MYSQL_SENLIN_PW}'; +flush privileges; +EOF +