murano-dashboard/muranodashboard/dynamic_ui/fields.py
zhurong d76bbb9949 Revert ThemableSelectWidget for themable selects
Horizon's ThemableSelectWidget currently cannot display
validation errors, so keeping the field on the old widget
serves as a temporary fix. Once Horizon is fixed,
this change can be reverted.

This reverts commit 85fc5e002c.
Closes-Bug: #1711763

Change-Id: I557d38f739786597a9472c8104e1edf531e9a007
2017-08-25 10:30:42 +08:00

747 lines
27 KiB
Python

# Copyright (c) 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import copy
import json
import re
from django.core.urlresolvers import reverse
from django.core import validators as django_validator
from django import forms
from django.forms import widgets
from django.template import defaultfilters
from django.utils.encoding import force_text
from django.utils import html
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms as hz_forms
from horizon import messages
from openstack_dashboard.api import cinder
from openstack_dashboard.api import glance
from openstack_dashboard.api import neutron
from openstack_dashboard.api import nova
from oslo_log import log as logging
from oslo_log import versionutils
import six
from yaql import legacy
from muranodashboard.api import packages as pkg_api
from muranodashboard.common import net
from muranodashboard.environments import api as env_api
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
def with_request(func):
    """Decorate a field's ``update`` method so it receives a request.

    Intended for the `update' method of fields living inside an
    `UpdatableFieldsForm'. The wrapper accepts the form's ``initial``
    dictionary plus an optional direct ``request`` and calls
    ``func(self, request, **kwargs)``. The request found in ``initial``
    takes precedence over the directly passed one.

    :raises forms.ValidationError: when no request is available at all.
    """
    def update(self, initial, request=None, **kwargs):
        from_initial = initial.get('request')
        # Forward every other entry of ``initial`` as a keyword argument,
        # unless the caller already supplied that keyword explicitly.
        for name, item in six.iteritems(initial):
            if name == 'request' or name in kwargs:
                continue
            kwargs[name] = item

        if from_initial:
            LOG.debug("Using 'request' value from initial dictionary")
            effective_request = from_initial
        elif request:
            LOG.debug("Using direct 'request' value")
            effective_request = request
        else:
            LOG.error("No 'request' value passed neither via initial "
                      "dictionary, nor directly")
            raise forms.ValidationError("Can't get a request information")
        func(self, effective_request, **kwargs)
    return update
def make_yaql_validator(validator_property):
    """Build a field-level validator from a YAQL expression.

    ``validator_property`` must carry an 'expr' entry exposing ``.spec``
    and may carry a 'message'. The returned callable evaluates the
    expression with the field's value as its '$' root object and raises
    ``forms.ValidationError(message)`` when the result is falsy.
    """
    expr = validator_property['expr'].spec
    message = validator_property.get('message', '')

    def validator_func(value):
        context = legacy.create_context()
        context['$'] = value
        if expr.evaluate(context=context):
            return
        raise forms.ValidationError(message)

    return validator_func
def get_regex_validator(expr):
    """Return the leading ``RegexValidator`` of ``expr``, if there is one.

    Looks at ``expr['validators'][0]``; anything that cannot be indexed
    that way, or whose first validator is not a Django ``RegexValidator``,
    yields ``None``.
    """
    try:
        candidate = expr['validators'][0]
    except (TypeError, KeyError, IndexError):
        return None
    if isinstance(candidate, django_validator.RegexValidator):
        return candidate
    return None
# Wrapping lets us report our own message without changing the
# regexpValidators of existing services.
def wrap_regex_validator(validator, message):
    """Return a callable running ``validator`` but failing with ``message``.

    Any ``forms.ValidationError`` raised by ``validator`` is replaced by a
    new ``forms.ValidationError`` carrying ``message``.
    """
    def _validator(value):
        try:
            validator(value)
        except forms.ValidationError:
            # substitute the caller-supplied message for the validator's own
            raise forms.ValidationError(message)

    return _validator
