Implement Manager service.

Implement RPC service to work with plugins and DB.
Base plugin class added.

Implements: blueprint lease-manager
Change-Id: Icbed7fabef6c0673c62f67017e5e9cd8d257b5ee
This commit is contained in:
Dina Belova 2013-09-06 15:57:34 +04:00 committed by Swann Croiset
parent 560ce68f9f
commit f50a50b3fe
16 changed files with 579 additions and 52 deletions

View File

@ -31,37 +31,6 @@ eventlet.monkey_patch(
os=True, select=True, socket=True, thread=True, time=True)
opts = [
cfg.StrOpt('os_auth_protocol',
default='http',
help='Protocol used to access OpenStack Identity service'),
cfg.StrOpt('os_auth_host',
default='127.0.0.1',
help='IP or hostname of machine on which OpenStack Identity '
'service is located'),
cfg.StrOpt('os_auth_port',
default='35357',
help='Port of OpenStack Identity service'),
cfg.StrOpt('os_admin_username',
default='admin',
help='This OpenStack user is used to verify provided tokens. '
'The user must have admin role in <os_admin_tenant_name> '
'tenant'),
cfg.StrOpt('os_admin_password',
default='nova',
help='Password of the admin user'),
cfg.StrOpt('os_admin_tenant_name',
default='admin',
help='Name of tenant where the user is admin'),
cfg.StrOpt('os_auth_version',
default='v2.0',
help='By default use Keystone API v2.0.'),
]
CONF = cfg.CONF
CONF.register_opts(opts)
def make_json_error(ex):
if isinstance(ex, werkzeug_exceptions.HTTPException):
status_code = ex.code
@ -100,22 +69,22 @@ def make_app():
for code in werkzeug_exceptions.default_exceptions.iterkeys():
app.error_handler_spec[None][code] = make_json_error
if CONF.debug and not CONF.log_exchange:
if cfg.CONF.debug and not cfg.CONF.log_exchange:
LOG.debug('Logging of request/response exchange could be enabled using'
' flag --log-exchange')
if CONF.log_exchange:
if cfg.CONF.log_exchange:
app.wsgi_app = debug.Debug.factory(app.config)(app.wsgi_app)
app.wsgi_app = auth_token.filter_factory(
app.config,
auth_host=CONF.os_auth_host,
auth_port=CONF.os_auth_port,
auth_protocol=CONF.os_auth_protocol,
admin_user=CONF.os_admin_username,
admin_password=CONF.os_admin_password,
admin_tenant_name=CONF.os_admin_tenant_name,
auth_version=CONF.os_auth_version,
auth_host=cfg.CONF.os_auth_host,
auth_port=cfg.CONF.os_auth_port,
auth_protocol=cfg.CONF.os_auth_protocol,
admin_user=cfg.CONF.os_admin_username,
admin_password=cfg.CONF.os_admin_password,
admin_tenant_name=cfg.CONF.os_admin_tenant_name,
auth_version=cfg.CONF.os_auth_version,
)(app.wsgi_app)
return app

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from climate import exceptions
from climate.manager import rpcapi as manager_rpcapi
from climate.openstack.common import log as logging
@ -21,11 +23,14 @@ LOG = logging.getLogger(__name__)
class API(object):
def __init__(self):
self.manager_rpcapi = manager_rpcapi.ManagerRPCAPI()
## Leases operations
def get_leases(self):
"""List all existing leases."""
pass
return self.manager_rpcapi.list_leases()
def create_lease(self, data):
"""Create new lease.
@ -33,7 +38,10 @@ class API(object):
:param data: New lease characteristics.
:type data: dict
"""
pass
# here API should go to Keystone API v3 and create trust
trust = 'trust'
data.update({'trust': trust})
return self.manager_rpcapi.create_lease(data)
def get_lease(self, lease_id):
"""Get lease by its ID.
@ -41,7 +49,7 @@ class API(object):
:param lease_id: ID of the lease in Climate DB.
:type lease_id: str
"""
pass
return self.manager_rpcapi.get_lease(lease_id)
def update_lease(self, lease_id, data):
"""Update lease. Only name changing and prolonging may be proceeded.
@ -51,7 +59,22 @@ class API(object):
:param data: New lease characteristics.
:type data: dict
"""
pass
new_name = data.pop('name', None)
end_date = data.pop('end_date', None)
start_date = data.pop('start_date', None)
if data:
raise exceptions.ClimateException('Only name changing and '
'dates changing may be '
'proceeded.')
data = {}
if new_name:
data['name'] = new_name
if end_date:
data['end_date'] = end_date
if start_date:
data['start_date'] = start_date
return self.manager_rpcapi.update_lease(lease_id, data)
def delete_lease(self, lease_id):
"""Delete specified lease.
@ -59,7 +82,7 @@ class API(object):
:param lease_id: ID of the lease in Climate DB.
:type lease_id: str
"""
pass
self.manager_rpcapi.delete_lease(lease_id)
## Plugins operations

