Refactored node and task concurrency
Change-Id: Ifbc756a4653d65eeee73bbc112f3ba89582db7d7
This commit is contained in:
parent
baf1d3bb7e
commit
d2a5d4f5fa
|
@ -20,3 +20,5 @@ require 'fuel_deployment/task'
|
|||
require 'fuel_deployment/graph'
|
||||
require 'fuel_deployment/node'
|
||||
require 'fuel_deployment/cluster'
|
||||
require 'fuel_deployment/concurrency/group'
|
||||
require 'fuel_deployment/concurrency/counter'
|
||||
|
|
|
@ -23,11 +23,15 @@ module Deployment
|
|||
#
|
||||
# attr [Object] id Misc identifier of this process
|
||||
# @attr_reader [Hash<Symbol => Deployment::Node>] nodes The nodes of this cluster
|
||||
# @attr [Deployment::Concurrency::Counter] node_concurrency Controls the
|
||||
# maximum number of nodes running tasks at the same time
|
||||
class Cluster
|
||||
# @param [String] id Cluster name
|
||||
def initialize(id=nil)
|
||||
@nodes = {}
|
||||
@id = id
|
||||
@node_concurrency = Deployment::Concurrency::Counter.new
|
||||
@task_concurrency = Deployment::Concurrency::Group.new
|
||||
end
|
||||
|
||||
include Enumerable
|
||||
|
@ -35,6 +39,8 @@ module Deployment
|
|||
|
||||
attr_accessor :id
|
||||
attr_reader :nodes
|
||||
attr_reader :node_concurrency
|
||||
attr_reader :task_concurrency
|
||||
|
||||
# Add an existing node object to the cluster
|
||||
# @param [Deployment::Node] node a new node object
|
||||
|
@ -203,8 +209,10 @@ module Deployment
|
|||
def process_node(node)
|
||||
debug "Process node: #{node}"
|
||||
hook 'pre_node', node
|
||||
return if node.skipped?
|
||||
node.poll
|
||||
return unless node.online?
|
||||
hook 'post_node_poll', node
|
||||
return unless node.ready?
|
||||
ready_task = node.ready_task
|
||||
return unless ready_task
|
||||
ready_task.run
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
module Deployment
|
||||
|
||||
# The Concurrency module contains objects that implement the task
|
||||
# and node concurrency features.
|
||||
module Concurrency
|
||||
# The counter object can keep the current and maximum values,
|
||||
# increment and decrement them and check if the current value
|
||||
# is not bigger then the maximum value.
|
||||
# @attr current [Integer]
|
||||
# @attr maximum [Integer]
|
||||
class Counter
|
||||
def initialize(maximum=0, current=0)
|
||||
self.maximum = maximum
|
||||
self.current = current
|
||||
end
|
||||
|
||||
attr_reader :current
|
||||
attr_reader :maximum
|
||||
|
||||
# Set the current value of this counter
|
||||
# @param value [Integer]
|
||||
# @return [Integer]
|
||||
def current=(value)
|
||||
@current = to_value value
|
||||
end
|
||||
|
||||
# Set the maximum value of this counter
|
||||
# @param value [Integer]
|
||||
# @return [Integer]
|
||||
def maximum=(value)
|
||||
@maximum = to_value value
|
||||
end
|
||||
|
||||
# Convert a value to a positive integer
|
||||
# @param value [String,Integer]
|
||||
# @return [Integer]
|
||||
def to_value(value)
|
||||
begin
|
||||
value = Integer value
|
||||
return 0 unless value > 0
|
||||
value
|
||||
rescue
|
||||
0
|
||||
end
|
||||
end
|
||||
|
||||
# Increase this counter's current value by one
|
||||
# @return [Integer]
|
||||
def increment
|
||||
self.current += 1
|
||||
end
|
||||
alias :inc :increment
|
||||
|
||||
# Decrease this counter's current value by one
|
||||
# @return [Integer]
|
||||
def decrement
|
||||
self.current -= 1
|
||||
end
|
||||
alias :dec :decrement
|
||||
|
||||
# Set this counter's current value to zero
|
||||
# @return [Integer]
|
||||
def zero
|
||||
self.current = 0
|
||||
end
|
||||
|
||||
# Is the current value lesser or equal to the maximum value
|
||||
# @return [true,false]
|
||||
def active?
|
||||
return true unless maximum_set?
|
||||
current < maximum
|
||||
end
|
||||
alias :available? :active?
|
||||
|
||||
# Is the current value bigger then the maximum value
|
||||
# @return [true,false]
|
||||
def inactive?
|
||||
not active?
|
||||
end
|
||||
alias :overflow? :inactive?
|
||||
|
||||
# Check if the maximum value is set
|
||||
# @return [true,false]
|
||||
def maximum_set?
|
||||
maximum != 0
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,100 @@
|
|||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
module Deployment
|
||||
module Concurrency
|
||||
# The concurrency group can keep the collection on Counter
|
||||
# objects, create new ones and retrieve the saved ones by their name
|
||||
# @attr_reader group [Hash<Symbol => Deployment::Concurrency::Counter>]
|
||||
class Group
|
||||
def initialize
|
||||
@group = {}
|
||||
end
|
||||
|
||||
attr_accessor :group
|
||||
|
||||
include Enumerable
|
||||
|
||||
# Loop through all defined Counter objects
|
||||
# @yield [Deployment::Concurrency::Counter]
|
||||
def each(&block)
|
||||
group.each_value(&block)
|
||||
end
|
||||
|
||||
# Create a new Counter object by the given name
|
||||
# @param key [Symbol, String]
|
||||
# @return [Deployment::Concurrency::Counter]
|
||||
def create(key, *args)
|
||||
key = to_key key
|
||||
self.set key, Deployment::Concurrency::Counter.new(*args)
|
||||
end
|
||||
|
||||
# Check if there is a concurrency object by this name
|
||||
# @param key [String,Symbol]
|
||||
# @return [true,false]
|
||||
def key?(key)
|
||||
key = to_key key
|
||||
@group.key? key
|
||||
end
|
||||
alias :exists? :key?
|
||||
|
||||
# Assign a Concurrency object to a key
|
||||
# @param key [Symbol, String]
|
||||
# @param value [Deployment::Concurrency::Counter]
|
||||
# @return [Deployment::Concurrency::Counter]
|
||||
def set(key, value)
|
||||
raise Deployment::InvalidArgument.new self, 'The value should be a Counter object!', value unless value.is_a? Deployment::Concurrency::Counter
|
||||
key = to_key key
|
||||
@group[key] = value
|
||||
end
|
||||
alias :[]= :set
|
||||
|
||||
# Remove a defined Counter object by its name
|
||||
# @param key [Symbol, String]
|
||||
def delete(key)
|
||||
key = to_key key
|
||||
@group.delete key if @group.key? key
|
||||
end
|
||||
alias :remove :delete
|
||||
|
||||
# Retrieve a Concurrency object by the given name
|
||||
# or create a new one if there is no one saved ny this name
|
||||
# @param key [Symbol, String]
|
||||
# @return [Deployment::Concurrency::Counter]
|
||||
def get(key)
|
||||
key = to_key key
|
||||
return @group[key] if @group.key? key
|
||||
create key
|
||||
end
|
||||
alias :[] :get
|
||||
|
||||
# Convert a value to symbol
|
||||
# @param key [Symbol,String]
|
||||
# @return [Symbol]
|
||||
def to_key(key)
|
||||
return key if key.is_a? Symbol
|
||||
processed_key = nil
|
||||
unless key.nil?
|
||||
begin
|
||||
processed_key = key.to_s.to_sym
|
||||
rescue
|
||||
nil
|
||||
end
|
||||
end
|
||||
raise Deployment::InvalidArgument.new self, "The value '#{key}' cannot be used as a concurrency name!" unless processed_key
|
||||
processed_key
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -248,10 +248,10 @@ module Deployment
|
|||
# returns nil if there is no such task
|
||||
# @return [Deployment::Task, nil]
|
||||
def ready_task
|
||||
select do |task|
|
||||
task.ready?
|
||||
end.max_by do |task|
|
||||
task.weight
|
||||
select do |task1|
|
||||
task1.ready?
|
||||
end.max_by do |task2|
|
||||
task2.weight
|
||||
end
|
||||
end
|
||||
alias :next_task :ready_task
|
||||
|
|
|
@ -24,7 +24,7 @@ module Deployment
|
|||
# @attr [Symbol] status The node's status
|
||||
# @attr [String] name The node's name
|
||||
# @attr [Deployment::Task] task The currently running task of this node
|
||||
# @attr [Deploymnet::Cluster] cluster The cluster this node is assigned to
|
||||
# @attr [Deployment::Cluster] cluster The cluster this node is assigned to
|
||||
# @attr [Deployment::Graph] graph The Graph assigned to this node
|
||||
# @attr [Numeric, String] id Misc id that can be used by this node
|
||||
# @attr [true, false] critical This node is critical for the deployment
|
||||
|
@ -69,6 +69,7 @@ module Deployment
|
|||
def status=(value)
|
||||
value = value.to_sym
|
||||
raise Deployment::InvalidArgument.new self, 'Invalid node status!', value unless ALLOWED_STATUSES.include? value
|
||||
status_changes_concurrency @status, value
|
||||
@status = value
|
||||
end
|
||||
|
||||
|
@ -100,6 +101,38 @@ module Deployment
|
|||
@cluster = cluster
|
||||
end
|
||||
|
||||
# Check if this node has a Concurrency::Counter set
|
||||
# and it has a defined maximum value
|
||||
# @return [true,false]
|
||||
def concurrency_present?
|
||||
return false unless cluster.is_a? Deployment::Cluster
|
||||
return false unless cluster.node_concurrency.is_a? Deployment::Concurrency::Counter
|
||||
cluster.node_concurrency.maximum_set?
|
||||
end
|
||||
|
||||
# Check if this node has a free concurrency slot to run a new task
|
||||
# @return [true,false]
|
||||
def concurrency_available?
|
||||
return true unless concurrency_present?
|
||||
cluster.node_concurrency.available?
|
||||
end
|
||||
|
||||
# Increase or decrease the node concurrency value
|
||||
# when the node's status is changed.
|
||||
# @param [Symbol] status_from
|
||||
# @param [Symbol] status_to
|
||||
# @return [void]
|
||||
def status_changes_concurrency(status_from, status_to)
|
||||
return unless concurrency_present?
|
||||
if status_to == :busy
|
||||
cluster.node_concurrency.increment
|
||||
info "Increasing node concurrency to: #{cluster.node_concurrency.current}"
|
||||
elsif status_from == :busy
|
||||
cluster.node_concurrency.decrement
|
||||
info "Decreasing node concurrency to: #{cluster.node_concurrency.current}"
|
||||
end
|
||||
end
|
||||
|
||||
# The node have finished all its tasks
|
||||
# or has one of finished statuses
|
||||
# @return [true, false]
|
||||
|
@ -107,6 +140,13 @@ module Deployment
|
|||
FINISHED_STATUSES.include? status or tasks_are_finished?
|
||||
end
|
||||
|
||||
# Check if this node is ready to receive a task: it's online and
|
||||
# concurrency slots are available.
|
||||
# @return [true, false]
|
||||
def ready?
|
||||
online? and concurrency_available?
|
||||
end
|
||||
|
||||
# The node is online and can accept new tasks
|
||||
# @return [true, false]
|
||||
def online?
|
||||
|
|
|
@ -134,132 +134,40 @@ module Deployment
|
|||
value
|
||||
end
|
||||
|
||||
# Get the current concurrency value for a given task
|
||||
# or perform an action with this value.
|
||||
# @param [Deployment::Task, String, Symbol] task
|
||||
# @param [Symbol] action
|
||||
# @option action [Symbol] :inc Increase the value
|
||||
# @option action [Symbol] :dec Decrease the value
|
||||
# @option action [Symbol] :reset Set the value to zero
|
||||
# @option action [Symbol] :set Manually set the value
|
||||
# @param [Integer] value Manually set to this value
|
||||
# @return [Integer]
|
||||
def self.current_concurrency(task, action = :get, value = nil)
|
||||
@current_concurrency = {} unless @current_concurrency
|
||||
task = task.name if task.is_a? Deployment::Task
|
||||
key = task.to_sym
|
||||
@current_concurrency[key] = 0 unless @current_concurrency[key]
|
||||
return @current_concurrency[key] unless action
|
||||
if action == :inc
|
||||
@current_concurrency[key] += 1
|
||||
elsif action == :dec
|
||||
@current_concurrency[key] -= 1
|
||||
elsif action == :zero
|
||||
@current_concurrency[key] = 0
|
||||
elsif action == :set
|
||||
begin
|
||||
@current_concurrency[key] = Integer(value)
|
||||
rescue TypeError, ArgumentError
|
||||
raise Deployment::InvalidArgument.new self, 'Current concurrency should be an integer number!', value
|
||||
end
|
||||
end
|
||||
@current_concurrency[key] = 0 if @current_concurrency[key] < 0
|
||||
@current_concurrency[key]
|
||||
# Check if this task has a Concurrency::Counter defined in the Group
|
||||
# identified by this task's name and it has a defined maximum value
|
||||
# @return [true,false]
|
||||
def concurrency_present?
|
||||
return false unless node.is_a? Deployment::Node
|
||||
return false unless node.cluster.is_a? Deployment::Cluster
|
||||
return false unless node.cluster.task_concurrency.is_a? Deployment::Concurrency::Group
|
||||
return false unless node.cluster.task_concurrency.key? name
|
||||
node.cluster.task_concurrency[name].maximum_set?
|
||||
end
|
||||
|
||||
# Get or set the maximum concurrency value for a given task.
|
||||
# Value is set if the second argument is provided.
|
||||
# @param [Deployment::Task, String, Symbol] task
|
||||
# @param [Integer, nil] value
|
||||
# @return [Integer]
|
||||
def self.maximum_concurrency(task, value = nil)
|
||||
@maximum_concurrency = {} unless @maximum_concurrency
|
||||
task = task.name if task.is_a? Deployment::Task
|
||||
key = task.to_sym
|
||||
@maximum_concurrency[key] = 0 unless @maximum_concurrency[key]
|
||||
return @maximum_concurrency[key] unless value
|
||||
begin
|
||||
@maximum_concurrency[key] = Integer(value)
|
||||
rescue TypeError, ArgumentError
|
||||
raise Deployment::InvalidArgument.new self, 'Maximum concurrency should be an integer number!', value
|
||||
end
|
||||
@maximum_concurrency[key]
|
||||
# Check if this task has a free concurrency slot to run
|
||||
# @return [true,false]
|
||||
def concurrency_available?
|
||||
return true unless concurrency_present?
|
||||
node.cluster.task_concurrency[name].available?
|
||||
end
|
||||
|
||||
# Get the maximum concurrency
|
||||
# @return [Integer]
|
||||
def maximum_concurrency
|
||||
self.class.maximum_concurrency self
|
||||
end
|
||||
|
||||
# Set the maximum concurrency
|
||||
# @param [Integer] value
|
||||
# @return [Integer]
|
||||
def maximum_concurrency=(value)
|
||||
self.class.maximum_concurrency self, value
|
||||
end
|
||||
|
||||
# Increase or decrease the concurrency value
|
||||
# Increase or decrease the task concurrency value
|
||||
# when the task's status is changed.
|
||||
# @param [Symbol] status_from
|
||||
# @param [Symbol] status_to
|
||||
# @return [void]
|
||||
def status_changes_concurrency(status_from, status_to)
|
||||
return unless maximum_concurrency_is_set?
|
||||
return unless concurrency_present?
|
||||
if status_to == :running
|
||||
current_concurrency_increase
|
||||
info "Increasing concurrency to: #{current_concurrency}"
|
||||
node.cluster.task_concurrency[name].increment
|
||||
info "Increasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
|
||||
elsif status_from == :running
|
||||
current_concurrency_decrease
|
||||
info "Decreasing concurrency to: #{current_concurrency}"
|
||||
node.cluster.task_concurrency[name].decrement
|
||||
info "Decreasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
|
||||
end
|
||||
end
|
||||
|
||||
# Get the current concurrency
|
||||
# @return [Integer]
|
||||
def current_concurrency
|
||||
self.class.current_concurrency self
|
||||
end
|
||||
|
||||
# Increase the current concurrency by one
|
||||
# @return [Integer]
|
||||
def current_concurrency_increase
|
||||
self.class.current_concurrency self, :inc
|
||||
end
|
||||
|
||||
# Decrease the current concurrency by one
|
||||
# @return [Integer]
|
||||
def current_concurrency_decrease
|
||||
self.class.current_concurrency self, :dec
|
||||
end
|
||||
|
||||
# Reset the current concurrency to zero
|
||||
# @return [Integer]
|
||||
def current_concurrency_zero
|
||||
self.class.current_concurrency self, :zero
|
||||
end
|
||||
|
||||
# Manually set the current concurrency value
|
||||
# @param [Integer] value
|
||||
# @return [Integer]
|
||||
def current_concurrency=(value)
|
||||
self.class.current_concurrency self, :set, value
|
||||
end
|
||||
|
||||
# Check if there are concurrency slots available
|
||||
# to run this task.
|
||||
# @return [true, false]
|
||||
def concurrency_available?
|
||||
return true unless maximum_concurrency_is_set?
|
||||
current_concurrency < maximum_concurrency
|
||||
end
|
||||
|
||||
# Check if the maximum concurrency of this task is set
|
||||
# @return [true, false]
|
||||
def maximum_concurrency_is_set?
|
||||
maximum_concurrency > 0
|
||||
end
|
||||
|
||||
ALLOWED_STATUSES.each do |status|
|
||||
method_name = "set_status_#{status}".to_sym
|
||||
define_method(method_name) do
|
||||
|
@ -488,6 +396,7 @@ module Deployment
|
|||
# @return [true, false]
|
||||
def ready?
|
||||
poll_dependencies
|
||||
return false unless concurrency_available?
|
||||
status == :ready
|
||||
end
|
||||
|
||||
|
@ -564,3 +473,4 @@ module Deployment
|
|||
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
# all nodes have no more tasks to run.
|
||||
module Deployment
|
||||
# The current module version
|
||||
VERSION = '0.3.0'
|
||||
VERSION = '0.4.0'
|
||||
|
||||
# Get the current module version
|
||||
# @return [String]
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
require 'spec_helper'
|
||||
require 'set'
|
||||
|
||||
describe Deployment::Node do
|
||||
describe Deployment::Cluster do
|
||||
|
||||
let(:cluster) do
|
||||
cluster = Deployment::Cluster.new
|
||||
|
@ -66,7 +66,7 @@ describe Deployment::Node do
|
|||
subject { cluster }
|
||||
|
||||
context '#attributes' do
|
||||
it 'have an id' do
|
||||
it 'has an id' do
|
||||
expect(subject.id).to eq 'test'
|
||||
end
|
||||
|
||||
|
@ -75,10 +75,18 @@ describe Deployment::Node do
|
|||
expect(subject.id).to eq 1
|
||||
end
|
||||
|
||||
it 'have nodes' do
|
||||
it 'has nodes' do
|
||||
expect(subject.nodes).to eq({:node1 => node1, :node2 => node2})
|
||||
end
|
||||
|
||||
it 'has node_concurrency' do
|
||||
expect(subject.node_concurrency).to be_a Deployment::Concurrency::Counter
|
||||
end
|
||||
|
||||
it 'has task_concurrency' do
|
||||
expect(subject.task_concurrency).to be_a Deployment::Concurrency::Group
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
context '#nodes processing' do
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
require 'spec_helper'
|
||||
require 'set'
|
||||
|
||||
describe Deployment::Concurrency::Counter do
|
||||
let(:counter) do
|
||||
Deployment::Concurrency::Counter.new(0, 0)
|
||||
end
|
||||
|
||||
subject { counter }
|
||||
|
||||
it 'can create an instance' do
|
||||
is_expected.to be_a Deployment::Concurrency::Counter
|
||||
end
|
||||
|
||||
it 'can get the current value' do
|
||||
expect(subject.current).to eq 0
|
||||
end
|
||||
|
||||
it 'can set the current value' do
|
||||
subject.current = 5
|
||||
expect(subject.current).to eq 5
|
||||
end
|
||||
|
||||
it 'can get the maximum value' do
|
||||
expect(subject.maximum).to eq 0
|
||||
end
|
||||
|
||||
it 'can set the maximum value' do
|
||||
subject.maximum = 5
|
||||
expect(subject.maximum).to eq 5
|
||||
end
|
||||
|
||||
it 'can convert objects to values' do
|
||||
expect(subject.to_value 2).to eq 2
|
||||
expect(subject.to_value 0).to eq 0
|
||||
expect(subject.to_value '1').to eq 1
|
||||
expect(subject.to_value 'a').to eq 0
|
||||
expect(subject.to_value nil).to eq 0
|
||||
expect(subject.to_value []).to eq 0
|
||||
expect(subject.to_value true).to eq 0
|
||||
end
|
||||
|
||||
it 'can increment the current value' do
|
||||
subject.current = 1
|
||||
subject.increment
|
||||
expect(subject.current).to eq 2
|
||||
end
|
||||
|
||||
it 'can decrement the current value, but not below zero' do
|
||||
subject.current = 1
|
||||
subject.decrement
|
||||
expect(subject.current).to eq 0
|
||||
subject.decrement
|
||||
expect(subject.current).to eq 0
|
||||
end
|
||||
|
||||
it 'can zero the current value' do
|
||||
subject.current = 5
|
||||
subject.zero
|
||||
expect(subject.current).to eq 0
|
||||
end
|
||||
|
||||
it 'can check that counter is active - current value is less then maximum' do
|
||||
subject.maximum = 2
|
||||
subject.current = 1
|
||||
is_expected.to be_active
|
||||
is_expected.not_to be_inactive
|
||||
end
|
||||
|
||||
it 'can check that counter is inactive - current value more then maximum' do
|
||||
subject.maximum = 1
|
||||
subject.current = 2
|
||||
is_expected.not_to be_active
|
||||
is_expected.to be_inactive
|
||||
end
|
||||
|
||||
it 'is NOT active if the current value is equal to the maximum value' do
|
||||
subject.maximum = 2
|
||||
subject.current = 2
|
||||
is_expected.not_to be_active
|
||||
end
|
||||
|
||||
it 'can check if the maximum value is set - is more then zero' do
|
||||
subject.maximum = 1
|
||||
is_expected.to be_maximum_set
|
||||
subject.maximum = 0
|
||||
is_expected.not_to be_maximum_set
|
||||
end
|
||||
|
||||
it 'is active if the maximum value is not set regardless of the current value' do
|
||||
subject.current = 10
|
||||
subject.maximum = 1
|
||||
is_expected.not_to be_active
|
||||
subject.maximum = 0
|
||||
is_expected.to be_active
|
||||
end
|
||||
|
||||
end
|
|
@ -0,0 +1,99 @@
|
|||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
require 'spec_helper'
|
||||
require 'set'
|
||||
|
||||
describe Deployment::Concurrency::Group do
|
||||
let(:counter) do
|
||||
Deployment::Concurrency::Counter.new(0, 0)
|
||||
end
|
||||
|
||||
let(:group) do
|
||||
group = Deployment::Concurrency::Group.new
|
||||
group['test'] = counter
|
||||
group
|
||||
end
|
||||
|
||||
subject { group }
|
||||
|
||||
it 'can create an instance' do
|
||||
is_expected.to be_a Deployment::Concurrency::Group
|
||||
end
|
||||
|
||||
it 'has the group Hash attribute' do
|
||||
expect(subject.group).to be_a Hash
|
||||
end
|
||||
|
||||
it 'can check if there is a Counter by its name' do
|
||||
expect(group.key? 'test').to eq true
|
||||
expect(group.key? 'missing').to eq false
|
||||
end
|
||||
|
||||
it 'can get an existing counter' do
|
||||
expect(subject['test']).to eq counter
|
||||
end
|
||||
|
||||
it 'can set an existing counter' do
|
||||
subject['test1'] = counter
|
||||
expect(subject['test1']).to eq counter
|
||||
end
|
||||
|
||||
it 'can remove an existing counter' do
|
||||
expect(group.key? 'test').to eq true
|
||||
group.delete 'test'
|
||||
expect(group.key? 'test').to eq false
|
||||
end
|
||||
|
||||
it 'will refuse to set an incorrect value' do
|
||||
expect do
|
||||
subject['a'] = 1
|
||||
end.to raise_error Deployment::InvalidArgument, /value should be a Counter/
|
||||
end
|
||||
|
||||
it 'can create a new Counter in the group' do
|
||||
expect(group.key? 'my_counter').to eq false
|
||||
subject.create 'my_counter'
|
||||
expect(group.key? 'my_counter').to eq true
|
||||
end
|
||||
|
||||
it 'will create a new instance of Counter if asked to get a nonexistent Counter' do
|
||||
expect(group.key? 'another_counter').to eq false
|
||||
expect(subject['another_counter']).to be_a Deployment::Concurrency::Counter
|
||||
expect(group.key? 'another_counter').to eq true
|
||||
end
|
||||
|
||||
it 'can convert an object to a key' do
|
||||
expect(subject.to_key :a).to eq :a
|
||||
expect(subject.to_key 'a').to eq :a
|
||||
expect(subject.to_key 1).to eq :'1'
|
||||
end
|
||||
|
||||
it 'will refuse to use a value that cannot be converted to a key' do
|
||||
expect do
|
||||
subject.to_key nil
|
||||
end.to raise_error Deployment::InvalidArgument, /cannot be used/
|
||||
end
|
||||
|
||||
it 'can loop through all Counter objects' do
|
||||
expect(subject.each.to_a).to eq [counter]
|
||||
end
|
||||
|
||||
it 'can act as an Enumerable container with Counters' do
|
||||
maximum_values = subject.map do |counter|
|
||||
counter.maximum
|
||||
end
|
||||
expect(maximum_values).to eq [0]
|
||||
end
|
||||
end
|
|
@ -154,6 +154,84 @@ describe Deployment::Node do
|
|||
end
|
||||
end
|
||||
|
||||
context '#concurrency' do
|
||||
|
||||
context 'maximum is not set' do
|
||||
it 'concurrency is NOT present' do
|
||||
is_expected.not_to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is available' do
|
||||
is_expected.to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'will not try to count node concurrency if maximum is not set' do
|
||||
subject.status = :busy
|
||||
expect(subject.cluster.node_concurrency.current).to eq 0
|
||||
subject.status = :successful
|
||||
expect(subject.cluster.node_concurrency.current).to eq 0
|
||||
end
|
||||
|
||||
it 'online node is counted as a ready node' do
|
||||
subject.status == :online
|
||||
is_expected.to be_ready
|
||||
end
|
||||
end
|
||||
context 'maximum is set and active' do
|
||||
before(:each) do
|
||||
cluster.node_concurrency.maximum = 2
|
||||
cluster.node_concurrency.current = 1
|
||||
end
|
||||
|
||||
it 'concurrency is present' do
|
||||
is_expected.to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is available' do
|
||||
is_expected.to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'can change the current concurrency when the status of the node changes' do
|
||||
subject.status = :busy
|
||||
expect(subject.cluster.node_concurrency.current).to eq 2
|
||||
subject.status = :successful
|
||||
expect(subject.cluster.node_concurrency.current).to eq 1
|
||||
end
|
||||
|
||||
it 'online node is counted as a ready node' do
|
||||
subject.status == :online
|
||||
is_expected.to be_ready
|
||||
end
|
||||
end
|
||||
|
||||
context 'maximum is set and not active' do
|
||||
before(:each) do
|
||||
cluster.node_concurrency.maximum = 1
|
||||
cluster.node_concurrency.current = 2
|
||||
end
|
||||
|
||||
it 'concurrency is present' do
|
||||
is_expected.to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is NOT available' do
|
||||
is_expected.not_to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'can change the current concurrency when the status of the node changes' do
|
||||
subject.status = :busy
|
||||
expect(subject.cluster.node_concurrency.current).to eq 3
|
||||
subject.status = :successful
|
||||
expect(subject.cluster.node_concurrency.current).to eq 2
|
||||
end
|
||||
|
||||
it 'online node is NOT counted as a ready node' do
|
||||
subject.status == :online
|
||||
is_expected.not_to be_ready
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context '#inspection' do
|
||||
|
||||
it 'can to_s' do
|
||||
|
|
|
@ -341,84 +341,107 @@ describe Deployment::Task do
|
|||
end
|
||||
|
||||
context '#concurrency' do
|
||||
it 'has maximum_concurrency' do
|
||||
expect(subject.maximum_concurrency).to eq 0
|
||||
|
||||
context 'concurrency is not defined' do
|
||||
it 'concurrency is NOT present' do
|
||||
is_expected.not_to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is available' do
|
||||
is_expected.to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'will not try to change the current concurrency when the status of the task changes' do
|
||||
subject.status = :running
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 0
|
||||
subject.status = :successful
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 0
|
||||
end
|
||||
|
||||
it 'ready task is counted as a ready task' do
|
||||
subject.status == :ready
|
||||
is_expected.to be_ready
|
||||
end
|
||||
end
|
||||
|
||||
it 'can set maximum_concurrency' do
|
||||
subject.maximum_concurrency = 1
|
||||
expect(subject.maximum_concurrency).to eq 1
|
||||
subject.maximum_concurrency = '2'
|
||||
expect(subject.maximum_concurrency).to eq 2
|
||||
expect do
|
||||
subject.maximum_concurrency = 'value'
|
||||
end.to raise_error Deployment::InvalidArgument, /should be an integer/
|
||||
context 'defined, but maximum is not set' do
|
||||
before(:each) do
|
||||
cluster.task_concurrency.create 'task1'
|
||||
end
|
||||
|
||||
it 'concurrency is NOT present' do
|
||||
is_expected.not_to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is available' do
|
||||
is_expected.to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'will not try to change the current concurrency when the status of the task changes' do
|
||||
subject.status = :running
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 0
|
||||
subject.status = :successful
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 0
|
||||
end
|
||||
|
||||
it 'ready task is counted as a ready task' do
|
||||
subject.status == :ready
|
||||
is_expected.to be_ready
|
||||
end
|
||||
end
|
||||
|
||||
it 'can read the current concurrency counter' do
|
||||
expect(subject.current_concurrency).to eq 0
|
||||
context 'maximum is set and active' do
|
||||
before(:each) do
|
||||
cluster.task_concurrency['task1'].maximum = 2
|
||||
cluster.task_concurrency['task1'].current = 1
|
||||
end
|
||||
|
||||
it 'concurrency is present' do
|
||||
is_expected.to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'concurrency is available' do
|
||||
is_expected.to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'can change the current concurrency when the status of the task changes' do
|
||||
subject.status = :running
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 2
|
||||
subject.status = :successful
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 1
|
||||
end
|
||||
|
||||
it 'ready task is counted as a ready task' do
|
||||
subject.status == :ready
|
||||
is_expected.to be_ready
|
||||
end
|
||||
end
|
||||
|
||||
it 'can increase the current concurrency counter' do
|
||||
subject.current_concurrency_zero
|
||||
expect(subject.current_concurrency_increase).to eq 1
|
||||
expect(subject.current_concurrency).to eq 1
|
||||
subject.current_concurrency_increase
|
||||
expect(subject.current_concurrency).to eq 2
|
||||
end
|
||||
context 'maximum is set and not active' do
|
||||
before(:each) do
|
||||
cluster.task_concurrency['task1'].maximum = 1
|
||||
cluster.task_concurrency['task1'].current = 2
|
||||
end
|
||||
|
||||
it 'can decrease the current concurrency counter' do
|
||||
subject.current_concurrency_zero
|
||||
subject.current_concurrency_increase
|
||||
expect(subject.current_concurrency).to eq 1
|
||||
expect(subject.current_concurrency_decrease).to eq 0
|
||||
expect(subject.current_concurrency).to eq 0
|
||||
expect(subject.current_concurrency_decrease).to eq 0
|
||||
expect(subject.current_concurrency).to eq 0
|
||||
end
|
||||
it 'concurrency is present' do
|
||||
is_expected.to be_concurrency_present
|
||||
end
|
||||
|
||||
it 'can manually set the current concurrency value' do
|
||||
subject.current_concurrency_zero
|
||||
expect(subject.current_concurrency = 100).to eq 100
|
||||
expect(subject.current_concurrency).to eq 100
|
||||
expect(subject.current_concurrency = -100).to eq -100
|
||||
expect(subject.current_concurrency).to eq 0
|
||||
end
|
||||
it 'concurrency is NOT available' do
|
||||
is_expected.not_to be_concurrency_available
|
||||
end
|
||||
|
||||
it 'can reset the current concurrency' do
|
||||
subject.current_concurrency_zero
|
||||
subject.current_concurrency_increase
|
||||
expect(subject.current_concurrency).to eq 1
|
||||
expect(subject.current_concurrency_zero).to eq 0
|
||||
expect(subject.current_concurrency).to eq 0
|
||||
end
|
||||
it 'can change the current concurrency when the status of the task changes' do
|
||||
subject.status = :running
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 3
|
||||
subject.status = :successful
|
||||
expect(cluster.task_concurrency[subject.name].current).to eq 2
|
||||
end
|
||||
|
||||
it 'can check that the concurrency is available' do
|
||||
subject.current_concurrency_zero
|
||||
expect(subject.concurrency_available?).to eq true
|
||||
subject.maximum_concurrency = 1
|
||||
expect(subject.concurrency_available?).to eq true
|
||||
subject.current_concurrency_increase
|
||||
expect(subject.concurrency_available?).to eq false
|
||||
subject.current_concurrency_decrease
|
||||
expect(subject.concurrency_available?).to eq true
|
||||
subject.current_concurrency_increase
|
||||
expect(subject.concurrency_available?).to eq false
|
||||
subject.maximum_concurrency = 2
|
||||
expect(subject.concurrency_available?).to eq true
|
||||
end
|
||||
|
||||
it 'can change the current concurrency when the status changes for all nodes' do
|
||||
task1.current_concurrency_zero
|
||||
task1.maximum_concurrency = 1
|
||||
task1.status = :running
|
||||
expect(task1.current_concurrency).to eq 1
|
||||
expect(task2_1.current_concurrency).to eq 1
|
||||
expect(task2_2.current_concurrency).to eq 0
|
||||
task1.status = :successful
|
||||
expect(task1.current_concurrency).to eq 0
|
||||
expect(task2_1.current_concurrency).to eq 0
|
||||
expect(task2_2.current_concurrency).to eq 0
|
||||
it 'ready task is NOT counted as a ready task' do
|
||||
subject.status == :ready
|
||||
is_expected.not_to be_ready
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
#!/usr/bin/env ruby
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
require File.absolute_path File.join File.dirname(__FILE__), 'test_node.rb'
|
||||
|
||||
cluster = Deployment::TestCluster.new
|
||||
cluster.id = 'node_concurrency'
|
||||
cluster.plot = true if options[:plot]
|
||||
|
||||
node1 = Deployment::TestNode.new 'node1', cluster
|
||||
node2 = Deployment::TestNode.new 'node2', cluster
|
||||
node3 = Deployment::TestNode.new 'node3', cluster
|
||||
node4 = Deployment::TestNode.new 'node4', cluster
|
||||
node5 = Deployment::TestNode.new 'node5', cluster
|
||||
node6 = Deployment::TestNode.new 'node6', cluster
|
||||
|
||||
node1.add_new_task('task1')
|
||||
node1.add_new_task('final')
|
||||
|
||||
node2.add_new_task('task1')
|
||||
node2.add_new_task('final')
|
||||
|
||||
node3.add_new_task('task1')
|
||||
node3.add_new_task('final')
|
||||
|
||||
node4.add_new_task('task1')
|
||||
node4.add_new_task('final')
|
||||
|
||||
node5.add_new_task('task1')
|
||||
node5.add_new_task('final')
|
||||
|
||||
node6.add_new_task('task1')
|
||||
node6.add_new_task('final')
|
||||
|
||||
node1['final'].after node1['task1']
|
||||
node2['final'].after node2['task1']
|
||||
node3['final'].after node3['task1']
|
||||
node4['final'].after node4['task1']
|
||||
node5['final'].after node5['task1']
|
||||
node6['final'].after node6['task1']
|
||||
|
||||
cluster.node_concurrency.maximum = 2
|
||||
|
||||
if options[:plot]
|
||||
cluster.make_image 'start'
|
||||
end
|
||||
|
||||
if options[:interactive]
|
||||
binding.pry
|
||||
else
|
||||
cluster.run
|
||||
end
|
|
@ -0,0 +1,60 @@
|
|||
#!/usr/bin/env ruby
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
require File.absolute_path File.join File.dirname(__FILE__), 'test_node.rb'
|
||||
|
||||
cluster = Deployment::TestCluster.new
|
||||
cluster.id = 'task_concurrency'
|
||||
cluster.plot = true if options[:plot]
|
||||
|
||||
node1 = Deployment::TestNode.new 'node1', cluster
|
||||
node2 = Deployment::TestNode.new 'node2', cluster
|
||||
node3 = Deployment::TestNode.new 'node3', cluster
|
||||
node4 = Deployment::TestNode.new 'node4', cluster
|
||||
node5 = Deployment::TestNode.new 'node5', cluster
|
||||
|
||||
node1.add_new_task('task1')
|
||||
node1.add_new_task('final')
|
||||
|
||||
node2.add_new_task('task1')
|
||||
node2.add_new_task('final')
|
||||
|
||||
node3.add_new_task('task1')
|
||||
node3.add_new_task('final')
|
||||
|
||||
node4.add_new_task('task1')
|
||||
node4.add_new_task('final')
|
||||
|
||||
node5.add_new_task('task1')
|
||||
node5.add_new_task('final')
|
||||
|
||||
node1['final'].after node1['task1']
|
||||
node2['final'].after node2['task1']
|
||||
node3['final'].after node3['task1']
|
||||
node4['final'].after node4['task1']
|
||||
node5['final'].after node5['task1']
|
||||
|
||||
cluster.task_concurrency['task1'].maximum = 3
|
||||
cluster.task_concurrency['final'].maximum = 2
|
||||
|
||||
if options[:plot]
|
||||
cluster.make_image 'start'
|
||||
end
|
||||
|
||||
if options[:interactive]
|
||||
binding.pry
|
||||
else
|
||||
cluster.run
|
||||
end
|
|
@ -78,7 +78,7 @@ module Deployment
|
|||
|
||||
class TestCluster < Cluster
|
||||
attr_accessor :plot
|
||||
def hook_post_node(*args)
|
||||
def hook_pre_node(*args)
|
||||
make_image if plot
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue