Refactor evoque engine

see oslo usage and magnum

Change-Id: I0d78e2ef4fc028cd102fe344000572ab07324ec0
This commit is contained in:
lawrancejing 2015-11-09 07:10:34 +00:00
parent 79434310b2
commit f664668cf0
20 changed files with 440 additions and 745 deletions

View File

@ -15,24 +15,19 @@ from pecan import rest
from oslo_log import log
from evoque.rpc import client as rpc_client
LOG = log.getLogger(__name__)
class TicketController(rest.RestController):
def __init__(self):
self.rpc_client = rpc_client.EngineClient()
@pecan.expose('json')
def get(self):
return {"version": "1.0.0"}
@pecan.expose('json')
def post(self, **kwargs):
ticket = self.rpc_client.ticket_create(
{"fake": 'fake'}, kwargs['name'])
ticket = pecan.request.rpcapi.ticket_create(
name=kwargs['name'])
return ticket

View File

@ -15,6 +15,7 @@ import pecan
import webob.exc
from werkzeug import serving
from evoque.api import hooks
from evoque import exceptions
from evoque import service
@ -25,6 +26,10 @@ PECAN_CONFIG = {
'app': {
'root': 'evoque.api.RootController',
'modules': ['evoque.api'],
'hooks': [
hooks.ContextHook(),
hooks.RPCHook(),
],
},
}
@ -56,6 +61,7 @@ def setup_app(config=PECAN_CONFIG, cfg=None):
app = pecan.make_app(
config['app']['root'],
debug=pecan_debug,
hooks=config['app']['hooks'],
guess_content_type_from_ext=False,
)

71
evoque/api/hooks.py Normal file
View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pecan import hooks
from evoque.common import context
from evoque.engine.ticket import api as ticket_api
class ContextHook(hooks.PecanHook):
"""Configures a request context and attaches it to the request.
The following HTTP request headers are used:
X-User-Name:
Used for context.user_name.
X-User-Id:
Used for context.user_id.
X-Project-Name:
Used for context.project.
X-Project-Id:
Used for context.project_id.
X-Auth-Token:
Used for context.auth_token.
X-Roles:
Used for context.roles.
"""
def before(self, state):
headers = state.request.headers
user_name = headers.get('X-User-Name')
user_id = headers.get('X-User-Id')
project = headers.get('X-Project-Name')
project_id = headers.get('X-Project-Id')
domain_id = headers.get('X-User-Domain-Id')
domain_name = headers.get('X-User-Domain-Name')
auth_token = headers.get('X-Auth-Token')
roles = headers.get('X-Roles', '').split(',')
auth_token_info = state.request.environ.get('keystone.token_info')
state.request.context = context.make_context(
auth_token=auth_token,
auth_token_info=auth_token_info,
user_name=user_name,
user_id=user_id,
project_name=project,
project_id=project_id,
domain_id=domain_id,
domain_name=domain_name,
roles=roles)
class RPCHook(hooks.PecanHook):
"""Attach the rpcapi object to the request so controllers can get to it."""
def before(self, state):
state.request.rpcapi = ticket_api.API(context=state.request.context)

View File

@ -11,10 +11,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from evoque.common import messaging
from evoque.api import app
def main():
messaging.setup()
from evoque.api import app
app.build_server()

View File

