Remove old resource/signals/observers stuff
This commit is contained in:
@@ -1,222 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.core import signals
|
||||
from solar.interfaces.db import get_db
|
||||
|
||||
db = get_db()
|
||||
|
||||
|
||||
class BaseObserver(object):
|
||||
type_ = None
|
||||
|
||||
def __init__(self, attached_to, name, value):
|
||||
"""
|
||||
:param attached_to: resource.Resource
|
||||
:param name:
|
||||
:param value:
|
||||
:return:
|
||||
"""
|
||||
self._attached_to_name = attached_to.name
|
||||
self.name = name
|
||||
self.value = value
|
||||
|
||||
@property
|
||||
def attached_to(self):
|
||||
from solar.core import resource
|
||||
|
||||
return resource.load(self._attached_to_name)
|
||||
|
||||
@property
|
||||
def receivers(self):
|
||||
from solar.core import resource
|
||||
|
||||
for receiver_name, receiver_input in signals.Connections.receivers(
|
||||
self._attached_to_name,
|
||||
self.name
|
||||
):
|
||||
yield resource.load(receiver_name).args[receiver_input]
|
||||
|
||||
def __repr__(self):
|
||||
return '[{}:{}] {}'.format(self._attached_to_name, self.name, self.value)
|
||||
|
||||
def __unicode__(self):
|
||||
return unicode(self.value)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, BaseObserver):
|
||||
return self.value == other.value
|
||||
|
||||
return self.value == other
|
||||
|
||||
def notify(self, emitter):
|
||||
"""
|
||||
:param emitter: Observer
|
||||
:return:
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self, value):
|
||||
"""
|
||||
:param value:
|
||||
:return:
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def find_receiver(self, receiver):
|
||||
fltr = [r for r in self.receivers
|
||||
if r._attached_to_name == receiver._attached_to_name
|
||||
and r.name == receiver.name]
|
||||
if fltr:
|
||||
return fltr[0]
|
||||
|
||||
def subscribe(self, receiver):
|
||||
"""
|
||||
:param receiver: Observer
|
||||
:return:
|
||||
"""
|
||||
log.debug('Subscribe %s', receiver)
|
||||
# No multiple subscriptions
|
||||
if self.find_receiver(receiver):
|
||||
log.error('No multiple subscriptions from %s', receiver)
|
||||
return
|
||||
receiver.subscribed(self)
|
||||
|
||||
signals.Connections.add(
|
||||
self.attached_to,
|
||||
self.name,
|
||||
receiver.attached_to,
|
||||
receiver.name
|
||||
)
|
||||
|
||||
receiver.notify(self)
|
||||
|
||||
def subscribed(self, emitter):
|
||||
log.debug('Subscribed %s', emitter)
|
||||
|
||||
def unsubscribe(self, receiver):
|
||||
"""
|
||||
:param receiver: Observer
|
||||
:return:
|
||||
"""
|
||||
log.debug('Unsubscribe %s', receiver)
|
||||
if self.find_receiver(receiver):
|
||||
receiver.unsubscribed(self)
|
||||
|
||||
signals.Connections.remove(
|
||||
self.attached_to,
|
||||
self.name,
|
||||
receiver.attached_to,
|
||||
receiver.name
|
||||
)
|
||||
|
||||
# TODO: ?
|
||||
#receiver.notify(self)
|
||||
|
||||
def unsubscribed(self, emitter):
|
||||
log.debug('Unsubscribed %s', emitter)
|
||||
|
||||
|
||||
class Observer(BaseObserver):
|
||||
type_ = 'simple'
|
||||
|
||||
@property
|
||||
def emitter(self):
|
||||
from solar.core import resource
|
||||
|
||||
emitter = signals.Connections.emitter(self._attached_to_name, self.name)
|
||||
|
||||
if emitter is not None:
|
||||
emitter_name, emitter_input_name = emitter
|
||||
return resource.load(emitter_name).args[emitter_input_name]
|
||||
|
||||
def notify(self, emitter):
|
||||
log.debug('Notify from %s value %s', emitter, emitter.value)
|
||||
# Copy emitter's values to receiver
|
||||
self.value = emitter.value
|
||||
for receiver in self.receivers:
|
||||
receiver.notify(self)
|
||||
self.attached_to.set_args_from_dict({self.name: self.value})
|
||||
|
||||
def update(self, value):
|
||||
log.debug('Updating to value %s', value)
|
||||
self.value = value
|
||||
for receiver in self.receivers:
|
||||
receiver.notify(self)
|
||||
self.attached_to.set_args_from_dict({self.name: self.value})
|
||||
|
||||
def subscribed(self, emitter):
|
||||
super(Observer, self).subscribed(emitter)
|
||||
# Simple observer can be attached to at most one emitter
|
||||
if self.emitter is not None:
|
||||
self.emitter.unsubscribe(self)
|
||||
|
||||
|
||||
class ListObserver(BaseObserver):
|
||||
type_ = 'list'
|
||||
|
||||
def __unicode__(self):
|
||||
return unicode(self.value)
|
||||
|
||||
@staticmethod
|
||||
def _format_value(emitter):
|
||||
return {
|
||||
'emitter': emitter.name,
|
||||
'emitter_attached_to': emitter._attached_to_name,
|
||||
'value': emitter.value,
|
||||
}
|
||||
|
||||
def notify(self, emitter):
|
||||
log.debug('Notify from %s value %s', emitter, emitter.value)
|
||||
# Copy emitter's values to receiver
|
||||
idx = self._emitter_idx(emitter)
|
||||
self.value[idx] = self._format_value(emitter)
|
||||
for receiver in self.receivers:
|
||||
receiver.notify(self)
|
||||
self.attached_to.set_args_from_dict({self.name: self.value})
|
||||
|
||||
def subscribed(self, emitter):
|
||||
super(ListObserver, self).subscribed(emitter)
|
||||
idx = self._emitter_idx(emitter)
|
||||
if idx is None:
|
||||
self.value.append(self._format_value(emitter))
|
||||
self.attached_to.set_args_from_dict({self.name: self.value})
|
||||
|
||||
def unsubscribed(self, emitter):
|
||||
"""
|
||||
:param receiver: Observer
|
||||
:return:
|
||||
"""
|
||||
log.debug('Unsubscribed emitter %s', emitter)
|
||||
idx = self._emitter_idx(emitter)
|
||||
self.value.pop(idx)
|
||||
self.attached_to.set_args_from_dict({self.name: self.value})
|
||||
for receiver in self.receivers:
|
||||
receiver.notify(self)
|
||||
|
||||
def _emitter_idx(self, emitter):
|
||||
try:
|
||||
return [i for i, e in enumerate(self.value)
|
||||
if e['emitter_attached_to'] == emitter._attached_to_name
|
||||
][0]
|
||||
except IndexError:
|
||||
return
|
||||
|
||||
|
||||
def create(type_, *args, **kwargs):
|
||||
for klass in BaseObserver.__subclasses__():
|
||||
if klass.type_ == type_:
|
||||
return klass(*args, **kwargs)
|
||||
raise NotImplementedError('No handling class for type {}'.format(type_))
|
||||
@@ -1,22 +0,0 @@
|
||||
__all__ = [
|
||||
'Resource',
|
||||
'assign_resources_to_nodes',
|
||||
'connect_resources',
|
||||
'create',
|
||||
'load',
|
||||
'load_all',
|
||||
'prepare_meta',
|
||||
'wrap_resource',
|
||||
'validate_resources',
|
||||
]
|
||||
|
||||
|
||||
from solar.core.resource.resource import Resource
|
||||
from solar.core.resource.resource import assign_resources_to_nodes
|
||||
from solar.core.resource.resource import connect_resources
|
||||
from solar.core.resource.resource import load
|
||||
from solar.core.resource.resource import load_all
|
||||
from solar.core.resource.resource import wrap_resource
|
||||
from solar.core.resource.virtual_resource import create
|
||||
from solar.core.resource.virtual_resource import prepare_meta
|
||||
from solar.core.resource.virtual_resource import validate_resources
|
||||
@@ -1,80 +0,0 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inflection
|
||||
import os
|
||||
import pprint
|
||||
|
||||
from solar.core import resource
|
||||
from solar import utils
|
||||
|
||||
|
||||
RESOURCE_HEADER_TEMPLATE = """
|
||||
from solar.core.resource import Resource
|
||||
"""
|
||||
|
||||
|
||||
RESOURCE_CLASS_TEMPLATE = """
|
||||
|
||||
|
||||
class {class_name}(Resource):
|
||||
_metadata = {{
|
||||
'actions': {meta_actions},
|
||||
'actions_path': '{actions_path}',
|
||||
'base_path': '{base_path}',
|
||||
'input': {meta_input},
|
||||
'handler': '{handler}',
|
||||
}}
|
||||
|
||||
{input_properties}
|
||||
"""
|
||||
|
||||
|
||||
RESOURCE_INPUT_PROPERTY_TEMPLATE = """
|
||||
@property
|
||||
def {name}(self):
|
||||
return self.args['{name}']
|
||||
|
||||
@{name}.setter
|
||||
def {name}(self, value):
|
||||
#self.args['{name}'].value = value
|
||||
#self.set_args_from_dict({{'{name}': value}})
|
||||
self.update({{'{name}': value}})
|
||||
"""
|
||||
|
||||
|
||||
def compile(meta):
|
||||
destination_file = utils.read_config()['resources-compiled-file']
|
||||
|
||||
resource.prepare_meta(meta)
|
||||
meta['class_name'] = '{}Resource'.format(
|
||||
inflection.camelize(meta['base_name'])
|
||||
)
|
||||
meta['meta_actions'] = pprint.pformat(meta['actions'])
|
||||
meta['meta_input'] = pprint.pformat(meta['input'])
|
||||
|
||||
print meta['base_name'], meta['class_name']
|
||||
|
||||
if not os.path.exists(destination_file):
|
||||
with open(destination_file, 'w') as f:
|
||||
f.write(RESOURCE_HEADER_TEMPLATE.format(**meta))
|
||||
|
||||
with open(destination_file, 'a') as f:
|
||||
input_properties = '\n'.join(
|
||||
RESOURCE_INPUT_PROPERTY_TEMPLATE.format(name=name)
|
||||
for name in meta['input']
|
||||
)
|
||||
f.write(RESOURCE_CLASS_TEMPLATE.format(
|
||||
input_properties=input_properties, **meta)
|
||||
)
|
||||
@@ -1,237 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
|
||||
from copy import deepcopy
|
||||
|
||||
import solar
|
||||
|
||||
from solar.core import actions
|
||||
from solar.core import observer
|
||||
from solar.core import signals
|
||||
from solar.core import validation
|
||||
|
||||
from solar.core.connections import ResourcesConnectionGraph
|
||||
from solar.interfaces.db import get_db
|
||||
|
||||
db = get_db()
|
||||
|
||||
|
||||
class Resource(object):
|
||||
_metadata = {}
|
||||
|
||||
def __init__(self, name, metadata, args, tags=None, virtual_resource=None):
|
||||
self.name = name
|
||||
if metadata:
|
||||
self.metadata = metadata
|
||||
else:
|
||||
self.metadata = deepcopy(self._metadata)
|
||||
|
||||
self.metadata['id'] = name
|
||||
|
||||
self.tags = tags or []
|
||||
self.virtual_resource = virtual_resource
|
||||
self.set_args_from_dict(args)
|
||||
|
||||
@property
|
||||
def actions(self):
|
||||
return self.metadata.get('actions') or []
|
||||
|
||||
@property
|
||||
def args(self):
|
||||
ret = {}
|
||||
|
||||
args = self.args_dict()
|
||||
|
||||
for arg_name, metadata_arg in self.metadata['input'].items():
|
||||
type_ = validation.schema_input_type(metadata_arg.get('schema', 'str'))
|
||||
|
||||
ret[arg_name] = observer.create(
|
||||
type_, self, arg_name, args.get(arg_name)
|
||||
)
|
||||
|
||||
return ret
|
||||
|
||||
def args_dict(self):
|
||||
raw_resource = db.read(self.name, collection=db.COLLECTIONS.resource)
|
||||
if raw_resource is None:
|
||||
return {}
|
||||
|
||||
self.metadata = raw_resource
|
||||
|
||||
return Resource.get_raw_resource_args(raw_resource)
|
||||
|
||||
def set_args_from_dict(self, new_args):
|
||||
args = self.args_dict()
|
||||
args.update(new_args)
|
||||
|
||||
self.metadata['tags'] = self.tags
|
||||
self.metadata['virtual_resource'] = self.virtual_resource
|
||||
for k, v in args.items():
|
||||
if k not in self.metadata['input']:
|
||||
raise NotImplementedError(
|
||||
'Argument {} not implemented for resource {}'.format(k, self)
|
||||
)
|
||||
|
||||
if isinstance(v, dict) and 'value' in v:
|
||||
v = v['value']
|
||||
self.metadata['input'][k]['value'] = v
|
||||
|
||||
db.save(self.name, self.metadata, collection=db.COLLECTIONS.resource)
|
||||
|
||||
def set_args(self, args):
|
||||
self.set_args_from_dict({k: v.value for k, v in args.items()})
|
||||
|
||||
def __repr__(self):
|
||||
return ("Resource(name='{id}', metadata={metadata}, args={input}, "
|
||||
"tags={tags})").format(**self.to_dict())
|
||||
|
||||
def color_repr(self):
|
||||
import click
|
||||
|
||||
arg_color = 'yellow'
|
||||
|
||||
return ("{resource_s}({name_s}='{id}', {metadata_s}={metadata}, "
|
||||
"{args_s}={input}, {tags_s}={tags})").format(
|
||||
resource_s=click.style('Resource', fg='white', bold=True),
|
||||
name_s=click.style('name', fg=arg_color, bold=True),
|
||||
metadata_s=click.style('metadata', fg=arg_color, bold=True),
|
||||
args_s=click.style('args', fg=arg_color, bold=True),
|
||||
tags_s=click.style('tags', fg=arg_color, bold=True),
|
||||
**self.to_dict()
|
||||
)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'id': self.name,
|
||||
'metadata': self.metadata,
|
||||
'input': self.args_show(),
|
||||
'tags': self.tags,
|
||||
}
|
||||
|
||||
def args_show(self):
|
||||
def formatter(v):
|
||||
if isinstance(v, observer.ListObserver):
|
||||
return v.value
|
||||
elif isinstance(v, observer.Observer):
|
||||
return {
|
||||
'emitter': v.emitter.attached_to.name if v.emitter else None,
|
||||
'value': v.value,
|
||||
}
|
||||
|
||||
return v
|
||||
|
||||
return {k: formatter(v) for k, v in self.args.items()}
|
||||
|
||||
def add_tag(self, tag):
|
||||
if tag not in self.tags:
|
||||
self.tags.append(tag)
|
||||
|
||||
def remove_tag(self, tag):
|
||||
try:
|
||||
self.tags.remove(tag)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def notify(self, emitter):
|
||||
"""Update resource's args from emitter's args.
|
||||
|
||||
:param emitter: Resource
|
||||
:return:
|
||||
"""
|
||||
r_args = self.args
|
||||
|
||||
for key, value in emitter.args.iteritems():
|
||||
r_args[key].notify(value)
|
||||
|
||||
def update(self, args):
|
||||
"""This method updates resource's args with a simple dict.
|
||||
|
||||
:param args:
|
||||
:return:
|
||||
"""
|
||||
# Update will be blocked if this resource is listening
|
||||
# on some input that is to be updated -- we should only listen
|
||||
# to the emitter and not be able to change the input's value
|
||||
r_args = self.args
|
||||
|
||||
for key, value in args.iteritems():
|
||||
r_args[key].update(value)
|
||||
|
||||
self.set_args(r_args)
|
||||
|
||||
def action(self, action):
|
||||
if action in self.actions:
|
||||
actions.resource_action(self, action)
|
||||
else:
|
||||
raise Exception('Uuups, action is not available')
|
||||
|
||||
@staticmethod
|
||||
def get_raw_resource_args(raw_resource):
|
||||
return {k: v.get('value') for k, v in raw_resource['input'].items()}
|
||||
|
||||
|
||||
def wrap_resource(raw_resource):
|
||||
name = raw_resource['id']
|
||||
args = Resource.get_raw_resource_args(raw_resource)
|
||||
tags = raw_resource.get('tags', [])
|
||||
virtual_resource = raw_resource.get('virtual_resource', [])
|
||||
|
||||
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
|
||||
|
||||
|
||||
def wrap_resource_no_value(raw_resource):
|
||||
name = raw_resource['id']
|
||||
args = {k: v for k, v in raw_resource['input'].items()}
|
||||
tags = raw_resource.get('tags', [])
|
||||
virtual_resource = raw_resource.get('virtual_resource', [])
|
||||
|
||||
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
|
||||
|
||||
|
||||
def load(resource_name):
|
||||
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
|
||||
|
||||
if raw_resource is None:
|
||||
raise KeyError(
|
||||
'Resource {} does not exist'.format(resource_name)
|
||||
)
|
||||
|
||||
return wrap_resource(raw_resource)
|
||||
|
||||
|
||||
def load_all():
|
||||
ret = {}
|
||||
|
||||
for raw_resource in db.get_list(collection=db.COLLECTIONS.resource):
|
||||
resource = wrap_resource(raw_resource)
|
||||
ret[resource.name] = resource
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def assign_resources_to_nodes(resources, nodes):
|
||||
for node in nodes:
|
||||
for resource in resources:
|
||||
res = deepcopy(resource)
|
||||
res['tags'] = list(set(node.get('tags', [])) |
|
||||
set(resource.get('tags', [])))
|
||||
resource_uuid = '{0}-{1}'.format(res['id'], solar.utils.generate_uuid())
|
||||
# We should not generate here any uuid's, because
|
||||
# a single node should be represented with a single
|
||||
# resource
|
||||
node_uuid = node['id']
|
||||
|
||||
node_resource_template = solar.utils.read_config()['node_resource_template']
|
||||
args = {k: v['value'] for k, v in res['input'].items()}
|
||||
created_resource = create(resource_uuid, resource['dir_path'], args, tags=res['tags'])
|
||||
created_node = create(node_uuid, node_resource_template, node, tags=node.get('tags', []))
|
||||
|
||||
signals.connect(created_node, created_resource)
|
||||
|
||||
|
||||
def connect_resources(profile):
|
||||
connections = profile.get('connections', [])
|
||||
graph = ResourcesConnectionGraph(connections, load_all().values())
|
||||
|
||||
for connection in graph.iter_connections():
|
||||
signals.connect(connection['from'], connection['to'], connection['mapping'])
|
||||
@@ -1,196 +0,0 @@
|
||||
# -*- coding: UTF-8 -*-
|
||||
import os
|
||||
from StringIO import StringIO
|
||||
|
||||
import yaml
|
||||
|
||||
from jinja2 import Template, Environment, meta
|
||||
|
||||
from solar import utils
|
||||
from solar.core import validation
|
||||
from solar.core.resource import load_all, Resource
|
||||
from solar.core import provider
|
||||
from solar.core import signals
|
||||
|
||||
|
||||
def create_resource(name, base_path, args, virtual_resource=None):
|
||||
if isinstance(base_path, provider.BaseProvider):
|
||||
base_path = base_path.directory
|
||||
|
||||
base_meta_file = os.path.join(base_path, 'meta.yaml')
|
||||
actions_path = os.path.join(base_path, 'actions')
|
||||
|
||||
metadata = utils.yaml_load(base_meta_file)
|
||||
metadata['id'] = name
|
||||
metadata['version'] = '1.0.0'
|
||||
metadata['base_path'] = os.path.abspath(base_path)
|
||||
|
||||
prepare_meta(metadata)
|
||||
|
||||
if os.path.exists(actions_path):
|
||||
for f in os.listdir(actions_path):
|
||||
metadata['actions'][os.path.splitext(f)[0]] = f
|
||||
|
||||
tags = metadata.get('tags', [])
|
||||
|
||||
resource = Resource(name, metadata, args, tags, virtual_resource)
|
||||
return resource
|
||||
|
||||
|
||||
def create_virtual_resource(vr_name, template):
|
||||
resources = template['resources']
|
||||
connections = []
|
||||
created_resources = []
|
||||
|
||||
cwd = os.getcwd()
|
||||
for resource in resources:
|
||||
name = resource['id']
|
||||
base_path = os.path.join(cwd, resource['from'])
|
||||
args = resource['values']
|
||||
new_resources = create(name, base_path, args, vr_name)
|
||||
created_resources += new_resources
|
||||
|
||||
if not is_virtual(base_path):
|
||||
for key, arg in args.items():
|
||||
if isinstance(arg, basestring) and '::' in arg:
|
||||
emitter, src = arg.split('::')
|
||||
connections.append((emitter, name, {src: key}))
|
||||
|
||||
db = load_all()
|
||||
for emitter, reciver, mapping in connections:
|
||||
emitter = db[emitter]
|
||||
reciver = db[reciver]
|
||||
signals.connect(emitter, reciver, mapping)
|
||||
|
||||
return created_resources
|
||||
|
||||
|
||||
def create(name, base_path, kwargs, virtual_resource=None):
|
||||
if isinstance(base_path, provider.BaseProvider):
|
||||
base_path = base_path.directory
|
||||
|
||||
if not os.path.exists(base_path):
|
||||
raise Exception(
|
||||
'Base resource does not exist: {0}'.format(base_path)
|
||||
)
|
||||
|
||||
if is_virtual(base_path):
|
||||
template = _compile_file(name, base_path, kwargs)
|
||||
yaml_template = yaml.load(StringIO(template))
|
||||
resources = create_virtual_resource(name, yaml_template)
|
||||
else:
|
||||
resource = create_resource(name, base_path, kwargs, virtual_resource)
|
||||
resources = [resource]
|
||||
|
||||
return resources
|
||||
|
||||
|
||||
def prepare_meta(meta):
|
||||
actions_path = os.path.join(meta['base_path'], 'actions')
|
||||
meta['actions_path'] = actions_path
|
||||
meta['base_name'] = os.path.split(meta['base_path'])[-1]
|
||||
|
||||
meta['actions'] = {}
|
||||
if os.path.exists(meta['actions_path']):
|
||||
for f in os.listdir(meta['actions_path']):
|
||||
meta['actions'][os.path.splitext(f)[0]] = f
|
||||
|
||||
|
||||
def validate_resources():
|
||||
db = load_all()
|
||||
all_errors = []
|
||||
for r in db.values():
|
||||
if not isinstance(r, Resource):
|
||||
continue
|
||||
|
||||
errors = validation.validate_resource(r)
|
||||
if errors:
|
||||
all_errors.append((r, errors))
|
||||
return all_errors
|
||||
|
||||
|
||||
def find_inputs_without_source():
|
||||
"""Find resources and inputs values of which are hardcoded.
|
||||
|
||||
:return: [(resource_name, input_name)]
|
||||
"""
|
||||
resources = load_all()
|
||||
|
||||
ret = set([(r.name, input_name) for r in resources.values()
|
||||
for input_name in r.args])
|
||||
|
||||
clients = signals.Connections.read_clients()
|
||||
|
||||
for dest_dict in clients.values():
|
||||
for destinations in dest_dict.values():
|
||||
for receiver_name, receiver_input in destinations:
|
||||
try:
|
||||
ret.remove((receiver_name, receiver_input))
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return list(ret)
|
||||
|
||||
|
||||
def find_missing_connections():
|
||||
"""Find resources whose input values are duplicated
|
||||
|
||||
and they are not connected between each other (i.e. the values
|
||||
are hardcoded, not coming from connection).
|
||||
|
||||
NOTE: this we could have 2 inputs of the same value living in 2 "circles".
|
||||
This is not covered, we find only inputs whose value is hardcoded.
|
||||
|
||||
:return: [(resource_name1, input_name1, resource_name2, input_name2)]
|
||||
"""
|
||||
ret = set()
|
||||
|
||||
resources = load_all()
|
||||
|
||||
inputs_without_source = find_inputs_without_source()
|
||||
|
||||
for resource1, input1 in inputs_without_source:
|
||||
r1 = resources[resource1]
|
||||
v1 = r1.args[input1]
|
||||
|
||||
for resource2, input2 in inputs_without_source:
|
||||
r2 = resources[resource2]
|
||||
v2 = r2.args[input2]
|
||||
|
||||
if v1 == v2 and resource1 != resource2 and \
|
||||
(resource2, input2, resource1, input1) not in ret:
|
||||
ret.add((resource1, input1, resource2, input2))
|
||||
|
||||
return list(ret)
|
||||
|
||||
|
||||
def _compile_file(name, path, kwargs):
|
||||
with open(path) as f:
|
||||
content = f.read()
|
||||
|
||||
inputs = get_inputs(content)
|
||||
template = _get_template(name, content, kwargs, inputs)
|
||||
return template
|
||||
|
||||
|
||||
def get_inputs(content):
|
||||
env = Environment()
|
||||
ast = env.parse(content)
|
||||
return meta.find_undeclared_variables(ast)
|
||||
|
||||
|
||||
def _get_template(name, content, kwargs, inputs):
|
||||
missing = []
|
||||
for input in inputs:
|
||||
if input not in kwargs:
|
||||
missing.append(input)
|
||||
if missing:
|
||||
raise Exception('[{0}] Validation error. Missing data in input: {1}'.format(name, missing))
|
||||
template = Template(content)
|
||||
template = template.render(str=str, zip=zip, **kwargs)
|
||||
return template
|
||||
|
||||
|
||||
def is_virtual(path):
|
||||
return os.path.isfile(path)
|
||||
|
||||
@@ -1,276 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from collections import defaultdict
|
||||
import itertools
|
||||
import networkx as nx
|
||||
|
||||
from solar.core.log import log
|
||||
from solar.interfaces.db import get_db
|
||||
|
||||
db = get_db()
|
||||
|
||||
|
||||
class Connections(object):
|
||||
@staticmethod
|
||||
def read_clients():
|
||||
"""
|
||||
Returned structure is:
|
||||
|
||||
emitter_name:
|
||||
emitter_input_name:
|
||||
- - dst_name
|
||||
- dst_input_name
|
||||
|
||||
while DB structure is:
|
||||
|
||||
emitter_name_key:
|
||||
emitter: emitter_name
|
||||
sources:
|
||||
emitter_input_name:
|
||||
- - dst_name
|
||||
- dst_input_name
|
||||
"""
|
||||
|
||||
ret = {}
|
||||
|
||||
for data in db.get_list(collection=db.COLLECTIONS.connection):
|
||||
ret[data['emitter']] = data['sources']
|
||||
|
||||
return ret
|
||||
|
||||
@staticmethod
|
||||
def save_clients(clients):
|
||||
data = []
|
||||
|
||||
for emitter_name, sources in clients.items():
|
||||
data.append((
|
||||
emitter_name,
|
||||
{
|
||||
'emitter': emitter_name,
|
||||
'sources': sources,
|
||||
}))
|
||||
|
||||
db.save_list(data, collection=db.COLLECTIONS.connection)
|
||||
|
||||
@staticmethod
|
||||
def add(emitter, src, receiver, dst):
|
||||
if src not in emitter.args:
|
||||
return
|
||||
|
||||
clients = Connections.read_clients()
|
||||
|
||||
# TODO: implement general circular detection, this one is simple
|
||||
if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []):
|
||||
raise Exception('Attempted to create cycle in dependencies. Not nice.')
|
||||
|
||||
clients.setdefault(emitter.name, {})
|
||||
clients[emitter.name].setdefault(src, [])
|
||||
if [receiver.name, dst] not in clients[emitter.name][src]:
|
||||
clients[emitter.name][src].append([receiver.name, dst])
|
||||
|
||||
Connections.save_clients(clients)
|
||||
|
||||
@staticmethod
|
||||
def remove(emitter, src, receiver, dst):
|
||||
clients = Connections.read_clients()
|
||||
|
||||
clients[emitter.name][src] = [
|
||||
destination for destination in clients[emitter.name][src]
|
||||
if destination != [receiver.name, dst]
|
||||
]
|
||||
|
||||
Connections.save_clients(clients)
|
||||
|
||||
@staticmethod
|
||||
def receivers(emitter_name, emitter_input_name):
|
||||
return Connections.read_clients().get(emitter_name, {}).get(
|
||||
emitter_input_name, []
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def emitter(receiver_name, receiver_input_name):
|
||||
for emitter_name, dest_dict in Connections.read_clients().items():
|
||||
for emitter_input_name, destinations in dest_dict.items():
|
||||
if [receiver_name, receiver_input_name] in destinations:
|
||||
return [emitter_name, emitter_input_name]
|
||||
|
||||
@staticmethod
|
||||
def clear():
|
||||
db.clear_collection(collection=db.COLLECTIONS.connection)
|
||||
|
||||
|
||||
def guess_mapping(emitter, receiver):
|
||||
"""Guess connection mapping between emitter and receiver.
|
||||
|
||||
Suppose emitter and receiver have common inputs:
|
||||
ip, ssh_key, ssh_user
|
||||
|
||||
Then we return a connection mapping like this:
|
||||
|
||||
{
|
||||
'ip': '<receiver>.ip',
|
||||
'ssh_key': '<receiver>.ssh_key',
|
||||
'ssh_user': '<receiver>.ssh_user'
|
||||
}
|
||||
|
||||
:param emitter:
|
||||
:param receiver:
|
||||
:return:
|
||||
"""
|
||||
guessed = {}
|
||||
for key in emitter.args:
|
||||
if key in receiver.args:
|
||||
guessed[key] = key
|
||||
|
||||
return guessed
|
||||
|
||||
|
||||
def connect_single(emitter, src, receiver, dst):
|
||||
# Disconnect all receiver inputs
|
||||
# Check if receiver input is of list type first
|
||||
if receiver.args[dst].type_ != 'list':
|
||||
disconnect_receiver_by_input(receiver, dst)
|
||||
|
||||
emitter.args[src].subscribe(receiver.args[dst])
|
||||
|
||||
|
||||
def connect(emitter, receiver, mapping=None):
|
||||
mapping = mapping or guess_mapping(emitter, receiver)
|
||||
|
||||
if isinstance(mapping, set):
|
||||
for src in mapping:
|
||||
connect_single(emitter, src, receiver, src)
|
||||
return
|
||||
|
||||
for src, dst in mapping.items():
|
||||
if isinstance(dst, list):
|
||||
for d in dst:
|
||||
connect_single(emitter, src, receiver, d)
|
||||
continue
|
||||
|
||||
connect_single(emitter, src, receiver, dst)
|
||||
|
||||
#receiver.save()
|
||||
|
||||
|
||||
def disconnect(emitter, receiver):
|
||||
clients = Connections.read_clients()
|
||||
|
||||
for src, destinations in clients[emitter.name].items():
|
||||
for destination in destinations:
|
||||
receiver_input = destination[1]
|
||||
if receiver_input in receiver.args:
|
||||
if receiver.args[receiver_input].type_ != 'list':
|
||||
log.debug(
|
||||
'Removing input %s from %s', receiver_input, receiver.name
|
||||
)
|
||||
emitter.args[src].unsubscribe(receiver.args[receiver_input])
|
||||
|
||||
disconnect_by_src(emitter.name, src, receiver)
|
||||
|
||||
|
||||
def disconnect_receiver_by_input(receiver, input):
|
||||
"""Find receiver connection by input and disconnect it.
|
||||
|
||||
:param receiver:
|
||||
:param input:
|
||||
:return:
|
||||
"""
|
||||
clients = Connections.read_clients()
|
||||
|
||||
for emitter_name, inputs in clients.items():
|
||||
disconnect_by_src(emitter_name, input, receiver)
|
||||
|
||||
|
||||
def disconnect_by_src(emitter_name, src, receiver):
|
||||
clients = Connections.read_clients()
|
||||
|
||||
if src in clients[emitter_name]:
|
||||
clients[emitter_name][src] = [
|
||||
destination for destination in clients[emitter_name][src]
|
||||
if destination[0] != receiver.name
|
||||
]
|
||||
|
||||
Connections.save_clients(clients)
|
||||
|
||||
|
||||
def notify(source, key, value):
|
||||
from solar.core.resource import load
|
||||
|
||||
clients = Connections.read_clients()
|
||||
|
||||
if source.name not in clients:
|
||||
clients[source.name] = {}
|
||||
Connections.save_clients(clients)
|
||||
|
||||
log.debug('Notify %s %s %s %s', source.name, key, value, clients[source.name])
|
||||
if key in clients[source.name]:
|
||||
for client, r_key in clients[source.name][key]:
|
||||
resource = load(client)
|
||||
log.debug('Resource found: %s', client)
|
||||
if resource:
|
||||
resource.update({r_key: value}, emitter=source)
|
||||
else:
|
||||
log.debug('Resource %s deleted?', client)
|
||||
pass
|
||||
|
||||
|
||||
def assign_connections(receiver, connections):
|
||||
mappings = defaultdict(list)
|
||||
for key, dest in connections.iteritems():
|
||||
resource, r_key = dest.split('.')
|
||||
mappings[resource].append([r_key, key])
|
||||
for resource, r_mappings in mappings.iteritems():
|
||||
connect(resource, receiver, r_mappings)
|
||||
|
||||
|
||||
def connection_graph():
|
||||
resource_dependencies = {}
|
||||
|
||||
clients = Connections.read_clients()
|
||||
|
||||
for emitter_name, destination_values in clients.items():
|
||||
resource_dependencies.setdefault(emitter_name, set())
|
||||
for emitter_input, receivers in destination_values.items():
|
||||
resource_dependencies[emitter_name].update(
|
||||
receiver[0] for receiver in receivers
|
||||
)
|
||||
|
||||
g = nx.DiGraph()
|
||||
|
||||
# TODO: tags as graph node attributes
|
||||
for emitter_name, receivers in resource_dependencies.items():
|
||||
g.add_node(emitter_name)
|
||||
g.add_nodes_from(receivers)
|
||||
g.add_edges_from(
|
||||
itertools.izip(
|
||||
itertools.repeat(emitter_name),
|
||||
receivers
|
||||
)
|
||||
)
|
||||
|
||||
return g
|
||||
|
||||
|
||||
def detailed_connection_graph(start_with=None, end_with=None):
|
||||
g = nx.MultiDiGraph()
|
||||
|
||||
clients = Connections.read_clients()
|
||||
|
||||
for emitter_name, destination_values in clients.items():
|
||||
for emitter_input, receivers in destination_values.items():
|
||||
for receiver_name, receiver_input in receivers:
|
||||
label = '{}:{}'.format(emitter_input, receiver_input)
|
||||
g.add_edge(emitter_name, receiver_name, label=label)
|
||||
|
||||
ret = g
|
||||
|
||||
if start_with is not None:
|
||||
ret = g.subgraph(
|
||||
nx.dfs_postorder_nodes(ret, start_with)
|
||||
)
|
||||
if end_with is not None:
|
||||
ret = g.subgraph(
|
||||
nx.dfs_postorder_nodes(ret.reverse(), end_with)
|
||||
)
|
||||
|
||||
return ret
|
||||
Reference in New Issue
Block a user