[orchestrator] Reworked deploy method, polling of puppet to get status

This commit is contained in:
Mike Scherbakov 2012-10-11 10:33:18 +04:00 committed by default
parent 3d826eef5c
commit 0ff5425fd3
11 changed files with 389 additions and 27 deletions

View File

@ -1,4 +1,4 @@
require 'naily/reporter'
module Naily
class Dispatcher
@ -14,12 +14,15 @@ module Naily
args
end
def deploy(args)
@orchestrator.deploy(args['nodes'])
def deploy(data)
reporter = Naily::Reporter.new(data['respond_to'])
@orchestrator.deploy(reporter, data['args']['nodes'])
end
def verify_networks(args)
@orchestrator.verify_networks(args['nodes'], args['networks'])
def verify_networks(data)
reporter = Naily::Reporter.new(data['respond_to'])
args = data['args']
@orchestrator.verify_networks(reporter, args['nodes'], args['networks'])
end
end
end

View File

@ -0,0 +1,13 @@
module Naily
class Reporter
def initialize(method)
@method = method
end
def report(msg)
p msg
# TODO call rpc.cast with @method and this msg
end
end
end

View File

@ -77,7 +77,7 @@ module Naily
Naily.logger.info "Processing RPC call #{data['method']}"
begin
result = delegate.send(data['method'], data['args'])
result = delegate.send(data['method'], data)
rescue
Naily.logger.error "Error running RPC method #{data['method']}: #{$!}"
# TODO: send RPC error response

View File

@ -2,6 +2,14 @@
$LOAD_PATH << File.join(File.dirname(__FILE__),"..","lib")
require 'orchestrator'
nodes = [{'mac' => 'nailgun', 'role' => 'test_controller'}]
class DumbReporter
def report(msg)
p msg
end
end
reporter = DumbReporter.new
nodes = [{'mac' => 'devnailgun.mirantis.com', 'role' => 'test_controller'}]
orchestrator = Orchestrator::Orchestrator.new
orchestrator.deploy(nodes)
orchestrator.deploy(reporter, nodes)

View File

@ -1 +1,15 @@
require 'rubygems'
require 'json'
require 'logger'
require 'orchestrator/orchestrator'
module Orchestrator
def self.logger
@logger ||= Logger.new(STDOUT)
end
def self.logger=(logger)
@logger = logger
end
end

View File

