Adding a unit test file for ceph_ops

This commit is contained in:
Chris Holcombe 2016-01-21 09:38:47 -08:00
parent 1977cdbde1
commit 32ff93e8d0
5 changed files with 222 additions and 12 deletions

View File

@ -1,3 +1,3 @@
__author__ = 'chris'
import sys
sys.path.append('hooks')
sys.path.append('hooks')

View File

@ -6,7 +6,9 @@ sys.path.append('hooks')
import rados
from charmhelpers.core.hookenv import log, action_get, action_fail
from charmhelpers.contrib.storage.linux.ceph import pool_set, set_pool_quota, snapshot_pool, remove_pool_snapshot
from charmhelpers.contrib.storage.linux.ceph import pool_set, \
set_pool_quota, snapshot_pool, remove_pool_snapshot
# Connect to Ceph via Librados and return a connection
def connect():
@ -79,17 +81,23 @@ def pool_stats():
def delete_pool_snapshot():
pool_name = action_get("pool-name")
snapshot_name = action_get("snapshot-name")
remove_pool_snapshot(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name)
remove_pool_snapshot(service='ceph',
pool_name=pool_name,
snapshot_name=snapshot_name)
# Note only one or the other can be set
def set_pool_max_bytes():
pool_name = action_get("pool-name")
max_bytes = action_get("max")
set_pool_quota(service='ceph', pool_name=pool_name, max_bytes=max_bytes)
set_pool_quota(service='ceph',
pool_name=pool_name,
max_bytes=max_bytes)
def snapshot_ceph_pool():
pool_name = action_get("pool-name")
snapshot_name = action_get("snapshot-name")
snapshot_pool(service='ceph', pool_name=pool_name, snapshot_name=snapshot_name)
snapshot_pool(service='ceph',
pool_name=pool_name,
snapshot_name=snapshot_name)

View File

@ -3,7 +3,6 @@
# Copyright 2015 Canonical Ltd.
#
import json
import six
from charmhelpers.contrib.storage.linux.ceph import validator, \
erasure_profile_exists, ErasurePool, set_pool_quota, \
@ -18,7 +17,6 @@ from charmhelpers.core.hookenv import (
)
from charmhelpers.contrib.storage.linux.ceph import (
create_pool,
pool_exists,
delete_pool)
@ -261,7 +259,6 @@ def process_requests_v1(reqs):
# setup to use them for these operations.
svc = 'admin'
if op == "create-pool":
pool_name = req.get('name')
pool_type = req.get('pool-type') # "replicated" | "erasure"
# Default to replicated if pool_type isn't given

View File

