Support task concurrency

Without this changes Astute will ignore information about
task strategy and will run as many as possible tasks on suitable nodes.
The only limit which can be used in this case is node concurrency,
but it will decrise deployment performance

Also change log level for message about concurrency
from info to debug, because in mostly cases they are required
only for developers

DocImpact
Closes-Bug: #1571014

Change-Id: I4f27da4abdaf4939002b62d05a44e23d3f1a3d5e
This commit is contained in:
Vladimir Sharshov (warpc) 2016-04-15 21:17:25 +03:00
parent 5415b86ac1
commit e4b84a3383
5 changed files with 157 additions and 33 deletions

View File

@ -26,7 +26,8 @@ module Astute
deployment_info, offline_uids = pre_deployment_process(deployment_info)
tasks_graph = support_virtual_node(tasks_graph)
support_virtual_node(tasks_graph)
unzip_graph(tasks_graph, tasks_directory)
Deployment::Log.logger = Astute.logger
cluster = TaskCluster.new
@ -40,27 +41,9 @@ module Astute
node.set_status_failed if offline_uids.include? node_id
end
tasks_graph.each do |node_id, tasks|
tasks.each do |task|
cluster[node_id].graph.create_task(
task['id'],
task.merge({'node_id' => node_id})
.reverse_merge(tasks_directory.fetch(task['id'], {}))
)
end
end
tasks_graph.each do |node_id, tasks|
tasks.each do |task|
task.fetch('requires', []).each do |d_t|
cluster[node_id][task['id']].depends cluster[d_t['node_id']][d_t['name']]
end
task.fetch('required_for', []).each do |d_t|
cluster[node_id][task['id']].depended_on cluster[d_t['node_id']][d_t['name']]
end
end
end
setup_tasks(tasks_graph, cluster)
setup_task_depends(tasks_graph, cluster)
setup_task_concurrency(tasks_graph, cluster)
write_graph_to_file(cluster)
result = cluster.run
@ -69,6 +52,60 @@ module Astute
private
def unzip_graph(tasks_graph, tasks_directory)
tasks_graph.each do |node_id, tasks|
tasks.each do |task|
task.merge!({'node_id' => node_id})
.reverse_merge(tasks_directory.fetch(task['id'], {}))
end
end
tasks_graph
end
def setup_tasks(tasks_graph, cluster)
tasks_graph.each do |node_id, tasks|
tasks.each do |task|
cluster[node_id].graph.create_task(task['id'], task)
end
end
end
def setup_task_depends(tasks_graph, cluster)
tasks_graph.each do |node_id, tasks|
tasks.each do |task|
task.fetch('requires', []).each do |d_t|
cluster[node_id][task['id']].depends(
cluster[d_t['node_id']][d_t['name']])
end
task.fetch('required_for', []).each do |d_t|
cluster[node_id][task['id']].depended_on(
cluster[d_t['node_id']][d_t['name']])
end
end
end
end
def setup_task_concurrency(tasks_graph, cluster)
tasks_graph.each do |_node_id, tasks|
tasks.each do |task|
cluster.task_concurrency[task['id']].maximum = task_concurrency_value(task)
end
end
end
def task_concurrency_value(task)
strategy = task.fetch('parameters', {}).fetch('strategy', {})
value = case strategy['type']
when 'one_by_one' then 1
when 'parallel' then strategy['amount'].to_i
else 0
end
return value if value >= 0
raise DeploymentEngineError, "Task concurrency expect only "\
"non-negative integer, but got #{value}. Please check task #{task}"
end
def pre_deployment_process(deployment_info)
return [[],[]] if deployment_info.blank?
@ -227,7 +264,7 @@ module Astute
@ctx,
"systemtype",
all_uids,
check_result=false,
_check_result=false,
10
)
available_nodes = systemtype.get_type.select do |node|

View File

@ -126,10 +126,10 @@ module Deployment
return unless concurrency_present?
if status_to == :busy
cluster.node_concurrency.increment
info "Increasing node concurrency to: #{cluster.node_concurrency.current}"
debug "Increasing node concurrency to: #{cluster.node_concurrency.current}"
elsif status_from == :busy
cluster.node_concurrency.decrement
info "Decreasing node concurrency to: #{cluster.node_concurrency.current}"
debug "Decreasing node concurrency to: #{cluster.node_concurrency.current}"
end
end