@ -1,42 +1,97 @@
require 'json'
require 'timeout'
require 'mcollective'
PUPPET_TIMEOUT = 30*60
module Orchestrator
class Orchestrator
include MCollective::RPC
include MCollective::RPC
private
def check_mcollective_result(stats)
def check_mcollective_result(stats, log=true)
# Following error might happen because of misconfiguration, ex. direct_addressing = 1 only on client
raise "MCollective has failed and didn't even return anything. Check it's logs." if stats.length == 0
result_data = []
stats.each do |agent|
status = agent.results[:statuscode]
raise "MCollective call failed in agent: #{agent}" unless status == 0
result_data << agent.results['data']
if status != 0
@logger.error "MC agent #{agent.agent} has failed, results: #{agent.results.inspect}"
raise "MCollective id='#{agent.results[:sender]}' call failed in agent '#{agent.agent}'"
else
@logger.debug "MC agent #{agent.agent} succeeded, results: #{agent.results.inspect}" if log
end
end
return result_data
end
def wait_until_puppet_done(mc, previous_runs)
# Wait for first node is done, than check the next one
# Load to mcollective is reduced by checking only one machine at time in a set
# In fact we need to know if whole set of machines finished deployment
previous_runs.each do |res|
prev_run = res['ts']
last_run = prev_run
while last_run == prev_run
mc.discover(:nodes => [res['sender']])
puppet_status = mc.status
# logging to false, otherwise we get a message every second
check_mcollective_result(puppet_status, log=false)
last_run = puppet_status[0].results[:data][:lastrun]
sleep 1 if last_run == prev_run
end
end
end
public
def deploy(nodes)
# TODO Check if nodes contains all required data
def initialize
@logger = ::Orchestrator.logger
end
def deploy(reporter, nodes)
macs = nodes.map {|n| n['mac'].gsub(":", "")}
@logger.debug "nailyfact - storing metadata for nodes: #{macs.join(',')}"
nodes.each do |node|
mc = rpcclient("nailyfact")
mc.progress = false
mc.discover(:nodes => [node['mac'].gsub(":", "")])
metadata = {'role' => node['role']}
# This is synchronious RPC call, so we are sure that data were sent and processed remotely
stats = mc.post(:value => metadata.to_json)
check_mcollective_result(stats)
end
macs = nodes.map {|n| n['mac'].gsub(":", "")}
reporter.report({'progress' => 1})
mc = rpcclient("puppetd")
mc.progress = false
mc.discover(:nodes => macs)
stats = mc.runonce
check_mcollective_result(stats)
printrpc mc.status
sleep 5
printrpc mc.status
puppet_status = mc.status
check_mcollective_result(puppet_status)
# In results :lastrun we get the time when Puppet finished it's work last time
previous_runs = puppet_status.map { |res| {'sender' => res.results[:sender],
'ts' => res.results[:data][:lastrun]} }
check_mcollective_result(mc.runonce)
reporter.report({'progress' => 2})
@logger.debug "Waiting for puppet to finish deploymen on all nodes (timeout = #{PUPPET_TIMEOUT} sec)..."
time_before = Time.now
Timeout::timeout(PUPPET_TIMEOUT) do # 30 min for deployment to be done
wait_until_puppet_done(mc, previous_runs)
end
time_spent = Time.now - time_before
@logger.info "Spent #{time_spent} seconds on puppet run for following nodes(macs): #{nodes.map {|n| n['mac']}.join(',')}"
reporter.report({'progress' => 100})
end
def verify_networks(nodes, networks)
def verify_networks(reporter, nodes, networks)
macs = nodes.map {|n| n['mac'].gsub(":", "")}
mc = rpcclient("net_probe")
mc.progress = false

View File

@ -0,0 +1,88 @@
metadata :name => "puppetd",
:description => "Run puppet agent, get its status, and enable/disable it",
:author => "R.I.Pienaar",
:license => "Apache License 2.0",
:version => "1.8",
:url => "https://github.com/puppetlabs/mcollective-plugins",
:timeout => 20
action "last_run_summary", :description => "Get a summary of the last puppet run" do
display :always
output :time,
:description => "Time per resource type",
:display_as => "Times"
output :resources,
:description => "Overall resource counts",
:display_as => "Resources"
output :changes,
:description => "Number of changes",
:display_as => "Changes"
output :events,
:description => "Number of events",
:display_as => "Events"
output :version,
:description => "Puppet and Catalog versions",
:display_as => "Versions"
end
action "enable", :description => "Enable puppet agent" do
output :output,
:description => "String indicating status",
:display_as => "Status"
end
action "disable", :description => "Disable puppet agent" do
output :output,
:description => "String indicating status",
:display_as => "Status"
end
action "runonce", :description => "Invoke a single puppet run" do
#input :forcerun,
# :prompt => "Force puppet run",
# :description => "Should the puppet run happen immediately?",
# :type => :string,
# :validation => '^.+$',
# :optional => true,
# :maxlength => 5
output :output,
:description => "Output from puppet agent",
:display_as => "Output"
end
action "status", :description => "Get puppet agent's status" do
display :always
output :status,
:description => "The status of the puppet agent: disabled, running, idling or stopped",
:display_as => "Status"
output :enabled,
:description => "Whether puppet agent is enabled",
:display_as => "Enabled"
output :running,
:description => "Whether puppet agent is running",
:display_as => "Running"
output :idling,
:description => "Whether puppet agent is idling",
:display_as => "Idling"
output :stopped,
:description => "Whether puppet agent is stopped",
:display_as => "Stopped"
output :lastrun,
:description => "When puppet agent last ran",
:display_as => "Last Run"
output :output,
:description => "String displaying agent status",
:display_as => "Status"
end

