Split provision and deletion into several deployment tasks

Several changes:
- new task type 'master_shell': run a task on the master node using
  the node context;
- new task type 'move_to_bootstrap': move a non-bootstrap node to
  bootstrap, remove and add all nodes to Cobbler;
- add new task types similar to noop: 'skipped' and 'stage';
- add new task type 'erase_node': erase a node as a task;
- refactor report messages: they are now simpler and protected
  from sending duplicate messages in any format;
- allow configuring node report behavior via
  node_statuses_transitions in tasks_metadata for the successful,
  stopped and failed cases (see the sketch after this list).
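
A minimal sketch of the expected tasks_metadata payload; the transition
values mirror the NODE_STATUSES_TRANSITIONS default added in
task_deployment.rb, while any other tasks_metadata content is assumed:

    # Hedged sketch: only 'node_statuses_transitions' is shown here.
    tasks_metadata = {
      'node_statuses_transitions' => {
        'successful' => {'status' => 'ready'},
        'stopped'    => {'status' => 'stopped'},
        'failed'     => {'status' => 'error', 'error_type' => 'deploy'}
      }
    }
    # task_deployment.rb falls back to the default when the key is absent:
    cluster.node_statuses_transitions = tasks_metadata.fetch(
      'node_statuses_transitions',
      NODE_STATUSES_TRANSITIONS
    )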

Change-Id: Iac128fc9d8c764269bebb3e95d6ba9e4a086f919
Vladimir Sharshov (warpc) 2016-07-12 18:17:48 +03:00
parent 30e09c10f9
commit 52bc1ed156
17 changed files with 434 additions and 280 deletions

View File

@ -151,6 +151,82 @@ module Astute
existent_nodes
end
def existent_node?(cobbler_name)
return false unless @engine.system_exists?(cobbler_name)
Astute.logger.info("Node #{cobbler_name} already exists in cobbler")
true
end
def edit_node(cobbler_name, data)
begin
Astute.logger.info("Changing cobbler system #{cobbler_name}")
@engine.item_from_hash('system', cobbler_name, data, :item_preremove => false)
rescue RuntimeError => e
Astute.logger.error("Error occured while changing cobbler system #{cobbler_name}")
raise e
end
ensure
sync
end
def netboot_node(cobbler_name, state)
begin
Astute.logger.info("Changing node netboot state #{cobbler_name}")
@engine.netboot(cobbler_name, state)
rescue RuntimeError => e
Astute.logger.error("Error while changing node netboot state #{cobbler_name}")
raise e
end
ensure
sync
end
def remove_node(cobbler_name, retries=3, interval=2)
Astute.logger.info("Node to remove: #{cobbler_name}")
retries.times do
unless @engine.system_exists?(cobbler_name)
Astute.logger.info("System is not in cobbler: #{cobbler_name}")
return
else
Astute.logger.info("Trying to remove system from cobbler: #{cobbler_name}")
@engine.remove_system(cobbler_name)
end
return unless @engine.system_exists?(cobbler_name)
sleep(interval) if interval > 0
end
ensure
Astute.logger.error("Cannot remove node #{cobbler_name} from cobbler") if @engine.system_exists?(cobbler_name)
sync
end
def add_node(node)
cobbler_name = node['slave_name']
begin
Astute.logger.info("Adding #{cobbler_name} into cobbler")
@engine.item_from_hash('system', cobbler_name, node, :item_preremove => true)
rescue RuntimeError => e
Astute.logger.error("Error occured while adding system #{cobbler_name} to cobbler")
raise e
end
ensure
sync
end
def node_mac_duplicate_names(node)
mac_duplicate_names = []
Astute.logger.info("Trying to find MAC duplicates for node #{node['slave_name']}")
if node['interfaces']
node['interfaces'].each do |iname, ihash|
if ihash['mac_address']
Astute.logger.info("Trying to find system with MAC: #{ihash['mac_address']}")
found_node = @engine.system_by_mac(ihash['mac_address'])
mac_duplicate_names << found_node['name'] if found_node
end
end
end
mac_duplicate_names.uniq
end
def get_mac_duplicate_names(nodes)
mac_duplicate_names = []
nodes.each do |node|

View File

@ -53,8 +53,7 @@ module Astute
def task_deploy(up_reporter, task_id, deployment_options = {})
time_start = Time.now.to_i
proxy_reporter = ProxyReporter::TaskProxyReporter.new(
up_reporter,
deployment_options[:tasks_graph].keys
up_reporter
)
context = Context.new(task_id, proxy_reporter)
Astute.logger.info "Task based deployment will be used"

View File

@ -175,7 +175,7 @@ module Astute
# use upload file task.
# Synchronous (blocking) call
def upload_file(node_uid, mco_params={})
upload_mclient = Astute::MClient.new(
upload_mclient = MClient.new(
@ctx,
"uploadfile",
Array(node_uid)

View File

@ -17,6 +17,13 @@ module Astute
class TaskCluster < Deployment::Cluster
attr_accessor :noop_run
def initialize(id=nil)
super
@node_statuses_transitions = {}
end
attr_accessor :node_statuses_transitions
def hook_post_gracefully_stop(*args)
report_new_node_status(args[0])
end

View File

@ -16,6 +16,15 @@ require_relative '../fuel_deployment'
module Astute
class TaskDeployment
#TODO(vsharshov): remove this default after adding support for node
# status transitions to Nailgun
NODE_STATUSES_TRANSITIONS = {
'successful' => {'status' => 'ready'},
'stopped' => {'status' => 'stopped'},
'failed' => {'status' => 'error', 'error_type' => 'deploy'}
}
def initialize(context, cluster_class=TaskCluster, node_class=TaskNode)
@ctx = context
@cluster_class = cluster_class
@ -44,7 +53,15 @@ module Astute
false
)
offline_uids = fail_offline_nodes(tasks_graph)
cluster.node_statuses_transitions = tasks_metadata.fetch(
'node_statuses_transitions',
NODE_STATUSES_TRANSITIONS
)
offline_uids = fail_offline_nodes(
tasks_graph,
cluster.node_statuses_transitions
)
critical_uids = critical_node_uids(cluster.fault_tolerance_groups)
tasks_graph.keys.each do |node_id|
@ -67,13 +84,12 @@ module Astute
dry_run = deployment_options.fetch(:dry_run, false)
Deployment::Log.logger = Astute.logger if Astute.respond_to? :logger
write_graph_to_file(cluster)
if dry_run
result = Hash.new
result[:success] = true
result = if dry_run
{:success => true }
else
result = cluster.run
run_result = cluster.run
# imitate dry_run results for noop run after deployment
result = {:success => true } if cluster.noop_run
cluster.noop_run ? {:success => true } : run_result
end
report_deploy_result(result)
end
@ -151,10 +167,8 @@ module Astute
if result[:success] && result.fetch(:failed_nodes, []).empty?
@ctx.report('status' => 'ready', 'progress' => 100)
elsif result[:success] && result.fetch(:failed_nodes, []).present?
report_failed_nodes(result)
@ctx.report('status' => 'ready', 'progress' => 100)
else
report_failed_nodes(result)
@ctx.report(
'status' => 'error',
'progress' => 100,
@ -163,26 +177,6 @@ module Astute
end
end
def report_failed_nodes(result)
result.fetch(:failed_nodes, []).each do |node|
node_status = {
'uid' => node.id,
'status' => 'error',
'error_type' => 'deploy',
'error_msg' => result[:status]
}
task = result[:failed_tasks].find{ |t| t.node == node }
if task
node_status.merge!({
'deployment_graph_task_name' => task.name,
'task_status' => task.status.to_s
})
end
@ctx.report('nodes' => [node_status])
end
end
def write_graph_to_file(deployment)
return unless Astute.config.enable_graph_file
graph_file = File.join(
@ -216,7 +210,7 @@ module Astute
end
def critical_node_uids(fault_tolerance_groups)
return [] unless fault_tolerance_groups
return [] if fault_tolerance_groups.blank?
critical_nodes = fault_tolerance_groups.inject([]) do |critical_uids, group|
critical_uids += group['node_ids'] if group['fault_tolerance'].zero?
critical_uids
@ -225,16 +219,14 @@ module Astute
critical_nodes
end
def fail_offline_nodes(tasks_graph)
def fail_offline_nodes(tasks_graph, node_statuses_transitions)
offline_uids = detect_offline_nodes(tasks_graph.keys)
if offline_uids.present?
nodes = offline_uids.map do |uid|
{'uid' => uid,
'status' => 'error',
'error_type' => 'provision',
'error_msg' => 'Node is not ready for deployment: '\
'mcollective has not answered'
}
}.merge(node_statuses_transitions.fetch('failed', {}))
end
@ctx.report_and_update_status(
@ -260,18 +252,16 @@ module Astute
uids.delete('virtual_sync_node')
# In case of a big number of nodes we should make several calls to be sure
# about the node status
if !uids.empty?
if uids.present?
Astute.config.mc_retries.times.each do
systemtype = Astute::MClient.new(
systemtype = MClient.new(
@ctx,
"systemtype",
uids,
_check_result=false,
10
)
available_nodes = systemtype.get_type.select do |node|
node.results[:data][:node_type].chomp == "target"
end
available_nodes = systemtype.get_type
available_uids += available_nodes.map { |node| node.results[:sender] }
uids -= available_uids

View File

@ -42,7 +42,6 @@ module Astute
@ctx.report({
'nodes' => [{
'uid' => id,
'status' => 'deploying',
'deployment_graph_task_name' => task.name,
'progress' => current_progress_bar,
'task_status' => task.status.to_s,
@ -55,30 +54,20 @@ module Astute
end
def report_node_status
deploy_status = if !finished?
'deploying'
elsif successful?
'ready'
elsif skipped?
'stopped'
else
'error'
end
node_status = {
'uid' => id,
'status' => deploy_status,
'progress' => current_progress_bar,
}
node_status.merge!(node_report_status)
node_status.merge!(
'deployment_graph_task_name' => task.name,
'task_status' => task.status.to_s,
'custom' => @task_engine.summary
) if task
node_status.merge!('error_type' => 'deploy') if
deploy_status == 'error'
node_status.merge!(
'error_msg' => "Task #{task.name} failed on node #{name}"
) if task.failed?
@ctx.report('nodes' => [node_status])
end
@ -111,21 +100,11 @@ module Astute
end
def select_task_engine(data)
# TODO: replace by Object.const_get(type.split('_').collect(&:capitalize).join)
case data['type']
when 'shell' then noop_run? ? NoopShell.new(data, @ctx) : Shell.new(data, @ctx)
when 'puppet' then noop_run? ? NoopPuppet.new(data, @ctx) : Puppet.new(data, @ctx)
when 'upload_file' then noop_run? ? NoopUploadFile.new(data, @ctx) : UploadFile.new(data, @ctx)
when 'upload_files' then noop_run? ? NoopUploadFiles.new(data, @ctx) : UploadFiles.new(data, @ctx)
when 'reboot' then noop_run? ? NoopReboot.new(data, @ctx) : Reboot.new(data, @ctx)
when 'sync' then noop_run? ? NoopSync.new(data, @ctx) : Sync.new(data, @ctx)
when 'cobbler_sync' then noop_run? ? NoopCobblerSync.new(data, @ctx) : CobblerSync.new(data, @ctx)
when 'copy_files' then noop_run? ? NoopCopyFiles.new(data, @ctx) : CopyFiles.new(data, @ctx)
when 'noop' then Noop.new(data, @ctx)
when 'stage' then Noop.new(data, @ctx)
when 'skipped' then Noop.new(data, @ctx)
else raise TaskValidationError, "Unknown task type '#{data['type']}'"
end
noop_prefix = noop_run? ? "Noop" : ""
task_class_name = noop_prefix + data['type'].split('_').collect(&:capitalize).join
Object.const_get('Astute::' + task_class_name).new(data, @ctx)
rescue => e
raise TaskValidationError, "Unknown task type '#{data['type']}'. Detailed: #{e.message}"
end
def report_running?(data)
@ -135,5 +114,18 @@ module Astute
def noop_run?
cluster.noop_run
end
def node_report_status
if !finished?
{}
elsif successful?
cluster.node_statuses_transitions.fetch('successful', {})
elsif skipped?
cluster.node_statuses_transitions.fetch('stopped', {})
else
cluster.node_statuses_transitions.fetch('failed', {})
end
end
end
end

View File

@ -12,12 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
require 'digest/md5'
module Astute
module ProxyReporter
class TaskProxyReporter
STATES = ['deploying', 'ready', 'error', 'stopped']
FINAL_STATES = ['ready', 'error', 'stopped']
INTEGRATED_STATES = ['error', 'stopped']
REPORT_REAL_TASK_STATE_MAP = {
'running' => 'running',
@ -30,20 +31,19 @@ module Astute
'virtual_sync_node' => nil
}
def initialize(up_reporter, nodes_uids=[])
def initialize(up_reporter)
@up_reporter = up_reporter
@nodes = nodes_uids.inject({}) do |nodes, node_uid|
nodes.merge(node_uid => {'status' => 'pending', 'progress' => nil})
end
@messages_cache = []
end
def report(original_data)
return if duplicate?(original_data)
data = original_data.deep_dup
if data['nodes']
nodes_to_report = get_nodes_to_report(data['nodes'])
return if nodes_to_report.empty? # Let's report only if nodes updated
update_saved_nodes(nodes_to_report)
data['nodes'] = nodes_to_report
end
@ -63,7 +63,7 @@ module Astute
return node unless are_fields_valid?(node)
convert_task_status_to_status(node)
normalization_progress(node)
compare_with_previous_state(node)
return node
end
def are_fields_valid?(node)
@ -75,26 +75,17 @@ module Astute
['master', 'virtual_sync_node'].include?(node['uid'])
end
def valid_status?(status)
STATES.include? status.to_s
end
def valid_task_status?(status)
REPORT_REAL_TASK_STATE_MAP.keys.include? status.to_s
end
def final_status?(status)
FINAL_STATES.include? status.to_s
def integrated_status?(status)
INTEGRATED_STATES.include? status.to_s
end
# Validate basic fields in the message about a node
def are_node_basic_fields_valid?(node)
err = []
err << "Status provided '#{node['status']}' is not supported" if
node['status'] && !valid_status?(node['status'])
err << "progress value provided, but no status" if
!node['status'] && node['progress']
err << "Node uid is not provided" unless node['uid']
err.any? ? fail_validation(node, err) : true
@ -117,43 +108,14 @@ module Astute
end
# Normalization of progress field: ensures that the scaling progress was
# in range from 0 to 100 and has a value of 100 fot the final node
# in range from 0 to 100 and has a value of 100 for the integrated node
# status
def normalization_progress(node)
if node['progress']
node['progress'] = 100 if node['progress'] > 100 ||
['ready', 'error'].include?(node['status'])
node['progress'] = 100 if node['progress'] > 100
node['progress'] = 0 if node['progress'] < 0
else
node['progress'] = 100 if final_status?(node['status'])
end
end
# Comparison information about node with previous state
def compare_with_previous_state(node)
saved_node = @nodes[node['uid']]
return node unless saved_node
node_progress = node['progress'] || saved_node['progress'].to_i
return if final_status?(saved_node['status']) &&
!final_status?(node['status'])
# Allow to send only node progress/status update
return if node_progress.to_i <= saved_node['progress'].to_i &&
node['status'] == saved_node['status'] &&
node['deployment_graph_task_name'] == saved_node['deployment_graph_task_name'] &&
node['task_status'] == saved_node['task_status']
node
end
def update_saved_nodes(new_nodes)
new_nodes.each do |node|
saved_node = @nodes[node['uid']]
if saved_node
node.each {|k, v| saved_node[k] = v}
else
@nodes[node['uid']] = node
end
node['progress'] = 100 if integrated_status?(node['status'])
end
end
@ -176,6 +138,18 @@ module Astute
false
end
# Save the message digest to protect the server from
# message flooding. Because a Hash is a complex structure that
# does not guarantee ordering, the same data can generate different
# strings, but we still catch most of the possible duplicates.
def duplicate?(data)
msg_digest = Digest::MD5.hexdigest(data.to_s)
return true if @messages_cache.include?(msg_digest)
@messages_cache << msg_digest
return false
end
end
end
end

View File

@ -0,0 +1,48 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
class EraseNode < Task
private
def process
erase_node
end
def calculate_status
succeed!
end
def validation
validate_presence(@task, 'node_id')
end
def erase_node
remover = MClient.new(
@ctx,
"erase_node",
Array(@task['node_id']),
_check_result=false)
response = remover.erase_node(:reboot => false)
Astute.logger.debug "#{@ctx.task_id}: Data received from node "\
"#{@task['node_id']} :\n#{response.pretty_inspect}"
rescue Astute::MClientTimeout, Astute::MClientError => e
Astute.logger.error("#{@ctx.task_id}: #{task_name} mcollective " \
"erase node command failed with error #{e.message}")
failed!
end
end
end
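
For illustration only, a minimal sketch of the task data that would be
dispatched to Astute::EraseNode; 'type' drives the class lookup in
select_task_engine and 'node_id' is required by the validation above,
while ctx stands for an existing Astute::Context:

    task_data = {
      'type' => 'erase_node',   # resolved to Astute::EraseNode by TaskNode
      'node_id' => '1'
    }
    Astute::EraseNode.new(task_data, ctx).run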

View File

@ -0,0 +1,62 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'erb'
module Astute
class MasterShell < Task
# Allow running shell tasks using the existing asynchronous shell
# mechanism. The task will be run on the master node.
def initialize(task, context)
super
@shell_task = nil
end
def summary
@shell_task.summary
rescue
{}
end
private
def process
@shell_task = Shell.new(
generate_master_shell,
@ctx
)
@shell_task.run
end
def calculate_status
self.status = @shell_task.status
end
def validation
validate_presence(@task['parameters'], 'cmd')
end
def setup_default
@task['parameters']['timeout'] ||= Astute.config.shell_timeout
@task['parameters']['cwd'] ||= Astute.config.shell_cwd
@task['parameters']['retries'] ||= Astute.config.mc_retries
@task['parameters']['interval'] ||= Astute.config.mc_retry_interval
end
def generate_master_shell
@task.merge('node_id' => 'master')
end
end
end
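
A hedged sketch of the task data a 'master_shell' task could carry;
parameters['cmd'] is required by the validation above, the remaining
parameters show the defaults that setup_default fills in, and the concrete
command string is a placeholder:

    task_data = {
      'type' => 'master_shell',
      'node_id' => '1',   # generate_master_shell reroutes the run to 'master'
      'parameters' => {
        'cmd' => 'echo hello',   # placeholder command
        'timeout' => Astute.config.shell_timeout,
        'cwd' => Astute.config.shell_cwd,
        'retries' => Astute.config.mc_retries,
        'interval' => Astute.config.mc_retry_interval
      }
    }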

View File

@ -0,0 +1,85 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
class MoveToBootstrap < Task
def initialize(task, context)
super
@work_thread = nil
end
private
def process
cobbler = CobblerManager.new(
@task['parameters']['provisioning_info']['engine'],
@ctx.reporter
)
@work_thread = Thread.new do
is_exist = cobbler.existent_node?(@task['parameters']['provisioning_info']['slave_name'])
# Change the node type to prevent the node from being wrongly detected
# as provisioned. Also, with this type, if the node has not been rebooted,
# Astute will be allowed to try to reboot it again.
change_nodes_type('reprovisioned') if is_exist
bootstrap_profile = @task['parameters']['provisioning_info']['profile'] ||
Astute.config.bootstrap_profile
cobbler.edit_node(@task['parameters']['provisioning_info']['slave_name'],
{'profile' => bootstrap_profile})
cobbler.netboot_node(@task['parameters']['provisioning_info']['slave_name'],
true)
Reboot.new({'node_id' => @task['node_id']}, @ctx).sync_run if is_exist
Rsyslogd.send_sighup(
@ctx,
@task['parameters']['provisioning_info']['engine']['master_ip']
)
cobbler.remove_node(@task['parameters']['provisioning_info']['slave_name'])
# NOTE(kozhukalov): We try to find out if there are systems
# in the Cobbler with the same MAC addresses. If so, Cobbler is going
# to throw a MAC address duplication error. We need to remove these
# nodes.
mac_duplicate_names = cobbler.node_mac_duplicate_names(@task['parameters']['provisioning_info'])
cobbler.remove_nodes(mac_duplicate_names.map {|n| {'slave_name' => n}})
cobbler.add_node(@task['parameters']['provisioning_info'])
end
end
def calculate_status
@work_thread.join and succeed! unless @work_thread.alive?
end
def validation
validate_presence(@task['parameters'], 'provisioning_info')
validate_presence(@task, 'node_id')
end
def change_nodes_type(type="image")
run_shell_without_check(
@task['node_id'],
"echo '#{type}' > /etc/nailgun_systemtype",
_timeout=5
)[:stdout]
rescue Astute::MClientTimeout, Astute::MClientError => e
Astute.logger.debug("#{@ctx.task_id}: #{task_name} mcollective " \
"change type command failed with error #{e.message}")
nil
end
end
end
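
A hedged sketch of the task data a 'move_to_bootstrap' task could carry; the
keys mirror those read by process and validation above ('provisioning_info'
with 'slave_name', 'profile' and 'engine', plus 'node_id'), while the values
and the omitted Cobbler access data inside 'engine' are placeholders:

    task_data = {
      'type' => 'move_to_bootstrap',
      'node_id' => '1',
      'parameters' => {
        'provisioning_info' => {
          'slave_name' => 'node-1',
          'profile' => nil,   # nil falls back to Astute.config.bootstrap_profile
          'engine' => {
            'master_ip' => '10.20.0.2'   # plus Cobbler access data, omitted here
          }
        }
      }
    }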

View File

@ -53,14 +53,14 @@ module Astute
end
def setup_default
@task['parameters']['timeout'] ||= Astute.config.reboot_timeout
@task.fetch('parameters', {})['timeout'] ||= Astute.config.reboot_timeout
end
def reboot
run_shell_without_check(
@task['node_id'],
RebootCommand::CMD,
timeout=2
_timeout=2
)
rescue Astute::MClientTimeout, Astute::MClientError => e
Astute.logger.error("#{@ctx.task_id}: #{task_name} mcollective " \
@ -72,7 +72,7 @@ module Astute
run_shell_without_check(
@task['node_id'],
"stat --printf='%Y' /proc/1",
timeout=2
_timeout=2
)[:stdout].to_i
rescue Astute::MClientTimeout, Astute::MClientError => e
Astute.logger.debug("#{@ctx.task_id}: #{task_name} mcollective " \

View File

@ -0,0 +1,21 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'astute/tasks/noop'
module Astute
class Skipped < Noop
end
end

lib/astute/tasks/stage.rb Normal file
View File

@ -0,0 +1,21 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'astute/tasks/noop'
module Astute
class Stage < Noop
end
end

View File

@ -90,8 +90,7 @@ describe Astute::Orchestrator do
Astute::TaskDeployment.any_instance.stubs(:deploy)
Astute::ProxyReporter::TaskProxyReporter.expects(:new).with(
@reporter,
tasks_graph.keys
@reporter
)
@orchestrator.task_deploy(

View File

@ -358,12 +358,7 @@ describe Astute::TaskDeployment do
it 'failed status' do
failed_node = mock('node')
failed_node.expects(:id).returns('1')
failed_task = mock('task')
failed_task.expects(:node).returns(failed_node)
failed_task.expects(:name).returns('test')
failed_task.expects(:status).returns(:failed)
Astute::TaskCluster.any_instance.stubs(:run).returns({
:success => false,
@ -372,14 +367,6 @@ describe Astute::TaskDeployment do
:status => 'Failed because of'})
task_deployment.stubs(:fail_offline_nodes).returns([])
task_deployment.stubs(:write_graph_to_file)
ctx.expects(:report).with('nodes' => [{
'uid' => '1',
'status' => 'error',
'error_type' => 'deploy',
'error_msg' => 'Failed because of',
'deployment_graph_task_name' => 'test',
'task_status' => 'failed'
}])
ctx.expects(:report).with({
'status' => 'error',
'progress' => 100,

View File

@ -96,7 +96,6 @@ describe Astute::TaskNode do
Astute::Puppet.any_instance.stubs(:run)
ctx.expects(:report).with('nodes' => [{
'uid' => 'node_id',
'status' => 'deploying',
'progress' => 0,
'deployment_graph_task_name' => 'openstack-haproxy-mysqld',
'task_status' => 'running',
@ -185,13 +184,13 @@ describe Astute::TaskNode do
it 'skipped' do
task_data['type'] = "skipped"
Astute::Noop.any_instance.expects(:run)
Astute::Skipped.any_instance.expects(:run)
task_node.run(task)
end
it 'stage' do
task_data['type'] = "stage"
Astute::Noop.any_instance.expects(:run)
Astute::Stage.any_instance.expects(:run)
task_node.run(task)
end
@ -225,7 +224,7 @@ describe Astute::TaskNode do
task_data['type'] = "unknown"
expect{task_node.run(task)}.to raise_error(
Astute::TaskValidationError,
"Unknown task type 'unknown'")
"Unknown task type 'unknown'. Detailed: uninitialized constant Astute::Unknown")
end
end # support task type
end
@ -307,7 +306,6 @@ describe Astute::TaskNode do
ctx.expects(:report).with({
'nodes' => [{
'uid' => 'node_id',
'status' => 'deploying',
'deployment_graph_task_name' => task.name,
'task_status' => 'running',
'progress' => 0}]
@ -316,6 +314,10 @@ describe Astute::TaskNode do
end
it 'should report ready if task successful and no more task' do
cluster.node_statuses_transitions['successful'] = {
'status' => 'ready'
}
Astute::Puppet.any_instance.expects(:status).returns(:successful)
task_node.run(task)
ctx.expects(:report).with({
@ -343,12 +345,15 @@ describe Astute::TaskNode do
end
it 'should report ready if task skipped and no more task' do
cluster.node_statuses_transitions['successful'] = {
'status' => 'ready'
}
task_node.run(task)
ctx.expects(:report).with({
'nodes' => [{
'uid' => 'node_id',
'status' => 'ready',
'deployment_graph_task_name' => task.name,
'status' => 'ready',
'custom' => {},
'task_status' => 'skipped',
'progress' => 100}]
@ -366,7 +371,6 @@ describe Astute::TaskNode do
ctx.expects(:report).with({
'nodes' => [{
'uid' => 'node_id',
'status' => 'deploying',
'deployment_graph_task_name' => task.name,
'custom' => {},
'task_status' => 'skipped',
@ -377,7 +381,13 @@ describe Astute::TaskNode do
end
it 'should report error if task failed and no more task' do
cluster.node_statuses_transitions['failed'] = {
'status' => 'error',
'error_type' => 'deploy'
}
Astute::Puppet.any_instance.expects(:status).returns(:failed)
task_node.run(task)
ctx.expects(:report).with({
'nodes' => [{
@ -387,6 +397,7 @@ describe Astute::TaskNode do
'custom' => {},
'task_status' => 'failed',
'error_type' => 'deploy',
'error_msg' => "Task #{task.name} failed on node node_id",
'progress' => 100}]
})
task_node.poll
@ -398,12 +409,10 @@ describe Astute::TaskNode do
'second_task',
task_data.merge({'node_id' => 'node_id'})
)
task_node.run(task)
ctx.expects(:report).with({
'nodes' => [{
'uid' => 'node_id',
'status' => 'deploying',
'deployment_graph_task_name' => task.name,
'custom' => {},
'task_status' => 'successful',

View File

@ -34,9 +34,7 @@ describe "TaskProxyReporter" do
'status' => 'ready',
'uid' => '1',
'deployment_graph_task_name' => 'test_1',
'task_status' => 'ready',
'progress' => 100}
]
'task_status' => 'ready'}]
}
end
@ -66,37 +64,6 @@ describe "TaskProxyReporter" do
5.times { reporter.report(msg) }
end
it "reports only updated node" do
expected_msg_2 = {'nodes' => [{
'status' => 'deploying',
'uid' => '2',
'deployment_graph_task_name' => 'test_1',
'task_status' => 'running',
'progress' => 54}]
}
up_reporter.expects(:report).with(expected_msg)
up_reporter.expects(:report).with(expected_msg_2)
reporter.report(msg)
reporter.report(msg_pr)
end
it "reports only if progress value is greater" do
msg1 = {'nodes' => [{'status' => 'deploying', 'uid' => '1', 'progress' => 54,
'deployment_graph_task_name' => 'test_1', 'task_status' => 'running'},
{'status' => 'deploying', 'uid' => '2', 'progress' => 54,
'deployment_graph_task_name' => 'test_1', 'task_status' => 'running'}]}
msg2 = Marshal.load(Marshal.dump(msg1))
msg2['nodes'][1]['progress'] = 100
msg2['nodes'][1]['status'] = 'ready'
updated_node = msg2['nodes'][1]
expected_msg = {'nodes' => [updated_node]}
up_reporter.expects(:report).with(msg1)
up_reporter.expects(:report).with(expected_msg)
reporter.report(msg1)
reporter.report(msg2)
end
it "reports if progress value same, but deployment graph task name different" do
msg1 = {'nodes' => [{'status' => 'deploying', 'uid' => '1', 'progress' => 54,
'deployment_graph_task_name' => 'test_1', 'task_status' => 'running'}]}
@ -171,67 +138,34 @@ describe "TaskProxyReporter" do
reporter.report(input_msg)
end
it "adjusts progress to 100 if status ready and no progress given" do
it "adjusts progress to 100 if status error and no progress given" do
input_msg = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'status' => 'error',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'successful'}]}
'task_status' => 'failed'}]}
expected_msg = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'status' => 'error',
'progress' => 100,
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready'}]}
'task_status' => 'error'}]}
up_reporter.expects(:report).with(expected_msg)
reporter.report(input_msg)
end
it "adjusts progress to 100 if status ready with progress" do
it "adjusts progress to 100 if status stopped and no progress given" do
input_msg = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'status' => 'stopped',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'successful',
'progress' => 50}]}
'task_status' => 'skipped'}]}
expected_msg = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'status' => 'stopped',
'progress' => 100,
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready'}]}
'task_status' => 'skipped'}]}
up_reporter.expects(:report).with(expected_msg)
reporter.report(input_msg)
end
it "does not report if node was in ready, and trying to set is deploying" do
msg1 = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'successful'}]}
msg2 = {'nodes' => [{'uid' => 2,
'status' => 'ready',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'successful'}]}
msg3 = {'nodes' => [{'uid' => 1,
'status' => 'deploying',
'progress' => 100,
'deployment_graph_task_name' => 'test_2',
'task_status' => 'successful'}]}
expected_msg_1 = {'nodes' => [{'uid' => 1,
'status' => 'ready',
'progress' => 100,
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready'}]}
expected_msg_2 = {'nodes' => [{'uid' => 2,
'status' => 'ready',
'progress' => 100,
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready'}]}
up_reporter.expects(:report).with(expected_msg_1)
up_reporter.expects(:report).with(expected_msg_2)
up_reporter.expects(:report).never
reporter.report(msg1)
reporter.report(msg2)
5.times { reporter.report(msg3) }
end
it "reports even not all keys provided" do
msg1 = {'nodes' => [{'uid' => 1,
'status' => 'deploying',
@ -244,8 +178,7 @@ describe "TaskProxyReporter" do
expected_msg2 = {'nodes' => [{'uid' => 2,
'status' => 'ready',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready',
'progress' => 100}]}
'task_status' => 'ready'}]}
up_reporter.expects(:report).with(msg1)
up_reporter.expects(:report).with(expected_msg2)
reporter.report(msg1)
@ -298,7 +231,7 @@ describe "TaskProxyReporter" do
reporter.report(msg2)
end
it "reports if status is greater" do
it "reports if task status is changed" do
msgs = [
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
@ -317,8 +250,7 @@ describe "TaskProxyReporter" do
'uid' => 1,
'status' => 'ready',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready',
'progress' => 100}]}
'task_status' => 'ready'}]}
up_reporter.expects(:report).with(msgs[0])
up_reporter.expects(:report).with(expected_msg2)
@ -349,8 +281,7 @@ describe "TaskProxyReporter" do
'uid' => 1,
'status' => 'ready',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'ready',
'progress' => 100}]}
'task_status' => 'ready'}]}
expected_msg3 = {'nodes' => [{
'uid' => 1,
@ -365,53 +296,6 @@ describe "TaskProxyReporter" do
msgs.each {|msg| reporter.report(msg)}
end
it "doesn't update progress if it less than previous progress with same status" do
msgs = [
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running',
'progress' => 50}]},
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running',
'progress' => 10 }]},
]
up_reporter.expects(:report).with(msgs[0])
up_reporter.expects(:report).never
msgs.each {|msg| reporter.report(msg)}
end
it "doesn't forget previously reported attributes" do
msgs = [
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running',
'progress' => 50}]},
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running'}]},
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running',
'key' => 'value',
'progress' => 60 }]},
{'nodes' => [{'uid' => 1,
'status' => 'deploying',
'deployment_graph_task_name' => 'test_2',
'task_status' => 'running',
'progress' => 0 }]},
]
up_reporter.expects(:report).with(msgs[0])
up_reporter.expects(:report).with(msgs[2])
up_reporter.expects(:report).never
msgs.each {|msg| reporter.report(msg)}
end
it 'should report if status changed, but progress is not' do
msgs = [
{'nodes' => [{'uid' => 1,