basic mcollective orchestration with astute
U-GRIDDYNAMICS\ibondar 2012-11-28 16:49:13 +04:00
parent 40997296b8
commit eea165caf5
16 changed files with 440 additions and 0 deletions

View File

@ -0,0 +1,19 @@
$:.unshift File.expand_path('lib', File.dirname(__FILE__))
require 'astute/version'
Gem::Specification.new do |s|
  s.name = 'astute'
  s.version = Astute::VERSION
  s.summary = 'Orchestrator for OpenStack deployment'
  s.description = 'Orchestrator of deployment via Puppet & MCollective. Works both with Nailgun and from CLI.'
  s.authors = ['Mike Scherbakov']
  s.email = ['mscherbakov@mirantis.com']

  s.add_dependency 'mcollective-client', '> 2.0.0'

  s.files = Dir.glob("{bin,lib,spec}/**/*")
  s.executables = ['astute']
  s.require_path = 'lib'
end

View File

@ -0,0 +1,23 @@
#!/usr/bin/env ruby
require 'rubygems'
require 'astute'
class ConsoleReporter
  def report(msg)
    p msg
  end
end

reporter = ConsoleReporter.new

nodes = [{'id' => '1', 'ip' => '10.1.1.2', 'uid' => 'devnailgun.mirantis.com', 'role' => 'test_controller'}]
#nodes << {'id' => '2', 'ip' => '10.1.1.3', 'uid' => 'mcoll2', 'role' => 'test_controller'}

networks = [{'id' => 1, 'vlan_id' => 100, 'cidr' => '10.0.0.0/24'},
            {'id' => 2, 'vlan_id' => 101, 'cidr' => '192.168.0.0/24'}]

task_id = `uuidgen`.strip
orchestrator = Astute::Orchestrator.new
orchestrator.deploy(reporter, task_id, nodes)
orchestrator.verify_networks(reporter, task_id, nodes, networks)

View File

@ -0,0 +1,27 @@
#!/usr/bin/env ruby
require 'rubygems'
require 'astute'
require 'yaml'
class ConsoleReporter
  def report(msg)
    p msg
  end
end

reporter = ConsoleReporter.new

# Load the nodes structure from YAML; each top-level key becomes the node's 'uid'
nodes_raw = YAML.load(File.open('./nodes.yaml'))
nodes = nodes_raw.map { |uid, node| node.merge('uid' => uid) }
task_id = `uuidgen`.strip
orchestrator = Astute::Orchestrator.new
orchestrator.deploy(reporter, task_id, nodes)

View File

@ -0,0 +1,15 @@
fuel-01:
  ip: 10.1.1.3
  role: controller
fuel-02:
  ip: 10.1.1.4
  role: controller
fuel-03:
  ip: 10.1.1.5
  role: controller
fuel-04:
  ip: 10.1.1.6
  role: compute
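
For reference, once the example script above merges each top-level key in as the node's 'uid', the structure handed to Orchestrator#deploy looks roughly like this (a sketch of the resulting array):

# [{'ip' => '10.1.1.3', 'role' => 'controller', 'uid' => 'fuel-01'},
#  {'ip' => '10.1.1.4', 'role' => 'controller', 'uid' => 'fuel-02'},
#  {'ip' => '10.1.1.5', 'role' => 'controller', 'uid' => 'fuel-03'},
#  {'ip' => '10.1.1.6', 'role' => 'compute',    'uid' => 'fuel-04'}]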

View File

@ -0,0 +1,20 @@
require 'json'
require 'logger'
require 'astute/orchestrator'
require 'astute/mclient'
require 'astute/metadata'
require 'astute/deployer'
require 'astute/network'
module Astute
  autoload 'Context', 'astute/context'

  def self.logger
    @logger ||= Logger.new('/var/log/astute.log')
  end

  def self.logger=(logger)
    @logger = logger
  end
end
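
The logger is writable, so a consumer such as the CLI examples above can redirect logging; a minimal sketch:

require 'astute'

# Send Astute's log output to stdout instead of the default /var/log/astute.log
Astute.logger = Logger.new(STDOUT)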

View File

@ -0,0 +1,10 @@
module Astute
  class Context
    attr_accessor :task_id, :reporter

    def initialize(task_id, reporter)
      @task_id = task_id
      @reporter = reporter
    end
  end
end

View File

@ -0,0 +1,52 @@
require 'json'
require 'timeout'

PUPPET_TIMEOUT = 30*60  # 30 minutes for deployment to finish

module Astute
  module Deployer
    private

    def self.wait_until_puppet_done(puppetd, previous_run_status)
      # Wait until the first node is done, then check the next one.
      # Load on mcollective is reduced by checking only one machine at a time in a set.
      # In fact we only need to know when the whole set of machines has finished deployment.
      previous_run_status.each do |res|
        prev_run = res.results[:data][:lastrun]
        last_run = prev_run
        while last_run == prev_run
          puppetd.discover(:nodes => [res.results[:sender]])
          puppet_status = puppetd.status
          # No logging here, otherwise we would get a message every second
          last_run = puppet_status[0].results[:data][:lastrun]
          sleep 1 if last_run == prev_run
        end
      end
    end

    public

    def self.puppet_deploy_with_polling(ctx, nodes)
      if nodes.empty?
        Astute.logger.info "#{ctx.task_id}: No nodes to deploy were provided. Doing nothing."
        return false
      end
      uids = nodes.map {|n| n['uid']}
      puppetd = MClient.new(ctx, "puppetd", uids)
      puppet_status = puppetd.status
      puppetd.runonce

      Astute.logger.debug "Waiting for puppet to finish deployment on all nodes (timeout = #{PUPPET_TIMEOUT} sec)..."
      time_before = Time.now
      Timeout::timeout(PUPPET_TIMEOUT) do
        # Yes, we are polling here, and yes, it's temporary.
        # A better implementation could use a separate queue to get results, e.g.
        # http://www.devco.net/archives/2012/08/19/mcollective-async-result-handling.php
        # or we could rewrite the puppet agent not to fork and increase the ttl for the mcollective RPC.
        wait_until_puppet_done(puppetd, puppet_status)
      end
      time_spent = Time.now - time_before
      Astute.logger.info "#{ctx.task_id}: Spent #{time_spent} seconds on the puppet run for the following nodes (uids): #{nodes.map {|n| n['uid']}.join(',')}"
    end
  end
end

View File

@ -0,0 +1,52 @@
require 'mcollective'

module Astute
  class MClient
    include MCollective::RPC
    include Astute

    def initialize(ctx, agent, nodes=nil, check_result=true)
      @task_id = ctx.task_id
      @agent = agent
      # nodes may legitimately be nil (discover everything); only map when it is given
      @nodes = nodes.map { |n| n.to_s } unless nodes.nil?
      @check_result = check_result
      @mc = rpcclient(agent, :exit_on_failure => false)
      @mc.progress = false
      unless @nodes.nil?
        @mc.discover(:nodes => @nodes)
      end
    end

    def method_missing(method, *args)
      res = @mc.send(method, *args)
      if method == :discover
        @nodes = args[0][:nodes]
      else
        check_mcollective_result(method, res) if @check_result
      end
      res
    end

    private

    def check_mcollective_result(method, stats)
      # The following error might happen because of misconfiguration, e.g. direct_addressing = 1 set only on the client
      raise "#{@task_id}: MCollective client failed to call agent '#{@agent}' with method '#{method}' and didn't even return anything. Check logs." if stats.length == 0
      if !@nodes.nil? && stats.length < @nodes.length
        # Some nodes didn't respond
        nodes_responded = stats.map { |n| n.results[:sender] }
        not_responded = @nodes - nodes_responded
        raise "#{@task_id}: MCollective agents '#{not_responded.join(',')}' didn't respond."
      end
      # TODO: should we collect all errors and raise one exception with all of the data?
      stats.each do |node|
        status = node.results[:statuscode]
        if status != 0
          raise "#{@task_id}: MCollective call failed in agent '#{node.agent}', method '#{method}', results: #{node.results.inspect}"
        else
          Astute.logger.debug "#{@task_id}: MC agent '#{node.agent}', method '#{method}' succeeded, results: #{node.results.inspect}"
        end
      end
    end
  end
end
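
For reference, this is how the other modules in this commit use MClient — a sketch mirroring the calls in deployer.rb above; the node uids are illustrative (taken from nodes.yaml):

require 'rubygems'
require 'astute'

class ConsoleReporter
  def report(msg)
    p msg
  end
end

ctx = Astute::Context.new(`uuidgen`.strip, ConsoleReporter.new)
# Calls are proxied to the named MCollective agent via method_missing; results are
# validated by check_mcollective_result unless the client was built with check_result=false.
puppetd = Astute::MClient.new(ctx, "puppetd", ['fuel-01', 'fuel-02'])
puppetd.status    # previous run status for each node
puppetd.runonce   # kick off a puppet run on the discovered nodes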

View File

@ -0,0 +1,22 @@
require 'json'

module Astute
  module Metadata
    def self.publish_facts(ctx, nodes)
      if nodes.empty?
        Astute.logger.info "#{ctx.task_id}: No nodes to post metadata for were provided. Doing nothing."
        return false
      end
      uids = nodes.map {|n| n['uid']}
      Astute.logger.debug "#{ctx.task_id}: nailyfact - storing metadata for nodes: #{uids.join(',')}"
      nodes.each do |node|
        nailyfact = MClient.new(ctx, "nailyfact", [node['uid']])
        metadata = {'role' => node['role']}
        # This is a synchronous RPC call, so we are sure the data was sent and processed remotely
        nailyfact.post(:value => metadata.to_json)
      end
    end
  end
end

View File

@ -0,0 +1,24 @@
module Astute
  module Network
    def self.check_network(ctx, nodes, networks)
      if nodes.length < 2
        Astute.logger.info "#{ctx.task_id}: Network checker: at least two nodes are required to check network connectivity. Doing nothing."
        return []
      end
      uids = nodes.map {|n| n['uid']}
      # TODO: everything breaks if the agent is not found. We have to handle that.
      net_probe = MClient.new(ctx, "net_probe", uids)

      net_probe.start_frame_listeners(:iflist => ['eth0'].to_json)
      # The interface name is hardcoded for now. Later we expect it to be passed from the Nailgun backend.
      data_to_send = {'eth0' => networks.map {|n| n['vlan_id']}.join(',')}
      net_probe.send_probing_frames(:interfaces => data_to_send.to_json)

      stats = net_probe.get_probing_info
      result = stats.map {|node| {'sender' => node.results[:sender], 'data' => node.results[:data]} }
      Astute.logger.debug "#{ctx.task_id}: Network checking is done. Raw results: #{result.inspect}"
      return result
    end
  end
end
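
For reference, the raw result returned by check_network looks roughly like this. The 'sender'/'data' wrapping comes from the code above; the exact :neighbours payload is an assumption about what the net_probe agent reports (check_vlans_by_traffic in orchestrator.rb below expects an interface => vlans mapping):

# [{'sender' => 'fuel-01',
#   'data'   => {:neighbours => {'eth0' => {'100' => {}, '101' => {}}}}}]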

View File

@ -0,0 +1,106 @@
module Astute
  class Orchestrator
    def initialize
      @deployer = Astute::Deployer.method(:puppet_deploy_with_polling)
      @metapublisher = Astute::Metadata.method(:publish_facts)
      @check_network = Astute::Network.method(:check_network)
    end

    def node_type(reporter, task_id, nodes)
      context = Context.new(task_id, reporter)
      uids = nodes.map {|n| n['uid']}
      systemtype = MClient.new(context, "systemtype", uids, check_result=false)
      systems = systemtype.get_type
      return systems.map {|n| {'uid' => n.results[:sender], 'node_type' => n.results[:data][:node_type].chomp}}
    end

    def deploy(reporter, task_id, nodes)
      context = Context.new(task_id, reporter)
      ctrl_nodes = nodes.select {|n| n['role'] == 'controller'}
      # Deploy controllers one by one; deploy_piece expects an array of nodes
      ctrl_nodes.each do |node|
        deploy_piece(context, [node], false)
      end
      reporter.report({'progress' => 40})

      compute_nodes = nodes.select {|n| n['role'] == 'compute'}
      deploy_piece(context, compute_nodes, false)
      reporter.report({'progress' => 60})

      # other_nodes = nodes - ctrl_nodes - compute_nodes
      # deploy_piece(context, other_nodes)
      return
    end

    def remove_nodes(reporter, task_id, nodes)
      context = Context.new(task_id, reporter)
      result = simple_remove_nodes(context, nodes)
      return result
    end

    def verify_networks(reporter, task_id, nodes, networks)
      context = Context.new(task_id, reporter)
      result = @check_network.call(context, nodes, networks)
      result.map! { |node| {'uid' => node['sender'],
                            'networks' => check_vlans_by_traffic(node['data'][:neighbours])} }
      return {'networks' => result}
    end

    private

    def simple_remove_nodes(ctx, nodes)
      if nodes.empty?
        Astute.logger.info "#{ctx.task_id}: No nodes to remove were provided. Doing nothing."
        return {'nodes' => nodes}
      end
      uids = nodes.map {|n| n['uid']}
      Astute.logger.info "#{ctx.task_id}: Starting removal of nodes: #{uids.inspect}"

      remover = MClient.new(ctx, "erase_node", uids, check_result=false)
      result = remover.erase_node(:reboot => true)
      Astute.logger.debug "#{ctx.task_id}: Data received from nodes: #{result.inspect}"

      inaccessible_uids = uids - result.map {|n| n.results[:sender]}
      error_nodes = []
      erased_nodes = []
      result.each do |n|
        if n.results[:statuscode] != 0
          error_nodes << {'uid' => n.results[:sender],
                          'error' => "RPC agent 'erase_node' failed. Result: #{n.results.inspect}"}
        elsif not n.results[:data][:rebooted]
          error_nodes << {'uid' => n.results[:sender],
                          'error' => "RPC method 'erase_node' failed with message: #{n.results[:data][:error_msg]}"}
        else
          erased_nodes << {'uid' => n.results[:sender]}
        end
      end
      error_nodes.concat(inaccessible_uids.map {|n| {'uid' => n, 'error' => "Node did not answer the RPC call."}})

      if error_nodes.empty?
        answer = {'nodes' => erased_nodes}
      else
        answer = {'status' => 'error', 'nodes' => erased_nodes, 'error_nodes' => error_nodes}
        Astute.logger.error "#{ctx.task_id}: Removal of nodes #{uids.inspect} finished with errors: #{error_nodes.inspect}"
      end
      Astute.logger.info "#{ctx.task_id}: Finished removal of nodes: #{uids.inspect}"
      return answer
    end

    def deploy_piece(ctx, nodes, publish_role_in_fact)
      nodes_roles = nodes.map { |n| { n['uid'] => n['role'] } }
      Astute.logger.info "#{ctx.task_id}: Starting deployment of nodes => roles: #{nodes_roles.inspect}"
      ctx.reporter.report nodes_status(nodes, 'deploying')

      @metapublisher.call(ctx, nodes) if publish_role_in_fact
      @deployer.call(ctx, nodes)

      ctx.reporter.report nodes_status(nodes, 'ready')
      Astute.logger.info "#{ctx.task_id}: Finished deployment of nodes => roles: #{nodes_roles.inspect}"
    end

    def nodes_status(nodes, status)
      {'nodes' => nodes.map { |n| {'uid' => n['uid'], 'status' => status} }}
    end

    def check_vlans_by_traffic(data)
      data.map { |iface, vlans| {'iface' => iface, 'vlans' => vlans.keys.map {|n| n.to_i}} }
    end
  end
end
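
For reference, a sketch of the messages a reporter passed to this Orchestrator receives during deploy, and of the value returned by verify_networks (node uids and vlan ids are illustrative, taken from the examples in this commit):

# Reported via reporter.report during deploy / deploy_piece:
#   {'nodes' => [{'uid' => 'fuel-01', 'status' => 'deploying'}]}
#   {'progress' => 40}
#   {'nodes' => [{'uid' => 'fuel-01', 'status' => 'ready'}]}
# Returned by verify_networks:
#   {'networks' => [{'uid' => 'fuel-01', 'networks' => [{'iface' => 'eth0', 'vlans' => [100, 101]}]}]}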

View File

@ -0,0 +1,3 @@
module Astute
  VERSION = '0.0.1'
end

View File

@ -0,0 +1,11 @@
/:=$(BUILD_DIR)/gems/

$/astute-0.0.1.gem: astute/astute.gemspec \
		$(addprefix astute/bin/,$(call find-files,astute/bin)) \
		$(addprefix astute/lib/,$(call find-files,astute/lib)) \
		$(addprefix astute/spec/,$(call find-files,astute/spec))
	@mkdir -p $(@D)
	cd $(<D) && \
	gem build $(<F)
	mv $(<D)/astute-*.gem $@

View File

@ -0,0 +1,10 @@
#!/bin/bash
for agent in `ls ../mcagent/`; do
  echo "Linking agent $agent of mcollective.."
  ln -sf `readlink -f ../mcagent/$agent` /usr/libexec/mcollective/mcollective/agent/$agent
done

ln -sfT `readlink -f ../puppet/nailytest` /etc/puppet/modules/nailytest
ln -sf `readlink -f ../puppet/nailytest/examples/site.pp` /etc/puppet/manifests/site.pp
ln -sf `readlink -f ../bootstrap/sync/usr/bin/net_probe.py` /usr/bin/net_probe.py

uuidgen > /etc/bootif  # for the net_probe plugin
service mcollective restart

View File

@ -0,0 +1,43 @@
require File.join(File.dirname(__FILE__), "..", "spec_helper")
require 'mcollective'
require 'json'
include MCollective::RPC

NODE = "devnailgun.mirantis.com"

describe "MCollective" do
  context "When MC agent is up and running" do
    it "should send an echo message to the MC agent and get it back" do
      data_to_send = "simple message of node '#{NODE}'"
      mc = rpcclient("fake")
      mc.progress = false
      mc.discover(:nodes => [NODE])
      stats = mc.echo(:msg => data_to_send)
      check_mcollective_result(stats)
      stats[0].results[:data][:msg].should eql("Hello, it is my reply: #{data_to_send}")
    end

    it "should update the facts file with a new key-value pair and get it back" do
      data_to_send = {"anykey" => rand(2**30).to_s, "other" => "static"}
      mc = rpcclient("nailyfact")
      mc.progress = false
      mc.discover(:nodes => [NODE])
      stats = mc.post(:value => data_to_send.to_json)
      check_mcollective_result(stats)

      stats = mc.get(:key => "anykey")
      check_mcollective_result(stats)
      stats[0].results[:data][:value].should eql(data_to_send['anykey'])

      stats = mc.get(:key => "other")
      check_mcollective_result(stats)
      stats[0].results[:data][:value].should eql(data_to_send['other'])
    end
  end
end

private

def check_mcollective_result(stats)
  stats.should have(1).items
  stats[0].results[:statuscode].should eql(0)
end

View File

@ -0,0 +1,3 @@
$LOAD_PATH << File.join(File.dirname(__FILE__),"..","lib")
require 'rspec'
require 'astute'