@ -60,13 +60,17 @@ class CephBrokerTestCase(unittest.TestCase):
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_erasure_pool(self, mock_log, mock_pool_exists,
def test_process_requests_create_erasure_pool(self, mock_log,
mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{'op': 'create-pool', 'name':
'foo', 'erasure-type': 'jerasure',
'failure-domain': 'host', 'k': 3, 'm': 2}]})
'ops': [{'op': 'create-pool',
'name': 'foo',
'erasure-type': 'jerasure',
'failure-domain': 'host',
'k': 3,
'm': 2}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo')

201
unit_tests/test_ceph_ops.py Normal file
View File

@ -0,0 +1,201 @@
__author__ = 'chris'
import json
from hooks import ceph_broker
import mock
import unittest
class TestCephOps(unittest.TestCase):
"""
@mock.patch('ceph_broker.log')
def test_connect(self, mock_broker):
self.fail()
"""
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.create_erasure_profile')
def test_create_erasure_profile(self, mock_create_erasure, mock_log):
req = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-erasure-profile',
'name': 'foo',
'erasure-type': 'jerasure',
'failure-domain': 'rack',
'k': 3,
'm': 2,
}]})
rc = ceph_broker.process_requests(req)
mock_create_erasure.assert_called_with(service='admin',
profile_name='foo',
k=3,
m=2,
l=None,
failure_domain='rack',
erasure_plugin_name='jerasure')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.ReplicatedPool.create')
def test_process_requests_create_replicated_pool(self,
mock_replicated_pool,
mock_pool_exists,
mock_log):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'pool-type': 'replicated',
'name': 'foo',
'replicas': 3
}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with()
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.ErasurePool.create')
@mock.patch('ceph_broker.erasure_profile_exists')
def test_process_requests_create_erasure_pool(self, mock_profile_exists,
mock_erasure_pool,
mock_pool_exists,
mock_log):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'pool-type': 'erasure',
'name': 'foo',
'erasure-profile': 'default'
}]})
rc = ceph_broker.process_requests(reqs)
mock_profile_exists.assert_called_with(service='admin', name='default')
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_erasure_pool.assert_called_with()
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.Pool.add_cache_tier')
def test_process_requests_create_cache_tier(self, mock_pool,
mock_pool_exists, mock_log):
mock_pool_exists.return_value = True
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-cache-tier',
'cold-pool': 'foo',
'hot-pool': 'foo-ssd',
'mode': 'writeback',
'erasure-profile': 'default'
}]})
rc = ceph_broker.process_requests(reqs)
mock_pool_exists.assert_any_call(service='admin', name='foo')
mock_pool_exists.assert_any_call(service='admin', name='foo-ssd')
mock_pool.assert_called_with(cache_pool='foo-ssd', mode='writeback')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.snapshot_pool')
def test_snapshot_pool(self, mock_snapshot_pool, mock_log):
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'snapshot-pool',
'name': 'foo',
'snapshot-name': 'foo-snap1',
}]})
rc = ceph_broker.process_requests(reqs)
mock_snapshot_pool.return_value = 1
mock_snapshot_pool.assert_called_with(service='admin',
pool_name='foo',
snapshot_name='foo-snap1')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.remove_pool_snapshot')
def test_remove_pool_snapshot(self, mock_snapshot_pool, mock_broker):
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'remove-pool-snapshot',
'name': 'foo',
'snapshot-name': 'foo-snap1',
}]})
rc = ceph_broker.process_requests(reqs)
mock_snapshot_pool.assert_called_with(service='admin',
pool_name='foo',
snapshot_name='foo-snap1')
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.pool_set')
def test_set_pool_value(self, mock_set_pool, mock_broker):
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'set-pool-value',
'name': 'foo',
'key': 'size',
'value': 3,
}]})
rc = ceph_broker.process_requests(reqs)
mock_set_pool.assert_called_with(service='admin',
pool_name='foo',
key='size',
value=3)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.log')
def test_set_invalid_pool_value(self, mock_broker):
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'set-pool-value',
'name': 'foo',
'key': 'size',
'value': 'abc',
}]})
rc = ceph_broker.process_requests(reqs)
# self.assertRaises(AssertionError)
self.assertEqual(json.loads(rc)['exit-code'], 1)
'''
@mock.patch('ceph_broker.log')
def test_set_invalid_pool_key(self, mock_broker):
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'set-pool-value',
'name': 'foo',
'key': 'foo',
'value': 'abc',
}]})
rc = ceph_broker.process_requests(reqs)
self.assertEqual(json.loads(rc)['exit-code'], 1)
@mock.patch('ceph_broker.log')
def test_list_pools(self, mock_broker):
self.fail()
@mock.patch('ceph_broker.log')
def test_pool_get(self, mock_broker):
self.fail()
@mock.patch('ceph_broker.log')
def test_pool_stats(self, mock_broker):
self.fail()
@mock.patch('ceph_broker.log')
def test_rename_pool(self, mock_broker):
self.fail()
@mock.patch('ceph_broker.log')
def test_set_pool_max_bytes(self, mock_broker):
self.fail()
'''
if __name__ == '__main__':
unittest.main()