WIP - Initial attempt at task-based orchestration

- Refactored a few files to prevent import loops
- Created basic Orchestrator and ProviderDriver scheme
for task execution
- Created task model for more concurrent-friendly task management
This commit is contained in:
Scott Hussey 2017-04-14 10:52:24 -05:00
parent 5586042858
commit e5dc950f1f
15 changed files with 395 additions and 51 deletions

View File

@ -3,4 +3,12 @@
This is the external facing API service to control the rest
of Drydock and query Drydock-managed data.
Anticipate basing this service on the falcon Python library
Anticipate basing this service on the falcon Python library
## Endpoints ##
### /tasks ###
POST - Create a new orchestration task and submit it for execution
GET - Get status of a task
DELETE - Cancel execution of a task if permitted

View File

@ -11,11 +11,114 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Thread, Lock
import uuid
import time
import helm_drydock.statemgmt as statemgmt
import helm_drydock.enum as enum
import helm_drydock.model.task as tasks
# This is the interface for the orchestrator to access a driver
# TODO Need to have each driver spin up a separate thread to manage
# driver tasks and feed them via queue
class ProviderDriver(object):
    """Interface the orchestrator uses to hand a task to a driver.

    The commit diff left two conflicting __init__ definitions and a stray
    class header here; this is the intended new-version class with the
    state-manager-aware constructor.
    """

    def __init__(self, state_manager):
        """
        :param state_manager: state manager used to persist task updates
            while a task executes
        """
        self.state_manager = state_manager

    def execute_task(self, task):
        """Run ``task`` in a DriverTaskManager thread and block until done.

        While the manager thread is alive, the (possibly updated) task is
        flushed to the state manager roughly once per second.
        """
        task_manager = DriverTaskManager(task, self.state_manager)
        task_manager.start()

        while task_manager.is_alive():
            self.state_manager.put_task(task)
            time.sleep(1)

        return
# Execute a single task in a separate thread
# Execute a single task in a separate thread
class DriverTaskManager(Thread):
    """Thread that executes one DriverTask and reports progress through
    the state manager.
    """

    def __init__(self, task, state_manager):
        """
        :param task: DriverTask instance this thread will execute
        :param state_manager: statemgmt.DesignState used to persist
            task status updates
        :raises DriverError: if task or state_manager is the wrong type
        """
        super(DriverTaskManager, self).__init__()

        if isinstance(task, DriverTask):
            self.task = task
        else:
            # Fix: original message concatenated to "initializedwith"
            raise DriverError("DriverTaskManager must be initialized"
                              " with a DriverTask instance")

        if isinstance(state_manager, statemgmt.DesignState):
            self.state_manager = state_manager
        else:
            raise DriverError("Invalid state manager specified")

        return

    # Fix: was ``def run():`` (missing self) and the body referenced a
    # bare ``task`` name instead of ``self.task``, which would raise
    # NameError as soon as the thread started.
    def run(self):
        # Claim the task for this thread so it cannot run twice.
        self.task.set_manager(self.name)

        if self.task.action == enum.OrchestratorAction.Noop:
            self.task.set_status(enum.TaskStatus.Running)
            self.state_manager.put_task(self.task)

            # Simulate ~5 seconds of work, checking once per second
            # whether termination has been requested.
            i = 0
            while i < 5:
                i = i + 1
                if self.task.terminate:
                    self.task.set_status(enum.TaskStatus.Terminated)
                    self.state_manager.put_task(self.task)
                    return
                else:
                    time.sleep(1)

            self.task.set_status(enum.TaskStatus.Complete)
            self.state_manager.put_task(self.task)
            return
        else:
            raise DriverError("Unknown Task action")