def get_murano_images(request):
    """Return glance images that carry valid murano metadata.

    Snapshot images and images without a ``murano_image_info`` property
    are ignored. For every remaining image the JSON metadata is decoded
    and stored on the image as ``murano_property``; images with invalid
    JSON are reported to the user and left out.
    """
    try:
        # https://bugs.launchpad.net/murano/+bug/1339261 - glance
        # client version change alters the API. Other tuple values
        # are _more and _prev (in recent glance client)
        images = glance.image_list_detailed(request)[0]
    except Exception:
        images = []
        LOG.error("Error to request image list from glance ")
        exceptions.handle(request, _("Unable to retrieve public images."))

    murano_images = []
    for image in images:
        # filter out the snapshot image type
        if image.properties.get("image_type", '') == 'snapshot':
            continue
        # Additional properties, whose value is always a string data type,
        # are only included in the response if they have a value.
        murano_property = getattr(image, 'murano_image_info', None)
        if not murano_property:
            continue
        try:
            murano_metadata = json.loads(murano_property)
        except ValueError:
            LOG.warning("JSON in image metadata is not valid. "
                        "Check it in glance.")
            messages.error(request, _("Invalid murano image metadata"))
            continue
        image.murano_property = murano_metadata
        murano_images.append(image)
    return murano_images
class RawProperty(object):
    """A not-yet-evaluated field attribute described by ``spec``.

    ``finalize`` turns it into a ``property`` whose value is computed on
    read via ``service.get_data`` until a non-None value is assigned.
    """

    def __init__(self, key, spec):
        self.key, self.spec = key, spec
        self.value, self.value_evaluated = None, False

    def finalize(self, form_name, service, cls):
        """Build the ``property`` object to install under ``self.key``.

        :param form_name: passed through to ``service.get_data``.
        :param service: object providing ``get_data(form_name, spec)``.
        :param cls: class whose same-named property setter (if any) is
            also invoked on assignment.
        """
        def _get(field):
            if not self.value_evaluated:
                return service.get_data(form_name, self.spec)
            return self.value

        def _set(field, value):
            self.value = value
            # assigning None switches back to evaluating the spec
            self.value_evaluated = value is not None
            if hasattr(cls, self.key):
                getattr(cls, self.key).fset(field, value)

        def _del(field):
            _set(field, None)

        return property(fget=_get, fset=_set, fdel=_del)
# Keyword arguments whose (truthy) values CustomPropertiesField.__init__
# converts to text and HTML-escapes before passing them on.
FIELD_ARGS_TO_ESCAPE = ['help_text', 'initial', 'description', 'label']
class CustomPropertiesField(forms.Field):
    """Base for the dynamic UI fields defined in this module.

    Adds a description, HTML-escaping of selected arguments, normalisation
    of the ``validators`` argument and an optional JSON description of
    regex validators exposed through the ``data-validators`` widget
    attribute.
    """
    js_validation = False

    def __init__(self, description=None, description_title=None,
                 *args, **kwargs):
        self.description = description
        if description_title:
            self.description_title = description_title
        else:
            self.description_title = force_text(kwargs.get('label', ''))

        for arg in FIELD_ARGS_TO_ESCAPE:
            raw = kwargs.get(arg)
            if raw:
                kwargs[arg] = html.escape(force_text(raw))

        python_validators = []
        js_validators = []
        for validator in kwargs.get('validators', []):
            if hasattr(validator, '__call__'):  # single regexpValidator
                python_validators.append(validator)
                if not hasattr(validator, 'regex'):
                    continue
                known_messages = kwargs.get('error_messages', {})
                js_message = ''
                if (hasattr(validator, 'code') and
                        validator.code in known_messages):
                    js_message = force_text(known_messages[validator.code])
                js_validators.append(
                    {'regex': force_text(validator.regex.pattern),
                     'message': js_message})
                continue

            # mixed list of regexpValidator and YAQL validators
            expr = validator.get('expr')
            regex_validator = get_regex_validator(expr)
            js_message = validator.get('message', '')
            if regex_validator:
                python_validators.append(
                    wrap_regex_validator(regex_validator, js_message))
            elif isinstance(expr, RawProperty):
                python_validators.append(validator)
            if hasattr(regex_validator, 'regex'):
                js_validators.append(
                    {'regex': regex_validator.regex.pattern,
                     'message': js_message})

        kwargs['validators'] = python_validators
        if js_validators:
            self.js_validation = json.dumps(js_validators)
        super(CustomPropertiesField, self).__init__(*args, **kwargs)

    def widget_attrs(self, widget):
        """Add ``data-validators`` when JS validation data is present."""
        attrs = super(CustomPropertiesField, self).widget_attrs(widget)
        if self.js_validation:
            attrs['data-validators'] = self.js_validation
        return attrs

    def clean(self, value):
        """Skip all validators if field is disabled."""
        # form is assigned in ServiceConfigurationForm.finalize_fields()
        form = self.form
        # the only place to ensure that Service object has up-to-date
        # cleaned_data
        form.service.update_cleaned_data(form.cleaned_data, form=form)
        parent = super(CustomPropertiesField, self)
        if not getattr(self, 'enabled', True):
            return parent.to_python(value)
        return parent.clean(value)

    @classmethod
    def finalize_properties(cls, kwargs, form_name, service):
        """Turn ``RawProperty`` kwargs into properties of a subclass.

        Every ``RawProperty`` value is removed from ``kwargs`` and
        finalized. If any were found, a same-named subclass of ``cls``
        carrying the resulting properties is returned, otherwise ``cls``.
        """
        raw_keys = [key for key, value in kwargs.items()
                    if isinstance(value, RawProperty)]
        if not raw_keys:
            return cls
        props = {}
        for key in raw_keys:
            props[key] = kwargs[key].finalize(form_name, service, cls)
            del kwargs[key]
        return type(cls.__name__, (cls,), props)
