diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 312c2e36..234ae127 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -450,6 +450,7 @@ Components .. automodule:: taskflow.engines.action_engine.completer .. automodule:: taskflow.engines.action_engine.deciders .. automodule:: taskflow.engines.action_engine.executor +.. automodule:: taskflow.engines.action_engine.process_executor .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.scheduler .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index e31229b3..aea4b1f1 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -31,6 +31,7 @@ import six from taskflow.engines.action_engine import builder from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor +from taskflow.engines.action_engine import process_executor from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc @@ -502,7 +503,7 @@ class ParallelActionEngine(ActionEngine): Type provided Executor used ========================= =============================================== |cft|.ThreadPoolExecutor :class:`~.executor.ParallelThreadTaskExecutor` -|cfp|.ProcessPoolExecutor :class:`~.executor.ParallelProcessTaskExecutor` +|cfp|.ProcessPoolExecutor :class:`~.|pe|.ParallelProcessTaskExecutor` |cf|._base.Executor :class:`~.executor.ParallelThreadTaskExecutor` ========================= =============================================== @@ -514,8 +515,8 @@ Type provided Executor used =========================== =============================================== String (case insensitive) Executor used =========================== =============================================== -``process`` :class:`~.executor.ParallelProcessTaskExecutor` -``processes`` :class:`~.executor.ParallelProcessTaskExecutor` +``process`` :class:`~.|pe|.ParallelProcessTaskExecutor` +``processes`` :class:`~.|pe|.ParallelProcessTaskExecutor` ``thread`` :class:`~.executor.ParallelThreadTaskExecutor` ``threaded`` :class:`~.executor.ParallelThreadTaskExecutor` ``threads`` :class:`~.executor.ParallelThreadTaskExecutor` @@ -531,7 +532,7 @@ String (case insensitive) Executor used workers that are used to dispatch tasks into (this number is bounded by the maximum parallelization your workflow can support). - * ``dispatch_periodicity``: a float (in seconds) that will affect the + * ``wait_timeout``: a float (in seconds) that will affect the parallel process task executor (and therefore is **only** applicable when the executor provided above is of the process variant). This number affects how much time the process task executor waits for messages from child processes; a lower number will result in more rapid polling while a higher number will involve less polling but a slower time for an engine to notice a task has completed. @@ -540,6 +541,7 @@ String (case insensitive) Executor used + .. |pe| replace:: process_executor .. |cfp| replace:: concurrent.futures.process .. |cft| replace:: concurrent.futures.thread .. 
|cf| replace:: concurrent.futures @@ -555,7 +557,7 @@ String (case insensitive) Executor used _ExecutorTypeMatch((futures.ThreadPoolExecutor,), executor.ParallelThreadTaskExecutor), _ExecutorTypeMatch((futures.ProcessPoolExecutor,), - executor.ParallelProcessTaskExecutor), + process_executor.ParallelProcessTaskExecutor), _ExecutorTypeMatch((futures.Executor,), executor.ParallelThreadTaskExecutor), ] @@ -565,7 +567,7 @@ String (case insensitive) Executor used # will be lower-cased before checking). _executor_str_matchers = [ _ExecutorTextMatch(frozenset(['processes', 'process']), - executor.ParallelProcessTaskExecutor), + process_executor.ParallelProcessTaskExecutor), _ExecutorTextMatch(frozenset(['thread', 'threads', 'threaded']), executor.ParallelThreadTaskExecutor), _ExecutorTextMatch(frozenset(['greenthread', 'greenthreads', diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index ff4f400d..9e5c6f92 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -15,45 +15,19 @@ # under the License. import abc -import collections -from multiprocessing import managers -import os -import pickle -import threading import futurist -from oslo_utils import excutils -from oslo_utils import reflection -from oslo_utils import timeutils -from oslo_utils import uuidutils import six -from six.moves import queue as compat_queue from taskflow import logging -from taskflow import task as task_atom +from taskflow import task as ta from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import threading_utils # Execution and reversion outcomes. EXECUTED = 'executed' REVERTED = 'reverted' -# See http://bugs.python.org/issue1457119 for why this is so complex... -_PICKLE_ERRORS = [pickle.PickleError, TypeError] -try: - import cPickle as _cPickle - _PICKLE_ERRORS.append(_cPickle.PickleError) -except ImportError: - pass -_PICKLE_ERRORS = tuple(_PICKLE_ERRORS) -_SEND_ERRORS = (IOError, EOFError) -_UPDATE_PROGRESS = task_atom.EVENT_UPDATE_PROGRESS - -# Message types/kind sent from worker/child processes... 
-_KIND_COMPLETE_ME = 'complete_me' -_KIND_EVENT = 'event' - LOG = logging.getLogger(__name__) @@ -75,7 +49,7 @@ def _revert_retry(retry, arguments): def _execute_task(task, arguments, progress_callback=None): with notifier.register_deregister(task.notifier, - _UPDATE_PROGRESS, + ta.EVENT_UPDATE_PROGRESS, callback=progress_callback): try: task.pre_execute() @@ -91,10 +65,10 @@ def _execute_task(task, arguments, progress_callback=None): def _revert_task(task, arguments, result, failures, progress_callback=None): arguments = arguments.copy() - arguments[task_atom.REVERT_RESULT] = result - arguments[task_atom.REVERT_FLOW_FAILURES] = failures + arguments[ta.REVERT_RESULT] = result + arguments[ta.REVERT_FLOW_FAILURES] = failures with notifier.register_deregister(task.notifier, - _UPDATE_PROGRESS, + ta.EVENT_UPDATE_PROGRESS, callback=progress_callback): try: task.pre_revert() @@ -108,235 +82,6 @@ def _revert_task(task, arguments, result, failures, progress_callback=None): return (REVERTED, result) -class _ViewableSyncManager(managers.SyncManager): - """Manager that exposes its state as methods.""" - - def is_shutdown(self): - return self._state.value == managers.State.SHUTDOWN - - def is_running(self): - return self._state.value == managers.State.STARTED - - -class _Channel(object): - """Helper wrapper around a multiprocessing queue used by a worker.""" - - def __init__(self, queue, identity): - self._queue = queue - self._identity = identity - self._sent_messages = collections.defaultdict(int) - self._pid = None - - @property - def sent_messages(self): - return self._sent_messages - - def put(self, message): - # NOTE(harlowja): this is done in late in execution to ensure that this - # happens in the child process and not the parent process (where the - # constructor is called). - if self._pid is None: - self._pid = os.getpid() - message.update({ - 'sent_on': timeutils.utcnow(), - 'sender': { - 'pid': self._pid, - 'id': self._identity, - }, - }) - if 'body' not in message: - message['body'] = {} - try: - self._queue.put(message) - except _PICKLE_ERRORS: - LOG.warn("Failed serializing message %s", message, exc_info=True) - return False - except _SEND_ERRORS: - LOG.warn("Failed sending message %s", message, exc_info=True) - return False - else: - self._sent_messages[message['kind']] += 1 - return True - - -class _WaitWorkItem(object): - """The piece of work that will executed by a process executor. - - This will call the target function, then wait until the tasks emitted - events/items have been depleted before offically being finished. - - NOTE(harlowja): this is done so that the task function will *not* return - until all of its notifications have been proxied back to its originating - task. If we didn't do this then the executor would see this task as done - and then potentially start tasks that are successors of the task that just - finished even though notifications are still left to be sent from the - previously finished task... 
- """ - - def __init__(self, channel, barrier, - func, task, *args, **kwargs): - self._channel = channel - self._barrier = barrier - self._func = func - self._task = task - self._args = args - self._kwargs = kwargs - - def _on_finish(self): - sent_events = self._channel.sent_messages.get(_KIND_EVENT, 0) - if sent_events: - message = { - 'created_on': timeutils.utcnow(), - 'kind': _KIND_COMPLETE_ME, - } - if self._channel.put(message): - watch = timeutils.StopWatch() - watch.start() - self._barrier.wait() - LOG.trace("Waited %s seconds until task '%s' %s emitted" - " notifications were depleted", watch.elapsed(), - self._task, sent_events) - - def __call__(self): - args = self._args - kwargs = self._kwargs - try: - return self._func(self._task, *args, **kwargs) - finally: - self._on_finish() - - -class _EventSender(object): - """Sends event information from a child worker process to its creator.""" - - def __init__(self, channel): - self._channel = channel - - def __call__(self, event_type, details): - message = { - 'created_on': timeutils.utcnow(), - 'kind': _KIND_EVENT, - 'body': { - 'event_type': event_type, - 'details': details, - }, - } - self._channel.put(message) - - -class _Target(object): - """An immutable helper object that represents a target of a message.""" - - def __init__(self, task, barrier, identity): - self.task = task - self.barrier = barrier - self.identity = identity - # Counters used to track how many message 'kinds' were proxied... - self.dispatched = collections.defaultdict(int) - - def __repr__(self): - return "<%s at 0x%x targeting '%s' with identity '%s'>" % ( - reflection.get_class_name(self), id(self), - self.task, self.identity) - - -class _Dispatcher(object): - """Dispatches messages received from child worker processes.""" - - # When the run() method is busy (typically in a thread) we want to set - # these so that the thread can know how long to sleep when there is no - # active work to dispatch. - _SPIN_PERIODICITY = 0.01 - - def __init__(self, dispatch_periodicity=None): - if dispatch_periodicity is None: - dispatch_periodicity = self._SPIN_PERIODICITY - if dispatch_periodicity <= 0: - raise ValueError("Provided dispatch periodicity must be greater" - " than zero and not '%s'" % dispatch_periodicity) - self._targets = {} - self._dead = threading.Event() - self._dispatch_periodicity = dispatch_periodicity - self._stop_when_empty = False - - def register(self, identity, target): - self._targets[identity] = target - - def deregister(self, identity): - try: - target = self._targets.pop(identity) - except KeyError: - pass - else: - # Just incase set the barrier to unblock any worker... 
- target.barrier.set() - if LOG.isEnabledFor(logging.TRACE): - LOG.trace("Dispatched %s messages %s to target '%s' during" - " the lifetime of its existence in the dispatcher", - sum(six.itervalues(target.dispatched)), - dict(target.dispatched), target) - - def reset(self): - self._stop_when_empty = False - self._dead.clear() - if self._targets: - leftover = set(six.iterkeys(self._targets)) - while leftover: - self.deregister(leftover.pop()) - - def interrupt(self): - self._stop_when_empty = True - self._dead.set() - - def _dispatch(self, message): - if LOG.isEnabledFor(logging.TRACE): - LOG.trace("Dispatching message %s (it took %s seconds" - " for it to arrive for processing after being" - " sent)", message, - timeutils.delta_seconds(message['sent_on'], - timeutils.utcnow())) - try: - kind = message['kind'] - sender = message['sender'] - body = message['body'] - except (KeyError, ValueError, TypeError): - LOG.warn("Badly formatted message %s received", message, - exc_info=True) - return - target = self._targets.get(sender['id']) - if target is None: - # Must of been removed... - return - if kind == _KIND_COMPLETE_ME: - target.dispatched[kind] += 1 - target.barrier.set() - elif kind == _KIND_EVENT: - task = target.task - target.dispatched[kind] += 1 - task.notifier.notify(body['event_type'], body['details']) - else: - LOG.warn("Unknown message '%s' found in message from sender" - " %s to target '%s'", kind, sender, target) - - def run(self, queue): - watch = timeutils.StopWatch(duration=self._dispatch_periodicity) - while (not self._dead.is_set() or - (self._stop_when_empty and self._targets)): - watch.restart() - leftover = watch.leftover() - while leftover: - try: - message = queue.get(timeout=leftover) - except compat_queue.Empty: - break - else: - self._dispatch(message) - leftover = watch.leftover() - leftover = watch.leftover() - if leftover: - self._dead.wait(leftover) - - class SerialRetryExecutor(object): """Executes and reverts retries.""" @@ -492,149 +237,3 @@ class ParallelGreenThreadTaskExecutor(ParallelThreadTaskExecutor): if max_workers is None: max_workers = self.DEFAULT_WORKERS return futurist.GreenThreadPoolExecutor(max_workers=max_workers) - - -class ParallelProcessTaskExecutor(ParallelTaskExecutor): - """Executes tasks in parallel using a process pool executor. - - NOTE(harlowja): this executor executes tasks in external processes, so that - implies that tasks that are sent to that external process are pickleable - since this is how the multiprocessing works (sending pickled objects back - and forth) and that the bound handlers (for progress updating in - particular) are proxied correctly from that external process to the one - that is alive in the parent process to ensure that callbacks registered in - the parent are executed on events in the child. - """ - - constructor_options = [ - ('max_workers', lambda v: v if v is None else int(v)), - ('dispatch_periodicity', lambda v: v if v is None else float(v)), - ] - """ - Optional constructor keyword arguments this executor supports. These will - typically be passed via engine options (by a engine user) and converted - into the correct type before being sent into this - classes ``__init__`` method. 
- """ - - def __init__(self, executor=None, max_workers=None, - dispatch_periodicity=None): - super(ParallelProcessTaskExecutor, self).__init__( - executor=executor, max_workers=max_workers) - self._manager = _ViewableSyncManager() - self._dispatcher = _Dispatcher( - dispatch_periodicity=dispatch_periodicity) - # Only created after starting... - self._worker = None - self._queue = None - - def _create_executor(self, max_workers=None): - return futurist.ProcessPoolExecutor(max_workers=max_workers) - - def start(self): - if threading_utils.is_alive(self._worker): - raise RuntimeError("Worker thread must be stopped via stop()" - " before starting/restarting") - super(ParallelProcessTaskExecutor, self).start() - # These don't seem restartable; make a new one... - if self._manager.is_shutdown(): - self._manager = _ViewableSyncManager() - if not self._manager.is_running(): - self._manager.start() - self._dispatcher.reset() - self._queue = self._manager.Queue() - self._worker = threading_utils.daemon_thread(self._dispatcher.run, - self._queue) - self._worker.start() - - def stop(self): - self._dispatcher.interrupt() - super(ParallelProcessTaskExecutor, self).stop() - if threading_utils.is_alive(self._worker): - self._worker.join() - self._worker = None - self._queue = None - self._dispatcher.reset() - self._manager.shutdown() - self._manager.join() - - def _rebind_task(self, task, clone, channel, progress_callback=None): - # Creates and binds proxies for all events the task could receive - # so that when the clone runs in another process that this task - # can recieve the same notifications (thus making it look like the - # the notifications are transparently happening in this process). - needed = set() - for (event_type, listeners) in task.notifier.listeners_iter(): - if listeners: - needed.add(event_type) - if progress_callback is not None: - needed.add(_UPDATE_PROGRESS) - if needed: - sender = _EventSender(channel) - for event_type in needed: - clone.notifier.register(event_type, sender) - - def _submit_task(self, func, task, *args, **kwargs): - """Submit a function to run the given task (with given args/kwargs). - - NOTE(harlowja): Adjust all events to be proxies instead since we want - those callbacks to be activated in this process, not in the child, - also since typically callbacks are functors (or callables) we can - not pickle those in the first place... - - To make sure people understand how this works, the following is a - lengthy description of what is going on here, read at will: - - So to ensure that we are proxying task triggered events that occur - in the executed subprocess (which will be created and used by the - thing using the multiprocessing based executor) we need to establish - a link between that process and this process that ensures that when a - event is triggered in that task in that process that a corresponding - event is triggered on the original task that was requested to be ran - in this process. 
- - To accomplish this we have to create a copy of the task (without - any listeners) and then reattach a new set of listeners that will - now instead of calling the desired listeners just place messages - for this process (a dispatcher thread that is created in this class) - to dispatch to the original task (using a common queue + per task - sender identity/target that is used and associated to know which task - to proxy back too, since it is possible that there many be *many* - subprocess running at the same time, each running a different task - and using the same common queue to submit messages back to). - - Once the subprocess task has finished execution, the executor will - then trigger a callback that will remove the task + target from the - dispatcher (which will stop any further proxying back to the original - task). - """ - progress_callback = kwargs.pop('progress_callback', None) - clone = task.copy(retain_listeners=False) - identity = uuidutils.generate_uuid() - target = _Target(task, self._manager.Event(), identity) - channel = _Channel(self._queue, identity) - self._rebind_task(task, clone, channel, - progress_callback=progress_callback) - - def register(): - if progress_callback is not None: - task.notifier.register(_UPDATE_PROGRESS, progress_callback) - self._dispatcher.register(identity, target) - - def deregister(): - if progress_callback is not None: - task.notifier.deregister(_UPDATE_PROGRESS, progress_callback) - self._dispatcher.deregister(identity) - - register() - work = _WaitWorkItem(channel, target.barrier, - func, clone, *args, **kwargs) - try: - fut = self._executor.submit(work) - except RuntimeError: - with excutils.save_and_reraise_exception(): - deregister() - - fut.atom = task - fut.add_done_callback(lambda fut: deregister()) - return fut diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py new file mode 100644 index 00000000..c45a92df --- /dev/null +++ b/taskflow/engines/action_engine/process_executor.py @@ -0,0 +1,711 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import asyncore +import binascii +import collections +import errno +import functools +import hmac +import math +import os +import pickle +import socket +import struct +import time + +import futurist +from oslo_utils import excutils +import six + +from taskflow.engines.action_engine import executor as base +from taskflow import logging +from taskflow import task as ta +from taskflow.utils import iter_utils +from taskflow.utils import misc +from taskflow.utils import schema_utils as su +from taskflow.utils import threading_utils + +LOG = logging.getLogger(__name__) + +# Internal parent <-> child process protocol schema, message constants... 
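+# NOTE: a rough summary of the exchange these constants drive (the classes +# below are authoritative): the parent listens on a local socket and sends +# CHALLENGE to each new connection; a child replies with CHALLENGE_RESPONSE +# (identifying itself), after which every EVENT message the child sends is +# acknowledged with an ACK; all payloads are pickled and authenticated with +# a per-executor hmac key.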
+MAGIC_HEADER = 0xDECAF +CHALLENGE = 'identify_yourself' +CHALLENGE_RESPONSE = 'worker_reporting_in' +ACK = 'ack' +EVENT = 'event' +SCHEMAS = { + # Basic jsonschemas for verifying that the data we get back and + # forth from parent <-> child observes at least a basic expected + # format. + CHALLENGE: { + "type": "string", + "minLength": 1, + }, + ACK: { + "type": "string", + "minLength": 1, + }, + CHALLENGE_RESPONSE: { + "type": "string", + "minLength": 1, + }, + EVENT: { + "type": "object", + "properties": { + 'event_type': { + "type": "string", + }, + 'sent_on': { + "type": "number", + }, + }, + "required": ['event_type', 'sent_on'], + "additionalProperties": True, + }, +} + +# See http://bugs.python.org/issue1457119 for why this is so complex... +_DECODE_ENCODE_ERRORS = [pickle.PickleError, TypeError] +try: + import cPickle + _DECODE_ENCODE_ERRORS.append(cPickle.PickleError) + del cPickle +except (ImportError, AttributeError): + pass +_DECODE_ENCODE_ERRORS = tuple(_DECODE_ENCODE_ERRORS) + +# Use the best pickle from here on out... +from six.moves import cPickle as pickle + + +class UnknownSender(Exception): + """Exception raised when a message from an unknown sender is received.""" + + +class ChallengeIgnored(Exception): + """Exception raised when a challenge has not been responded to.""" + + +class Reader(object): + """Reader machine that streams & parses messages that it then dispatches. + + TODO(harlowja): Use python-suitcase in the future when the following + are addressed/resolved and released: + + - https://github.com/digidotcom/python-suitcase/issues/28 + - https://github.com/digidotcom/python-suitcase/issues/29 + + Binary format is the following (no newlines in actual format):: + + <magic-header> (4 bytes) + <mac-header-length> (4 bytes) + <mac> (1 or more variable bytes) + <identity-header-length> (4 bytes) + <identity> (1 or more variable bytes) + <msg-header-length> (4 bytes) + <msg> (1 or more variable bytes) + """ + + #: Per state memory initializers. + _INITIALIZERS = { + 'magic_header_left': 4, + 'mac_header_left': 4, + 'identity_header_left': 4, + 'msg_header_left': 4, + } + + #: Linear steps/transitions (order matters here). + _TRANSITIONS = tuple([ + 'magic_header_left', + 'mac_header_left', + 'mac_left', + 'identity_header_left', + 'identity_left', + 'msg_header_left', + 'msg_left', + ]) + + def __init__(self, auth_key, dispatch_func, msg_limit=-1): + if not six.callable(dispatch_func): + raise ValueError("Expected provided dispatch function" + " to be callable") + self.auth_key = auth_key + self.dispatch_func = dispatch_func + msg_limiter = iter_utils.iter_forever(msg_limit) + self.msg_count = six.next(msg_limiter) + self._msg_limiter = msg_limiter + self._buffer = misc.BytesIO() + self._state = None + # Local machine variables and such are stored in here. + self._memory = {} + self._transitions = collections.deque(self._TRANSITIONS) + # This is the per state callback handler set. The first entry reads + # the data and the second entry is called after reading is completed, + # typically to save that data into object memory, or to validate + # it. 
+ self._handlers = { + 'magic_header_left': (self._read_field_data, + self._save_and_validate_magic), + 'mac_header_left': (self._read_field_data, + functools.partial(self._save_pos_integer, + 'mac_left')), + 'mac_left': (functools.partial(self._read_data, 'mac'), + functools.partial(self._save_data, 'mac')), + 'identity_header_left': (self._read_field_data, + functools.partial(self._save_pos_integer, + 'identity_left')), + 'identity_left': (functools.partial(self._read_data, 'identity'), + functools.partial(self._save_data, 'identity')), + 'msg_header_left': (self._read_field_data, + functools.partial(self._save_pos_integer, + 'msg_left')), + 'msg_left': (functools.partial(self._read_data, 'msg'), + self._dispatch_and_reset), + } + # Force transition into first state... + self._transition() + + def _save_pos_integer(self, key_name, data): + key_val = struct.unpack("!i", data)[0] + if key_val <= 0: + raise IOError("Invalid %s length received for key '%s', expected" + " greater than zero length" % (key_val, key_name)) + self._memory[key_name] = key_val + return True + + def _save_data(self, key_name, data): + self._memory[key_name] = data + return True + + def _dispatch_and_reset(self, data): + self.dispatch_func( + self._memory['identity'], + # Lazy evaluate so the message can be thrown out as needed + # (instead of the receiver discarding it after the fact)... + functools.partial(_decode_message, self.auth_key, data, + self._memory['mac'])) + self.msg_count = six.next(self._msg_limiter) + self._memory.clear() + + def _transition(self): + try: + self._state = self._transitions.popleft() + except IndexError: + self._transitions.extend(self._TRANSITIONS) + self._state = self._transitions.popleft() + try: + self._memory[self._state] = self._INITIALIZERS[self._state] + except KeyError: + pass + self._handle_func, self._post_handle_func = self._handlers[self._state] + + def _save_and_validate_magic(self, data): + magic_header = struct.unpack("!i", data)[0] + if magic_header != MAGIC_HEADER: + raise IOError("Invalid magic header received, expected 0x%x but" + " got 0x%x for message %s" % (MAGIC_HEADER, + magic_header, + self.msg_count + 1)) + self._memory['magic'] = magic_header + return True + + def _read_data(self, save_key_name, data): + data_len_left = self._memory[self._state] + self._buffer.write(data[0:data_len_left]) + if len(data) < data_len_left: + data_len_left -= len(data) + self._memory[self._state] = data_len_left + return '' + else: + self._memory[self._state] = 0 + buf_data = self._buffer.getvalue() + self._buffer.reset() + self._post_handle_func(buf_data) + self._transition() + return data[data_len_left:] + + def _read_field_data(self, data): + return self._read_data(self._state, data) + + @property + def bytes_needed(self): + return self._memory.get(self._state, 0) + + def feed(self, data): + while len(data): + data = self._handle_func(data) + + +class BadHmacValueError(ValueError): + """Value error raised when an invalid hmac is discovered.""" + + +def _create_random_string(desired_length): + if desired_length <= 0: + return b'' + data_length = int(math.ceil(desired_length / 2.0)) + data = os.urandom(data_length) + hex_data = binascii.hexlify(data) + return hex_data[0:desired_length] + + +def _calculate_hmac(auth_key, body): + mac = hmac.new(auth_key, body).hexdigest() + if isinstance(mac, six.text_type): + mac = mac.encode("ascii") + return mac + + +def _encode_message(auth_key, message, identity, reverse=False): + message = pickle.dumps(message, 2) + message_mac = 
_calculate_hmac(auth_key, message) + pieces = [ + struct.pack("!i", MAGIC_HEADER), + struct.pack("!i", len(message_mac)), + message_mac, + struct.pack("!i", len(identity)), + identity, + struct.pack("!i", len(message)), + message, + ] + if reverse: + pieces.reverse() + return tuple(pieces) + + +def _decode_message(auth_key, message, message_mac): + tmp_message_mac = _calculate_hmac(auth_key, message) + if tmp_message_mac != message_mac: + raise BadHmacValueError('Invalid message hmac') + return pickle.loads(message) + + +class Channel(object): + """Object that workers use to communicate back to their creator.""" + + def __init__(self, port, identity, auth_key): + self.identity = identity + self.port = port + self.auth_key = auth_key + self.dead = False + self._sent = self._received = 0 + self._socket = None + self._read_pipe = None + self._write_pipe = None + + def close(self): + if self._socket is not None: + self._socket.close() + self._socket = None + self._read_pipe = None + self._write_pipe = None + + def _ensure_connected(self): + if self._socket is None: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setblocking(1) + try: + s.connect(("", self.port)) + except socket.error as e: + with excutils.save_and_reraise_exception(): + s.close() + if e.errno in (errno.ECONNREFUSED, errno.ENOTCONN, + errno.ECONNRESET): + # Don't bother with further connections... + self.dead = True + read_pipe = s.makefile("rb", 0) + write_pipe = s.makefile("wb", 0) + try: + msg = self._do_recv(read_pipe=read_pipe) + su.schema_validate(msg, SCHEMAS[CHALLENGE]) + if msg != CHALLENGE: + raise IOError("Challenge expected but not received") + else: + pieces = _encode_message(self.auth_key, + CHALLENGE_RESPONSE, + self.identity) + self._do_send_and_ack(pieces, write_pipe=write_pipe, + read_pipe=read_pipe) + except Exception: + with excutils.save_and_reraise_exception(): + s.close() + else: + self._socket = s + self._read_pipe = read_pipe + self._write_pipe = write_pipe + + def recv(self): + self._ensure_connected() + return self._do_recv() + + def _do_recv(self, read_pipe=None): + if read_pipe is None: + read_pipe = self._read_pipe + msg_capture = collections.deque(maxlen=1) + msg_capture_func = (lambda _from_who, msg_decoder_func: + msg_capture.append(msg_decoder_func())) + reader = Reader(self.auth_key, msg_capture_func, msg_limit=1) + try: + maybe_msg_num = self._received + 1 + bytes_needed = reader.bytes_needed + while True: + blob = read_pipe.read(bytes_needed) + if len(blob) != bytes_needed: + raise EOFError("Read pipe closed while reading %s" + " bytes for potential message %s" + % (bytes_needed, maybe_msg_num)) + reader.feed(blob) + bytes_needed = reader.bytes_needed + except StopIteration: + pass + msg = msg_capture[0] + self._received += 1 + return msg + + def _do_send(self, pieces, write_pipe=None): + if write_pipe is None: + write_pipe = self._write_pipe + for piece in pieces: + write_pipe.write(piece) + write_pipe.flush() + + def _do_send_and_ack(self, pieces, write_pipe=None, read_pipe=None): + self._do_send(pieces, write_pipe=write_pipe) + self._sent += 1 + msg = self._do_recv(read_pipe=read_pipe) + su.schema_validate(msg, SCHEMAS[ACK]) + if msg != ACK: + raise IOError("Failed receiving ack for sent" + " message %s" % self._sent) + + def send(self, message): + self._ensure_connected() + self._do_send_and_ack(_encode_message(self.auth_key, message, + self.identity)) + + +class EventSender(object): + """Sends event information from a child worker process to its creator.""" + + def 
__init__(self, channel): + self._channel = channel + self._pid = None + + def __call__(self, event_type, details): + if not self._channel.dead: + if self._pid is None: + self._pid = os.getpid() + message = { + 'event_type': event_type, + 'details': details, + 'sent_on': time.time(), + } + LOG.trace("Sending %s (from child %s)", message, self._pid) + self._channel.send(message) + + +class DispatcherHandler(asyncore.dispatcher): + """Dispatches from a single connection into a target.""" + + #: Read/write chunk size. + CHUNK_SIZE = 8192 + + def __init__(self, sock, addr, dispatcher): + if six.PY2: + asyncore.dispatcher.__init__(self, map=dispatcher.map, sock=sock) + else: + super(DispatcherHandler, self).__init__(map=dispatcher.map, + sock=sock) + self.blobs_to_write = list(dispatcher.challenge_pieces) + self.reader = Reader(dispatcher.auth_key, self._dispatch) + self.targets = dispatcher.targets + self.tied_to = None + self.challenge_responded = False + self.ack_pieces = _encode_message(dispatcher.auth_key, ACK, + dispatcher.identity, + reverse=True) + self.addr = addr + + def handle_close(self): + self.close() + + def writable(self): + return bool(self.blobs_to_write) + + def handle_write(self): + try: + blob = self.blobs_to_write.pop() + except IndexError: + pass + else: + sent = self.send(blob[0:self.CHUNK_SIZE]) + if sent < len(blob): + self.blobs_to_write.append(blob[sent:]) + + def _send_ack(self): + self.blobs_to_write.extend(self.ack_pieces) + + def _dispatch(self, from_who, msg_decoder_func): + if not self.challenge_responded: + msg = msg_decoder_func() + su.schema_validate(msg, SCHEMAS[CHALLENGE_RESPONSE]) + if msg != CHALLENGE_RESPONSE: + raise ChallengeIgnored("Discarding connection from %s" + " since the challenge was not" + " responded to" % self.addr) + else: + LOG.trace("Peer %s (%s) has passed challenge sequence", + self.addr, from_who) + self.challenge_responded = True + self.tied_to = from_who + self._send_ack() + else: + if self.tied_to != from_who: + raise UnknownSender("Sender %s previously identified as %s" + " changed their identity to %s after" + " challenge sequence" % (self.addr, + self.tied_to, + from_who)) + try: + task = self.targets[from_who] + except KeyError: + raise UnknownSender("Unknown message from %s (%s) not matched" + " to any known target" % (self.addr, + from_who)) + msg = msg_decoder_func() + su.schema_validate(msg, SCHEMAS[EVENT]) + if LOG.isEnabledFor(logging.TRACE): + msg_delay = max(0, time.time() - msg['sent_on']) + LOG.trace("Dispatching message from %s (%s) (it took %0.3f" + " seconds for it to arrive for processing after" + " being sent)", self.addr, from_who, msg_delay) + task.notifier.notify(msg['event_type'], msg.get('details')) + self._send_ack() + + def handle_read(self): + data = self.recv(self.CHUNK_SIZE) + if len(data) == 0: + self.handle_close() + else: + try: + self.reader.feed(data) + except (IOError, UnknownSender): + LOG.warn("Invalid received message", exc_info=True) + self.handle_close() + except _DECODE_ENCODE_ERRORS: + LOG.warn("Badly formatted message", exc_info=True) + self.handle_close() + except (ValueError, su.ValidationError): + LOG.warn("Failed validating message", exc_info=True) + self.handle_close() + except ChallengeIgnored: + LOG.warn("Failed challenge sequence", exc_info=True) + self.handle_close() + + +class Dispatcher(asyncore.dispatcher): + """Accepts messages received from child worker processes.""" + + #: See https://docs.python.org/2/library/socket.html#socket.socket.listen + MAX_BACKLOG = 5 + + def __init__(self, map, 
auth_key, identity): + if six.PY2: + asyncore.dispatcher.__init__(self, map=map) + else: + super(Dispatcher, self).__init__(map=map) + self.identity = identity + self.challenge_pieces = _encode_message(auth_key, CHALLENGE, + identity, reverse=True) + self.auth_key = auth_key + self.targets = {} + + @property + def port(self): + if self.socket is not None: + return self.socket.getsockname()[1] + else: + return None + + def setup(self): + self.targets.clear() + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind(("", 0)) + LOG.trace("Accepting dispatch requests on port %s", self.port) + self.listen(self.MAX_BACKLOG) + + def writable(self): + return False + + @property + def map(self): + return self._map + + def handle_close(self): + if self.socket is not None: + self.close() + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + addr = "%s:%s" % (addr[0], addr[1]) + LOG.trace("Potentially accepted new connection from %s", addr) + DispatcherHandler(sock, addr, self) + + +class ParallelProcessTaskExecutor(base.ParallelTaskExecutor): + """Executes tasks in parallel using a process pool executor. + + NOTE(harlowja): this executor executes tasks in external processes, which + implies that tasks sent to those processes must be pickleable (since that + is how multiprocessing works, sending pickled objects back and forth) and + that bound handlers (for progress updating in particular) are proxied from + the external process back to the parent process, ensuring that callbacks + registered in the parent are executed on events in the child. + """ + + #: Default timeout used by asyncore io loop (and eventually select/poll). + WAIT_TIMEOUT = 0.01 + + constructor_options = [ + ('max_workers', lambda v: v if v is None else int(v)), + ('wait_timeout', lambda v: v if v is None else float(v)), + ] + """ + Optional constructor keyword arguments this executor supports. These will + typically be passed via engine options (by an engine user) and converted + into the correct type before being sent into this + class's ``__init__`` method. + """ + + def __init__(self, executor=None, + max_workers=None, wait_timeout=None): + super(ParallelProcessTaskExecutor, self).__init__( + executor=executor, max_workers=max_workers) + self._auth_key = _create_random_string(32) + self._dispatcher = Dispatcher({}, self._auth_key, + _create_random_string(32)) + if wait_timeout is None: + self._wait_timeout = self.WAIT_TIMEOUT + else: + if wait_timeout <= 0: + raise ValueError("Provided wait timeout must be greater" + " than zero and not '%s'" % wait_timeout) + self._wait_timeout = wait_timeout + # Only created after starting... 
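+ # (the worker is a daemon thread that runs the asyncore event loop, + # accepting and dispatching the messages sent back by child processes).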
+ self._worker = None + + def _create_executor(self, max_workers=None): + return futurist.ProcessPoolExecutor(max_workers=max_workers) + + def start(self): + if threading_utils.is_alive(self._worker): + raise RuntimeError("Worker thread must be stopped via stop()" + " before starting/restarting") + super(ParallelProcessTaskExecutor, self).start() + self._dispatcher.setup() + self._worker = threading_utils.daemon_thread( + asyncore.loop, map=self._dispatcher.map, + timeout=self._wait_timeout) + self._worker.start() + + def stop(self): + super(ParallelProcessTaskExecutor, self).stop() + self._dispatcher.close() + if threading_utils.is_alive(self._worker): + self._worker.join() + self._worker = None + + def _submit_task(self, func, task, *args, **kwargs): + """Submit a function to run the given task (with given args/kwargs). + + NOTE(harlowja): Adjust all events to be proxies instead since we want + those callbacks to be activated in this process, not in the child, + also since typically callbacks are functors (or callables) we can + not pickle those in the first place... + + To make sure people understand how this works, the following is a + lengthy description of what is going on here, read at will: + + So to ensure that we are proxying task triggered events that occur + in the executed subprocess (which will be created and used by the + thing using the multiprocessing based executor) we need to establish + a link between that process and this process that ensures that when an + event is triggered in that task in that process a corresponding + event is triggered on the original task that was requested to be run + in this process. + + To accomplish this we have to create a copy of the task (without + any listeners) and then reattach a new set of listeners that will + now instead of calling the desired listeners just place messages + for this process (a dispatcher thread that is created in this class) + to dispatch to the original task (using a common accepting socket and + per task sender socket that is used and associated to know + which task to proxy back to, since it is possible that there may + be *many* subprocesses running at the same time). + + Once the subprocess task has finished execution, the executor will + then trigger a callback that will remove the task + target from the + dispatcher (which will stop any further proxying back to the original + task). + """ + progress_callback = kwargs.pop('progress_callback', None) + clone = task.copy(retain_listeners=False) + identity = _create_random_string(32) + channel = Channel(self._dispatcher.port, identity, self._auth_key) + + def rebind_task(): + # Creates and binds proxies for all events the task could receive + # so that when the clone runs in another process this task + # can receive the same notifications (thus making it look like + # the notifications are transparently happening in this process). 
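+ # Only the event types that actually have listeners (plus the + # progress event when a progress callback was supplied) get proxied.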
+ needed = set() + for (event_type, listeners) in task.notifier.listeners_iter(): + if listeners: + needed.add(event_type) + if progress_callback is not None: + needed.add(ta.EVENT_UPDATE_PROGRESS) + if needed: + sender = EventSender(channel) + for event_type in needed: + clone.notifier.register(event_type, sender) + + def register(): + if progress_callback is not None: + task.notifier.register(ta.EVENT_UPDATE_PROGRESS, + progress_callback) + self._dispatcher.targets[identity] = task + + def deregister(fut=None): + if progress_callback is not None: + task.notifier.deregister(ta.EVENT_UPDATE_PROGRESS, + progress_callback) + self._dispatcher.targets.pop(identity, None) + + rebind_task() + register() + try: + fut = self._executor.submit(func, clone, *args, **kwargs) + except RuntimeError: + with excutils.save_and_reraise_exception(): + deregister() + + fut.atom = task + fut.add_done_callback(deregister) + return fut diff --git a/taskflow/tests/unit/action_engine/test_creation.py b/taskflow/tests/unit/action_engine/test_creation.py index a2390993..1568dfe1 100644 --- a/taskflow/tests/unit/action_engine/test_creation.py +++ b/taskflow/tests/unit/action_engine/test_creation.py @@ -19,6 +19,7 @@ import testtools from taskflow.engines.action_engine import engine from taskflow.engines.action_engine import executor +from taskflow.engines.action_engine import process_executor from taskflow.patterns import linear_flow as lf from taskflow.persistence import backends from taskflow import test @@ -47,7 +48,7 @@ class ParallelCreationTest(test.TestCase): for s in ['process', 'processes']: eng = self._create_engine(executor=s) self.assertIsInstance(eng._task_executor, - executor.ParallelProcessTaskExecutor) + process_executor.ParallelProcessTaskExecutor) def test_thread_executor_creation(self): with futurist.ThreadPoolExecutor(1) as e: @@ -59,7 +60,7 @@ class ParallelCreationTest(test.TestCase): with futurist.ProcessPoolExecutor(1) as e: eng = self._create_engine(executor=e) self.assertIsInstance(eng._task_executor, - executor.ParallelProcessTaskExecutor) + process_executor.ParallelProcessTaskExecutor) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') def test_green_executor_creation(self): diff --git a/taskflow/tests/unit/action_engine/test_process_executor.py b/taskflow/tests/unit/action_engine/test_process_executor.py new file mode 100644 index 00000000..2bca18f3 --- /dev/null +++ b/taskflow/tests/unit/action_engine/test_process_executor.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
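+ +# NOTE: the tests below exercise the protocol helpers directly: Reader +# framing and hmac verification, Channel connection failure handling and +# end-to-end event dispatch from a child-side Channel into a parent-side +# Dispatcher.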
+ +import asyncore +import errno +import socket +import threading + +from taskflow.engines.action_engine import process_executor as pu +from taskflow import task +from taskflow import test +from taskflow.test import mock +from taskflow.tests import utils as test_utils + + +class ProcessExecutorHelpersTest(test.TestCase): + def test_reader(self): + capture_buf = [] + + def do_capture(identity, message_capture_func): + capture_buf.append(message_capture_func()) + + r = pu.Reader(b"secret", do_capture) + for data in pu._encode_message(b"secret", ['hi'], b'me'): + self.assertEqual(len(data), r.bytes_needed) + r.feed(data) + + self.assertEqual(1, len(capture_buf)) + self.assertEqual(['hi'], capture_buf[0]) + + def test_bad_hmac_reader(self): + r = pu.Reader(b"secret-2", lambda ident, capture_func: capture_func()) + in_data = b"".join(pu._encode_message(b"secret", ['hi'], b'me')) + self.assertRaises(pu.BadHmacValueError, r.feed, in_data) + + @mock.patch("socket.socket") + def test_no_connect_channel(self, mock_socket_factory): + mock_sock = mock.MagicMock() + mock_socket_factory.return_value = mock_sock + mock_sock.connect.side_effect = socket.error(errno.ECONNREFUSED, + 'broken') + c = pu.Channel(2222, b"me", b"secret") + self.assertRaises(socket.error, c.send, "hi") + self.assertTrue(c.dead) + self.assertTrue(mock_sock.close.called) + + def test_send_and_dispatch(self): + details_capture = [] + + t = test_utils.DummyTask("rcver") + t.notifier.register( + task.EVENT_UPDATE_PROGRESS, + lambda _event_type, details: details_capture.append(details)) + + d = pu.Dispatcher({}, b'secret', b'server-josh') + d.setup() + d.targets[b'child-josh'] = t + + s = threading.Thread(target=asyncore.loop, kwargs={'map': d.map}) + s.start() + self.addCleanup(s.join) + + c = pu.Channel(d.port, b'child-josh', b'secret') + self.addCleanup(c.close) + + send_what = [ + {'progress': 0.1}, + {'progress': 0.2}, + {'progress': 0.3}, + {'progress': 0.4}, + {'progress': 0.5}, + {'progress': 0.6}, + {'progress': 0.7}, + {'progress': 0.8}, + {'progress': 0.9}, + ] + e_s = pu.EventSender(c) + for details in send_what: + e_s(task.EVENT_UPDATE_PROGRESS, details) + + # This forces the thread to shut down (since the asyncore loop + # will exit when no more sockets exist to process...) + d.close() + + self.assertEqual(len(send_what), len(details_capture)) + self.assertEqual(send_what, details_capture) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index d16ce574..5a70d844 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import functools import threading @@ -49,6 +50,41 @@ _EMPTY_TRANSITIONS = [ ] + +class EngineTaskNotificationsTest(object): + def test_run_capture_task_notifications(self): + captured = collections.defaultdict(list) + + def do_capture(bound_name, event_type, details): + progress_capture = captured[bound_name] + progress_capture.append(details) + + flow = lf.Flow("flow") + work_1 = utils.MultiProgressingTask('work-1') + work_1.notifier.register(task.EVENT_UPDATE_PROGRESS, + functools.partial(do_capture, 'work-1')) + work_2 = utils.MultiProgressingTask('work-2') + work_2.notifier.register(task.EVENT_UPDATE_PROGRESS, + functools.partial(do_capture, 'work-2')) + flow.add(work_1, work_2) + + # NOTE(harlowja): These values were selected so that the exact float + # comparisons in the assertions below will work reliably... 
+ progress_chunks = tuple([0.2, 0.5, 0.8]) + engine = self._make_engine( + flow, store={'progress_chunks': progress_chunks}) + engine.run() + + expected = [ + {'progress': 0.0}, + {'progress': 0.2}, + {'progress': 0.5}, + {'progress': 0.8}, + {'progress': 1.0}, + ] + for name in ['work-1', 'work-2']: + self.assertEqual(expected, captured[name]) + + class EngineTaskTest(object): def test_run_task_as_flow(self): @@ -59,7 +95,7 @@ class EngineTaskTest(object): expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] self.assertEqual(expected, capturer.values) - def test_run_task_with_notifications(self): + def test_run_task_with_flow_notifications(self): flow = utils.ProgressingTask(name='task1') engine = self._make_engine(flow) with utils.CaptureListener(engine) as capturer: @@ -68,7 +104,7 @@ class EngineTaskTest(object): 'task1.t SUCCESS(5)', 'task1.f SUCCESS'] self.assertEqual(expected, capturer.values) - def test_failing_task_with_notifications(self): + def test_failing_task_with_flow_notifications(self): values = [] flow = utils.FailingTask('fail') engine = self._make_engine(flow) @@ -1369,6 +1405,7 @@ class SerialEngineTest(EngineTaskTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, EngineDeciderDepthTest, + EngineTaskNotificationsTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, store=None, **kwargs): @@ -1399,6 +1436,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, EngineDeciderDepthTest, + EngineTaskNotificationsTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -1443,6 +1481,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineGraphConditionalFlowTest, EngineCheckingTaskTest, EngineDeciderDepthTest, + EngineTaskNotificationsTest, test.TestCase): def _make_engine(self, flow, @@ -1467,6 +1506,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineDeciderDepthTest, + EngineTaskNotificationsTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -1499,6 +1539,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineMissingDepsTest, EngineGraphConditionalFlowTest, EngineDeciderDepthTest, + EngineTaskNotificationsTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp() diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 8716fa20..498e3504 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.endpoint_count = 28 + self.endpoint_count = 29 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 0a71c143..ed4d4e98 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -187,6 +187,13 @@ class CaptureListener(capturing.CaptureListener): return name +class MultiProgressingTask(task.Task): + def execute(self, progress_chunks): + for chunk in progress_chunks: + self.update_progress(chunk) + return len(progress_chunks) + + class ProgressingTask(task.Task): def execute(self, **kwargs): self.update_progress(0.0) @@ -217,14 +224,6 @@ class TaskWithFailure(task.Task): raise RuntimeError('Woot!') -class ProgressingTask(task.Task): - - def execute(self, *args, **kwargs): - self.update_progress(0.0) - self.update_progress(1.0) - return 5 - - class 
FailingTaskWithOneArg(ProgressingTask): def execute(self, x, **kwargs): raise RuntimeError('Woot with %s' % x) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 07fc3898..ef121dff 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -66,6 +66,14 @@ class StringIO(six.StringIO): self.write(linesep) +class BytesIO(six.BytesIO): + """Byte buffer with some small additions.""" + + def reset(self): + self.seek(0) + self.truncate() + + def get_hostname(unknown_hostname=UNKNOWN_HOSTNAME): """Gets the machines hostname; if not able to returns an invalid one.""" try:
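For reference, a minimal usage sketch of the behavior this change enables (not part of the patch itself; the flow, task and callback names are illustrative): progress events emitted by a task executing in a child process are proxied back to a callback registered on the original task in the parent process::

    import functools

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow as lf


    class ChattyTask(task.Task):
        def execute(self):
            # These updates happen in a child process; the events they emit
            # are proxied over the executor's socket channel to the parent.
            self.update_progress(0.5)
            self.update_progress(1.0)


    def on_progress(name, event_type, details):
        # Invoked in the parent process even though execute() ran in a child.
        print("%s => %s" % (name, details))


    t = ChattyTask(name='demo-task')
    t.notifier.register(task.EVENT_UPDATE_PROGRESS,
                        functools.partial(on_progress, 'demo-task'))
    flow = lf.Flow('demo').add(t)
    engines.run(flow, engine='parallel', executor='processes', max_workers=1)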