Add fuyaql to allow debug expressions on master node

Sometimes there is needed to test new YAQL expression.
To allow do it easier, include new YAQL shell as a part
of nailgun.

Change-Id: Ibf211adf4b9b111e143b4dc5512fe9c1998c3927
Closes-Bug: #1590007
This commit is contained in:
Stanislaw Bogatkin 2016-06-07 16:53:53 +03:00 committed by Bulat Gaifullin
parent b36d190765
commit 529076b043
6 changed files with 1149 additions and 0 deletions

View File

@ -181,6 +181,16 @@ def load_extensions_parsers(subparsers):
load_alembic_parsers(extensions_parser)
def load_yaql_parsers(subparsers):
yaql_parser = subparsers.add_parser(
'yaql', help='run live YAQL console for cluster'
)
yaql_parser.add_argument(
'-c', '--cluster_id', dest='cluster_id', action='store', type=str,
help='cluster id'
)
def action_dumpdata(params):
import logging
@ -334,6 +344,11 @@ def action_shell(params):
code.interact(local={'db': db, 'settings': settings})
def action_yaql(params):
from nailgun.fuyaql import fuyaql
fuyaql.main(params.cluster_id)
def action_run(params):
from nailgun.settings import settings
@ -371,6 +386,7 @@ if __name__ == "__main__":
load_shell_parsers(subparsers)
load_settings_parsers(subparsers)
load_extensions_parsers(subparsers)
load_yaql_parsers(subparsers)
params, other_params = parser.parse_known_args()
sys.argv.pop(1)

View File

@ -0,0 +1,133 @@
# Fuel-YAQL
Fuel-YAQL is a live YAQL master node console for Fuel to easy evaluate yaql
conditions user wanted to put into task.
How to use:
on Fuel master node, run
> manage.py yaql -c 'CLUSTER_ID'
where 'CLUSTER_ID' is id of existing cluster which you can get by run
> fuel env
command. Cluster id is required there to have an opportunity for use internal Fuel yaql
functions such as 'changed' or 'new'. After this fuel-yaql console will be opened:
> fuel-yaql >
there you can evaluate all functions and conditions you need just by entering
them, for example:
> fuel-yaql> changed($)
> true
This console has some internal commands. There they are:
> fuel-yaql> :show cluster
> Cluster id is: 1, name is: test
shows you the cluster you currently use.
> fuel-yaql> :show node
> Currently used node id is: master
shows you a node in this cluster for which conditions will be evaluated.
> fuel-yaql> :show nodes
> Cluster has nodes with ids: {1: 'controller'}
shows you all nodes in this cluster
> fuel-yaql> :use cluster 1
will switch contexts to another cluster
> fuel-yaql> :use node 1
will switch contexts to another node
> fuel-yaql> :show tasks
shows all tasks in 'deployment', 'error', 'ready' and 'pending' states for
currently selected cluster. In other words, this commands represents a
list of tasks which you can use as a context.
> fuel-yaql> :loadprevious task 5
will switch *old* context to a context of pointed task. It can be worthy if you
want to evaluate an expression not for the current cluster state, but for old one.
> fuel-yaql> :loadcurrent task 10
will switch *new* context to context of pointed task. It can be as worthy
as *:loadprevious* command. Maybe you should know that there is no
restriction to have old context really older than new context - you can switch
them as you want
Fuel itself has several internal yaql functions which are not included to base
yaql interpreter. There they are:
```changed()``` - will show you the difference between new and old contexts
```new()``` - returns you new context data
```old()``` - returns you old context data
```added()``` - returns the diff what was added between old and new context
```deleted()``` - returns the diff what was deleted between old and new contexts
```changedAll($.first, $.second, $.Nth)``` - returns True if all expressions in
parentheses returns non-False
```changedAny($.first, $.second, $.Nth)``` - returns True if any expression in
parentheses returns non-False
# Changelog
## 0.7
[*] Project moved and became a part of Fuel-nailgun
## 0.6
[*] internal fixes, tests added
## 0.5
[+] opportunity to run fuyaql with predefined contexts and expression and return
a result
## 0.4
[*] internal fixes
## 0.3
[+] changelog
[+] switched to readline, so input line doesn't looks like telnet one
[+] internal commands autocomplit by Tab
[+] internal commands ':oldcontext task' and ':newcontext task' added
[*] not creating default existing context as a task anymore. It allows to not
touch DB for a creating temporary task
## 0.2
[+] first usable version
## 0.1
[+] proof of concept created