class CharField(forms.CharField, CustomPropertiesField):
    """Django ``CharField`` combined with ``CustomPropertiesField``."""
class PasswordField(CharField):
    """Password input that can be paired with a confirmation clone.

    Without custom validators the value must match ``password_re`` (a
    digit, a lowercase letter, an uppercase letter and one of
    ``special_characters``) and the length defaults to 7..255.
    """
    # Raw strings: '\/' and '\d' are not valid Python string escapes, so the
    # non-raw spelling only works by accident and triggers invalid-escape
    # warnings on newer interpreters. The resulting values are unchanged.
    special_characters = r'!@#$%^&*()_+|\/.,~?><:{}-'
    password_re = re.compile(
        r'^.*(?=.*\d)(?=.*[a-z])(?=.*[A-Z])(?=.*[%s]).*$'
        % special_characters)
    has_clone = False
    original = True
    attrs = {'data-type': 'password'}
    validate_password = django_validator.RegexValidator(
        password_re,
        _('The password must contain at least one letter, one '
          'number and one special character'),
        'invalid')

    @staticmethod
    def get_clone_name(name):
        """Return the form key used for the confirmation copy of ``name``."""
        return name + '-clone'

    def compare(self, name, form_data):
        """Raise ``forms.ValidationError`` if password and clone differ."""
        if self.original and self.required:
            # run compare only for original fields
            # do not run compare for hidden fields (they are not required)
            if form_data.get(name) != form_data.get(self.get_clone_name(name)):
                raise forms.ValidationError(_(u"{0}{1} don't match").format(
                    self.label, defaultfilters.pluralize(2)))

    def __init__(self, label, *args, **kwargs):
        self.confirm_input = kwargs.pop('confirm_input', True)
        kwargs.update({
            'label': label,
            # copy, so the setdefault() below never touches the caller's dict
            'error_messages': dict(kwargs.get('error_messages') or {}),
            'widget': forms.PasswordInput(attrs=self.attrs,
                                          render_value=True)})

        validators = kwargs.get('validators')
        help_text = kwargs.get('help_text')

        if not validators:
            # No custom validators, using default validator
            validators = [self.validate_password]
            if not help_text:
                help_text = _(
                    'Enter a complex password with at least one letter, '
                    'one number and one special character')
            kwargs['error_messages'].setdefault(
                'invalid', self.validate_password.message)
            kwargs['min_length'] = kwargs.get('min_length', 7)
            kwargs['max_length'] = kwargs.get('max_length', 255)
        elif not help_text:
            # NOTE(kzaitsev) There are custom validators for password,
            # but no help text let's leave only a generic message,
            # since we do not know exact constraints
            help_text = _('Enter a password')

        kwargs.update({'validators': validators,
                       'help_text': help_text})
        super(PasswordField, self).__init__(*args, **kwargs)

    def __deepcopy__(self, memo):
        # give every copy its own error_messages, since clone_field()
        # edits them on the clone
        result = super(PasswordField, self).__deepcopy__(memo)
        result.error_messages = copy.deepcopy(self.error_messages)
        return result

    def clone_field(self):
        """Return the 'Confirm password' copy and mark this field cloned."""
        self.has_clone = True
        field = copy.deepcopy(self)
        field.original = False
        field.label = _('Confirm password')
        field.error_messages['required'] = _('Please confirm your password')
        field.help_text = _('Retype your password')
        return field
