Merge "Register conductor information on jobboard"

This commit is contained in:
Jenkins
2015-10-20 22:02:21 +00:00
committed by Gerrit Code Review
7 changed files with 134 additions and 0 deletions

View File

@@ -17,6 +17,11 @@ Cache
.. automodule:: taskflow.types.cache .. automodule:: taskflow.types.cache
Entity
======
.. automodule:: taskflow.types.entity
Failure Failure
======= =======

View File

@@ -11,6 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import socket
import threading import threading
try: try:
@@ -25,6 +29,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener from taskflow.listeners import logging as logging_listener
from taskflow import logging from taskflow import logging
from taskflow.types import entity
from taskflow.types import timing as tt from taskflow.types import timing as tt
from taskflow.utils import async_utils from taskflow.utils import async_utils
from taskflow.utils import iter_utils from taskflow.utils import iter_utils
@@ -159,9 +164,30 @@ class BlockingConductor(base.Conductor):
LOG.info("Job completed successfully: %s", job) LOG.info("Job completed successfully: %s", job)
return async_utils.make_completed_future(consume) return async_utils.make_completed_future(consume)
def _get_conductor_info(self):
"""For right now we just register the conductor name as:
<conductor_name>@<hostname>:<process_pid>
"""
hostname = socket.gethostname()
pid = os.getpid()
name = '@'.join([
self._name, hostname+":"+str(pid)])
# Can add a lot more information here,
metadata = {
"hostname": hostname,
"pid": pid
}
return entity.Entity("conductor", name, metadata)
def run(self, max_dispatches=None): def run(self, max_dispatches=None):
self._dead.clear() self._dead.clear()
# Register a conductor type entity
self._jobboard.register_entity(self._get_conductor_info())
total_dispatched = 0 total_dispatched = 0
try: try:

View File

@@ -808,6 +808,10 @@ return cmsgpack.pack(result)
ensure_fresh=ensure_fresh, ensure_fresh=ensure_fresh,
board_fetch_func=lambda ensure_fresh: self._fetch_jobs()) board_fetch_func=lambda ensure_fresh: self._fetch_jobs())
def register_entity(self, entity):
# Will implement a redis jobboard conductor register later
pass
@base.check_who @base.check_who
def consume(self, job, who): def consume(self, job, who):
script = self._get_script('consume') script = self._get_script('consume')

View File