class DriverTask(tasks.Task):
    """Base class for tasks executed by a provider driver."""

    # subclasses implemented by each driver should override this with the list
    # of actions that driver supports
    supported_actions = [enum.OrchestratorAction.Noop]

    def __init__(self, target_design_id=None,
                 target_action=None, task_scope=None, **kwargs):
        """
        :param target_design_id: identifier of the design this task acts on;
            required
        :param target_action: requested OrchestratorAction; must appear in
            supported_actions
        :param task_scope: optional dict narrowing the scope of the task
        :raises DriverError: on a missing design id or unsupported action
        """
        super(DriverTask, self).__init__(**kwargs)

        if target_design_id is None:
            raise DriverError("target_design_id cannot be None")

        self.target_design_id = target_design_id

        if target_action in self.supported_actions:
            self.target_action = target_action
        else:
            raise DriverError("DriverTask does not support action %s"
                              % (target_action))

        # Fix: was a mutable default argument (task_scope={}) shared
        # across all instances constructed without an explicit scope.
        self.task_scope = task_scope if task_scope is not None else {}

        # The DriverTaskManager thread that is managing this task. We
        # don't want a task to be submitted multiple times
        self.task_manager = None

    def set_manager(self, manager_name):
        """Atomically claim this task for the named manager thread.

        :param manager_name: thread name of the claiming DriverTaskManager
        :returns: True when the claim succeeds
        :raises DriverError: if already managed or the lock is unavailable
        """
        my_lock = self.get_lock()
        if my_lock:
            if self.task_manager is None:
                self.task_manager = manager_name
            else:
                self.release_lock()
                # Fix: was self.taskid -- the base Task class defines
                # self.task_id, so the original raised AttributeError here.
                raise DriverError("Task %s already managed by %s"
                                  % (self.task_id, self.task_manager))
            self.release_lock()
            return True
        raise DriverError("Could not acquire lock")

    def get_manager(self):
        """Return the name of the managing thread, or None if unclaimed."""
        return self.task_manager

View File

@ -14,7 +14,7 @@
from enum import Enum, unique
@unique
class Action(Enum):
class OrchestratorAction(Enum):
Noop = 'noop'
ValidateDesign = 'validate_design'
VerifySite = 'verify_site'
@ -62,3 +62,14 @@ class NodeStatus(Enum):
FailedBootstrap = 'failed_bootstrap' # Node bootstrapping failed
Bootstrapped = 'bootstrapped' # Node fully bootstrapped
Complete = 'complete' # Node is complete
@unique
class TaskStatus(Enum):
    """Lifecycle states reported for orchestrator and driver tasks."""
    Created = 'created'        # Initial status assigned in Task.__init__
    Waiting = 'waiting'
    Running = 'running'        # Set when a task begins executing
    Stopping = 'stopping'
    Terminated = 'terminated'  # Set when a requested termination completes
    Errored = 'errored'
    Complete = 'complete'      # Set when a task finishes successfully
    Stopped = 'stopped'

View File

@ -16,4 +16,7 @@ class DesignError(Exception):
pass
class StateError(Exception):
    """Raised by the state management layer, e.g. on lock-acquisition
    timeout, duplicate task registration, or an invalid task type."""
    pass


class OrchestratorError(Exception):
    """Raised for orchestration failures such as an unsupported action
    or inability to acquire a task's update lock."""
    pass

View File

@ -18,8 +18,8 @@ import logging
from copy import deepcopy
from helm_drydock.orchestrator.enum import SiteStatus
from helm_drydock.orchestrator.enum import NodeStatus
from helm_drydock.enum import SiteStatus
from helm_drydock.enum import NodeStatus
from helm_drydock.model.network import Network
from helm_drydock.model.network import NetworkLink
from helm_drydock.model import Utils

View File

@ -18,8 +18,8 @@ import logging
from copy import deepcopy
from helm_drydock.orchestrator.enum import SiteStatus
from helm_drydock.orchestrator.enum import NodeStatus
from helm_drydock.enum import SiteStatus
from helm_drydock.enum import NodeStatus
class HardwareProfile(object):

View File

@ -18,8 +18,8 @@ import logging
from copy import deepcopy
from helm_drydock.orchestrator.enum import SiteStatus
from helm_drydock.orchestrator.enum import NodeStatus
from helm_drydock.enum import SiteStatus
from helm_drydock.enum import NodeStatus
class NetworkLink(object):

