396 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			396 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| #    Copyright (C) 2014 AT&T Labs All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import contextlib
 | |
| import logging
 | |
| 
 | |
| from kazoo import exceptions as k_exc
 | |
| from kazoo.protocol import paths
 | |
| 
 | |
| from taskflow import exceptions as exc
 | |
| from taskflow.openstack.common import jsonutils
 | |
| from taskflow.persistence.backends import base
 | |
| from taskflow.persistence import logbook
 | |
| from taskflow.utils import kazoo_utils as k_utils
 | |
| from taskflow.utils import misc
 | |
| from taskflow.utils import persistence_utils as p_utils
 | |
| 
 | |
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Transaction support was added in 3.4.0
# This (major, minor, patch) tuple is what ZkConnection.validate() hands to
# k_utils.check_compatible() together with the client.
MIN_ZK_VERSION = (3, 4, 0)
 | |
| 
 | |
| 
 | |
class ZkBackend(base.Backend):
    """Persistence backend that keeps its data in ZooKeeper (via kazoo).

    Example conf (use Kazoo):

    conf = {
        "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
        "path": "/taskflow",
    }

    The "path" entry (default "/taskflow") is the root node under which
    connections place their data; it must be a non-empty absolute path.
    """

    def __init__(self, conf, client=None):
        """Set up the backend.

        :param conf: configuration mapping; its "path" key is read here and
                     the whole mapping is handed to k_utils.make_client()
                     when no client is supplied.
        :param client: optional pre-built client; when given, this backend
                       uses it but does not consider itself its owner (and
                       so will not finalize it on close()).
        :raises ValueError: if the configured path is empty or not absolute.
        """
        super(ZkBackend, self).__init__(conf)
        root = str(conf.get("path", "/taskflow"))
        if not root:
            raise ValueError("Empty zookeeper path is disallowed")
        if not paths.isabs(root):
            raise ValueError("Zookeeper path must be absolute")
        self._path = root
        # Only a client that this backend created itself is owned by it.
        owned = client is None
        if owned:
            client = k_utils.make_client(conf)
        self._client = client
        self._owned = owned
        self._validated = False

    @property
    def path(self):
        """Root zookeeper path used by this backend."""
        return self._path

    def get_connection(self):
        """Return a new ZkConnection bound to this backend's client.

        The connection is validated only while this backend has not yet
        recorded a successful validation (the flag is reset by close()).
        """
        connection = ZkConnection(self, self._client)
        if self._validated:
            return connection
        connection.validate()
        self._validated = True
        return connection

    def close(self):
        """Reset the validation flag and finalize the client if it is owned.

        :raises exc.StorageError: if finalizing the owned client raises a
                                  kazoo exception.
        """
        self._validated = False
        if self._owned:
            try:
                k_utils.finalize_client(self._client)
            except (k_exc.KazooException, k_exc.ZookeeperError) as err:
                raise exc.StorageError("Unable to finalize client", err)
 | |
| 
 | |
| 
 | |
