From c5b11b155239c6b563f548c73237718dda9449cb Mon Sep 17 00:00:00 2001 From: "Vladimir Sharshov (warpc)" Date: Fri, 12 Dec 2014 13:36:44 +0300 Subject: [PATCH] Granular deployment Async puppet deployment task support. Use puppet nailgun hooks as main deployment mechanism for new(6.1) and old(<6.1) clusters. TODO: - more tests for puppet tasks; - refactoring. Implements blueprint granular-deployment-based-on-tasks Change-Id: I28928e86ea4017288478703c6075b315b120349a --- lib/astute.rb | 3 + .../deployment_engine/granular_deployment.rb | 182 ++++++++++ lib/astute/orchestrator.rb | 11 + lib/astute/puppet_task.rb | 233 +++++++++++++ lib/astute/puppetd.rb | 183 +--------- lib/astute/server/dispatcher.rb | 25 +- lib/astute/task_manager.rb | 51 +++ spec/unit/nailgun_spec.rb | 186 ++++++++++ spec/unit/puppetd_spec.rb | 317 ++++-------------- 9 files changed, 760 insertions(+), 431 deletions(-) create mode 100644 lib/astute/deployment_engine/granular_deployment.rb create mode 100644 lib/astute/puppet_task.rb create mode 100644 lib/astute/task_manager.rb create mode 100644 spec/unit/nailgun_spec.rb diff --git a/lib/astute.rb b/lib/astute.rb index 3a5948f7..103a49da 100644 --- a/lib/astute.rb +++ b/lib/astute.rb @@ -32,12 +32,15 @@ require 'astute/network' require 'astute/puppetd' require 'astute/deployment_engine/nailyfact' require 'astute/deployment_engine/tasklib' +require 'astute/deployment_engine/granular_deployment' require 'astute/cobbler' require 'astute/cobbler_manager' require 'astute/image_provision' require 'astute/dump' require 'astute/deploy_actions' require 'astute/nailgun_hooks' +require 'astute/puppet_task' +require 'astute/task_manager' ['/astute/pre_deployment_actions/*.rb', '/astute/pre_deploy_actions/*.rb', diff --git a/lib/astute/deployment_engine/granular_deployment.rb b/lib/astute/deployment_engine/granular_deployment.rb new file mode 100644 index 00000000..86a8945a --- /dev/null +++ b/lib/astute/deployment_engine/granular_deployment.rb @@ -0,0 +1,182 @@ +# Copyright 2014 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +class Astute::DeploymentEngine::GranularDeployment < Astute::DeploymentEngine + + NAILGUN_STATUS = ['ready', 'error', 'deploying'] + + def deploy_piece(nodes, retries=1) + return false unless validate_nodes(nodes) + + @ctx.reporter.report(nodes_status(nodes, 'deploying', {'progress' => 0})) + log_preparation(nodes) + + Astute.logger.info "#{@ctx.task_id}: Starting deployment" + + @running_tasks = {} + @nodes_roles = nodes.inject({}) { |h, n| h.merge({n['uid'] => n['role']}) } + @nodes_by_uid = nodes.inject({}) { |h, n| h.merge({ n['uid'] => n }) } + + begin + @task_manager = Astute::TaskManager.new(nodes) + @hook_context = Astute::Context.new( + @ctx.task_id, + HookReporter.new, + Astute::LogParser::NoParsing.new + ) + deploy_nodes(nodes) + rescue => e + # We should fail all nodes in case of post deployment + # process. In other case they will not sending back + # for redeploy + report_nodes = nodes.uniq{ |n| n['uid'] }.map do |node| + { 'uid' => node['uid'], + 'status' => 'error', + 'role' => 'hook', + 'error_type' => 'deploy' + } + end + + @ctx.report_and_update_status('nodes' => report_nodes) + raise e + end + + Astute.logger.info "#{@ctx.task_id}: Finished deployment of nodes" \ + " => roles: #{@nodes_roles.inspect}" + end + + def puppet_task(node_id, task) + # Use fake reporter because of logic. We need to handle report here + Astute::PuppetTask.new( + @hook_context, + @nodes_by_uid[node_id], # Use single node uid instead of task['uids'] + retries=2, + task['parameters']['puppet_manifest'], + task['parameters']['puppet_modules'], + task['parameters']['cwd'], + task['parameters']['timeout'] + ) + end + + def run_task(node_id, task) + Astute.logger.info "#{@ctx.task_id}: run task '#{task.to_yaml}' on node #{node_id}" + @running_tasks[node_id] = puppet_task(node_id, task) + @running_tasks[node_id].run + end + + def check_status(node_id) + status = @running_tasks[node_id].status + if NAILGUN_STATUS.include? status + status + else + raise "Internal error. Unknown status '#{status}'" + end + end + + def deploy_nodes(nodes) + @task_manager.node_uids.each { |n| task = @task_manager.next_task(n) and run_task(n, task) } + + while @task_manager.task_in_queue? + nodes_to_report = [] + @task_manager.node_uids.each do |node_id| + if task = @task_manager.current_task(node_id) + case status = check_status(node_id) + when 'ready' + Astute.logger.info "Task '#{task}' on node uid=#{node_id} ended successfully" + new_task = @task_manager.next_task(node_id) + if new_task + run_task(node_id, new_task) + else + nodes_to_report << process_success_node(node_id, task) + end + when 'deploying' + progress_report = process_running_node(node_id, task, nodes) + nodes_to_report << progress_report if progress_report + when 'error' + Astute.logger.error "Task '#{task}' on node #{node_id} valid, but failed" + nodes_to_report << process_fail_node(node_id, task) + else + raise "Internal error. Known status '#{status}', but handler not provided" + end + else + Astute.logger.debug "No more tasks provided for node #{node_id}" + end + + @ctx.report_and_update_status('nodes' => nodes_to_report) if nodes_to_report.present? + + break unless @task_manager.task_in_queue? + sleep Astute.config.PUPPET_DEPLOY_INTERVAL + end + end + end + + def process_success_node(node_id, task) + Astute.logger.info "No more tasks provided for node #{node_id}. All node " \ + "tasks completed successfully" + { + "uid" => node_id, + 'status' => 'ready', + 'role' => @nodes_roles[node_id], + "progress" => 100, + 'task' => task + } + end + + def process_fail_node(node_id, task) + Astute.logger.error "No more tasks will be executed on the node #{node_id}" + @task_manager.delete_node(node_id) + { + 'uid' => node_id, + 'status' => 'error', + 'error_type' => 'deploy', + 'role' => @nodes_roles[node_id], + 'task' => task + } + end + + def process_running_node(node_id, task, nodes) + begin + # Pass nodes because logs calculation needs IP address of node, not just uid + nodes_progress = @ctx.deploy_log_parser.progress_calculate(Array(node_id), nodes) + if nodes_progress.present? + nodes_progress.map! { |x| x.merge!( + 'status' => 'deploying', + 'role' => @nodes_roles[x['uid']], + 'task' => task + ) } + nodes_progress.first + else + nil + end + rescue => e + Astute.logger.warn "Some error occurred when parse logs for nodes progress: #{e.message}, "\ + "trace: #{e.format_backtrace}" + nil + end + end + + def log_preparation(nodes) + @ctx.deploy_log_parser.prepare(nodes) + rescue => e + Astute.logger.warn "Some error occurred when prepare LogParser: " \ + "#{e.message}, trace: #{e.format_backtrace}" + end + + class HookReporter + def report(msg) + Astute.logger.debug msg + end + end + +end diff --git a/lib/astute/orchestrator.rb b/lib/astute/orchestrator.rb index 310d5f9f..36dc066e 100644 --- a/lib/astute/orchestrator.rb +++ b/lib/astute/orchestrator.rb @@ -53,6 +53,17 @@ module Astute ) end + def granular_deploy(up_reporter, task_id, deployment_info, pre_deployment=[], post_deployment=[]) + deploy_cluster( + up_reporter, + task_id, + deployment_info, + Astute::DeploymentEngine::GranularDeployment, + pre_deployment, + post_deployment + ) + end + def provision(reporter, task_id, engine_attrs, nodes) raise "Nodes to provision are not provided!" if nodes.empty? provision_method = engine_attrs['provision_method'] || 'cobbler' diff --git a/lib/astute/puppet_task.rb b/lib/astute/puppet_task.rb new file mode 100644 index 00000000..4b77227a --- /dev/null +++ b/lib/astute/puppet_task.rb @@ -0,0 +1,233 @@ +# Copyright 2014 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +require 'timeout' + +module Astute + class PuppetTask + + def initialize(ctx, node, retries=1, puppet_manifest=nil, puppet_modules=nil, cwd=nil, timeout=nil) + @ctx = ctx + @node = node + @retries = retries + @puppet_manifest = puppet_manifest || '/etc/puppet/manifests/site.pp' + @puppet_modules = puppet_modules || '/etc/puppet/modules' + @cwd = cwd || '/' + @time_obsorver = TimeObserver.new(timeout || Astute.config.PUPPET_TIMEOUT) + @prev_summary = nil + @is_hung = false + end + + def run + Astute.logger.debug "Waiting for puppet to finish deployment on " \ + "node #{@node['uid']} (timeout = #{@time_obsorver.time_limit} sec)..." + @time_obsorver.start + @prev_summary ||= puppet_status + puppetd_runonce + end + + # expect to run this method with respect of Astute.config.PUPPET_FADE_INTERVAL + def status + # TODO(vsharshov): Should we raise error? + raise Timeout::Error unless @time_obsorver.enough_time? + + last_run = puppet_status + status = node_status(last_run) + Astute.logger.debug "Node #{@node['uid']}(#{@node['role']}) status: #{status}" + + result = case status + when 'succeed' + processing_succeed_node + when 'running' + processing_running_node + when 'error' + processing_error_node(last_run) + end + + #TODO(vsharshov): Should we move it to control module? + @ctx.report_and_update_status('nodes' => [result]) if result + + # ready, error or deploying + result.fetch('status', 'deploying') + end + + private + + def puppetd + puppetd = MClient.new(@ctx, "puppetd", [@node['uid']]) + puppetd.on_respond_timeout do |uids| + nodes = uids.map do |uid| + { + 'uid' => uid, + 'status' => 'error', + 'error_type' => 'deploy', + 'role' => @node['role'] + } + end + @ctx.report_and_update_status('nodes' => nodes) + end + puppetd + end + + def puppet_status + puppetd.last_run_summary.first[:data] + end + + def puppet_run + puppetd.runonce( + :puppet_debug => true, + :manifest => @puppet_manifest, + :modules => @puppet_modules, + :cwd => @cwd + ) + end + + def running?(status) + ['running'].include? status[:status] + end + + def idling?(status) + ['idling'].include? status[:status] + end + + def stopped?(status) + ['stopped', 'disabled'].include? status[:status] + end + + def succeed?(status) + status[:status] == 'stopped' && + status[:resources]['failed'].to_i == 0 && + status[:resources]['failed_to_restart'].to_i == 0 && + status[:time]['last_run'] != (@prev_summary && @prev_summary[:time]['last_run']) + end + + # Runs puppetd.runonce only if puppet is stopped on the host at the time + # If it isn't stopped, we wait a bit and try again. + # Returns list of nodes uids which appear to be with hung puppet. + def puppetd_runonce + started = Time.now.to_i + while Time.now.to_i - started < Astute.config.PUPPET_FADE_TIMEOUT + status = puppet_status + + is_stopped = stopped?(status) + is_idling = idling?(status) + is_running = running?(status) + + #Try to kill 'idling' process and run again by 'runonce' call + puppet_run if is_stopped || is_idling + + break if !is_running && !is_idling + sleep Astute.config.PUPPET_FADE_INTERVAL + end + + if is_running || is_idling + Astute.logger.warn "Following nodes have puppet hung " \ + "(#{is_running ? 'running' : 'idling'}): '#{@node['uid']}'" + @is_hung = true + else + @is_hung = false + end + end + + def node_status(last_run) + case + when @is_hung + 'error' + when succeed?(last_run) && !@is_hung + 'succeed' + when (running?(last_run) || idling?(status)) && !@is_hung + 'running' + when stopped?(status) && !succeed?(last_run) && !@is_hung + 'error' + else + msg = "Unknow status: " \ + "is_hung #{@is_hung}, succeed? #{succeed?(last_run)}, " \ + "running? #{running?(last_run)}, stopped? #{stopped?(status)}, " \ + "idling? #{idling?(status)}" + raise msg + end + end + + def processing_succeed_node + Astute.logger.debug "Puppet completed within #{@time_obsorver.stop} seconds" + { 'uid' => @node['uid'], 'status' => 'ready', 'role' => @node['role'] } + end + + def processing_error_node(last_run) + if @retries > 0 + @retries -= 1 + Astute.logger.debug "Puppet on node #{@node['uid']} will be "\ + "restarted. #{@retries} retries remained." + Astute.logger.info "Retrying to run puppet for following error " \ + "nodes: #{@node['uid']}" + puppetd_runonce + # We need this magic with prev_summary to reflect new puppetd run statuses.. + @prev_summary = last_run + node_report_format('status' => 'deploying') + else + Astute.logger.debug "Node #{@node['uid']} has failed to deploy. " \ + "There is no more retries for puppet run." + node_report_format('status' => 'error', 'error_type' => 'deploy') + end + end + + def processing_running_node + nodes_to_report = [] + begin + # Pass nodes because logs calculation needs IP address of node, not just uid + nodes_progress = @ctx.deploy_log_parser.progress_calculate([@node['uid']], [@node]) + if nodes_progress.present? + Astute.logger.debug "Got progress for nodes: #{nodes_progress.inspect}" + + # Nodes with progress are running, so they are not included in nodes_to_report yet + nodes_progress.map! { |x| x.merge!('status' => 'deploying', 'role' => @node['role']) } + nodes_to_report = nodes_progress + end + rescue => e + Astute.logger.warn "Some error occurred when parse logs for " \ + "nodes progress: #{e.message}, trace: #{e.format_backtrace}" + end + nodes_to_report.first || node_report_format('status' => 'deploying') + end + + def node_report_format(add_info={}) + add_info.merge('uid' => @node['uid'], 'role' => @node['role']) + end + + end #PuppetTask + + class TimeObserver + + def initialize(timeout) + @timeout = timeout + end + + def start + @time_before = Time.now + end + + def stop + (Time.now - @time_before).to_i + end + + def enough_time? + Time.now - @time_before < time_limit + end + + def time_limit + @timeout + end + end #TimeObserver + +end \ No newline at end of file diff --git a/lib/astute/puppetd.rb b/lib/astute/puppetd.rb index b6911e7a..84b8fca2 100644 --- a/lib/astute/puppetd.rb +++ b/lib/astute/puppetd.rb @@ -1,4 +1,4 @@ -# Copyright 2013 Mirantis, Inc. +# Copyright 2014 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -21,8 +21,7 @@ module Astute def self.deploy(ctx, nodes, retries=2, puppet_manifest=nil, puppet_modules=nil, cwd=nil) @ctx = ctx - @nodes_roles = nodes.inject({}) { |h, n| h.merge({n['uid'].to_s => n['role']}) } - @node_retries = nodes.inject({}) { |h, n| h.merge({n['uid'].to_s => retries}) } + @retries = retries @nodes = nodes @puppet_manifest = puppet_manifest || '/etc/puppet/manifests/site.pp' @puppet_modules = puppet_modules || '/etc/puppet/modules' @@ -32,7 +31,7 @@ module Astute nodes (timeout = #{Astute.config.PUPPET_TIMEOUT} sec)..." time_before = Time.now - deploy_nodes(@nodes.map { |n| n['uid'] }) + deploy_nodes time_spent = Time.now - time_before Astute.logger.info "#{@ctx.task_id}: Spent #{time_spent} seconds on puppet run "\ @@ -40,180 +39,18 @@ module Astute end private - # Runs puppetd.runonce only if puppet is stopped on the host at the time - # If it isn't stopped, we wait a bit and try again. - # Returns list of nodes uids which appear to be with hung puppet. - def self.puppetd_runonce(uids) - started = Time.now.to_i - while Time.now.to_i - started < Astute.config.PUPPET_FADE_TIMEOUT - running_uids = puppetd(uids).last_run_summary.select { |x| - ['running', 'idling'].include?(x.results[:data][:status]) - }.map { |n| n.results[:sender] } - stopped_uids = uids - running_uids - @nodes.select { |n| stopped_uids.include? n['uid'] } - .group_by { |n| n['debug'] } - .each do |debug, stop_nodes| - puppetd(stop_nodes.map { |n| n['uid'] }).runonce( - :puppet_debug => true, - :manifest => @puppet_manifest, - :modules => @puppet_modules, - :cwd => @cwd - ) - end - break if running_uids.empty? + def self.deploy_nodes + puppet_tasks = @nodes.map { |n| puppet_task(n) } + puppet_tasks.each(&:run) - uids = running_uids - sleep Astute.config.PUPPET_FADE_INTERVAL - end - Astute.logger.debug "puppetd_runonce completed within #{Time.now.to_i - started} seconds." - Astute.logger.warn "Following nodes have puppet hung: '#{running_uids.join(',')}'" if running_uids.present? - running_uids - end - - def self.calc_nodes_status(last_run, prev_run, hung_nodes=[]) - # Finished are those which are not in running state, - # and changed their last_run time, which is changed after application of catalog, - # at the time of updating last_run_summary file. At that particular time puppet is - # still running, and will finish in a couple of seconds. - # If Puppet had crashed before it got a catalog (e.g. certificate problems), - # it didn't update last_run_summary file and switched to 'stopped' state. - - # Consider only hung nodes which was in last_run - hung_nodes = hung_nodes & last_run.map { |n| n.results[:sender] } - - stopped = last_run.select { |x| ['stopped', 'disabled'].include? x.results[:data][:status] } - - # Select all finished nodes which not failed and changed last_run time. - succeed_nodes = stopped.select { |n| - prev_n = prev_run.find{|ps| ps.results[:sender] == n.results[:sender] } - - n.results[:data][:status] == 'stopped' && - n.results[:data][:resources]['failed'].to_i == 0 && - n.results[:data][:resources]['failed_to_restart'].to_i == 0 && - n.results[:data][:time]['last_run'] != (prev_n && prev_n.results[:data][:time]['last_run']) - }.map{|x| x.results[:sender] } - - stopped_nodes = stopped.map { |x| x.results[:sender] } - error_nodes = stopped_nodes - succeed_nodes - running_nodes = last_run.map {|n| n.results[:sender]} - stopped_nodes - - # Hunged nodes can change state at this moment(success, error or still run), - # but we should to turn it on only in error_nodes - succeed_nodes -= hung_nodes - error_nodes = (error_nodes + hung_nodes).uniq - running_nodes -= hung_nodes - - nodes_to_check = running_nodes + succeed_nodes + error_nodes - all_nodes = last_run.map { |n| n.results[:sender] } - if nodes_to_check.size != all_nodes.size - raise "Internal error. Check: #{nodes_to_check.inspect}, passed #{all_nodes.inspect}" - end - {'succeed' => succeed_nodes, 'error' => error_nodes, 'running' => running_nodes} - end - - - def self.puppetd(uids) - puppetd = MClient.new(@ctx, "puppetd", Array(uids)) - puppetd.on_respond_timeout do |uids| - nodes = uids.map do |uid| - { 'uid' => uid, 'status' => 'error', 'error_type' => 'deploy', 'role' => @nodes_roles[uid] } - end - @ctx.report_and_update_status('nodes' => nodes) - end - puppetd - end - - def self.processing_error_nodes(error_nodes) - nodes_to_report = [] - nodes_to_retry = [] - - error_nodes.each do |uid| - if @node_retries[uid] > 0 - @node_retries[uid] -= 1 - Astute.logger.debug "Puppet on node #{uid.inspect} will be restarted. "\ - "#{@node_retries[uid]} retries remained." - nodes_to_retry << uid - else - Astute.logger.debug "Node #{uid.inspect} has failed to deploy. There is no more retries for puppet run." - nodes_to_report << {'uid' => uid, 'status' => 'error', 'error_type' => 'deploy', 'role' => @nodes_roles[uid] } - end - end - - return nodes_to_report, nodes_to_retry - end - - def self.processing_running_nodes(running_nodes) - nodes_to_report = [] - if running_nodes.present? - begin - # Pass nodes because logs calculation needs IP address of node, not just uid - nodes_progress = @ctx.deploy_log_parser.progress_calculate(running_nodes, @nodes) - if nodes_progress.present? - Astute.logger.debug "Got progress for nodes: #{nodes_progress.inspect}" - # Nodes with progress are running, so they are not included in nodes_to_report yet - nodes_progress.map! { |x| x.merge!('status' => 'deploying', 'role' => @nodes_roles[x['uid']]) } - nodes_to_report = nodes_progress - end - rescue => e - Astute.logger.warn "Some error occurred when parse logs for nodes progress: #{e.message}, "\ - "trace: #{e.format_backtrace}" - end - end - nodes_to_report - end - - def self.processing_succeed_nodes(succeed_nodes) - succeed_nodes.map do |uid| - { 'uid' => uid, 'status' => 'ready', 'role' => @nodes_roles[uid] } + while puppet_tasks.any? { |t| t.status == 'deploying' } + sleep Astute.config.PUPPET_DEPLOY_INTERVAL end end - # As I (Andrey Danin) understand, Puppet agent goes through these steps: - # * Puppetd has 'stopped' state. - # * We run it as a run_once, and puppetd goes to 'idling' state - it trying to - # retrieve catalog. - # * If it can't retrieve catalog, it goes back to 'stopped' state without - # any update of last_run_summary file. - # * If puppetd retrieve catalog, it goes to 'running' state, which means - # it appying catalog to system. - # * When puppetd finished catalog run, it updates last_run_summary file - # but stays in 'running' state for a while. - # * After puppetd finished all internal jobs connected with finished catalog, - # it goes to 'idling' state. - # * After a short time it goes to 'stopped' state because we ran it as a run_once. - def self.deploy_nodes(nodes_to_check) - Timeout::timeout(Astute.config.PUPPET_TIMEOUT) do - prev_summary = puppetd(nodes_to_check).last_run_summary - hung_nodes = puppetd_runonce(nodes_to_check) - - while nodes_to_check.present? - last_run = puppetd(nodes_to_check).last_run_summary - calc_nodes = calc_nodes_status(last_run, prev_summary, hung_nodes) - Astute.logger.debug "Nodes statuses: #{calc_nodes.inspect}" - - report_succeed = processing_succeed_nodes calc_nodes['succeed'] - report_error, nodes_to_retry = processing_error_nodes(calc_nodes['error']) - report_running = processing_running_nodes(calc_nodes['running']) - - nodes_to_report = report_succeed + report_error + report_running - @ctx.report_and_update_status('nodes' => nodes_to_report) if nodes_to_report.present? - - if nodes_to_retry.present? - Astute.logger.info "Retrying to run puppet for following error nodes: #{nodes_to_retry.join(',')}" - hung_nodes = puppetd_runonce(nodes_to_retry) - # We need this magic with prev_summary to reflect new puppetd run statuses.. - prev_summary.delete_if { |x| nodes_to_retry.include?(x.results[:sender]) } - prev_summary += last_run.select { |x| nodes_to_retry.include?(x.results[:sender]) } - end - - # we will iterate only over running nodes and those that we restart deployment for - nodes_to_check = calc_nodes['running'] + nodes_to_retry - - break if nodes_to_check.empty? - sleep Astute.config.PUPPET_DEPLOY_INTERVAL - end - end + def self.puppet_task(n) + PuppetTask.new(@ctx, n, @retries, @puppet_manifest, @puppet_modules, @cwd) end end diff --git a/lib/astute/server/dispatcher.rb b/lib/astute/server/dispatcher.rb index e77230fb..76e20750 100644 --- a/lib/astute/server/dispatcher.rb +++ b/lib/astute/server/dispatcher.rb @@ -91,6 +91,26 @@ module Astute end end + def granular_deploy(data) + Astute.logger.info("'granular_deploy' method called with data: #{data.inspect}") + + reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) + begin + @orchestrator.granular_deploy( + reporter, + data['args']['task_uuid'], + data['args']['deployment_info'], + data['args']['pre_deployment'] || [], + data['args']['post_deployment'] || [] + ) + reporter.report('status' => 'ready', 'progress' => 100) + rescue Timeout::Error + msg = "Timeout of deployment is exceeded." + Astute.logger.error msg + reporter.report('status' => 'error', 'error' => msg) + end + end + def verify_networks(data) data.fetch('subtasks', []).each do |subtask| if self.respond_to?(subtask['method']) @@ -114,7 +134,7 @@ module Astute reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) result = @orchestrator.multicast_verification(reporter, data['args']['task_uuid'], data['args']['nodes']) report_result(result, reporter) - end + end def dump_environment(data) task_id = data['args']['task_uuid'] @@ -186,7 +206,8 @@ module Astute Astute.logger.info "Try to kill running task #{target_task_uuid}" service_data[:main_work_thread].kill - result = if ['deploy', 'task_deployment'].include? service_data[:tasks_queue].current_task_method + result = if ['deploy', 'task_deployment', 'granular_deploy'].include? + service_data[:tasks_queue].current_task_method @orchestrator.stop_puppet_deploy(reporter, task_uuid, nodes) @orchestrator.remove_nodes(reporter, task_uuid, data['args']['engine'], nodes) else diff --git a/lib/astute/task_manager.rb b/lib/astute/task_manager.rb new file mode 100644 index 00000000..0c11c1a5 --- /dev/null +++ b/lib/astute/task_manager.rb @@ -0,0 +1,51 @@ +# Copyright 2014 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +module Astute + + class TaskManager + def initialize(nodes) + @tasks = nodes.inject({}) do |h, n| + h.merge({n['uid'] => n['tasks'].sort_by{ |f| f['priority'] }.each}) + end + + @current_task = {} + Astute.logger.info "The following tasks will be performed on nodes: " \ + "#{@tasks.map {|k, v| {k => v.to_a}}.to_yaml}" + end + + def current_task(node_id) + @current_task[node_id] + end + + def next_task(node_id) + @current_task[node_id] = @tasks[node_id].next + rescue StopIteration + @current_task[node_id] = nil + delete_node(node_id) + end + + def delete_node(node_id) + @tasks[node_id] = nil + end + + def task_in_queue? + @tasks.select{ |_k,v| v }.present? + end + + def node_uids + @tasks.select{ |_k,v| v }.keys + end + end +end \ No newline at end of file diff --git a/spec/unit/nailgun_spec.rb b/spec/unit/nailgun_spec.rb new file mode 100644 index 00000000..f1a74f73 --- /dev/null +++ b/spec/unit/nailgun_spec.rb @@ -0,0 +1,186 @@ +# Copyright 2014 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +require File.join(File.dirname(__FILE__), '../spec_helper') + +include Astute + +describe "Granular deployment engine" do + include SpecHelpers + + let(:ctx) { + ctx = mock + ctx.stubs(:task_id) + ctx.stubs(:deploy_log_parser).returns(Astute::LogParser::NoParsing.new) + ctx.stubs(:status).returns({}) + reporter = mock + reporter.stubs(:report) + up_reporter = Astute::ProxyReporter::DeploymentProxyReporter.new(reporter, nodes) + ctx.stubs(:reporter).returns(up_reporter) + ctx + } + + let(:deploy_engine) do + Astute::DeploymentEngine::GranularDeployment.new(ctx) + end + + let(:upload_file_hook) do + { + "priority" => 100, + "type" => "upload_file", + "fail_on_error" => false, + "diagnostic_name" => "upload-example-1.0", + "uids" => ['2', '3'], + "parameters" => { + "path" => "/etc/yum.repos.d/fuel_awesome_plugin-0.1.0.repo", + "data" => "[fuel_awesome_plugin-0.1.0]\\nname=Plugin fuel_awesome_plugin-0.1.0 repository\\nbaseurl=http => //10.20.0.2 => 8080/plugins/fuel_awesome_plugin-0.1.0/repositories/centos\\ngpgcheck=0" + } + } + end + + let(:sync_hook) do + { + "priority" => 200, + "type" => "sync", + "fail_on_error" => false, + "diagnostic_name" => "sync-example-1.0", + "uids" => ['1', '2'], + "parameters" => { + "src" => "rsync://10.20.0.2/plugins/fuel_awesome_plugin-0.1.0/deployment_scripts/", + "dst" => "/etc/fuel/plugins/fuel_awesome_plugin-0.1.0/" + } + } + end + + let(:shell_hook) do + { + "priority" => 100, + "type" => "shell", + "fail_on_error" => false, + "diagnostic_name" => "shell-example-1.0", + "uids" => ['1','2','3'], + "parameters" => { + "cmd" => "./deploy.sh", + "cwd" => "/etc/fuel/plugins/fuel_awesome_plugin-0.1.0/", + "timeout" => 60 + } + } + end + + let(:puppet_hook) do + { + "priority" => 300, + "type" => "puppet", + "fail_on_error" => false, + "diagnostic_name" => "puppet-example-1.0", + "uids" => ['1', '3'], + "parameters" => { + "puppet_manifest" => "cinder_glusterfs.pp", + "puppet_modules" => "modules", + "cwd" => "/etc/fuel/plugins/plugin_name-1.0", + "timeout" => 42 + } + } + end + + let(:nodes) do + [ + { + 'uid' => '45', + 'priority' => 200, + 'role' => 'ceph', + 'tasks' => [ + { + 'priority' => 100, + 'type' => 'puppet', + 'uids' => ['45'] + }, + { + 'priority' => 300, + 'type' => 'puppet', + 'uids' => ['45'] + } + ] + }, + { + 'uid' => '46', + 'priority' => 200, + 'role' => 'compute', + 'tasks' => [ + { + 'priority' => 100, + 'type' => 'puppet', + 'uids' => ['46'] + }, + { + 'priority' => 200, + 'type' => 'puppet', + 'uids' => ['46'] + }, + { + 'priority' => 300, + 'type' => 'puppet', + 'uids' => ['46'] + } + ] + } + ] + end + + describe '#deploy_piace' do + it 'should run tasks using puppet task' do + ctx.stubs(:report_and_update_status) + deploy_engine.expects(:deploy_nodes).with(nodes) + + deploy_engine.deploy_piece(nodes) + end + + it 'should report error status if error erased' do + error_report = {'nodes' => + [ + { + 'uid' => '45', + 'status' => 'error', + 'role' => 'hook', + 'error_type' => 'deploy' + }, + { + 'uid' => '46', + 'status' => 'error', + 'role' => 'hook', + 'error_type' => 'deploy' + } + ] + } + + ctx.expects(:report_and_update_status).with(error_report) + deploy_engine.expects(:deploy_nodes).with(nodes).raises("Error simulation") + + deploy_engine.deploy_piece(nodes) rescue nil + end + + it 'should not raise errir if no nodes was sent' do + expect{ deploy_engine.deploy_piece([])}.to_not raise_error + end + + it 'should prepare log for parsing' do + deploy_engine.stubs(:deploy_nodes).with(nodes) + + ctx.deploy_log_parser.expects(:prepare).with(nodes).once + deploy_engine.deploy_piece(nodes) + end + end # 'deploy_piace' + +end # 'describe' diff --git a/spec/unit/puppetd_spec.rb b/spec/unit/puppetd_spec.rb index 700f0314..47fc0505 100644 --- a/spec/unit/puppetd_spec.rb +++ b/spec/unit/puppetd_spec.rb @@ -1,4 +1,4 @@ -# Copyright 2013 Mirantis, Inc. +# Copyright 2014 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -17,269 +17,74 @@ require File.join(File.dirname(__FILE__), '../spec_helper') include Astute -describe "Puppetd" do +describe "PuppetdDeployer" do include SpecHelpers - context "PuppetdDeployer" do - let(:reporter) { mock('reporter') } - - let(:ctx) do - Context.new("task id", ProxyReporter::DeploymentProxyReporter.new(reporter), Astute::LogParser::NoParsing.new) - end - - let(:nodes) { [{'uid' => '1', 'role' => 'compute'}] } - - let(:rpcclient) { mock_rpcclient(nodes) } - - let(:last_run_result) do + let(:nodes) do + [ { - :statuscode =>0, - :data => { - :changes => {"total" => 1}, - :time => {"last_run" => 1358425701}, - :resources => {"failed" => 0}, - :status => "stopped", - :enabled => 1, - :stopped => 1, - :idling => 0, - :running => 0, - :runtime => 1358425701 - }, - :sender=>"1" + 'uid' => '1', + 'role' => 'primary-controller', + 'tasks' => [ + {'name' => 'pr_controller_1', 'description' => 'test1'}, + {'name' => 'pr_controller_2', 'description' => 'test2'}, + {'name' => 'controller_3', 'description' => 'test3'} + ], + 'fail_if_error' => true + }, + { + 'uid' => '2', + 'role' => 'controller', + 'tasks' => [ + {'name' => 'controller_1', 'description' => 'test1'}, + {'name' => 'controller_3', 'description' => 'test3'} + ], + 'fail_if_error' => false } + ] + end + + let(:ctx) do + ctx = mock + ctx.stubs(:task_id) + ctx.stubs(:deploy_log_parser).returns(Astute::LogParser::NoParsing.new) + ctx.stubs(:status).returns({}) + reporter = mock + reporter.stubs(:report) + up_reporter = Astute::ProxyReporter::DeploymentProxyReporter.new(reporter, nodes) + ctx.stubs(:reporter).returns(up_reporter) + ctx + end + + describe '.deploy' do + it 'should deploy nodes' do + PuppetdDeployer.expects(:deploy_nodes).once + + PuppetdDeployer.deploy(ctx, nodes) end - let(:last_run_result_running) do - res = deep_copy(last_run_result) - res[:data].merge!(:status => 'running', :running => 1, :stopped => 0) - res + it 'should use puppet task for deploy' do + puppet_task = mock('puppet_task') + PuppetdDeployer.expects(:puppet_task).with(nodes[0]).returns(puppet_task) + PuppetdDeployer.expects(:puppet_task).with(nodes[1]).returns(puppet_task) + puppet_task.expects(:run).times(nodes.size) + puppet_task.stubs(:status).returns('ready') + + PuppetdDeployer.deploy(ctx, nodes) end - let(:last_run_result_fail) do - res = deep_copy(last_run_result_running) - res[:data].merge!(:runtime => 1358426000, - :time => {"last_run" => 1358426000}, - :resources => {"failed" => 1} - ) - res - end + it 'should sleep between status checks' do + puppet_task = mock('puppet_task') + PuppetdDeployer.expects(:puppet_task).with(nodes[0]).returns(puppet_task) + PuppetdDeployer.expects(:puppet_task).with(nodes[1]).returns(puppet_task) + puppet_task.stubs(:run).times(nodes.size) + puppet_task.stubs(:status).returns('deploying') + .then.returns('ready') + .then.returns('ready') - let(:last_run_failed) do - res = deep_copy(last_run_result_fail) - res[:data].merge!(:status => 'stopped', :stopped => 1, :running => 0) - res - end - - let(:last_run_result_finished) do - res = deep_copy last_run_result - res[:data][:time]['last_run'] = 1358428000 - res[:data][:status] = 'stopped' - res - end - - context 'reportet behavior' do - - let(:prepare_mcollective_env) do - last_run_result_new = deep_copy last_run_result - last_run_result_new[:data][:time]['last_run'] = 1358426000 - - rpcclient_new_res = mock_mc_result(last_run_result_new) - rpcclient_finished_res = mock_mc_result(last_run_result_finished) - rpcclient_valid_result = mock_mc_result(last_run_result) - - rpcclient.stubs(:last_run_summary).returns([rpcclient_valid_result]).then. - returns([rpcclient_valid_result]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([rpcclient_new_res]).then. - returns([rpcclient_finished_res]) - - rpcclient - end - - it "reports ready status for node if puppet deploy finished successfully" do - prepare_mcollective_env - - reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}]) - rpcclient.expects(:runonce).at_least_once.returns([mock_mc_result(last_run_result)]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, retries=0) - end - - context 'multiroles behavior' do - let(:nodes) { [{'uid' => '1', 'role' => 'compute'}] } - let(:nodes_multiroles) { [{'uid' => '1', 'role' => 'controller'}] } - before(:each) do - @ctx = Context.new("task id", - ProxyReporter::DeploymentProxyReporter.new(reporter, nodes + nodes_multiroles), - Astute::LogParser::NoParsing.new - ) - end - - it "it should not send final status before all roles of node will deploy" do - prepare_mcollective_env - - reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'deploying', 'progress' => 50, 'role' => 'compute'}]) - rpcclient.expects(:runonce).at_least_once.returns([mock_mc_result(last_run_result)]) - - Astute::PuppetdDeployer.deploy(@ctx, nodes, retries=0) - end - end - - end - - context "puppet state transitions" do - - let(:last_run_result_idle_pre) do - res = deep_copy(last_run_result) - res[:data].merge!(:status => 'idling', :idling => 1, :stopped => 0) - res - end - - let(:last_run_result_idle_post) do - res = deep_copy(last_run_result_fail) - res[:data].merge!(:status => 'idling', :idling => 1, :running => 0) - mock_mc_result res - end - - it "publishes error status for node if puppet failed (a full cycle)" do - rpcclient.stubs(:last_run_summary).times(9). - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result_idle_pre) ]).then. - returns([ mock_mc_result(last_run_result_idle_pre) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_fail) ]).then. - returns([ mock_mc_result(last_run_result_fail) ]).then. - returns([ mock_mc_result(last_run_failed) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).once. - returns([ mock_mc_result(last_run_result) ]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - it "publishes error status for node if puppet failed (a cycle w/o idle states)" do - rpcclient.stubs(:last_run_summary).times(6). - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_fail) ]).then. - returns([ mock_mc_result(last_run_failed) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).once. - returns([ mock_mc_result(last_run_result) ]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - it "publishes error status for node if puppet failed (a cycle w/o idle and finishing states)" do - rpcclient.stubs(:last_run_summary).times(4). - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_failed) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).once. - returns([ mock_mc_result(last_run_result) ]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - it "publishes error status for node if puppet failed (a cycle with one running state only)" do - rpcclient.stubs(:last_run_summary).times(5). - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_fail) ]).then. - returns([ mock_mc_result(last_run_failed) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).once. - returns([ mock_mc_result(last_run_result) ]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - it "publishes error status for node if puppet failed (a cycle w/o any transitional states)" do - rpcclient.stubs(:last_run_summary).times(3). - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_result) ]).then. - returns([ mock_mc_result(last_run_failed) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).once. - returns([ mock_mc_result(last_run_result) ]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - context '' do - around(:each) do |example| - old_value = Astute.config.PUPPET_FADE_INTERVAL - example.run - Astute.config.PUPPET_FADE_INTERVAL = old_value - end - - before(:each) do - Astute.config.PUPPET_FADE_INTERVAL = 1 - end - - it "publishes error status for node if puppet running alien task (attempts been exhausted)" do - rpcclient.stubs(:last_run_summary).at_least(3). - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]) - - reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}]) - rpcclient.expects(:runonce).never - - Astute::PuppetdDeployer.deploy(ctx, nodes, 0) - end - - it "ignore exit code of puppet running of alien task (waited for alien task stop and launched own)" do - rpcclient.stubs(:last_run_summary).at_least(3). - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_failed) ]).then. - returns([ mock_mc_result(last_run_failed) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_running) ]).then. - returns([ mock_mc_result(last_run_result_finished) ]) - - rpcclient.expects(:runonce).at_least(1).returns([ mock_mc_result(last_run_result) ]) - reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}]) - - Astute::PuppetdDeployer.deploy(ctx, nodes, 1) - end - end - - end - - it "retries to run puppet if it fails" do - rpcclient_valid_result = mock_mc_result(last_run_result) - rpcclient_failed = mock_mc_result(last_run_failed) - rpcclient_fail = mock_mc_result(last_run_result_fail) - rpcclient_succeed = mock_mc_result(last_run_result_finished) - - rpcclient.stubs(:last_run_summary).returns([rpcclient_valid_result]).then. - returns([rpcclient_valid_result]).then. - returns([rpcclient_failed]).then. - returns([rpcclient_failed]).then. - returns([rpcclient_fail]).then. - returns([rpcclient_succeed]) - - reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}]) - rpcclient.expects(:runonce).at_least_once.returns([rpcclient_valid_result]) - - MClient.any_instance.stubs(:rpcclient).returns(rpcclient) - Astute::PuppetdDeployer.deploy(ctx, nodes, retries=1) + PuppetdDeployer.expects(:sleep).with(Astute.config.PUPPET_DEPLOY_INTERVAL) + PuppetdDeployer.deploy(ctx, nodes) end end -end + +end \ No newline at end of file