Refactor unit test about sqlalchemy for supporting v1

Change-Id: I119155b8ac5fc7be6e5ef8b1db13df38595e64c4
This commit is contained in:
gengchc2 2018-12-20 17:58:50 -08:00
parent 4048106834
commit 2dc322497e
12 changed files with 1681 additions and 6 deletions

View File

@ -0,0 +1,416 @@
# (c) Copyright 2018 ZTE Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manipulating Action via the DB API"""
import copy
from oslo_config import cfg
from freezer_api.tests.unit import common
from freezer_api.tests.unit.sqlalchemy import base
CONF = cfg.CONF
class DbActionTestCase(base.DbTestCase):
def setUp(self):
super(DbActionTestCase, self).setUp()
self.fake_action_0 = common.get_fake_action_0()
self.fake_action_2 = common.get_fake_action_2()
self.fake_action_3 = common.get_fake_action_3()
self.freezer_action_0 = self.fake_action_0.get('freezer_action')
self.freezer_action_2 = self.fake_action_2.get('freezer_action')
self.fake_action_id = common.get_fake_action_id()
CONF.enable_v1_api = True
def tearDown(self):
super(DbActionTestCase, self).tearDown()
CONF.enable_v1_api = False
def test_add_and_get_action(self):
action_doc = copy.deepcopy(self.fake_action_0)
action_id = self.dbapi.add_action(user_id=self.fake_action_0.
get('user_id'),
doc=action_doc)
self.assertIsNotNone(action_id)
result = self.dbapi.get_action(user_id=self.fake_action_0.
get('user_id'),
action_id=action_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('max_retries'),
self.fake_action_0.get('max_retries'))
self.assertEqual(result.get('max_retries_interval'),
self.fake_action_0.get('max_retries_interval'))
freezer_action = result.get('freezer_action')
self.assertEqual(freezer_action.get('action'),
self.freezer_action_0.get('action'))
self.assertEqual(freezer_action.get('backup_name'),
self.freezer_action_0.get('backup_name'))
self.assertEqual(freezer_action.get('container'),
self.freezer_action_0.get('container'))
self.assertEqual(freezer_action.get('src_file'),
self.freezer_action_0.get('src_file'))
self.assertEqual(freezer_action.get('mode'),
self.freezer_action_0.get('mode'))
def test_add_and_delete_action(self):
action_doc = copy.deepcopy(self.fake_action_0)
action_id = self.dbapi.add_action(user_id=self.fake_action_0.
get('user_id'),
doc=action_doc)
self.assertIsNotNone(action_id)
result = self.dbapi.delete_action(user_id=self.fake_action_0.
get('user_id'),
action_id=action_id)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
result = self.dbapi.get_action(user_id=self.fake_action_0.
get('user_id'),
action_id=action_id)
self.assertEqual(len(result), 0)
def test_add_and_update_action(self):
action_doc = copy.deepcopy(self.fake_action_0)
action_id = self.dbapi.add_action(user_id=self.fake_action_0.
get('user_id'),
doc=action_doc)
self.assertIsNotNone(action_id)
patch_doc = copy.deepcopy(self.fake_action_2)
result = self.dbapi.update_action(user_id=self.fake_action_2.
get('user_id'),
patch_doc=patch_doc,
action_id=action_id)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
result = self.dbapi.get_action(user_id=self.fake_action_2.
get('user_id'),
action_id=action_id)
self.assertEqual(result.get('max_retries'),
self.fake_action_2.get('max_retries'))
self.assertEqual(result.get('max_retries_interval'),
self.fake_action_2.get('max_retries_interval'))
freezer_action = result.get('freezer_action')
self.assertEqual(freezer_action.get('action'),
self.freezer_action_2.get('action'))
def test_add_and_replace_action(self):
action_doc = copy.deepcopy(self.fake_action_0)
action_id = self.dbapi.add_action(user_id=self.fake_action_0.
get('user_id'),
doc=action_doc)
self.assertIsNotNone(action_id)
patch_doc = copy.deepcopy(self.fake_action_2)
result = self.dbapi.replace_action(user_id=self.fake_action_2.
get('user_id'),
doc=patch_doc,
action_id=action_id)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
result = self.dbapi.get_action(user_id=self.fake_action_2.
get('user_id'),
action_id=action_id)
self.assertEqual(result.get('max_retries'),
self.fake_action_2.get('max_retries'))
self.assertEqual(result.get('max_retries_interval'),
self.fake_action_2.get('max_retries_interval'))
freezer_action = result.get('freezer_action')
self.assertEqual(freezer_action.get('action'),
self.freezer_action_2.get('action'))
patch_doc1 = copy.deepcopy(self.fake_action_0)
result = self.dbapi.replace_action(user_id=self.fake_action_2.
get('user_id'),
doc=patch_doc1,
action_id=self.fake_action_id)
self.assertIsNotNone(result)
result = self.dbapi.get_action(user_id=self.fake_action_2.
get('user_id'),
action_id=self.fake_action_id)
self.assertEqual(result.get('action_id'), self.fake_action_id)
def test_add_and_search_action(self):
count = 0
actionids = []
while(count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=10,
offset=0)
self.assertIsNotNone(result)
self.assertEqual(len(result), 10)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(actionids[index], actionmap['action_id'])
def test_action_list_with_search_match_and_match_not(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
if count in [4, 12]:
doc['freezer_action']['mode'] = 'nova'
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match_not': [{'mode': 'nova'}],
'match': [{'max_retries': 10}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(10, actionmap['max_retries'])
self.assertEqual('fs',
actionmap['freezer_action']['mode'])
def test_action_list_with_search_match_list(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
if count in [4, 12]:
doc['freezer_action']['mode'] = 'nova'
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match': [{'max_retries': 10},
{'mode': 'nova'}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(10, actionmap['max_retries'])
self.assertEqual('nova',
actionmap['freezer_action']['mode'])
def test_action_list_with_search_match_not_list(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
if count in [4, 12]:
doc['freezer_action']['mode'] = 'nova'
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match_not':
[{'mode': 'nova'},
{'max_retries': 5}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(10, actionmap['max_retries'])
self.assertEqual('fs',
actionmap['freezer_action']['mode'])
def test_action_list_with_search_with_all_opt_one_match(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match': [{'_all': '[{"max_retries": 10}]'}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 5)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(10, actionmap['max_retries'])
def test_action_list_with_search_with_all_opt_two_matchs(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
if count in [4, 12]:
doc['freezer_action']['mode'] = 'nova'
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match':
[{'_all':
'[{"max_retries": 10}, '
'{"mode": "nova"}]'}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
actionmap = result[index]
self.assertEqual(10, actionmap['max_retries'])
self.assertEqual('nova',
actionmap['freezer_action']['mode'])
def test_action_list_with_search_with_error_all_opt_return_alltuples(self):
count = 0
actionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_action_3)
action_id = common.get_fake_action_id()
doc['action_id'] = action_id
if count in [0, 4, 8, 12, 16]:
doc['max_retries'] = 10
result = self.dbapi.add_action(user_id=self.fake_action_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(result)
self.assertEqual(result, action_id)
actionids.append(action_id)
count += 1
search_opt = {'match': [{'_all': '{"max_retries": 10}'}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
search_opt = {'match': [{'_all': 'max_retries=10'}]}
result = self.dbapi.search_action(user_id=self.fake_action_3.
get('user_id'),
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)

View File

@ -0,0 +1,108 @@
# (c) Copyright 2018 ZTE Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manipulating Backup via the DB API"""
import copy
from oslo_config import cfg
from freezer_api.tests.unit import common
from freezer_api.tests.unit.sqlalchemy import base
CONF = cfg.CONF
class DbBackupTestCase(base.DbTestCase):
def setUp(self):
super(DbBackupTestCase, self).setUp()
self.fake_backup_metadata = common.get_fake_backup_metadata()
self.fake_user_id = common.fake_data_0_user_id
self.fake_user_name = common.fake_data_0_user_name
CONF.enable_v1_api = True
def tearDown(self):
super(DbBackupTestCase, self).tearDown()
CONF.enable_v1_api = False
def test_add_and_get_backup(self):
backup_doc = copy.deepcopy(self.fake_backup_metadata)
backup_id = self.dbapi.add_backup(user_id=self.fake_user_id,
user_name=self.fake_user_name,
doc=backup_doc)
self.assertIsNotNone(backup_id)
result = self.dbapi.get_backup(user_id=self.fake_user_id,
backup_id=backup_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('user_name'),
self.fake_user_name)
self.assertEqual(result.get('client_id'),
self.fake_backup_metadata.get('client_id'))
self.assertEqual(result.get('user_id'),
self.fake_user_id)
backup_metadata = result.get('backup_metadata')
self.assertEqual(backup_metadata,
self.fake_backup_metadata)
def test_add_and_delete_backup(self):
backup_doc = copy.deepcopy(self.fake_backup_metadata)
backup_id = self.dbapi.add_backup(user_id=self.fake_user_id,
user_name=self.fake_user_name,
doc=backup_doc)
self.assertIsNotNone(backup_id)
result = self.dbapi.delete_backup(user_id=self.fake_user_id,
backup_id=backup_id)
self.assertIsNotNone(result)
self.assertEqual(result, backup_id)
result = self.dbapi.get_backup(user_id=self.fake_user_id,
backup_id=backup_id)
self.assertEqual(len(result), 0)
def test_add_and_search_backup(self):
count = 0
backupids = []
while (count < 20):
backup_doc = copy.deepcopy(self.fake_backup_metadata)
backup_id = self.dbapi.add_backup(user_id=self.fake_user_id,
user_name=self.fake_user_name,
doc=backup_doc)
self.assertIsNotNone(backup_id)
backupids.append(backup_id)
count += 1
result = self.dbapi.search_backup(user_id=self.fake_user_id,
limit=10,
offset=0)
self.assertIsNotNone(result)
self.assertEqual(len(result), 10)
for index in range(len(result)):
backupmap = result[index]
self.assertEqual(backupids[index], backupmap['backup_id'])

View File

@ -0,0 +1,330 @@
# (c) Copyright 2018 ZTE Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for manipulating Client via the DB API"""
import copy
from oslo_config import cfg
from freezer_api.tests.unit import common
from freezer_api.tests.unit.sqlalchemy import base
CONF = cfg.CONF
class DbClientTestCase(base.DbTestCase):
def setUp(self):
super(DbClientTestCase, self).setUp()
self.fake_client_0 = common.get_fake_client_0()
self.fake_client_doc = self.fake_client_0.get('client')
self.fake_user_id = self.fake_client_0.get('user_id')
CONF.enable_v1_api = True
def tearDown(self):
super(DbClientTestCase, self).tearDown()
CONF.enable_v1_api = False
def test_add_and_get_client(self):
client_doc = copy.deepcopy(self.fake_client_doc)
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
result = self.dbapi.get_client(user_id=self.fake_user_id,
client_id=client_id)
self.assertIsNotNone(result)
self.assertEqual(len(result), 1)
self.assertEqual(result[0].get('user_id'),
self.fake_user_id)
client = result[0].get('client')
self.assertEqual(client.get('client_id'),
self.fake_client_doc.get('client_id'))
self.assertEqual(client.get('description'),
self.fake_client_doc.get('description'))
def test_add_and_delete_client(self):
client_doc = copy.deepcopy(self.fake_client_doc)
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
result = self.dbapi.delete_client(user_id=self.fake_user_id,
client_id=client_id)
self.assertIsNotNone(result)
self.assertEqual(result, client_id)
result = self.dbapi.get_client(user_id=self.fake_user_id,
client_id=client_id)
self.assertEqual(len(result), 0)
def test_add_and_search_client(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=10,
offset=0)
self.assertIsNotNone(result)
self.assertEqual(len(result), 10)
for index in range(len(result)):
clientmap = result[index]
clientid = clientmap['client'].get('client_id')
self.assertEqual(clientids[index], clientid)
def test_add_and_search_client_with_search_match_and_match_not(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['description'] = "tecs"
if count in [4, 12]:
client_doc['hostname'] = 'node2'
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match_not': [{'hostname': 'node2'}],
'match': [{'description': 'tecs'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
clientmap = result[index]
hostname = clientmap['client'].get('hostname')
description = clientmap['client'].get('description')
self.assertEqual('node1', hostname)
self.assertEqual('tecs', description)
def test_add_and_search_client_with_search_match_list(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['description'] = "tecs"
if count in [4, 12]:
client_doc['hostname'] = 'node2'
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match': [{'hostname': 'node2'},
{'description': 'tecs'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
clientmap = result[index]
hostname = clientmap['client'].get('hostname')
description = clientmap['client'].get('description')
self.assertEqual('node2', hostname)
self.assertEqual('tecs', description)
def test_add_and_search_client_with_search_match_not_list(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['description'] = "tecs"
if count in [4, 12]:
client_doc['hostname'] = 'node2'
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match_not': [{'hostname': 'node2'},
{'description': 'some usefule text here'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
clientmap = result[index]
hostname = clientmap['client'].get('hostname')
description = clientmap['client'].get('description')
self.assertEqual('node1', hostname)
self.assertEqual('tecs', description)
def test_add_and_search_client_with_all_opt_one_match(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['description'] = "tecs"
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match': [{'_all': '[{"description": "tecs"}]'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 5)
for index in range(len(result)):
clientmap = result[index]
description = clientmap['client'].get('description')
self.assertEqual('tecs', description)
def test_add_and_search_client_with_all_opt_two_match(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['hostname'] = "node2"
if count in [4, 12]:
client_doc['description'] = "tecs"
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match':
[{'_all':
'[{"description": "tecs"}, '
'{"hostname": "node2"}]'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
clientmap = result[index]
description = clientmap['client'].get('description')
hostname = clientmap['client'].get('hostname')
self.assertEqual('tecs', description)
self.assertEqual('node2', hostname)
def test_add_and_search_client_with_error_all_opt_return_alltuples(self):
count = 0
clientids = []
while (count < 20):
client_doc = copy.deepcopy(self.fake_client_doc)
clientid = common.get_fake_client_id()
client_doc['client_id'] = clientid
client_doc['hostname'] = "node1"
if count in [0, 4, 8, 12, 16]:
client_doc['hostname'] = "node2"
client_id = self.dbapi.add_client(user_id=self.fake_user_id,
doc=client_doc)
self.assertIsNotNone(client_id)
self.assertEqual(clientid, client_id)
clientids.append(client_id)
count += 1
search_opt = {'match': [{'_all': '{"hostname": "node2"}'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
search_opt = {'match': [{'_all': 'hostname=node2'}]}
result = self.dbapi.get_client(user_id=self.fake_user_id,
limit=20,
offset=0,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)

View File

@ -0,0 +1,405 @@
# -*- encoding: utf-8 -*-
# (c) Copyright 2018 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for manipulating job via the DB API"""
import copy
from oslo_config import cfg
from freezer_api.tests.unit import common
from freezer_api.tests.unit.sqlalchemy import base
CONF = cfg.CONF
class DbJobTestCase(base.DbTestCase):
def setUp(self):
super(DbJobTestCase, self).setUp()
self.fake_job_0 = common.get_fake_job_0()
self.fake_job_0.pop('job_id')
self.fake_job_2 = common.get_fake_job_2()
self.fake_job_2.pop('job_id')
self.fake_job_3 = common.get_fake_job_3()
self.fake_job_3.pop('job_id')
self.fake_job_id = common.get_fake_job_id()
CONF.enable_v1_api = True
def tearDown(self):
super(DbJobTestCase, self).tearDown()
CONF.enable_v1_api = False
def test_add_and_get_job(self):
job_doc = copy.deepcopy(self.fake_job_0)
job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'),
doc=job_doc)
self.assertIsNotNone(job_id)
result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'),
job_id=job_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('client_id'),
self.fake_job_0.get('client_id'))
self.assertEqual(result.get('description'),
self.fake_job_0.get('description'))
self.assertEqual(result.get('job_actions'),
self.fake_job_0.get('job_actions'))
self.assertEqual(result.get('user_id'),
self.fake_job_0.get('user_id'))
self.assertEqual(result.get('job_schedule').get('status'),
self.fake_job_0.get('job_schedule').get('status'))
self.assertEqual(result.get('job_schedule').get('result'),
self.fake_job_0.get('job_schedule').get('result'))
self.assertEqual(result.get('job_schedule').get('schedule_date'),
self.fake_job_0.get('job_schedule').
get('schedule_date'))
self.assertEqual(result.get('job_schedule').
get('schedule_interval'),
self.fake_job_0.get('job_schedule').
get('schedule_interval'))
def test_add_and_delete_job(self):
job_doc = copy.deepcopy(self.fake_job_0)
job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'),
doc=job_doc)
self.assertIsNotNone(job_id)
result = self.dbapi.delete_job(user_id=self.fake_job_0.get('user_id'),
job_id=job_id)
self.assertIsNotNone(result)
self.assertEqual(result, job_id)
result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'),
job_id=job_id)
self.assertEqual(len(result), 0)
def test_add_and_update_job(self):
job_doc = copy.deepcopy(self.fake_job_0)
job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'),
doc=job_doc)
self.assertIsNotNone(job_id)
patch_doc = copy.deepcopy(self.fake_job_2)
result = self.dbapi.update_job(user_id=self.fake_job_2.get('user_id'),
job_id=job_id,
patch_doc=patch_doc)
self.assertIsNotNone(result)
self.assertEqual(result, 0)
result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'),
job_id=job_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('client_id'),
self.fake_job_2.get('client_id'))
self.assertEqual(result.get('description'),
self.fake_job_2.get('description'))
self.assertEqual(result.get('job_schedule').
get('schedule_interval'),
self.fake_job_2.get('job_schedule').
get('schedule_interval'))
self.assertEqual(result.get('job_actions'),
self.fake_job_2.get('job_actions'))
def test_add_and_replace_job(self):
job_doc = copy.deepcopy(self.fake_job_0)
job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'),
doc=job_doc)
self.assertIsNotNone(job_id)
patch_doc = copy.deepcopy(self.fake_job_2)
result = self.dbapi.replace_job(user_id=self.
fake_job_2.get('user_id'),
job_id=job_id,
doc=patch_doc)
self.assertIsNotNone(result)
self.assertEqual(result, job_id)
result = self.dbapi.get_job(user_id=self.fake_job_2.get('user_id'),
job_id=job_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('client_id'),
self.fake_job_2.get('client_id'))
self.assertEqual(result.get('description'),
self.fake_job_2.get('description'))
self.assertEqual(result.get('job_schedule').
get('schedule_interval'),
self.fake_job_2.get('job_schedule').
get('schedule_interval'))
self.assertEqual(result.get('job_actions'),
self.fake_job_2.get('job_actions'))
patch_doc1 = copy.deepcopy(self.fake_job_0)
result = self.dbapi.replace_job(user_id=self.
fake_job_2.get('user_id'),
job_id=self.fake_job_id,
doc=patch_doc1)
self.assertIsNotNone(result)
result = self.dbapi.get_job(user_id=self.fake_job_2.get('user_id'),
job_id=self.fake_job_id)
self.assertEqual(result.get('job_id'), self.fake_job_id)
def test_job_list_without_search(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=10)
self.assertIsNotNone(result)
self.assertEqual(len(result), 10)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual(jobids[index], jobmap['job_id'])
def test_job_list_with_search_match_and_match_not(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
if count in [4, 12]:
doc['job_schedule']['schedule_interval'] = '10 days'
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match_not': [{'schedule_interval': '10 days'}],
'match': [{'client_id': 'node1'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
self.assertEqual('14 days',
jobmap['job_schedule']['schedule_interval'])
def test_job_list_with_search_match_list(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
if count in [4, 12]:
doc['job_schedule']['schedule_interval'] = '10 days'
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match': [{'client_id': 'node1'},
{'schedule_interval': '10 days'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
self.assertEqual('10 days',
jobmap['job_schedule']['schedule_interval'])
def test_job_list_with_search_match_not_list(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
if count in [4, 12]:
doc['job_schedule']['schedule_interval'] = '10 days'
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match_not':
[{'schedule_interval': '10 days'},
{'client_id': 'mytenantid_myhostname2'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
self.assertEqual('14 days',
jobmap['job_schedule']['schedule_interval'])
def test_job_list_with_search_with_all_opt_one_match(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match': [{'_all': '[{"client_id": "node1"}]'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 5)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
def test_job_list_with_search_with_all_opt_two_matches(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
if count in [4, 12]:
doc['job_schedule']['schedule_interval'] = '10 days'
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match':
[{'_all':
'[{"client_id": "node1"}, '
'{"schedule_interval": "10 days"}]'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
self.assertEqual('10 days',
jobmap['job_schedule']['schedule_interval'])
def test_job_list_with_search_with_error_all_opt_return_alltuples(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
search_opt = {'match': [{'_all': '{"client_id": "node1"}'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
search_opt = {'match': [{'_all': 'client_id=node1'}]}
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
def test_job_list_with_search_and_offset_and_limit(self):
count = 0
jobids = []
while (count < 20):
doc = copy.deepcopy(self.fake_job_3)
if count in [0, 4, 8, 12, 16]:
doc['client_id'] = "node1"
job_id = self.dbapi.add_job(user_id=self.fake_job_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(job_id)
jobids.append(job_id)
count += 1
# There are 5 records.
search_opt = {'match': [{'_all': '[{"client_id": "node1"}]'}]}
# First, we can get 3 tuples
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=0,
limit=3,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])
# Second, we can get 2 tuples
result = self.dbapi.search_job(user_id=self.fake_job_3.
get('user_id'),
offset=3,
limit=3,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
jobmap = result[index]
self.assertEqual('node1', jobmap['client_id'])

View File

@ -0,0 +1,407 @@
# -*- encoding: utf-8 -*-
# (c) Copyright 2018 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for manipulating job via the DB API"""
import copy
from oslo_config import cfg
from freezer_api.tests.unit import common
from freezer_api.tests.unit.sqlalchemy import base
CONF = cfg.CONF
class DbSessionTestCase(base.DbTestCase):
def setUp(self):
super(DbSessionTestCase, self).setUp()
self.fake_session_0 = common.get_fake_session_0()
self.fake_session_0.pop('session_id')
self.fake_session_2 = common.get_fake_session_2()
self.fake_session_2.pop('session_id')
self.fake_session_3 = common.get_fake_session_3()
self.fake_session_3.pop('session_id')
self.fake_session_id = common.get_fake_session_id()
CONF.enable_v1_api = True
def tearDown(self):
super(DbSessionTestCase, self).tearDown()
CONF.enable_v1_api = False
def test_add_and_get_session(self):
session_doc = copy.deepcopy(self.fake_session_0)
session_id = self.dbapi.add_session(user_id=self.fake_session_0.
get('user_id'),
doc=session_doc)
self.assertIsNotNone(session_id)
result = self.dbapi.get_session(user_id=self.fake_session_0.
get('user_id'),
session_id=session_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('session_tag'),
self.fake_session_0.get('session_tag'))
self.assertEqual(result.get('description'),
self.fake_session_0.get('description'))
self.assertEqual(result.get('hold_off'),
self.fake_session_0.get('hold_off'))
self.assertEqual(result.get('time_start'),
self.fake_session_0.get('time_start'))
self.assertEqual(result.get('time_end'),
self.fake_session_0.get('time_end'))
self.assertEqual(result.get('user_id'),
self.fake_session_0.get('user_id'))
self.assertEqual(result.get('schedule'),
self.fake_session_0.get('schedule'))
def test_add_and_delete_session(self):
session_doc = copy.deepcopy(self.fake_session_0)
session_id = self.dbapi.add_session(user_id=self.fake_session_0.
get('user_id'),
doc=session_doc)
self.assertIsNotNone(session_id)
result = self.dbapi.delete_session(user_id=self.fake_session_0.
get('user_id'),
session_id=session_id)
self.assertIsNotNone(result)
self.assertEqual(result, session_id)
result = self.dbapi.get_session(user_id=self.fake_session_0.
get('user_id'),
session_id=session_id)
self.assertEqual(len(result), 0)
def test_add_and_update_session(self):
session_doc = copy.deepcopy(self.fake_session_0)
session_id = self.dbapi.add_session(user_id=self.fake_session_0.
get('user_id'),
doc=session_doc)
self.assertIsNotNone(session_id)
patch_doc = copy.deepcopy(self.fake_session_2)
result = self.dbapi.update_session(user_id=self.fake_session_0.
get('user_id'),
session_id=session_id,
patch_doc=patch_doc)
self.assertIsNotNone(result)
self.assertEqual(result, 0)
result = self.dbapi.get_session(user_id=self.fake_session_0.
get('user_id'),
session_id=session_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('description'),
self.fake_session_2.get('description'))
self.assertEqual(result.get('hold_off'),
self.fake_session_2.get('hold_off'))
self.assertEqual(result.get('schedule').get('schedule_date'),
self.fake_session_2.get('schedule').
get('schedule_date'))
def test_add_and_replace_session(self):
session_doc = copy.deepcopy(self.fake_session_0)
session_id = self.dbapi.add_session(user_id=self.fake_session_0.
get('user_id'),
doc=session_doc)
self.assertIsNotNone(session_id)
patch_doc = copy.deepcopy(self.fake_session_2)
result = self.dbapi.replace_session(user_id=self.
fake_session_2.get('user_id'),
session_id=session_id,
doc=patch_doc)
self.assertIsNotNone(result)
self.assertEqual(result, session_id)
result = self.dbapi.get_session(user_id=self.fake_session_2.
get('user_id'),
session_id=session_id)
self.assertIsNotNone(result)
self.assertEqual(result.get('description'),
self.fake_session_2.get('description'))
self.assertEqual(result.get('hold_off'),
self.fake_session_2.get('hold_off'))
self.assertEqual(result.get('schedule').get('schedule_date'),
self.fake_session_2.get('schedule').
get('schedule_date'))
patch_doc1 = copy.deepcopy(self.fake_session_0)
result = self.dbapi.replace_session(user_id=self.
fake_session_0.get('user_id'),
session_id=self.fake_session_id,
doc=patch_doc1)
self.assertIsNotNone(result)
result = self.dbapi.get_session(user_id=self.fake_session_2.
get('user_id'),
session_id=self.fake_session_id)
self.assertEqual(result.get('session_id'), self.fake_session_id)
def test_session_list_without_search(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=10)
self.assertIsNotNone(result)
self.assertEqual(len(result), 10)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(sessionids[index], sessionmap['session_id'])
def test_session_list_with_search_match_and_match_not(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
if count in [4, 12]:
doc['schedule']['schedule_date'] = \
'2018-12-12T00:00:00'
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match': [{'hold_off': 100}],
'match_not':
[{'schedule_date': '2018-12-12T00:00:00'}]
}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
self.assertEqual('2018-11-14T16:20:00',
sessionmap['schedule']['schedule_date'])
def test_session_list_with_search_match_list(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
if count in [4, 12]:
doc['schedule']['schedule_date'] = '2018-12-12T00:00:00'
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match': [{'hold_off': 100},
{'schedule_date': '2018-12-12T00:00:00'}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
self.assertEqual('2018-12-12T00:00:00',
sessionmap['schedule']['schedule_date'])
def test_session_list_with_search_match_not_list(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
if count in [4, 12]:
doc['schedule']['schedule_date'] = '2018-12-12T00:00:00'
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match_not': [{'schedule_date': '2018-11-14T16:20:00'},
{'hold_off': 60}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
self.assertEqual('2018-12-12T00:00:00',
sessionmap['schedule']['schedule_date'])
def test_session_list_with_search_with_all_opt_one_match(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match': [{'_all': '[{"hold_off": 100}]'}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 5)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
def test_session_list_with_search_with_all_opt_two_matches(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
if count in [4, 12]:
doc['schedule']['schedule_date'] = '2018-12-12T00:00:00'
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match': [{'_all': '[{"hold_off": 100},'
'{"schedule_date": '
'"2018-12-12T00:00:00"}]'}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
self.assertEqual('2018-12-12T00:00:00',
sessionmap['schedule']['schedule_date'])
def test_session_list_with_search_error_all_opt_return_alltuples(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
search_opt = {'match': [{'_all': '{"hold_off": 100}'}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
search_opt = {'match': [{'_all': 'hold_off=100'}]}
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=20,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 20)
def test_session_list_with_search_and_offset_and_limit(self):
count = 0
sessionids = []
while (count < 20):
doc = copy.deepcopy(self.fake_session_3)
if count in [0, 4, 8, 12, 16]:
doc['hold_off'] = 100
session_id = self.dbapi.add_session(user_id=self.fake_session_3.
get('user_id'),
doc=doc)
self.assertIsNotNone(session_id)
sessionids.append(session_id)
count += 1
# There are 5 records.
search_opt = {'match': [{'_all': '[{"hold_off": 100}]'}]}
# First, we can get 3 tuples
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=0,
limit=3,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 3)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])
# Second, we can get 2 tuples
result = self.dbapi.search_session(user_id=self.fake_session_3.
get('user_id'),
offset=3,
limit=3,
search=search_opt)
self.assertIsNotNone(result)
self.assertEqual(len(result), 2)
for index in range(len(result)):
sessionmap = result[index]
self.assertEqual(100, sessionmap['hold_off'])

View File

@ -175,8 +175,11 @@ class DbActionTestCase(base.DbTestCase):
doc=patch_doc1,
action_id=self.fake_action_id)
self.assertIsNotNone(result)
self.assertEqual(result, self.fake_action_id)
result = self.dbapi.get_action(project_id=self.fake_project_id,
user_id=self.fake_action_2.
get('user_id'),
action_id=self.fake_action_id)
self.assertEqual(result.get('action_id'), self.fake_action_id)
def test_add_and_search_action(self):
count = 0

View File

@ -156,8 +156,10 @@ class DbJobTestCase(base.DbTestCase):
doc=patch_doc1,
project_id=self.fake_project_id)
self.assertIsNotNone(result)
self.assertEqual(result, self.fake_job_id)
result = self.dbapi.get_job(project_id=self.fake_project_id,
user_id=self.fake_job_2.get('user_id'),
job_id=self.fake_job_id)
self.assertEqual(result.get('job_id'), self.fake_job_id)
def test_job_list_without_search(self):
count = 0

View File

@ -166,8 +166,12 @@ class DbSessionTestCase(base.DbTestCase):
project_id=self.fake_session_2.
get('project_id'))
self.assertIsNotNone(result)
self.assertEqual(result, self.fake_session_id)
result = self.dbapi.get_session(project_id=self.fake_session_2.
get('project_id'),
user_id=self.fake_session_2.
get('user_id'),
session_id=self.fake_session_id)
self.assertEqual(result.get('session_id'), self.fake_session_id)
def test_session_list_without_search(self):
count = 0