Split sahara into sahara-api and sahara-engine

Split the monolithic sahara-api into two services: sahara-api and
sahara-engine. The former is the user-facing interface; the latter is
the service that does all the work. sahara-api sends tasks to
sahara-engine via oslo.messaging. See the blueprint for details.

Notes:
* Used the following Climate RPC code as a baseline:
  https://github.com/stackforge/climate/blob/master/climate/utils/service.py
  hence added Julien Danjou to the license header.
* Removed the old contents of sahara/utils/rpc.py - prehistoric stuff
  not used anywhere.
* sahara-api still depends on eventlet; we want to drop this dependency
  eventually.
* Periodic tasks still run in sahara-api. They should be moved to the
  engine later.

Partially Implements: blueprint scalable-savanna

Change-Id: I64275a757b539f3fcddd6e993d6614d492745226
parent 7077fd78a7
commit 4e24c3ca5b
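
For orientation, a rough runtime sketch of the split introduced below (not part of the diff itself): sahara-api holds a RemoteOps RPC client and casts requests over the 'sahara-ops' topic, while sahara-engine runs an OpsServer that consumes those casts and performs the provisioning. It assumes oslo.config and a Sahara request context are already initialized; the cluster id is a placeholder.

from sahara.service import ops

# API side (sahara-api process): fire-and-forget cast to the engine.
remote_ops = ops.RemoteOps()
remote_ops.provision_cluster('11111111-2222-3333-4444-555555555555')  # placeholder id

# Engine side (sahara-engine process): serve casts on topic 'sahara-ops'.
ops_server = ops.OpsServer()
ops_server.start()  # blocks; dispatches provision_cluster, terminate_cluster, etc.
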
@@ -5,6 +5,7 @@ Flask>=0.10,<1.0
iso8601>=0.1.9
jsonschema>=2.0.0,<3.0.0
oslo.config>=1.2.0
oslo.messaging>=1.3.0
paramiko>=1.9.0
pbr>=0.6,!=0.7,<1.0
requests>=1.1
sahara/cli/sahara_all.py (new file, 59 lines)
@@ -0,0 +1,59 @@
#!/usr/bin/env python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.utils import patches
patches.patch_all()

import gettext
import os
import sys

import eventlet
from eventlet import wsgi
from oslo.config import cfg


# If ../sahara/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir,
                                                os.pardir))
if os.path.exists(os.path.join(possible_topdir,
                               'sahara',
                               '__init__.py')):
    sys.path.insert(0, possible_topdir)

gettext.install('sahara', unicode=1)


import sahara.main as server
from sahara.openstack.common import log as logging


LOG = logging.getLogger(__name__)


def main():
    server.setup_common(possible_topdir, 'all-in-one')

    app = server.make_app()

    server.setup_sahara_api(app, 'all-in-one')
    server.setup_sahara_engine()

    wsgi.server(eventlet.listen((cfg.CONF.host, cfg.CONF.port), backlog=500),
                app, log=logging.WritableLogger(LOG), debug=False)
@@ -40,7 +40,6 @@ if os.path.exists(os.path.join(possible_topdir,
gettext.install('sahara', unicode=1)


from sahara import config
import sahara.main as server
from sahara.openstack.common import log as logging

@@ -49,18 +48,11 @@ LOG = logging.getLogger(__name__)


def main():
    dev_conf = os.path.join(possible_topdir,
                            'etc',
                            'sahara',
                            'sahara.conf')
    config_files = None
    if os.path.exists(dev_conf):
        config_files = [dev_conf]

    config.parse_configs(sys.argv[1:], config_files)
    logging.setup("sahara")
    server.setup_common(possible_topdir, 'API')

    app = server.make_app()

    server.setup_sahara_api(app, 'distributed')

    wsgi.server(eventlet.listen((cfg.CONF.host, cfg.CONF.port), backlog=500),
                app, log=logging.WritableLogger(LOG), debug=False)
sahara/cli/sahara_engine.py (new file, 49 lines)
@@ -0,0 +1,49 @@
#!/usr/bin/env python

# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.utils import patches
patches.patch_all()

import gettext
import os
import sys


# If ../sahara/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir,
                                                os.pardir))
if os.path.exists(os.path.join(possible_topdir,
                               'sahara',
                               '__init__.py')):
    sys.path.insert(0, possible_topdir)

gettext.install('sahara', unicode=1)


import sahara.main as server
from sahara.service import ops


def main():
    server.setup_common(possible_topdir, 'engine')

    server.setup_sahara_engine()

    ops_server = ops.OpsServer()
    ops_server.start()
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

import flask
from keystoneclient.middleware import auth_token
from oslo.config import cfg
@@ -22,25 +25,23 @@ from werkzeug import exceptions as werkzeug_exceptions

from sahara.api import v10 as api_v10
from sahara.api import v11 as api_v11
from sahara import config
from sahara import context
from sahara.middleware import auth_valid
from sahara.middleware import log_exchange
from sahara.openstack.common import log
from sahara.plugins import base as plugins_base
from sahara.service import api as service_api
from sahara.service.edp import api as edp_api
from sahara.service import ops as service_ops
from sahara.service import periodic
from sahara.utils import api as api_utils
from sahara.utils import patches
from sahara.utils import remote


LOG = log.getLogger(__name__)


# Patches minidom's writexml to avoid excess whitespaces in generated xml
# configuration files that brakes Hadoop.
patches.patch_minidom_writexml()

opts = [
    cfg.StrOpt('os_auth_protocol',
               default='http',
@@ -77,6 +78,44 @@ CONF = cfg.CONF
CONF.register_opts(opts)


def setup_common(possible_topdir, service_name):
    dev_conf = os.path.join(possible_topdir,
                            'etc',
                            'sahara',
                            'sahara.conf')
    config_files = None
    if os.path.exists(dev_conf):
        config_files = [dev_conf]

    config.parse_configs(sys.argv[1:], config_files)
    log.setup("sahara")

    LOG.info('Starting Sahara %s' % service_name)

    plugins_base.setup_plugins()


def setup_sahara_api(app, mode):
    periodic.setup(app)

    #TODO(dmitryme): move periodics to engine, until then the remote
    # initialization here is a temporal hack
    remote_driver = _get_remote_driver()
    remote.setup_remote(remote_driver, None)

    ops = _get_ops_driver(mode)
    service_api.setup_service_api(ops)
    edp_api.setup_edp_api(ops)


def setup_sahara_engine():
    engine = _get_infrastructure_engine()
    service_ops.setup_ops(engine)

    remote_driver = _get_remote_driver()
    remote.setup_remote(remote_driver, engine)


def make_app():
    """App builder (wsgi)

@@ -101,15 +140,6 @@ def make_app():
    app.register_blueprint(api_v10.rest, url_prefix='/v1.1')
    app.register_blueprint(api_v11.rest, url_prefix='/v1.1')

    plugins_base.setup_plugins()
    periodic.setup(app)

    engine = _get_infrastructure_engine()
    service_api.setup_service_api(engine)

    remote_driver = _get_remote_driver()
    remote.setup_remote(remote_driver, engine)

    def make_json_error(ex):
        status_code = (ex.code
                       if isinstance(ex, werkzeug_exceptions.HTTPException)
@@ -147,30 +177,35 @@ def make_app():
    return app


def _load_driver(namespace, name):
    extension_manager = stevedore.DriverManager(
        namespace=namespace,
        name=name,
        invoke_on_load=True
    )

    return extension_manager.driver


def _get_infrastructure_engine():
    """That should import and return one of
    sahara.service.instances*.py modules
    sahara.service.*_engine.py modules
    """

    LOG.info("Loading '%s' infrastructure engine" %
             CONF.infrastructure_engine)

    extension_manager = stevedore.DriverManager(
        namespace='sahara.infrastructure.engine',
        name=CONF.infrastructure_engine,
        invoke_on_load=True
    )

    return extension_manager.driver
    return _load_driver('sahara.infrastructure.engine',
                        CONF.infrastructure_engine)


def _get_remote_driver():
    LOG.info("Loading '%s' remote" % CONF.remote)

    extension_manager = stevedore.DriverManager(
        namespace='sahara.remote',
        name=CONF.remote,
        invoke_on_load=True
    )
    return _load_driver('sahara.remote', CONF.remote)

    return extension_manager.driver

def _get_ops_driver(driver_name):
    LOG.info("Loading '%s' ops" % driver_name)

    return _load_driver('sahara.run.mode', driver_name)
@@ -22,8 +22,6 @@ from sahara.openstack.common import excutils
from sahara.openstack.common import log as logging
from sahara.plugins import base as plugin_base
from sahara.plugins import provisioning
from sahara.service.edp import job_manager as jm
from sahara.service import trusts
from sahara.utils import general as g
from sahara.utils.openstack import nova

@@ -33,13 +31,13 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)


INFRA = None
OPS = None


def setup_service_api(engine):
    global INFRA
def setup_service_api(ops):
    global OPS

    INFRA = engine
    OPS = ops


## Cluster ops
@@ -70,14 +68,6 @@ def scale_cluster(id, data):
    additional = construct_ngs_for_scaling(cluster, additional_node_groups)
    cluster = conductor.cluster_get(ctx, cluster)

    # update nodegroup image usernames
    for nodegroup in cluster.node_groups:
        if additional.get(nodegroup.id):
            image_username = INFRA.get_node_group_image_username(nodegroup)
            conductor.node_group_update(
                ctx, nodegroup, {"image_username": image_username})
    cluster = conductor.cluster_get(ctx, cluster)

    try:
        cluster = conductor.cluster_update(ctx, cluster,
                                           {"status": "Validating"})
@@ -98,9 +88,8 @@ def scale_cluster(id, data):
        if node_group.id not in to_be_enlarged:
            to_be_enlarged[node_group.id] = node_group.count

    context.spawn("cluster-scaling-%s" % id,
                  _provision_scaled_cluster, id, to_be_enlarged)
    return conductor.cluster_get(ctx, id)
    OPS.provision_scaled_cluster(id, to_be_enlarged)
    return cluster


def create_cluster(values):
@@ -108,13 +97,6 @@ def create_cluster(values):
    cluster = conductor.cluster_create(ctx, values)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)

    # update nodegroup image usernames
    for nodegroup in cluster.node_groups:
        conductor.node_group_update(
            ctx, nodegroup,
            {"image_username": INFRA.get_node_group_image_username(nodegroup)})
    cluster = conductor.cluster_get(ctx, cluster)

    # validating cluster
    try:
        cluster = conductor.cluster_update(ctx, cluster,
@@ -129,109 +111,9 @@ def create_cluster(values):
                                            "status_description": str(e)})
        LOG.info(g.format_cluster_status(cluster))

    context.spawn("cluster-creating-%s" % cluster.id,
                  _provision_cluster, cluster.id)
    if CONF.use_identity_api_v3 and cluster.is_transient:
        trusts.create_trust(cluster)
    OPS.provision_cluster(cluster.id)

    return conductor.cluster_get(ctx, cluster.id)


def _provision_scaled_cluster(id, node_group_id_map):
    ctx = context.ctx()
    cluster = conductor.cluster_get(ctx, id)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)

    # Decommissioning surplus nodes with the plugin

    cluster = conductor.cluster_update(ctx, cluster,
                                       {"status": "Decommissioning"})
    LOG.info(g.format_cluster_status(cluster))

    instances_to_delete = []

    for node_group in cluster.node_groups:
        new_count = node_group_id_map[node_group.id]
        if new_count < node_group.count:
            instances_to_delete += node_group.instances[new_count:
                                                        node_group.count]

    if instances_to_delete:
        plugin.decommission_nodes(cluster, instances_to_delete)

    # Scaling infrastructure
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Scaling"})
    LOG.info(g.format_cluster_status(cluster))

    instances = INFRA.scale_cluster(cluster, node_group_id_map)

    # Setting up new nodes with the plugin

    if instances:
        cluster = conductor.cluster_update(ctx, cluster,
                                           {"status": "Configuring"})
        LOG.info(g.format_cluster_status(cluster))
        try:
            instances = g.get_instances(cluster, instances)
            plugin.scale_cluster(cluster, instances)
        except Exception as ex:
            LOG.exception("Can't scale cluster '%s' (reason: %s)",
                          cluster.name, ex)
            cluster = conductor.cluster_update(ctx, cluster,
                                               {"status": "Error"})
            LOG.info(g.format_cluster_status(cluster))
            return

    cluster = conductor.cluster_update(ctx, cluster, {"status": "Active"})
    LOG.info(g.format_cluster_status(cluster))


def _provision_cluster(cluster_id):
    ctx = context.ctx()
    cluster = conductor.cluster_get(ctx, cluster_id)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)

    # updating cluster infra
    cluster = conductor.cluster_update(ctx, cluster,
                                       {"status": "InfraUpdating"})
    LOG.info(g.format_cluster_status(cluster))
    plugin.update_infra(cluster)

    # creating instances and configuring them
    cluster = conductor.cluster_get(ctx, cluster_id)
    INFRA.create_cluster(cluster)

    # configure cluster
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Configuring"})
    LOG.info(g.format_cluster_status(cluster))
    try:
        plugin.configure_cluster(cluster)
    except Exception as ex:
        LOG.exception("Can't configure cluster '%s' (reason: %s)",
                      cluster.name, ex)
        cluster = conductor.cluster_update(ctx, cluster, {"status": "Error"})
        LOG.info(g.format_cluster_status(cluster))
        return

    # starting prepared and configured cluster
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Starting"})
    LOG.info(g.format_cluster_status(cluster))
    try:
        plugin.start_cluster(cluster)
    except Exception as ex:
        LOG.exception("Can't start services for cluster '%s' (reason: %s)",
                      cluster.name, ex)
        cluster = conductor.cluster_update(ctx, cluster, {"status": "Error"})
        LOG.info(g.format_cluster_status(cluster))
        return

    # cluster is now up and ready
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Active"})
    LOG.info(g.format_cluster_status(cluster))

    # schedule execution pending job for cluster
    for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
        jm.run_job(je)
    return cluster


def terminate_cluster(id):
@@ -241,12 +123,7 @@ def terminate_cluster(id):
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Deleting"})
    LOG.info(g.format_cluster_status(cluster))

    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
    plugin.on_terminate_cluster(cluster)
    INFRA.shutdown_cluster(cluster)
    if CONF.use_identity_api_v3:
        trusts.delete_trust(cluster)
    conductor.cluster_destroy(ctx, cluster)
    OPS.terminate_cluster(id)


## ClusterTemplate ops
@@ -18,7 +18,6 @@ from oslo.config import cfg
from sahara import conductor as c
from sahara import context
from sahara.openstack.common import log as logging
from sahara.plugins import base as plugin_base
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import job_manager as manager
from sahara.service.edp.workflow_creator import workflow_factory as w_f
@@ -29,6 +28,15 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF


OPS = None


def setup_edp_api(ops):
    global OPS

    OPS = ops


def get_job_config_hints(job_type):
    return w_f.get_possible_job_config(job_type)

@@ -39,17 +47,6 @@ def execute_job(job_id, data):
    cluster_id = data['cluster_id']
    configs = data.get('job_configs', {})

    ctx = context.current()
    cluster = conductor.cluster_get(ctx, cluster_id)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
    instance = plugin.get_oozie_server(cluster)

    extra = {}
    info = None
    if CONF.use_namespaces and not CONF.use_floating_ips:
        info = instance.remote().get_neutron_info()
        extra['neutron'] = info

    # Not in Java job types but present for all others
    input_id = data.get('input_id', None)
    output_id = data.get('output_id', None)
@@ -59,11 +56,11 @@ def execute_job(job_id, data):
    job_ex_dict = {'input_id': input_id, 'output_id': output_id,
                   'job_id': job_id, 'cluster_id': cluster_id,
                   'info': {'status': 'Pending'}, 'job_configs': configs,
                   'extra': extra}
                   'extra': {}}
    job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)

    context.spawn("Starting Job Execution %s" % job_execution.id,
                  manager.run_job, job_execution)
    OPS.run_edp_job(job_execution.id)

    return job_execution
@@ -117,12 +117,27 @@ def cancel_job(job_execution_id):
    return job_execution


def run_job(job_execution):
def run_job(job_execution_id):
    ctx = context.ctx()

    job_execution = conductor.job_execution_get(ctx,
                                                job_execution_id)

    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
        return job_execution
        return

    if CONF.use_namespaces and not CONF.use_floating_ips:
        plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
        oozie = plugin.get_oozie_server(cluster)

        info = oozie.remote().get_neutron_info()
        extra = job_execution.extra.copy()
        extra['neutron'] = info

        job_execution = conductor.job_execution_update(ctx,
                                                       job_execution_id,
                                                       {'extra': extra})

    job = conductor.job_get(ctx, job_execution.job_id)
    if not edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA):
@@ -172,8 +187,6 @@ def run_job(job_execution):
                                                  datetime.datetime.now()})
    client.run_job(job_execution, oozie_job_id)

    return job_execution


def upload_job_files(where, job_dir, job, hdfs_user):
sahara/service/ops.py (new file, 228 lines)
@@ -0,0 +1,228 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

from oslo.config import cfg
from oslo import messaging

from sahara import conductor as c
from sahara import context
from sahara.openstack.common import log as logging
from sahara.plugins import base as plugin_base
from sahara.service.edp import job_manager
from sahara.service import trusts
from sahara.utils import general as g
from sahara.utils import rpc as rpc_utils


conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)


INFRA = None


def setup_ops(engine):
    global INFRA

    INFRA = engine


class LocalOps(object):
    def provision_cluster(self, cluster_id):
        context.spawn("cluster-creating-%s" % cluster_id,
                      _provision_cluster, cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map):
        context.spawn("cluster-scaling-%s" % cluster_id,
                      _provision_scaled_cluster, cluster_id, node_group_id_map)

    def terminate_cluster(self, cluster_id):
        context.spawn("cluster-terminating-%s" % cluster_id,
                      _terminate_cluster, cluster_id)

    def run_edp_job(self, job_execution_id):
        context.spawn("Starting Job Execution %s" % job_execution_id,
                      _run_edp_job, job_execution_id)


class RemoteOps(rpc_utils.RPCClient):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops', version='1.0')
        super(RemoteOps, self).__init__(target)

    def provision_cluster(self, cluster_id):
        self.cast('provision_cluster', cluster_id=cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map):
        self.cast('provision_scaled_cluster', cluster_id=cluster_id,
                  node_group_id_map=node_group_id_map)

    def terminate_cluster(self, cluster_id):
        self.cast('terminate_cluster', cluster_id=cluster_id)

    def run_edp_job(self, job_execution_id):
        self.cast('run_edp_job', job_execution_id=job_execution_id)


class OpsServer(rpc_utils.RPCServer):
    def __init__(self):
        target = messaging.Target(topic='sahara-ops', server=uuid.uuid4(),
                                  version='1.0')
        super(OpsServer, self).__init__(target)

    def provision_cluster(self, cluster_id):
        _provision_cluster(cluster_id)

    def provision_scaled_cluster(self, cluster_id, node_group_id_map):
        _provision_scaled_cluster(cluster_id, node_group_id_map)

    def terminate_cluster(self, cluster_id):
        _terminate_cluster(cluster_id)

    def run_edp_job(self, job_execution_id):
        _run_edp_job(job_execution_id)


def _prepare_provisioning(cluster_id):
    ctx = context.ctx()
    cluster = conductor.cluster_get(ctx, cluster_id)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)

    for nodegroup in cluster.node_groups:
        conductor.node_group_update(
            ctx, nodegroup,
            {"image_username": INFRA.get_node_group_image_username(nodegroup)})

    cluster = conductor.cluster_get(ctx, cluster_id)

    return ctx, cluster, plugin


def _provision_cluster(cluster_id):
    ctx, cluster, plugin = _prepare_provisioning(cluster_id)

    if CONF.use_identity_api_v3 and cluster.is_transient:
        trusts.create_trust(cluster)

    # updating cluster infra
    cluster = conductor.cluster_update(ctx, cluster,
                                       {"status": "InfraUpdating"})
    LOG.info(g.format_cluster_status(cluster))
    plugin.update_infra(cluster)

    # creating instances and configuring them
    cluster = conductor.cluster_get(ctx, cluster_id)
    INFRA.create_cluster(cluster)

    # configure cluster
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Configuring"})
    LOG.info(g.format_cluster_status(cluster))
    try:
        plugin.configure_cluster(cluster)
    except Exception as ex:
        LOG.exception("Can't configure cluster '%s' (reason: %s)",
                      cluster.name, ex)
        cluster = conductor.cluster_update(ctx, cluster, {"status": "Error"})
        LOG.info(g.format_cluster_status(cluster))
        return

    # starting prepared and configured cluster
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Starting"})
    LOG.info(g.format_cluster_status(cluster))
    try:
        plugin.start_cluster(cluster)
    except Exception as ex:
        LOG.exception("Can't start services for cluster '%s' (reason: %s)",
                      cluster.name, ex)
        cluster = conductor.cluster_update(ctx, cluster, {"status": "Error"})
        LOG.info(g.format_cluster_status(cluster))
        return

    # cluster is now up and ready
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Active"})
    LOG.info(g.format_cluster_status(cluster))

    # schedule execution pending job for cluster
    for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
        job_manager.run_job(je.id)


def _provision_scaled_cluster(cluster_id, node_group_id_map):
    ctx, cluster, plugin = _prepare_provisioning(cluster_id)

    # Decommissioning surplus nodes with the plugin

    cluster = conductor.cluster_update(ctx, cluster,
                                       {"status": "Decommissioning"})
    LOG.info(g.format_cluster_status(cluster))

    instances_to_delete = []

    for node_group in cluster.node_groups:
        new_count = node_group_id_map[node_group.id]
        if new_count < node_group.count:
            instances_to_delete += node_group.instances[new_count:
                                                        node_group.count]

    if instances_to_delete:
        plugin.decommission_nodes(cluster, instances_to_delete)

    # Scaling infrastructure
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Scaling"})
    LOG.info(g.format_cluster_status(cluster))

    instances = INFRA.scale_cluster(cluster, node_group_id_map)

    # Setting up new nodes with the plugin

    if instances:
        cluster = conductor.cluster_update(ctx, cluster,
                                           {"status": "Configuring"})
        LOG.info(g.format_cluster_status(cluster))
        try:
            instances = g.get_instances(cluster, instances)
            plugin.scale_cluster(cluster, instances)
        except Exception as ex:
            LOG.exception("Can't scale cluster '%s' (reason: %s)",
                          cluster.name, ex)
            cluster = conductor.cluster_update(ctx, cluster,
                                               {"status": "Error"})
            LOG.info(g.format_cluster_status(cluster))
            return

    cluster = conductor.cluster_update(ctx, cluster, {"status": "Active"})
    LOG.info(g.format_cluster_status(cluster))


def _terminate_cluster(cluster_id):
    ctx = context.ctx()
    cluster = conductor.cluster_get(ctx, cluster_id)
    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)

    plugin.on_terminate_cluster(cluster)

    INFRA.shutdown_cluster(cluster)

    if CONF.use_identity_api_v3:
        trusts.delete_trust(cluster)

    conductor.cluster_destroy(ctx, cluster)


def _run_edp_job(job_execution_id):
    job_manager.run_job(job_execution_id)
@@ -21,6 +21,7 @@ import uuid

from neutronclient.v2_0 import client as neutron_client
from novaclient.v1_1 import client as nova_client
from saharaclient.api import base as client_base
import saharaclient.client as sahara_client
from swiftclient import client as swift_client
import testtools
@@ -536,6 +537,23 @@ class ITestCase(testtools.TestCase, testtools.testcase.WithAttributes,
        if not self.common_config.RETAIN_CLUSTER_AFTER_TEST:
            if cluster_id:
                self.sahara.clusters.delete(cluster_id)

                # waiting roughly for 300 seconds for cluster to terminate
                attempts = 60
                while attempts > 0:
                    try:
                        self.sahara.clusters.get(cluster_id)
                    except client_base.APIException:
                        # Cluster is finally deleted
                        break

                    attempts -= 1
                    time.sleep(5)

                if attempts == 0:
                    self.fail('Cluster failed to terminate in 300 seconds: '
                              '%s' % cluster_id)

            if cluster_template_id:
                self.sahara.cluster_templates.delete(cluster_template_id)
            if node_group_template_id_list:
@@ -55,6 +55,9 @@ def eventlet_import_monkey_patched(module):
def patch_minidom_writexml():
    """Patch for xml.dom.minidom toprettyxml bug with whitespaces around text

    We apply the patch to avoid excess whitespaces in generated xml
    configuration files that brakes Hadoop.

    (This patch will be applied for all Python versions < 2.7.3)

    Issue: http://bugs.python.org/issue4147
@@ -1,4 +1,5 @@
# Copyright (c) 2013 Mirantis Inc.
# Copyright (c) 2013 Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,27 +14,64 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from oslo.config import cfg
from oslo import messaging

from sahara.openstack.common.rpc import common as rpc_common
from sahara import context
from sahara.openstack.common import log as logging


class RpcExceptionPassthrough(object):
    """Class to wrap another and translate the ClientExceptions raised by its
    function calls to the actual ones.
    """
CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class RPCClient(object):
    def __init__(self, target):
        self._target = target
        self.__client = messaging.RPCClient(
            target=target,
            transport=messaging.get_transport(cfg.CONF),
        )

    def cast(self, name, **kwargs):
        ctx = context.current()
        self.__client.cast(ctx.to_dict(), name, **kwargs)

    def call(self, name, **kwargs):
        ctx = context.current()
        return self.__client.call(ctx.to_dict(), name, **kwargs)


class RPCServer(object):
    def __init__(self, target):
        self.__server = messaging.get_rpc_server(
            target=target,
            transport=messaging.get_transport(cfg.CONF),
            endpoints=[ContextEndpointHandler(self)],
            executor='eventlet'
        )

    def start(self):
        self.__server.start()
        self.__server.wait()


class ContextEndpointHandler(object):
    def __init__(self, endpoint):
        self.__endpoint = endpoint

    def __getattr__(self, name):
        func = getattr(self._target, name)
        try:
            method = getattr(self.__endpoint, name)

            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except rpc_common.ClientException as e:
                    raise (e._exc_info[1], None, e._exc_info[2])
            def run_method(ctx, **kwargs):
                context.set_ctx(context.Context(**ctx))
                try:
                    return method(**kwargs)
                finally:
                    context.set_ctx(None)

            return wrapper
            return run_method
        except AttributeError:
            LOG.error("No %(method)s method found implemented in "
                      "%(class)s class",
                      {'method': name, 'class': self.__endpoint})
@@ -29,8 +29,9 @@ data_files =

[entry_points]
console_scripts =
    sahara-all = sahara.cli.sahara_all:main
    sahara-api = sahara.cli.sahara_api:main
    sahara-all = sahara.cli.sahara_api:main
    sahara-engine = sahara.cli.sahara_engine:main
    sahara-db-manage = sahara.db.migration.cli:main
    _sahara-subprocess = sahara.cli.sahara_subprocess:main

@@ -46,6 +47,10 @@ sahara.infrastructure.engine =
sahara.remote =
    ssh = sahara.utils.ssh_remote:SshRemoteDriver

sahara.run.mode =
    all-in-one = sahara.service.ops:LocalOps
    distributed = sahara.service.ops:RemoteOps

[build_sphinx]
all_files = 1
build-dir = doc/build