Implementation of stopping/resetting env
Added implementation of environment resetting Implements: blueprint nailgun-reset-env Change-Id: Ib2c8d2f7f570ba4293f7930cf4837a7ec09e222b
This commit is contained in:
parent
9d8b3ba659
commit
94cfa00d5c
@ -52,7 +52,12 @@ def forbid_client_caching(handler):
|
||||
@decorator
|
||||
def content_json(func, *args, **kwargs):
|
||||
web.header('Content-Type', 'application/json')
|
||||
data = func(*args, **kwargs)
|
||||
try:
|
||||
data = func(*args, **kwargs)
|
||||
except web.HTTPError as http_error:
|
||||
if isinstance(http_error.data, (dict, list)):
|
||||
http_error.data = build_json_response(http_error.data)
|
||||
raise
|
||||
return build_json_response(data)
|
||||
|
||||
|
||||
|
@ -40,6 +40,8 @@ from nailgun.errors import errors
|
||||
from nailgun.logger import logger
|
||||
from nailgun.task.manager import ApplyChangesTaskManager
|
||||
from nailgun.task.manager import ClusterDeletionManager
|
||||
from nailgun.task.manager import ResetEnvironmentTaskManager
|
||||
from nailgun.task.manager import StopDeploymentTaskManager
|
||||
|
||||
|
||||
class ClusterHandler(BaseHandler):
|
||||
@ -266,6 +268,80 @@ class ClusterChangesHandler(BaseHandler):
|
||||
return TaskHandler.render(task)
|
||||
|
||||
|
||||
# TODO(enchantner): refactor these (DRY), maybe
|
||||
# by inheritance from base DeferredTaskHandler
|
||||
|
||||
|
||||
class ClusterStopDeploymentHandler(BaseHandler):
|
||||
|
||||
@content_json
|
||||
def PUT(self, cluster_id):
|
||||
""":returns: JSONized Task object.
|
||||
:http: * 202 (deployment stopping initiated)
|
||||
* 400 (can't stop deployment)
|
||||
* 404 (environment not found in db)
|
||||
"""
|
||||
cluster = self.get_object_or_404(Cluster, cluster_id)
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
u"Trying to stop deployment "
|
||||
u"on environment '{0}'".format(
|
||||
cluster_id
|
||||
)
|
||||
)
|
||||
task_manager = StopDeploymentTaskManager(
|
||||
cluster_id=cluster.id
|
||||
)
|
||||
task = task_manager.execute()
|
||||
except errors.StopAlreadyRunning as exc:
|
||||
err = web.conflict
|
||||
err.message = exc.message
|
||||
raise err
|
||||
except Exception as exc:
|
||||
logger.warn(u'Error during execution '
|
||||
u'deployment stopping task: {0}'.format(str(exc)))
|
||||
raise web.badrequest(str(exc))
|
||||
|
||||
raise web.webapi.HTTPError(
|
||||
status="202 Accepted",
|
||||
data=TaskHandler.render(task)
|
||||
)
|
||||
|
||||
|
||||
class ClusterResetHandler(BaseHandler):
|
||||
|
||||
@content_json
|
||||
def PUT(self, cluster_id):
|
||||
""":returns: JSONized Task object.
|
||||
:http: * 202 (environment reset initiated)
|
||||
* 400 (can't reset environment)
|
||||
* 404 (environment not found in db)
|
||||
"""
|
||||
cluster = self.get_object_or_404(Cluster, cluster_id)
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
u"Trying to reset environment '{0}'".format(
|
||||
cluster_id
|
||||
)
|
||||
)
|
||||
task_manager = ResetEnvironmentTaskManager(
|
||||
cluster_id=cluster.id
|
||||
)
|
||||
task = task_manager.execute()
|
||||
except Exception as exc:
|
||||
logger.warn(u'Error during execution '
|
||||
u'environment resetting '
|
||||
u'task: {0}'.format(str(exc)))
|
||||
raise web.badrequest(str(exc))
|
||||
|
||||
raise web.webapi.HTTPError(
|
||||
status="202 Accepted",
|
||||
data=TaskHandler.render(task)
|
||||
)
|
||||
|
||||
|
||||
class ClusterAttributesHandler(BaseHandler):
|
||||
"""Cluster attributes handler
|
||||
"""
|
||||
|
@ -25,6 +25,8 @@ from nailgun.api.handlers.cluster import ClusterChangesHandler
|
||||
from nailgun.api.handlers.cluster import ClusterCollectionHandler
|
||||
from nailgun.api.handlers.cluster import ClusterGeneratedData
|
||||
from nailgun.api.handlers.cluster import ClusterHandler
|
||||
from nailgun.api.handlers.cluster import ClusterResetHandler
|
||||
from nailgun.api.handlers.cluster import ClusterStopDeploymentHandler
|
||||
|
||||
from nailgun.api.handlers.disks import NodeDefaultsDisksHandler
|
||||
from nailgun.api.handlers.disks import NodeDisksHandler
|
||||
@ -120,6 +122,10 @@ urls = (
|
||||
ProvisionSelectedNodes,
|
||||
r'/clusters/(?P<cluster_id>\d+)/deploy/?$',
|
||||
DeploySelectedNodes,
|
||||
r'/clusters/(?P<cluster_id>\d+)/stop_deployment/?$',
|
||||
ClusterStopDeploymentHandler,
|
||||
r'/clusters/(?P<cluster_id>\d+)/reset/?$',
|
||||
ClusterResetHandler,
|
||||
|
||||
r'/nodes/?$',
|
||||
NodeCollectionHandler,
|
||||
|
@ -17,6 +17,74 @@ from sqlalchemy.dialects import postgresql
|
||||
from nailgun.db.sqlalchemy.models.fields import JSON
|
||||
|
||||
|
||||
old_cluster_status_options = (
|
||||
'new',
|
||||
'deployment',
|
||||
'operational',
|
||||
'error',
|
||||
'remove'
|
||||
)
|
||||
new_cluster_status_options = sorted(
|
||||
old_cluster_status_options + ('stopped',)
|
||||
)
|
||||
|
||||
old_task_names_options = (
|
||||
'super',
|
||||
'deploy',
|
||||
'deployment',
|
||||
'provision',
|
||||
'node_deletion',
|
||||
'cluster_deletion',
|
||||
'check_before_deployment',
|
||||
'check_networks',
|
||||
'verify_networks',
|
||||
'check_dhcp',
|
||||
'verify_network_connectivity',
|
||||
'redhat_setup',
|
||||
'redhat_check_credentials',
|
||||
'redhat_check_licenses',
|
||||
'redhat_download_release',
|
||||
'redhat_update_cobbler_profile',
|
||||
'dump',
|
||||
'capacity_log'
|
||||
)
|
||||
new_task_names_options = sorted(
|
||||
old_task_names_options + (
|
||||
'stop_deployment',
|
||||
'reset_environment'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def upgrade_enum(table, column_name, enum_name, old_options, new_options):
|
||||
old_type = sa.Enum(*old_options, name=enum_name)
|
||||
new_type = sa.Enum(*new_options, name=enum_name)
|
||||
tmp_type = sa.Enum(*new_options, name="_" + enum_name)
|
||||
# Create a tempoary type, convert and drop the "old" type
|
||||
tmp_type.create(op.get_bind(), checkfirst=False)
|
||||
op.execute(
|
||||
u'ALTER TABLE {0} ALTER COLUMN {1} TYPE _{2}'
|
||||
u' USING {1}::text::_{2}'.format(
|
||||
table,
|
||||
column_name,
|
||||
enum_name
|
||||
)
|
||||
)
|
||||
old_type.drop(op.get_bind(), checkfirst=False)
|
||||
# Create and convert to the "new" type
|
||||
new_type.create(op.get_bind(), checkfirst=False)
|
||||
op.execute(
|
||||
u'ALTER TABLE {0} ALTER COLUMN {1} TYPE {2}'
|
||||
u' USING {1}::text::{2}'.format(
|
||||
table,
|
||||
column_name,
|
||||
enum_name
|
||||
)
|
||||
)
|
||||
tmp_type.drop(op.get_bind(), checkfirst=False)
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('global_parameters')
|
||||
@ -32,7 +100,25 @@ def upgrade():
|
||||
'network_groups',
|
||||
sa.Column('meta', JSON(), nullable=True)
|
||||
)
|
||||
### end Alembic commands ###
|
||||
|
||||
# CLUSTER STATUS ENUM UPGRADE
|
||||
upgrade_enum(
|
||||
"clusters", # table
|
||||
"status", # column
|
||||
"cluster_status", # ENUM name
|
||||
old_cluster_status_options, # old options
|
||||
new_cluster_status_options # new options
|
||||
)
|
||||
|
||||
# TASK NAME ENUM UPGRADE
|
||||
upgrade_enum(
|
||||
"tasks", # table
|
||||
"name", # column
|
||||
"task_name", # ENUM name
|
||||
old_task_names_options, # old options
|
||||
new_task_names_options # new options
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
@ -134,4 +220,21 @@ def downgrade():
|
||||
),
|
||||
sa.PrimaryKeyConstraint('id', name=u'plugins_pkey')
|
||||
)
|
||||
# CLUSTER STATUS ENUM DOWNGRADE
|
||||
upgrade_enum(
|
||||
"clusters", # table
|
||||
"status", # column
|
||||
"cluster_status", # ENUM name
|
||||
new_cluster_status_options, # old options
|
||||
old_cluster_status_options # new options
|
||||
)
|
||||
|
||||
# TASK NAME ENUM DOWNGRADE
|
||||
upgrade_enum(
|
||||
"tasks", # table
|
||||
"name", # column
|
||||
"task_name", # ENUM name
|
||||
new_task_names_options, # old options
|
||||
old_task_names_options # new options
|
||||
)
|
||||
### end Alembic commands ###
|
||||
|
@ -30,7 +30,9 @@ from sqlalchemy.orm import relationship, backref
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy.models.base import Base
|
||||
from nailgun.db.sqlalchemy.models.fields import JSON
|
||||
from nailgun.db.sqlalchemy.models.node import Node
|
||||
from nailgun.db.sqlalchemy.models.release import Release
|
||||
from nailgun.db.sqlalchemy.models.task import Task
|
||||
from nailgun.logger import logger
|
||||
from nailgun.settings import settings
|
||||
from nailgun.utils import dict_merge
|
||||
@ -55,7 +57,14 @@ class ClusterChanges(Base):
|
||||
class Cluster(Base):
|
||||
__tablename__ = 'clusters'
|
||||
MODES = ('multinode', 'ha_full', 'ha_compact')
|
||||
STATUSES = ('new', 'deployment', 'operational', 'error', 'remove')
|
||||
STATUSES = (
|
||||
'new',
|
||||
'deployment',
|
||||
'stopped',
|
||||
'operational',
|
||||
'error',
|
||||
'remove'
|
||||
)
|
||||
NET_MANAGERS = ('FlatDHCPManager', 'VlanManager')
|
||||
GROUPING = ('roles', 'hardware', 'both')
|
||||
# Neutron-related
|
||||
@ -165,12 +174,19 @@ class Cluster(Base):
|
||||
|
||||
@property
|
||||
def are_attributes_locked(self):
|
||||
return self.status != "new" or any(
|
||||
map(
|
||||
lambda x: x.name == "deploy" and x.status == "running",
|
||||
self.tasks
|
||||
)
|
||||
)
|
||||
if db().query(Task).filter_by(
|
||||
cluster_id=self.id,
|
||||
name="deploy",
|
||||
status="running"
|
||||
).count():
|
||||
return True
|
||||
elif self.status in ["new", "stopped"] and not \
|
||||
db().query(Node).filter_by(
|
||||
cluster_id=self.id,
|
||||
status="ready"
|
||||
).count():
|
||||
return False
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data):
|
||||
|
@ -46,6 +46,8 @@ class Task(Base):
|
||||
'deploy',
|
||||
'deployment',
|
||||
'provision',
|
||||
'stop_deployment',
|
||||
'reset_environment',
|
||||
|
||||
'node_deletion',
|
||||
'cluster_deletion',
|
||||
@ -90,7 +92,8 @@ class Task(Base):
|
||||
parent_id = Column(Integer, ForeignKey('tasks.id'))
|
||||
subtasks = relationship(
|
||||
"Task",
|
||||
backref=backref('parent', remote_side=[id])
|
||||
backref=backref('parent', remote_side=[id]),
|
||||
cascade="all,delete"
|
||||
)
|
||||
notifications = relationship(
|
||||
"Notification",
|
||||
|
@ -34,7 +34,9 @@ default_messages = {
|
||||
# deployment errors
|
||||
"CheckBeforeDeploymentError": "Pre-Deployment check wasn't successful",
|
||||
"DeploymentAlreadyStarted": "Deployment already started",
|
||||
"DeploymentNotRunning": "Deployment is not running",
|
||||
"DeletionAlreadyStarted": "Environment removal already started",
|
||||
"StopAlreadyRunning": "Stopping deployment already initiated",
|
||||
"FailedProvisioning": "Failed to start provisioning",
|
||||
"WrongNodeStatus": "Wrong node status",
|
||||
"NodeOffline": "Node is offline",
|
||||
|
@ -57,6 +57,8 @@ class ProvisioningSerializer(object):
|
||||
'uid': node.uid,
|
||||
'power_address': node.ip,
|
||||
'name': TaskHelper.make_slave_name(node.id),
|
||||
# right now it duplicates to avoid possible issues
|
||||
'slave_name': TaskHelper.make_slave_name(node.id),
|
||||
'hostname': node.fqdn,
|
||||
'power_pass': cls.get_ssh_key_path(node),
|
||||
|
||||
|
@ -46,6 +46,18 @@ naily_queue = Queue(
|
||||
routing_key='naily'
|
||||
)
|
||||
|
||||
naily_service_exchange = Exchange(
|
||||
'naily_service',
|
||||
'fanout',
|
||||
durable=False,
|
||||
auto_delete=True
|
||||
)
|
||||
|
||||
naily_service_queue = Queue(
|
||||
'naily_service',
|
||||
exchange=naily_service_exchange
|
||||
)
|
||||
|
||||
nailgun_exchange = Exchange(
|
||||
'nailgun',
|
||||
'topic',
|
||||
@ -59,14 +71,16 @@ nailgun_queue = Queue(
|
||||
)
|
||||
|
||||
|
||||
def cast(name, message):
|
||||
def cast(name, message, service=False):
|
||||
logger.debug(
|
||||
"RPC cast to orchestrator:\n{0}".format(
|
||||
json.dumps(message, indent=4)
|
||||
)
|
||||
)
|
||||
use_queue = naily_queue if not service else naily_service_queue
|
||||
use_exchange = naily_exchange if not service else naily_service_exchange
|
||||
with Connection(conn_str) as conn:
|
||||
with conn.Producer(serializer='json') as producer:
|
||||
producer.publish(message,
|
||||
exchange=naily_exchange, routing_key=name,
|
||||
declare=[naily_queue])
|
||||
exchange=use_exchange, routing_key=name,
|
||||
declare=[use_queue])
|
||||
|
@ -39,7 +39,9 @@ from nailgun.task.helpers import TaskHelper
|
||||
def get_task_by_uuid(uuid):
|
||||
task = db().query(Task).filter_by(uuid=uuid).first()
|
||||
if not task:
|
||||
raise errors.CannotFindTask('Cannot find task with uuid %s' % uuid)
|
||||
raise errors.CannotFindTask(
|
||||
'Cannot find task with uuid {0}'.format(uuid)
|
||||
)
|
||||
|
||||
return task
|
||||
|
||||
@ -274,7 +276,7 @@ class NailgunReceiver(object):
|
||||
node_db = db().query(Node).get(uid)
|
||||
|
||||
if not node_db:
|
||||
logger.warn('Task with uid "{0}" not found'.format(uid))
|
||||
logger.warn('Node with uid "{0}" not found'.format(uid))
|
||||
continue
|
||||
|
||||
if node.get('status') == 'error':
|
||||
@ -424,6 +426,146 @@ class NailgunReceiver(object):
|
||||
)
|
||||
TaskHelper.update_task_status(task.uuid, status, progress, message)
|
||||
|
||||
@classmethod
|
||||
def stop_deployment_resp(cls, **kwargs):
|
||||
logger.info(
|
||||
"RPC method stop_deployment_resp received: %s" %
|
||||
json.dumps(kwargs)
|
||||
)
|
||||
task_uuid = kwargs.get('task_uuid')
|
||||
nodes = kwargs.get('nodes', [])
|
||||
message = kwargs.get('error')
|
||||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = get_task_by_uuid(task_uuid)
|
||||
|
||||
stop_tasks = db().query(Task).filter_by(
|
||||
cluster_id=task.cluster_id,
|
||||
).filter(
|
||||
Task.name.in_(["deploy", "deployment"])
|
||||
).all()
|
||||
if not stop_tasks:
|
||||
logger.warning("stop_deployment_resp: deployment tasks \
|
||||
not found for environment '%s'!", task.cluster_id)
|
||||
|
||||
if status == "ready":
|
||||
task.cluster.status = "stopped"
|
||||
|
||||
if stop_tasks:
|
||||
map(db().delete, stop_tasks)
|
||||
|
||||
db().commit()
|
||||
|
||||
update_nodes = db().query(Node).filter(
|
||||
Node.id.in_([
|
||||
n["uid"] for n in nodes
|
||||
]),
|
||||
Node.cluster_id == task.cluster_id
|
||||
).yield_per(100)
|
||||
|
||||
update_nodes.update(
|
||||
{
|
||||
"online": False,
|
||||
"status": "discover",
|
||||
"pending_addition": True
|
||||
},
|
||||
synchronize_session='fetch'
|
||||
)
|
||||
|
||||
for n in update_nodes:
|
||||
n.roles, n.pending_roles = n.pending_roles, n.roles
|
||||
|
||||
db().commit()
|
||||
|
||||
message = (
|
||||
u"Deployment of environment '{0}' "
|
||||
u"was successfully stopped".format(
|
||||
task.cluster.name or task.cluster_id
|
||||
)
|
||||
)
|
||||
|
||||
notifier.notify(
|
||||
"done",
|
||||
message,
|
||||
task.cluster_id
|
||||
)
|
||||
|
||||
TaskHelper.update_task_status(
|
||||
task_uuid,
|
||||
status,
|
||||
progress,
|
||||
message
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def reset_environment_resp(cls, **kwargs):
|
||||
logger.info(
|
||||
"RPC method reset_environment_resp received: %s",
|
||||
json.dumps(kwargs)
|
||||
)
|
||||
task_uuid = kwargs.get('task_uuid')
|
||||
nodes = kwargs.get('nodes')
|
||||
message = kwargs.get('error')
|
||||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = get_task_by_uuid(task_uuid)
|
||||
|
||||
if status == "ready":
|
||||
|
||||
# restoring pending changes
|
||||
task.cluster.status = "new"
|
||||
task.cluster.add_pending_changes("attributes")
|
||||
task.cluster.add_pending_changes("networks")
|
||||
|
||||
for node in task.cluster.nodes:
|
||||
task.cluster.add_pending_changes(
|
||||
"disks",
|
||||
node_id=node.id
|
||||
)
|
||||
|
||||
update_nodes = db().query(Node).filter(
|
||||
Node.id.in_([
|
||||
n["uid"] for n in nodes
|
||||
]),
|
||||
Node.cluster_id == task.cluster_id
|
||||
).yield_per(100)
|
||||
|
||||
update_nodes.update(
|
||||
{
|
||||
"online": False,
|
||||
"status": "discover",
|
||||
"pending_addition": True
|
||||
},
|
||||
synchronize_session='fetch'
|
||||
)
|
||||
|
||||
for n in update_nodes:
|
||||
n.roles, n.pending_roles = n.pending_roles, n.roles
|
||||
|
||||
db().commit()
|
||||
|
||||
message = (
|
||||
u"Environment '{0}' "
|
||||
u"was successfully resetted".format(
|
||||
task.cluster.name or task.cluster_id
|
||||
)
|
||||
)
|
||||
|
||||
notifier.notify(
|
||||
"done",
|
||||
message,
|
||||
task.cluster_id
|
||||
)
|
||||
|
||||
TaskHelper.update_task_status(
|
||||
task.uuid,
|
||||
status,
|
||||
progress,
|
||||
message
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def verify_networks_resp(cls, **kwargs):
|
||||
logger.info(
|
||||
|
@ -293,6 +293,8 @@ class FakeProvisionThread(FakeThread):
|
||||
super(FakeProvisionThread, self).run()
|
||||
receiver = NailgunReceiver
|
||||
|
||||
self.sleep(self.tick_interval)
|
||||
|
||||
# Since we just add systems to cobbler and reboot nodes
|
||||
# We think this task is always successful if it is launched.
|
||||
kwargs = {
|
||||
@ -343,6 +345,63 @@ class FakeDeletionThread(FakeThread):
|
||||
(cores, ram), node_id=node.id)
|
||||
|
||||
|
||||
class FakeStopDeploymentThread(FakeThread):
|
||||
def run(self):
|
||||
super(FakeStopDeploymentThread, self).run()
|
||||
receiver = NailgunReceiver
|
||||
kwargs = {
|
||||
'task_uuid': self.task_uuid,
|
||||
'stop_task_uuid': self.data['args']['stop_task_uuid'],
|
||||
'nodes': self.data['args']['nodes'],
|
||||
'status': 'ready',
|
||||
'progress': 100
|
||||
}
|
||||
resp_method = getattr(receiver, self.respond_to)
|
||||
resp_method(**kwargs)
|
||||
|
||||
self.sleep(3)
|
||||
nodes_db = db().query(Node).filter(
|
||||
Node.id.in_([
|
||||
n['uid'] for n in self.data['args']['nodes']
|
||||
])
|
||||
).all()
|
||||
|
||||
for n in nodes_db:
|
||||
self.sleep(2)
|
||||
n.online = True
|
||||
n.status = "discover"
|
||||
db().add(n)
|
||||
db().commit()
|
||||
|
||||
|
||||
class FakeResetEnvironmentThread(FakeThread):
|
||||
def run(self):
|
||||
super(FakeResetEnvironmentThread, self).run()
|
||||
receiver = NailgunReceiver
|
||||
kwargs = {
|
||||
'task_uuid': self.task_uuid,
|
||||
'nodes': self.data['args']['nodes'],
|
||||
'status': 'ready',
|
||||
'progress': 100
|
||||
}
|
||||
resp_method = getattr(receiver, self.respond_to)
|
||||
resp_method(**kwargs)
|
||||
|
||||
self.sleep(5)
|
||||
nodes_db = db().query(Node).filter(
|
||||
Node.id.in_([
|
||||
n['uid'] for n in self.data['args']['nodes']
|
||||
])
|
||||
).all()
|
||||
|
||||
for n in nodes_db:
|
||||
self.sleep(2)
|
||||
n.online = True
|
||||
n.status = "discover"
|
||||
db().add(n)
|
||||
db().commit()
|
||||
|
||||
|
||||
class FakeVerificationThread(FakeThread):
|
||||
def run(self):
|
||||
super(FakeVerificationThread, self).run()
|
||||
@ -548,6 +607,8 @@ FAKE_THREADS = {
|
||||
'provision': FakeProvisionThread,
|
||||
'deploy': FakeDeploymentThread,
|
||||
'remove_nodes': FakeDeletionThread,
|
||||
'stop_deploy_task': FakeStopDeploymentThread,
|
||||
'reset_environment': FakeResetEnvironmentThread,
|
||||
'verify_networks': FakeVerificationThread,
|
||||
'check_dhcp': FakeCheckingDhcpThread,
|
||||
'download_release': DownloadReleaseThread,
|
||||
|
@ -253,6 +253,11 @@ class TaskHelper(object):
|
||||
cls.__update_cluster_to_deployment_error(cluster)
|
||||
elif task.name == 'provision' and task.status == 'error':
|
||||
cls.__update_cluster_to_provisioning_error(cluster)
|
||||
elif task.name == 'stop_deployment':
|
||||
if task.status == 'error':
|
||||
cls.__set_cluster_status(cluster, 'error')
|
||||
else:
|
||||
cls.__set_cluster_status(cluster, 'stopped')
|
||||
|
||||
db().commit()
|
||||
|
||||
|
@ -84,11 +84,21 @@ class ApplyChangesTaskManager(TaskManager):
|
||||
if task.status == "running":
|
||||
raise errors.DeploymentAlreadyStarted()
|
||||
elif task.status in ("ready", "error"):
|
||||
for subtask in task.subtasks:
|
||||
db().delete(subtask)
|
||||
db().delete(task)
|
||||
db().commit()
|
||||
|
||||
obsolete_tasks = db().query(Task).filter_by(
|
||||
cluster_id=self.cluster.id,
|
||||
).filter(
|
||||
Task.name.in_([
|
||||
'stop_deployment',
|
||||
'reset_environment'
|
||||
])
|
||||
)
|
||||
for task in obsolete_tasks:
|
||||
db().delete(task)
|
||||
db().commit()
|
||||
|
||||
task_messages = []
|
||||
|
||||
nodes_to_delete = TaskHelper.nodes_to_delete(self.cluster)
|
||||
@ -392,6 +402,104 @@ class DeploymentTaskManager(TaskManager):
|
||||
return task_deployment
|
||||
|
||||
|
||||
class StopDeploymentTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
stop_running = db().query(Task).filter_by(
|
||||
cluster=self.cluster,
|
||||
name='stop_deployment'
|
||||
).first()
|
||||
if stop_running:
|
||||
if stop_running.status == 'running':
|
||||
raise errors.StopAlreadyRunning(
|
||||
"Stopping deployment task "
|
||||
"is already launched"
|
||||
)
|
||||
else:
|
||||
db().delete(stop_running)
|
||||
db().commit()
|
||||
|
||||
deploy_running = db().query(Task).filter_by(
|
||||
cluster=self.cluster,
|
||||
name='deployment',
|
||||
status='running'
|
||||
).first()
|
||||
if not deploy_running:
|
||||
provisioning_running = db().query(Task).filter_by(
|
||||
cluster=self.cluster,
|
||||
name='provision',
|
||||
status='running'
|
||||
).first()
|
||||
if provisioning_running:
|
||||
raise errors.DeploymentNotRunning(
|
||||
u"Provisioning interruption for environment "
|
||||
u"'{0}' is not implemented right now".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
||||
raise errors.DeploymentNotRunning(
|
||||
u"Nothing to stop - deployment is "
|
||||
u"not running on environment '{0}'".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
||||
|
||||
task = Task(
|
||||
name="stop_deployment",
|
||||
cluster=self.cluster
|
||||
)
|
||||
db().add(task)
|
||||
db.commit()
|
||||
self._call_silently(
|
||||
task,
|
||||
tasks.StopDeploymentTask,
|
||||
deploy_task=deploy_running
|
||||
)
|
||||
return task
|
||||
|
||||
|
||||
class ResetEnvironmentTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
deploy_running = db().query(Task).filter_by(
|
||||
cluster=self.cluster,
|
||||
name='deploy',
|
||||
status='running'
|
||||
).first()
|
||||
if deploy_running:
|
||||
raise errors.DeploymentAlreadyStarted(
|
||||
u"Can't reset environment '{0}' when "
|
||||
u"deployment is running".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
||||
|
||||
obsolete_tasks = db().query(Task).filter_by(
|
||||
cluster_id=self.cluster.id,
|
||||
).filter(
|
||||
Task.name.in_([
|
||||
'deploy',
|
||||
'deployment',
|
||||
'stop_deployment'
|
||||
])
|
||||
)
|
||||
for task in obsolete_tasks:
|
||||
db().delete(task)
|
||||
db().commit()
|
||||
|
||||
task = Task(
|
||||
name="reset_environment",
|
||||
cluster=self.cluster
|
||||
)
|
||||
db().add(task)
|
||||
db.commit()
|
||||
self._call_silently(
|
||||
task,
|
||||
tasks.ResetEnvironmentTask
|
||||
)
|
||||
return task
|
||||
|
||||
|
||||
class CheckNetworksTaskManager(TaskManager):
|
||||
|
||||
def execute(self, data, check_admin_untagged=False):
|
||||
|
@ -196,7 +196,8 @@ class DeletionTask(object):
|
||||
nodes_to_delete.append({
|
||||
'id': node.id,
|
||||
'uid': node.id,
|
||||
'roles': node.roles
|
||||
'roles': node.roles,
|
||||
'slave_name': TaskHelper.make_slave_name(node.id)
|
||||
})
|
||||
|
||||
if USE_FAKE:
|
||||
@ -237,13 +238,11 @@ class DeletionTask(object):
|
||||
nodes_to_delete.remove(node)
|
||||
|
||||
# only real tasks
|
||||
engine_nodes = []
|
||||
if not USE_FAKE:
|
||||
for node in nodes_to_delete_constant:
|
||||
slave_name = TaskHelper.make_slave_name(node['id'])
|
||||
slave_name = node['slave_name']
|
||||
logger.debug("Pending node to be removed from cobbler %s",
|
||||
slave_name)
|
||||
engine_nodes.append(slave_name)
|
||||
try:
|
||||
node_db = db().query(Node).get(node['id'])
|
||||
if node_db and node_db.fqdn:
|
||||
@ -289,8 +288,7 @@ class DeletionTask(object):
|
||||
'url': settings.COBBLER_URL,
|
||||
'username': settings.COBBLER_USER,
|
||||
'password': settings.COBBLER_PASSWORD,
|
||||
},
|
||||
'engine_nodes': engine_nodes
|
||||
}
|
||||
}
|
||||
}
|
||||
# only fake tasks
|
||||
@ -301,6 +299,74 @@ class DeletionTask(object):
|
||||
rpc.cast('naily', msg_delete)
|
||||
|
||||
|
||||
class StopDeploymentTask(object):
|
||||
|
||||
@classmethod
|
||||
def message(cls, task, deploy_task):
|
||||
nodes_to_stop = db().query(Node).filter(
|
||||
Node.cluster_id == task.cluster.id
|
||||
).filter(
|
||||
not_(Node.status == 'ready')
|
||||
).yield_per(100)
|
||||
return {
|
||||
"method": "stop_deploy_task",
|
||||
"respond_to": "stop_deployment_resp",
|
||||
"args": {
|
||||
"task_uuid": task.uuid,
|
||||
"stop_task_uuid": deploy_task.uuid,
|
||||
"nodes": [
|
||||
{
|
||||
'uid': n.uid,
|
||||
'roles': n.roles,
|
||||
'slave_name': TaskHelper.make_slave_name(n.id)
|
||||
} for n in nodes_to_stop
|
||||
],
|
||||
"engine": {
|
||||
"url": settings.COBBLER_URL,
|
||||
"username": settings.COBBLER_USER,
|
||||
"password": settings.COBBLER_PASSWORD,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def execute(cls, task, deploy_task):
|
||||
msg_stop = cls.message(task, deploy_task)
|
||||
rpc.cast('naily', msg_stop, service=True)
|
||||
|
||||
|
||||
class ResetEnvironmentTask(object):
|
||||
|
||||
@classmethod
|
||||
def message(cls, task):
|
||||
nodes_to_reset = db().query(Node).filter(
|
||||
Node.cluster_id == task.cluster.id
|
||||
).yield_per(100)
|
||||
return {
|
||||
"method": "reset_environment",
|
||||
"respond_to": "reset_environment_resp",
|
||||
"args": {
|
||||
"task_uuid": task.uuid,
|
||||
"nodes": [
|
||||
{
|
||||
'uid': n.uid,
|
||||
'roles': n.roles,
|
||||
'slave_name': TaskHelper.make_slave_name(n.id)
|
||||
} for n in nodes_to_reset
|
||||
],
|
||||
"engine": {
|
||||
"url": settings.COBBLER_URL,
|
||||
"username": settings.COBBLER_USER,
|
||||
"password": settings.COBBLER_PASSWORD,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def execute(cls, task):
|
||||
rpc.cast('naily', cls.message(task))
|
||||
|
||||
|
||||
class ClusterDeletionTask(object):
|
||||
|
||||
@classmethod
|
||||
|
@ -465,6 +465,30 @@ class Environment(object):
|
||||
if item.get('pk') == pk and item.get('model') == model:
|
||||
return item
|
||||
|
||||
def launch_provisioning_selected(self, nodes_uids=None):
|
||||
if self.clusters:
|
||||
if not nodes_uids:
|
||||
nodes_uids = [n.uid for n in self.clusters[0].nodes]
|
||||
action_url = reverse(
|
||||
'ProvisionSelectedNodes',
|
||||
kwargs={'cluster_id': self.clusters[0].id}
|
||||
) + '?nodes={0}'.format(','.join(nodes_uids))
|
||||
resp = self.app.put(
|
||||
action_url,
|
||||
'{}',
|
||||
headers=self.default_headers,
|
||||
expect_errors=True
|
||||
)
|
||||
self.tester.assertEquals(200, resp.status)
|
||||
response = json.loads(resp.body)
|
||||
return self.db.query(Task).filter_by(
|
||||
uuid=response['uuid']
|
||||
).first()
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"Nothing to provision - try creating cluster"
|
||||
)
|
||||
|
||||
def launch_deployment(self):
|
||||
if self.clusters:
|
||||
resp = self.app.put(
|
||||
@ -482,6 +506,46 @@ class Environment(object):
|
||||
"Nothing to deploy - try creating cluster"
|
||||
)
|
||||
|
||||
def stop_deployment(self, expect_http=202):
|
||||
if self.clusters:
|
||||
resp = self.app.put(
|
||||
reverse(
|
||||
'ClusterStopDeploymentHandler',
|
||||
kwargs={'cluster_id': self.clusters[0].id}),
|
||||
expect_errors=True,
|
||||
headers=self.default_headers)
|
||||
self.tester.assertEquals(expect_http, resp.status)
|
||||
if not str(expect_http).startswith("2"):
|
||||
return resp.body
|
||||
response = json.loads(resp.body)
|
||||
return self.db.query(Task).filter_by(
|
||||
uuid=response['uuid']
|
||||
).first()
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"Nothing to stop - try creating cluster"
|
||||
)
|
||||
|
||||
def reset_environment(self, expect_http=202):
|
||||
if self.clusters:
|
||||
resp = self.app.put(
|
||||
reverse(
|
||||
'ClusterResetHandler',
|
||||
kwargs={'cluster_id': self.clusters[0].id}),
|
||||
expect_errors=True,
|
||||
headers=self.default_headers)
|
||||
self.tester.assertEquals(resp.status, expect_http)
|
||||
if not str(expect_http).startswith("2"):
|
||||
return resp.body
|
||||
response = json.loads(resp.body)
|
||||
return self.db.query(Task).filter_by(
|
||||
uuid=response['uuid']
|
||||
).first()
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"Nothing to reset - try creating cluster"
|
||||
)
|
||||
|
||||
def launch_verify_networks(self, data=None):
|
||||
if self.clusters:
|
||||
net_urls = {
|
||||
@ -747,6 +811,8 @@ class BaseUnitTest(BaseTestCase):
|
||||
|
||||
def fake_tasks(fake_rpc=True,
|
||||
mock_rpc=True,
|
||||
tick_count=99,
|
||||
tick_interval=1,
|
||||
**kwargs):
|
||||
def wrapper(func):
|
||||
func = mock.patch(
|
||||
@ -755,11 +821,11 @@ def fake_tasks(fake_rpc=True,
|
||||
)(func)
|
||||
func = mock.patch(
|
||||
'nailgun.task.fake.settings.FAKE_TASKS_TICK_COUNT',
|
||||
99
|
||||
tick_count
|
||||
)(func)
|
||||
func = mock.patch(
|
||||
'nailgun.task.fake.settings.FAKE_TASKS_TICK_INTERVAL',
|
||||
1
|
||||
tick_interval
|
||||
)(func)
|
||||
if fake_rpc and not kwargs:
|
||||
func = mock.patch(
|
||||
|
57
nailgun/nailgun/test/integration/test_reset_environment.py
Normal file
57
nailgun/nailgun/test/integration/test_reset_environment.py
Normal file
@ -0,0 +1,57 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nailgun.test.base import BaseIntegrationTest
|
||||
from nailgun.test.base import fake_tasks
|
||||
|
||||
|
||||
class TestResetEnvironment(BaseIntegrationTest):
|
||||
|
||||
def tearDown(self):
|
||||
self._wait_for_threads()
|
||||
super(TestResetEnvironment, self).tearDown()
|
||||
|
||||
@fake_tasks()
|
||||
def test_reset_environment(self):
|
||||
self.env.create(
|
||||
cluster_kwargs={},
|
||||
nodes_kwargs=[
|
||||
{"name": "First",
|
||||
"pending_addition": True},
|
||||
{"name": "Second",
|
||||
"roles": ["compute"],
|
||||
"pending_addition": True}
|
||||
]
|
||||
)
|
||||
cluster_db = self.env.clusters[0]
|
||||
supertask = self.env.launch_deployment()
|
||||
self.env.wait_ready(supertask, 60)
|
||||
|
||||
for n in cluster_db.nodes:
|
||||
self.assertEquals(n.status, "ready")
|
||||
self.assertEquals(n.pending_addition, False)
|
||||
|
||||
reset_task = self.env.reset_environment()
|
||||
self.env.wait_ready(reset_task, 60)
|
||||
|
||||
self.assertEquals(cluster_db.status, "new")
|
||||
|
||||
for n in cluster_db.nodes:
|
||||
self.assertEquals(n.online, False)
|
||||
self.assertEquals(n.status, "discover")
|
||||
self.assertEquals(n.pending_addition, True)
|
||||
self.assertEquals(n.roles, [])
|
||||
self.assertNotEquals(n.pending_roles, [])
|
81
nailgun/nailgun/test/integration/test_stop_deployment.py
Normal file
81
nailgun/nailgun/test/integration/test_stop_deployment.py
Normal file
@ -0,0 +1,81 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy.models.task import Task
|
||||
|
||||
from nailgun.test.base import BaseIntegrationTest
|
||||
from nailgun.test.base import fake_tasks
|
||||
|
||||
|
||||
class TestStopDeployment(BaseIntegrationTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestStopDeployment, self).setUp()
|
||||
self.env.create(
|
||||
cluster_kwargs={},
|
||||
nodes_kwargs=[
|
||||
{"name": "First",
|
||||
"pending_addition": True},
|
||||
{"name": "Second",
|
||||
"roles": ["compute"],
|
||||
"pending_addition": True}
|
||||
]
|
||||
)
|
||||
self.cluster = self.env.clusters[0]
|
||||
self.controller = self.env.nodes[0]
|
||||
self.compute = self.env.nodes[1]
|
||||
self.node_uids = [n.uid for n in self.cluster.nodes][:3]
|
||||
|
||||
def tearDown(self):
|
||||
self._wait_for_threads()
|
||||
super(TestStopDeployment, self).tearDown()
|
||||
|
||||
@fake_tasks()
|
||||
def test_stop_deployment(self):
|
||||
supertask = self.env.launch_deployment()
|
||||
deploy_task_uuid = supertask.uuid
|
||||
stop_task = self.env.stop_deployment()
|
||||
self.env.wait_ready(stop_task, 60)
|
||||
self.assertIsNone(
|
||||
db().query(Task).filter_by(
|
||||
uuid=deploy_task_uuid
|
||||
).first()
|
||||
)
|
||||
self.assertEquals(self.cluster.status, "stopped")
|
||||
self.assertEquals(stop_task.progress, 100)
|
||||
|
||||
for n in self.cluster.nodes:
|
||||
self.assertEquals(n.online, False)
|
||||
self.assertEquals(n.roles, [])
|
||||
self.assertNotEquals(n.pending_roles, [])
|
||||
|
||||
@fake_tasks(tick_interval=3)
|
||||
def test_stop_provisioning(self):
|
||||
provisioning_task = self.env.launch_provisioning_selected(
|
||||
self.node_uids
|
||||
)
|
||||
stop_task_resp = self.env.stop_deployment(
|
||||
expect_http=400
|
||||
)
|
||||
self.db.refresh(provisioning_task)
|
||||
self.assertEquals(
|
||||
stop_task_resp,
|
||||
u"Provisioning interruption for environment "
|
||||
u"'{0}' is not implemented right now".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
@ -95,7 +95,6 @@ class TestTaskManagers(BaseIntegrationTest):
|
||||
|
||||
args, kwargs = nailgun.task.manager.rpc.cast.call_args
|
||||
self.assertEquals(len(args[1]['args']['nodes']), 0)
|
||||
self.assertEquals(len(args[1]['args']['engine_nodes']), 0)
|
||||
|
||||
self.env.refresh_nodes()
|
||||
for n in self.env.nodes:
|
||||
|
Loading…
Reference in New Issue
Block a user