When running the examples, especially when running them in TRACE/BLATHER/DEBUG logging level these updates make it more clear = what is being processed, the messages being sent/acked/received and what there contents are. Change-Id: I94a497c9064df30197454ae480fe3d471ba1dc7d
		
			
				
	
	
		
			234 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			234 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
 | 
						|
#
 | 
						|
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#    not use this file except in compliance with the License. You may obtain
 | 
						|
#    a copy of the License at
 | 
						|
#
 | 
						|
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#    Unless required by applicable law or agreed to in writing, software
 | 
						|
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#    License for the specific language governing permissions and limitations
 | 
						|
#    under the License.
 | 
						|
 | 
						|
import functools
 | 
						|
 | 
						|
from futurist import periodics
 | 
						|
from oslo_utils import timeutils
 | 
						|
 | 
						|
from taskflow.engines.action_engine import executor
 | 
						|
from taskflow.engines.worker_based import dispatcher
 | 
						|
from taskflow.engines.worker_based import protocol as pr
 | 
						|
from taskflow.engines.worker_based import proxy
 | 
						|
from taskflow.engines.worker_based import types as wt
 | 
						|
from taskflow import exceptions as exc
 | 
						|
from taskflow import logging
 | 
						|
from taskflow import task as task_atom
 | 
						|
from taskflow.utils import kombu_utils as ku
 | 
						|
from taskflow.utils import misc
 | 
						|
from taskflow.utils import threading_utils as tu
 | 
						|
 | 
						|