@ -1,68 +1,45 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Evoque Engine Server.
"""
import eventlet
eventlet.monkey_patch()
"""Evoque engine service."""
import os
import sys
# If ../evoque/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'evoque', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
import uuid
from oslo_config import cfg
from oslo_i18n import _lazy
from oslo_log import log as logging
from oslo_service import service
from evoque.common import consts
from evoque.common import messaging
from evoque import opts
from evoque.common import rpc_service
from evoque.engine.ticket import endpoint as ticket_endpoint
from evoque.i18n import _LI
from evoque import service as evoque_service
_lazy.enable_lazy()
LOG = logging.getLogger('evoque.engine')
LOG = logging.getLogger(__name__)
def main():
conf = cfg.ConfigOpts()
evoque_service.prepare_service()
# Register Evoque options
for group, options in opts.list_opts():
conf.register_opts(list(options),
group=None if group == "DEFAULT" else group)
LOG.info(_LI('Starting evoque engine in PID %s') % os.getpid())
logging.register_options(cfg.CONF)
cfg.CONF(project='evoque', prog='evoque-engine')
logging.setup(cfg.CONF, 'evoque-engine')
logging.set_defaults()
messaging.setup()
conductor_id = str(uuid.uuid4())
from evoque.engine import service as engine
endpoints = [
ticket_endpoint.Handler(),
]
srv = engine.EngineService(conf.host, consts.ENGINE_TOPIC)
launcher = service.launch(cfg.CONF, srv,
workers=1)
# the following periodic tasks are intended serve as HA checking
# srv.create_periodic_tasks()
server = rpc_service.Service.create("evoque-engine",
conductor_id, endpoints,
binary='evoque-engine')
launcher = service.launch(cfg.CONF, server)
launcher.wait()

View File

@ -1,85 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
RPC_ATTRS = (
ENGINE_TOPIC,
ENGINE_DISPATCHER_TOPIC,
ENGINE_HEALTH_MGR_TOPIC,
RPC_API_VERSION,
) = (
'evoque-engine',
'engine-dispatcher',
'engine-health_mgr',
'1.0',
)
RPC_PARAMS = (
PARAM_SHOW_DELETED, PARAM_SHOW_NESTED, PARAM_LIMIT, PARAM_MARKER,
PARAM_GLOBAL_PROJECT, PARAM_SHOW_DETAILS,
PARAM_SORT_DIR, PARAM_SORT_KEYS,
) = (
'show_deleted', 'show_nested', 'limit', 'marker',
'global_project', 'show_details',
'sort_dir', 'sort_keys',
)
ACTION_NAMES = (
CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE,
CLUSTER_ADD_NODES, CLUSTER_DEL_NODES, CLUSTER_RESIZE,
CLUSTER_SCALE_OUT, CLUSTER_SCALE_IN,
CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY, CLUSTER_UPDATE_POLICY,
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
NODE_JOIN, NODE_LEAVE,
POLICY_ENABLE, POLICY_DISABLE, POLICY_UPDATE,
) = (
'CLUSTER_CREATE', 'CLUSTER_DELETE', 'CLUSTER_UPDATE',
'CLUSTER_ADD_NODES', 'CLUSTER_DEL_NODES', 'CLUSTER_RESIZE',
'CLUSTER_SCALE_OUT', 'CLUSTER_SCALE_IN',
'CLUSTER_ATTACH_POLICY', 'CLUSTER_DETACH_POLICY', 'CLUSTER_UPDATE_POLICY',
'NODE_CREATE', 'NODE_DELETE', 'NODE_UPDATE',
'NODE_JOIN', 'NODE_LEAVE',
'POLICY_ENABLE', 'POLICY_DISABLE', 'POLICY_UPDATE',
)
ADJUSTMENT_PARAMS = (
ADJUSTMENT_TYPE, ADJUSTMENT_NUMBER, ADJUSTMENT_MIN_STEP,
ADJUSTMENT_MIN_SIZE, ADJUSTMENT_MAX_SIZE, ADJUSTMENT_STRICT,
) = (
'adjustment_type', 'number', 'min_step',
'min_size', 'max_size', 'strict',
)
ADJUSTMENT_TYPES = (
EXACT_CAPACITY, CHANGE_IN_CAPACITY, CHANGE_IN_PERCENTAGE,
) = (
'EXACT_CAPACITY', 'CHANGE_IN_CAPACITY', 'CHANGE_IN_PERCENTAGE',
)
CLUSTER_ATTRS = (
CLUSTER_NAME, CLUSTER_PROFILE, CLUSTER_DESIRED_CAPACITY,
CLUSTER_MIN_SIZE, CLUSTER_MAX_SIZE, CLUSTER_ID, CLUSTER_PARENT,
CLUSTER_DOMAIN, CLUSTER_PROJECT, CLUSTER_USER,
CLUSTER_CREATED_TIME, CLUSTER_UPDATED_TIME, CLUSTER_DELETED_TIME,
CLUSTER_STATUS, CLUSTER_STATUS_REASON, CLUSTER_TIMEOUT,
CLUSTER_METADATA,
) = (
'name', 'profile_id', 'desired_capacity',
'min_size', 'max_size', 'id', 'parent',
'domain', 'project', 'user',
'created_time', 'updated_time', 'deleted_time',
'status', 'status_reason', 'timeout',
'metadata',
)

View File

@ -2,7 +2,7 @@
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@ -10,92 +10,68 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as base_context
from oslo_utils import encodeutils
from evoque.db import api as db_api
from oslo_context import context
class RequestContext(base_context.RequestContext):
'''Stores information about the security context.
class RequestContext(context.RequestContext):
"""Extends security contexts from the OpenStack common library."""
The context encapsulates information related to the user accessing the
the system, as well as additional request information.
'''
def __init__(self, auth_token=None, auth_url=None, domain_id=None,
domain_name=None, user_name=None, user_id=None,
project_name=None, project_id=None, roles=None,
is_admin=False, read_only=False, show_deleted=False,
request_id=None, trust_id=None, auth_token_info=None,
all_tenants=False, **kwargs):
"""Stores several additional request parameters:
def __init__(self, auth_token=None, user=None, project=None,
domain=None, user_domain=None, project_domain=None,
is_admin=None, read_only=False, show_deleted=False,
request_id=None, auth_url=None, trusts=None,
user_name=None, project_name=None, domain_name=None,
user_domain_name=None, project_domain_name=None,
auth_token_info=None, region_name=None, roles=None,
password=None, **kwargs):
'''Initializer of request context.'''
# We still have 'tenant' param because oslo_context still use it.
super(RequestContext, self).__init__(
auth_token=auth_token, user=user, tenant=project,
domain=domain, user_domain=user_domain,
project_domain=project_domain,
read_only=read_only, show_deleted=show_deleted,
request_id=request_id)
# request_id might be a byte array
self.request_id = encodeutils.safe_decode(self.request_id)
# we save an additional 'project' internally for use
self.project = project
# Session for DB access
self._session = None
self.auth_url = auth_url
self.trusts = trusts
:param domain_id: The ID of the domain.
:param domain_name: The name of the domain.
"""
self.user_name = user_name
self.user_id = user_id
self.project_name = project_name
self.project_id = project_id
self.domain_id = domain_id
self.domain_name = domain_name
self.user_domain_name = user_domain_name
self.project_domain_name = project_domain_name
self.roles = roles
self.auth_url = auth_url
self.auth_token_info = auth_token_info
self.region_name = region_name
self.roles = roles or []
self.password = password
self.trust_id = trust_id
self.all_tenants = all_tenants
self.is_admin = is_admin
@property
def session(self):
if self._session is None:
self._session = db_api.get_session()
return self._session
super(RequestContext, self).__init__(auth_token=auth_token,
user=user_name,
tenant=project_name,
is_admin=is_admin,
read_only=read_only,
show_deleted=show_deleted,
request_id=request_id)
def to_dict(self):
return {
'auth_url': self.auth_url,
'auth_token': self.auth_token,
'auth_token_info': self.auth_token_info,
'user': self.user,
'user_name': self.user_name,
'user_domain': self.user_domain,
'user_domain_name': self.user_domain_name,
'project': self.project,
'project_name': self.project_name,
'project_domain': self.project_domain,
'project_domain_name': self.project_domain_name,
'domain': self.domain,
'domain_name': self.domain_name,
'trusts': self.trusts,
'region_name': self.region_name,
'roles': self.roles,
'show_deleted': self.show_deleted,
'is_admin': self.is_admin,
'request_id': self.request_id,
'password': self.password,
}
value = super(RequestContext, self).to_dict()
value.update({'auth_token': self.auth_token,
'auth_url': self.auth_url,
'domain_id': self.domain_id,
'domain_name': self.domain_name,
'user_name': self.user_name,
'user_id': self.user_id,
'project_name': self.project_name,
'project_id': self.project_id,
'is_admin': self.is_admin,
'read_only': self.read_only,
'roles': self.roles,
'show_deleted': self.show_deleted,
'request_id': self.request_id,
'trust_id': self.trust_id,
'auth_token_info': self.auth_token_info,
'all_tenants': self.all_tenants})
return value
@classmethod
def from_dict(cls, values):
return cls(**values)
def make_context(*args, **kwargs):
return RequestContext(*args, **kwargs)

View File

@ -1,107 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils
from evoque.common import context
TRANSPORT = None
NOTIFIER = None
class RequestContextSerializer(oslo_messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return {}
@staticmethod
def deserialize_context(ctxt):
return context.RequestContext.from_dict(ctxt)
class JsonPayloadSerializer(oslo_messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None, optional=False):
"""Initialise the oslo_messaging layer."""
global TRANSPORT, NOTIFIER
if url and url.startswith("fake://"):
# NOTE(sileht): oslo_messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
oslo_messaging.set_transport_defaults('evoque')
exmods = ['evoque.exception']
try:
TRANSPORT = oslo_messaging.get_transport(
cfg.CONF, url, allowed_remote_exmods=exmods)
except oslo_messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
# NOTE(sileht): oslo_messaging is configured but unloadable
# so reraise the exception
raise
if not NOTIFIER and TRANSPORT:
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
"""Cleanup the oslo_messaging layer."""
global TRANSPORT, NOTIFIER
if TRANSPORT:
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def get_rpc_server(target, endpoint):
"""Return a configured oslo_messaging rpc server."""
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
"""Return a configured oslo_messaging RPCClient."""
target = oslo_messaging.Target(**kwargs)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo_messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
def get_notifier(publisher_id):
"""Return a configured oslo_messaging notifier."""
return NOTIFIER.prepare(publisher_id=publisher_id)

128
evoque/common/rpc.py Normal file
View File

@ -0,0 +1,128 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'init',
'cleanup',
'set_defaults',
'add_extra_exmods',
'clear_extra_exmods',
'get_allowed_exmods',
'RequestContextSerializer',
'get_client',
'get_server',
'get_notifier',
]
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from evoque.common import context as evoque_context
from evoque import exceptions
CONF = cfg.CONF
TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
exceptions.__name__,
]
EXTRA_EXMODS = []
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
def cleanup():
global TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
messaging.set_transport_defaults(control_exchange)
def add_extra_exmods(*args):
EXTRA_EXMODS.extend(args)
def clear_extra_exmods():
del EXTRA_EXMODS[:]
def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.serialize_entity(context, entity)
def deserialize_entity(self, context, entity):
if not self._base:
return entity
return self._base.deserialize_entity(context, entity)
def serialize_context(self, context):
return context.to_dict()
def deserialize_context(self, context):
return evoque_context.RequestContext.from_dict(context)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
def get_notifier(service='container', host=None, publisher_id=None):
assert NOTIFIER is not None
if not publisher_id:
publisher_id = "%s.%s" % (service, host or CONF.host)
return NOTIFIER.prepare(publisher_id=publisher_id)

View File

@ -0,0 +1,83 @@
# Copyright 2014 - Rackspace Hosting
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common RPC service and API tools for Magnum."""
import eventlet
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_service import service
from evoque.common import rpc
# NOTE(paulczar):
# Ubuntu 14.04 forces librabbitmq when kombu is used
# Unfortunately it forces a version that has a crash
# bug. Calling eventlet.monkey_patch() tells kombu
# to use libamqp instead.
eventlet.monkey_patch()
CONF = cfg.CONF
class Service(service.Service):
def __init__(self, topic, server, handlers, binary):
super(Service, self).__init__()
serializer = rpc.RequestContextSerializer(
rpc.JsonPayloadSerializer())
transport = messaging.get_transport(cfg.CONF)
# TODO(asalkeld) add support for version='x.y'
target = messaging.Target(topic=topic, server=server)
self._server = messaging.get_rpc_server(transport, target, handlers,
serializer=serializer)
self.binary = binary
def start(self):
self._server.start()
def wait(self):
self._server.wait()
@classmethod
def create(cls, topic, server, handlers, binary):
service_obj = cls(topic, server, handlers, binary)
return service_obj
class API(object):
def __init__(self, transport=None, context=None, topic=None, server=None,
timeout=None):
serializer = rpc.RequestContextSerializer(
rpc.JsonPayloadSerializer())
if transport is None:
exmods = rpc.get_allowed_exmods()
transport = messaging.get_transport(cfg.CONF,
allowed_remote_exmods=exmods)
self._context = context
if topic is None:
topic = ''
target = messaging.Target(topic=topic, server=server)
self._client = messaging.RPCClient(transport, target,
serializer=serializer,
timeout=timeout)
def _call(self, method, *args, **kwargs):
return self._client.call(self._context, method, *args, **kwargs)
def _cast(self, method, *args, **kwargs):
self._client.cast(self._context, method, *args, **kwargs)
def echo(self, message):
self._cast('echo', message=message)