class IntegerField(forms.IntegerField, CustomPropertiesField):
    """Django ``IntegerField`` combined with ``CustomPropertiesField``."""
def _get_title(data):
    """Return ``data.title`` for a ``Choice``; any other value unchanged."""
    return data.title if isinstance(data, Choice) else data
def _disable_non_ready(data):
if getattr(data, 'enabled', True):
return {}
else:
return {'disabled': 'disabled'}
class ChoiceField(forms.ChoiceField, CustomPropertiesField):
    """Choice field that also accepts its choices as a dictionary."""

    def __init__(self, **kwargs):
        choices = kwargs.get('choices')
        if not choices:
            # fall back to a ``choices`` attribute already on the field
            choices = getattr(self, 'choices', None)
        if choices:
            if isinstance(choices, dict):
                kwargs['choices'] = list(choices.items())
            else:
                kwargs['choices'] = choices
        super(ChoiceField, self).__init__(**kwargs)
class DynamicChoiceField(hz_forms.DynamicChoiceField, CustomPropertiesField):
    """Horizon ``DynamicChoiceField`` with ``CustomPropertiesField``."""
class FlavorChoiceField(ChoiceField):
    """Choice of nova flavors, optionally limited by ``requirements``.

    ``requirements`` may contain min_vcpus, min_disk, min_memory_mb,
    max_vcpus, max_disk and max_memory_mb.
    """

    def __init__(self, *args, **kwargs):
        if 'requirements' in kwargs:
            self.requirements = kwargs.pop('requirements')
        super(FlavorChoiceField, self).__init__(*args, **kwargs)

    @with_request
    def update(self, request, **kwargs):
        """Refresh the flavor choices and pick an initial value."""
        flavors = nova.novaclient(request).flavors.list()

        if hasattr(self, 'requirements'):
            limits = self.requirements
            choices = []
            for flavor in flavors:
                # A flavor that misses a minimum requirement is left out
                # of the options list.
                below_minimum = (
                    flavor.vcpus < limits.get('min_vcpus', 0) or
                    flavor.disk < limits.get('min_disk', 0) or
                    flavor.ram < limits.get('min_memory_mb', 0))
                if below_minimum:
                    continue
                # Maximums only apply when explicitly given.
                above_maximum = (
                    ('max_vcpus' in limits and
                     flavor.vcpus > limits['max_vcpus']) or
                    ('max_disk' in limits and
                     flavor.disk > limits['max_disk']) or
                    ('max_memory_mb' in limits and
                     flavor.ram > limits['max_memory_mb']))
                if above_maximum:
                    continue
                choices.append((flavor.name, flavor.name))
        else:
            # If no requirements are present, return all the flavors.
            choices = [(flavor.name, flavor.name) for flavor in flavors]

        choices.sort(key=lambda pair: pair[1])
        self.choices = choices

        form = kwargs.get('form')
        flavor_field = form.fields.get('flavor') if form else None
        if flavor_field:
            self.initial = kwargs["form"]["flavor"].value()
            return

        # Search through selected flavors
        for _value, label in self.choices:
            if 'medium' in label:
                self.initial = label
                break
class KeyPairChoiceField(DynamicChoiceField):
    """This widget allows to select keypair for VMs"""

    @with_request
    def update(self, request, **kwargs):
        """List nova keypairs by name after a 'No keypair' entry."""
        self.choices = [('', _('No keypair'))]
        keypairs = nova.novaclient(request).keypairs.list()
        for keypair in sorted(keypairs, key=lambda kp: kp.name):
            self.choices.append((keypair.name, keypair.name))
class SecurityGroupChoiceField(DynamicChoiceField):
    """This widget allows to select a security group for VMs"""

    @with_request
    def update(self, request, **kwargs):
        """List security groups, leaving out the 'murano--' prefixed ones."""
        self.choices = [('', _('Application default security group'))]
        # TODO(pbourke): remove sorted when supported natively in Horizon
        # (https://bugs.launchpad.net/horizon/+bug/1692972)
        groups = sorted(neutron.security_group_list(request),
                        key=lambda group: group.name_or_id)
        for secgroup in groups:
            if secgroup.name_or_id.startswith('murano--'):
                continue
            self.choices.append((secgroup.name_or_id, secgroup.name_or_id))