View File

@ -19,8 +19,8 @@ import logging
from copy import deepcopy
from helm_drydock.orchestrator.enum import SiteStatus
from helm_drydock.orchestrator.enum import NodeStatus
from helm_drydock.enum import SiteStatus
from helm_drydock.enum import NodeStatus
from helm_drydock.model.hostprofile import HostProfile
from helm_drydock.model import Utils

View File

@ -18,8 +18,8 @@ import logging
from copy import deepcopy
from helm_drydock.orchestrator.enum import SiteStatus
from helm_drydock.orchestrator.enum import NodeStatus
from helm_drydock.enum import SiteStatus
from helm_drydock.enum import NodeStatus
class Site(object):

101
helm_drydock/model/task.py Normal file
View File

@ -0,0 +1,101 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from threading import Lock
import helm_drydock.error as errors
from helm_drydock.enum import TaskStatus, OrchestratorAction
class Task(object):
    """Base unit of work tracked by the orchestrator.

    Each task gets a random UUID and an update lock so status changes
    from multiple threads do not race.
    """

    def __init__(self, **kwargs):
        self.task_id = uuid.uuid4()
        self.status = TaskStatus.Created
        self.terminate = False
        self.subtasks = []
        # Fix: the parent task id was read from kwargs but assigned to a
        # local and discarded; persist it on the instance.
        self.parent_task = kwargs.get('parent_task', '')

        # A lock to prevent concurrency race conditions
        self.update_lock = Lock()

    def get_id(self):
        """Return this task's UUID."""
        return self.task_id

    # Mark this task and all subtasks as requested termination
    # so that the task manager knows termination has been requested
    def terminate_task(self):
        locked = self.get_lock()
        if locked:
            # TODO Naively assume subtask termination will succeed for now
            for t in self.subtasks:
                t.terminate_task()
            self.terminate = True
            self.release_lock()
        else:
            raise errors.OrchestratorError("Could not get task update lock")

    def set_status(self, status):
        """Set this task's status under the update lock."""
        locked = self.get_lock()
        if locked:
            self.status = status
            self.release_lock()
        else:
            raise errors.OrchestratorError("Could not get task update lock")

    def get_status(self):
        return self.status

    def get_lock(self):
        # Bounded wait so a wedged task cannot deadlock callers forever
        locked = self.update_lock.acquire(blocking=True, timeout=10)
        return locked

    def release_lock(self):
        self.update_lock.release()
        return

    def create_subtask(self, subtask_class, **kwargs):
        """Instantiate and register a subtask of this task.

        :param subtask_class: Task subclass to instantiate; receives
            parent_task=<this task's id> plus kwargs
        :returns: the new subtask instance
        :raises errors.OrchestratorError: if this task is marked for
            termination or the update lock cannot be acquired
        """
        if self.terminate:
            raise errors.OrchestratorError("Cannot create subtask for parent"
                                           " marked for termination")

        locked = self.get_lock()
        if locked:
            subtask = subtask_class(parent_task=self.get_id(), **kwargs)
            # Fix: was subtask.get_id() -- terminate_task() iterates this
            # list and calls terminate_task() on each entry, which a bare
            # UUID does not support; store the subtask object itself.
            self.subtasks.append(subtask)
            self.release_lock()
            return subtask
        else:
            raise errors.OrchestratorError("Could not get task update lock")
class OrchestratorTask(Task):
    """Task describing one orchestration action against a named site."""

    def __init__(self, **kwargs):
        super(OrchestratorTask, self).__init__(**kwargs)

        # Default to a no-op when no explicit action was requested
        self.action = kwargs.get('action', OrchestratorAction.Noop)

        # Every orchestration task must target a site
        self.site = kwargs.get('site', '')
        if self.site == '':
            raise ValueError("Orchestration Task requires 'site' parameter")

        # Node-level actions may additionally carry a node filter
        node_actions = [OrchestratorAction.VerifyNode,
                        OrchestratorAction.PrepareNode,
                        OrchestratorAction.DeployNode,
                        OrchestratorAction.DestroyNode]
        if self.action in node_actions:
            self.node_filter = kwargs.get('node_filter', None)

