deb-python-taskflow/taskflow/tests/unit/persistence/test_zk_persistence.py
Changbin Liu a8f341a6c8 Implement ZooKeeper as persistence storage backend
Test cases are in taskflow/tests/unit/persistence/test_zk_persistence
and taskflow/tests/unit/persistence/test_zake_persistence. The former
requires a working ZooKeeper cluster, and the latter uses a fake
in-memory backend. Right now, due to the lack of ZooKeeper in Jenkins,
the former test is disabled. To enable it, simply uncomment the code
and change "hosts" in conf to your ZooKeeper cluster's addresses.

Also moved some shared helper functions from
taskflow/persistence/backends/impl_dir to
taskflow/utils/persistence_utils.

Implements: blueprint zk-logbook

Change-Id: Ia1b7ef8312a1761515515fd206b36be135119737
2014-01-28 23:04:55 -05:00


# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2014 AT&T Labs All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools

from taskflow.persistence.backends import impl_zookeeper
from taskflow import test
from taskflow.tests.unit.persistence import base


@testtools.skipIf(True, 'ZooKeeper is not available in Jenkins')
class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
    def _get_connection(self):
        return self._backend.get_connection()

    def setUp(self):
        super(ZkPersistenceTest, self).setUp()
        conf = {
            'hosts': "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
            'path': "/taskflow",
        }
        self._backend = impl_zookeeper.ZkBackend(conf)
        conn = self._get_connection()
        conn.upgrade()

    def tearDown(self):
        super(ZkPersistenceTest, self).tearDown()
        conn = self._get_connection()
        conn.clear_all()
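
The commit message says the test is enabled by editing "hosts" in conf by hand. As a possible alternative, the sketch below reads the ZooKeeper addresses from an environment variable and skips the test when it is unset, so the file would not need editing per environment. This is not part of the commit: the variable name `TASKFLOW_TEST_ZK_HOSTS`, the helper `zk_conf_from_env`, and the class `ZkConfiguredTest` are hypothetical, and the example uses only the standard library so that it runs without taskflow or a ZooKeeper cluster.

```python
import os
import unittest

# Hypothetical variable name chosen for this sketch; taskflow does not define it.
ZK_HOSTS_ENV = "TASKFLOW_TEST_ZK_HOSTS"


def zk_conf_from_env(environ=None, path="/taskflow"):
    """Build a conf dict shaped like the one in setUp above.

    Returns None when the variable is missing or empty, which the
    caller can treat as "no ZooKeeper cluster available".
    """
    environ = os.environ if environ is None else environ
    hosts = environ.get(ZK_HOSTS_ENV, "").strip()
    if not hosts:
        return None
    return {"hosts": hosts, "path": path}


class ZkConfiguredTest(unittest.TestCase):
    """Illustrative test case that skips itself when no hosts are set."""

    def setUp(self):
        super(ZkConfiguredTest, self).setUp()
        conf = zk_conf_from_env()
        if conf is None:
            self.skipTest("%s is not set" % ZK_HOSTS_ENV)
        # In the real test, conf would be handed to the backend constructor.
        self.conf = conf

    def test_conf_has_hosts(self):
        self.assertIn("hosts", self.conf)
```

With this approach, a run such as `TASKFLOW_TEST_ZK_HOSTS=192.168.0.1:2181 python -m unittest` would exercise the test, while a run without the variable would report it as skipped rather than failing.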