Add tags support in CLI

Releases new fuel2 commands to operate on tags:
fuel2 tag create
fuel2 tag delete
fuel2 tag download
fuel2 tag list
fuel2 tag update

DocImpact

Change-Id: I71ae78b4a733d83a5441ff0ac5ba627502e7f2b3
Implements: blueprint role-decomposition
This commit is contained in:
Mikhail 2016-11-11 13:39:55 +03:00
parent f1ca20421a
commit b482cb522e
11 changed files with 931 additions and 1 deletions

View File

@ -78,6 +78,7 @@ def get_client(resource, version='v1', connection=None):
'sequence': v1.sequence,
'snapshot': v1.snapshot,
'task': v1.task,
'tag': v1.tag,
'vip': v1.vip
}
}

260
fuelclient/commands/tag.py Normal file
View File

@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
from oslo_utils import fileutils
import six
from fuelclient.cli import error
from fuelclient.commands import base
from fuelclient.common import data_utils
class TagMixIn(object):
entity_name = 'tag'
supported_file_formats = ('json', 'yaml')
fields_mapper = (
('env', 'clusters'),
('release', 'releases')
)
def parse_model(self, args):
for param, tag_class in self.fields_mapper:
model_id = getattr(args, param)
if model_id:
return tag_class, model_id
@staticmethod
def get_file_path(directory, owner_type, owner_id, tag_name, file_format):
return os.path.join(os.path.abspath(directory),
'{owner}_{id}'.format(owner=owner_type,
id=owner_id),
'{}.{}'.format(tag_name, file_format))
@six.add_metaclass(abc.ABCMeta)
class BaseUploadCommand(TagMixIn, base.BaseCommand):
"""Base class for uploading metadata of a tag."""
@abc.abstractproperty
def action(self):
"""String with the name of the action."""
pass
@abc.abstractproperty
def uploader(self):
"""Callable for uploading data."""
pass
def get_parser(self, prog_name):
parser = super(BaseUploadCommand, self).get_parser(prog_name)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-r',
'--release',
type=int,
help='Id of the release')
group.add_argument('-e',
'--env',
type=int,
help='Id of the environment')
parser.add_argument('-n',
'--name',
required=True,
help='Name of tag.')
parser.add_argument('-f',
'--format',
required=True,
choices=self.supported_file_formats,
help='Format of serialized tag description.')
parser.add_argument('-d',
'--directory',
required=False,
default=os.path.curdir,
help='Source directory. Defaults to '
'the current directory.')
return parser
def take_action(self, parsed_args):
model, model_id = self.parse_model(parsed_args)
params = {"owner_type": model,
"owner_id": model_id,
"tag_name": parsed_args.name}
file_path = self.get_file_path(parsed_args.directory,
model,
model_id,
parsed_args.name,
parsed_args.format)
try:
with open(file_path, 'r') as stream:
data = data_utils.safe_load(parsed_args.format, stream)
self.uploader(data, **params)
except (OSError, IOError):
msg = "Could not read description for tag '{}' at {}".format(
parsed_args.name, file_path)
raise error.InvalidFileException(msg)
msg = ("Description of tag '{tag}' for {owner} with id {id} was "
"{action}d from {file_path}\n".format(tag=parsed_args.name,
owner=model,
id=model_id,
action=self.action,
file_path=file_path))
self.app.stdout.write(msg)
class TagList(TagMixIn, base.BaseListCommand):
"""Show list of all available tags for release or cluster."""
columns = ("name",
"group",
"conflicts",
"description")
def get_parser(self, prog_name):
parser = super(TagList, self).get_parser(prog_name)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-r',
'--release',
type=int,
help='Id of the release')
group.add_argument('-e',
'--env',
type=int,
help='Id of the environment')
return parser
def take_action(self, parsed_args):
model, model_id = self.parse_model(parsed_args)
data = self.client.get_all(model, model_id)
data = data_utils.get_display_data_multi(self.columns, data)
return self.columns, data
class TagDownload(TagMixIn, base.BaseCommand):
"""Download full tag description to file."""
def get_parser(self, prog_name):
parser = super(TagDownload, self).get_parser(prog_name)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-r',
'--release',
type=int,
help='Id of the release')
group.add_argument('-e',
'--env',
type=int,
help='Id of the environment')
parser.add_argument('-n',
'--name',
required=True,
help='Name of tag.')
parser.add_argument('-f',
'--format',
required=True,
choices=self.supported_file_formats,
help='Format of serialized tag description.')
parser.add_argument('-d',
'--directory',
required=False,
default=os.path.curdir,
help='Destination directory. Defaults to '
'the current directory.')
return parser
def take_action(self, parsed_args):
model, model_id = self.parse_model(parsed_args)
file_path = self.get_file_path(parsed_args.directory,
model,
model_id,
parsed_args.name,
parsed_args.format)
data = self.client.get_tag(model,
model_id,
parsed_args.name)
try:
fileutils.ensure_tree(os.path.dirname(file_path))
fileutils.delete_if_exists(file_path)
with open(file_path, 'w') as stream:
data_utils.safe_dump(parsed_args.format, stream, data)
except (OSError, IOError):
msg = ("Could not store description data "
"for tag {} at {}".format(parsed_args.name, file_path))
raise error.InvalidFileException(msg)
msg = ("Description data of tag '{}' within {} id {} "
"was stored in {}\n".format(parsed_args.name,
model,
model_id,
file_path))
self.app.stdout.write(msg)
class TagUpdate(BaseUploadCommand):
"""Update a tag from file description."""
action = "update"
@property
def uploader(self):
return self.client.update
class TagCreate(BaseUploadCommand):
"""Create a tag from file description"""
action = "create"
@property
def uploader(self):
return self.client.create
class TagDelete(TagMixIn, base.BaseCommand):
"""Delete a tag from release or cluster"""
def get_parser(self, prog_name):
parser = super(TagDelete, self).get_parser(prog_name)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-r',
'--release',
type=int,
help='Id of the release')
group.add_argument('-e',
'--env',
type=int,
help='Id of the environment')
parser.add_argument('-n',
'--name',
required=True,
help='Name of tag.')
return parser
def take_action(self, parsed_args):
model, model_id = self.parse_model(parsed_args)
self.client.delete(model,
model_id,
parsed_args.name)
msg = "Tag '{}' was deleted from {} with id {}\n".format(
parsed_args.name, model, model_id)
self.app.stdout.write(msg)