View File

@ -44,20 +44,20 @@ def get_backend():
def model_query(context, *args):
session = _session(context)
session = _session()
query = session.query(*args)
return query
def _session(context):
return (context and context.session) or get_session()
def _session():
return get_session()
# Tickets
def ticket_create(context, values):
ticket_ref = models.Ticket()
ticket_ref.update(values)
ticket_ref.save(_session(context))
ticket_ref.save(_session())
return ticket_ref

View File

@ -1,97 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_context import context as oslo_context
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from evoque.common import consts
from evoque.common import messaging as rpc_messaging
from evoque.i18n import _LI
LOG = logging.getLogger(__name__)
OPERATIONS = (
START_ACTION, CANCEL_ACTION, STOP
) = (
'start_action', 'cancel_action', 'stop'
)
class Dispatcher(service.Service):
'''Listen on an AMQP queue named for the engine.
Receive notification from engine services and schedule actions.
'''
def __init__(self, engine_service, topic, version, thread_group_mgr):
super(Dispatcher, self).__init__()
self.TG = thread_group_mgr
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
def start(self):
super(Dispatcher, self).start()
self.target = oslo_messaging.Target(server=self.engine_id,
topic=self.topic,
version=self.version)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
def listening(self, ctxt):
'''Respond affirmatively to confirm that engine is still alive.'''
return True
def stop(self):
super(Dispatcher, self).stop()
# Wait for all action threads to be finished
LOG.info(_LI("Stopping all action threads of engine %s"),
self.engine_id)
# Stop ThreadGroup gracefully
self.TG.stop(True)
LOG.info(_LI("All action threads have been finished"))
def notify(method, engine_id=None, **kwargs):
'''Send notification to dispatcher
:param method: remote method to call
:param engine_id: dispatcher to notify; None implies broadcast
'''
client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION)
if engine_id:
# Notify specific dispatcher identified by engine_id
call_context = client.prepare(
version=consts.RPC_API_VERSION,
topic=consts.ENGINE_DISPATCHER_TOPIC,
server=engine_id)
else:
# Broadcast to all disptachers
call_context = client.prepare(
version=consts.RPC_API_VERSION,
topic=consts.ENGINE_DISPATCHER_TOPIC)
try:
# We don't use ctext parameter in action progress
# actually. But since RPCClient.call needs this param,
# we use oslo current context here.
call_context.call(oslo_context.get_current(), method, **kwargs)
return True
except oslo_messaging.MessagingTimeout:
return False
def start_action(engine_id=None, **kwargs):
return notify(START_ACTION, engine_id, **kwargs)