View File

@ -0,0 +1,180 @@
module MCollective
module Agent
# An agent to manage the Puppet Daemon
#
# Configuration Options:
# puppetd.splaytime - Number of seconds within which to splay; no splay
# by default
# puppetd.statefile - Where to find the state.yaml file; defaults to
# /var/lib/puppet/state/state.yaml
# puppetd.lockfile - Where to find the lock file; defaults to
# /var/lib/puppet/state/puppetdlock
# puppetd.puppetd - Where to find the puppet agent binary; defaults to
# /usr/bin/puppet agent
# puppetd.summary - Where to find the summary file written by Puppet
# 2.6.8 and newer; defaults to
# /var/lib/puppet/state/last_run_summary.yaml
# puppetd.pidfile - Where to find puppet agent's pid file; defaults to
# /var/run/puppet/agent.pid
class Puppetd<RPC::Agent
metadata :name => "puppetd",
:description => "Run puppet agent, get its status, and enable/disable it",
:author => "R.I.Pienaar",
:license => "Apache License 2.0",
:version => "1.8",
:url => "http://projects.puppetlabs.com/projects/mcollective-plugins/wiki/AgentPuppetd",
:timeout => 30
def startup_hook
@splaytime = @config.pluginconf["puppetd.splaytime"].to_i || 0
@lockfile = @config.pluginconf["puppetd.lockfile"] || "/var/lib/puppet/state/puppetdlock"
@statefile = @config.pluginconf["puppetd.statefile"] || "/var/lib/puppet/state/state.yaml"
@pidfile = @config.pluginconf["puppet.pidfile"] || "/var/run/puppet/agent.pid"
@puppetd = @config.pluginconf["puppetd.puppetd"] || "/usr/bin/puppet agent"
@last_summary = @config.pluginconf["puppet.summary"] || "/var/lib/puppet/state/last_run_summary.yaml"
end
action "last_run_summary" do
last_run_summary
end
action "enable" do
enable
end
action "disable" do
disable
end
action "runonce" do
runonce
end
action "status" do
set_status
end
private
def last_run_summary
summary = YAML.load_file(@last_summary)
reply[:resources] = {"failed"=>0, "changed"=>0, "total"=>0, "restarted"=>0, "out_of_sync"=>0}.merge(summary["resources"])
["time", "events", "changes", "version"].each do |dat|
reply[dat.to_sym] = summary[dat]
end
end
def set_status
reply[:status] = puppet_daemon_status
reply[:running] = reply[:status] == 'running' ? 1 : 0
reply[:enabled] = reply[:status] == 'disabled' ? 0 : 1
reply[:idling] = reply[:status] == 'idling' ? 1 : 0
reply[:stopped] = reply[:status] == 'stopped' ? 1 : 0
reply[:lastrun] = 0
reply[:lastrun] = File.stat(@statefile).mtime.to_i if File.exists?(@statefile)
reply[:output] = "Currently #{reply[:status]}; last completed run #{Time.now.to_i - reply[:lastrun]} seconds ago"
end
def puppet_daemon_status
locked = File.exists?(@lockfile)
disabled = locked && File::Stat.new(@lockfile).zero?
has_pid = File.exists?(@pidfile)
return 'disabled' if disabled
return 'running' if locked && has_pid
return 'idling' if ! locked && has_pid
return 'stopped' if ! has_pid
end
def runonce
set_status
case (reply[:status])
when 'disabled' then # can't run
reply.fail "Empty Lock file exists; puppet agent is disabled."
when 'running' then # can't run two simultaniously
reply.fail "Lock file and PID file exist; puppet agent is running."
when 'idling' then # signal daemon
pid = File.read(@pidfile)
if pid !~ /^\d+$/
reply.fail "PID file does not contain a PID; got #{pid.inspect}"
else
begin
::Process.kill(0, Integer(pid)) # check that pid is alive
# REVISIT: Should we add an extra round of security here, and
# ensure that the PID file is securely owned, or that the target
# process looks like Puppet? Otherwise a malicious user could
# theoretically signal arbitrary processes with this...
begin
::Process.kill("USR1", Integer(pid))
reply[:output] = "Signalled daemonized puppet agent to run (process #{Integer(pid)}); " + (reply[:output] || '')
rescue Exception => e
reply.fail "Failed to signal the puppet agent daemon (process #{pid}): #{e}"
end
rescue Errno::ESRCH => e
# PID is invalid, run puppet onetime as usual
runonce_background
end
end
when 'stopped' then # just run
runonce_background
else
reply.fail "Unknown puppet agent status: #{reply[:status]}"
end
end
def runonce_background
cmd = [@puppetd, "--onetime"]
unless request[:forcerun]
if @splaytime && @splaytime > 0
cmd << "--splaylimit" << @splaytime << "--splay"
end
end
cmd = cmd.join(" ")
output = reply[:output] || ''
run(cmd, :stdout => :output, :chomp => true)
reply[:output] = "Called #{cmd}, " + output + (reply[:output] || '')
end
def enable
if File.exists?(@lockfile)
stat = File::Stat.new(@lockfile)
if stat.zero?
File.unlink(@lockfile)
reply[:output] = "Lock removed"
else
reply[:output] = "Currently running; can't remove lock"
end
else
reply.fail "Already enabled"
end
end
def disable
if File.exists?(@lockfile)
stat = File::Stat.new(@lockfile)
stat.zero? ? reply.fail("Already disabled") : reply.fail("Currently running; can't remove lock")
else
begin
File.open(@lockfile, "w") { |file| }
reply[:output] = "Lock created"
rescue Exception => e
reply.fail "Could not create lock: #{e}"
end
end
end
end
end
end
# vi:tabstop=2:expandtab:ai:filetype=ruby