View File

@ -28,6 +28,7 @@ from fuelclient.objects.role import Role
from fuelclient.objects.task import DeployTask
from fuelclient.objects.task import SnapshotTask
from fuelclient.objects.task import Task
from fuelclient.objects.tag import Tag
from fuelclient.objects.fuelversion import FuelVersion
from fuelclient.objects.network_group import NetworkGroup
from fuelclient.objects.plugins import Plugins

56
fuelclient/objects/tag.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelclient.objects.base import BaseObject
class Tag(BaseObject):
instance_api_path = "{owner_type}/{owner_id}/tags/"
class_api_path = "{owner_type}/{owner_id}/tags/{tag_name}/"
def __init__(self, owner_type, owner_id, **kwargs):
super(Tag, self).__init__(owner_id, **kwargs)
self.owner_type = owner_type
def get_all(self):
return self.connection.get_request(
self.instance_api_path.format(owner_type=self.owner_type,
owner_id=self.id))
def get_tag(self, tag_name):
return self.connection.get_request(
self.class_api_path.format(owner_type=self.owner_type,
owner_id=self.id,
tag_name=tag_name))
def update_tag(self, tag_name, data):
return self.connection.put_request(
self.class_api_path.format(owner_type=self.owner_type,
owner_id=self.id,
tag_name=tag_name),
data)
def create_tag(self, data):
return self.connection.post_request(
self.instance_api_path.format(owner_type=self.owner_type,
owner_id=self.id),
data)
def delete_tag(self, tag_name):
return self.connection.delete_request(
self.class_api_path.format(owner_type=self.owner_type,
owner_id=self.id,
tag_name=tag_name))

View File