View File

@ -159,12 +159,13 @@ module Deployment
# @return [void]
def status_changes_concurrency(status_from, status_to)
return unless concurrency_present?
return if status_from == status_to
if status_to == :running
node.cluster.task_concurrency[name].increment
info "Increasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
debug "Increasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
elsif status_from == :running
node.cluster.task_concurrency[name].decrement
info "Decreasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
debug "Decreasing task concurrency to: #{node.cluster.task_concurrency[name].current}"
end
end

View File

@ -359,7 +359,7 @@ describe Deployment::Task do
end
it 'ready task is counted as a ready task' do
subject.status == :ready
subject.status = :ready
is_expected.to be_ready
end
end
@ -385,7 +385,7 @@ describe Deployment::Task do
end
it 'ready task is counted as a ready task' do
subject.status == :ready
subject.status = :ready
is_expected.to be_ready
end
end
@ -411,8 +411,15 @@ describe Deployment::Task do
expect(cluster.task_concurrency[subject.name].current).to eq 1
end
it 'do nothing when the status of the task was updated but not changed' do
subject.status = :running
expect(cluster.task_concurrency[subject.name].current).to eq 2
subject.status = :running
expect(cluster.task_concurrency[subject.name].current).to eq 2
end
it 'ready task is counted as a ready task' do
subject.status == :ready
subject.status = :ready
is_expected.to be_ready
end
end
@ -439,7 +446,7 @@ describe Deployment::Task do
end
it 'ready task is NOT counted as a ready task' do
subject.status == :ready
subject.status = :ready
is_expected.not_to be_ready
end
end

View File

@ -40,7 +40,8 @@ describe Astute::TaskDeployment do
"fail_on_error"=>true,
"required_for"=>[],
"requires"=> [],
"id"=>"ironic_post_swift_key"
"id"=>"ironic_post_swift_key",
"parameters"=>{}
}],
"null"=> [{
"skipped"=>true,
@ -48,6 +49,7 @@ describe Astute::TaskDeployment do
"fail_on_error"=>false,
"required_for"=>[],
"requires"=>[],
"parameters"=>{},
"id"=>"post_deployment_start"}]
}
end
@ -164,13 +166,90 @@ describe Astute::TaskDeployment do
Astute::TaskCluster.any_instance.stubs(:run).returns({:success => true})
Deployment::Log.expects(:logger=).with(Astute.logger)
Deployment::Cluster.any_instance.stubs(:run).returns({:success => true})
task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)
end
context 'task concurrency' do
let(:task_concurrency) { mock('task_concurrency') }
before(:each) do
Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
task_deployment.stubs(:write_graph_to_file)
ctx.stubs(:report)
task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
Astute::TaskCluster.any_instance.stubs(:run).returns({:success => true})
Deployment::Concurrency::Counter.any_instance
.stubs(:maximum=).with(
Astute.config.max_nodes_per_call)
end
it 'should setup 0 if no task concurrency setup' do
Deployment::Concurrency::Counter.any_instance.expects(:maximum=).with(0).times(5)
task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)
end
it 'it should setup 1 if task concurrency type one_by_one' do
tasks_graph['1'].first['parameters']['strategy'] =
{'type' => 'one_by_one'}
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(0).times(4)
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(1)
task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)
end
it 'should setup task concurrency as amount if type is parallel' do
tasks_graph['1'].first['parameters']['strategy'] =
{'type' => 'parallel', 'amount' => 7}
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(0).times(4)
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(7)
task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)
end
it 'should setup 0 if task strategy is parallel and amount do not set' do
tasks_graph['1'].first['parameters']['strategy'] = {'type' => 'parallel'}
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(0).times(5)
task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)
end
it 'should raise error if amount is non-positive integer and type is parallel' do
tasks_graph['1'].first['parameters']['strategy'] =
{'type' => 'parallel', 'amount' => -4}
Deployment::Concurrency::Counter.any_instance.expects(:maximum=)
.with(0).times(2)
expect {task_deployment.deploy(
deployment_info: deployment_info,
tasks_graph: tasks_graph,
tasks_directory: tasks_directory)}.to raise_error(
Astute::DeploymentEngineError, /expect only non-negative integer, but got -4./
)
end
end
context 'config' do
around(:each) do |example|
max_nodes_old_value = Astute.config.max_nodes_per_call