Merge branch 'cgenie/graph-db-2015-09-09' into cgenie/graph-db

This commit is contained in:
Przemyslaw Kaminski
2015-09-09 15:18:54 +02:00
21 changed files with 1859 additions and 647 deletions

View File

@@ -1,18 +1 @@
__all__ = [
'Resource',
'create',
'load',
'load_all',
'prepare_meta',
'wrap_resource',
'validate_resources',
]
from solar.core.resource.resource import Resource
from solar.core.resource.resource import load
from solar.core.resource.resource import load_all
from solar.core.resource.resource import wrap_resource
from solar.core.resource.virtual_resource import create
from solar.core.resource.virtual_resource import prepare_meta
from solar.core.resource.virtual_resource import validate_resources
from .resource import Resource, load, wrap_resource

View File

@@ -1,27 +1,62 @@
# -*- coding: utf-8 -*-
import os
from copy import deepcopy
import solar
from solar.core import actions
from solar.core import observer
from solar.core import signals
from solar.core import validation
import json
from multipledispatch import dispatch
import os
import uuid
from solar.interfaces.db import get_db
from solar import utils
db = get_db()
# TODO: cycle detection?
# TODO: write this as a Cypher query? Move to DB?
def _read_input_value(input_node):
rel = db.get_relations(dest=input_node,
type_=db.RELATION_TYPES.input_to_input)
if not rel:
v = input_node.properties['value'] or 'null'
return json.loads(v)
if input_node.properties['is_list']:
return [_read_input_value(r.start_node) for r in rel]
return _read_input_value(rel[0].start_node)
def prepare_meta(meta):
actions_path = os.path.join(meta['base_path'], 'actions')
meta['actions_path'] = actions_path
meta['base_name'] = os.path.split(meta['base_path'])[-1]
meta['actions'] = {}
if os.path.exists(meta['actions_path']):
for f in os.listdir(meta['actions_path']):
meta['actions'][os.path.splitext(f)[0]] = f
def read_meta(base_path):
base_meta_file = os.path.join(base_path, 'meta.yaml')
metadata = utils.yaml_load(base_meta_file)
metadata['version'] = '1.0.0'
metadata['base_path'] = os.path.abspath(base_path)
return metadata
class Resource(object):
_metadata = {}
def __init__(self, name, metadata, args, tags=None, virtual_resource=None):
# Create
@dispatch(str, str, dict)
def __init__(self, name, base_path, args, tags=None, virtual_resource=None):
self.name = name
if metadata:
self.metadata = metadata
if base_path:
self.metadata = read_meta(base_path)
else:
self.metadata = deepcopy(self._metadata)
@@ -29,180 +64,98 @@ class Resource(object):
self.tags = tags or []
self.virtual_resource = virtual_resource
self.node = db.get_or_create(
name,
properties={
'actions_path': self.metadata.get('actions_path', ''),
'base_name': self.metadata.get('base_name', ''),
'base_path': self.metadata.get('base_path', ''),
'handler': self.metadata.get('handler', ''),
'id': self.metadata['id'],
'version': self.metadata.get('version', ''),
},
collection=db.COLLECTIONS.resource
)
self.set_args_from_dict(args)
# Load
@dispatch(object)
def __init__(self, resource_node):
self.node = resource_node
self.name = resource_node.uid
self.metadata = read_meta(resource_node.properties['base_path'])
self.metadata.update(resource_node.properties)
self.tags = []
self.virtual_resource = None
@property
def actions(self):
return self.metadata.get('actions') or []
# TODO: json.dumps/loads should be probably moved to neo4j.py
def set_args_from_dict(self, args):
self.node.pull()
for k, v in self.metadata['input'].items():
value = args.get(k, v.get('value'))
uid = '{}-{}'.format(k, uuid.uuid4())
i = db.get_or_create(
uid,
properties={
'is_list': isinstance(v.get('schema'), list),
'input_name': k,
'value': json.dumps(value),
},
collection=db.COLLECTIONS.input
)
db.get_or_create_relation(
self.node,
i,
properties={},
type_=db.RELATION_TYPES.resource_input
)
@property
def args(self):
ret = {}
args = self.args_dict()
for arg_name, metadata_arg in self.metadata['input'].items():
type_ = validation.schema_input_type(metadata_arg.get('schema', 'str'))
ret[arg_name] = observer.create(
type_, self, arg_name, args.get(arg_name)
)
for k, n in self.resource_inputs().items():
ret[k] = _read_input_value(n)
return ret
def args_dict(self):
raw_resource = db.read(self.name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
return {}
def update(self, args):
# TODO: disconnect input when it is updated and and end_node
# for some input_to_input relation
resource_inputs = self.resource_inputs()
self.metadata = raw_resource
return Resource.get_raw_resource_args(raw_resource)
def set_args_from_dict(self, new_args):
args = self.args_dict()
args.update(new_args)
self.metadata['tags'] = self.tags
self.metadata['virtual_resource'] = self.virtual_resource
for k, v in args.items():
if k not in self.metadata['input']:
raise NotImplementedError(
'Argument {} not implemented for resource {}'.format(k, self)
)
i = resource_inputs[k]
i.properties['value'] = json.dumps(v)
i.push()
if isinstance(v, dict) and 'value' in v:
v = v['value']
self.metadata['input'][k]['value'] = v
def resource_inputs(self):
resource_inputs = [
r.end_node for r in
db.get_relations(source=self.node,
type_=db.RELATION_TYPES.resource_input)
]
db.save(self.name, self.metadata, collection=db.COLLECTIONS.resource)
def set_args(self, args):
self.set_args_from_dict({k: v.value for k, v in args.items()})
def __repr__(self):
return ("Resource(name='{id}', metadata={metadata}, args={input}, "
"tags={tags})").format(**self.to_dict())
def color_repr(self):
import click
arg_color = 'yellow'
return ("{resource_s}({name_s}='{id}', {metadata_s}={metadata}, "
"{args_s}={input}, {tags_s}={tags})").format(
resource_s=click.style('Resource', fg='white', bold=True),
name_s=click.style('name', fg=arg_color, bold=True),
metadata_s=click.style('metadata', fg=arg_color, bold=True),
args_s=click.style('args', fg=arg_color, bold=True),
tags_s=click.style('tags', fg=arg_color, bold=True),
**self.to_dict()
)
def to_dict(self):
return {
'id': self.name,
'metadata': self.metadata,
'input': self.args_show(),
'tags': self.tags,
i.properties['input_name']: i for i in resource_inputs
}
def args_show(self):
def formatter(v):
if isinstance(v, observer.ListObserver):
return v.value
elif isinstance(v, observer.Observer):
return {
'emitter': v.emitter.attached_to.name if v.emitter else None,
'value': v.value,
}
return v
def load(name):
r = db.get(name, collection=db.COLLECTIONS.resource)
return {k: formatter(v) for k, v in self.args.items()}
if not r:
raise Exception('Resource {} does not exist in DB'.format(name))
def add_tag(self, tag):
if tag not in self.tags:
self.tags.append(tag)
def remove_tag(self, tag):
try:
self.tags.remove(tag)
except ValueError:
pass
def notify(self, emitter):
"""Update resource's args from emitter's args.
:param emitter: Resource
:return:
"""
r_args = self.args
for key, value in emitter.args.iteritems():
r_args[key].notify(value)
def update(self, args):
"""This method updates resource's args with a simple dict.
:param args:
:return:
"""
# Update will be blocked if this resource is listening
# on some input that is to be updated -- we should only listen
# to the emitter and not be able to change the input's value
r_args = self.args
for key, value in args.iteritems():
r_args[key].update(value)
self.set_args(r_args)
def action(self, action):
if action in self.actions:
actions.resource_action(self, action)
else:
raise Exception('Uuups, action is not available')
@staticmethod
def get_raw_resource_args(raw_resource):
return {k: v.get('value') for k, v in raw_resource['input'].items()}
return wrap_resource(r)
def wrap_resource(raw_resource):
name = raw_resource['id']
args = Resource.get_raw_resource_args(raw_resource)
tags = raw_resource.get('tags', [])
virtual_resource = raw_resource.get('virtual_resource', [])
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
def wrap_resource_no_value(raw_resource):
name = raw_resource['id']
args = {k: v for k, v in raw_resource['input'].items()}
tags = raw_resource.get('tags', [])
virtual_resource = raw_resource.get('virtual_resource', [])
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
def load(resource_name):
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
raise KeyError(
'Resource {} does not exist'.format(resource_name)
)
return wrap_resource(raw_resource)
def load_all():
ret = {}
for raw_resource in db.get_list(collection=db.COLLECTIONS.resource):
resource = wrap_resource(raw_resource)
ret[resource.name] = resource
return ret
def wrap_resource(resource_node):
return Resource(resource_node)

View File

@@ -1,36 +1,42 @@
# -*- coding: UTF-8 -*-
import os
from StringIO import StringIO
import yaml
from jinja2 import Template, Environment, meta
from solar import utils
from solar.core import validation
from solar.core.resource import load_all, Resource
from solar.core import provider
from solar.core import resource
from solar.core import signals
def create(name, base_path, kwargs, virtual_resource=None):
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
if not os.path.exists(base_path):
raise Exception(
'Base resource does not exist: {0}'.format(base_path)
)
if is_virtual(base_path):
template = _compile_file(name, base_path, kwargs)
yaml_template = yaml.load(StringIO(template))
rs = create_virtual_resource(name, yaml_template)
else:
r = create_resource(name, base_path, kwargs, virtual_resource)
rs = [r]
return rs
def create_resource(name, base_path, args, virtual_resource=None):
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
base_meta_file = os.path.join(base_path, 'meta.yaml')
actions_path = os.path.join(base_path, 'actions')
metadata = utils.yaml_load(base_meta_file)
metadata['id'] = name
metadata['version'] = '1.0.0'
metadata['base_path'] = os.path.abspath(base_path)
prepare_meta(metadata)
tags = metadata.get('tags', [])
resource = Resource(name, metadata, args, tags, virtual_resource)
return resource
r = resource.Resource(
name, base_path, args, tags=[], virtual_resource=virtual_resource
)
return r
def create_virtual_resource(vr_name, template):
@@ -52,114 +58,14 @@ def create_virtual_resource(vr_name, template):
emitter, src = arg.split('::')
connections.append((emitter, name, {src: key}))
db = load_all()
for emitter, reciver, mapping in connections:
emitter = db[emitter]
reciver = db[reciver]
emitter = resource.load(emitter)
reciver = resource.load(reciver)
signals.connect(emitter, reciver, mapping)
return created_resources
def create(name, base_path, kwargs, virtual_resource=None):
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
if not os.path.exists(base_path):
raise Exception(
'Base resource does not exist: {0}'.format(base_path)
)
if is_virtual(base_path):
template = _compile_file(name, base_path, kwargs)
yaml_template = yaml.load(StringIO(template))
resources = create_virtual_resource(name, yaml_template)
else:
resource = create_resource(name, base_path, kwargs, virtual_resource)
resources = [resource]
return resources
def prepare_meta(meta):
actions_path = os.path.join(meta['base_path'], 'actions')
meta['actions_path'] = actions_path
meta['base_name'] = os.path.split(meta['base_path'])[-1]
meta['actions'] = {}
if os.path.exists(meta['actions_path']):
for f in os.listdir(meta['actions_path']):
meta['actions'][os.path.splitext(f)[0]] = f
def validate_resources():
db = load_all()
all_errors = []
for r in db.values():
if not isinstance(r, Resource):
continue
errors = validation.validate_resource(r)
if errors:
all_errors.append((r, errors))
return all_errors
def find_inputs_without_source():
"""Find resources and inputs values of which are hardcoded.
:return: [(resource_name, input_name)]
"""
resources = load_all()
ret = set([(r.name, input_name) for r in resources.values()
for input_name in r.args])
clients = signals.Connections.read_clients()
for dest_dict in clients.values():
for destinations in dest_dict.values():
for receiver_name, receiver_input in destinations:
try:
ret.remove((receiver_name, receiver_input))
except KeyError:
continue
return list(ret)
def find_missing_connections():
"""Find resources whose input values are duplicated
and they are not connected between each other (i.e. the values
are hardcoded, not coming from connection).
NOTE: this we could have 2 inputs of the same value living in 2 "circles".
This is not covered, we find only inputs whose value is hardcoded.
:return: [(resource_name1, input_name1, resource_name2, input_name2)]
"""
ret = set()
resources = load_all()
inputs_without_source = find_inputs_without_source()
for resource1, input1 in inputs_without_source:
r1 = resources[resource1]
v1 = r1.args[input1]
for resource2, input2 in inputs_without_source:
r2 = resources[resource2]
v2 = r2.args[input2]
if v1 == v2 and resource1 != resource2 and \
(resource2, input2, resource1, input1) not in ret:
ret.add((resource1, input1, resource2, input2))
return list(ret)
def _compile_file(name, path, kwargs):
with open(path) as f:
content = f.read()
@@ -189,4 +95,3 @@ def _get_template(name, content, kwargs, inputs):
def is_virtual(path):
return os.path.isfile(path)

View File

@@ -0,0 +1,22 @@
__all__ = [
'Resource',
'assign_resources_to_nodes',
'connect_resources',
'create',
'load',
'load_all',
'prepare_meta',
'wrap_resource',
'validate_resources',
]
from solar.core.resource.resource import Resource
from solar.core.resource.resource import assign_resources_to_nodes
from solar.core.resource.resource import connect_resources
from solar.core.resource.resource import load
from solar.core.resource.resource import load_all
from solar.core.resource.resource import wrap_resource
from solar.core.resource.virtual_resource import create
from solar.core.resource.virtual_resource import prepare_meta
from solar.core.resource.virtual_resource import validate_resources

View File

@@ -0,0 +1,237 @@
# -*- coding: utf-8 -*-
import os
from copy import deepcopy
import solar
from solar.core import actions
from solar.core import observer
from solar.core import signals
from solar.core import validation
from solar.core.connections import ResourcesConnectionGraph
from solar.interfaces.db import get_db
db = get_db()
class Resource(object):
_metadata = {}
def __init__(self, name, metadata, args, tags=None, virtual_resource=None):
self.name = name
if metadata:
self.metadata = metadata
else:
self.metadata = deepcopy(self._metadata)
self.metadata['id'] = name
self.tags = tags or []
self.virtual_resource = virtual_resource
self.set_args_from_dict(args)
@property
def actions(self):
return self.metadata.get('actions') or []
@property
def args(self):
ret = {}
args = self.args_dict()
for arg_name, metadata_arg in self.metadata['input'].items():
type_ = validation.schema_input_type(metadata_arg.get('schema', 'str'))
ret[arg_name] = observer.create(
type_, self, arg_name, args.get(arg_name)
)
return ret
def args_dict(self):
raw_resource = db.read(self.name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
return {}
self.metadata = raw_resource
return Resource.get_raw_resource_args(raw_resource)
def set_args_from_dict(self, new_args):
args = self.args_dict()
args.update(new_args)
self.metadata['tags'] = self.tags
self.metadata['virtual_resource'] = self.virtual_resource
for k, v in args.items():
if k not in self.metadata['input']:
raise NotImplementedError(
'Argument {} not implemented for resource {}'.format(k, self)
)
if isinstance(v, dict) and 'value' in v:
v = v['value']
self.metadata['input'][k]['value'] = v
db.save(self.name, self.metadata, collection=db.COLLECTIONS.resource)
def set_args(self, args):
self.set_args_from_dict({k: v.value for k, v in args.items()})
def __repr__(self):
return ("Resource(name='{id}', metadata={metadata}, args={input}, "
"tags={tags})").format(**self.to_dict())
def color_repr(self):
import click
arg_color = 'yellow'
return ("{resource_s}({name_s}='{id}', {metadata_s}={metadata}, "
"{args_s}={input}, {tags_s}={tags})").format(
resource_s=click.style('Resource', fg='white', bold=True),
name_s=click.style('name', fg=arg_color, bold=True),
metadata_s=click.style('metadata', fg=arg_color, bold=True),
args_s=click.style('args', fg=arg_color, bold=True),
tags_s=click.style('tags', fg=arg_color, bold=True),
**self.to_dict()
)
def to_dict(self):
return {
'id': self.name,
'metadata': self.metadata,
'input': self.args_show(),
'tags': self.tags,
}
def args_show(self):
def formatter(v):
if isinstance(v, observer.ListObserver):
return v.value
elif isinstance(v, observer.Observer):
return {
'emitter': v.emitter.attached_to.name if v.emitter else None,
'value': v.value,
}
return v
return {k: formatter(v) for k, v in self.args.items()}
def add_tag(self, tag):
if tag not in self.tags:
self.tags.append(tag)
def remove_tag(self, tag):
try:
self.tags.remove(tag)
except ValueError:
pass
def notify(self, emitter):
"""Update resource's args from emitter's args.
:param emitter: Resource
:return:
"""
r_args = self.args
for key, value in emitter.args.iteritems():
r_args[key].notify(value)
def update(self, args):
"""This method updates resource's args with a simple dict.
:param args:
:return:
"""
# Update will be blocked if this resource is listening
# on some input that is to be updated -- we should only listen
# to the emitter and not be able to change the input's value
r_args = self.args
for key, value in args.iteritems():
r_args[key].update(value)
self.set_args(r_args)
def action(self, action):
if action in self.actions:
actions.resource_action(self, action)
else:
raise Exception('Uuups, action is not available')
@staticmethod
def get_raw_resource_args(raw_resource):
return {k: v.get('value') for k, v in raw_resource['input'].items()}
def wrap_resource(raw_resource):
name = raw_resource['id']
args = Resource.get_raw_resource_args(raw_resource)
tags = raw_resource.get('tags', [])
virtual_resource = raw_resource.get('virtual_resource', [])
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
def wrap_resource_no_value(raw_resource):
name = raw_resource['id']
args = {k: v for k, v in raw_resource['input'].items()}
tags = raw_resource.get('tags', [])
virtual_resource = raw_resource.get('virtual_resource', [])
return Resource(name, raw_resource, args, tags=tags, virtual_resource=virtual_resource)
def load(resource_name):
raw_resource = db.read(resource_name, collection=db.COLLECTIONS.resource)
if raw_resource is None:
raise KeyError(
'Resource {} does not exist'.format(resource_name)
)
return wrap_resource(raw_resource)
def load_all():
ret = {}
for raw_resource in db.get_list(collection=db.COLLECTIONS.resource):
resource = wrap_resource(raw_resource)
ret[resource.name] = resource
return ret
def assign_resources_to_nodes(resources, nodes):
for node in nodes:
for resource in resources:
res = deepcopy(resource)
res['tags'] = list(set(node.get('tags', [])) |
set(resource.get('tags', [])))
resource_uuid = '{0}-{1}'.format(res['id'], solar.utils.generate_uuid())
# We should not generate here any uuid's, because
# a single node should be represented with a single
# resource
node_uuid = node['id']
node_resource_template = solar.utils.read_config()['node_resource_template']
args = {k: v['value'] for k, v in res['input'].items()}
created_resource = create(resource_uuid, resource['dir_path'], args, tags=res['tags'])
created_node = create(node_uuid, node_resource_template, node, tags=node.get('tags', []))
signals.connect(created_node, created_resource)
def connect_resources(profile):
connections = profile.get('connections', [])
graph = ResourcesConnectionGraph(connections, load_all().values())
for connection in graph.iter_connections():
signals.connect(connection['from'], connection['to'], connection['mapping'])

View File

@@ -0,0 +1,196 @@
# -*- coding: UTF-8 -*-
import os
from StringIO import StringIO
import yaml
from jinja2 import Template, Environment, meta
from solar import utils
from solar.core import validation
from solar.core.resource import load_all, Resource
from solar.core import provider
from solar.core import signals
def create_resource(name, base_path, args, virtual_resource=None):
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
base_meta_file = os.path.join(base_path, 'meta.yaml')
actions_path = os.path.join(base_path, 'actions')
metadata = utils.yaml_load(base_meta_file)
metadata['id'] = name
metadata['version'] = '1.0.0'
metadata['base_path'] = os.path.abspath(base_path)
prepare_meta(metadata)
if os.path.exists(actions_path):
for f in os.listdir(actions_path):
metadata['actions'][os.path.splitext(f)[0]] = f
tags = metadata.get('tags', [])
resource = Resource(name, metadata, args, tags, virtual_resource)
return resource
def create_virtual_resource(vr_name, template):
resources = template['resources']
connections = []
created_resources = []
cwd = os.getcwd()
for resource in resources:
name = resource['id']
base_path = os.path.join(cwd, resource['from'])
args = resource['values']
new_resources = create(name, base_path, args, vr_name)
created_resources += new_resources
if not is_virtual(base_path):
for key, arg in args.items():
if isinstance(arg, basestring) and '::' in arg:
emitter, src = arg.split('::')
connections.append((emitter, name, {src: key}))
db = load_all()
for emitter, reciver, mapping in connections:
emitter = db[emitter]
reciver = db[reciver]
signals.connect(emitter, reciver, mapping)
return created_resources
def create(name, base_path, kwargs, virtual_resource=None):
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
if not os.path.exists(base_path):
raise Exception(
'Base resource does not exist: {0}'.format(base_path)
)
if is_virtual(base_path):
template = _compile_file(name, base_path, kwargs)
yaml_template = yaml.load(StringIO(template))
resources = create_virtual_resource(name, yaml_template)
else:
resource = create_resource(name, base_path, kwargs, virtual_resource)
resources = [resource]
return resources
def prepare_meta(meta):
actions_path = os.path.join(meta['base_path'], 'actions')
meta['actions_path'] = actions_path
meta['base_name'] = os.path.split(meta['base_path'])[-1]
meta['actions'] = {}
if os.path.exists(meta['actions_path']):
for f in os.listdir(meta['actions_path']):
meta['actions'][os.path.splitext(f)[0]] = f
def validate_resources():
db = load_all()
all_errors = []
for r in db.values():
if not isinstance(r, Resource):
continue
errors = validation.validate_resource(r)
if errors:
all_errors.append((r, errors))
return all_errors
def find_inputs_without_source():
"""Find resources and inputs values of which are hardcoded.
:return: [(resource_name, input_name)]
"""
resources = load_all()
ret = set([(r.name, input_name) for r in resources.values()
for input_name in r.args])
clients = signals.Connections.read_clients()
for dest_dict in clients.values():
for destinations in dest_dict.values():
for receiver_name, receiver_input in destinations:
try:
ret.remove((receiver_name, receiver_input))
except KeyError:
continue
return list(ret)
def find_missing_connections():
"""Find resources whose input values are duplicated
and they are not connected between each other (i.e. the values
are hardcoded, not coming from connection).
NOTE: this we could have 2 inputs of the same value living in 2 "circles".
This is not covered, we find only inputs whose value is hardcoded.
:return: [(resource_name1, input_name1, resource_name2, input_name2)]
"""
ret = set()
resources = load_all()
inputs_without_source = find_inputs_without_source()
for resource1, input1 in inputs_without_source:
r1 = resources[resource1]
v1 = r1.args[input1]
for resource2, input2 in inputs_without_source:
r2 = resources[resource2]
v2 = r2.args[input2]
if v1 == v2 and resource1 != resource2 and \
(resource2, input2, resource1, input1) not in ret:
ret.add((resource1, input1, resource2, input2))
return list(ret)
def _compile_file(name, path, kwargs):
with open(path) as f:
content = f.read()
inputs = get_inputs(content)
template = _get_template(name, content, kwargs, inputs)
return template
def get_inputs(content):
env = Environment()
ast = env.parse(content)
return meta.find_undeclared_variables(ast)
def _get_template(name, content, kwargs, inputs):
missing = []
for input in inputs:
if input not in kwargs:
missing.append(input)
if missing:
raise Exception('[{0}] Validation error. Missing data in input: {1}'.format(name, missing))
template = Template(content)
template = template.render(str=str, zip=zip, **kwargs)
return template
def is_virtual(path):
return os.path.isfile(path)

View File

@@ -1,105 +1,11 @@
# -*- coding: utf-8 -*-
from collections import defaultdict
import itertools
import networkx as nx
from solar.core.log import log
from solar.interfaces.db import get_db
from solar.events.api import add_events
from solar.events.controls import Dependency
db = get_db()
class Connections(object):
@staticmethod
def read_clients():
"""
Returned structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
ret = {}
for data in db.get_list(collection=db.COLLECTIONS.connection):
ret[data['emitter']] = data['sources']
return ret
@staticmethod
def save_clients(clients):
data = []
for emitter_name, sources in clients.items():
data.append((
emitter_name,
{
'emitter': emitter_name,
'sources': sources,
}))
db.save_list(data, collection=db.COLLECTIONS.connection)
@staticmethod
def add(emitter, src, receiver, dst):
if src not in emitter.args:
return
clients = Connections.read_clients()
# TODO: implement general circular detection, this one is simple
if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []):
raise Exception('Attempted to create cycle in dependencies. Not nice.')
clients.setdefault(emitter.name, {})
clients[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in clients[emitter.name][src]:
clients[emitter.name][src].append([receiver.name, dst])
Connections.save_clients(clients)
@staticmethod
def remove(emitter, src, receiver, dst):
clients = Connections.read_clients()
clients[emitter.name][src] = [
destination for destination in clients[emitter.name][src]
if destination != [receiver.name, dst]
]
Connections.save_clients(clients)
@staticmethod
def receivers(emitter_name, emitter_input_name):
return Connections.read_clients().get(emitter_name, {}).get(
emitter_input_name, []
)
@staticmethod
def emitter(receiver_name, receiver_input_name):
for emitter_name, dest_dict in Connections.read_clients().items():
for emitter_input_name, destinations in dest_dict.items():
if [receiver_name, receiver_input_name] in destinations:
return [emitter_name, emitter_input_name]
@staticmethod
def clear():
db.clear_collection(collection=db.COLLECTIONS.connection)
def guess_mapping(emitter, receiver):
"""Guess connection mapping between emitter and receiver.
@@ -126,23 +32,7 @@ def guess_mapping(emitter, receiver):
return guessed
def connect_single(emitter, src, receiver, dst):
# Disconnect all receiver inputs
# Check if receiver input is of list type first
if receiver.args[dst].type_ != 'list':
disconnect_receiver_by_input(receiver, dst)
emitter.args[src].subscribe(receiver.args[dst])
def connect(emitter, receiver, mapping=None, events=None):
# convert if needed
# TODO: handle invalid resource
# if isinstance(emitter, basestring):
# emitter = resource.load(emitter)
# if isinstance(receiver, basestring):
# receiver = resource.load(receiver)
def connect(emitter, receiver, mapping={}, events=None):
mapping = mapping or guess_mapping(emitter, receiver)
if isinstance(mapping, set):
@@ -158,149 +48,58 @@ def connect(emitter, receiver, mapping=None, events=None):
connect_single(emitter, src, receiver, dst)
# possibility to set events, when False it will NOT add events at all
# setting events to dict with `action_name`:False will not add `action_name`
# event
events_to_add = [
Dependency(emitter.name, 'run', 'success', receiver.name, 'run'),
Dependency(emitter.name, 'update', 'success', receiver.name, 'update')
]
if isinstance(events, dict):
for k, v in events.items():
if v is not False:
events_to_add = filter(lambda x: x.parent_action == k, events_to_add)
add_events(emitter.name, events_to_add)
elif events is not False:
add_events(emitter.name, events_to_add)
# receiver.save()
def connect_single(emitter, src, receiver, dst):
# Disconnect all receiver inputs
# Check if receiver input is of list type first
emitter_input = emitter.resource_inputs()[src]
receiver_input = receiver.resource_inputs()[dst]
if emitter_input.uid == receiver_input.uid:
raise Exception(
'Trying to connect {} to itself, this is not possible'.format(
emitter_input.uid)
)
if not receiver_input.properties['is_list']:
db.delete_relations(
dest=receiver_input,
type_=db.RELATION_TYPES.input_to_input
)
# Check for cycles
# TODO: change to get_paths after it is implemented in drivers
r = db.get_relations(
receiver_input,
emitter_input,
type_=db.RELATION_TYPES.input_to_input
)
if r:
raise Exception('Prevented creating a cycle')
db.get_or_create_relation(
emitter_input,
receiver_input,
properties={},
type_=db.RELATION_TYPES.input_to_input
)
def disconnect_receiver_by_input(receiver, input_name):
input_node = receiver.resource_inputs()[input_name]
db.delete_relations(
dest=input_node,
type_=db.RELATION_TYPES.input_to_input
)
def disconnect(emitter, receiver):
# convert if needed
# TODO: handle invalid resource
# if isinstance(emitter, basestring):
# emitter = resource.load(emitter)
# if isinstance(receiver, basestring):
# receiver = resource.load(receiver)
clients = Connections.read_clients()
for src, destinations in clients[emitter.name].items():
for destination in destinations:
receiver_input = destination[1]
if receiver_input in receiver.args:
if receiver.args[receiver_input].type_ != 'list':
log.debug(
'Removing input %s from %s', receiver_input, receiver.name
)
emitter.args[src].unsubscribe(receiver.args[receiver_input])
disconnect_by_src(emitter.name, src, receiver)
def disconnect_receiver_by_input(receiver, input):
"""Find receiver connection by input and disconnect it.
:param receiver:
:param input:
:return:
"""
clients = Connections.read_clients()
for emitter_name, inputs in clients.items():
disconnect_by_src(emitter_name, input, receiver)
def disconnect_by_src(emitter_name, src, receiver):
clients = Connections.read_clients()
if src in clients[emitter_name]:
clients[emitter_name][src] = [
destination for destination in clients[emitter_name][src]
if destination[0] != receiver.name
]
Connections.save_clients(clients)
def notify(source, key, value):
from solar.core.resource import load
clients = Connections.read_clients()
if source.name not in clients:
clients[source.name] = {}
Connections.save_clients(clients)
log.debug('Notify %s %s %s %s', source.name, key, value, clients[source.name])
if key in clients[source.name]:
for client, r_key in clients[source.name][key]:
resource = load(client)
log.debug('Resource found: %s', client)
if resource:
resource.update({r_key: value}, emitter=source)
else:
log.debug('Resource %s deleted?', client)
pass
def assign_connections(receiver, connections):
mappings = defaultdict(list)
for key, dest in connections.iteritems():
resource, r_key = dest.split('.')
mappings[resource].append([r_key, key])
for resource, r_mappings in mappings.iteritems():
connect(resource, receiver, r_mappings)
def connection_graph():
resource_dependencies = {}
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
resource_dependencies.setdefault(emitter_name, set())
for emitter_input, receivers in destination_values.items():
resource_dependencies[emitter_name].update(
receiver[0] for receiver in receivers
for emitter_input in emitter.resource_inputs().values():
for receiver_input in receiver.resource_inputs().values():
db.delete_relations(
source=emitter_input,
dest=receiver_input,
type_=db.RELATION_TYPES.input_to_input
)
g = nx.DiGraph()
# TODO: tags as graph node attributes
for emitter_name, receivers in resource_dependencies.items():
g.add_node(emitter_name)
g.add_nodes_from(receivers)
g.add_edges_from(
itertools.izip(
itertools.repeat(emitter_name),
receivers
)
)
return g
def detailed_connection_graph(start_with=None, end_with=None):
g = nx.MultiDiGraph()
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
for emitter_input, receivers in destination_values.items():
for receiver_name, receiver_input in receivers:
label = '{}:{}'.format(emitter_input, receiver_input)
g.add_edge(emitter_name, receiver_name, label=label)
ret = g
if start_with is not None:
ret = g.subgraph(
nx.dfs_postorder_nodes(ret, start_with)
)
if end_with is not None:
ret = g.subgraph(
nx.dfs_postorder_nodes(ret.reverse(), end_with)
)
return ret

View File

@@ -0,0 +1,276 @@
# -*- coding: utf-8 -*-
from collections import defaultdict
import itertools
import networkx as nx
from solar.core.log import log
from solar.interfaces.db import get_db
db = get_db()
class Connections(object):
@staticmethod
def read_clients():
"""
Returned structure is:
emitter_name:
emitter_input_name:
- - dst_name
- dst_input_name
while DB structure is:
emitter_name_key:
emitter: emitter_name
sources:
emitter_input_name:
- - dst_name
- dst_input_name
"""
ret = {}
for data in db.get_list(collection=db.COLLECTIONS.connection):
ret[data['emitter']] = data['sources']
return ret
@staticmethod
def save_clients(clients):
data = []
for emitter_name, sources in clients.items():
data.append((
emitter_name,
{
'emitter': emitter_name,
'sources': sources,
}))
db.save_list(data, collection=db.COLLECTIONS.connection)
@staticmethod
def add(emitter, src, receiver, dst):
if src not in emitter.args:
return
clients = Connections.read_clients()
# TODO: implement general circular detection, this one is simple
if [emitter.name, src] in clients.get(receiver.name, {}).get(dst, []):
raise Exception('Attempted to create cycle in dependencies. Not nice.')
clients.setdefault(emitter.name, {})
clients[emitter.name].setdefault(src, [])
if [receiver.name, dst] not in clients[emitter.name][src]:
clients[emitter.name][src].append([receiver.name, dst])
Connections.save_clients(clients)
@staticmethod
def remove(emitter, src, receiver, dst):
clients = Connections.read_clients()
clients[emitter.name][src] = [
destination for destination in clients[emitter.name][src]
if destination != [receiver.name, dst]
]
Connections.save_clients(clients)
@staticmethod
def receivers(emitter_name, emitter_input_name):
return Connections.read_clients().get(emitter_name, {}).get(
emitter_input_name, []
)
@staticmethod
def emitter(receiver_name, receiver_input_name):
for emitter_name, dest_dict in Connections.read_clients().items():
for emitter_input_name, destinations in dest_dict.items():
if [receiver_name, receiver_input_name] in destinations:
return [emitter_name, emitter_input_name]
@staticmethod
def clear():
db.clear_collection(collection=db.COLLECTIONS.connection)
def guess_mapping(emitter, receiver):
"""Guess connection mapping between emitter and receiver.
Suppose emitter and receiver have common inputs:
ip, ssh_key, ssh_user
Then we return a connection mapping like this:
{
'ip': '<receiver>.ip',
'ssh_key': '<receiver>.ssh_key',
'ssh_user': '<receiver>.ssh_user'
}
:param emitter:
:param receiver:
:return:
"""
guessed = {}
for key in emitter.args:
if key in receiver.args:
guessed[key] = key
return guessed
def connect_single(emitter, src, receiver, dst):
# Disconnect all receiver inputs
# Check if receiver input is of list type first
if receiver.args[dst].type_ != 'list':
disconnect_receiver_by_input(receiver, dst)
emitter.args[src].subscribe(receiver.args[dst])
def connect(emitter, receiver, mapping=None):
mapping = mapping or guess_mapping(emitter, receiver)
if isinstance(mapping, set):
for src in mapping:
connect_single(emitter, src, receiver, src)
return
for src, dst in mapping.items():
if isinstance(dst, list):
for d in dst:
connect_single(emitter, src, receiver, d)
continue
connect_single(emitter, src, receiver, dst)
#receiver.save()
def disconnect(emitter, receiver):
clients = Connections.read_clients()
for src, destinations in clients[emitter.name].items():
for destination in destinations:
receiver_input = destination[1]
if receiver_input in receiver.args:
if receiver.args[receiver_input].type_ != 'list':
log.debug(
'Removing input %s from %s', receiver_input, receiver.name
)
emitter.args[src].unsubscribe(receiver.args[receiver_input])
disconnect_by_src(emitter.name, src, receiver)
def disconnect_receiver_by_input(receiver, input):
"""Find receiver connection by input and disconnect it.
:param receiver:
:param input:
:return:
"""
clients = Connections.read_clients()
for emitter_name, inputs in clients.items():
disconnect_by_src(emitter_name, input, receiver)
def disconnect_by_src(emitter_name, src, receiver):
clients = Connections.read_clients()
if src in clients[emitter_name]:
clients[emitter_name][src] = [
destination for destination in clients[emitter_name][src]
if destination[0] != receiver.name
]
Connections.save_clients(clients)
def notify(source, key, value):
from solar.core.resource import load
clients = Connections.read_clients()
if source.name not in clients:
clients[source.name] = {}
Connections.save_clients(clients)
log.debug('Notify %s %s %s %s', source.name, key, value, clients[source.name])
if key in clients[source.name]:
for client, r_key in clients[source.name][key]:
resource = load(client)
log.debug('Resource found: %s', client)
if resource:
resource.update({r_key: value}, emitter=source)
else:
log.debug('Resource %s deleted?', client)
pass
def assign_connections(receiver, connections):
mappings = defaultdict(list)
for key, dest in connections.iteritems():
resource, r_key = dest.split('.')
mappings[resource].append([r_key, key])
for resource, r_mappings in mappings.iteritems():
connect(resource, receiver, r_mappings)
def connection_graph():
resource_dependencies = {}
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
resource_dependencies.setdefault(emitter_name, set())
for emitter_input, receivers in destination_values.items():
resource_dependencies[emitter_name].update(
receiver[0] for receiver in receivers
)
g = nx.DiGraph()
# TODO: tags as graph node attributes
for emitter_name, receivers in resource_dependencies.items():
g.add_node(emitter_name)
g.add_nodes_from(receivers)
g.add_edges_from(
itertools.izip(
itertools.repeat(emitter_name),
receivers
)
)
return g
def detailed_connection_graph(start_with=None, end_with=None):
g = nx.MultiDiGraph()
clients = Connections.read_clients()
for emitter_name, destination_values in clients.items():
for emitter_input, receivers in destination_values.items():
for receiver_name, receiver_input in receivers:
label = '{}:{}'.format(emitter_input, receiver_input)
g.add_edge(emitter_name, receiver_name, label=label)
ret = g
if start_with is not None:
ret = g.subgraph(
nx.dfs_postorder_nodes(ret, start_with)
)
if end_with is not None:
ret = g.subgraph(
nx.dfs_postorder_nodes(ret.reverse(), end_with)
)
return ret

View File

@@ -158,7 +158,7 @@ def validate_resource(r):
ret = {}
input_schemas = r.metadata['input']
args = r.args_dict()
args = r.args
for input_name, input_definition in input_schemas.items():
errors = validate_input(