WIP version for Yanyan to continue work on

This commit is contained in:
tengqm 2014-12-30 12:08:56 +08:00
parent c54f39a9c2
commit 8d58688638
4 changed files with 167 additions and 112 deletions

View File

@ -35,7 +35,7 @@ class Cluster(object):
'INIT', 'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
)
def __init__(self, name, profile, size=0, **kwargs):
def __init__(self, name, profile_id, size=0, **kwargs):
'''
Initialize a cluster object.
The cluster defaults to having 0 nodes with no profile assigned.
@ -43,23 +43,26 @@ class Cluster(object):
self.name = name
self.profile_id = profile_id
self.user = kwargs.get('user')
self.project = kwargs.get('project')
self.domain = kwargs.get('domain')
self.user = kwargs.get('user', '')
self.project = kwargs.get('project', '')
self.domain = kwargs.get('domain', '')
self.parent = kwargs.get('parent')
self.created_time = None
self.updated_time = None
self.deleted_time = None
# node_count is only the 'current count', not the desired count
# (i.e. size)
self.desired_size = size
self.node_count = 0
self.next_index = 0
self.timeout = 0
self.timeout = kwargs.get('timeout', 0)
self.status = ''
self.status_reason = ''
self.status = self.INIT
self.status_reason = 'Initializing'
self.data = {}
self.tags = {}
self.tags = kwargs.get('tags', {})
# persist object into database very early because:
# 1. object creation may be a time consuming task
@ -68,12 +71,21 @@ class Cluster(object):
db_api.create_cluster(self)
# rt is a dict for runtime data
self.rt = dict(size=size,
nodes={},
self.rt = dict(nodes={},
policies={})
def _set_status(self, context, status):
pass
now = datetime.datetime.utcnow()
if status == self.ACTIVE:
if self.status == self.INIT:
kwargs['created_time'] = now
else:
kwargs['updated_time'] = now
elif status == self.DELETED:
kwargs['deleted_time'] = now
kwargs['status'] = status
db_api.cluster_update(self.context, {'status': status})
#event.info(context, self.id, status,
# log status to log file
# generate event record
@ -109,7 +121,7 @@ class Cluster(object):
node_list = self.get_nodes()
for n in node_list:
node = nodes.Node(None, profile_id, cluster_id)
node = nodes.Node(None, profile.id, cluster_id)
action = actions.NodeAction(context, node, 'UPDATE', **kwargs)
# start a thread asynchronously
@ -190,7 +202,7 @@ class Cluster(object):
message = _('No cluster exists with id "%s"') % str(cluster_id)
raise exception.NotFound(message)
return cls._from_db(context, cluster)
return cls.from_db(context, cluster)
@classmethod
def load_all(cls, context, limit=None, marker=None, sort_keys=None,
@ -200,29 +212,28 @@ class Cluster(object):
sort_dir, filters, tenant_safe,
show_deleted, show_nested) or []
for cluster in clusters:
yield cls._from_db(context, cluster)
yield cls.from_db(context, cluster)
@classmethod
def _from_db(cls, context, cluster):
# TODO: calculate current size based on nodes
size = self.size
return cls(context, cluster.name, cluster.profile, size,
def from_db(cls, context, cluster):
# TODO(Qiming): calculate current size based on nodes
return cls(context, cluster.name, cluster.profile,
id=cluster.id, status=cluster.status,
status_reason=cluster_status_reason,
status_reason=cluster.status_reason,
parent=cluster.parent,
project=cluster.project,
created_time=cluster.created_time,
updated_time=cluster.updated_time,
deleted_time=cluster.deleted_time,
domain = cluster.domain,
timeout = cluster.timeout,
domain=cluster.domain,
timeout=cluster.timeout,
user=cluster.user)
def to_dict(self):
info = {
rpc_api.CLUSTER_NAME: self.name,
rpc_api.CLUSTER_PROFILE: self.profile,
rpc_api.CLUSTER_SIZE: self.size,
rpc_api.CLUSTER_SIZE: self.node_count,
rpc_api.CLUSTER_UUID: self.id,
rpc_api.CLUSTER_PARENT: self.parent,
rpc_api.CLUSTER_DOMAIN: self.domain,
@ -235,5 +246,5 @@ class Cluster(object):
rpc_api.CLUSTER_STATUS_REASON: self.status_reason,
rpc_api.CLUSTER_TIMEOUT: self.timeout,
}
return info

View File

@ -26,20 +26,21 @@ class Node(object):
'''
statuses = (
ACTIVE, ERROR, DELETED, UPDATING,
INIT, ACTIVE, ERROR, DELETED, UPDATING,
) = (
'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
'INITIALIZING', 'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
)
def __init__(self, name, profile_id, cluster_id=None, **kwargs):
self.id = None
if name:
self.name = name
else:
# TODO
# Using self.physical_resource_name() to generate a unique name
self.name = 'node-name-tmp'
self.physical_id = None
self.cluster_id = cluster_id
self.physical_id = ''
self.cluster_id = cluster_id or ''
self.profile_id = profile_id
if cluster_id is None:
self.index = -1
@ -51,11 +52,41 @@ class Node(object):
self.updated_time = None
self.deleted_time = None
self.status = self.ACTIVE
self.status_reason = 'Initialized'
self.status = self.INIT
self.status_reason = 'Initializing'
self.data = {}
self.tags = {}
# TODO: store this to database
self.store()
def store(self):
    '''
    Store the node record into database table.

    The invocation of DB API could be a node_create or a node_update,
    depending on whether node has an ID assigned.
    '''
    # Snapshot of all persistable fields; the keys must match the column
    # names expected by db_api.node_create / db_api.node_update.
    values = {
        'name': self.name,
        'physical_id': self.physical_id,
        'cluster_id': self.cluster_id,
        'profile_id': self.profile_id,
        'index': self.index,
        'role': self.role,
        'created_time': self.created_time,
        'updated_time': self.updated_time,
        'deleted_time': self.deleted_time,
        'status': self.status,
        'status_reason': self.status_reason,
        'data': self.data,
        'tags': self.tags,
    }
    if self.id:
        # Node already persisted: update the existing record in place.
        db_api.node_update(self.context, self.id, values)
    else:
        # First save: create the record and adopt the DB-generated ID.
        # NOTE(review): self.context is not assigned in the visible
        # __init__ -- confirm where it is set before store() is called.
        node = db_api.node_create(self.context, values)
        self.id = node.id