# NOTE(kzaitsev): Choice has to be a non-string object, otherwise the
# transform of horizon's SelectWidget does not work correctly
class Choice(object):
    """A choice that allows disabling specific choices in a SelectWidget.

    :param title: text displayed for the choice.
    :param enabled: whether the choice may be selected.
    """

    def __init__(self, title, enabled):
        self.enabled = enabled
        self.title = title
class ImageChoiceField(ChoiceField):
    """Choice of murano-tagged glance images.

    Images that are not 'active' are listed with their status appended and
    rendered disabled. When ``image_type`` is given, only images whose
    metadata 'type' equals it or starts with '<image_type>.' are offered.
    """
    widget = hz_forms.SelectWidget(transform=_get_title,
                                   transform_html_attrs=_disable_non_ready)

    def __init__(self, *args, **kwargs):
        self.image_type = kwargs.pop('image_type', None)
        super(ImageChoiceField, self).__init__(*args, **kwargs)

    @with_request
    def update(self, request, **kwargs):
        """Rebuild the image choices, sorted by title."""
        image_map, image_choices = {}, []
        murano_images = get_murano_images(request)
        for image in murano_images:
            murano_data = image.murano_property
            title = murano_data.get('title', image.name)

            if image.status == 'active':
                title = Choice(title, enabled=True)
            else:
                title = Choice("{} ({})".format(title, image.status),
                               enabled=False)

            if self.image_type is not None:
                itype = murano_data.get('type')
                # An image without a textual 'type' can never match a
                # requested type; skip it instead of failing on
                # ``itype.startswith`` below.
                if not isinstance(itype, six.string_types):
                    continue

                prefix = '{type}.'.format(type=self.image_type)
                if (not itype.startswith(prefix) and
                        not self.image_type == itype):
                    continue
            image_map[image.id] = title

        for id_, title in sorted(six.iteritems(image_map),
                                 key=lambda e: e[1].title):
            image_choices.append((id_, title))
        if image_choices:
            image_choices.insert(0, ("", _("Select Image")))
        else:
            image_choices.insert(0, ("", _("No images available")))

        self.choices = image_choices
class NetworkChoiceField(ChoiceField):
    """Choice of networks whose values are tuples.

    :param filter: passed to ``net.get_available_networks``.
    :param murano_networks: None, "exclude" or "translate" (any case);
        passed to ``net.get_available_networks``.
    :param allow_auto: when true, an 'Auto' entry with the value
        ``(None, None)`` is put first.
    """

    def __init__(self,
                 filter=None,
                 murano_networks=None,
                 allow_auto=True,
                 *args,
                 **kwargs):
        self.filter = filter
        if murano_networks:
            if murano_networks.lower() not in ["exclude", "translate"]:
                raise ValueError(_("Invalid value of 'murano_nets' option"))
        self.murano_networks = murano_networks
        self.allow_auto = allow_auto
        super(NetworkChoiceField, self).__init__(*args,
                                                 **kwargs)

    @with_request
    def update(self, request, **kwargs):
        """Populates available networks in the control

        This method is called automatically when the form which contains it is
        rendered
        """
        network_choices = net.get_available_networks(request,
                                                     self.filter,
                                                     self.murano_networks)
        if self.allow_auto:
            network_choices.insert(0, ((None, None), _('Auto')))
        self.choices = network_choices or []

    def to_python(self, value):
        """Converts string representation of widget to tuple value

        Is called implicitly during form cleanup phase

        :raises forms.ValidationError: when the submitted string is not a
            valid Python literal.
        """
        if not value:
            # may happen if no networks are available and "Auto" is disabled
            return None, None
        try:
            return ast.literal_eval(value)
        except (ValueError, SyntaxError):
            # The value comes from the submitted form; report malformed
            # input as a validation error rather than an unhandled
            # exception.
            raise forms.ValidationError(
                self.error_messages['invalid_choice'],
                code='invalid_choice',
                params={'value': value})
class AZoneChoiceField(ChoiceField):
    """Choice of nova availability zones with a truthy ``zoneState``."""

    @with_request
    def update(self, request, **kwargs):
        """Rebuild the zone choices, sorted by zone name."""
        try:
            zones = nova.novaclient(request).availability_zones.list(
                detailed=False)
        except Exception:
            zones = []
            exceptions.handle(request,
                              _("Unable to retrieve availability zones."))

        az_choices = []
        for zone in zones:
            if zone.zoneState:
                az_choices.append((zone.zoneName, zone.zoneName))

        if az_choices:
            az_choices.sort(key=lambda pair: pair[1])
        else:
            az_choices = [("", _("No availability zones available"))]

        self.choices = az_choices