View File

@ -1,226 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import uuid
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from oslo_service import threadgroup
from osprofiler import profiler
import six
from evoque.common import consts
from evoque.common import messaging as rpc_messaging
from evoque.db import api as db_api
from evoque.engine import dispatcher
from evoque.i18n import _LE
from evoque.i18n import _LI
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class ThreadGroupManager(object):
def __init__(self):
super(ThreadGroupManager, self).__init__()
self.groups = {}
self.events = collections.defaultdict(list)
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.add_timer(60, self._service_task)
def _service_task(self):
"""
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e has nothing to wait() on, so the process exits..
This could also be used to trigger periodic non-stack-specific
housekeeping tasks
"""
pass
def _serialize_profile_info(self):
prof = profiler.get()
trace_info = None
if prof:
trace_info = {
"hmac_key": prof.hmac_key,
"base_id": prof.get_base_id(),
"parent_id": prof.get_id()
}
return trace_info
def _start_with_trace(self, trace, func, *args, **kwargs):
if trace:
profiler.init(**trace)
return func(*args, **kwargs)
def start(self, stack_id, func, *args, **kwargs):
"""
Run the given method in a sub-thread.
"""
if stack_id not in self.groups:
self.groups[stack_id] = threadgroup.ThreadGroup()
return self.groups[stack_id].add_thread(self._start_with_trace,
self._serialize_profile_info(),
func, *args, **kwargs)
def start_with_acquired_lock(self, stack, lock, func, *args, **kwargs):
"""
Run the given method in a sub-thread and release the provided lock
when the thread finishes.
:param stack: Stack to be operated on
:type stack: heat.engine.parser.Stack
:param lock: The acquired stack lock
:type lock: heat.engine.stack_lock.StackLock
:param func: Callable to be invoked in sub-thread
:type func: function or instancemethod
:param args: Args to be passed to func
:param kwargs: Keyword-args to be passed to func
"""
def release(gt):
"""
Callback function that will be passed to GreenThread.link().
"""
lock.release()
th = self.start(stack.id, func, *args, **kwargs)
th.link(release)
return th
def add_timer(self, stack_id, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.groups:
self.groups[stack_id] = threadgroup.ThreadGroup()
self.groups[stack_id].add_timer(60,
func, *args, **kwargs)
def add_event(self, stack_id, event):
self.events[stack_id].append(event)
def remove_event(self, gt, stack_id, event):
for e in self.events.pop(stack_id, []):
if e is not event:
self.add_event(stack_id, e)
def stop_timers(self, stack_id):
if stack_id in self.groups:
self.groups[stack_id].stop_timers()
def stop(self, stack_id, graceful=False):
'''Stop any active threads on a stack.'''
if stack_id in self.groups:
self.events.pop(stack_id, None)
threadgroup = self.groups.pop(stack_id)
threads = threadgroup.threads[:]
threadgroup.stop(graceful)
threadgroup.wait()
# Wait for link()ed functions (i.e. lock release)
links_done = dict((th, False) for th in threads)
def mark_done(gt, th):
links_done[th] = True
for th in threads:
th.link(mark_done, th)
while not all(six.itervalues(links_done)):
eventlet.sleep()
def send(self, stack_id, message):
for event in self.events.pop(stack_id, []):
event.send(message)
class EngineService(service.Service):
'''Lifecycle manager for a running service engine.
- All the methods in here are called from the RPC client.
- If a RPC call does not have a corresponding method here, an exception
will be thrown.
- Arguments to these calls are added dynamically and will be treated as
keyword arguments by the RPC client.
'''
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__()
self.host = host
self.topic = topic
self.dispatcher_topic = consts.ENGINE_DISPATCHER_TOPIC
self.health_mgr_topic = consts.ENGINE_HEALTH_MGR_TOPIC
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.engine_id = None
self.TG = None
self.target = None
self._rpc_server = None
def start(self):
self.engine_id = str(uuid.uuid4())
self.TG = ThreadGroupManager()
# create a dispatcher greenthread for this engine.
self.dispatcher = dispatcher.Dispatcher(self,
self.dispatcher_topic,
consts.RPC_API_VERSION,
self.TG)
LOG.info(_LI("Starting dispatcher for engine %s"), self.engine_id)
self.dispatcher.start()
target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
server=self.host,
topic=self.topic)
self.target = target
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
self._rpc_server.start()
super(EngineService, self).start()
def _stop_rpc_server(self):
# Stop RPC connection to prevent new requests
LOG.info(_LI("Stopping engine service..."))
try:
self._rpc_server.stop()
self._rpc_server.wait()
LOG.info(_LI('Engine service stopped successfully'))
except Exception as ex:
LOG.error(_LE('Failed to stop engine service: %s'),
six.text_type(ex))
def stop(self):
self._stop_rpc_server()
# Notify dispatcher to stop all action threads it started.
LOG.info(_LI("Stopping dispatcher for engine %s"), self.engine_id)
self.dispatcher.stop()
self.TG.stop()
super(EngineService, self).stop()
def ticket_create(self, context, name):
values = {'name': name}
ticket = db_api.ticket_create(context, values)
return ticket

