Connections abstraction added over CLIENTS
This commit is contained in:
@@ -18,7 +18,7 @@ def deploy(filename):
|
||||
|
||||
# Clean stuff first
|
||||
db.clear()
|
||||
xs.clear()
|
||||
xs.Connections.clear()
|
||||
shutil.rmtree(resource_save_path, ignore_errors=True)
|
||||
os.makedirs(resource_save_path)
|
||||
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from x import signals
|
||||
|
||||
|
||||
class BaseObserver(object):
|
||||
type_ = None
|
||||
|
||||
@@ -58,6 +61,14 @@ class BaseObserver(object):
|
||||
return
|
||||
self.receivers.append(receiver)
|
||||
receiver.subscribed(self)
|
||||
|
||||
signals.Connections.add(
|
||||
self.attached_to,
|
||||
self.name,
|
||||
receiver.attached_to,
|
||||
receiver.name
|
||||
)
|
||||
|
||||
receiver.notify(self)
|
||||
|
||||
def subscribed(self, emitter):
|
||||
|
||||
@@ -156,6 +156,6 @@ def load_all(dest_path):
|
||||
resource = load(resource_path)
|
||||
ret[resource.name] = resource
|
||||
|
||||
signals.reconnect_all()
|
||||
signals.Connections.reconnect_all()
|
||||
|
||||
return ret
|
||||
|
||||
68
x/signals.py
68
x/signals.py
@@ -13,14 +13,41 @@ CLIENTS_CONFIG_KEY = 'clients-data-file'
|
||||
CLIENTS = utils.read_config_file(CLIENTS_CONFIG_KEY)
|
||||
|
||||
|
||||
def clear():
|
||||
global CLIENTS
|
||||
class Connections(object):
|
||||
@staticmethod
|
||||
def add(emitter, src, receiver, dst):
|
||||
if src not in emitter.args:
|
||||
return
|
||||
|
||||
CLIENTS = {}
|
||||
CLIENTS.setdefault(emitter.name, {})
|
||||
CLIENTS[emitter.name].setdefault(src, [])
|
||||
CLIENTS[emitter.name][src].append((receiver.name, dst))
|
||||
|
||||
path = utils.read_config()[CLIENTS_CONFIG_KEY]
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
|
||||
|
||||
@staticmethod
|
||||
def reconnect_all():
|
||||
"""Reconstruct connections for resource inputs from CLIENTS.
|
||||
|
||||
:return:
|
||||
"""
|
||||
for emitter_name, dest_dict in CLIENTS.items():
|
||||
emitter = db.get_resource(emitter_name)
|
||||
for emitter_input, destinations in dest_dict.items():
|
||||
for receiver_name, receiver_input in destinations:
|
||||
receiver = db.get_resource(receiver_name)
|
||||
receiver.args[receiver_input].subscribe(
|
||||
emitter.args[emitter_input])
|
||||
|
||||
@staticmethod
|
||||
def clear():
|
||||
global CLIENTS
|
||||
|
||||
CLIENTS = {}
|
||||
|
||||
path = utils.read_config()[CLIENTS_CONFIG_KEY]
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
|
||||
|
||||
def guess_mapping(emitter, receiver):
|
||||
@@ -59,38 +86,11 @@ def connect(emitter, receiver, mapping=None):
|
||||
if receiver.args[dst].type_ != 'list':
|
||||
disconnect_receiver_by_input(receiver, dst)
|
||||
|
||||
connect_src_dst(emitter, src, receiver, dst)
|
||||
emitter.args[src].subscribe(receiver.args[dst])
|
||||
|
||||
receiver.save()
|
||||
|
||||
|
||||
def connect_src_dst(emitter, src, receiver, dst):
|
||||
if src not in emitter.args:
|
||||
return
|
||||
|
||||
CLIENTS.setdefault(emitter.name, {})
|
||||
CLIENTS[emitter.name].setdefault(src, [])
|
||||
CLIENTS[emitter.name][src].append((receiver.name, dst))
|
||||
|
||||
emitter.args[src].subscribe(receiver.args[dst])
|
||||
|
||||
utils.save_to_config_file(CLIENTS_CONFIG_KEY, CLIENTS)
|
||||
|
||||
|
||||
def reconnect_all():
|
||||
"""Reconstruct connections for resource inputs from CLIENTS.
|
||||
|
||||
:return:
|
||||
"""
|
||||
for emitter_name, dest_dict in CLIENTS.items():
|
||||
emitter = db.get_resource(emitter_name)
|
||||
for emitter_input, destinations in dest_dict.items():
|
||||
for receiver_name, receiver_input in destinations:
|
||||
receiver = db.get_resource(receiver_name)
|
||||
receiver.args[receiver_input].subscribe(
|
||||
emitter.args[emitter_input])
|
||||
|
||||
|
||||
def disconnect(emitter, receiver):
|
||||
for src, destinations in CLIENTS[emitter.name].items():
|
||||
disconnect_by_src(emitter, src, receiver)
|
||||
|
||||
@@ -16,7 +16,7 @@ class BaseResourceTest(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.storage_dir)
|
||||
db.clear()
|
||||
xs.clear()
|
||||
xs.Connections.clear()
|
||||
|
||||
def make_resource_meta(self, meta_yaml):
|
||||
meta = yaml.load(meta_yaml)
|
||||
|
||||
Reference in New Issue
Block a user