class ZkConnection(base.Connection):
    """Connection that reads and writes logbooks, flow details and task
    details as zookeeper nodes below the owning backend's path.

    Three sibling directories are used: "books", "flow_details" and
    "task_details". A logbook node has one child per flow detail uuid and
    a flow detail node has one child per task detail uuid; those children
    only act as references to the nodes holding the actual data.
    """

    def __init__(self, backend, client):
        """Remember the backend/client, derive the paths and start the client.

        :param backend: object exposing a ``path`` attribute (the root path).
        :param client: client used for all zookeeper operations.
        """
        self._backend = backend
        self._client = client
        root = self._backend.path
        self._book_path = paths.join(root, "books")
        self._flow_path = paths.join(root, "flow_details")
        self._task_path = paths.join(root, "task_details")
        with self._exc_wrapper():
            # NOOP if already started.
            self._client.start()

    def validate(self):
        """Check the client against MIN_ZK_VERSION using kazoo_utils."""
        with self._exc_wrapper():
            k_utils.check_compatible(self._client, MIN_ZK_VERSION)

    @property
    def backend(self):
        """Backend this connection was created for."""
        return self._backend

    @property
    def book_path(self):
        """Directory node holding the logbooks."""
        return self._book_path

    @property
    def flow_path(self):
        """Directory node holding the flow details."""
        return self._flow_path

    @property
    def task_path(self):
        """Directory node holding the task details."""
        return self._task_path

    def close(self):
        """Do nothing; this connection has nothing of its own to release."""
        pass

    def upgrade(self):
        """Creates the initial paths (if they don't already exist)."""
        with self._exc_wrapper():
            for directory in (self.book_path, self.flow_path,
                              self.task_path):
                self._client.ensure_path(directory)

    @contextlib.contextmanager
    def _exc_wrapper(self):
        """Context manager translating kazoo errors to taskflow errors.

        Handlers are tried in order, so the specific errors must stay in
        front of the generic kazoo/zookeeper catch-all at the end:

        * client handler timeout -> exc.ConnectionFailure
        * expired session        -> exc.ConnectionFailure
        * missing node           -> exc.NotFound
        * duplicate node         -> exc.AlreadyExists
        * any other kazoo error  -> exc.StorageError
        """
        try:
            yield
        except self._client.handler.timeout_exception as err:
            raise exc.ConnectionFailure("Storage backend timeout: %s" % err)
        except k_exc.SessionExpiredError as err:
            raise exc.ConnectionFailure(
                "Storage backend session has expired: %s" % err)
        except k_exc.NoNodeError as err:
            raise exc.NotFound("Storage backend node not found: %s" % err)
        except k_exc.NodeExistsError as err:
            raise exc.AlreadyExists(
                "Storage backend duplicate node: %s" % err)
        except (k_exc.KazooException, k_exc.ZookeeperError) as err:
            raise exc.StorageError(
                "Storage backend internal error: %s" % err)

    def update_task_details(self, td):
        """Update a task_detail transactionally.

        :returns: the merged task detail.
        :raises exc.NotFound: if no node exists for ``td.uuid``.
        """
        with self._exc_wrapper():
            with self._client.transaction() as txn:
                merged = self._update_task_details(td, txn)
            return merged

    def _update_task_details(self, td, txn, create_missing=False):
        """Merge ``td`` into the stored task detail, writing through ``txn``.

        The current data is read directly with the client; the create (if
        any) and the data update are issued on the supplied transaction.

        :param create_missing: when true a missing node is created instead
                               of raising exc.NotFound.
        :returns: the merged task detail.
        """
        node = paths.join(self.task_path, td.uuid)
        try:
            raw, _stat = self._client.get(node)
        except k_exc.NoNodeError:
            # Nothing stored yet: either start from a blank detail or fail.
            if not create_missing:
                raise exc.NotFound("No task details found with id: %s"
                                   % td.uuid)
            txn.create(node)
            stored = logbook.TaskDetail(name=td.name, uuid=td.uuid)
        else:
            # Something is stored: rebuild the detail from it.
            stored = p_utils.unformat_task_detail(td.uuid,
                                                  misc.decode_json(raw))

        # Fold the incoming changes in and write the result back.
        stored = p_utils.task_details_merge(stored, td)
        payload = jsonutils.dumps(p_utils.format_task_detail(stored))
        txn.set_data(node, misc.binary_encode(payload))
        return stored

    def get_task_details(self, td_uuid):
        """Read a taskdetail.

        *Read-only*, so no need of zk transaction.
        """
        with self._exc_wrapper():
            return self._get_task_details(td_uuid)

    def _get_task_details(self, td_uuid):
        """Load one task detail; raises exc.NotFound if its node is absent."""
        node = paths.join(self.task_path, td_uuid)
        try:
            raw, _stat = self._client.get(node)
        except k_exc.NoNodeError:
            raise exc.NotFound("No task details found with id: %s" % td_uuid)
        return p_utils.unformat_task_detail(td_uuid, misc.decode_json(raw))

    def update_flow_details(self, fd):
        """Update a flowdetail transactionally.

        :returns: the merged flow detail.
        :raises exc.NotFound: if no node exists for ``fd.uuid``.
        """
        with self._exc_wrapper():
            with self._client.transaction() as txn:
                merged = self._update_flow_details(fd, txn)
            return merged

    def _update_flow_details(self, fd, txn, create_missing=False):
        """Merge ``fd`` (and the task details it contains) into storage.

        :param create_missing: when true a missing flow detail node is
                               created instead of raising exc.NotFound.
        :returns: the merged flow detail with its updated task details
                  added.
        """
        node = paths.join(self.flow_path, fd.uuid)
        try:
            raw, _stat = self._client.get(node)
        except k_exc.NoNodeError:
            # Nothing stored yet: either start from a blank detail or fail.
            if not create_missing:
                raise exc.NotFound("No flow details found with id: %s"
                                   % fd.uuid)
            txn.create(node)
            stored = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
        else:
            # Something is stored: rebuild the detail from it.
            stored = p_utils.unformat_flow_detail(fd.uuid,
                                                  misc.decode_json(raw))

        # Fold the incoming changes in and write the result back.
        stored = p_utils.flow_details_merge(stored, fd)
        payload = jsonutils.dumps(p_utils.format_flow_detail(stored))
        txn.set_data(node, misc.binary_encode(payload))
        for td in fd:
            # NOTE(harlowja): create an entry in the flow detail path
            # for the provided task detail so that a reference exists
            # from the flow detail to its task details.
            reference = paths.join(node, td.uuid)
            if not self._client.exists(reference):
                txn.create(reference)
            # Task details are always created on demand from here.
            stored.add(self._update_task_details(td, txn,
                                                 create_missing=True))
        return stored

    def get_flow_details(self, fd_uuid):
        """Read a flowdetail.

        *Read-only*, so no need of zk transaction.
        """
        with self._exc_wrapper():
            return self._get_flow_details(fd_uuid)

    def _get_flow_details(self, fd_uuid):
        """Load one flow detail together with the task details it refers to.

        :raises exc.NotFound: if the flow detail node (or the node of a
                              referenced task detail) is absent.
        """
        node = paths.join(self.flow_path, fd_uuid)
        try:
            raw, _stat = self._client.get(node)
        except k_exc.NoNodeError:
            raise exc.NotFound("No flow details found with id: %s" % fd_uuid)

        flow_detail = p_utils.unformat_flow_detail(fd_uuid,
                                                   misc.decode_json(raw))
        # Child node names are the uuids of the referenced task details.
        for td_uuid in self._client.get_children(node):
            flow_detail.add(self._get_task_details(td_uuid))
        return flow_detail

    def save_logbook(self, lb):
        """Save (update) a log_book transactionally.

        A logbook whose node does not exist yet is created along with all
        of its flow and task details; otherwise the stored logbook is
        merged with ``lb`` and updated.

        :returns: ``lb`` itself when it was created, else the merged logbook.
        """

        def _pack(data):
            # JSON-serialize then binary-encode, ready to store in a node.
            return misc.binary_encode(jsonutils.dumps(data))

        def _create(book_node, txn):
            txn.create(book_node,
                       _pack(p_utils.format_logbook(lb, created_at=None)))
            for fd in lb:
                # NOTE(harlowja): create an entry in the logbook path
                # for the provided flow detail so that a reference exists
                # from the logbook to its flow details.
                txn.create(paths.join(book_node, fd.uuid))
                flow_node = paths.join(self.flow_path, fd.uuid)
                txn.create(flow_node, _pack(p_utils.format_flow_detail(fd)))
                for td in fd:
                    # NOTE(harlowja): create an entry in the flow detail path
                    # for the provided task detail so that a reference exists
                    # from the flow detail to its task details.
                    txn.create(paths.join(flow_node, td.uuid))
                    task_node = paths.join(self.task_path, td.uuid)
                    txn.create(task_node,
                               _pack(p_utils.format_task_detail(td)))
            return lb

        def _update(book_node, raw, txn):
            stored = p_utils.unformat_logbook(lb.uuid, misc.decode_json(raw))
            stored = p_utils.logbook_merge(stored, lb)
            txn.set_data(
                book_node,
                _pack(p_utils.format_logbook(stored,
                                             created_at=lb.created_at)))
            for fd in lb:
                reference = paths.join(book_node, fd.uuid)
                if not self._client.exists(reference):
                    # NOTE(harlowja): create an entry in the logbook path
                    # for the provided flow detail so that a reference exists
                    # from the logbook to its flow details.
                    txn.create(reference)
                stored.add(self._update_flow_details(fd, txn,
                                                     create_missing=True))
            return stored

        with self._exc_wrapper():
            with self._client.transaction() as txn:
                book_node = paths.join(self.book_path, lb.uuid)
                # Whether the node can be read decides create vs. update.
                try:
                    raw, _stat = self._client.get(book_node)
                except k_exc.NoNodeError:
                    saved = _create(book_node, txn)
                else:
                    saved = _update(book_node, raw, txn)
            return saved

    def _get_logbook(self, lb_uuid):
        """Load one logbook together with the flow details it refers to.

        :raises exc.NotFound: if the logbook node (or a node it refers to)
                              is absent.
        """
        node = paths.join(self.book_path, lb_uuid)
        try:
            raw, _stat = self._client.get(node)
        except k_exc.NoNodeError:
            raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
        book = p_utils.unformat_logbook(lb_uuid, misc.decode_json(raw))
        # Child node names are the uuids of the referenced flow details.
        for fd_uuid in self._client.get_children(node):
            book.add(self._get_flow_details(fd_uuid))
        return book

    def get_logbook(self, lb_uuid):
        """Read a logbook.

        *Read-only*, so no need of zk transaction.
        """
        with self._exc_wrapper():
            return self._get_logbook(lb_uuid)

    def get_logbooks(self):
        """Read all logbooks.

        *Read-only*, so no need of zk transaction.

        This is a generator: each logbook is loaded only when the caller
        asks for the next item.
        """
        with self._exc_wrapper():
            for book_uuid in self._client.get_children(self.book_path):
                yield self._get_logbook(book_uuid)

    def destroy_logbook(self, lb_uuid):
        """Destroy (delete) a log_book transactionally.

        Deletes the logbook node, the flow and task detail nodes it refers
        to, and all of the reference nodes in between.

        :raises exc.NotFound: if the logbook, or any flow/task detail it
                              refers to, has no node.
        """
        with self._exc_wrapper():
            with self._client.transaction() as txn:
                book_node = paths.join(self.book_path, lb_uuid)
                if not self._client.exists(book_node):
                    raise exc.NotFound("No logbook found with id: %s"
                                       % lb_uuid)
                for fd_uuid in self._client.get_children(book_node):
                    flow_node = paths.join(self.flow_path, fd_uuid)
                    if not self._client.exists(flow_node):
                        raise exc.NotFound("No flow details found with"
                                           " id: %s" % fd_uuid)
                    for td_uuid in self._client.get_children(flow_node):
                        task_node = paths.join(self.task_path, td_uuid)
                        if not self._client.exists(task_node):
                            raise exc.NotFound("No task details found with"
                                               " id: %s" % td_uuid)
                        # Task data first, then the flow -> task reference.
                        txn.delete(task_node)
                        txn.delete(paths.join(flow_node, td_uuid))
                    # Flow data first, then the logbook -> flow reference.
                    txn.delete(flow_node)
                    txn.delete(paths.join(book_node, fd_uuid))
                txn.delete(book_node)

    def clear_all(self, delete_dirs=True):
        """Delete all data transactionally.

        :param delete_dirs: when true the three containing directory nodes
                            are deleted as well.
        """
        with self._exc_wrapper():
            with self._client.transaction() as txn:

                # Logbooks and flow details share one layout: every entry
                # holds reference children which are deleted before the
                # entry itself.
                for directory in (self.book_path, self.flow_path):
                    for entry in self._client.get_children(directory):
                        entry_node = paths.join(directory, entry)
                        for ref in self._client.get_children(entry_node):
                            txn.delete(paths.join(entry_node, ref))
                        txn.delete(entry_node)

                # Delete all data under taskdetail path.
                for td_uuid in self._client.get_children(self.task_path):
                    txn.delete(paths.join(self.task_path, td_uuid))

                # Delete containing directories.
                if delete_dirs:
                    for directory in (self.book_path, self.task_path,
                                      self.flow_path):
                        txn.delete(directory)
 | 