@ -0,0 +1,346 @@
# -*- coding: utf-8 -*-
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import yaml
from fuelclient.tests.unit.v2.cli import test_engine
from fuelclient.tests.utils import fake_tag
class TestTagCommand(test_engine.BaseCLITest):
"""Tests for fuel2 tag * commands."""
def test_tag_list_for_release(self):
self.m_client.get_all.return_value = [
{"tag": "fake_tag_1",
"has_primary": True,
"owner_id": 1,
"owner_type": 'release',
},
{"tag": "fake_tag_2",
"has_primary": True,
"owner_id": 1,
"owner_type": 'release',
},
]
release_id = 45
args = 'tag list -r {id}'.format(id=release_id)
self.exec_command(args)
self.m_client.get_all.assert_called_once_with('releases', release_id)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
def test_tag_list_for_cluster(self):
self.m_client.get_all.return_value = [
{"tag": "fake_tag_1",
"has_primary": True,
"owner_id": 1,
"owner_type": 'release',
},
{"tag": "fake_tag_2",
"has_primary": True,
"owner_id": 1,
"owner_type": 'release',
},
]
env_id = 45
args = 'tag list -e {id}'.format(id=env_id)
self.exec_command(args)
self.m_client.get_all.assert_called_once_with('clusters', env_id)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
@mock.patch('sys.stderr')
def test_tag_list_fail(self, mocked_stderr):
args = 'tag list'
self.assertRaises(SystemExit, self.exec_command, args)
self.assertIn('-r/--release -e/--env',
mocked_stderr.write.call_args_list[-1][0][0])
@mock.patch('sys.stderr')
def test_tag_list_with_mutually_exclusive_params(self, mocked_stderr):
args = 'tag list -e 1 -r 2'
self.assertRaises(SystemExit, self.exec_command, args)
self.assertIn('not allowed',
mocked_stderr.write.call_args_list[-1][0][0])
@mock.patch('json.dump')
def test_release_tag_download_json(self, m_dump):
release_id = 45
tag_name = 'fake_tag'
test_data = fake_tag.get_fake_tag(fake_tag)
args = 'tag download -r {} -n {} -f json -d /tmp'.format(release_id,
tag_name)
expected_path = '/tmp/releases_{id}/{name}.json'.format(id=release_id,
name=tag_name)
self.m_client.get_tag.return_value = test_data
m_open = mock.mock_open()
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'w')
m_dump.assert_called_once_with(test_data, mock.ANY, indent=4)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.get_tag.assert_called_once_with('releases',
release_id,
tag_name)
@mock.patch('json.dump')
def test_cluster_tag_download_json(self, m_dump):
env_id = 45
tag_name = 'fake_tag'
test_data = fake_tag.get_fake_tag(fake_tag)
args = 'tag download -e {} -n {} -f json -d /tmp'.format(env_id,
tag_name)
expected_path = '/tmp/clusters_{id}/{name}.json'.format(id=env_id,
name=tag_name)
self.m_client.get_tag.return_value = test_data
m_open = mock.mock_open()
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'w')
m_dump.assert_called_once_with(test_data, mock.ANY, indent=4)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.get_tag.assert_called_once_with('clusters',
env_id,
tag_name)
@mock.patch('yaml.safe_dump')
def test_release_tag_download_yaml(self, m_safe_dump):
release_id = 45
tag_name = 'fake_tag'
test_data = fake_tag.get_fake_tag(fake_tag)
args = 'tag download -r {} -n {} -f yaml -d /tmp'.format(release_id,
tag_name)
expected_path = '/tmp/releases_{id}/{name}.yaml'.format(id=release_id,
name=tag_name)
self.m_client.get_tag.return_value = test_data
m_open = mock.mock_open()
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'w')
m_safe_dump.assert_called_once_with(test_data, mock.ANY,
default_flow_style=False)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.get_tag.assert_called_once_with('releases',
release_id,
tag_name)
@mock.patch('yaml.safe_dump')
def test_cluster_tag_download_yaml(self, m_safe_dump):
env_id = 45
tag_name = 'fake_tag'
test_data = fake_tag.get_fake_tag(fake_tag)
args = 'tag download -e {} -n {} -f yaml -d /tmp'.format(env_id,
tag_name)
expected_path = '/tmp/clusters_{id}/{name}.yaml'.format(id=env_id,
name=tag_name)
self.m_client.get_tag.return_value = test_data
m_open = mock.mock_open()
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'w')
m_safe_dump.assert_called_once_with(test_data, mock.ANY,
default_flow_style=False)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.get_tag.assert_called_once_with('clusters',
env_id,
tag_name)
def test_release_tag_update_json(self):
release_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "releases",
"owner_id": release_id,
"tag_name": tag_name}
args = 'tag update -r {} -n {} -f json -d /tmp'.format(release_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/releases_{}/fake_tag.json'.format(release_id)
m_open = mock.mock_open(read_data=json.dumps(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.update.assert_called_once_with(test_data, **params)
def test_cluster_tag_update_json(self):
env_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "clusters",
"owner_id": env_id,
"tag_name": tag_name}
args = 'tag update -e {} -n {} -f json -d /tmp'.format(env_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/clusters_{}/fake_tag.json'.format(env_id)
m_open = mock.mock_open(read_data=json.dumps(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.update.assert_called_once_with(test_data, **params)
def test_release_tag_update_yaml(self):
release_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "releases",
"owner_id": release_id,
"tag_name": tag_name}
args = 'tag update -r {} -n {} -f yaml -d /tmp'.format(release_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/releases_{}/fake_tag.yaml'.format(release_id)
m_open = mock.mock_open(read_data=yaml.safe_dump(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.update.assert_called_once_with(test_data, **params)
def test_cluster_tag_update_yaml(self):
env_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "clusters",
"owner_id": env_id,
"tag_name": tag_name}
args = 'tag update -e {} -n {} -f yaml -d /tmp'.format(env_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/clusters_{}/fake_tag.yaml'.format(env_id)
m_open = mock.mock_open(read_data=yaml.safe_dump(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.update.assert_called_once_with(test_data, **params)
def test_release_tag_create_json(self):
release_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "releases",
"owner_id": release_id,
"tag_name": tag_name}
args = 'tag create -r {} -n {} -f json -d /tmp'.format(release_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/releases_{}/fake_tag.json'.format(release_id)
m_open = mock.mock_open(read_data=json.dumps(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.create.assert_called_once_with(test_data, **params)
def test_cluster_tag_create_json(self):
env_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "clusters",
"owner_id": env_id,
"tag_name": tag_name}
args = 'tag create -e {} -n {} -f json -d /tmp'.format(env_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/clusters_{}/fake_tag.json'.format(env_id)
m_open = mock.mock_open(read_data=json.dumps(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.create.assert_called_once_with(test_data, **params)
def test_release_tag_create_yaml(self):
release_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "releases",
"owner_id": release_id,
"tag_name": tag_name}
args = 'tag create -r {} -n {} -f yaml -d /tmp'.format(release_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/releases_{}/fake_tag.yaml'.format(release_id)
m_open = mock.mock_open(read_data=yaml.safe_dump(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.create.assert_called_once_with(test_data, **params)
def test_cluster_tag_create_yaml(self):
env_id = 45
tag_name = 'fake_tag'
params = {"owner_type": "clusters",
"owner_id": env_id,
"tag_name": tag_name}
args = 'tag create -e {} -n {} -f yaml -d /tmp'.format(env_id,
tag_name)
test_data = fake_tag.get_fake_tag(tag_name)
expected_path = '/tmp/clusters_{}/fake_tag.yaml'.format(env_id)
m_open = mock.mock_open(read_data=yaml.safe_dump(test_data))
with mock.patch('fuelclient.commands.tag.open', m_open, create=True):
self.exec_command(args)
m_open.assert_called_once_with(expected_path, 'r')
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.create.assert_called_once_with(test_data, **params)
def test_release_tag_delete(self):
release_id = 45
tag_name = 'fake_tag'
args = 'tag delete -r {} -n {}'.format(release_id, tag_name)
self.exec_command(args)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.delete.assert_called_once_with('releases',
release_id,
tag_name)
def test_cluster_tag_delete(self):
env_id = 45
tag_name = 'fake_tag'
args = 'tag delete -e {} -n {}'.format(env_id, tag_name)
self.exec_command(args)
self.m_get_client.assert_called_once_with('tag', mock.ANY)
self.m_client.delete.assert_called_once_with('clusters',
env_id,
tag_name)

View File

@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fuelclient
from fuelclient.tests.unit.v2.lib import test_api
from fuelclient.tests import utils
class TestTagFacade(test_api.BaseLibTest):
def setUp(self):
super(TestTagFacade, self).setUp()
self.version = 'v1'
self.tag_name = 'fake_tag'
self.fake_tag = utils.get_fake_tag(self.tag_name)
self.fake_tags = utils.get_fake_tags(10)
self.client = fuelclient.get_client('tag', self.version)
def get_uri(self, owner):
return '/api/{version}/{owner}/'.format(version=self.version,
owner=owner)
def test_release_tag_list(self):
owner, owner_id = 'releases', 42
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/')
matcher = self.m_request.get(expected_uri, json=self.fake_tags)
self.client.get_all(owner, owner_id)
self.assertTrue(matcher.called)
def test_cluster_tag_list(self):
owner, owner_id = 'clusters', 42
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/')
matcher = self.m_request.get(expected_uri, json=self.fake_tags)
self.client.get_all(owner_type=owner, owner_id=owner_id)
self.assertTrue(matcher.called)
def test_release_tag_download(self):
owner, owner_id = 'releases', 45
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
tag_matcher = self.m_request.get(expected_uri, json=self.fake_tag)
tag = self.client.get_tag(owner, owner_id, self.tag_name)
self.assertTrue(expected_uri, tag_matcher.called)
self.assertEqual(tag, self.fake_tag)
def test_cluster_tag_download(self):
owner, owner_id = 'clusters', 45
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
tag_matcher = self.m_request.get(expected_uri, json=self.fake_tag)
tag = self.client.get_tag(owner, owner_id, self.tag_name)
self.assertTrue(expected_uri, tag_matcher.called)
self.assertEqual(tag, self.fake_tag)
def test_release_tag_update(self):
owner, owner_id = 'releases', 45
params = {"owner_type": owner,
"owner_id": owner_id,
"tag_name": self.tag_name}
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
upd_matcher = self.m_request.put(expected_uri, json=self.fake_tag)
self.client.update(self.fake_tag, **params)
self.assertTrue(upd_matcher.called)
self.assertEqual(self.fake_tag, upd_matcher.last_request.json())
def test_cluster_tag_update(self):
owner, owner_id = 'clusters', 45
params = {"owner_type": owner,
"owner_id": owner_id,
"tag_name": self.tag_name}
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
upd_matcher = self.m_request.put(expected_uri, json=self.fake_tag)
self.client.update(self.fake_tag, **params)
self.assertTrue(upd_matcher.called)
self.assertEqual(self.fake_tag, upd_matcher.last_request.json())
def test_release_tag_create(self):
owner, owner_id = 'releases', 45
params = {"owner_type": owner,
"owner_id": owner_id}
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/')
post_matcher = self.m_request.post(expected_uri, json=self.fake_tag)
self.client.create(self.fake_tag, **params)
self.assertTrue(post_matcher.called)
self.assertEqual(self.fake_tag, post_matcher.last_request.json())
def test_cluster_tag_create(self):
owner, owner_id = 'clusters', 45
params = {"owner_type": owner,
"owner_id": owner_id}
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/')
post_matcher = self.m_request.post(expected_uri, json=self.fake_tag)
self.client.create(self.fake_tag, **params)
self.assertTrue(post_matcher.called)
self.assertEqual(self.fake_tag, post_matcher.last_request.json())
def test_release_tag_delete(self):
owner, owner_id = 'releases', 45
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
delete_matcher = self.m_request.delete(expected_uri, json={})
self.client.delete(owner, owner_id, self.tag_name)
self.assertTrue(delete_matcher.called)
def test_cluster_tag_delete(self):
owner, owner_id = 'clusters', 45
expected_uri = self.get_object_uri(self.get_uri(owner),
owner_id,
'/tags/{}/'.format(self.tag_name))
delete_matcher = self.m_request.delete(expected_uri, json={})
self.client.delete(owner, owner_id, self.tag_name)
self.assertTrue(delete_matcher.called)

View File

@ -54,6 +54,8 @@ from fuelclient.tests.utils.fake_release import get_fake_release_component
from fuelclient.tests.utils.fake_release import get_fake_release_components
from fuelclient.tests.utils.fake_role import get_fake_role
from fuelclient.tests.utils.fake_role import get_fake_roles
from fuelclient.tests.utils.fake_tag import get_fake_tag
from fuelclient.tests.utils.fake_tag import get_fake_tags
__all__ = (get_fake_deployment_history,
@ -85,4 +87,6 @@ __all__ = (get_fake_deployment_history,
get_fake_node_groups,
get_fake_openstack_config,
get_fake_plugin,
get_fake_plugins)
get_fake_plugins,
get_fake_tag,
get_fake_tags)

View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def get_fake_tag(tag_name=None, has_primary=False, owner_id=None,
owner_type=None):
"""Create a random fake tag
Returns the serialized and parametrized representation of a dumped Fuel
tag. Represents the average amount of data.
"""
return {
"id": 1,
"tag": tag_name or "controller",
"has_primary": has_primary,
"owner_id": owner_id or 1,
"owner_type": owner_type or 'release'
}
def get_fake_tags(tag_count, **kwargs):
"""Create a random fake list of tags."""
return [get_fake_tag(**kwargs) for _ in range(tag_count)]

View File

@ -30,6 +30,7 @@ from fuelclient.v1 import plugins
from fuelclient.v1 import sequence
from fuelclient.v1 import snapshot
from fuelclient.v1 import task
from fuelclient.v1 import tag
from fuelclient.v1 import vip
# Please keeps the list in alphabetical order
@ -51,4 +52,5 @@ __all__ = ('cluster_settings',
'sequence',
'snapshot',
'task',
'tag',
'vip')

57
fuelclient/v1/tag.py Normal file
View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
#
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelclient import objects
from fuelclient.v1 import base_v1
class TagClient(base_v1.BaseV1Client):
_entity_wrapper = objects.Tag
def get_all(self, owner_type, owner_id):
"""Get all available tags for specific release or cluster.
:param owner_type: release or cluster
:type owner_id: int
:return: tags data as a list of dict
:rtype: list
"""
tags = self._entity_wrapper(owner_type, owner_id).get_all()
return tags
def get_tag(self, owner_type, owner_id, tag_name=''):
tag = self._entity_wrapper(owner_type, owner_id)
return tag.get_tag(tag_name)
def update(self, data, **kwargs):
tag = self._entity_wrapper(owner_type=kwargs['owner_type'],
owner_id=kwargs['owner_id'])
return tag.update_tag(kwargs['tag_name'], data)
def create(self, data, **kwargs):
tag = self._entity_wrapper(owner_type=kwargs['owner_type'],
owner_id=kwargs['owner_id'])
return tag.create_tag(data)
def delete(self, owner_type, owner_id, tag_name):
tag = self._entity_wrapper(owner_type=owner_type, owner_id=owner_id)
return tag.delete_tag(tag_name)
def get_client(connection):
return TagClient(connection)

View File

@ -114,6 +114,12 @@ fuelclient =
role_download=fuelclient.commands.role:RoleDownload
role_list=fuelclient.commands.role:RoleList
role_update=fuelclient.commands.role:RoleUpdate
tag_create=fuelclient.commands.tag:TagCreate
tag_delete=fuelclient.commands.tag:TagDelete
tag_download=fuelclient.commands.tag:TagDownload
tag_list=fuelclient.commands.tag:TagList
tag_show=fuelclient.commands.tag:TagShow
tag_update=fuelclient.commands.tag:TagUpdate
task_delete=fuelclient.commands.task:TaskDelete
task_deployment-info_download=fuelclient.commands.task:TaskDeploymentInfoDownload
task_history_show=fuelclient.commands.task:TaskHistoryShow