class VolumeChoiceField(ChoiceField):
    """Choice of cinder volumes and, optionally, volume snapshots.

    :param include_snapshots: whether snapshots are listed as well.
    """

    def __init__(self,
                 include_snapshots=True,
                 *args,
                 **kwargs):
        self.include_snapshots = include_snapshots
        super(VolumeChoiceField, self).__init__(*args, **kwargs)

    @with_request
    def update(self, request, **kwargs):
        """This widget allows selection of Volumes and Volume Snapshots"""
        available = {'status': cinder.VOLUME_STATE_AVAILABLE}
        try:
            volumes = cinder.volume_list(request, search_opts=available)
            choices = [(volume.id, volume.name) for volume in volumes]
        except Exception:
            choices = []
            exceptions.handle(request,
                              _("Unable to retrieve volume list."))

        if self.include_snapshots:
            try:
                snapshots = cinder.volume_snapshot_list(
                    request, search_opts=available)
                for snap in snapshots:
                    choices.append((snap.id, snap.name))
            except Exception:
                exceptions.handle(request,
                                  _("Unable to retrieve snapshot list."))

        if not choices:
            choices.insert(0, ("", _("No volumes available")))
        else:
            choices.sort(key=lambda pair: pair[1])
            choices.insert(0, ("", _("Select volume")))

        self.choices = choices
class BooleanField(forms.BooleanField, CustomPropertiesField):
    """Never-required boolean field rendered with the 'checkbox' class."""

    def __init__(self, *args, **kwargs):
        # With no widget given, start from the CheckboxInput class; any
        # widget *class* is then instantiated with the checkbox CSS class,
        # while a widget instance is used as-is.
        widget = kwargs.get('widget', forms.CheckboxInput)
        if isinstance(widget, type):
            widget = widget(attrs={'class': 'checkbox'})
        kwargs['widget'] = widget
        kwargs['required'] = False

        super(BooleanField, self).__init__(*args, **kwargs)
@versionutils.deprecated(
    as_of=versionutils.deprecated.JUNO,
    in_favor_of='type boolean (regular BooleanField)',
    remove_in=1)
class FloatingIpBooleanField(BooleanField):
    """Deprecated alias of ``BooleanField``."""
class ClusterIPField(forms.GenericIPAddressField, CustomPropertiesField):
    """IP address field fixed to the 'ipv4' protocol."""

    def __init__(self, *args, **kwargs):
        parent = super(ClusterIPField, self)
        parent.__init__(protocol='ipv4', *args, **kwargs)
class DatabaseListField(CharField):
    """Comma-separated list of database names, each checked by regex."""
    # The first-character class is [a-zA-Z_]. The earlier spelling 'A-z'
    # was a range that also admitted '[', '\', ']', '^' and '`',
    # contradicting the error message below.
    validate_mssql_identifier = django_validator.RegexValidator(
        re.compile(r'^[a-zA-Z_][a-zA-Z0-9_$#@]*$'),
        _(u'First symbol should be latin letter or underscore. Subsequent '
          u'symbols can be latin letter, numeric, underscore, at sign, '
          u'number sign or dollar sign'))

    default_error_messages = {'invalid': validate_mssql_identifier.message}

    def to_python(self, value):
        """Normalize data to a list of strings."""
        if not value:
            return []
        return [name.strip() for name in value.split(',')]

    def validate(self, value):
        """Check if value consists only of valid names."""
        super(DatabaseListField, self).validate(value)
        for db_name in value:
            self.validate_mssql_identifier(db_name)
class ErrorWidget(widgets.Widget):
    """Widget that shows an error message in place of an input.

    :param message: text to display; defaults to a generic
        initialisation error.
    """

    def __init__(self, *args, **kwargs):
        self.message = kwargs.pop(
            'message', _("There was an error initialising this field."))
        super(ErrorWidget, self).__init__(*args, **kwargs)

    def render(self, name, value, attrs=None, renderer=None):
        """Return a ``<div>`` containing the message.

        ``renderer`` is accepted (and ignored) because newer Django
        versions pass it to every widget's ``render``. ``name`` and the
        message are HTML-escaped, since the message can include text
        taken from package definitions.
        """
        return html.format_html("<div name={name}>{message}</div>",
                                name=name, message=self.message)
