diff --git a/doc/source/types.rst b/doc/source/types.rst index 254ed28a..b27a3fb4 100644 --- a/doc/source/types.rst +++ b/doc/source/types.rst @@ -17,6 +17,11 @@ Cache .. automodule:: taskflow.types.cache +Entity +====== + +.. automodule:: taskflow.types.entity + Failure ======= diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index d8f2b4c3..3fd5cb92 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -11,6 +11,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import os +import socket + import threading try: @@ -25,6 +29,7 @@ from taskflow.conductors import base from taskflow import exceptions as excp from taskflow.listeners import logging as logging_listener from taskflow import logging +from taskflow.types import entity from taskflow.types import timing as tt from taskflow.utils import async_utils from taskflow.utils import iter_utils @@ -159,9 +164,30 @@ class BlockingConductor(base.Conductor): LOG.info("Job completed successfully: %s", job) return async_utils.make_completed_future(consume) + def _get_conductor_info(self): + """For right now we just register the conductor name as: + + @: + + """ + hostname = socket.gethostname() + pid = os.getpid() + name = '@'.join([ + self._name, hostname+":"+str(pid)]) + # Can add a lot more information here, + metadata = { + "hostname": hostname, + "pid": pid + } + + return entity.Entity("conductor", name, metadata) + def run(self, max_dispatches=None): self._dead.clear() + # Register a conductor type entity + self._jobboard.register_entity(self._get_conductor_info()) + total_dispatched = 0 try: diff --git a/taskflow/jobs/backends/impl_redis.py b/taskflow/jobs/backends/impl_redis.py index 92a13dae..6f210ac3 100644 --- a/taskflow/jobs/backends/impl_redis.py +++ b/taskflow/jobs/backends/impl_redis.py @@ -808,6 +808,10 @@ return cmsgpack.pack(result) ensure_fresh=ensure_fresh, board_fetch_func=lambda ensure_fresh: self._fetch_jobs()) + def register_entity(self, entity): + # Will implement a redis jobboard conductor register later + pass + @base.check_who def consume(self, job, who): script = self._get_script('consume') diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 15b54b13..dc38a0e4 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -236,6 +236,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): #: Znode child path created under root path that contains trashed jobs. TRASH_FOLDER = ".trash" + #: Znode child path created under root path that contains registered + #: entities. + ENTITY_FOLDER = ".entities" + #: Znode **prefix** that job entries have. JOB_PREFIX = 'job' @@ -259,6 +263,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._path = path self._trash_path = self._path.replace(k_paths.basename(self._path), self.TRASH_FOLDER) + self._entity_path = self._path.replace( + k_paths.basename(self._path), + self.ENTITY_FOLDER) # The backend to load the full logbooks from, since what is sent over # the data connection is only the logbook uuid and name, and not the # full logbook. @@ -300,6 +307,11 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): """Path where all trashed job znodes will be stored.""" return self._trash_path + @property + def entity_path(self): + """Path where all conductor info znodes will be stored.""" + return self._entity_path + @property def job_count(self): return len(self._known_jobs) @@ -552,6 +564,22 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): return (misc.decode_json(lock_data), lock_stat, misc.decode_json(job_data), job_stat) + def register_entity(self, entity): + entity_type = entity.kind + if entity_type == 'conductor': + entity_path = k_paths.join(self.entity_path, entity_type) + self._client.ensure_path(entity_path) + + conductor_name = entity.name + self._client.create(k_paths.join(entity_path, + conductor_name), + value=misc.binary_encode( + jsonutils.dumps(entity.to_dict())), + ephemeral=True) + else: + raise excp.NotImplementedError( + "Not implemented for other entity type '%s'" % entity_type) + @base.check_who def consume(self, job, who): with self._wrap(job.uuid, job.path, diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index 81e4a574..9e95ee1c 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -386,6 +386,10 @@ class JobBoard(object): this must be the same name that was used for claiming this job. """ + @abc.abstractmethod + def register_entity(self, entity): + """Register an entity to the jobboard('s backend), e.g: a conductor""" + @abc.abstractproperty def connected(self): """Returns if this jobboard is connected.""" diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 375c8dc4..1e0a9760 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -17,6 +17,7 @@ import contextlib import threading +from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import uuidutils @@ -25,12 +26,14 @@ import testtools from zake import fake_client from zake import utils as zake_utils +from taskflow import exceptions as excp from taskflow.jobs.backends import impl_zookeeper from taskflow import states from taskflow import test from taskflow.test import mock from taskflow.tests.unit.jobs import base from taskflow.tests import utils as test_utils +from taskflow.types import entity from taskflow.utils import kazoo_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -259,3 +262,34 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): }, 'details': {}, }, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) + + def test_register_entity(self): + conductor_name = "conductor-abc@localhost:4123" + entity_instance = entity.Entity("conductor", + conductor_name, + {}) + with base.connect_close(self.board): + self.board.register_entity(entity_instance) + # Check '.entity' node has been created + self.assertTrue(self.board.entity_path in self.client.storage.paths) + + conductor_entity_path = k_paths.join(self.board.entity_path, + 'conductor', + conductor_name) + self.assertTrue(conductor_entity_path in self.client.storage.paths) + conductor_data = ( + self.client.storage.paths[conductor_entity_path]['data']) + self.assertTrue(len(conductor_data) > 0) + self.assertDictEqual({ + 'name': conductor_name, + 'kind': 'conductor', + 'metadata': {}, + }, jsonutils.loads(misc.binary_decode(conductor_data))) + + entity_instance_2 = entity.Entity("non-sense", + "other_name", + {}) + with base.connect_close(self.board): + self.assertRaises(excp.NotImplementedError, + self.board.register_entity, + entity_instance_2) diff --git a/taskflow/types/entity.py b/taskflow/types/entity.py new file mode 100644 index 00000000..d46927ce --- /dev/null +++ b/taskflow/types/entity.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Rackspace Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Entity(object): + """Entity object(s) to be registered on jobboard. + + Now only supports 'kind' of 'conductor'. + """ + def __init__(self, kind, name, metadata): + self.kind = kind + self.name = name + self.metadata = metadata + + def to_dict(self): + return { + 'kind': self.kind, + 'name': self.name, + 'metadata': self.metadata + }