diff --git a/naily/bin/nailyd b/naily/bin/nailyd index fae9b0343a..2544942047 100755 --- a/naily/bin/nailyd +++ b/naily/bin/nailyd @@ -1,12 +1,12 @@ #!/usr/bin/env ruby require 'naily' - require 'logger' require 'ostruct' require 'optparse' require 'yaml' require 'amqp' +require 'raemon' options = OpenStruct.new options.daemonize = false @@ -14,117 +14,62 @@ options.pidfile = '/var/run/nailyd.pid' options.config_path = '/etc/naily/nailyd.conf' options.log_path = nil options.log_level = 'debug' +options.workers = 10 OptionParser.new do |opts| opts.banner = 'Usage: nailyd [options]' - - opts.separator '' - opts.separator 'Options:' - - opts.on('-d', '--[no-]daemonize', 'Daemonize server') { |flag| options.daemonize = flag } - opts.on('-P', '--pidfile PATH', 'Path to pidfile') { |path| options.pidfile = path } + opts.separator "\nOptions:" + opts.on('-d', '--[no-]daemonize', 'Daemonize server') do |flag| + options.daemonize = flag + end + opts.on('-P', '--pidfile PATH', 'Path to pidfile') do |path| + options.pidfile = path + end + opts.on('-w', '--workers NUMBER', 'Number of worker processes') do |number| + options.workers = number.to_i + end opts.on('-c', '--config PATH', 'Use custom config file') do |path| unless File.exists?(path) - puts "Error: config file #{options.config_path} was not found" + puts "Error: config file #{path} was not found" exit end - options.config_path = path end - opts.on('-l', '--logfile PATH' 'Log file path') do |path| options.log_path = path end - levels = %w{fatal error warn info debug} opts.on('--loglevel LEVEL', levels, "Logging level (#{levels.join(', ')})") do |level| options.log_level = level end - opts.on_tail('-h', '--help', 'Show this message') do puts opts exit end - opts.on_tail('-v', '--version', 'Show version') do puts Naily::VERSION exit end end.parse! -Naily.config.update(YAML.load(File.read(options.config_path))) if File.exists?(options.config_path) - if options.daemonize # After daemonize we can't log to STDOUT, pick a default log file options.log_path ||= "#{Dir.pwd}/naily.log" - - require 'daemons' - Daemons.daemonize :app_name => 'naily' - end +Naily.config.update(YAML.load(File.read(options.config_path))) if File.exists?(options.config_path) Naily.logger = options.log_path ? Logger.new(options.log_path) : Logger.new(STDOUT) Naily.logger.level = Logger.const_get(options.log_level.upcase) -Naily.logger.formatter = proc {|severity, datetime, progname, msg| - severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning', - 'ERROR' => 'err', 'FATAL' => 'crit'} - "#{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n" -} - -begin - File.open(options.pidfile, 'w') { |f| f.write($$) } if options.daemonize -rescue - Naily.logger.error "Exception: #{$!}" -end - -def connection_options - { - :host => Naily.config.broker_host, - :port => Naily.config.broker_port, - :username => Naily.config.broker_username, - :password => Naily.config.broker_password, - }.reject { |k, v| v.nil? } +Naily.logger.formatter = proc do |severity, datetime, progname, msg| + severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning', 'ERROR' => 'err', 'FATAL' => 'crit'} + "#{Process.pid} #{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n" end Naily.logger.info "Starting..." -EM.run do - AMQP.logging = true - connection = AMQP.connect(connection_options) - channel = AMQP::Channel.new(connection) - channel.on_error do |ch, error| - Naily.logger.fatal "Channel error #{error.inspect}" - stop(connection) - end - exchange = channel.topic(Naily.config.broker_exchange, :durable => true) - - producer = Naily::Producer.new(channel, exchange) - delegate = Naily.config.delegate || Naily::Dispatcher.new(producer) - - server = Naily::Server.new(channel, exchange, delegate, producer) - server.run - - # Example - #reporter = Naily::Reporter.new(producer, "some_method") - #reporter.report("some haha") - - Signal.trap('INT') do - Naily.logger.info "Got INT signal, stopping" - stop(connection) - end - - Signal.trap('TERM') do - Naily.logger.info "Got TERM signal, stopping" - stop(connection) - end - - def stop(connection, &block) - if connection - connection.close { EM.stop(&block) } - else - EM.stop(&block) - end - end -end - -File.unlink(options.pidfile) if options.daemonize # NOTE: bug? +Raemon::Master.start(options.workers, Naily::Worker, + :detach => options.daemonize, + :name => 'naily', + :pid_file => options.pidfile, + :logger => Naily.logger +) diff --git a/naily/lib/naily.rb b/naily/lib/naily.rb index 178ddf5c47..326ae12b7c 100644 --- a/naily/lib/naily.rb +++ b/naily/lib/naily.rb @@ -6,26 +6,18 @@ require 'logger' require 'json' module Naily + autoload 'Worker', 'naily/worker' autoload 'Server', 'naily/server' autoload 'Producer', 'naily/producer' autoload 'Dispatcher', 'naily/dispatcher' autoload 'Reporter', 'naily/reporter' - @logger ||= Logger.new(STDOUT) - @logger.formatter = proc {|severity, datetime, progname, msg| - severity_map = {'DEBUG' => 'debug', 'INFO' => 'info', 'WARN' => 'warning', - 'ERROR' => 'err', 'FATAL' => 'crit'} - "#{datetime.strftime("%Y-%m-%dT%H:%M:%S")} #{severity_map[severity]}: #{msg}\n" - } - Astute.logger = @logger - def self.logger @logger end def self.logger=(logger) Astute.logger = logger - @logger = logger + @logger = logger end - end diff --git a/naily/lib/naily/server.rb b/naily/lib/naily/server.rb index 84452d6d36..45bbb83e55 100644 --- a/naily/lib/naily/server.rb +++ b/naily/lib/naily/server.rb @@ -1,5 +1,4 @@ require 'json' -require 'thread' module Naily class Server @@ -14,11 +13,8 @@ module Naily queue = @channel.queue(Naily.config.broker_queue, :durable => true) queue.bind(@exchange, :routing_key => Naily.config.broker_queue) - Astute::MClient.semaphore = Mutex.new queue.subscribe do |header, payload| - Thread.new do - dispatch payload - end + dispatch payload end Naily.logger.info "Server started" @@ -49,14 +45,13 @@ module Naily Naily.logger.info "Processing RPC call #{data['method']}" begin - result = @delegate.send(data['method'], data) + @delegate.send(data['method'], data) rescue Exception => e Naily.logger.error "Error running RPC method #{data['method']}: #{e.message}, trace: #{e.backtrace.inspect}" if data['respond_to'] reporter = Naily::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid']) reporter.report({'status' => 'error', 'error' => "Error occurred while running method '#{data['method']}'. See logs of Orchestrator for details."}) end - return end end end diff --git a/naily/lib/naily/worker.rb b/naily/lib/naily/worker.rb new file mode 100644 index 0000000000..525d03a48b --- /dev/null +++ b/naily/lib/naily/worker.rb @@ -0,0 +1,75 @@ +require 'raemon' + +module Naily + class Worker + include Raemon::Worker + + def start + super + start_heartbeat + end + + def stop + super + begin + @connection.close{ stop_event_machine } + ensure + stop_event_machine + end + end + + def run + EM.run do + initialize_server.run + end + end + + private + + def start_heartbeat + @heartbeat ||= Thread.new do + sleep 30 + heartbeat! + end + end + + def initialize_server + initialize_amqp + @server = Naily::Server.new(@channel, @exchange, @delegate, @producer) + end + + def initialize_amqp + AMQP.logging = true + @connection = AMQP.connect(connection_options) + create_channel + @exchange = @channel.topic(Naily.config.broker_exchange, :durable => true) + @producer = Naily::Producer.new(@channel, @exchange) + @delegate = Naily.config.delegate || Naily::Dispatcher.new(@producer) + rescue => ex + Naily.logger.error "Exception during AMQP connection initialization: #{ex}" + sleep 15 + retry + end + + def create_channel + @channel = AMQP::Channel.new(@connection) + @channel.on_error do |ch, error| + Naily.logger.fatal "Channel error #{error.inspect}" + stop + end + end + + def connection_options + { + :host => Naily.config.broker_host, + :port => Naily.config.broker_port, + :username => Naily.config.broker_username, + :password => Naily.config.broker_password, + }.reject{|k, v| v.nil? } + end + + def stop_event_machine + EM.stop_event_loop if EM.reactor_running? + end + end +end \ No newline at end of file diff --git a/naily/naily.gemspec b/naily/naily.gemspec index 530869b515..d4d44db8ee 100644 --- a/naily/naily.gemspec +++ b/naily/naily.gemspec @@ -11,11 +11,11 @@ Gem::Specification.new do |s| s.authors = ['Maxim Kulkin'] s.email = ['mkulkin@mirantis.com'] - s.add_dependency 'daemons' - s.add_dependency 'amqp' - s.add_dependency 'symboltable', '>= 1.0.2' + s.add_dependency 'amqp', '0.9.10' s.add_dependency 'astute' - s.add_dependency 'json' + s.add_dependency 'json', '1.6.1' + s.add_dependency 'raemon', '0.3.0' + s.add_dependency 'symboltable', '1.0.2' s.files = Dir.glob("{bin,lib}/**/*") s.executables = ['nailyd']