View File

View File

@ -0,0 +1,37 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FuCompleter(object):
def __init__(self, words):
self.words = words
self.prefix = None
self.matches = None
def complete(self, text, index):
"""Check for matches words and return them
:param text: text to match
:type text: str
:param index: index to calculate completion results
:type index: int
:return: matched words if any, None if fails
"""
if text != self.prefix:
self.matches = [m for m in self.words if m.startswith(text)]
self.prefix = text
try:
return self.matches[index]
except IndexError:
return None

View File

@ -0,0 +1,403 @@
#!/usr/bin/env python
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fuel YAQL real-time console.
Allow fast and easy test your YAQL expressions on live cluster."""
from __future__ import print_function
import inspect
import json
import readline
import sys
import traceback
from nailgun import consts
from nailgun.fuyaql import completion
from nailgun.logger import logger
from nailgun import objects
from nailgun.orchestrator import deployment_serializers
from nailgun import yaql_ext
logger.disabled = True
class FuYaqlController(object):
CURRENT = 0
EXPECTED = 1
def __init__(self):
self._cluster = None
self._node_id = None
self._tasks = [None, None]
self._infos = [None, None]
self._yaql_context = yaql_ext.create_context(
add_serializers=True, add_datadiff=True
)
self._yaql_engine = yaql_ext.create_engine()
@property
def cluster(self):
return self._cluster
@property
def node_id(self):
return self._node_id
@property
def selected_tasks(self):
return self._tasks
def set_cluster(self, cluster_id=None):
"""Load the cluster object.
:param cluster_id: id of a cluster
"""
cluster = objects.Cluster.get_by_uid(
cluster_id, fail_if_not_found=False
)
if cluster:
self._cluster = cluster
self._set_task(self.EXPECTED, None)
self._set_task(
self.CURRENT,
objects.TransactionCollection.get_last_succeed_run(cluster)
)
return True
return False
def set_task(self, state, task_id=None):
"""Sets the task, which is used to get new state."""
assert self._cluster
task = self._get_task(task_id)
if task is not False:
self._set_task(state, task)
return True
return False
def set_node(self, node_id):
"""Sets the node id."""
info = self._get_info(self.EXPECTED)
if node_id in info:
self._node_id = node_id
return True
return False
def get_node(self):
"""Gets the full information about node."""
assert self._node_id is not None
return self._get_info(self.EXPECTED)[self._node_id]
@staticmethod
def get_clusters():
"""Gets list of all clusters."""
return objects.ClusterCollection.order_by(
objects.ClusterCollection.all(),
'id'
)
def get_tasks(self):
"""Gets all deployment tasks for current cluster."""
assert self.cluster
query = objects.TransactionCollection.filter_by(
None,
cluster_id=self.cluster.id, name=consts.TASK_NAMES.deployment
)
query = objects.TransactionCollection.filter_by_not(
query, deployment_info=None
)
return objects.TransactionCollection.order_by(query, 'id')
def get_nodes(self):
info = self._get_info(self.EXPECTED)
for node_id in sorted(info):
yield info[node_id]
def evaluate(self, expression):
"""Evaluate given YAQL expression
:param expression: YAQL expression which needed to be evaluated
:return: result of evaluation as a string
"""
assert self.cluster
assert self.node_id
context = self._yaql_context.create_child_context()
context['$%new'] = self._get_info(self.EXPECTED)[self._node_id]
context['$%old'] = self._get_info(self.CURRENT).get(self._node_id, {})
parsed_exp = self._yaql_engine(expression)
return parsed_exp.evaluate(data=context['$%new'], context=context)
def _get_task(self, task_id):
"""Gets task by id and checks that it belongs to cluster."""
if not task_id:
return None
task = objects.Transaction.get_by_uid(task_id, fail_if_not_found=False)
if task and task.cluster_id == self.cluster.id:
return task
return False
def _set_task(self, state, task):
self._tasks[state] = task
self._set_info(
state,
objects.Transaction.get_deployment_info(task)
)
def _set_info(self, state, info):
if state == self.EXPECTED:
self._node_id = None
if info is None:
serialized = deployment_serializers.serialize_for_lcm(
self._cluster,
objects.Cluster.get_nodes_not_for_deletion(self._cluster)
)
info = {node['uid']: node for node in serialized}
self._infos[state] = info or {}
def _get_info(self, state):
return self._infos[state]
class FuyaqlInterpreter(object):
COMMANDS = {
':show clusters': 'show_clusters',
':show cluster': 'show_cluster',
':use cluster': 'set_cluster',
':show tasks': 'show_tasks',
':use task2': 'set_task2',
':use task1': 'set_task1',
':show nodes': 'show_nodes',
':show node': 'show_node',
':use node': 'set_node',
':help': 'show_help',
':exit': 'shutdown',
':q': 'shutdown',
}
def __init__(self, cluster_id=None, node_id=None, controller=None):
self.controller = controller or FuYaqlController()
if cluster_id is not None:
self.set_cluster(cluster_id)
if node_id is not None:
self.set_node(node_id)
def show_help(self):
"""Shows this help."""
for cmd in sorted(self.COMMANDS):
doc = getattr(self, self.COMMANDS[cmd]).__doc__
print(cmd, '-', doc)
def show_clusters(self):
"""Shows all clusters which is available for choose."""
cluster_ids = [
self.controller.cluster and self.controller.cluster['id']
]
self.print_list(
('id', 'name', 'status'), self.controller.get_clusters(),
lambda x: cluster_ids.index(x['id'])
)
def show_tasks(self):
"""Shows all tasks which is available for choose."""
task_ids = [
t and t['id'] for t in self.controller.selected_tasks
]
if self._check_cluster():
self.print_list(
('id', 'status'), self.controller.get_tasks(),
lambda x: task_ids.index(x['id'])
)
def show_nodes(self):
"""Shows all tasks which is available for choose."""
node_ids = [self.controller.node_id]
if self._check_cluster():
self.print_list(
('uid', 'status', 'roles'), self.controller.get_nodes(),
lambda x: node_ids.index(x['uid'])
)
def show_cluster(self):
"""Shows details of selected cluster."""
if self.controller.cluster:
self.print_object(
'cluster', ('id', 'name', 'status'), self.controller.cluster
)
else:
print("There is no cluster.")
def show_task2(self):
"""Shows details of task, which belongs to new state of cluster."""
self._show_task(self.controller.EXPECTED)
def show_task1(self):
"""Shows details of task, which belongs to old state of cluster."""
self._show_task(self.controller.CURRENT)
def show_node(self):
"""Shows details of selected node."""
if self.controller.node_id:
self.print_object(
'node',
('uid', 'status', 'roles'),
self.controller.get_node()
)
else:
print("Please select node at first.")
def set_cluster(self, cluster_id):
"""Select the cluster."""
if not self.controller.set_cluster(cluster_id):
print("There is no cluster with id:", cluster_id)
def set_node(self, node_id):
"""Select the node."""
if self._check_cluster():
if not self.controller.set_node(node_id):
print("There is no node with id:", node_id)
def set_task2(self, task_id):
"""Select the task which will belong to state new."""
self._set_task(self.controller.EXPECTED, task_id)
def set_task1(self, task_id):
"""Select the task which will belong to state old."""
self._set_task(self.controller.CURRENT, task_id)
def evaluate_expression(self, exp):
if self._check_node():
return self.controller.evaluate(exp)
def execute_command(self, cmdline):
for cmd in self.COMMANDS:
if (cmdline.startswith(cmd) and
(len(cmdline) == len(cmd) or cmdline[len(cmd)].isspace())):
break
else:
print("Unknown command:", cmdline)
print("Please use :help to see list of available commands")
return
f = getattr(self, self.COMMANDS[cmd])
args = cmdline[len(cmd):].split()
try:
inspect.getcallargs(f, *args)
except (ValueError, TypeError):
print("Not enough arguments for a command were given.")
return
return f(*args)
@staticmethod
def shutdown():
"""Exits."""
sys.exit(0)
def run_loop(self):
"""Create a loop for user input"""
while True:
try:
command = raw_input('fuel-yaql> ').strip()
except EOFError:
return
if not command:
continue
try:
if command.startswith(':'): # Check for internal command
r = self.execute_command(command)
else:
r = self.evaluate_expression(command)
if isinstance(r, (list, dict)):
print(json.dumps(r, indent=4))
elif r is not None:
print(r)
except Exception as e:
print("Unexpected error: {0}".format(e))
traceback.print_exc(sys.stdout)
def _show_task(self, state):
task = self.controller.selected_tasks[state]
if task:
self.print_object('task', ('id', 'status'), task)
else:
print("Please select task at first.")
def _set_task(self, state, task_id):
if self._check_cluster():
next_state = (state + 1) % len(self.controller.selected_tasks)
next_task = self.controller.selected_tasks[next_state]
task_id = int(task_id or 0)
if task_id and next_task:
if next_state > state and task_id > next_task['id']:
print("The task, which belongs to state old cannot be"
" under than task which belongs to state new.")
return
elif next_state < state and task_id < next_task['id']:
print("The task, which belongs to state new cannot be"
" older than task which belongs to state old.")
return
self.controller.set_task(state, task_id)
def _check_cluster(self):
if self.controller.cluster is None:
print("Select cluster at first.")
return False
return True
def _check_node(self):
if self.controller.node_id is None:
print("Select node at first.")
return False
return True
@staticmethod
def print_list(column_names, iterable, get_selected_index=None):
template = '\t|\t'.join('{%d}' % x for x in range(len(column_names)))
print(template.format(*column_names))
print('-' * (sum(len(x) + 5 for x in column_names)))
for column in iterable:
if get_selected_index is not None:
try:
print('*' * (get_selected_index(column) + 1), end=' ')
except ValueError:
pass
print(template.format(*(column.get(x, '-') for x in column_names)))
@staticmethod
def print_object(name, properties, obj):
print(name.title() + ':')
for p in properties:
print("\t{0}:\t{1}".format(p, obj.get(p, '-')))
def main(cluster_id=None, node_id=None):
# set up command history and completion
readline.set_completer_delims(r'''`~!@#$%^&*()-=+[{]}\|;'",<>/?''')
readline.set_completer(completion.FuCompleter(
list(FuyaqlInterpreter.COMMANDS)
).complete)
readline.parse_and_bind('tab: complete')
interpret = FuyaqlInterpreter(cluster_id, node_id)
interpret.run_loop()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,560 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import mock
from nailgun import consts
from nailgun.fuyaql import fuyaql
from nailgun.test import base
@mock.patch('nailgun.fuyaql.fuyaql.objects')
class TestFuyaqlController(base.BaseUnitTest):
def setUp(self):
self.controller = fuyaql.FuYaqlController()
self.new_context = {
1: {
'uid': 1,
'roles': ['primary-controller'],
'debug': 'false',
'cinder': {
'db_password': '9RkYPCQT9V3LerPsp0qvuzmh',
'fixed_key': 'f74ce7f535cb61fc0ee8ba77',
'user_password': '62493085e6cfcaa4638ec08'
}
}
}
def test_set_cluster_with_empty_cluster_value(self, obj_mock):
obj_mock.Cluster.get_by_uid.return_value = None
with mock.patch.object(self.controller, '_set_task') as s_task:
self.assertFalse(self.controller.set_cluster())
obj_mock.Cluster.get_by_uid.assert_called_once_with(
None,
fail_if_not_found=False
)
s_task.assert_not_called()
def test_set_cluster_with_non_empty_cluster_value(self, obj_mock):
obj_mock.Cluster.get_by_uid.return_value = 'cluster'
obj_mock.TransactionCollection.get_last_succeed_run.return_value =\
'task'
with mock.patch.object(self.controller, '_set_task') as s_task:
self.assertTrue(self.controller.set_cluster('id'))
obj_mock.Cluster.get_by_uid.assert_called_once_with(
'id',
fail_if_not_found=False
)
obj_mock.TransactionCollection.get_last_succeed_run.\
assert_called_once_with('cluster')
s_task.assert_any_call(self.controller.EXPECTED, None)
s_task.assert_called_with(self.controller.CURRENT, 'task')
def test_set_task_without_task_id(self, _):
self.controller._cluster = 'cluster'
with mock.patch.object(self.controller, '_set_task') as s_task:
self.assertTrue(self.controller.set_task(self.controller.EXPECTED))
s_task.assert_called_once_with(self.controller.EXPECTED, None)
def test_set_task_when_task_is_found(self, obj_mock):
mock_ = mock.MagicMock(id=1, cluster_id=1)
self.controller._cluster = mock_
obj_mock.Transaction.get_by_uid.return_value = mock_
self.assertTrue(self.controller.set_task(self.controller.CURRENT, 1))
obj_mock.Transaction.get_deployment_info.assert_called_once_with(
mock_
)
def test_set_task_when_is_not_found(self, obj_mock):
self.controller._cluster = 'cluster'
obj_mock.Transaction.get_by_uid.return_value = None
with mock.patch.object(self.controller, '_set_task') as s_task:
self.assertFalse(
self.controller.set_task(self.controller.EXPECTED, 1)
)
s_task.assert_not_called()
def test_set_task_when_task_is_not_from_this_cluster(self, obj_mock):
mock_ = mock.MagicMock(id=1, cluster_id=2)
self.controller._cluster = mock_
obj_mock.Transaction.get_by_uid.return_value = mock_
self.assertFalse(self.controller.set_task(self.controller.CURRENT, 1))
obj_mock.Transaction.get_deployment_info.assert_not_called()
def test_set_node(self, _):
self.controller._infos[self.controller.EXPECTED] = self.new_context
self.assertTrue(self.controller.set_node(1))
self.assertFalse(self.controller.set_node(2))
def test_get_node(self, _):
self.controller._infos[self.controller.EXPECTED] = self.new_context
self.controller._node_id = 1
self.assertEqual(self.controller.get_node(), self.new_context[1])
def test_get_clusters(self, obj_mock):
obj_mock.ClusterCollection.order_by.return_value = 'cluster'
obj_mock.ClusterCollection.all.return_value = 'all'
self.assertEqual(self.controller.get_clusters(), 'cluster')
obj_mock.ClusterCollection.all.assert_called_once_with()
obj_mock.ClusterCollection.order_by.assert_called_once_with(
'all',
'id'
)
def test_get_tasks(self, obj_mock):
self.controller._cluster = mock.MagicMock(id=1)
obj_mock.TransactionCollection.filter_by.return_value = 'query'
obj_mock.TransactionCollection.filter_by_not.return_value = 'requery'
obj_mock.TransactionCollection.order_by.return_value = 'tasks'
self.assertEqual(self.controller.get_tasks(), 'tasks')
obj_mock.TransactionCollection.filter_by.assert_called_once_with(
None,
cluster_id=self.controller.cluster.id,
name=consts.TASK_NAMES.deployment
)
obj_mock.TransactionCollection.filter_by_not.assert_called_once_with(
'query', deployment_info=None
)
obj_mock.TransactionCollection.order_by.assert_called_once_with(
'requery', 'id'
)
def test_get_nodes(self, _):
self.controller._infos[self.controller.EXPECTED] = {
'1': {'uid': '1'}, '2': {'uid': '2'}}
expected = [{'uid': '1'}, {'uid': '2'}]
self.assertEqual(
expected,
list(self.controller.get_nodes())
)
def test_evaluate(self, _):
old_context = {
1: {
'uid': 1,
'roles': ['primary-controller'],
'debug': 'true',
'cinder': {
'db_password': '9RkYPCQT9V3LerPsp0qvuzmh',
'fixed_key': 'f74ce7f535cb61fc0ee8ba77',
'user_password': 'n8BMurmecwUWcH52nbdxqPtz'
}
}
}
self.controller._infos = [old_context, self.new_context]
self.controller._node_id = 1
self.controller._cluster = True
self.assertTrue(self.controller.evaluate('changed($)'))
self.assertRaises(self.controller.evaluate('changed($.roles)'))
def test_get_task(self, obj_mock):
self.assertEqual(self.controller._get_task(None), None)
obj_mock.Transaction.get_by_uid.return_value = None
self.assertFalse(self.controller._get_task(1))
task = mock.MagicMock()
task.cluster_id = 4
self.controller._cluster = mock.MagicMock()
self.controller._cluster.id = 4
obj_mock.Transaction.get_by_uid.return_value = task
self.assertEqual(self.controller._get_task(1), task)
def test__set_task(self, obj_mock):
obj_mock.Transaction.get_deployment_info.return_value = 'dep_info'
with mock.patch.object(self.controller, '_set_info') as s_info:
self.controller._set_task(self.controller.EXPECTED, 'task')
self.assertEqual(self.controller._tasks[self.controller.EXPECTED],
'task')
obj_mock.Transaction.get_deployment_info.assert_called_with(
'task'
)
s_info.assert_called_once_with(self.controller.EXPECTED, 'dep_info')
@mock.patch(
'nailgun.orchestrator.deployment_serializers.serialize_for_lcm')
def test_set_info(self, serialized, obj_mock):
self.controller._cluster = mock.MagicMock()
self.controller._node_id = 7
self.controller._set_info(self.controller.CURRENT, 'info')
self.assertEqual(self.controller._infos[self.controller.CURRENT],
'info')
self.assertEqual(self.controller._node_id, 7)
self.controller._set_info(self.controller.CURRENT, None)
self.assertEqual(self.controller._infos[self.controller.CURRENT], {})
self.assertEqual(self.controller._node_id, 7)
serialized.return_value = list(self.new_context.values())
self.controller._set_info(self.controller.EXPECTED, 'exp_info')
self.assertEqual(self.controller._infos[self.controller.EXPECTED],
'exp_info')
self.assertEqual(self.controller._node_id, None)
self.controller._node_id = 8
self.controller._set_info(self.controller.EXPECTED, None)
self.assertEqual(self.controller._infos[self.controller.EXPECTED],
self.new_context)
self.assertEqual(self.controller._node_id, None)
serialized.assert_called_once_with(
self.controller._cluster,
obj_mock.Cluster.get_nodes_not_for_deletion(
self.controller._cluster)
)
@mock.patch('nailgun.fuyaql.fuyaql.print', create=True)
class TestFuyaqlInterpreter(base.BaseUnitTest):
def setUp(self):
self.controller = mock.MagicMock(spec=fuyaql.FuYaqlController)
self.interpreter = fuyaql.FuyaqlInterpreter(controller=self.controller)
def test_show_help(self, print_mock):
self.interpreter.show_help()
self.assertEqual(
len(self.interpreter.COMMANDS), print_mock.call_count
)
print_mock.assert_any_call(
":help", "-", self.interpreter.show_help.__doc__
)
def test_show_clusters_if_cluster_selected(self, _):
self.controller.get_clusters.return_value = [
{'id': 1, 'name': 'test', 'status': 'error'},
{'id': 2}
]
self.controller.cluster = {'id': 2}
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_clusters()
print_mock.assert_called_once_with(
('id', 'name', 'status'),
self.controller.get_clusters.return_value,
mock.ANY
)
self.assertEqual(0, print_mock.call_args[0][2]({'id': 2}))
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 1})
def test_show_clusters_if_no_cluster_selected(self, _):
self.controller.get_clusters.return_value = [
{'id': 1, 'name': 'test', 'status': 'error'},
{'id': 2}
]
self.controller.cluster = None
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_clusters()
print_mock.assert_called_once_with(
('id', 'name', 'status'),
self.controller.get_clusters.return_value,
mock.ANY
)
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 1})
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 2})
def test_show_tasks_if_no_cluster(self, print_mock):
self.controller.cluster = None
self.interpreter.show_tasks()
print_mock.assert_called_once_with("Select cluster at first.")
def test_show_tasks_if_tasks_selected(self, _):
self.controller.cluster = {'id': 1}
self.controller.get_tasks.return_value = [
{'id': 1, 'status': 'error'},
{'id': 2},
{'id': 3}
]
self.controller.selected_tasks = [{'id': 2}, {'id': 3}]
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_tasks()
print_mock.assert_called_once_with(
('id', 'status'),
self.controller.get_tasks.return_value,
mock.ANY
)
self.assertEqual(0, print_mock.call_args[0][2]({'id': 2}))
self.assertEqual(1, print_mock.call_args[0][2]({'id': 3}))
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 1})
def test_show_tasks_if_no_all_tasks_selected(self, _):
self.controller.get_tasks.return_value = [
{'id': 1, 'status': 'error'},
{'id': 2},
{'id': 3}
]
self.controller.selected_tasks = [None, {'id': 3}]
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_tasks()
print_mock.assert_called_once_with(
('id', 'status'),
self.controller.get_tasks.return_value,
mock.ANY
)
self.assertEqual(1, print_mock.call_args[0][2]({'id': 3}))
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 1})
self.assertRaises(ValueError, print_mock.call_args[0][2], {'id': 2})
def test_show_nodes_if_no_cluster(self, print_mock):
self.controller.cluster = None
self.interpreter.show_tasks()
print_mock.assert_called_once_with("Select cluster at first.")
def test_show_nodes_if_node_selected(self, _):
self.controller.cluster = {'id': 1}
self.controller.get_nodes.return_value = [
{'uid': '1', 'status': 'ready', 'roles': ['controller']},
{'uid': '2'},
{'uid': '3'}
]
self.controller.node_id = '2'
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_nodes()
print_mock.assert_called_once_with(
('uid', 'status', 'roles'),
self.controller.get_nodes.return_value,
mock.ANY
)
get_index_func = print_mock.call_args[0][2]
self.assertEqual(0, get_index_func({'uid': '2'}))
self.assertRaises(ValueError, get_index_func, {'uid': '1'})
self.assertRaises(ValueError, get_index_func, {'uid': '3'})
def test_show_nodes_if_no_node_selected(self, _):
self.controller.cluster = {'id': 1}
self.controller.get_nodes.return_value = [
{'uid': '1', 'status': 'ready', 'roles': ['controller']},
{'uid': '2'},
{'uid': '3'}
]
self.controller.node_id = None
with mock.patch.object(self.interpreter, 'print_list') as print_mock:
self.interpreter.show_nodes()
print_mock.assert_called_once_with(
('uid', 'status', 'roles'),
self.controller.get_nodes.return_value,
mock.ANY
)
get_index_func = print_mock.call_args[0][2]
self.assertRaises(ValueError, get_index_func, {'uid': '1'})
self.assertRaises(ValueError, get_index_func, {'uid': '2'})
self.assertRaises(ValueError, get_index_func, {'uid': '3'})
def test_show_cluster_if_no_cluster(self, print_mock):
self.controller.cluster = None
self.interpreter.show_cluster()
print_mock.assert_called_once_with("There is no cluster.")
def test_show_cluster_if_cluster(self, _):
self.controller.cluster = {'id': 1, 'status': 'error', 'name': 'test'}
with mock.patch.object(self.interpreter, 'print_object') as print_mock:
self.interpreter.show_cluster()
print_mock.assert_called_once_with(
'cluster', ('id', 'name', 'status'), self.controller.cluster
)
def test_show_task2(self, _):
with mock.patch.object(self.interpreter, '_show_task') as show_mock:
self.interpreter.show_task2()
show_mock.assert_called_once_with(self.controller.EXPECTED)
def test_show_task(self, _):
with mock.patch.object(self.interpreter, '_show_task') as show_mock:
self.interpreter.show_task1()
show_mock.assert_called_once_with(self.controller.CURRENT)
def test_show_node_if_no_node(self, print_mock):
self.controller.node_id = None
self.interpreter.show_node()
print_mock.assert_called_once_with("Please select node at first.")
self.assertEqual(0, self.controller.get_node.call_count)
def test_show_node_if_node(self, _):
self.controller.node_id = '2'
self.controller.get_node_return_value = {'uid': 2}
with mock.patch.object(self.interpreter, 'print_object') as print_mock:
self.interpreter.show_node()
print_mock.assert_called_once_with(
'node',
('uid', 'status', 'roles'),
self.controller.get_node.return_value
)
def test_set_cluster(self, print_mock):
"""Select the cluster."""
self.controller.set_cluster.side_effect = [True, False]
self.interpreter.set_cluster('1')
self.interpreter.set_cluster('2')
print_mock.assert_called_once_with(
"There is no cluster with id:", "2"
)
def test_set_node_if_no_cluster(self, print_mock):
self.controller.cluster = None
self.interpreter.set_node('1')
print_mock.assert_called_once_with("Select cluster at first.")
def test_set_node_if_cluster(self, print_mock):
"""Select the cluster."""
self.controller.cluster = {'id': 1}
self.controller.set_node.side_effect = [True, False]
self.interpreter.set_node('1')
self.interpreter.set_node('2')
print_mock.assert_called_once_with(
"There is no node with id:", "2"
)
def test_set_task2(self, _):
with mock.patch.object(self.interpreter, '_set_task') as set_mock:
self.interpreter.set_task2('2')
set_mock.assert_called_once_with(self.controller.EXPECTED, '2')
def test_set_task1(self, _):
with mock.patch.object(self.interpreter, '_set_task') as set_mock:
self.interpreter.set_task1('1')
set_mock.assert_called_once_with(self.controller.CURRENT, '1')
def test_evaluate_expression_if_no_node(self, print_mock):
self.controller.node_id = None
self.interpreter.evaluate_expression("$.toYaml()")
print_mock.assert_called_once_with("Select node at first.")
def test_evaluate_expression_if_node(self, _):
self.controller.node_id = {'uid': '1'}
self.controller.evaluate.return_value = '1'
self.assertEqual(
'1',
self.interpreter.evaluate_expression("$.uid")
)
self.controller.evaluate.assert_called_once_with('$.uid')
def test_execute_command_if_invalid_command(self, print_mock):
self.interpreter.execute_command(":helpme")
print_mock.assert_has_calls(
[mock.call("Unknown command:", ":helpme"),
mock.call("Please use :help to see list of available commands")]
)
def test_execute_command_with_arguments(self, _):
set_mock = mock.MagicMock()
with mock.patch.object(
self.interpreter, 'set_cluster', new=set_mock.__call__):
r = self.interpreter.execute_command(":use cluster 1")
set_mock.assert_called_once_with('1')
self.assertIs(set_mock.return_value, r)
def test_execute_command_with_invalid_arguments(self, print_mock):
self.interpreter.execute_command(":use cluster")
print_mock.assert_called_once_with(
'Not enough arguments for a command were given.'
)
def test_execute_command_without_arguments(self, _):
show_mock = mock.MagicMock()
with mock.patch.object(
self.interpreter, 'show_cluster', new=show_mock.__call__):
self.interpreter.execute_command(":show cluster")
show_mock.assert_called_once_with()
def test_show_task_if_no_task(self, print_mock):
self.controller.selected_tasks = [None, 1]
self.interpreter._show_task(0)
print_mock.assert_called_once_with("Please select task at first.")
def test_show_task_if_task(self, _):
self.controller.selected_tasks = [None, {'id': 1}]
with mock.patch.object(self.interpreter, 'print_object') as print_mock:
self.interpreter._show_task(1)
print_mock.assert_called_once_with('task', ('id', 'status'), {'id': 1})
def test_set_task_if_no_cluster(self, print_mock):
self.controller.cluster = None
self.interpreter._set_task(0, 1)
print_mock.assert_called_once_with("Select cluster at first.")
self.assertEqual(0, self.controller.set_task.call_count)
def test_set_task_check_task_order(self, print_mock):
self.controller.cluster = {'id': 1}
self.controller.selected_tasks = [{'id': 5}, {'id': 10}]
self.interpreter._set_task(0, 20)
print_mock.assert_called_with(
"The task, which belongs to state old cannot be"
" under than task which belongs to state new."
)
self.interpreter._set_task(1, 1)
print_mock.assert_called_with(
"The task, which belongs to state new cannot be"
" older than task which belongs to state old."
)
self.assertEqual(0, self.controller.set_task.call_count)
def test_set_task_successfully(self, _):
self.controller.CURRENT = 0
self.controller.EXPECTED = 1
self.controller.cluster = {'id': 1}
self.controller.selected_tasks = [{'id': 5}, {'id': 10}]
self.interpreter._set_task(self.controller.CURRENT, '')
self.controller.set_task.assert_called_with(
self.controller.CURRENT, 0
)
self.interpreter._set_task(self.controller.EXPECTED, '')
self.controller.set_task.assert_called_with(
self.controller.EXPECTED, 0
)
self.controller.selected_tasks = [{'id': 5}, None]
self.interpreter._set_task(self.controller.CURRENT, '10')
self.controller.set_task.assert_called_with(
self.controller.CURRENT, 10
)
self.controller.selected_tasks = [None, {'id': 5}]
self.interpreter._set_task(self.controller.EXPECTED, '1')
self.controller.set_task.assert_called_with(
self.controller.EXPECTED, 1
)
def test_print_list(self, print_mock):
self.interpreter.print_list(
('id', 'status'),
[{'id': 1, 'status': 'ok'}, {'id': 2}, {'id': 3}],
lambda x: [2, 3].index(x['id'])
)
print_mock.assert_has_calls([
mock.call('id\t|\tstatus'),
mock.call('-' * 18),
mock.call('1\t|\tok'),
mock.call('*', end=' '),
mock.call('2\t|\t-'),
mock.call('**', end=' '),
mock.call('3\t|\t-')
], any_order=False)
def test_print_object(self, print_mock):
self.interpreter.print_object(
'node',
('id', 'status'),
{'id': 1},
)
print_mock.assert_has_calls([
mock.call('Node:'),
mock.call("\tid:\t1"),
mock.call("\tstatus:\t-"),
], any_order=False)