Merge "Move Naily to Astute"

This commit is contained in:
Jenkins 2014-03-26 12:14:03 +00:00 committed by Gerrit Code Review
commit 3df411acf2
21 changed files with 36 additions and 963 deletions

View File

@ -52,15 +52,14 @@ a new environment. The Nailgun service creates a JSON data structure
with the environment settings, its nodes and their roles and puts this
file into the *RabbitMQ* queue. This message should be received by one
of the worker processes who will actually deploy the environment. These
processes are called *Naily*.
processes are called *Astute*.
.. uml::
package "Master Node" {
component "Nailgun"
interface "RabbitMQ"
package "Naily Worker" {
component Naily
package "Astute Worker" {
component Astute
}
@ -69,12 +68,11 @@ processes are called *Naily*.
}
[Nailgun] -> [RabbitMQ] : Put task into Nailgun queue
[Naily] <- [RabbitMQ] : Take task from Nailgun queue
[Naily] -> [Astute]
[Astute] <- [RabbitMQ] : Take task from Nailgun queue
[Astute] -> [Cobbler] : Set node's settings through XML-RPC
[Cobbler] -> [DHCP and TFTP]
The Naily workers are listening to the RabbitMQ queue and receives
The Astute workers are listening to the RabbitMQ queue and receives
messages. They use the *Astute* library which implements all deployment
actions. First, it starts the provisioning of the environment's nodes.
Astute uses XML-RPC to set these nodes' configuration in Cobbler and

View File

@ -174,10 +174,10 @@ For all additional features and needs you may refer to Alembic documentation:
http://alembic.readthedocs.org/en/latest/tutorial.html
Astute and Naily
Astute
----------------
#. Astute and Naily can be found in fuel-astute repository
#. Astute can be found in fuel-astute repository
#. Install Ruby dependencies::

View File

@ -19,9 +19,8 @@ You can find a detailed breakdown of how this works in the
Following components are involved in managing this process:
- Astute: deployment orchestrator, manages the Puppet cluster (via
MCollective) and the Cobbler provisioning service (over XML-RPC)
- Naily: RPC consumer implementing communication between Nailgun and
Astute over AMQP protocol
MCollective) and the Cobbler provisioning service (over XML-RPC).
RPC consumer implementing communication for Nailgun over AMQP protocol
- Nailgun [#fn1]_: Web UI backend based on the web.py framework,
includes following sub-components:
@ -59,8 +58,8 @@ Logical Architecture Diagram:
[RPC Receiver] --> [Nailgun DB]
}
[Provisioner (Cobbler)] --> [DHCP, DNS, TFTP]
[RPC Consumer (Naily)] --> [RPC Receiver] : AMQP
[RPC Consumer (Naily)] --> [Orchestrator (Astute)] : AMQP
[RPC Consumer (Astute)] --> [RPC Receiver] : AMQP
[RPC Consumer (Astute)] --> [Orchestrator (Astute)] : AMQP
[Orchestrator (Astute)] --> [MCollective]
}
package "Target Node" {

View File

@ -29,8 +29,7 @@ OS Provisioning
WebUser -> Nailgun: add nodes to cluster
WebUser -> Nailgun: deploy cluster
|||
Nailgun -> Naily: Provision CentOS
Naily -> Astute: Provision CentOS
Nailgun -> Astute: Provision CentOS
Astute -> Cobbler: Provision CentOS
Cobbler -> NodePXE: ssh to reboot
Cobbler --> NodePXE: CentOS image
@ -45,8 +44,7 @@ Networks Verification
actor WebUser
WebUser -> Nailgun: verify networks (cluster #1)
Nailgun -> Naily: verify nets (100-120 vlans)
Naily -> Astute: verify nets
Nailgun -> Astute: verify nets (100-120 vlans)
Astute -> MC: start listeners
MC -> net_probe.py: forks to listen
MC --> Astute: listening
@ -59,8 +57,7 @@ Networks Verification
MC -> net_probe.py: stop listeners
net_probe.py --> MC: result
MC --> Astute: result graph
Astute --> Naily: vlans Ok
Naily --> Nailgun: response
Astute --> Nailgun: response vlans Ok
Nailgun --> WebUser: response
@ -70,16 +67,14 @@ Details on Cluster Provisioning & Deployment (via Facter extension)
title Cluster Deployment
actor WebUser
Nailgun -> Naily: Provision,Deploy
Naily -> Astute: Provision,Deploy
Nailgun -> Astute: Provision,Deploy
Astute -> MC: Type of nodes?
MC -> Astute: bootstrap
Astute -> Cobbler: create system,reboot
Astute -> MC: Type of nodes?
MC --> Astute: booted in target OS
Astute --> Naily: provisioned
Naily --> Nailgun: provisioned
Astute --> Nailgun: provisioned
Nailgun --> WebUser: status on UI
Astute -> MC: Create /etc/astute.yaml
@ -91,29 +86,28 @@ Details on Cluster Provisioning & Deployment (via Facter extension)
Puppet -> Puppet: applies $role
Puppet --> MC: done
MC --> Astute: deploy is done
Astute --> Naily: deploy is done
Naily --> Nailgun: deploy is done
Astute --> Nailgun: deploy is done
Nailgun --> WebUser: deploy is done
Once deploy and provisioning messages are accepted by Naily, provisioining
method is called in Astute. Provisioning part creates system in Cobbler and
calls reboot over Cobbler. Then Astute uses `MCollective direct addressing
mode
Once deploy and provisioning messages are accepted by Astute, provisioning
method is called. Provisioning part creates system in Cobbler and
calls reboot over Cobbler. Then Astute uses `MCollective direct addressing
mode
<http://www.devco.net/archives/2012/06/19/mcollective-direct-addressing-mode.ph
p>`_
to check if all required nodes are available, include puppet agent on them. If
to check if all required nodes are available, include puppet agent on them. If
some nodes are not yet ready, Astute waits for a few seconds and tries to
request again. When nodes are booted in target OS, Astute uses upload_file
MCollective plugin to push data to a special file */etc/astute.yaml* on the
request again. When nodes are booted in target OS, Astute uses upload_file
MCollective plugin to push data to a special file */etc/astute.yaml* on the
target system.
Data include role and all other variables needed for deployment. Then, Astute
calls puppetd MCollective plugin to start deployment. Puppet is started on
Data include role and all other variables needed for deployment. Then, Astute
calls puppetd MCollective plugin to start deployment. Puppet is started on
nodes.
Accordingly, puppet agent starts its run. Modules contain facter extension,
which runs before deployment. Extension reads data from */etc/astute.yaml*
placed by mcollective, and extends Facter data with it as a single fact, which
is then parsed by *parseyaml* function to create *$::fuel_settings* data
Accordingly, puppet agent starts its run. Modules contain facter extension,
which runs before deployment. Extension reads data from */etc/astute.yaml*
placed by mcollective, and extends Facter data with it as a single fact, which
is then parsed by *parseyaml* function to create *$::fuel_settings* data
structure. This structure contains all variables as a single hash and
supports embedding of other rich structures such as nodes hash or arrays.
Case structure in running class chooses appropriate class to import,

View File

@ -75,7 +75,7 @@ class NailgunReceiver(object):
node_db = db().query(Node).get(node['uid'])
if not node_db:
logger.error(
u"Failed to delete node '%s' marked as error from Naily:"
u"Failed to delete node '%s' marked as error from Astute:"
" node doesn't exist", str(node)
)
break

View File

@ -136,11 +136,11 @@ LOGS:
<<: *local_log_type
<<: *python_log_format
path: *api_log
- id: naily
- id: astute
name: "Orchestrator"
<<: *local_log_type
<<: *remote_syslog_log_format
path: '/var/log/naily/naily.log'
path: '/var/log/astute/astute.log'
- id: ostf
name: "HealthCheck"
<<: *local_log_type
@ -403,11 +403,11 @@ DUMP:
- type: dir
path: /etc/nailgun
- type: dir
path: /etc/naily
path: /etc/astute
- type: dir
path: /var/log/nailgun
- type: dir
path: /var/log/naily
path: /var/log/astute
- type: dir
path: /var/log/cobbler
- type: dir

View File

@ -179,7 +179,7 @@ class DeletionTask(object):
USE_FAKE = settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP
# no need to call naily if there are no nodes in cluster
# no need to call astute if there are no nodes in cluster
if respond_to == 'remove_cluster_resp' and \
not list(task.cluster.nodes):
rcvr = rpc.receiver.NailgunReceiver()

View File

@ -15,7 +15,7 @@
<p class="enable-selection">
<%= urlify(linebreaks(task.escape('message'))) %>
<% if (task.match({name: 'redhat_setup'})) { %>
<% var options = {type: 'local', source: 'naily', level: 'warning'} %>
<% var options = {type: 'local', source: 'astute', level: 'warning'} %>
<% logsLink = '#cluster/' + cluster.id + '/logs/' + serializeTabOptions(options) %>
<br/><%- $.t('cluster_page.reconfigure_redhat_account', {opt1: '<a href="#releases">', opt2: '</a>'}) %> <a href="<%- logsLink %>" data-i18n="common.see_logs"></a>.
<% } %>

View File

@ -1,3 +0,0 @@
source 'https://rubygems.org'
gemspec

View File

@ -1,43 +0,0 @@
PATH
remote: .
specs:
naily (0.1.0)
amqp (= 0.9.10)
astute
json (= 1.6.1)
raemon (= 0.3.0)
symboltable (= 1.0.2)
GEM
specs:
activesupport (3.0.10)
amq-client (0.9.12)
amq-protocol (>= 1.2.0)
eventmachine
amq-protocol (1.2.0)
amqp (0.9.10)
amq-client (~> 0.9.12)
amq-protocol (~> 1.2.0)
eventmachine
astute (0.0.1)
activesupport (= 3.0.10)
mcollective-client (= 2.3.1)
symboltable (= 1.0.2)
eventmachine (1.0.3)
i18n (0.6.4)
json (1.6.1)
mcollective-client (2.3.1)
i18n
json
stomp
systemu
raemon (0.3.0)
stomp (1.2.8)
symboltable (1.0.2)
systemu (2.5.2)
PLATFORMS
ruby
DEPENDENCIES
naily!

View File

@ -1,89 +0,0 @@
#!/usr/bin/env ruby
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'naily'
require 'logger'
require 'ostruct'
require 'optparse'
require 'yaml'
require 'amqp'
require 'raemon'
options = OpenStruct.new
options.daemonize = false
options.pidfile = '/var/run/nailyd.pid'
options.config_path = '/etc/naily/nailyd.conf'
options.log_path = nil
options.log_level = 'debug'
options.workers = 1
OptionParser.new do |opts|
opts.banner = 'Usage: nailyd [options]'
opts.separator "\nOptions:"
opts.on('-d', '--[no-]daemonize', 'Daemonize server') do |flag|
options.daemonize = flag
end
opts.on('-P', '--pidfile PATH', 'Path to pidfile') do |path|
options.pidfile = path
end
opts.on('-w', '--workers NUMBER', 'Number of worker processes') do |number|
options.workers = number.to_i
end
opts.on('-c', '--config PATH', 'Use custom config file') do |path|
unless File.exists?(path)
puts "Error: config file #{path} was not found"
exit
end
options.config_path = path
end
opts.on('-l', '--logfile PATH' 'Log file path') do |path|
options.log_path = path
end
levels = %w{fatal error warn info debug}
opts.on('--loglevel LEVEL', levels, "Logging level (#{levels.join(', ')})") do |level|
options.log_level = level
end
opts.on_tail('-h', '--help', 'Show this message') do
puts opts
exit
end
opts.on_tail('-v', '--version', 'Show version') do
puts Naily::VERSION
exit
end
end.parse!
if options.daemonize
# After daemonize we can't log to STDOUT, pick a default log file
options.log_path ||= "#{Dir.pwd}/naily.log"
end
Naily.config.update(YAML.load(File.read(options.config_path))) if File.exists?(options.config_path)
Naily.logger = options.log_path ? Logger.new(options.log_path) : Logger.new(STDOUT)
Naily.logger.level = Logger.const_get(options.log_level.upcase)
Naily.logger.formatter = proc do |severity, datetime, progname, msg|
severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning', 'ERROR' => 'err', 'FATAL' => 'crit'}
"#{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: [#{Process.pid}] #{msg}\n"
end
Naily.logger.info "Starting..."
Raemon::Master.start(options.workers, Naily::Worker,
:detach => options.daemonize,
:name => 'naily',
:pid_file => options.pidfile,
:logger => Naily.logger
)

View File

@ -1,38 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'astute'
require 'naily/version'
require 'naily/config'
require 'logger'
require 'json'
module Naily
autoload 'Worker', 'naily/worker'
autoload 'Server', 'naily/server'
autoload 'Producer', 'naily/producer'
autoload 'Dispatcher', 'naily/dispatcher'
autoload 'Reporter', 'naily/reporter'
autoload 'SubtaskReporter', 'naily/reporter'
def self.logger
@logger
end
def self.logger=(logger)
Astute.logger = logger
@logger = logger
end
end

View File

@ -1,68 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'symboltable'
require 'singleton'
module Naily
class ConfigError < StandardError; end
class UnknownOptionError < ConfigError
attr_reader :name
def initialize(name)
super("Unknown config option #{name}")
@name = name
end
end
class MyConfig
include Singleton
attr_reader :configtable
def initialize
# We need new instance of SymbolTable. If we use singleton for SymbolTable,
# the same instance will be used in Astute.
@configtable = SymbolTable.new
end
end
class ParseError < ConfigError
attr_reader :line
def initialize(message, line)
super(message)
@line = line
end
end
def self.config
config = MyConfig.instance.configtable
config.update(default_config) if config.empty?
return config
end
def self.default_config
conf = {}
conf[:broker_host] = 'localhost'
conf[:broker_port] = 5672
conf[:broker_username] = 'mcollective'
conf[:broker_password] = 'mcollective'
conf[:broker_service_queue] = 'naily_service'
conf[:broker_queue] = 'naily'
conf[:broker_publisher_queue] = 'nailgun'
conf[:broker_exchange] = 'nailgun'
return conf
end
end

View File

@ -1,201 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'naily/reporter'
module Naily
class Dispatcher
def initialize(producer)
@orchestrator = Astute::Orchestrator.new(nil, log_parsing=true)
@producer = producer
@provisionLogParser = Astute::LogParser::ParseProvisionLogs.new
end
def echo(args)
Naily.logger.info 'Running echo command'
args
end
#
# Main worker actions
#
def download_release(data)
# Example of message = {
# {'method': 'download_release',
# 'respond_to': 'download_release_resp',
# 'args':{
# 'task_uuid': 'task UUID',
# 'release_info':{
# 'release_id': 'release ID',
# 'redhat':{
# 'license_type' :"rhn" or "rhsm",
# 'username': 'username',
# 'password': 'password',
# 'satellite': 'satellite host (for RHN license)'
# 'activation_key': 'activation key (for RHN license)'
# }
# }
# }}
Naily.logger.info("'download_release' method called with data: #{data.inspect}")
reporter = Naily::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
release_info = data['args']['release_info']['redhat']
begin
result = @orchestrator.download_release(reporter, data['args']['task_uuid'], release_info)
rescue Timeout::Error
msg = "Timeout of release download is exceeded."
Naily.logger.error msg
reporter.report({'status' => 'error', 'error' => msg})
return
end
end
def check_redhat_credentials(data)
release = data['args']['release_info']
task_id = data['args']['task_uuid']
reporter = Naily::Reporter.new(@producer, data['respond_to'], task_id)
@orchestrator.check_redhat_credentials(reporter, task_id, release)
end
def check_redhat_licenses(data)
release = data['args']['release_info']
nodes = data['args']['nodes']
task_id = data['args']['task_uuid']
reporter = Naily::Reporter.new(@producer, data['respond_to'], task_id)
@orchestrator.check_redhat_licenses(reporter, task_id, release, nodes)
end
def provision(data)
Naily.logger.info("'provision' method called with data: #{data.inspect}")
reporter = Naily::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
begin
@orchestrator.provision(reporter,
data['args']['provisioning_info']['engine'],
data['args']['provisioning_info']['nodes'])
rescue => e
Naily.logger.error "Error running provisioning: #{e.message}, trace: #{e.backtrace.inspect}"
raise StopIteration
end
@orchestrator.watch_provision_progress(
reporter, data['args']['task_uuid'], data['args']['provisioning_info']['nodes'])
end
def deploy(data)
Naily.logger.info("'deploy' method called with data: #{data.inspect}")
reporter = Naily::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
begin
@orchestrator.deploy(reporter, data['args']['task_uuid'], data['args']['deployment_info'])
reporter.report('status' => 'ready', 'progress' => 100)
rescue Timeout::Error
msg = "Timeout of deployment is exceeded."
Naily.logger.error msg
reporter.report('status' => 'error', 'error' => msg)
end
end
def verify_networks(data)
reporter = Naily::SubtaskReporter.new(@producer, data['respond_to'], data['args']['task_uuid'], data['subtasks'])
result = @orchestrator.verify_networks(reporter, data['args']['task_uuid'], data['args']['nodes'])
report_result(result, reporter)
end
def dump_environment(data)
task_id = data['args']['task_uuid']
reporter = Naily::Reporter.new(@producer, data['respond_to'], task_id)
@orchestrator.dump_environment(reporter, task_id, data['args']['lastdump'])
end
def remove_nodes(data)
task_uuid = data['args']['task_uuid']
reporter = Naily::Reporter.new(@producer, data['respond_to'], task_uuid)
nodes = data['args']['nodes']
engine = data['args']['engine']
result = if nodes.empty?
Naily.logger.debug("#{task_uuid} Node list is empty")
nil
else
@orchestrator.remove_nodes(reporter, task_uuid, engine, nodes)
end
report_result(result, reporter)
end
def reset_environment(data)
remove_nodes(data)
end
#
# Service worker actions
#
def stop_deploy_task(data, service_data)
Naily.logger.debug("'stop_deploy_task' service method called with data: #{data.inspect}")
target_task_uuid = data['args']['stop_task_uuid']
task_uuid = data['args']['task_uuid']
return unless task_in_queue?(target_task_uuid, service_data[:tasks_queue])
Naily.logger.debug("Cancel task #{target_task_uuid}. Start")
if target_task_uuid == service_data[:tasks_queue].current_task_id
reporter = Naily::Reporter.new(@producer, data['respond_to'], task_uuid)
result = stop_current_task(data, service_data, reporter)
report_result(result, reporter)
else
replace_future_task(data, service_data)
end
end
private
def task_in_queue?(task_uuid, tasks_queue)
tasks_queue.task_in_queue?(task_uuid)
end
def replace_future_task(data, service_data)
target_task_uuid = data['args']['stop_task_uuid']
task_uuid = data['args']['task_uuid']
new_task_data = data_for_rm_nodes(data)
Naily.logger.info("Replace running task #{target_task_uuid} to new #{task_uuid} with data: #{new_task_data.inspect}")
service_data[:tasks_queue].replace_task(target_task_uuid, new_task_data)
end
def stop_current_task(data, service_data, reporter)
target_task_uuid = data['args']['stop_task_uuid']
task_uuid = data['args']['task_uuid']
nodes = data['args']['nodes']
Naily.logger.info "Try to kill running task #{target_task_uuid}"
service_data[:main_work_thread].kill
@orchestrator.stop_puppet_deploy(reporter, task_uuid, nodes)
@orchestrator.remove_nodes(reporter, task_uuid, data['args']['engine'], nodes)
end
def data_for_rm_nodes(data)
data['method'] = 'remove_nodes'
data
end
def report_result(result, reporter)
result = {} unless result.instance_of?(Hash)
status = {'status' => 'ready', 'progress' => 100}.merge(result)
reporter.report(status)
end
end
end

View File

@ -1,33 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Naily
class Producer
def initialize(exchange)
@exchange = exchange
end
def publish(message, options={})
default_options = {:routing_key => Naily.config.broker_publisher_queue,
:content_type => 'application/json'}
options = default_options.merge(options)
begin
@exchange.publish(message.to_json, options)
rescue
Naily.logger.error "Error publishing message: #{$!}"
end
end
end
end

View File

@ -1,49 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Naily
class Reporter
def initialize(producer, method, task_uuid)
@producer = producer
@method = method
@task_uuid = task_uuid
end
def report(msg)
msg_with_task = {'task_uuid' => @task_uuid}.merge(msg)
message = {'method' => @method, 'args' => msg_with_task}
Naily.logger.info "Casting message to fuel: #{message.inspect}"
@producer.publish(message)
end
end
class SubtaskReporter < Reporter
def initialize(producer, method, task_uuid, subtasks)
super(producer, method, task_uuid)
@subtasks = subtasks
end
def report_to_subtask(subtask_name, msg)
if @subtasks[subtask_name] and @subtasks[subtask_name].any?
subtask_msg = {'task_uuid'=>@subtasks[subtask_name]['task_uuid']}.merge(msg)
message = {'method' => @subtasks[subtask_name]['respond_to'],
'args' => subtask_msg}
Naily.logger.info "Casting message to fuel: #{message.inspect}"
@producer.publish(message)
else
Naily.logger.info "No subtask #{subtask_name} for : #{@method}"
end
end
end
end

View File

@ -1,174 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'json'
require 'naily/task_queue'
module Naily
class Server
def initialize(channel, exchange, delegate, producer, service_channel, service_exchange)
@channel = channel
@exchange = exchange
@delegate = delegate
@producer = producer
@service_channel = service_channel
@service_exchange = service_exchange
end
def run
@queue = @channel.queue(Naily.config.broker_queue, :durable => true).bind(@exchange)
@service_queue = @service_channel.queue("", :exclusive => true, :auto_delete => true).bind(@service_exchange)
@main_work_thread = nil
@tasks_queue = TaskQueue.new
Thread.new(&method(:register_callbacks))
self
end
private
def register_callbacks
main_worker
service_worker
end
def main_worker
@consumer = AMQP::Consumer.new(@channel, @queue)
@consumer.on_delivery do |metadata, payload|
Naily.logger.debug "Process message from worker queue: #{payload.inspect}"
perform_main_job(metadata, payload)
end
@consumer.consume
end
def service_worker
@service_queue.subscribe do |_, payload|
Naily.logger.debug "Process message from service queue: #{payload.inspect}"
perform_service_job(nil, payload)
end
end
def perform_main_job(metadata, payload)
@main_work_thread = Thread.new do
begin
data = parse_data(payload)
@tasks_queue = TaskQueue.new
@tasks_queue.add_task(data)
dispatch(@tasks_queue)
ensure
metadata.ack
end
end
end
def perform_service_job(metadata, payload)
Thread.new do
service_data = {:main_work_thread => @main_work_thread, :tasks_queue => @tasks_queue}
dispatch(parse_data(payload), service_data)
end
end
def dispatch(data, service_data=nil)
data.each_with_index do |message, i|
begin
dispatch_message message, service_data
rescue StopIteration
Naily.logger.debug "Dispatching aborted by #{message['method']}"
abort_messages messages[(i + 1)..-1]
break
rescue => ex
Naily.logger.error "Error running RPC method #{message['method']}: #{ex.message}, trace: #{ex.backtrace.inspect}"
return_results message, {
'status' => 'error',
'error' => "Error occurred while running method '#{message['method']}'. Inspect Orchestrator logs for the details."
}
break
end
end
end
def dispatch_message(data, service_data=nil)
Naily.logger.debug "Dispatching message: #{data.inspect}"
if Naily.config.fake_dispatch
Naily.logger.debug "Fake dispatch"
return
end
unless @delegate.respond_to?(data['method'])
Naily.logger.error "Unsupported RPC call '#{data['method']}'"
return_results data, {
'status' => 'error',
'error' => "Unsupported method '#{data['method']}' called."
}
return
end
Naily.logger.debug "Main worker task id is #{@tasks_queue.current_task_id}" if service_data.nil?
Naily.logger.info "Processing RPC call '#{data['method']}'"
if !service_data
@delegate.send(data['method'], data)
else
@delegate.send(data['method'], data, service_data)
end
end
def return_results(message, results)
if results.is_a?(Hash) && message['respond_to']
reporter = Naily::Reporter.new(@producer, message['respond_to'], message['args']['task_uuid'])
reporter.report results
end
end
def parse_data(data)
Naily.logger.debug "Got message with payload #{data.inspect}"
messages = nil
begin
messages = JSON.load(data)
rescue => e
Naily.logger.error "Error deserializing payload: #{e.message}, trace: #{e.backtrace.inspect}"
end
messages.is_a?(Array) ? messages : [messages]
end
def abort_messages(messages)
return unless messages && messages.size > 0
messages.each do |message|
begin
Naily.logger.debug "Aborting '#{message['method']}'"
err_msg = {
'status' => 'error',
'error' => 'Task aborted',
'progress' => 100
}
if message['args']['nodes'].instance_of?(Array)
err_nodes = message['args']['nodes'].map do |node|
{'uid' => node['uid'], 'status' => 'error', 'error_type' => 'provision', 'progress' => 0}
end
err_msg.merge!('nodes' => err_nodes)
end
return_results(message, err_msg)
rescue => ex
Naily.logger.debug "Failed to abort '#{message['method']}': #{ex.inspect}"
end
end
end
end
end

View File

@ -1,75 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'thread'
module Naily
class TaskQueue
include Enumerable
attr_reader :current_task_id
def initialize
@queue = []
@semaphore = Mutex.new
@current_task_id = nil
end
def add_task(data)
@semaphore.synchronize { data.compact.each { |t| @queue << t } }
end
def replace_task(replacing_task_id, new_task_data)
@semaphore.synchronize do
@queue.map! { |x| find_task_id(x) == replacing_task_id ? new_task_data : x }.flatten!
end
end
def remove_task(replacing_task_id)
replace_task(replacing_task_id, nil)
end
def clear_queue
@semaphore.synchronize { @queue.map! { |x| nil } }
end
def task_in_queue?(task_id)
@semaphore.synchronize { @queue.find { |t| find_task_id(t) == task_id } }
end
def each(&block)
@queue.each do |task|
@semaphore.synchronize do
next if task.nil?
@current_task_id = find_task_id(task)
end
if block_given?
block.call task
else
yield task
end
end
ensure
@semaphore.synchronize { @current_task_id = nil }
end
private
def find_task_id(data)
data && data['args'] && data['args']['task_uuid'] ? data['args']['task_uuid'] : nil
end
end
end

View File

@ -1,17 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Naily
VERSION = '0.1.0'
end

View File

@ -1,104 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'raemon'
module Naily
class Worker
include Raemon::Worker
def start
super
start_heartbeat
end
def stop
super
begin
@connection.close{ stop_event_machine }
ensure
stop_event_machine
end
end
def run
EM.run do
run_server
end
rescue => e
Naily.logger.error "Exception during worker initialization: #{e.inspect}, trace: #{e.backtrace.inspect}"
sleep 5
retry
end
private
def start_heartbeat
@heartbeat ||= Thread.new do
sleep 30
heartbeat!
end
end
def run_server
AMQP.logging = true
AMQP.connect(connection_options) do |connection|
@connection = configure_connection(connection)
@channel = create_channel(@connection)
@exchange = @channel.topic(Naily.config.broker_exchange, :durable => true)
@service_channel = create_channel(@connection, prefetch=false)
@service_exchange = @service_channel.fanout(Naily.config.broker_service_queue, :auto_delete => true)
@producer = Naily::Producer.new(@exchange)
@delegate = Naily.config.delegate || Naily::Dispatcher.new(@producer)
@server = Naily::Server.new(@channel, @exchange, @delegate, @producer, @service_channel, @service_exchange)
@server.run
end
end
def configure_connection(connection)
connection.on_tcp_connection_loss do |conn, settings|
Naily.logger.warn "Trying to reconnect to message broker..."
conn.reconnect
end
connection
end
def create_channel(connection, prefetch=true)
prefetch_opts = ( prefetch ? {:prefetch => 1} : {} )
channel = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, prefetch_opts)
channel.auto_recovery = true
channel.on_error do |ch, error|
Naily.logger.fatal "Channel error #{error.inspect}"
stop
end
channel
end
def connection_options
{
:host => Naily.config.broker_host,
:port => Naily.config.broker_port,
:username => Naily.config.broker_username,
:password => Naily.config.broker_password,
}.reject{|k, v| v.nil? }
end
def stop_event_machine
EM.stop_event_loop if EM.reactor_running?
end
end
end

View File

@ -1,24 +0,0 @@
$:.unshift File.expand_path('lib', File.dirname(__FILE__))
require 'naily/version'
Gem::Specification.new do |s|
s.name = 'naily'
s.version = Naily::VERSION
s.summary = 'Backend server for Nailgun'
s.description = 'Nailgun deployment job server'
s.authors = ['Maxim Kulkin']
s.email = ['mkulkin@mirantis.com']
s.add_dependency 'amqp', '0.9.10'
s.add_dependency 'astute'
s.add_dependency 'json', '1.6.1'
s.add_dependency 'raemon', '0.3.0'
s.add_dependency 'symboltable', '1.0.2'
s.files = Dir.glob("{bin,lib}/**/*")
s.executables = ['nailyd']
s.require_path = 'lib'
end