def create(self, name, profile_id, cluster_id=None, **kwargs):
# TODO: invoke profile to create new object and get the physical id
@ -65,6 +96,8 @@ class Node(object):
def delete(self):
node = db_api.get_node(self.id)
physical_id = node.physical_id
# TODO: invoke profile to delete this object
# TODO: check if actions are working on it and can be canceled

View File

@ -58,7 +58,7 @@ class EngineListener(service.Service):
def __init__(self, host, engine_id, thread_group_mgr):
super(EngineListener, self).__init__()
self.thread_group_mgr = thread_group_mgr
self.TG = thread_group_mgr
self.engine_id = engine_id
self.host = host
@ -78,10 +78,10 @@ class EngineListener(service.Service):
def stop_cluster(self, ctxt, cluster_id):
'''Stop any active threads on a cluster.'''
self.thread_group_mgr.stop(cluster_id)
self.TG.stop(cluster_id)
def send(self, ctxt, cluster_id, message):
self.thread_group_mgr.send(cluster_id, message)
self.TG.send(cluster_id, message)
@profiler.trace_cls("rpc")
@ -100,20 +100,21 @@ class EngineService(service.Service):
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__()
# TODO(Qiming): call environment.initialize() when environment
# is ready
self.host = host
self.topic = topic
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.engine_id = None
self.thread_group_mgr = None
self.TG= None
self.target = None
def start(self):
self.engine_id = senlin_lock.BaseLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(self.host, self.engine_id,
self.thread_group_mgr)
self.TG = ThreadGroupManager()
self.listener = EngineListener(self.host, self.engine_id, self.TG)
LOG.debug("Starting listener for engine %s" % self.engine_id)
self.listener.start()
@ -135,14 +136,14 @@ class EngineService(service.Service):
pass
# Wait for all active threads to be finished
for cluster_id in self.thread_group_mgr.groups.keys():
for cluster_id in self.TG.groups.keys():
# Ignore dummy service task
if cluster_id == cfg.CONF.periodic_interval:
continue
LOG.info(_LI("Waiting cluster %s processing to be finished"),
cluster_id)
# Stop threads gracefully
self.thread_group_mgr.stop(cluster_id, True)
self.TG.stop(cluster_id, True)
LOG.info(_LI("cluster %s processing was finished"), cluster_id)
# Terminate the engine process
@ -245,39 +246,29 @@ class EngineService(service.Service):
return {'clusters': clusters_info}
@request_context
def create_cluster(self, cnxt, cluster_name, size, profile,
owner_id=None, nested_depth=0, user_creds_id=None,
cluster_user_project_id=None):
"""
def create_cluster(self, cntxt, name, profile_id, size, args):
'''
Handle request to perform a create action on a cluster
:param cnxt: RPC context.
:param cluster_name: Name of the cluster you want to create.
:param size: Size of cluster you want to create.
:param profile: Profile used to create cluster nodes.
:param owner_id: parent cluster ID for nested clusters, only
expected when called from another senlin-engine
(not a user option)
:param nested_depth: the nested depth for nested clusters, only
expected when called from another senlin-engine
:param user_creds_id: the parent user_creds record for nested clusters
:param cluster_user_project_id: the parent cluster_user_project_id for
nested clusters
"""
LOG.info(_LI('Creating cluster %s'), cluster_name)
:param cntxt: RPC context.
:param name: Name of the cluster to be created.
:param profile_id: Profile used to create cluster nodes.
:param size: Desired size of cluster to be created.
:param args: A dictionary of other parameters
'''
LOG.info(_LI('Creating cluster %s'), name)
# TODO: construct real kwargs based on input for cluster creating
kwargs = {}
kwargs['owner_id'] = owner_id
kwargs['nested_depth'] = nested_depth
kwargs['user_creds_id'] = user_creds_id
kwargs['cluster_user_project_id'] = cluster_user_project_id
kwargs = {
'parent': args.get('parent', ''),
'user': cntxt.get('username', ''),
'project': cntxt.get('tenant_id', ''),
'timeout': args.get('timeout', 0),
'tags': args.get('tags', {}),
}
cluster = clusters.Cluster(cluster_name, size, profile, **kwargs)
action = actions.ClusterAction(cnxt, cluster, 'CREATE', **kwargs)
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
cluster = clusters.Cluster(name, profile_id, size, **kwargs)
action = actions.Action(cnxt, cluster, 'CREATE', **kwargs)
self.TG.start_action_woker(action, self.engine_id)
return cluster.id
@ -304,12 +295,12 @@ class EngineService(service.Service):
msg = _('Updating a cluster which has been deleted')
raise exception.NotSupported(feature=msg)
kwargs = {}
kwargs['profile'] = profile
action = actions.ClusterAction(cnxt, cluster, 'UPDATE', **kwargs)
kwargs = {
'profile_id': profile_id
}
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
action = actions.Action(cnxt, cluster, 'UPDATE', **kwargs)
self.TG.start_action_worker(action, self.engine_id)
return cluster.id
@ -332,9 +323,9 @@ class EngineService(service.Service):
# Successfully acquired lock
if acquire_result is None:
self.thread_group_mgr.stop_timers(cluster.id)
self.TG.stop_timers(cluster.id)
action = actions.ClusterAction(cnxt, cluster, 'DELETE')
self.thread_group_mgr.start_with_acquired_lock(cluster, lock,
self.TG.start_with_acquired_lock(cluster, lock,
action.execute)
return
@ -343,7 +334,7 @@ class EngineService(service.Service):
# give threads which are almost complete an opportunity to
# finish naturally before force stopping them
eventlet.sleep(0.2)
self.thread_group_mgr.stop(cluster.id)
self.TG.stop(cluster.id)
# Another active engine has the lock
elif senlin_lock.ClusterLock.engine_alive(cnxt, acquire_result):
stop_result = self._remote_call(
@ -363,7 +354,7 @@ class EngineService(service.Service):
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'DELETE')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.TG.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
return None
@ -380,7 +371,7 @@ class EngineService(service.Service):
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'SUSPEND')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.TG.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
@request_context
@ -394,7 +385,7 @@ class EngineService(service.Service):
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'RESUME')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.TG.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
def _remote_call(self, cnxt, lock_engine_id, call, *args, **kwargs):

View File

@ -1,15 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import os
@ -40,42 +39,26 @@ class ThreadGroupManager(object):
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.add_timer(cfg.CONF.periodic_interval, self._service_task)
def _service_task(self):
"""
'''
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e has nothing to wait() on, so the process exits..
This could also be used to trigger periodic non-cluster-specific
housekeeping tasks
"""
'''
# TODO(Yanyan): have this task call dbapi purge events
pass
def _serialize_profile_info(self):
prof = profiler.get()
trace_info = None
if prof:
trace_info = {
"hmac_key": prof.hmac_key,
"base_id": prof.get_base_id(),
"parent_id": prof.get_id()
}
return trace_info
def _start_with_trace(self, trace, func, *args, **kwargs):
if trace:
profiler.init(**trace)
return func(*args, **kwargs)
def start(self, target_id, func, *args, **kwargs):
"""
Run the given method in a sub-thread.
"""
if target_id not in self.groups:
self.groups[target_id] = threadgroup.ThreadGroup()
return self.groups[target_id].add_thread(self._start_with_trace,
self._serialize_profile_info(),
func, *args, **kwargs)
return self.groups[target_id].add_thread(func, *args, **kwargs)
def start_with_lock(self, cnxt, target, target_type, engine_id,
func, *args, **kwargs):
@ -99,6 +82,7 @@ class ThreadGroupManager(object):
lock = senlin_lock.ClusterLock(cnxt, target, engine_id)
elif target_type == 'node':
lock = senlin_lock.NodeLock(cnxt, target, engine_id)
with lock.thread_lock(target.id):
th = self.start_with_acquired_lock(target, lock,
func, *args, **kwargs)
@ -168,9 +152,45 @@ class ThreadGroupManager(object):
for th in threads:
th.link(mark_done, th)
while not all(links_done.values()):
eventlet.sleep()
def send(self, target_id, message):
for event in self.events.pop(target_id, []):
event.send(message)
def action_proc(self, action, worker_id):
    '''
    Thread procedure that drives a single action to completion.

    Waits while the action is still INIT/WAITING, gives up if some other
    worker has already taken it past READY, then executes it repeatedly
    until it succeeds, fails, or stops requesting a retry.

    NOTE(review): the bare ``yield`` makes this function a generator --
    simply calling it returns a generator object without running any of
    this code. Confirm that the thread runner iterates it (or that the
    yield should be an eventlet sleep instead).
    '''
    status = action.get_status()
    while status in (action.INIT, action.WAITING):
        # Action not yet ready to run; give way and re-check its status.
        # TODO(Qiming): Handle 'start_time' field of an action
        yield
        status = action.get_status()
    # Exit quickly if action has been taken care of or marked
    # completed or cancelled by other activities
    if status != action.READY:
        return
    done = False
    while not done:
        # Take over the action
        action.set_status(action.RUNNING)
        result = action.execute()
        if result == action.OK:
            action.set_status(action.SUCCEEDED)
            done = True
        elif result == action.ERROR:
            action.set_status(action.FAILED)
            done = True
        elif result == action.RETRY:
            # Transient failure: loop around and execute again.
            continue
def start_action_worker(self, action, engine_id):
    '''
    Run the life cycle of `action` in a sub-thread owned by `engine_id`.

    :param action: The action object to be driven by action_proc().
    :param engine_id: ID of the engine; used as the thread-group key.
    :returns: The thread created by ThreadGroupManager.start().
    '''
    # BUG FIX: the original called self.start(self.action_proc, engine_id),
    # which passed the function as the target_id, the engine id as the
    # callable, and dropped `action` altogether.  start() expects
    # (target_id, func, *args) and action_proc expects (action, worker_id).
    return self.start(engine_id, self.action_proc, action, engine_id)