class MuranoTypeWidget(hz_forms.fields.DynamicSelectWidget):
    """Dynamic select widget tagged with the 'murano_add_select' class."""

    def __init__(self, attrs=None, **kwargs):
        if attrs is None:
            attrs = {'class': 'murano_add_select'}
        else:
            # Work on a copy so the caller's dictionary is never modified.
            attrs = dict(attrs)
            attrs['class'] = attrs.get('class', '') + ' murano_add_select'
        super(MuranoTypeWidget, self).__init__(attrs=attrs, **kwargs)

    class Media(object):
        js = ('muranodashboard/js/add-select.js',)
def make_select_cls(fqns):
    """Create a ``DynamicSelect`` field class for the given class name(s).

    :param fqns: one fully qualified name, or a tuple/list of them; a
        single value is wrapped into a one-element tuple.
    :returns: the ``DynamicSelect`` class, which closes over ``fqns``.
    """
    if not isinstance(fqns, (tuple, list)):
        fqns = (fqns,)

    class DynamicSelect(hz_forms.DynamicChoiceField, CustomPropertiesField):
        """Select among environment services matching ``fqns``."""
        widget = MuranoTypeWidget

        def __init__(self, empty_value_message=None, *args, **kwargs):
            super(DynamicSelect, self).__init__(*args, **kwargs)
            # label of the empty ('') choice
            if empty_value_message is not None:
                self.empty_value_message = empty_value_message
            else:
                self.empty_value_message = _('Select Application')

        @with_request
        def update(self, request, environment_id, **kwargs):
            """Rebuild the choices and the widget's add-item link."""
            matching_classes = []
            fqns_seen = set()
            # NOTE(kzaitsev): it's possible to have a private
            # and public apps with the same fqn, however the engine would
            # currently favor private package. Therefore we should squash
            # these until we devise a better way to work with this
            # situation and versioning
            for class_fqn in fqns:
                app_found = pkg_api.app_by_fqn(request, class_fqn)
                if app_found:
                    fqns_seen.add(app_found.fully_qualified_name)
                    matching_classes.append(app_found)

                apps_found = pkg_api.apps_that_inherit(request, class_fqn)
                for app in apps_found:
                    if app.fully_qualified_name in fqns_seen:
                        continue
                    fqns_seen.add(app.fully_qualified_name)
                    matching_classes.append(app)

            if not matching_classes:
                # nothing can satisfy this field: show an error instead of
                # the select widget
                msg = _(
                    "Couldn't find any apps, required for this field.\n"
                    "Tried: {fqns}").format(fqns=', '.join(fqns))
                self.widget = ErrorWidget(message=msg)

            # NOTE(kzaitsev): this closure is needed to allow us have custom
            # logic when clicking add button
            def _make_link():
                ns_url = 'horizon:app-catalog:catalog:add'
                ns_url_args = (environment_id, False, True)
                # This will prevent horizon from adding an extra '+' button
                if not matching_classes:
                    return ''
                # JSON list of (app name, add-URL) pairs
                return json.dumps([
                    (app.name, reverse(ns_url, args=((app.id,) + ns_url_args)))
                    for app in matching_classes])

            self.widget.add_item_link = _make_link

            apps = env_api.service_list_by_fqns(
                request, environment_id,
                [app.fully_qualified_name for app in matching_classes])
            choices = [('', self.empty_value_message)]
            choices.extend([(app['?']['id'],
                             html.escape(app.name)) for app in apps])
            self.choices = choices
            # NOTE(tsufiev): streamline the drop-down UX: auto-select the
            # single available option in a drop-down
            if len(choices) == 2:
                self.initial = choices[1][0]

        def clean(self, value):
            """Clean as usual, but turn the empty choice ('') into None."""
            value = super(DynamicSelect, self).clean(value)
            return None if value == '' else value

    return DynamicSelect
@versionutils.deprecated(
    as_of=versionutils.deprecated.JUNO,
    in_favor_of='type io.murano.windows.ActiveDirectory with a custom '
                'emptyValueMessage attribute',
    remove_in=1)
class DomainChoiceField(make_select_cls('io.murano.windows.ActiveDirectory')):
    """Deprecated ActiveDirectory selector starting with 'Not in domain'."""

    def __init__(self, *args, **kwargs):
        super(DomainChoiceField, self).__init__(*args, **kwargs)
        not_in_domain = ('', _('Not in domain'))
        self.choices = [not_in_domain]