Trusts for longrunning tasks
Create trusts from current user to savanna user when cluster is created, auth savanna user with trust, use savanna user with trust for nova and cinder, when cluster is deleted Implements: blueprint trust-for-scaling-and-edp-operation Change-Id: I558acc01f2663c11879846a51edbf8043ec42676
This commit is contained in:
parent
12b470ab2b
commit
f993d6d333
@ -38,6 +38,7 @@ class Cluster(object):
|
|||||||
name
|
name
|
||||||
description
|
description
|
||||||
tenant_id
|
tenant_id
|
||||||
|
trust_id
|
||||||
is_transient
|
is_transient
|
||||||
plugin_name
|
plugin_name
|
||||||
hadoop_version
|
hadoop_version
|
||||||
|
@ -48,6 +48,7 @@ class Cluster(mb.SavannaBase):
|
|||||||
name = sa.Column(sa.String(80), nullable=False)
|
name = sa.Column(sa.String(80), nullable=False)
|
||||||
description = sa.Column(sa.Text)
|
description = sa.Column(sa.Text)
|
||||||
tenant_id = sa.Column(sa.String(36))
|
tenant_id = sa.Column(sa.String(36))
|
||||||
|
trust_id = sa.Column(sa.String(36))
|
||||||
is_transient = sa.Column(sa.Boolean, default=False)
|
is_transient = sa.Column(sa.Boolean, default=False)
|
||||||
plugin_name = sa.Column(sa.String(80), nullable=False)
|
plugin_name = sa.Column(sa.String(80), nullable=False)
|
||||||
hadoop_version = sa.Column(sa.String(80), nullable=False)
|
hadoop_version = sa.Column(sa.String(80), nullable=False)
|
||||||
|
@ -24,6 +24,7 @@ from savanna.plugins import base as plugin_base
|
|||||||
from savanna.plugins import provisioning
|
from savanna.plugins import provisioning
|
||||||
from savanna.service.edp import job_manager as jm
|
from savanna.service.edp import job_manager as jm
|
||||||
from savanna.service import instances as i
|
from savanna.service import instances as i
|
||||||
|
from savanna.service import trusts
|
||||||
from savanna.utils import general as g
|
from savanna.utils import general as g
|
||||||
from savanna.utils.openstack import nova
|
from savanna.utils.openstack import nova
|
||||||
|
|
||||||
@ -102,6 +103,8 @@ def create_cluster(values):
|
|||||||
|
|
||||||
context.spawn("cluster-creating-%s" % cluster.id,
|
context.spawn("cluster-creating-%s" % cluster.id,
|
||||||
_provision_cluster, cluster.id)
|
_provision_cluster, cluster.id)
|
||||||
|
if CONF.use_identity_api_v3 and cluster.is_transient:
|
||||||
|
trusts.create_trust(cluster)
|
||||||
|
|
||||||
return conductor.cluster_get(ctx, cluster.id)
|
return conductor.cluster_get(ctx, cluster.id)
|
||||||
|
|
||||||
@ -169,8 +172,9 @@ def terminate_cluster(id):
|
|||||||
|
|
||||||
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
||||||
plugin.on_terminate_cluster(cluster)
|
plugin.on_terminate_cluster(cluster)
|
||||||
|
|
||||||
i.shutdown_cluster(cluster)
|
i.shutdown_cluster(cluster)
|
||||||
|
if CONF.use_identity_api_v3:
|
||||||
|
trusts.delete_trust(cluster)
|
||||||
conductor.cluster_destroy(ctx, cluster)
|
conductor.cluster_destroy(ctx, cluster)
|
||||||
|
|
||||||
|
|
||||||
|
@ -22,7 +22,9 @@ from savanna import context
|
|||||||
from savanna.openstack.common import log
|
from savanna.openstack.common import log
|
||||||
from savanna.openstack.common import periodic_task
|
from savanna.openstack.common import periodic_task
|
||||||
from savanna.openstack.common import threadgroup
|
from savanna.openstack.common import threadgroup
|
||||||
|
from savanna.service import api
|
||||||
from savanna.service.edp import job_manager
|
from savanna.service.edp import job_manager
|
||||||
|
from savanna.service import trusts
|
||||||
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
@ -61,6 +63,9 @@ class SavannaPeriodicTasks(periodic_task.PeriodicTasks):
|
|||||||
def terminate_unneeded_clusters(self, ctx):
|
def terminate_unneeded_clusters(self, ctx):
|
||||||
LOG.debug('Terminating unneeded clusters')
|
LOG.debug('Terminating unneeded clusters')
|
||||||
ctx = context.get_admin_context()
|
ctx = context.get_admin_context()
|
||||||
|
if CONF.use_identity_api_v3:
|
||||||
|
LOG.debug('Terminating unneeded clusters')
|
||||||
|
context.set_ctx(ctx)
|
||||||
for cluster in conductor.cluster_get_all(ctx, status='Active'):
|
for cluster in conductor.cluster_get_all(ctx, status='Active'):
|
||||||
if not cluster.is_transient:
|
if not cluster.is_transient:
|
||||||
continue
|
continue
|
||||||
@ -71,10 +76,17 @@ class SavannaPeriodicTasks(periodic_task.PeriodicTasks):
|
|||||||
|
|
||||||
if jc > 0:
|
if jc > 0:
|
||||||
continue
|
continue
|
||||||
|
if CONF.use_identity_api_v3:
|
||||||
#TODO(akuznetsov) setup trust token and shutdown cluster
|
trusts.use_os_admin_auth_token(cluster)
|
||||||
LOG.debug('Terminated cluster %s with id %s' %
|
api.terminate_cluster(cluster.id)
|
||||||
(cluster.name, cluster.id))
|
LOG.debug('Terminated cluster %s with id %s' %
|
||||||
|
(cluster.name, cluster.id))
|
||||||
|
else:
|
||||||
|
if cluster.status != 'AwaitingTermination':
|
||||||
|
conductor.cluster_update(
|
||||||
|
ctx,
|
||||||
|
cluster,
|
||||||
|
{'status': 'AwaitingTermination'})
|
||||||
context.set_ctx(None)
|
context.set_ctx(None)
|
||||||
|
|
||||||
|
|
||||||
|
65
savanna/service/trusts.py
Normal file
65
savanna/service/trusts.py
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
# Copyright (c) 2013 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from savanna import conductor as c
|
||||||
|
from savanna import context
|
||||||
|
from savanna.utils.openstack import keystone
|
||||||
|
|
||||||
|
conductor = c.API
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
def create_trust(cluster):
|
||||||
|
client = keystone.client()
|
||||||
|
|
||||||
|
trustee_id = keystone.client_for_trusts(
|
||||||
|
CONF.os_admin_username,
|
||||||
|
CONF.os_admin_password,
|
||||||
|
None).user_id
|
||||||
|
|
||||||
|
trust = client.trusts.create(trustor_user=client.user_id,
|
||||||
|
trustee_user=trustee_id,
|
||||||
|
impersonation=True,
|
||||||
|
project=client.tenant_id)
|
||||||
|
conductor.cluster_update(context.current(),
|
||||||
|
cluster,
|
||||||
|
{'trust_id': trust.id})
|
||||||
|
|
||||||
|
|
||||||
|
def use_os_admin_auth_token(cluster):
|
||||||
|
if cluster.trust_id:
|
||||||
|
ctx = context.current()
|
||||||
|
ctx.username = CONF.os_admin_username
|
||||||
|
ctx.tenant_id = cluster.tenant_id
|
||||||
|
client = keystone.client_for_trusts(
|
||||||
|
CONF.os_admin_username,
|
||||||
|
CONF.os_admin_password,
|
||||||
|
cluster.trust_id)
|
||||||
|
ctx.auth_token = client.auth_token
|
||||||
|
ctx.service_catalog = json.dumps(
|
||||||
|
client.service_catalog.catalog['catalog'])
|
||||||
|
|
||||||
|
|
||||||
|
def delete_trust(cluster):
|
||||||
|
if cluster.trust_id:
|
||||||
|
keystone_client = keystone.client_for_trusts(
|
||||||
|
CONF.os_admin_username,
|
||||||
|
CONF.os_admin_password,
|
||||||
|
cluster.trust_id)
|
||||||
|
keystone_client.trusts.delete(cluster.trust_id)
|
@ -16,12 +16,16 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
from savanna import context
|
from savanna import context
|
||||||
import savanna.service.periodic as p
|
import savanna.service.periodic as p
|
||||||
import savanna.tests.unit.conductor.base as test_base
|
import savanna.tests.unit.conductor.base as test_base
|
||||||
from savanna.tests.unit.conductor.manager import test_clusters as tc
|
from savanna.tests.unit.conductor.manager import test_clusters as tc
|
||||||
from savanna.tests.unit.conductor.manager import test_edp as te
|
from savanna.tests.unit.conductor.manager import test_edp as te
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
class TestPeriodicBack(test_base.ConductorManagerTestCase):
|
class TestPeriodicBack(test_base.ConductorManagerTestCase):
|
||||||
@mock.patch('savanna.service.edp.job_manager.get_job_status')
|
@mock.patch('savanna.service.edp.job_manager.get_job_status')
|
||||||
@ -43,7 +47,9 @@ class TestPeriodicBack(test_base.ConductorManagerTestCase):
|
|||||||
mock.call(u'3')])
|
mock.call(u'3')])
|
||||||
|
|
||||||
@mock.patch('savanna.service.edp.job_manager.get_job_status')
|
@mock.patch('savanna.service.edp.job_manager.get_job_status')
|
||||||
def test_cluster_terminate(self, get_job_status):
|
@mock.patch('savanna.service.api.terminate_cluster')
|
||||||
|
def test_cluster_terminate(self, terminate_cluster, get_job_status):
|
||||||
|
CONF.use_identity_api_v3 = True
|
||||||
ctx = context.ctx()
|
ctx = context.ctx()
|
||||||
job = self.api.job_create(ctx, te.SAMPLE_JOB)
|
job = self.api.job_create(ctx, te.SAMPLE_JOB)
|
||||||
ds = self.api.data_source_create(ctx, te.SAMPLE_DATA_SOURCE)
|
ds = self.api.data_source_create(ctx, te.SAMPLE_DATA_SOURCE)
|
||||||
@ -68,9 +74,9 @@ class TestPeriodicBack(test_base.ConductorManagerTestCase):
|
|||||||
"cluster_id": "2"},
|
"cluster_id": "2"},
|
||||||
job, ds, ds)
|
job, ds, ds)
|
||||||
p.SavannaPeriodicTasks().terminate_unneeded_clusters(None)
|
p.SavannaPeriodicTasks().terminate_unneeded_clusters(None)
|
||||||
|
self.assertEqual(1, len(terminate_cluster.call_args_list))
|
||||||
#TODO(akuznetsov) check call of cluster termination,
|
terminated_cluster_id = terminate_cluster.call_args_list[0][0][0]
|
||||||
# it will be added with trust patch
|
self.assertEqual('1', terminated_cluster_id)
|
||||||
|
|
||||||
def _create_job_execution(self, values, job, input, output):
|
def _create_job_execution(self, values, job, input, output):
|
||||||
values.update({"job_id": job['id'],
|
values.update({"job_id": job['id'],
|
||||||
|
@ -15,6 +15,10 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
def url_for(service_catalog, service_type, admin=False, endpoint_type=None):
|
def url_for(service_catalog, service_type, admin=False, endpoint_type=None):
|
||||||
if not endpoint_type:
|
if not endpoint_type:
|
||||||
@ -25,7 +29,12 @@ def url_for(service_catalog, service_type, admin=False, endpoint_type=None):
|
|||||||
service = _get_service_from_catalog(service_catalog, service_type)
|
service = _get_service_from_catalog(service_catalog, service_type)
|
||||||
|
|
||||||
if service:
|
if service:
|
||||||
return _get_case_insensitive(service['endpoints'][0], endpoint_type)
|
if service_type == 'identity':
|
||||||
|
endpoint = _get_identity_endpoint(service)
|
||||||
|
else:
|
||||||
|
endpoint = service['endpoints'][0]
|
||||||
|
return _get_case_insensitive(endpoint,
|
||||||
|
endpoint_type)
|
||||||
else:
|
else:
|
||||||
raise Exception('Service "%s" not found' % service_type)
|
raise Exception('Service "%s" not found' % service_type)
|
||||||
|
|
||||||
@ -47,3 +56,16 @@ def _get_case_insensitive(dictionary, key):
|
|||||||
|
|
||||||
#this will raise an exception as usual if key was not found
|
#this will raise an exception as usual if key was not found
|
||||||
return dictionary[key]
|
return dictionary[key]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_identity_endpoint(service):
|
||||||
|
api_substr = 'v2.0'
|
||||||
|
if CONF.use_identity_api_v3:
|
||||||
|
api_substr = 'v3'
|
||||||
|
|
||||||
|
for endpoint in service['endpoints']:
|
||||||
|
if api_substr in endpoint:
|
||||||
|
return endpoint
|
||||||
|
|
||||||
|
raise Exception('Api %s endpoint not found in service %s'
|
||||||
|
% (api_substr, service['type']))
|
||||||
|
@ -13,21 +13,57 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
from keystoneclient.v2_0 import client as keystone_client
|
from keystoneclient.v2_0 import client as keystone_client
|
||||||
|
from keystoneclient.v3 import client as keystone_client_v3
|
||||||
|
|
||||||
from savanna import context
|
from savanna import context
|
||||||
from savanna.utils.openstack import base
|
from savanna.utils.openstack import base
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
opts = [
|
||||||
|
cfg.BoolOpt('use_identity_api_v3',
|
||||||
|
default=False,
|
||||||
|
help='Enables Savanna to use Keystone API v3. '
|
||||||
|
'If that flag is disabled, '
|
||||||
|
'per-job clusters will not be terminated automatically.')
|
||||||
|
]
|
||||||
|
CONF.register_opts(opts)
|
||||||
|
|
||||||
|
|
||||||
def client():
|
def client():
|
||||||
ctx = context.current()
|
ctx = context.current()
|
||||||
identity_url = base.url_for(ctx.service_catalog, 'identity')
|
auth_url = base.url_for(ctx.service_catalog, 'identity')
|
||||||
|
|
||||||
keystone = keystone_client.Client(username=ctx.username,
|
if CONF.use_identity_api_v3:
|
||||||
user_id=ctx.user_id,
|
keystone = keystone_client_v3.Client(username=ctx.username,
|
||||||
token=ctx.token,
|
token=ctx.token,
|
||||||
tenant_name=ctx.tenant_name,
|
tenant_id=ctx.tenant_id,
|
||||||
tenant_id=ctx.tenant_id,
|
auth_url=auth_url)
|
||||||
auth_url=identity_url)
|
keystone.management_url = auth_url
|
||||||
|
else:
|
||||||
|
keystone = keystone_client.Client(username=ctx.username,
|
||||||
|
token=ctx.token,
|
||||||
|
tenant_id=ctx.tenant_id,
|
||||||
|
auth_url=auth_url)
|
||||||
|
|
||||||
return keystone
|
return keystone
|
||||||
|
|
||||||
|
|
||||||
|
def client_for_trusts(username, password, trust_id):
|
||||||
|
if not CONF.use_identity_api_v3:
|
||||||
|
raise Exception('Trusts aren\'t implemented in keystone api'
|
||||||
|
' less than v3')
|
||||||
|
|
||||||
|
ctx = context.current()
|
||||||
|
auth_url = base.url_for(ctx.service_catalog, 'identity')
|
||||||
|
keystone = keystone_client_v3.Client(username=username,
|
||||||
|
password=password,
|
||||||
|
tenant_id=ctx.tenant_id,
|
||||||
|
auth_url=auth_url,
|
||||||
|
trust_id=trust_id)
|
||||||
|
keystone.management_url = auth_url
|
||||||
|
return keystone
|
||||||
|
Loading…
Reference in New Issue
Block a user