View File

@ -11,23 +11,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import time
from enum import Enum, unique
import uuid
import helm_drydock.drivers as drivers
import helm_drydock.model.task as tasks
import helm_drydock.error as errors
from helm_drydock.enum import TaskStatus, OrchestratorAction
class Orchestrator(object):
    """Dispatches orchestration tasks to enabled provider drivers.

    This span of the diff interleaved removed lines (the old design_state
    constructor, a bare-Exception guard, and the deleted legacy
    OrchestrationTask class) with the new implementation; this is the
    coherent new-version class.
    """

    # enabled_drivers is a map which provider drivers
    # should be enabled for use by this orchestrator
    def __init__(self, enabled_drivers=None, state_manager=None):
        self.enabled_drivers = {}

        if enabled_drivers is not None:
            self.enabled_drivers['oob'] = enabled_drivers.get('oob', None)
            self.enabled_drivers['server'] = enabled_drivers.get(
                'server', None)
            self.enabled_drivers['network'] = enabled_drivers.get(
                'network', None)

        self.state_manager = state_manager
        self.thread_objs = {}

    def execute_task(self, task):
        """Execute a task based on its action and the enabled drivers.

        Based on the task action, the enabled drivers, and the current
        state, decide what is needed next.

        :param task: OrchestratorTask to execute
        :raises errors.OrchestratorError: if no state manager is configured
            or the task's action is not supported
        """
        if self.state_manager is None:
            raise errors.OrchestratorError("Cannot execute task without"
                                           " initialized state manager")

        # Just for testing now, need to implement with enabled_drivers
        # logic
        if task.action == OrchestratorAction.Noop:
            task.set_status(TaskStatus.Running)
            self.state_manager.put_task(task)

            driver_task = task.create_subtask(
                drivers.DriverTask,
                target_design_id=0,
                target_action=OrchestratorAction.Noop)
            self.state_manager.post_task(driver_task)

            driver = drivers.ProviderDriver(self.state_manager)
            driver.execute_task(driver_task)

            # Propagate the driver task's final status to the parent
            task.set_status(driver_task.get_status())
            self.state_manager.put_task(task)
            return
        else:
            raise errors.OrchestratorError("Action %s not supported"
                                           % (task.action))

View File

@ -23,6 +23,7 @@ import helm_drydock.model.hostprofile as hostprofile
import helm_drydock.model.network as network
import helm_drydock.model.site as site
import helm_drydock.model.hwprofile as hwprofile
import helm_drydock.model.task as tasks
from helm_drydock.error import DesignError, StateError
@ -38,6 +39,9 @@ class DesignState(object):
self.builds = []
self.builds_lock = Lock()
self.tasks = []
self.tasks_lock = Lock()
return
# TODO Need to lock a design base or change once implementation
@ -166,6 +170,46 @@ class DesignState(object):
else:
raise DesignError("Design change must be a SiteDesign instance")
def get_task(self, task_id):
    """Return the stored task whose id equals ``task_id``, or None."""
    matches = (candidate for candidate in self.tasks
               if candidate.get_id() == task_id)
    return next(matches, None)
def post_task(self, task):
    """Register a new task, storing a deepcopy.

    :param task: tasks.Task instance to store
    :returns: True on success
    :raises StateError: on wrong type, duplicate id, or lock timeout
    """
    # Guard clause replaces the original nested if/else
    if task is None or not isinstance(task, tasks.Task):
        raise StateError("Task is not the correct type")

    acquired = self.tasks_lock.acquire(blocking=True, timeout=10)
    if not acquired:
        raise StateError("Could not acquire lock")

    try:
        task_id = task.get_id()
        if any(existing.get_id() == task_id for existing in self.tasks):
            raise StateError("Task %s already created" % task_id)
        self.tasks.append(deepcopy(task))
        return True
    finally:
        self.tasks_lock.release()