LOG = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class WorkerTaskExecutor(executor.TaskExecutor):
 | 
						|
    """Executes tasks on remote workers."""
 | 
						|
 | 
						|
    def __init__(self, uuid, exchange, topics,
 | 
						|
                 transition_timeout=pr.REQUEST_TIMEOUT,
 | 
						|
                 url=None, transport=None, transport_options=None,
 | 
						|
                 retry_options=None):
 | 
						|
        self._uuid = uuid
 | 
						|
        self._requests_cache = wt.RequestsCache()
 | 
						|
        self._transition_timeout = transition_timeout
 | 
						|
        type_handlers = {
 | 
						|
            pr.RESPONSE: dispatcher.Handler(self._process_response,
 | 
						|
                                            validator=pr.Response.validate),
 | 
						|
        }
 | 
						|
        self._proxy = proxy.Proxy(uuid, exchange,
 | 
						|
                                  type_handlers=type_handlers,
 | 
						|
                                  on_wait=self._on_wait, url=url,
 | 
						|
                                  transport=transport,
 | 
						|
                                  transport_options=transport_options,
 | 
						|
                                  retry_options=retry_options)
 | 
						|
        # NOTE(harlowja): This is the most simplest finder impl. that
 | 
						|
        # doesn't have external dependencies (outside of what this engine
 | 
						|
        # already requires); it though does create periodic 'polling' traffic
 | 
						|
        # to workers to 'learn' of the tasks they can perform (and requires
 | 
						|
        # pre-existing knowledge of the topics those workers are on to gather
 | 
						|
        # and update this information).
 | 
						|
        self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
 | 
						|
        self._finder.notifier.register(wt.WorkerFinder.WORKER_ARRIVED,
 | 
						|
                                       self._on_worker)
 | 
						|
        self._helpers = tu.ThreadBundle()
 | 
						|
        self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
 | 
						|
                           after_start=lambda t: self._proxy.wait(),
 | 
						|
                           before_join=lambda t: self._proxy.stop())
 | 
						|
        p_worker = periodics.PeriodicWorker.create([self._finder])
 | 
						|
        if p_worker:
 | 
						|
            self._helpers.bind(lambda: tu.daemon_thread(p_worker.start),
 | 
						|
                               before_join=lambda t: p_worker.stop(),
 | 
						|
                               after_join=lambda t: p_worker.reset(),
 | 
						|
                               before_start=lambda t: p_worker.reset())
 | 
						|
 | 
						|
    def _on_worker(self, event_type, details):
 | 
						|
        """Process new worker that has arrived (and fire off any work)."""
 | 
						|
        worker = details['worker']
 | 
						|
        for request in self._requests_cache.get_waiting_requests(worker):
 | 
						|
            if request.transition_and_log_error(pr.PENDING, logger=LOG):
 | 
						|
                self._publish_request(request, worker)
 | 
						|
 | 
						|
    def _process_response(self, response, message):
 | 
						|
        """Process response from remote side."""
 | 
						|
        LOG.debug("Started processing response message '%s'",
 | 
						|
                  ku.DelayedPretty(message))
 | 
						|
        try:
 | 
						|
            task_uuid = message.properties['correlation_id']
 | 
						|
        except KeyError:
 | 
						|
            LOG.warning("The 'correlation_id' message property is"
 | 
						|
                        " missing in message '%s'",
 | 
						|
                        ku.DelayedPretty(message))
 | 
						|
        else:
 | 
						|
            request = self._requests_cache.get(task_uuid)
 | 
						|
            if request is not None:
 | 
						|
                response = pr.Response.from_dict(response)
 | 
						|
                LOG.debug("Extracted response '%s' and matched it to"
 | 
						|
                          " request '%s'", response, request)
 | 
						|
                if response.state == pr.RUNNING:
 | 
						|
                    request.transition_and_log_error(pr.RUNNING, logger=LOG)
 | 
						|
                elif response.state == pr.EVENT:
 | 
						|
                    # Proxy the event + details to the task/request notifier...
 | 
						|
                    event_type = response.data['event_type']
 | 
						|
                    details = response.data['details']
 | 
						|
                    request.notifier.notify(event_type, details)
 | 
						|
                elif response.state in (pr.FAILURE, pr.SUCCESS):
 | 
						|
                    moved = request.transition_and_log_error(response.state,
 | 
						|
                                                             logger=LOG)
 | 
						|
                    if moved:
 | 
						|
                        # NOTE(imelnikov): request should not be in the
 | 
						|
                        # cache when another thread can see its result and
 | 
						|
                        # schedule another request with the same uuid; so
 | 
						|
                        # we remove it, then set the result...
 | 
						|
                        del self._requests_cache[request.uuid]
 | 
						|
                        request.set_result(**response.data)
 | 
						|
                else:
 | 
						|
                    LOG.warning("Unexpected response status '%s'",
 | 
						|
                                response.state)
 | 
						|
            else:
 | 
						|
                LOG.debug("Request with id='%s' not found", task_uuid)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _handle_expired_request(request):
 | 
						|
        """Handle expired request.
 | 
						|
 | 
						|
        When request has expired it is removed from the requests cache and
 | 
						|
        the `RequestTimeout` exception is set as a request result.
 | 
						|
        """
 | 
						|
        if request.transition_and_log_error(pr.FAILURE, logger=LOG):
 | 
						|
            # Raise an exception (and then catch it) so we get a nice
 | 
						|
            # traceback that the request will get instead of it getting
 | 
						|
            # just an exception with no traceback...
 | 
						|
            try:
 | 
						|
                request_age = timeutils.delta_seconds(request.created_on,
 | 
						|
                                                      timeutils.utcnow())
 | 
						|
                raise exc.RequestTimeout(
 | 
						|
                    "Request '%s' has expired after waiting for %0.2f"
 | 
						|
                    " seconds for it to transition out of (%s) states"
 | 
						|
                    % (request, request_age, ", ".join(pr.WAITING_STATES)))
 | 
						|
            except exc.RequestTimeout:
 | 
						|
                with misc.capture_failure() as failure:
 | 
						|
                    LOG.debug(failure.exception_str)
 | 
						|
                    request.set_result(failure)
 | 
						|
 | 
						|
    def _on_wait(self):
 | 
						|
        """This function is called cyclically between draining events."""
 | 
						|
        self._requests_cache.cleanup(self._handle_expired_request)
 | 
						|
 | 
						|
    def _submit_task(self, task, task_uuid, action, arguments,
 | 
						|
                     progress_callback=None, **kwargs):
 | 
						|
        """Submit task request to a worker."""
 | 
						|
        request = pr.Request(task, task_uuid, action, arguments,
 | 
						|
                             self._transition_timeout, **kwargs)
 | 
						|
 | 
						|
        # Register the callback, so that we can proxy the progress correctly.
 | 
						|
        if (progress_callback is not None and
 | 
						|
                request.notifier.can_be_registered(
 | 
						|
                    task_atom.EVENT_UPDATE_PROGRESS)):
 | 
						|
            request.notifier.register(task_atom.EVENT_UPDATE_PROGRESS,
 | 
						|
                                      progress_callback)
 | 
						|
            cleaner = functools.partial(request.notifier.deregister,
 | 
						|
                                        task_atom.EVENT_UPDATE_PROGRESS,
 | 
						|
                                        progress_callback)
 | 
						|
            request.result.add_done_callback(lambda fut: cleaner())
 | 
						|
 | 
						|
        # Get task's worker and publish request if worker was found.
 | 
						|
        worker = self._finder.get_worker_for_task(task)
 | 
						|
        if worker is not None:
 | 
						|
            # NOTE(skudriashev): Make sure request is set to the PENDING state
 | 
						|
            # before putting it into the requests cache to prevent the notify
 | 
						|
            # processing thread get list of waiting requests and publish it
 | 
						|
            # before it is published here, so it wouldn't be published twice.
 | 
						|
            if request.transition_and_log_error(pr.PENDING, logger=LOG):
 | 
						|
                self._requests_cache[request.uuid] = request
 | 
						|
                self._publish_request(request, worker)
 | 
						|
        else:
 | 
						|
            LOG.debug("Delaying submission of '%s', no currently known"
 | 
						|
                      " worker/s available to process it", request)
 | 
						|
            self._requests_cache[request.uuid] = request
 | 
						|
 | 
						|
        return request.result
 | 
						|
 | 
						|
    def _publish_request(self, request, worker):
 | 
						|
        """Publish request to a given topic."""
 | 
						|
        LOG.debug("Submitting execution of '%s' to worker '%s' (expecting"
 | 
						|
                  " response identified by reply_to=%s and"
 | 
						|
                  " correlation_id=%s)", request, worker, self._uuid,
 | 
						|
                  request.uuid)
 | 
						|
        try:
 | 
						|
            self._proxy.publish(request, worker.topic,
 | 
						|
                                reply_to=self._uuid,
 | 
						|
                                correlation_id=request.uuid)
 | 
						|
        except Exception:
 | 
						|
            with misc.capture_failure() as failure:
 | 
						|
                LOG.critical("Failed to submit '%s' (transitioning it to"
 | 
						|
                             " %s)", request, pr.FAILURE, exc_info=True)
 | 
						|
                if request.transition_and_log_error(pr.FAILURE, logger=LOG):
 | 
						|
                    del self._requests_cache[request.uuid]
 | 
						|
                    request.set_result(failure)
 | 
						|
 | 
						|
    def execute_task(self, task, task_uuid, arguments,
 | 
						|
                     progress_callback=None):
 | 
						|
        return self._submit_task(task, task_uuid, pr.EXECUTE, arguments,
 | 
						|
                                 progress_callback=progress_callback)
 | 
						|
 | 
						|
    def revert_task(self, task, task_uuid, arguments, result, failures,
 | 
						|
                    progress_callback=None):
 | 
						|
        return self._submit_task(task, task_uuid, pr.REVERT, arguments,
 | 
						|
                                 progress_callback=progress_callback,
 | 
						|
                                 result=result, failures=failures)
 | 
						|
 | 
						|
    def wait_for_workers(self, workers=1, timeout=None):
 | 
						|
        """Waits for geq workers to notify they are ready to do work.
 | 
						|
 | 
						|
        NOTE(harlowja): if a timeout is provided this function will wait
 | 
						|
        until that timeout expires, if the amount of workers does not reach
 | 
						|
        the desired amount of workers before the timeout expires then this will
 | 
						|
        return how many workers are still needed, otherwise it will
 | 
						|
        return zero.
 | 
						|
        """
 | 
						|
        return self._finder.wait_for_workers(workers=workers,
 | 
						|
                                             timeout=timeout)
 | 
						|
 | 
						|
    def start(self):
 | 
						|
        """Starts proxy thread and associated topic notification thread."""
 | 
						|
        self._helpers.start()
 | 
						|
 | 
						|
    def stop(self):
 | 
						|
        """Stops proxy thread and associated topic notification thread."""
 | 
						|
        self._helpers.stop()
 | 
						|
        self._requests_cache.clear(self._handle_expired_request)
 | 
						|
        self._finder.clear()
 |