View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from evoque.common import rpc_service
class API(rpc_service.API):
def __init__(self, transport=None, context=None, topic=None):
super(API, self).__init__(transport, context,
topic="evoque-engine")
def ticket_create(self, name):
return self._call('ticket_create', name=name)

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from evoque.db import api as db_api
class Handler(object):
def __init__(self):
super(Handler, self).__init__()
def ticket_create(self, context, name):
values = {'name': name}
ticket = db_api.ticket_create(context, values)
return ticket

View File

@ -10,9 +10,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import multiprocessing
import socket
from oslo_config import cfg
try:
default_workers = multiprocessing.cpu_count() or 1
except NotImplementedError:
default_workers = 1
def list_opts():
return [
("api", (
@ -25,7 +34,7 @@ def list_opts():
cfg.BoolOpt('pecan_debug',
default=False,
help='Toggle Pecan Debug Middleware.'),
cfg.IntOpt('workers', min=1,
cfg.IntOpt('workers', default=default_workers,
help='Number of workers for Evoque API server. '
'By default the available number of CPU is used.'),
cfg.IntOpt('max_limit',
@ -33,9 +42,9 @@ def list_opts():
help=('The maximum number of items returned in a '
'single response from a collection resource')),
)),
(None, (
("DEFAULT", (
cfg.StrOpt('host',
default='0.0.0.0',
default=socket.getfqdn(),
help='The listen IP for the Evoque engine server.'),
)),
]

View File

@ -1,72 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''
Client side of the evoque engine RPC API.
'''
from evoque.common import consts
from evoque.common import messaging
class EngineClient(object):
'''Client side of the evoque engine rpc API.'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
self._client = messaging.get_rpc_client(
topic=consts.ENGINE_TOPIC,
server='0.0.0.0',
version=self.BASE_RPC_API_VERSION)
@staticmethod
def make_msg(method, **kwargs):
return method, kwargs
def call(self, ctxt, msg, version=None):
method, kwargs = msg
if version is not None:
client = self._client.prepare(version=version)
else:
client = self._client
return client.call(ctxt, method, **kwargs)
def cast(self, ctxt, msg, version=None):
method, kwargs = msg
if version is not None:
client = self._client.prepare(version=version)
else:
client = self._client
return client.cast(ctxt, method, **kwargs)
def local_error_name(self, error):
'''Returns the name of the error with any _Remote postfix removed.
:param error: Remote raised error to derive the name from.
'''
error_name = error.__class__.__name__
return error_name.split('_Remote')[0]
def ignore_error_named(self, error, name):
'''Raises the error unless its local name matches the supplied name
:param error: Remote raised error to derive the local name from.
:param name: Name to compare local name to.
'''
if self.local_error_name(error) != name:
raise error
def ticket_create(self, ctxt, name):
return self.call(ctxt, self.make_msg('ticket_create',
name=name))

View File

@ -11,35 +11,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import multiprocessing
import sys
from oslo_config import cfg
from oslo_log import log
from oslo_log import log as logging
from evoque import opts
LOG = log.getLogger(__name__)
LOG = logging.getLogger(__name__)
def prepare_service(args=None):
conf = cfg.ConfigOpts()
log.register_options(conf)
def prepare_service():
# Register Evoque options
for group, options in opts.list_opts():
conf.register_opts(list(options),
group=None if group == "DEFAULT" else group)
cfg.CONF.register_opts(
list(options),
group=None if group == "DEFAULT" else group)
try:
default_workers = multiprocessing.cpu_count() or 1
except NotImplementedError:
default_workers = 1
logging.register_options(cfg.CONF)
cfg.CONF(sys.argv[1:], project='evoque')
conf.set_default("workers", default_workers, group="api")
logging.setup(cfg.CONF, 'evoque')
conf(args, project='evoque', validate_default_values=True)
log.setup(conf, 'evoque')
conf.log_opt_values(LOG, logging.DEBUG)
return conf
return cfg.CONF

View File

@ -8,6 +8,7 @@ oslo.db>=3.0.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.log>=1.8.0 # Apache-2.0
oslo.messaging!=1.17.0,!=1.17.1,!=2.6.0,!=2.6.1,>=1.16.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
pecan>=1.0.0
SQLAlchemy<1.1.0,>=0.9.9
sqlalchemy-migrate>=0.9.6