View File

@ -2,4 +2,5 @@ class nailytest::test_controller {
file { "/tmp/controller-file":
content => "$role",
}
exec { "/bin/sleep 3": }
}

View File

@ -1,8 +1,8 @@
#!/bin/bash
for agent in `ls mcollective/agent/`; do
echo "Linking agent $agent of mcollective.."
ln -sf `readlink -f mcollective/agent/$agent` /usr/share/mcollective/plugins/mcollective/agent/$agent
ln -sf `readlink -f mcollective/agent/$agent` /usr/libexec/mcollective/mcollective/agent/$agent
done
ln -sfT `readlink -f puppet/modules/nailytest` /etc/puppet/modules/nailytest
ln -sf `readlink -f puppet/manifests/site.pp` /etc/puppet/manifests/site.pp
restart mcollective
service mcollective restart

View File

@ -3,14 +3,15 @@ require 'mcollective'
require 'json'
include MCollective::RPC
NODE = "devnailgun.mirantis.com"
describe "MCollective" do
context "When MC agent is up and running" do
it "it should send echo message to MC agent and get it back" do
node = "nailgun"
data_to_send = "simple message of node '#{node}'"
data_to_send = "simple message of node '#{NODE}'"
mc = rpcclient("fake")
mc.progress = false
mc.discover(:nodes => [node])
mc.discover(:nodes => [NODE])
stats = mc.echo(:msg => data_to_send)
check_mcollective_result(stats)
stats[0].results[:data][:msg].should eql("Hello, it is my reply: #{data_to_send}")
@ -18,10 +19,9 @@ describe "MCollective" do
it "it should update facts file with new key-value and could get it back" do
data_to_send = {"anykey" => rand(2**30).to_s, "other" => "static"}
node = "nailgun"
mc = rpcclient("nailyfact")
mc.progress = false
mc.discover(:nodes => [node])
mc.discover(:nodes => [NODE])
stats = mc.post(:value => data_to_send.to_json)
check_mcollective_result(stats)