def put_task(self, task):
    """Replace the stored copy of an existing task with a fresh deepcopy.

    :param task: tasks.Task instance whose stored copy should be updated
    :returns: True on success
    :raises StateError: on wrong type or lock timeout
    """
    # Guard clause replaces the original nested if/else
    if task is None or not isinstance(task, tasks.Task):
        raise StateError("Task is not the correct type")

    acquired = self.tasks_lock.acquire(blocking=True, timeout=10)
    if not acquired:
        raise StateError("Could not acquire lock")

    try:
        task_id = task.get_id()
        self.tasks = [deepcopy(task) if existing.get_id() == task_id
                      else existing
                      for existing in self.tasks]
        return True
    finally:
        self.tasks_lock.release()
class SiteDesign(object):
def __init__(self, ischange=False, changeid=None):

View File

@ -20,6 +20,12 @@ Serialization of Drydock internal model as rendered to effective implementation
/drydock/build
/drydock/build/[datestamp] - A point-in-time view of what was deployed with deployment results
## Tasks ##
Management of task state for the internal orchestrator
/drydock/tasks
## Node data ##
Per-node data that can drive introspection as well as accept updates from nodes

View File

@ -45,12 +45,14 @@ setup(name='helm_drydock',
'helm_drydock.ingester.plugins',
'helm_drydock.statemgmt',
'helm_drydock.orchestrator',
'helm_drydock.control'],
'helm_drydock.control',
'helm_drydock.drivers',
'helm_drydock.drivers.oob'],
install_requires=[
'PyYAML',
'oauth',
'requests-oauthlib',
'pyghmi',
'pyghmi>=1.0.18',
'netaddr',
'pecan',
'webob'

View File

@ -0,0 +1,61 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Generic testing for the orchestrator
#
import helm_drydock.orchestrator as orch
import helm_drydock.enum as enum
import helm_drydock.statemgmt as statemgmt
import helm_drydock.model.task as task
import helm_drydock.drivers as drivers
import threading
import time
class TestClass(object):
    """Generic threading and termination tests for the orchestrator."""

    def test_driver_threading(self):
        # Execute a Noop orchestration task synchronously, then verify
        # each driver subtask ran in a DriverTaskManager thread distinct
        # from the test's own thread.
        state_mgr = statemgmt.DesignState()
        orchestrator = orch.Orchestrator(state_manager=state_mgr)
        orch_task = task.OrchestratorTask(action=enum.OrchestratorAction.Noop,
                                          site="default")

        orchestrator.execute_task(orch_task)

        # Check that each subtask executed in a different thread than this one
        # NOTE(review): only entries that are DriverTask instances are
        # checked -- if subtasks holds task ids rather than task objects,
        # this loop asserts nothing; verify against Task.create_subtask.
        for t in orch_task.subtasks:
            if isinstance(t, drivers.DriverTask):
                assert t.get_manager() != threading.current_thread().name

    def test_task_termination(self):
        # Start execution on a background thread so termination can be
        # requested while the task is still running.
        state_mgr = statemgmt.DesignState()
        orchestrator = orch.Orchestrator(state_manager=state_mgr)
        orch_task = task.OrchestratorTask(action=enum.OrchestratorAction.Noop,
                                          site="default")

        orch_thread = threading.Thread(target=orchestrator.execute_task,
                                       args=(orch_task,))
        orch_thread.start()

        # Give the Noop task time to enter its work loop before asking
        # it to terminate (the driver loop checks once per second).
        time.sleep(1)
        orch_task.terminate_task()

        # Wait for execution to wind down completely.
        while orch_thread.is_alive():
            time.sleep(1)

        assert orch_task.get_status() == enum.TaskStatus.Terminated
        # NOTE(review): assumes subtasks entries are task objects exposing
        # get_status(); confirm against Task.create_subtask's bookkeeping.
        for t in orch_task.subtasks:
            assert t.get_status() == enum.TaskStatus.Terminated