Merge "Trust usage improvements in sahara"

Jenkins 2016-01-21 21:42:10 +00:00 committed by Gerrit Code Review
commit dcd1c211d7
5 changed files with 103 additions and 122 deletions

View File

@@ -153,6 +153,12 @@ class OpsServer(rpc_utils.RPCServer):
         return INFRA.get_type_and_version()
+def _setup_trust_for_cluster(cluster):
+    cluster = conductor.cluster_get(context.ctx(), cluster)
+    trusts.create_trust_for_cluster(cluster)
+    trusts.use_os_admin_auth_token(cluster)
 def ops_error_handler(description):
     def decorator(f):
         @functools.wraps(f)
@ -207,6 +213,7 @@ def ops_error_handler(description):
 def _rollback_cluster(cluster, reason):
+    _setup_trust_for_cluster(cluster)
     context.set_step_type(_("Engine: rollback cluster"))
     return INFRA.rollback_cluster(cluster, reason)
@@ -222,10 +229,7 @@ def _prepare_provisioning(cluster_id):
             nodegroup)
         conductor.node_group_update(ctx, nodegroup, update_dict)
-    if CONF.use_identity_api_v3:
-        trusts.create_trust_for_cluster(cluster,
-                                        expires=not cluster.is_transient)
-        trusts.use_os_admin_auth_token(cluster)
+    _setup_trust_for_cluster(cluster)
     cluster = conductor.cluster_get(ctx, cluster_id)
@@ -246,46 +250,41 @@ def _update_sahara_info(ctx, cluster):
 def _provision_cluster(cluster_id):
     ctx, cluster, plugin = _prepare_provisioning(cluster_id)
-    try:
-        cluster = _update_sahara_info(ctx, cluster)
+    cluster = _update_sahara_info(ctx, cluster)
-        # updating cluster infra
-        cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_INFRAUPDATING)
-        plugin.update_infra(cluster)
+    # updating cluster infra
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_INFRAUPDATING)
+    plugin.update_infra(cluster)
-        # creating instances and configuring them
-        cluster = conductor.cluster_get(ctx, cluster_id)
-        context.set_step_type(_("Engine: create cluster"))
-        INFRA.create_cluster(cluster)
+    # creating instances and configuring them
+    cluster = conductor.cluster_get(ctx, cluster_id)
+    context.set_step_type(_("Engine: create cluster"))
+    INFRA.create_cluster(cluster)
-        # configure cluster
-        cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_CONFIGURING)
-        shares.mount_shares(cluster)
+    # configure cluster
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_CONFIGURING)
+    shares.mount_shares(cluster)
-        context.set_step_type(_("Plugin: configure cluster"))
-        plugin.configure_cluster(cluster)
+    context.set_step_type(_("Plugin: configure cluster"))
+    plugin.configure_cluster(cluster)
-        # starting prepared and configured cluster
-        ntp_service.configure_ntp(cluster_id)
-        cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_STARTING)
+    # starting prepared and configured cluster
+    ntp_service.configure_ntp(cluster_id)
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_STARTING)
-        context.set_step_type(_("Plugin: start cluster"))
-        plugin.start_cluster(cluster)
+    context.set_step_type(_("Plugin: start cluster"))
+    plugin.start_cluster(cluster)
-        # cluster is now up and ready
-        cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_ACTIVE)
+    # cluster is now up and ready
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_ACTIVE)
-        # schedule execution pending job for cluster
-        for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
-            job_manager.run_job(je.id)
-    finally:
-        if CONF.use_identity_api_v3 and not cluster.is_transient:
-            trusts.delete_trust_from_cluster(cluster)
+    # schedule execution pending job for cluster
+    for je in conductor.job_execution_get_all(ctx, cluster_id=cluster.id):
+        job_manager.run_job(je.id)
 @ops_error_handler(
@@ -293,61 +292,57 @@ def _provision_cluster(cluster_id):
 def _provision_scaled_cluster(cluster_id, node_group_id_map):
     ctx, cluster, plugin = _prepare_provisioning(cluster_id)
-    try:
-        # Decommissioning surplus nodes with the plugin
+    # Decommissioning surplus nodes with the plugin
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
+    instances_to_delete = []
+    for node_group in cluster.node_groups:
+        new_count = node_group_id_map[node_group.id]
+        if new_count < node_group.count:
+            instances_to_delete += node_group.instances[new_count:
+                                                        node_group.count]
+    if instances_to_delete:
+        context.set_step_type(_("Plugin: decommission cluster"))
+        plugin.decommission_nodes(cluster, instances_to_delete)
+    # Scaling infrastructure
+    cluster = c_u.change_cluster_status(
+        cluster, c_u.CLUSTER_STATUS_SCALING)
+    context.set_step_type(_("Engine: scale cluster"))
+    instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
+    # Setting up new nodes with the plugin
+    if instance_ids:
+        ntp_service.configure_ntp(cluster_id)
         cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_DECOMMISSIONING)
+            cluster, c_u.CLUSTER_STATUS_CONFIGURING)
+        instances = c_u.get_instances(cluster, instance_ids)
+        context.set_step_type(_("Plugin: scale cluster"))
+        plugin.scale_cluster(cluster, instances)
-        instances_to_delete = []
-        for node_group in cluster.node_groups:
-            new_count = node_group_id_map[node_group.id]
-            if new_count < node_group.count:
-                instances_to_delete += node_group.instances[new_count:
-                                                            node_group.count]
-        if instances_to_delete:
-            context.set_step_type(_("Plugin: decommission cluster"))
-            plugin.decommission_nodes(cluster, instances_to_delete)
-        # Scaling infrastructure
-        cluster = c_u.change_cluster_status(
-            cluster, c_u.CLUSTER_STATUS_SCALING)
-        context.set_step_type(_("Engine: scale cluster"))
-        instance_ids = INFRA.scale_cluster(cluster, node_group_id_map)
-        # Setting up new nodes with the plugin
-        if instance_ids:
-            ntp_service.configure_ntp(cluster_id)
-            cluster = c_u.change_cluster_status(
-                cluster, c_u.CLUSTER_STATUS_CONFIGURING)
-            instances = c_u.get_instances(cluster, instance_ids)
-            context.set_step_type(_("Plugin: scale cluster"))
-            plugin.scale_cluster(cluster, instances)
-        c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
-    finally:
-        if CONF.use_identity_api_v3 and not cluster.is_transient:
-            trusts.delete_trust_from_cluster(cluster)
+    c_u.change_cluster_status(cluster, c_u.CLUSTER_STATUS_ACTIVE)
 @ops_error_handler(
     _("Terminating cluster failed for the following reason(s): {reason}"))
 def terminate_cluster(cluster_id):
     ctx = context.ctx()
+    _setup_trust_for_cluster(cluster_id)
     job_manager.update_job_statuses(cluster_id=cluster_id)
     cluster = conductor.cluster_get(ctx, cluster_id)
-    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
     context.set_step_type(_("Plugin: shutdown cluster"))
     plugin.on_terminate_cluster(cluster)
     context.set_step_type(_("Engine: shutdown cluster"))
     INFRA.shutdown_cluster(cluster)
-    if CONF.use_identity_api_v3:
-        trusts.delete_trust_from_cluster(cluster)
+    trusts.delete_trust_from_cluster(cluster)
     conductor.cluster_destroy(ctx, cluster)
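Taken together, the ops.py hunks above replace the scattered `if CONF.use_identity_api_v3` checks and try/finally trust deletion with one idempotent helper that every engine path (provisioning, scaling, rollback, termination) can call unconditionally, and the trust is now dropped only at terminate. The sketch below is purely illustrative plain Python, not Sahara code; `setup_trust`, `delete_trust`, `provision`, and `terminate` are hypothetical stand-ins for `_setup_trust_for_cluster`, `trusts.delete_trust_from_cluster`, and the ops entry points.

# Illustrative sketch only (not Sahara code): the calling pattern ops.py moves to.
# The guards live inside the trust helpers, so callers no longer need try/finally
# blocks whose sole purpose was dropping the trust early.

def setup_trust(cluster):
    """Idempotent: create and record a trust only if none exists yet."""
    if cluster.get('trust_id') is None:
        cluster['trust_id'] = 'trust-for-%s' % cluster['id']  # stand-in for a Keystone call

def delete_trust(cluster):
    """Idempotent: remove the recorded trust if there is one."""
    if cluster.get('trust_id') is not None:
        cluster['trust_id'] = None

def provision(cluster):
    setup_trust(cluster)            # acquire delegated credentials up front
    cluster['status'] = 'Active'    # engine/plugin steps elided
    # no finally-block deletion: the trust stays until terminate()

def terminate(cluster):
    setup_trust(cluster)            # re-establish auth even for stale clusters
    cluster['status'] = 'Deleted'
    delete_trust(cluster)           # the only place the trust is removed

if __name__ == '__main__':
    c = {'id': '42', 'status': 'Validating', 'trust_id': None}
    provision(c)
    terminate(c)
    print(c)                        # {'id': '42', 'status': 'Deleted', 'trust_id': None}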

View File

@ -13,12 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import datetime
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils as json
-from oslo_utils import timeutils
 import six
 from sahara import conductor as c
@@ -33,14 +30,8 @@ CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
-def _get_expiry():
-    '''Get the time at which a trust should expire based on configuration'''
-    return timeutils.utcnow() + datetime.timedelta(
-        hours=CONF.cluster_operation_trust_expiration_hours)
 def create_trust(trustor, trustee, role_names, impersonation=True,
-                 project_id=None, expires=True):
+                 project_id=None, allow_redelegation=False):
     '''Create a trust and return it's identifier
     :param trustor: The user delegating the trust, this is an auth plugin.
@@ -55,7 +46,8 @@ def create_trust(trustor, trustee, role_names, impersonation=True,
     :param project_id: The project that the trust will be scoped into,
                        default is the trustor's project id.
-    :param expires: The trust will expire if this is set to True.
+    :param allow_redelegation: Allow redelegation parameter for cluster
+                               trusts.
     :returns: A valid trust id.
@@ -65,7 +57,6 @@ def create_trust(trustor, trustee, role_names, impersonation=True,
     if project_id is None:
         project_id = keystone.project_id_from_auth(trustor)
     try:
-        expires_at = _get_expiry() if expires else None
         trustor_user_id = keystone.user_id_from_auth(trustor)
         trustee_user_id = keystone.user_id_from_auth(trustee)
         client = keystone.client_from_auth(trustor)
@@ -74,7 +65,7 @@ def create_trust(trustor, trustee, role_names, impersonation=True,
                                      impersonation=impersonation,
                                      role_names=role_names,
                                      project=project_id,
-                                     expires_at=expires_at)
+                                     allow_redelegation=allow_redelegation)
         LOG.debug('Created trust {trust_id}'.format(
             trust_id=six.text_type(trust.id)))
         return trust.id
@@ -94,18 +85,20 @@ def create_trust_for_cluster(cluster, expires=True):
     :param expires: The trust will expire if this is set to True.
     '''
     ctx = context.current()
-    trustor = keystone.auth()
-    trustee = keystone.auth_for_admin(
-        project_name=CONF.keystone_authtoken.admin_tenant_name)
+    cluster = conductor.cluster_get(ctx, cluster)
+    if CONF.use_identity_api_v3 and not cluster.trust_id:
+        trustor = keystone.auth()
+        trustee = keystone.auth_for_admin(
+            project_name=CONF.keystone_authtoken.admin_tenant_name)
-    trust_id = create_trust(trustor=trustor,
-                            trustee=trustee,
-                            role_names=ctx.roles,
-                            expires=expires)
+        trust_id = create_trust(trustor=trustor,
+                                trustee=trustee,
+                                role_names=ctx.roles,
+                                allow_redelegation=True)
-    conductor.cluster_update(ctx,
-                             cluster,
-                             {'trust_id': trust_id})
+        conductor.cluster_update(ctx,
+                                 cluster,
+                                 {'trust_id': trust_id})
 def delete_trust(trustee, trust_id):
@@ -139,10 +132,11 @@ def delete_trust_from_cluster(cluster):
     :param cluster: The cluster to delete the trust from.
     '''
-    if cluster.trust_id:
+    ctx = context.current()
+    cluster = conductor.cluster_get(ctx, cluster)
+    if CONF.use_identity_api_v3 and cluster.trust_id:
         keystone_auth = keystone.auth_for_admin(trust_id=cluster.trust_id)
         delete_trust(keystone_auth, cluster.trust_id)
-        ctx = context.current()
         conductor.cluster_update(ctx,
                                  cluster,
                                  {'trust_id': None})
@@ -159,8 +153,9 @@ def use_os_admin_auth_token(cluster):
     :param cluster: The cluster to use for tenant and trust identification.
     '''
-    if cluster.trust_id:
-        ctx = context.current()
+    ctx = context.current()
+    cluster = conductor.cluster_get(ctx, cluster)
+    if CONF.use_identity_api_v3 and cluster.trust_id:
         ctx.username = CONF.keystone_authtoken.admin_user
         ctx.tenant_id = cluster.tenant_id
         ctx.auth_plugin = keystone.auth_for_admin(
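For context on the trusts.py hunks: the `expires_at`/`_get_expiry` handling is replaced by a trust created with `allow_redelegation=True` that lives until it is explicitly deleted, and the `use_identity_api_v3`/`trust_id` guards now sit inside the helpers themselves. Below is a hedged, self-contained sketch of creating such a trust with python-keystoneclient; the session wiring, argument names, and the `create_cluster_trust` function are illustrative assumptions, not Sahara's `sahara.utils.openstack.keystone` helpers — only the `trusts.create(..., allow_redelegation=...)` call mirrors the patch above.

# Hedged sketch: a redelegatable, non-expiring Keystone trust created with
# python-keystoneclient. Illustrative wiring; not Sahara's keystone helpers.
from keystoneauth1.identity import v3
from keystoneauth1 import session
from keystoneclient.v3 import client as ks_client

def create_cluster_trust(auth_url, username, password, project_id,
                         trustor_user_id, trustee_user_id, role_names):
    auth = v3.Password(auth_url=auth_url, username=username,
                       password=password, project_id=project_id,
                       user_domain_id='default')
    keystone = ks_client.Client(session=session.Session(auth=auth))
    trust = keystone.trusts.create(
        trustor_user=trustor_user_id,   # the user delegating their roles
        trustee_user=trustee_user_id,   # the admin user acting on their behalf
        project=project_id,
        role_names=role_names,
        impersonation=True,
        allow_redelegation=True)        # replaces the old expires_at argument
    return trust.id                     # stored on the cluster, deleted on teardown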

View File

@@ -118,10 +118,11 @@ class TestOPS(base.SaharaWithDbTestCase):
                           'plugin.scale_cluster'], self.SEQUENCE,
                          'Order of calls is wrong')
+    @mock.patch('sahara.service.ops._setup_trust_for_cluster')
     @mock.patch('sahara.service.ops.CONF')
     @mock.patch('sahara.service.trusts.delete_trust_from_cluster')
     @mock.patch('sahara.context.ctx')
-    def test_terminate_cluster(self, p_ctx, p_delete_trust, p_conf):
+    def test_terminate_cluster(self, p_ctx, p_delete_trust, p_conf, p_set):
         del self.SEQUENCE[:]
         base_plugins.PLUGINS = FakePlugin()
         base_plugins.PLUGINS.get_plugin.return_value = FakePlugin()
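A note on the test_ops.py change above: stacked `mock.patch` decorators are applied bottom-up, so the patch added at the top of the stack is handed to the test as the right-most extra argument, which is why the new `_setup_trust_for_cluster` mock arrives in the trailing `p_set` parameter. A minimal standalone illustration (generic `unittest`/`mock` usage, not the Sahara test suite):

import unittest
from unittest import mock

class PatchOrderExample(unittest.TestCase):
    @mock.patch('os.path.exists')   # topmost patch  -> last mock argument
    @mock.patch('os.getcwd')        # bottom patch   -> first mock argument
    def test_argument_order(self, m_getcwd, m_exists):
        m_getcwd.return_value = '/tmp'
        m_exists.return_value = True
        self.assertTrue(m_exists(m_getcwd()))

if __name__ == '__main__':
    unittest.main()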

View File

@@ -46,14 +46,14 @@ class TestTrusts(base.SaharaTestCase):
         client = self._client()
         client_from_auth.return_value = client
         trust_id = trusts.create_trust(trustor, trustee,
-                                       "role_names", expires=True)
+                                       "role_names")
         client.trusts.create.assert_called_with(
             trustor_user="trustor_id",
             trustee_user="trustee_id",
             impersonation=True,
             role_names="role_names",
             project="tenant_id",
-            expires_at=mock.ANY
+            allow_redelegation=False,
         )
         self.assertEqual("trust_id", trust_id)
@@ -61,25 +61,27 @@ class TestTrusts(base.SaharaTestCase):
         client = self._client()
         client_from_auth.return_value = client
         trust_id = trusts.create_trust(trustor, trustee, "role_names",
-                                       project_id='injected_project',
-                                       expires=False)
+                                       project_id='injected_project')
         client.trusts.create.assert_called_with(trustor_user="trustor_id",
                                                 trustee_user="trustee_id",
                                                 impersonation=True,
                                                 role_names="role_names",
                                                 project="injected_project",
-                                                expires_at=None)
+                                                allow_redelegation=False)
         self.assertEqual("trust_id", trust_id)
+    @mock.patch('sahara.conductor.API.cluster_get')
     @mock.patch('sahara.conductor.API.cluster_update')
     @mock.patch('sahara.service.trusts.create_trust')
     @mock.patch('sahara.utils.openstack.keystone.auth_for_admin')
     @mock.patch('sahara.context.current')
     def test_create_trust_for_cluster(self, context_current, auth_for_admin,
-                                      create_trust, cluster_update):
+                                      create_trust, cluster_update, cl_get):
         self.override_config('admin_tenant_name', 'admin_project',
                              group='keystone_authtoken')
         trustor_auth = mock.Mock()
+        fake_cluster = mock.Mock(trust_id=None)
+        cl_get.return_value = fake_cluster
         ctx = mock.Mock(roles="role_names", auth_plugin=trustor_auth)
         context_current.return_value = ctx
         trustee_auth = mock.Mock()
@@ -92,7 +94,7 @@ class TestTrusts(base.SaharaTestCase):
         create_trust.assert_called_with(trustor=trustor_auth,
                                         trustee=trustee_auth,
                                         role_names='role_names',
-                                        expires=True)
+                                        allow_redelegation=True)
-        cluster_update.assert_called_with(ctx, "cluster",
+        cluster_update.assert_called_with(ctx, fake_cluster,
                                           {"trust_id": "trust_id"})

View File

@@ -32,18 +32,6 @@ opts = [
                      'If that flag is disabled, '
                      'per-job clusters will not be terminated '
                      'automatically.'),
-    cfg.IntOpt('cluster_operation_trust_expiration_hours',
-               default=24,
-               help='Defines the period of time (in hours) after which trusts '
-                    'created to allow sahara to create or scale a cluster '
-                    'will expire. Note that this value should be '
-                    'significantly larger than the value of the '
-                    'cleanup_time_for_incomplete_clusters configuration key '
-                    'if use of the cluster cleanup feature is desired (the '
-                    'trust must last at least as long as a cluster could '
-                    'validly take to stall in its creation, plus the '
-                    'timeout value set in that key, plus one hour for the '
-                    'period of the cleanup job).'),
     # TODO(mimccune) The following should be integrated into a custom
     # auth section
     cfg.StrOpt('admin_user_domain_name',