@@ -236,6 +236,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
#: Znode child path created under root path that contains trashed jobs. #: Znode child path created under root path that contains trashed jobs.
TRASH_FOLDER = ".trash" TRASH_FOLDER = ".trash"
#: Znode child path created under root path that contains registered
#: entities.
ENTITY_FOLDER = ".entities"
#: Znode **prefix** that job entries have. #: Znode **prefix** that job entries have.
JOB_PREFIX = 'job' JOB_PREFIX = 'job'
@@ -259,6 +263,9 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._path = path self._path = path
self._trash_path = self._path.replace(k_paths.basename(self._path), self._trash_path = self._path.replace(k_paths.basename(self._path),
self.TRASH_FOLDER) self.TRASH_FOLDER)
self._entity_path = self._path.replace(
k_paths.basename(self._path),
self.ENTITY_FOLDER)
# The backend to load the full logbooks from, since what is sent over # The backend to load the full logbooks from, since what is sent over
# the data connection is only the logbook uuid and name, and not the # the data connection is only the logbook uuid and name, and not the
# full logbook. # full logbook.
@@ -300,6 +307,11 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
"""Path where all trashed job znodes will be stored.""" """Path where all trashed job znodes will be stored."""
return self._trash_path return self._trash_path
@property
def entity_path(self):
"""Path where all conductor info znodes will be stored."""
return self._entity_path
@property @property
def job_count(self): def job_count(self):
return len(self._known_jobs) return len(self._known_jobs)
@@ -552,6 +564,22 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
return (misc.decode_json(lock_data), lock_stat, return (misc.decode_json(lock_data), lock_stat,
misc.decode_json(job_data), job_stat) misc.decode_json(job_data), job_stat)
def register_entity(self, entity):
entity_type = entity.kind
if entity_type == 'conductor':
entity_path = k_paths.join(self.entity_path, entity_type)
self._client.ensure_path(entity_path)
conductor_name = entity.name
self._client.create(k_paths.join(entity_path,
conductor_name),
value=misc.binary_encode(
jsonutils.dumps(entity.to_dict())),
ephemeral=True)
else:
raise excp.NotImplementedError(
"Not implemented for other entity type '%s'" % entity_type)
@base.check_who @base.check_who
def consume(self, job, who): def consume(self, job, who):
with self._wrap(job.uuid, job.path, with self._wrap(job.uuid, job.path,

View File

@@ -386,6 +386,10 @@ class JobBoard(object):
this must be the same name that was used for claiming this job. this must be the same name that was used for claiming this job.
""" """
@abc.abstractmethod
def register_entity(self, entity):
"""Register an entity to the jobboard('s backend), e.g: a conductor"""
@abc.abstractproperty @abc.abstractproperty
def connected(self): def connected(self):
"""Returns if this jobboard is connected.""" """Returns if this jobboard is connected."""

View File

@@ -17,6 +17,7 @@
import contextlib import contextlib
import threading import threading
from kazoo.protocol import paths as k_paths
from kazoo.recipe import watchers from kazoo.recipe import watchers
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import uuidutils from oslo_utils import uuidutils
@@ -25,12 +26,14 @@ import testtools
from zake import fake_client from zake import fake_client
from zake import utils as zake_utils from zake import utils as zake_utils
from taskflow import exceptions as excp
from taskflow.jobs.backends import impl_zookeeper from taskflow.jobs.backends import impl_zookeeper
from taskflow import states from taskflow import states
from taskflow import test from taskflow import test
from taskflow.test import mock from taskflow.test import mock
from taskflow.tests.unit.jobs import base from taskflow.tests.unit.jobs import base
from taskflow.tests import utils as test_utils from taskflow.tests import utils as test_utils
from taskflow.types import entity
from taskflow.utils import kazoo_utils from taskflow.utils import kazoo_utils
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils from taskflow.utils import persistence_utils as p_utils
@@ -259,3 +262,34 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
}, },
'details': {}, 'details': {},
}, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) }, jsonutils.loads(misc.binary_decode(paths[path_key]['data'])))
def test_register_entity(self):
conductor_name = "conductor-abc@localhost:4123"
entity_instance = entity.Entity("conductor",
conductor_name,
{})
with base.connect_close(self.board):
self.board.register_entity(entity_instance)
# Check '.entity' node has been created
self.assertTrue(self.board.entity_path in self.client.storage.paths)
conductor_entity_path = k_paths.join(self.board.entity_path,
'conductor',
conductor_name)
self.assertTrue(conductor_entity_path in self.client.storage.paths)
conductor_data = (
self.client.storage.paths[conductor_entity_path]['data'])
self.assertTrue(len(conductor_data) > 0)
self.assertDictEqual({
'name': conductor_name,
'kind': 'conductor',
'metadata': {},
}, jsonutils.loads(misc.binary_decode(conductor_data)))
entity_instance_2 = entity.Entity("non-sense",
"other_name",
{})
with base.connect_close(self.board):
self.assertRaises(excp.NotImplementedError,
self.board.register_entity,
entity_instance_2)

33
taskflow/types/entity.py Normal file
View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Rackspace Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Entity(object):
"""Entity object(s) to be registered on jobboard.
Now only supports 'kind' of 'conductor'.
"""
def __init__(self, kind, name, metadata):
self.kind = kind
self.name = name
self.metadata = metadata
def to_dict(self):
return {
'kind': self.kind,
'name': self.name,
'metadata': self.metadata
}