View File

@ -15,7 +15,6 @@
import gettext
import os
import socket
import sys
import eventlet
@ -32,12 +31,6 @@ from climate.utils import service as service_utils
opts = [
cfg.StrOpt('host', default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address'),
cfg.IntOpt('port', default=1234,
help='Port that will be used to listen on'),
]

39
climate/cmd/manager.py Normal file
View File

@ -0,0 +1,39 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
eventlet.monkey_patch()
import sys
from oslo.config import cfg
from climate.db import api as db_api
from climate.manager import service as manager_service
from climate.openstack.common import service
from climate.utils import service as service_utils
cfg.CONF.import_opt('host', 'climate.config')
def main():
service_utils.prepare_service(sys.argv)
db_api.setup_db()
service.launch(
manager_service.ManagerService(cfg.CONF.host)
).wait()
if __name__ == '__main__':
main()

View File

@ -13,16 +13,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from oslo.config import cfg
cli_opts = [
cfg.BoolOpt('log-exchange', default=False,
help='Log request/response exchange details: environ, '
'headers and bodies'),
cfg.StrOpt('host', default=socket.getfqdn(),
help='Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address'),
]
os_opts = [
cfg.StrOpt('os_auth_protocol',
default='http',
help='Protocol used to access OpenStack Identity service'),
cfg.StrOpt('os_auth_host',
default='127.0.0.1',
help='IP or hostname of machine on which OpenStack Identity '
'service is located'),
cfg.StrOpt('os_auth_port',
default='35357',
help='Port of OpenStack Identity service'),
cfg.StrOpt('os_admin_username',
default='admin',
help='This OpenStack user is used to verify provided tokens. '
'The user must have admin role in <os_admin_tenant_name> '
'tenant'),
cfg.StrOpt('os_admin_password',
default='nova',
help='Password of the admin user'),
cfg.StrOpt('os_admin_tenant_name',
default='admin',
help='Name of tenant where the user is admin'),
cfg.StrOpt('os_auth_version',
default='v2.0',
help='We use API v3 to allow trusts using.'),
]
CONF = cfg.CONF
CONF.register_cli_opts(cli_opts)
CONF.register_opts(os_opts)
ARGV = []

View File

@ -302,6 +302,12 @@ def event_get_all_sorted_by_filters(sort_key, sort_dir, filters):
if 'status' in filters:
events_query = \
events_query.filter(models.Event.status == filters['status'])
if 'lease_id' in filters:
events_query = \
events_query.filter(models.Event.lease_id == filters['lease_id'])
if 'event_type' in filters:
events_query = events_query.filter(models.Event.event_type ==
filters['event_type'])
events_query = events_query.order_by(
sort_fn[sort_dir](getattr(models.Event, sort_key))

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

56
climate/manager/rpcapi.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from climate.utils import service
CONF = cfg.CONF
CONF.import_opt('rpc_topic', 'climate.manager.service', 'manager')
class ManagerRPCAPI(service.RpcProxy):
"""Client side for the Manager RPC API.
Used from other services to communicate with climate-manager service.
"""
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
"""Initiate RPC API client with needed topic and RPC version."""
super(ManagerRPCAPI, self).__init__(
topic=CONF.manager.rpc_topic,
default_version=self.BASE_RPC_API_VERSION,
)
def get_lease(self, lease_id):
"""Get detailed info about some lease."""
return self.call('get_lease', lease_id=lease_id)
def list_leases(self):
"""List all leases."""
return self.call('list_leases')
def create_lease(self, lease_values):
"""Create lease with specified parameters."""
return self.call('create_lease', lease_values=lease_values)
def update_lease(self, lease_id, values):
"""Update lease with passes values dictionary."""
return self.call('update_lease', lease_id=lease_id, values=values)
def delete_lease(self, lease_id):
"""Delete specified lease."""
return self.cast('delete_lease', lease_id=lease_id)

213
climate/manager/service.py Normal file
View File

@ -0,0 +1,213 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import eventlet
import six
from oslo.config import cfg
from stevedore import enabled
from climate.db import api as db_api
from climate import exceptions
from climate.openstack.common import log as logging
from climate.openstack.common.rpc import service as rpc_service
from climate.utils import service as service_utils
manager_opts = [
cfg.StrOpt('rpc_topic',
default='climate.manager',
help='The topic Climate uses for climate-manager messages.'),
cfg.ListOpt('plugins',
default=['dummy.vm.plugin'],
help='All plugins to use (one for every resource type to '
'support.)'),
]
CONF = cfg.CONF
CONF.register_opts(manager_opts, 'manager')
LOG = logging.getLogger(__name__)
class ManagerService(rpc_service.Service):
"""Service class for the climate-manager service.
Responsible for working with Climate DB, scheduling logic, running events,
working with plugins, etc.
"""
RPC_API_VERSION = '1.0'
def __init__(self, host):
super(ManagerService, self).__init__(host, CONF.manager.rpc_topic)
self.plugins = self._get_plugins()
self.resource_actions = self._setup_actions()
def start(self):
super(ManagerService, self).start()
self.tg.add_timer(10, self._event)
def _get_plugins(self):
"""Return dict of resource-plugin class pairs."""
config_plugins = CONF.manager.plugins
plugins = {}
extension_manager = enabled.EnabledExtensionManager(
check_func=lambda ext: ext.name in config_plugins,
namespace='climate.resource.plugins',
invoke_on_load=True
)
for ext in extension_manager.extensions:
if ext.obj.resource_type in plugins:
raise exceptions.ClimateException(
'You have provided several plugins for one resource type '
'in configuration file. '
'Please set one plugin per resource type.'
)
plugins[ext.obj.resource_type] = ext.obj
if len(plugins) < len(config_plugins):
raise exceptions.ClimateException('Not all requested plugins are '
'loaded.')
return plugins
def _setup_actions(self):
"""Setup actions for each resource type supported.
BasePlugin interface provides only on_start and on_end behaviour now.
If there are some configs needed by plugin, they should be returned
from get_plugin_opts method. These flags are registered in
[resource_type] group of configuration file.
"""
actions = {}
for resource_type, plugin in six.iteritems(self.plugins):
plugin = self.plugins[resource_type]
CONF.register_opts(plugin.get_plugin_opts(), group=resource_type)
actions[resource_type] = {}
actions[resource_type]['on_start'] = plugin.on_start
actions[resource_type]['on_end'] = plugin.on_end
return actions
@service_utils.with_empty_context
def _event(self):
"""Tries to commit event.
If there is an event in Climate DB to be done, do it and change its
status to 'DONE'.
"""
LOG.debug('Trying to get event from DB.')
events = db_api.event_get_all_sorted_by_filters(
sort_key='time',
sort_dir='asc',
filters={'status': 'UNDONE'}
)
if not events:
return
event = events[0]
if event['time'] < datetime.datetime.utcnow():
db_api.event_update(event['id'], {'status': 'IN_PROGRESS'})
event_type = event['event_type']
event_fn = getattr(self, event_type, None)
if event_fn is None:
raise exceptions.ClimateException('Event type %s is not '
'supported' % event_type)
try:
eventlet.spawn_n(service_utils.with_empty_context(event_fn),
event['lease_id'], event['id'])
except Exception:
db_api.event_update(event['id'], {'status': 'ERROR'})
LOG.exception('Error occurred while event handling.')
@service_utils.export_context
def get_lease(self, lease_id):
return db_api.lease_get(lease_id)
@service_utils.export_context
def list_leases(self):
return db_api.lease_list()
@service_utils.export_context
def create_lease(self, lease_values):
start_date = lease_values['start_date']
end_date = lease_values['end_date']
if start_date == 'now':
start_date = datetime.datetime.utcnow()
else:
start_date = datetime.datetime.strptime(start_date,
"%Y-%m-%d %H:%M")
end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d %H:%M")
lease_values['start_date'] = start_date
lease_values['end_date'] = end_date
if not lease_values.get('events'):
lease_values['events'] = []
lease_values['events'].append({'event_type': 'start_lease',
'time': start_date,
'status': 'UNDONE'})
lease_values['events'].append({'event_type': 'end_lease',
'time': end_date,
'status': 'UNDONE'})
lease = db_api.lease_create(lease_values)
return db_api.lease_get(lease['id'])
@service_utils.export_context
def update_lease(self, lease_id, values):
if values:
db_api.lease_update(lease_id, values)
return db_api.lease_get(lease_id)
@service_utils.export_context
def delete_lease(self, lease_id):
lease = self.get_lease(lease_id)
for reservation in lease['reservations']:
self.plugins[reservation['resource_type']]\
.on_end(reservation['resource_id'])
db_api.lease_destroy(lease_id)
def start_lease(self, lease_id, event_id):
self._basic_action(lease_id, event_id, 'on_start', 'active')
def end_lease(self, lease_id, event_id):
self._basic_action(lease_id, event_id, 'on_end', 'deleted')
def _basic_action(self, lease_id, event_id, action_time,
reservation_status=None):
"""Commits basic lease actions such as starting and ending."""
lease = self.get_lease(lease_id)
for reservation in lease['reservations']:
resource_type = reservation['resource_type']
self.resource_actions[resource_type][action_time](
reservation['resource_id']
)
if reservation_status is not None:
db_api.reservation_update(reservation['id'],
{'status': reservation_status})
db_api.event_update(event_id, {'status': 'DONE'})

View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

68
climate/plugins/base.py Normal file
View File

@ -0,0 +1,68 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo.config import cfg
from climate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class BasePlugin(object):
__metaclass__ = abc.ABCMeta
resource_type = 'none'
title = None
description = None
def get_plugin_opts(self):
"""Plugin can expose some options that should be specified in conf file
For example:
def get_plugin_opts(self):
return [
cfg.StrOpt('mandatory-conf', required=True),
cfg.StrOpt('optional_conf', default="42"),
]
"""
return []
def setup(self, conf):
"""Plugin initialization
:param conf: plugin-specific configurations
"""
pass
def to_dict(self):
return {
'resource_type': self.resource_type,
'title': self.title,
'description': self.description,
}
@abc.abstractmethod
def on_end(self, resource_id):
"""Delete resource."""
pass
@abc.abstractmethod
def on_start(self, resource_id):
"""Wake up resource."""
pass

View File

@ -0,0 +1,31 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from climate.plugins import base
class DummyVMPlugin(base.BasePlugin):
"""Plugin for VM resource that does nothing."""
resource_type = 'virtual:instance'
title = 'Dummy VM Plugin'
description = 'This plugin does nothing.'
def on_start(self, resource_id):
"""Dummy VM plugin does nothing."""
return 'VM %s should be waked up this moment.' % resource_id
def on_end(self, resource_id):
"""Dummy VM plugin does nothing."""
return 'VM %s should be deleted this moment.' % resource_id

View File

@ -15,10 +15,54 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from oslo.config import cfg
from climate import context
from climate.openstack.common import log
from climate.openstack.common import rpc
import climate.openstack.common.rpc.proxy as rpc_proxy
class RpcProxy(rpc_proxy.RpcProxy):
def cast(self, name, topic=None, version=None, ctx=None, **kwargs):
if ctx is None:
ctx = context.Context.current()
msg = self.make_msg(name, **kwargs)
return super(RpcProxy, self).cast(ctx, msg,
topic=topic, version=version)
def call(self, name, topic=None, version=None, ctx=None, **kwargs):
if ctx is None:
ctx = context.Context.current()
msg = self.make_msg(name, **kwargs)
return super(RpcProxy, self).call(ctx, msg,
topic=topic, version=version)
def export_context(func):
@functools.wraps(func)
def decorator(manager, ctx, *args, **kwargs):
try:
context.Context.current()
except RuntimeError:
new_ctx = context.Context(**ctx.values)
with new_ctx:
return func(manager, *args, **kwargs)
else:
return func(manager, ctx, *args, **kwargs)
return decorator
def with_empty_context(func):
@functools.wraps(func)
def decorator(*args, **kwargs):
with context.Context():
return func(*args, **kwargs)
return decorator
def prepare_service(argv=[]):

15
etc/climate.conf.example Normal file
View File

@ -0,0 +1,15 @@
[DEFAULT]
os_auth_host=<auth_host>
os_auth_port=<auth_port>
os_auth_protocol=<http, for example>
os_admin_username=<username>
os_admin_password=<password>
os_admin_tenant_name=<tenant_name>
[manager]
plugins=dummy.vm.plugin
[virtual:instance]
on_start = wake_up
on_end = delete

View File

@ -3,10 +3,12 @@ pbr>=0.5.21,<1.0
eventlet>=0.13.0
Flask>=0.10,<1.0a0
iso8601>=0.1.4
kombu>=2.4.8
oslo.config>=1.2.0
python-novaclient>=2.15.0
netaddr
python-keystoneclient>=0.3.2
Routes>=1.12.3
SQLAlchemy>=0.7.8,<=0.7.99
stevedore>=0.10
WebOb>=1.2.3,<1.3a0

View File

@ -31,6 +31,10 @@ console_scripts =
climate-api=climate.cmd.api:main
climate-scheduler=climate.cmd.scheduler:main
climate-rpc-zmq-receiver=climate.cmd.rpc_zmq_receiver:main
climate-manager=climate.cmd.manager:main
climate.resource.plugins =
dummy.vm.plugin=climate.plugins.dummy_vm_plugin:DummyVMPlugin
[build_sphinx]
all_files = 1