From fdb64d9014ec9c0ff5811699cdc47ca345bd8ed9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 8 May 2013 12:17:10 -0700 Subject: [PATCH] Move simplification over --- taskflow/__init__.py | 3 - taskflow/job.py | 21 +++- taskflow/jobboard.py | 58 ++++++++++ taskflow/jobboard/__init__.py | 26 ----- taskflow/jobboard/api.py | 95 ---------------- taskflow/jobboard/drivers/__init__.py | 17 --- taskflow/jobboard/drivers/db.py | 18 --- taskflow/jobboard/drivers/memory.py | 18 --- taskflow/jobboard/drivers/mq.py | 18 --- taskflow/jobboard/drivers/zk.py | 39 ------- taskflow/locks/__init__.py | 26 ----- taskflow/locks/api.py | 128 ---------------------- taskflow/locks/drivers/__init__.py | 17 --- taskflow/locks/drivers/db.py | 28 ----- taskflow/locks/drivers/file.py | 28 ----- taskflow/locks/drivers/memory.py | 28 ----- taskflow/locks/drivers/zk.py | 62 ----------- taskflow/logbook.py | 96 ++++++++++++++++ taskflow/logbook/__init__.py | 26 ----- taskflow/logbook/api.py | 151 -------------------------- taskflow/logbook/drivers/db.py | 17 --- taskflow/logbook/drivers/memory.py | 17 --- taskflow/logbook/drivers/zk.py | 108 ------------------ 23 files changed, 169 insertions(+), 876 deletions(-) create mode 100644 taskflow/jobboard.py delete mode 100644 taskflow/jobboard/__init__.py delete mode 100644 taskflow/jobboard/api.py delete mode 100644 taskflow/jobboard/drivers/__init__.py delete mode 100644 taskflow/jobboard/drivers/db.py delete mode 100644 taskflow/jobboard/drivers/memory.py delete mode 100644 taskflow/jobboard/drivers/mq.py delete mode 100644 taskflow/jobboard/drivers/zk.py delete mode 100644 taskflow/locks/__init__.py delete mode 100644 taskflow/locks/api.py delete mode 100644 taskflow/locks/drivers/__init__.py delete mode 100644 taskflow/locks/drivers/db.py delete mode 100644 taskflow/locks/drivers/file.py delete mode 100644 taskflow/locks/drivers/memory.py delete mode 100644 taskflow/locks/drivers/zk.py create mode 100644 taskflow/logbook.py delete mode 100644 taskflow/logbook/__init__.py delete mode 100644 taskflow/logbook/api.py delete mode 100644 taskflow/logbook/drivers/db.py delete mode 100644 taskflow/logbook/drivers/memory.py delete mode 100644 taskflow/logbook/drivers/zk.py diff --git a/taskflow/__init__.py b/taskflow/__init__.py index cd5101951..355c6409b 100644 --- a/taskflow/__init__.py +++ b/taskflow/__init__.py @@ -32,6 +32,3 @@ class Failure(object): self.name = name self.workflow = workflow self.exception = exception - - - diff --git a/taskflow/job.py b/taskflow/job.py index 887308338..81e8cad1a 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -17,6 +17,9 @@ # under the License. import abc +import uuid + +from nova CLAIMED = 'claimed' UNCLAIMED = 'unclaimed' @@ -25,17 +28,18 @@ UNCLAIMED = 'unclaimed' class Job(object): __metaclass__ = abc.ABCMeta - def __init__(self, name, type, reservation): + def __init__(self, name, type, context): self.name = name - # A link back to the reservation which - # can be used to query information about - # this job (and its subsequent - # workflows and tasks). - self.reservation = reservation # TBD - likely more details about this job self.details = None self.state = UNCLAIMED self.owner = None + self.tracking_id = str(uuid.uuid4()) + self.context = context + + def uri(self): + return "%s://%s/%s" % (self.type, self.name, + self.tracking_id) @abc.abstractproperty def type(self): @@ -48,6 +52,11 @@ class Job(object): @abc.abstractmethod def claim(self, owner): # This can be used to transition this job from unclaimed to claimed. + # + # This must be done in a way that likely uses some type of locking or + # ownership transfer so that only a single entity gets this job to work + # on. This will avoid multi-job ownership, which can lead to + # inconsistent state. raise NotImplementedError() @abc.abstractmethod diff --git a/taskflow/jobboard.py b/taskflow/jobboard.py new file mode 100644 index 000000000..13541d66d --- /dev/null +++ b/taskflow/jobboard.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class JobBoard(object): + """Base class for job boards.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self): + self._listeners = [] + + @abc.abstractmethod + def post(self, job): + raise NotImplementedError() + + def _notify_posted(self, job): + for i in self._listeners: + i.notify_posted(job) + + @abc.abstractmethod + def await(self, blocking=True): + raise NotImplementedError() + + def subscribe(self, listener): + self._listeners.append(listener) + + def unsubscribe(self, listener): + if listener in self._listeners: + self._listeners.remove(listener) + + def close(self): + """Allows the job board provider to free any resources that it has.""" + pass + + +class ProxyJobBoard(JobBoard): + def post(self, context, job): + + raise NotImplementedError() + diff --git a/taskflow/jobboard/__init__.py b/taskflow/jobboard/__init__.py deleted file mode 100644 index 8ae860495..000000000 --- a/taskflow/jobboard/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -The jobboard service for Nova. Different implementations can be plugged -according to the Nova configuration. -""" - -from nova.workflow.jobboard import api - -API = api.API diff --git a/taskflow/jobboard/api.py b/taskflow/jobboard/api.py deleted file mode 100644 index f5ff74204..000000000 --- a/taskflow/jobboard/api.py +++ /dev/null @@ -1,95 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import contextlib - -from oslo.config import cfg - -from nova.openstack.common import importutils -from nova.openstack.common import log as logging - -LOG = logging.getLogger(__name__) -jobboard_driver_opt = cfg.StrOpt('job_board_driver', - default='memory', - help='The driver that satisfies the ' - 'job providing service ( ' - 'valid options are: db, mq, zk, memory)') - -CONF = cfg.CONF -CONF.register_opts([jobboard_driver_opt]) - - -class API(object): - - _driver = None - _driver_name_class_mapping = { - # TBD - } - - def __new__(cls, *args, **kwargs): - '''Create an instance of the lock provider API. - - args and kwargs are passed down to the job board driver when it gets - created. - ''' - if not cls._driver: - LOG.debug(_('Job board driver defined as an instance of %s'), - str(CONF.job_board_driver)) - driver_name = CONF.job_board_driver - try: - driver_class = cls._driver_name_class_mapping[driver_name] - except KeyError: - raise TypeError(_("Unknown job board driver name: %s") - % driver_name) - cls._driver = importutils.import_object(driver_class, - *args, **kwargs) - utils.check_isinstance(cls._driver, JobBoardDriver) - return super(API, cls).__new__(cls) - - -class JobBoardDriver(object): - """Base class for job board drivers.""" - - __metaclass__ = abc.ABCMeta - - def __init__(self): - self._listeners = [] - - @abc.abstractmethod - def post(self, job): - raise NotImplementedError() - - def _notify_posted(self, job): - for i in self._listeners: - i.notify_posted(job) - - @abc.abstractmethod - def await(self, blocking=True): - raise NotImplementedError() - - def subscribe(self, listener): - self._listeners.append(listener) - - def unsubscribe(self, listener): - if listener in self._listeners: - self._listeners.remove(listener) - - def close(self): - """Allows the job board provider to free any resources that it has.""" - pass diff --git a/taskflow/jobboard/drivers/__init__.py b/taskflow/jobboard/drivers/__init__.py deleted file mode 100644 index 830dd2e7c..000000000 --- a/taskflow/jobboard/drivers/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/taskflow/jobboard/drivers/db.py b/taskflow/jobboard/drivers/db.py deleted file mode 100644 index 27ef755aa..000000000 --- a/taskflow/jobboard/drivers/db.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - diff --git a/taskflow/jobboard/drivers/memory.py b/taskflow/jobboard/drivers/memory.py deleted file mode 100644 index 27ef755aa..000000000 --- a/taskflow/jobboard/drivers/memory.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - diff --git a/taskflow/jobboard/drivers/mq.py b/taskflow/jobboard/drivers/mq.py deleted file mode 100644 index 27ef755aa..000000000 --- a/taskflow/jobboard/drivers/mq.py +++ /dev/null @@ -1,18 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - diff --git a/taskflow/jobboard/drivers/zk.py b/taskflow/jobboard/drivers/zk.py deleted file mode 100644 index afb346690..000000000 --- a/taskflow/jobboard/drivers/zk.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from kazoo import client as kazoo_client - -from nova.workflow.jobboard import api - -from oslo.config import cfg - - -CONF = cfg.CONF -CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') -CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') - - -class JobBoard(api.JobBoardDriver): - def __init__(self): - super(JobBoard, self).__init__() - self._client = kazoo_client.KazooClient(hosts=CONF.address, - timeout=CONF.recv_timeout) - self._client.start() - - def post(self, job): - diff --git a/taskflow/locks/__init__.py b/taskflow/locks/__init__.py deleted file mode 100644 index f869bbcbe..000000000 --- a/taskflow/locks/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -The lock provider service for Nova. Different implementations can be plugged -according to the Nova configuration. -""" - -from nova.workflow.locks import api - -API = api.API diff --git a/taskflow/locks/api.py b/taskflow/locks/api.py deleted file mode 100644 index 335767aa6..000000000 --- a/taskflow/locks/api.py +++ /dev/null @@ -1,128 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import contextlib - -from oslo.config import cfg - -from nova.openstack.common import importutils -from nova.openstack.common import log as logging -from nova import utils - - -LOG = logging.getLogger(__name__) -lockprovider_driver_opt = cfg.StrOpt('lock_provider_driver', - default='memory', - help='The driver that satisfies the ' - 'lock providing service (' - 'valid options are: db, file, ' - 'memory, zk)') - -CONF = cfg.CONF -CONF.register_opts([lockprovider_driver_opt]) - - -class API(object): - - _driver = None - _driver_name_class_mapping = { - # This can attempt to provide a distributed lock but extreme! care - # has to be taken to know when to expire a database lock, as well as - # extreme! care has to be taken when acquiring said lock. It is likely - # not possible to guarantee locking consistently and correctness when - # using a database to do locking. - 'db': 'nova.locks.drivers.db.LockProvider', - # This can provide per system level locks but can not provide across - # system level locks. - 'file': 'nova.locks.drivers.file.LockProvider', - # Note this driver can be used for distributed locking of resources. - 'zk': 'nova.locks.drivers.zk.LockProvider', - # This driver is pretty much only useful for testing since it can - # only provide per-process locking using greenlet/thread level locking. - 'memory': 'nova.locks.drivers.memory.LockProvider', - } - - def __new__(cls, *args, **kwargs): - '''Create an instance of the lock provider API. - - args and kwargs are passed down to the lock provider driver when it - gets created (if applicable). - ''' - if not cls._driver: - LOG.debug(_('Lock provider driver defined as an instance of %s'), - str(CONF.lock_provider_driver)) - driver_name = CONF.lock_provider_driver - try: - driver_class = cls._driver_name_class_mapping[driver_name] - except KeyError: - raise TypeError(_("Unknown lock provider driver name: %s") - % driver_name) - cls._driver = importutils.import_object(driver_class, - *args, **kwargs) - utils.check_isinstance(cls._driver, LockProvider) - return super(API, cls).__new__(cls) - - -class Lock(object): - """Base class for what a lock (distributed or local or in-between) should - provide""" - - __metaclass__ = abc.ABCMeta - - def __init__(self, resource_uri, blocking=True): - self.uri = resource_uri - self.blocking = blocking - - @abc.abstractmethod - def acquire(self): - raise NotImplementedError() - - @abc.abstractmethod - def release(self): - raise NotImplementedError() - - @abc.abstractmethod - def is_locked(self): - raise NotImplementedError() - - @abc.abstractmethod - def cancel(self): - raise NotImplementedError() - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, traceback): - self.release() - - -class LockProvider(object): - """Base class for lock provider drivers.""" - - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def provide(self, resource_uri, blocking=True): - """Returns a single lock object which can be used to acquire the lock - on the given resource uri""" - raise NotImplementedError() - - def close(self): - """Allows the lock provider to free any resources that it has.""" - pass diff --git a/taskflow/locks/drivers/__init__.py b/taskflow/locks/drivers/__init__.py deleted file mode 100644 index 830dd2e7c..000000000 --- a/taskflow/locks/drivers/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/taskflow/locks/drivers/db.py b/taskflow/locks/drivers/db.py deleted file mode 100644 index 214332b11..000000000 --- a/taskflow/locks/drivers/db.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova.workflow.locks import api - - -class Lock(api.Lock): - pass - - -class LockProvider(api.LockProvider): - def provide(self, resource_uri, blocking=True): - return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/file.py b/taskflow/locks/drivers/file.py deleted file mode 100644 index 214332b11..000000000 --- a/taskflow/locks/drivers/file.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova.workflow.locks import api - - -class Lock(api.Lock): - pass - - -class LockProvider(api.LockProvider): - def provide(self, resource_uri, blocking=True): - return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/memory.py b/taskflow/locks/drivers/memory.py deleted file mode 100644 index 214332b11..000000000 --- a/taskflow/locks/drivers/memory.py +++ /dev/null @@ -1,28 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova.workflow.locks import api - - -class Lock(api.Lock): - pass - - -class LockProvider(api.LockProvider): - def provide(self, resource_uri, blocking=True): - return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/zk.py b/taskflow/locks/drivers/zk.py deleted file mode 100644 index e4eaac2c8..000000000 --- a/taskflow/locks/drivers/zk.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from kazoo import client as kazoo_client -from kazoo.recipe import lock as kazoo_lock - -from nova.workflow.locks import api - -from oslo.config import cfg - - -CONF = cfg.CONF -CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') -CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') - - -class Lock(api.Lock): - def __init__(self, resource_uri, blocking, client): - super(Lock, self).__init__(resource_uri, blocking) - self._client = client - self._lock = kazoo_lock.Lock(client, resource_uri) - - def acquire(self): - return self._lock.acquire(self._blocking) - - def is_locked(self): - return self._lock.is_acquired - - def release(self): - return self._lock.release() - - def cancel(self): - return self._lock.cancel() - - -class LockProvider(api.LockProvider): - def __init__(self, *args, **kwargs): - self._client = kazoo_client.KazooClient(hosts=CONF.address, - timeout=CONF.recv_timeout) - self._client.start() - - def provide(self, resource_uri, blocking=True): - return Lock(self._client, resource_uri, blocking) - - def close(self): - if self._client: - self._client.stop() diff --git a/taskflow/logbook.py b/taskflow/logbook.py new file mode 100644 index 000000000..12f1694dd --- /dev/null +++ b/taskflow/logbook.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from oslo.config import cfg + +"""Define APIs for the logbook providers.""" + +from nova.openstack.common import importutils +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class RecordNotFound(Exception): + pass + + +class LogBook(object): + """Base class for what a logbook (distributed or local or in-between) + should provide""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, resource_uri): + self.uri = resource_uri + + @abc.abstractmethod + def add_record(self, name, metadata=None): + """Atomically adds a new entry to the given logbook with the supplied + metadata (if any).""" + raise NotImplementedError() + + @abc.abstractmethod + def fetch_record(self, name): + """Fetchs a record with the given name and returns any metadata about + said record.""" + raise NotImplementedError() + + @abc.abstractmethod + def __contains__(self, name): + """Determines if any entry with the given name exists in this + logbook.""" + raise NotImplementedError() + + @abc.abstractmethod + def mark(self, name, metadata, merge_functor=None): + """Marks the given logbook entry (which must exist) with the given + metadata, if said entry already exists then the provided merge functor + or a default function, will be activated to merge the existing metadata + with the supplied metadata.""" + raise NotImplementedError() + + @abc.abstractmethod + def __iter__(self): + """Iterates over all names and metadata and provides back both of these + via a (name, metadata) tuple. The order will be in the same order that + they were added.""" + raise NotImplementedError() + + def close(self): + """Allows the job board provider to free any resources that it has.""" + pass + + +class DBLogBook(LogBook): + """Base class for a logbook impl that uses a backing database.""" + + def __init__(self, context, job): + super(DBLogBook, self).__init__(job.uri) + self.context = context + self.job = job + + def close(self): + # Free the db connection + pass + + +class MemoryLogBook(LogBook): + pass \ No newline at end of file diff --git a/taskflow/logbook/__init__.py b/taskflow/logbook/__init__.py deleted file mode 100644 index ed33d8e2b..000000000 --- a/taskflow/logbook/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -The logbook provider service for Nova. Different implementations can be plugged -according to the Nova configuration. -""" - -from nova.workflow.logbook import api - -API = api.API diff --git a/taskflow/logbook/api.py b/taskflow/logbook/api.py deleted file mode 100644 index d130afa9e..000000000 --- a/taskflow/logbook/api.py +++ /dev/null @@ -1,151 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -from oslo.config import cfg - -"""Define APIs for the logbook providers.""" - -from nova.openstack.common import importutils -from nova.openstack.common import log as logging - -LOG = logging.getLogger(__name__) -logprovider_driver_opt = cfg.StrOpt('logbook_provider_driver', - default='memory', - help='The driver for that satisfies the ' - 'remote lock providing service ( ' - 'valid options are: db, memory, zk)') - -CONF = cfg.CONF -CONF.register_opts([logprovider_driver_opt]) - - -class API(object): - - _driver = None - _driver_name_class_mapping = { - # Note these drivers can be used for persistant logging. - 'zk': 'nova.logbook.drivers.zk.ZooKeeperProviderDriver', - 'db': 'nova.logbook.drivers.db.DBProviderDriver', - - # This driver is pretty much only useful for testing. - 'memory': 'nova.logbook.drivers.memory.MemoryProviderDriver', - } - - def __new__(cls, *args, **kwargs): - '''Create an instance of the logbook provider API. - - args and kwargs are passed down to the logbook provider driver when it - gets created. - ''' - if not cls._driver: - LOG.debug(_('Logbook provider driver defined as an instance of %s'), - str(CONF.logbook_provider_driver)) - driver_name = CONF.logbook_provider_driver - try: - driver_class = cls._driver_name_class_mapping[driver_name] - except KeyError: - raise TypeError(_("Unknown logbook provider driver name: %s") - % driver_name) - cls._driver = importutils.import_object(driver_class, - *args, **kwargs) - utils.check_isinstance(cls._driver, LogBookProviderDriver) - return super(API, cls).__new__(cls) - - -class RecordNotFound(Exception): - pass - - -class LogBook(object): - """Base class for what a logbook (distributed or local or in-between) - should provide""" - - __metaclass__ = abc.ABCMeta - - def __init__(self, resource_uri): - self.uri = resource_uri - - @abc.abstractmethod - def add_record(self, name, metadata=None): - """Atomically adds a new entry to the given logbook with the supplied - metadata (if any).""" - raise NotImplementedError() - - @abc.abstractmethod - def fetch_record(self, name): - """Fetchs a record with the given name and returns any metadata about - said record.""" - raise NotImplementedError() - - @abc.abstractmethod - def __contains__(self, name): - """Determines if any entry with the given name exists in this - logbook.""" - raise NotImplementedError() - - @abc.abstractmethod - def mark(self, name, metadata, merge_functor=None): - """Marks the given logbook entry (which must exist) with the given - metadata, if said entry already exists then the provided merge functor - or a default function, will be activated to merge the existing metadata - with the supplied metadata.""" - raise NotImplementedError() - - @abc.abstractmethod - def __iter__(self): - """Iterates over all names and metadata and provides back both of these - via a (name, metadata) tuple. The order will be in the same order that - they were added.""" - raise NotImplementedError() - - -class LogBookProvider(object): - """Base class for logbook provider drivers.""" - - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def provide(self, resource_uri): - """Returns a new (if not existent) logbook object or used logbook - object (if already existent) which can be used to determine the actions - performed on a given resource uri""" - raise NotImplementedError() - - def close(self): - """Allows the log provider to free any resources that it has.""" - pass diff --git a/taskflow/logbook/drivers/db.py b/taskflow/logbook/drivers/db.py deleted file mode 100644 index 830dd2e7c..000000000 --- a/taskflow/logbook/drivers/db.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/taskflow/logbook/drivers/memory.py b/taskflow/logbook/drivers/memory.py deleted file mode 100644 index 830dd2e7c..000000000 --- a/taskflow/logbook/drivers/memory.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. diff --git a/taskflow/logbook/drivers/zk.py b/taskflow/logbook/drivers/zk.py deleted file mode 100644 index b65dc38b3..000000000 --- a/taskflow/logbook/drivers/zk.py +++ /dev/null @@ -1,108 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from kazoo import client as kazoo_client - -from nova.openstack.common import jsonutils -from nova.workflow.logbook import api - -from oslo.config import cfg - - -CONF = cfg.CONF -CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') -CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') - - -class LogBook(api.LogBook): - prefix = "entry-" - - def __init__(self, client, resource_uri): - super(LogBook, self).__init__(self, "%s/logbook" % (resource_uri)) - self._client = client - self._paths_made = False - self._paths = [self.uri] - - def _ensure_paths(self): - if self._paths_made: - return - for p in self._paths: - self._client.ensure_path(p) - self._paths_made = True - - def add_record(self, name, metadata=None): - self._ensure_paths() - (path, value) = self._make_storage(name, metadata) - self._client.create(path, jsonutils.dumps(value), sequence=True) - - def _make_storage(self, name, metadata): - path = "{root}/{prefix}".format(root=self.uri, prefix=self.prefix) - value = { - 'name': name, - 'metadata': metadata, - } - return (path, value) - - def __iter__(self): - for c in self._client.get_children(self.uri + "/"): - if not c.startswith(self.prefix): - continue - (value, _zk) = self._client.get("%s/%s" % (self.uri, c)) - value = jsonutils.loads(value) - yield (value['name'], value['metadata']) - - def __contains__(self, name): - for (n, metadata) in self: - if name == n: - return True - return False - - def mark(self, name, metadata, merge_func=None): - if merge_func is None: - merge_func = lambda old,new : new - for c in self._client.get_children(self.uri + "/"): - if not c.startswith(self.prefix): - continue - (value, _zk) = self._client.get("%s/%s" % (self.uri, c)) - value = jsonutils.loads(value) - if value['name'] == name: - value['metadata'] = merge_func(value['metadata'], metadata) - self._client.set("%s/%s" % (self.uri, c), - jsonutils.dumps(value)) - return - raise api.RecordNotFound() - - def fetch_record(self, name): - for n, metadata in self: - if name == n: - return metadata - raise api.RecordNotFound() - - -class LogBookProvider(api.LogBookProvider): - def __init__(self, *args, **kwargs): - self._client = kazoo_client.KazooClient(hosts=CONF.address, - timeout=CONF.recv_timeout) - self._client.start() - - def close(self): - if self._client: - self._client.stop() - - def provide(self, resource_uri): - return LogBook(self._client, resource_uri)