From fbcd42e0eab54f303f1ae3d05b9a1ceda838d804 Mon Sep 17 00:00:00 2001 From: Anusha Ramineni Date: Fri, 9 Sep 2016 15:51:30 +0530 Subject: [PATCH] Remove old dse reference in code Change-Id: I49fed8868678c50030584567f984d97f43a7ede5 --- congress/dse/README.rst | 128 ----- congress/dse/__init__.py | 0 congress/dse/amqprouter.py | 152 ------ congress/dse/d6cage.py | 419 ---------------- congress/dse/d6message.py | 56 --- congress/dse/dataobj.py | 127 ----- congress/dse/deepsix.py | 573 ---------------------- congress/tests/base.py | 4 - congress/tests/datasources/test_driver.py | 70 --- congress/tests/dse/test_dse.py | 180 ------- congress/tests/test_benchmark_updates.py | 9 +- 11 files changed, 4 insertions(+), 1714 deletions(-) delete mode 100644 congress/dse/README.rst delete mode 100644 congress/dse/__init__.py delete mode 100644 congress/dse/amqprouter.py delete mode 100644 congress/dse/d6cage.py delete mode 100644 congress/dse/d6message.py delete mode 100644 congress/dse/dataobj.py delete mode 100644 congress/dse/deepsix.py delete mode 100644 congress/tests/datasources/test_driver.py delete mode 100644 congress/tests/dse/test_dse.py diff --git a/congress/dse/README.rst b/congress/dse/README.rst deleted file mode 100644 index 6167b8955..000000000 --- a/congress/dse/README.rst +++ /dev/null @@ -1,128 +0,0 @@ -Data Services Engine -==================== - -The DSE is a lightweight variation of a Data Stream Management System. The -purpose of the DSE is to retrieve, or receive, data from external sources, then -format and present the data on a message bus. - -Overview --------- - -The DSE consists of a Python "cage" (see d6cage.py) which contains one or more -module instances. These are instances of an eventlet subclass called "deepsix" -(see deepsix.py). Each eventlet has an "inbox" queue. All eventlets share an -"outbox" queue called the "datapath". - -A lightweight AMQP router in the cage (see amqprouter.py) routes messages from -the datapath to the appropriate eventlet inbox. In this way, the deepsix -instances are able to communicate with each other. - -A deepsix instance may listen to multiple AMQP addresses. However, every -deepsix instance must have at least one unique non-wildcard AMQP address. -Subsequent addresses do not have to be unique. AMQP wildcards are supported -for these additional addresses. - -Deepsix -------- - -Publisher -~~~~~~~~~ - -A publishing deepsix instance will either pull data from an external source, or -have data pushed to it. The nature of how this is achieved is dependent on the -external data source and the libraries used to access it. For example, a -deepsix module might use the pyVmomi to periodically poll a VMware vSphere -instance to retrieve meta-data for all VM instances on a particular host. - -A developer using deepsix will write code to periodically poll vSphere, extract -the data from the pyVmomi response object, and format it into a JSON data -structure. Next, the "self.publish" method provided by deepsix will be used to -publish the data on the DSE message bus. - -Invoking "self.publish" results in calls to the "prepush_processor" and "push" -methods. For example, if a list of VMs on a host is retrieved from a vSphere -instance, this list is formatted in JSON and the results stored locally in the -instance. Before sending out any updates, the prepush_processor method is -called. Here data is groomed before sending out. Using the prepush_processor -method, a delta of the data can be sent out to known subscribers, instead of -all the data every time it is retrieved from vSphere. Finally, the "push" -method is called, and a list of known subscribers is iterated over, sending the -update to each. - -Incoming subscription requests are processed by the "self.insub" method within -deepsix.py. - -Published data is stored in a dictionary called "pubData". - -Subscriber -~~~~~~~~~~ - -A subscribing deepsix instance will use the "self.subscribe" method to announce -it's interest in data being published on the DSE bus. This announcement is -transmitted periodically, at an interval specified by the developer. When -"self.subscribe" is called, a callback is provided by the developer as an -argument. When new data is received by the subscriber, the callback is invoked -with the published data message passed as an argument. - -A subscriber may need data from multiple sources. There are two ways this can -happen: (1) Multiple invocations of "self.subscribe" to publishers of -different types of data, or (2) A single invocation of "self.subscribe" which -is received by multiple publishers listening to the same AMQP address. - -In the former case a unique UUID, used as a subscription ID, is generated for -each call to "self.subscribe". This UUID is used internally by deepsix to -differentiate between subscriptions. A unique callback can be provided for -each subscription. - -If a UUID is not provided, one is automatically generated. This UUID is sent -to the publisher within the periodic "subscribe" message. When the publisher -sends an update, the subscription UUID is included with the update. - -Let's consider the case of multiple publishers listening to the same AMQP -address for subscriptions. For example, you may have two vSphere deepsix -instances: "vSphere.Boston" and "vSphere.Chicago". Those are the unique names -for those instances, however, both of those instances may also be listening to -the address "vSphere.node.list". - -A subscribing instance might send a subscription announcement to -"vSphere.node.list". In this case, both "vSphere.Boston" and "vSphere.Chicago" -will receive this subscription request and start publishing data back to the -subscriber. The subscriber maintains a nested dictionary "subData" which is a -dictionary, indexed by subscription ID. Each subscription ID, in turn, is a -dictionary indexed by the unique AMQP addresses of the publishers providing -that data. - -Incoming published data is processed by the "self.inpubrep" method within -deepsix.py. It is from this method that the developer provided callback is -invoked. - -Request/Reply -~~~~~~~~~~~~~ - -Another way to retrieve data is with "self.request". This is a one-off -asynchronous request for data. - -d6cage ------- - -The d6cage is itself a deepsix instance. It listens to the AMQP addresses -"local.d6cage" and "local.router". When a deepsix instance within d6cage is -created, it registers it's AMQP addresses by invoking "self.subscribe" and -sending the subscription to "local.router". The d6cage will then add the AMQP -address to it's AMQP route table with the instance inbox thread as a -destination. - - -Miscellaneous/TO-DO -------------------- - -Need to modify d6cage.py/deepsix.py to support dynamic -loading/reloading/stopping of modules. - -Need to write a module to proxy external mq bus. For instance, there may be -multiple OpenStack instances. If a developer wants to receive updates from -Nova on "compute.instance.update", then they will need to disambiguate between -instances of Nova. A proxy module would be loaded for each OpenStack instance. -Subscriptions would be sent to "openstack1.compute.instance.update" and/or -"openstack2.compute.instance.update" - diff --git a/congress/dse/__init__.py b/congress/dse/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/congress/dse/amqprouter.py b/congress/dse/amqprouter.py deleted file mode 100644 index 1a704c470..000000000 --- a/congress/dse/amqprouter.py +++ /dev/null @@ -1,152 +0,0 @@ -# Copyright 2014 Plexxi, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import six - - -class Node(object): - def __init__(self, rPath=[], results=set()): - self.destinations = set() - - self.results = results - self.children = {} - self.rPath = rPath - - def _remove(self, patternList, destination): - - word = patternList[0] - - if word in self.children: - - if len(patternList) == 1: - if destination in self.children[word].destinations: - self.children[word].destinations.remove(destination) - - if (len(self.children[word].destinations) == 0 and - len(self.children[word].children) == 0): - - del self.children[word] - - else: - self.children[word]._remove(patternList[1:], destination) - - if (len(self.children[word].destinations) == 0 and - len(self.children[word].children) == 0): - - del self.children[word] - - def _add(self, patternList, destination): - - word = patternList[0] - - if word not in self.children: - - if word == "#": - self.children['#'] = hashNode( - rPath=self.rPath + ['#'], - results=self.results) - - else: - self.children[word] = Node( - rPath=self.rPath + [word], - results=self.results) - - if len(patternList) == 1: - self.children[word].destinations.add(destination) - - else: - self.children[word]._add(patternList[1:], destination) - - def update_results(self): - if '#' in self.children: - self.children['#'].update_results() - - self.results.update(self.destinations) - - def _lookup(self, keyList): - word = keyList[0] - - if len(keyList) == 1: - if word in self.children: - self.children[word].update_results() - - if '*' in self.children: - if word: - self.children['*'].update_results() - - else: - if word in self.children: - self.children[word]._lookup(keyList[1:]) - - if '*' in self.children: - if word: - self.children['*']._lookup(keyList[1:]) - - if '#' in self.children: - self.children['#']._lookup(keyList[:]) - self.children['#'].update_results() - - -class hashNode(Node): - def _lookup(self, keyList): - for i in range(len(keyList)): - if keyList[i] in self.children: - self.children[keyList[i]]._lookup(keyList[i:]) - - if '*' in self.children: - if keyList[i]: - self.children['*']._lookup(keyList[i:]) - - if '#' in self.children: - self.children['#']._lookup(keyList[i:]) - - if keyList[-1] in self.children: - self.children[keyList[-1]].update_results() - - if '*' in self.children: - if keyList[-1]: - self.children['*'].update_results() - - -class routeTable(Node): - def add(self, pattern, destination): - if type(pattern) == list: - for p in pattern: - wordList = p.split('.') - self._add(wordList, destination) - elif isinstance(pattern, six.string_types): - wordList = pattern.split('.') - self._add(wordList, destination) - - def remove(self, pattern, destination): - if type(pattern) == list: - for p in pattern: - wordList = p.split('.') - self._remove(wordList, destination) - elif isinstance(pattern, six.string_types): - wordList = pattern.split('.') - self._remove(wordList, destination) - - def lookup(self, key): - self.results.clear() - - wordList = key.split('.') - - self._lookup(wordList) - - return self.results diff --git a/congress/dse/d6cage.py b/congress/dse/d6cage.py deleted file mode 100644 index 5538fd60f..000000000 --- a/congress/dse/d6cage.py +++ /dev/null @@ -1,419 +0,0 @@ -# Copyright 2014 Plexxi, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Main entrypoint for the DSE -# -# Configuration in d6cage.ini -# -# Prerequisites: -# - Plexxi API libraries (there is an RPM) -# - Python dependencies (see readme elsewhere, or capture RPM) - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import imp -import sys -import traceback - -import eventlet -eventlet.monkey_patch() -from oslo_log import log as logging -from oslo_utils import importutils -from oslo_utils import strutils - -from congress.dse import amqprouter -from congress.dse import d6message -from congress.dse import deepsix - - -LOG = logging.getLogger(__name__) - - -class DataServiceError (Exception): - pass - - -# This holds the cage instance singleton -instances = {} - - -def singleton(class_): - global instances - - def getinstance(*args, **kwargs): - if class_ not in instances: - instances[class_] = class_(*args, **kwargs) - return instances[class_] - return getinstance - - -def delete_cage(): - global instances - for instance in instances.values(): - del instance - instances = {} - - -@singleton -class d6Cage(deepsix.deepSix): - def __init__(self): - self.config = {} - self.config['modules'] = {} - self.config['services'] = {} - - # Dictionary mapping service name to a dict of arguments. - # Those arguments are only passed to d6service by createservice if they - # are not alreay present in the ARGS argument given to createservice. - self.default_service_args = {} - - cageKeys = ['python.d6cage'] - cageDesc = 'deepsix python cage' - name = "d6cage" - - deepsix.deepSix.__init__(self, name, cageKeys) - - self.inbox = eventlet.Queue() - self.dataPath = eventlet.Queue() - - self.table = amqprouter.routeTable() - self.table.add("local.router", self.inbox) - self.table.add(self.name, self.inbox) - self.table.add("router", self.inbox) - - localname = "local." + self.name - self.table.add(localname, self.inbox) - - self.modules = {} - self.services = {} - - self.greenThreads = [] - - self.unloadingServices = {} - self.reloadingServices = set() - - self.services[self.name] = {} - self.services[self.name]['service'] = self - self.services[self.name]['name'] = self.name - self.services[self.name]['description'] = cageDesc - self.services[self.name]['inbox'] = self.inbox - self.services[self.name]['keys'] = self.keys - self.services[self.name]['type'] = None - self.services[self.name]['id'] = None - - self.subscribe( - "local.d6cage", - "routeKeys", - callback=self.updateRoutes, - interval=5) - - self.router_greenthread = eventlet.spawn(self.router_loop) - - def __del__(self): - # This function gets called when the interpreter deletes the object - # by the automatic garbage cleanup - for gt in self.greenThreads: - eventlet.kill(gt) - - eventlet.kill(self.router_greenthread) - eventlet.kill(self) - - def newConfig(self, msg): - newConfig = msg.body.data - if type(newConfig) == dict and newConfig: - if "modules" in newConfig: - for module in newConfig["modules"]: - if module not in sys.modules: - self.loadModule( - module, - newConfig['modules'][module]['filename']) - - if "services" in newConfig: - for service in newConfig['services']: - if service not in self.services: - self.createservice( - service, - **newConfig['services'][service]) - - self.config = newConfig - - def reloadStoppedService(self, service): - moduleName = self.config['services'][service]['moduleName'] - - try: - reload(sys.modules[moduleName]) - except Exception as errmsg: - self.log_error( - "Unable to reload module '%s': %s", moduleName, errmsg) - return - - self.createservice(service, **self.config['services'][service]) - - def waitForServiceToStop( - self, - service, - attemptsLeft=20, - callback=None, - cbkwargs={}): - - if attemptsLeft > 0: - - if self.services[service]['object'].isActive(): - - self.timerThreads.append( - eventlet.spawn_after(10, - self.waitForServiceToStop, - service, - attemptsLeft - 1)) - - else: - - del self.services[service] - - if callback: - callback(**cbkwargs) - - else: - self.log_error("Unable to stop service %s", service) - - def loadModule(self, name, filename): - if name in sys.modules: - # self.log_error( - # "error loading module '%s': module already exists", name) - return - try: - self.log_info("loading module: %s", name) - imp.load_source(name, filename) - except Exception: - raise DataServiceError( - "error loading module '%s' from '%s': %s" % - (name, filename, traceback.format_exc())) - - def load_modules_from_config(self): - for section in self.config['modules'].keys(): - filename = self.config['modules'][section]["filename"] - - self.loadModule(section, filename) - - def deleteservice(self, name): - self.log_info("deleting service: %s", name) - obj = self.services[name]['object'] - if hasattr(obj, "cleanup"): - obj.cleanup() - eventlet.greenthread.kill(obj) - self.greenThreads.remove(obj) - self.table.remove(name, self.services[name]['inbox']) - self.table.remove("local." + name, self.services[name]['inbox']) - self.unsubscribe(name, 'routeKeys') - del self.services[name] - self.log_info("finished deleting service: %s", name) - - def createservice( - self, - name="", - keys="", - description="", - moduleName="", - args={}, - module_driver=False, - type_=None, - id_=None): - - self.log_info("creating service %s with module %s and args %s", - name, moduleName, strutils.mask_password(args, "****")) - - # FIXME(arosen) This will be refactored out in the next patchset - # this is only done because existing imports from d6service - # instead of the module. - if module_driver: - congress_expected_module_path = "" - for entry in range(len(moduleName.split(".")) - 1): - congress_expected_module_path += ( - moduleName.split(".")[entry] + ".") - congress_expected_module_path = congress_expected_module_path[:-1] - module = importutils.import_module(congress_expected_module_path) - - if not module_driver and moduleName not in sys.modules: - self.log_error( - "error loading service %s: module %s does not exist", - name, - moduleName) - raise DataServiceError( - "error loading service %s: module %s does not exist" % - (name, moduleName)) - - if not module_driver and name in self.services: - self.log_error("error loading service '%s': name already in use", - name) - raise DataServiceError( - "error loading service '%s': name already in use" - % name) - - inbox = eventlet.Queue() - if not module_driver: - module = sys.modules[moduleName] - - # set args to default values, as necessary - if name in self.default_service_args: - global_args = self.default_service_args[name] - for key, value in global_args.items(): - if key not in args: - args[key] = value - - try: - svcObject = module.d6service(name, keys, inbox, self.dataPath, - args) - self.greenThreads.append(svcObject) - except Exception: - self.log_error("Error loading service '%s' of module '%s':: \n%s", - name, module, traceback.format_exc()) - raise DataServiceError( - "Error loading service '%s' of module '%s':: \n%s" - % (name, module, traceback.format_exc())) - - self.log_info("created service: %s", name) - self.services[name] = {} - self.services[name]['name'] = name - self.services[name]['description'] = description - self.services[name]['moduleName'] = moduleName - self.services[name]['keys'] = keys - self.services[name]['args'] = args - self.services[name]['object'] = svcObject - self.services[name]['inbox'] = inbox - self.services[name]['type'] = type_ - self.services[name]['id'] = id_ - - try: - self.table.add(name, inbox) - localname = "local." + name - self.table.add(localname, inbox) - - self.subscribe( - name, - 'routeKeys', - callback=self.updateRoutes, - interval=5) - - self.publish('services', self.services) - except Exception as errmsg: - del self.services[name] - self.log_error("error starting service '%s': %s", name, errmsg) - raise DataServiceError( - "error starting service '%s': %s" % (name, errmsg)) - - def getservices(self): - return self.services - - def getservice(self, id_=None, type_=None, name=None): - # Returns the first service that matches all non-None parameters. - for name_, service in self.services.items(): - if (id_ and (not service.get('id', None) or id_ != service['id'])): - continue - if type_ and type_ != service['type']: - continue - if name and name_ != name: - continue - return service - return None - - def service_object(self, name): - if name in self.services: - return self.services[name]['object'] - else: - return None - - def updateRoutes(self, msg): - keyData = self.getSubData(msg.correlationId, sender=msg.replyTo) - currentKeys = set(keyData.data) - # self.log_debug("updateRoutes msgbody: %s", msg.body.data) - pubKeys = set(msg.body.data['keys']) - - if currentKeys != pubKeys: - - newKeys = pubKeys - currentKeys - - if newKeys: - self.table.add( - list(newKeys), self.services[msg.replyTo]['inbox']) - - oldKeys = currentKeys - pubKeys - - if oldKeys: - self.table.remove( - list(oldKeys), self.services[msg.replyTo]['inbox']) - - return msg.body - - def load_services_from_config(self): - - for section in self.config['services'].keys(): - - self.createservice(section, **self.config['services'][section]) - - def routemsg(self, msg): - # LOG.debug( - # "Message lookup %s from %s", msg.key, msg.replyTo) - - destinations = self.table.lookup(msg.key) - # self.log_debug("Destinations %s for key %s for msg %s", - # destinations, msg.key, msg) - - if destinations: - for destination in destinations: - destination.put_nowait(msg) - # self.log_debug("Message sent to %s from %s: %s", - # msg.key, msg.replyTo, msg) - - def d6reload(self, msg): - - inargs = msg.body.data - - service = inargs['service'] - - newmsg = d6message.d6msg(key=service, replyTo=self.name, type="shut") - - self.send(newmsg) - cbkwargs = {} - - cbkwargs['service'] = service - - self.waitForServiceToStop( - service, - callback=self.reloadStoppedService, - cbkwargs=cbkwargs) - - def cmdhandler(self, msg): - - command = msg.header['dataindex'] - - if command == "reload": - self.d6reload(msg) - - def router_loop(self): - while self._running: - msg = self.dataPath.get() - self.routemsg(msg) - self.dataPath.task_done() - - -if __name__ == '__main__': - main = d6Cage - - try: - main.wait() - main.d6stop() - except KeyboardInterrupt: - main.d6stop() - sys.exit(0) diff --git a/congress/dse/d6message.py b/congress/dse/d6message.py deleted file mode 100644 index 4213dae0b..000000000 --- a/congress/dse/d6message.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2014 Plexxi, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import uuid - - -class d6msg(object): - def __init__(self, - key="", - replyTo="", - correlationId="", - type="", - dataindex="", - body={}, - srcmsg={}): - - self.header = {} - - self.body = body - - self.replyTo = replyTo - self.type = type - - if srcmsg: - self.key = srcmsg.replyTo - self.correlationId = srcmsg.correlationId - self.header['dataindex'] = srcmsg.header['dataindex'] - else: - self.key = key - self.header['dataindex'] = dataindex - if correlationId: - self.correlationId = correlationId - else: - newuuid = uuid.uuid4() - self.correlationId = str(newuuid) - - def __str__(self): - return ("").format( - self.key, self.replyTo, self.correlationId, self.type, - self.header['dataindex'], str(self.body)) diff --git a/congress/dse/dataobj.py b/congress/dse/dataobj.py deleted file mode 100644 index 784a5ae51..000000000 --- a/congress/dse/dataobj.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright 2014 Plexxi, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -from oslo_log import log as logging - -LOG = logging.getLogger(__name__) - - -class dataObject(object): - def __init__(self, data=None, version=0): - - if data is None: - self.data = {} - else: - self.data = data - - if version: - self.version = version - else: - self.version = int(bool(data)) - - def __str__(self): - return str(self.data) - - -class subData(object): - """A piece of data that a data service is subscribed to. - - Each data service in the cage can have its own instance of - this data; keep track of who published which instance. - """ - def __init__(self, key, dataindex, corrId, callback): - self.key = key - self.dataindex = dataindex - self.corrId = corrId - self.callback = callback - self.dataObjects = {} - # LOG.info( - # "*****New subdata: %s, %s, %s", - # key, dataindex, id(self.dataObjects)) - - def getSources(self): - return self.dataObjects.keys() - - def update(self, sender, newdata): - self.dataObjects[sender] = newdata - - def version(self, sender): - version = 0 - - if sender in self.dataObjects: - version = self.dataObjects[sender].version - - return version - - def getData(self, sender): - result = dataObject() - - if sender in self.dataObjects: - LOG.info("subdata object: %s", self.dataObjects[sender]) - result = self.dataObjects[sender] - - return result - - def getAllData(self): - result = {} - for sender in self.dataObjects: - result[sender] = self.dataObjects[sender] - - return result - - -class pubData(object): - """A piece of data that a data service is publishing. - - Keep track of those data services that are subscribed. - """ - def __init__(self, dataindex, args={}): - self.dataindex = dataindex - self.dataObject = dataObject() - self.subscribers = {} - self.requesters = {} - self.args = args - - def update(self, newdata): - version = self.dataObject.version + 1 - self.dataObject = dataObject(newdata, version) - - def get(self): - return self.dataObject - - def version(self): - return self.dataObject.version - - def addsubscriber(self, sender, type, corrId): - if sender not in self.subscribers: - self.subscribers[sender] = {} - self.subscribers[sender]['type'] = type - self.subscribers[sender]['correlationId'] = corrId - - def removesubscriber(self, sender): - if sender in self.subscribers: - del self.subscribers[sender] - - def getsubscribers(self, sender=""): - if sender: - if sender in self.subscribers: - return self.subscribers[sender] - else: - return [] - else: - return self.subscribers diff --git a/congress/dse/deepsix.py b/congress/dse/deepsix.py deleted file mode 100644 index 4abe7e0bd..000000000 --- a/congress/dse/deepsix.py +++ /dev/null @@ -1,573 +0,0 @@ -# Copyright 2014 Plexxi, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -import eventlet -from eventlet import greenthread -from eventlet import hubs -eventlet.monkey_patch() -from oslo_log import log as logging -from oslo_utils import strutils - -from congress.dse import d6message -from congress.dse import dataobj - -LOG = logging.getLogger(__name__) - - -class deepSix(greenthread.GreenThread): - def __init__(self, name, keys, inbox=None, dataPath=None): - hub = hubs.get_hub() - greenthread.GreenThread.__init__(self, hub.greenlet) - g = self - - self.name = name - keyList = [] - - for k in keys: - keyList.append(k) - localk = "local." + k - keyList.append(localk) - - keyList.append("allservices") - keyList.append("local.allservices") - - self.keys = keyList - - self._running = True - - self.pubdata = {} - self.subdata = {} - self.subscriberCorrelationUuids = set() - self.scheduuids = set() - self.timerThreads = [] - - # Necessary for deepSix objects that don't get initialized with an - # inbox - self.inbox = None - - if inbox: - self.inbox = inbox - self.dataPath = dataPath - - hub.schedule_call_global(0, g.switch, g._loop, [], {}) - - keyargs = {} - keyargs['keys'] = self.keys - - self.publish("routeKeys", keyargs) - - def send(self, msg): - # TODO(thinrichs): reduce how often sub messages - # get sent so we can re-enable this - # if msg.type == 'sub': - # self.log_info("sending SUB msg %s", msg) - # else: - # self.log_debug("sending msg %s", msg) - self.dataPath.put_nowait(msg) - - def schedule(self, msg, scheduuid, interval): - if scheduuid in self.scheduuids: - - if msg.type == 'pub': - msg.updatebody(self.pubdata[msg.dataindex].get()) - - self.send(msg) - - ev = eventlet.spawn_after(interval, - self.schedule, - msg, - scheduuid, - interval) - self.timerThreads.append(ev) - else: - self.log_debug("stop scheduling a message: %s", msg) - - def getSubData(self, corrId, sender=""): - if corrId in self.subdata: - if sender: - return self.subdata[corrId].getData(sender) - else: - return self.subdata[corrId].getAllData() - - def reqtimeout(self, corrId): - if corrId in self.subdata: - del self.subdata[corrId] - - def inreq(self, msg): - corruuid = msg.correlationId - dataindex = msg.header['dataindex'] - - if dataindex == "pubdata": - newmsg = d6message.d6msg(key=msg.replyTo, - replyTo=self.name, - correlationId=msg.correlationId, - type="rep", - dataindex=dataindex, - body=dataobj.dataObject(self.pubdata)) - self.send(newmsg) - - elif dataindex == "subdata": - newmsg = d6message.d6msg(key=msg.replyTo, - replyTo=self.name, - correlationId=msg.correlationId, - type="rep", - dataindex=dataindex, - body=dataobj.dataObject(self.subdata)) - self.send(newmsg) - - elif hasattr(self, 'reqhandler'): - self.pubdata[dataindex] = dataobj.pubData(dataindex, msg.body) - self.pubdata[dataindex].requesters[msg.replyTo] = corruuid - self.reqhandler(msg) - - else: - self.log_exception("Received a request but have no handler: %s", - msg) - - def inpull(self, msg): - # self.log_debug("received PULL msg: %s", msg) - dataindex = msg.header['dataindex'] - - if dataindex in self.pubdata: - - reply = d6message.d6msg(replyTo=self.name, - type="rep", - body=self.pubdata[dataindex].get(), - srcmsg=msg) - self.send(reply) - - else: - self.pubdata[dataindex] = dataobj.pubData(dataindex, msg.body) - self.subhandler(msg) - - self.pubdata[dataindex].addsubscriber( - msg.replyTo, "pull", msg.correlationId) - - def incmd(self, msg): - # self.log_debug("received CMD msg: %s", msg) - corruuid = msg.correlationId - dataindex = msg.header['dataindex'] - - if corruuid not in self.pubdata: - self.pubdata[corruuid] = dataobj.pubData(dataindex, msg.body) - self.pubdata[corruuid].requesters[msg.replyTo] = corruuid - self.cmdhandler(msg) - - def insub(self, msg): - # self.log_info("received SUB msg: %s", msg) - corruuid = msg.correlationId - dataindex = msg.header['dataindex'] - sender = msg.replyTo - - if corruuid not in self.subscriberCorrelationUuids: - - if dataindex not in self.pubdata: - self.pubdata[dataindex] = dataobj.pubData(dataindex, msg.body) - # always call subhandler so subclass has a chance to know more - # about the subscription - if hasattr(self, "subhandler"): - self.subhandler(msg) - - self.pubdata[dataindex].addsubscriber(sender, "push", corruuid) - self.subscriberCorrelationUuids.add(corruuid) - self.push(dataindex, sender, type='sub') - - def inunsub(self, msg): - # self.log_info("received UNSUB msg: %s", msg) - dataindex = msg.header['dataindex'] - - if hasattr(self, 'unsubhandler'): - if self.unsubhandler(msg): - if dataindex in self.pubdata: - self.pubdata[dataindex].removesubscriber(msg.replyTo) - else: - if dataindex in self.pubdata: - self.pubdata[dataindex].removesubscriber(msg.replyTo) - - # release resource if no more subscribers for this dataindex - if self.pubdata[dataindex].getsubscribers() == []: - self.pubdata.discard(dataindex) - - def inshut(self, msg): - """Shut down this data service.""" - # self.log_warning("received SHUT msg: %s", msg) - - for corruuid in self.subdata: - self.unsubscribe(corrId=corruuid) - - for ev in self.timerThreads: - try: - ev.kill() - except Exception as errmsg: - self.log("error stopping timer thread: %s", errmsg) - - self._running = False - - self.keys = {} - keydata = {} - keydata['keys'] = {} - self.publish("routeKeys", keydata) - - def inpubrep(self, msg): - # self.log_debug("received PUBREP msg: %s", msg) - corruuid = msg.correlationId - sender = msg.replyTo - - if corruuid in self.scheduuids: - self.scheduuids.remove(corruuid) - - if corruuid in self.subdata: - callback = self.subdata[corruuid].callback - - if msg.type in ['pub', 'rep']: - if callback: - scrubbed = callback(msg) - if scrubbed: - self.subdata[corruuid].update( - sender, dataobj.dataObject(scrubbed)) - else: - self.unsubscribe(corrId=corruuid) - - def request( - self, - key, - dataindex, - corrId="", - callback=None, - interval=0, - timer=30, - args={}): - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=corrId, - type="req", - dataindex=dataindex, - body=args) - - corruuid = msg.correlationId - self.subdata[corruuid] = dataobj.subData(key, dataindex, - corruuid, callback) - - if interval: - self.scheduuids.add(corruuid) - self.schedule(msg, corruuid, interval) - else: - - self.send(msg) - - if timer: - self.timerThreads.append( - eventlet.spawn_after(timer, - self.reqtimeout, - corruuid)) - - def reply(self, dataindex, newdata="", delete=True): - for requester in self.pubdata[dataindex].requesters: - - msg = d6message.d6msg(key=requester, - replyTo=self.name, - correlationId=self.pubdata[dataindex] - .requesters[requester], - type="rep", - dataindex=self.pubdata[dataindex].dataindex) - - if newdata: - msg.body = dataobj.dataObject(newdata) - else: - msg.body = self.pubdata[dataindex].get() - # self.log_debug("REPLY body: %s", msg.body) - - self.send(msg) - - if delete: - - del self.pubdata[dataindex] - - def prepush_processor(self, data, dataindex, type=None): - """Pre-processing the data before publish. - - Given the DATA to be published, returns the data actually put - on the wire. Can be overloaded. - """ - return data - - def reserved_dataindex(self, dataindex): - """Returns True if DATAINDEX is one of those reserved by deepsix.""" - return dataindex in ('routeKeys', 'pubdata', 'subdata') - - def push(self, dataindex, key="", type=None): - """Send data for DATAINDEX and KEY to subscribers/requesters.""" - self.log_debug("pushing dataindex %s to subscribers %s " - "and requesters %s ", dataindex, - self.pubdata[dataindex].subscribers, - self.pubdata[dataindex].requesters) - - # bail out if there are no requesters/subscribers - if (len(self.pubdata[dataindex].requesters) == 0 and - len(self.pubdata[dataindex].subscribers) == 0): - self.log_debug("no requesters/subscribers; not sending") - return - - # give prepush hook chance to morph data - if self.reserved_dataindex(dataindex): - data = self.pubdata[dataindex].get() - # bail out if no data to send - if data is None: - return - else: - # .get() returns dataObject - data = self.prepush_processor(self.pubdata[dataindex].get().data, - dataindex, - type=type) - # bail out if prepush hook said there's no data - if data is None: - return - data = dataobj.dataObject(data) - - # send to subscribers/requestors - if self.pubdata[dataindex].subscribers: - - if key: - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=self.pubdata[dataindex] - .subscribers[key]['correlationId'], - type="pub", - dataindex=dataindex, - body=data) - self.send(msg) - else: - subscribers = self.pubdata[dataindex].getsubscribers() - for subscriber in subscribers: - - if subscribers[subscriber]['type'] == "push": - corId = subscribers[subscriber]['correlationId'] - msg = d6message.d6msg(key=subscriber, - replyTo=self.name, - correlationId=corId, - type="pub", - dataindex=dataindex, - body=data) - - self.send(msg) - - if self.pubdata[dataindex].requesters: - if key: - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=self.pubdata[dataindex]. - requesters[key], - type="rep", - dataindex=dataindex, - body=self.pubdata[dataindex].get()) - self.send(msg) - del self.pubdata[dataindex].requesters[key] - else: - for requester in self.pubdata[dataindex].requesters.keys(): - corId = self.pubdata[dataindex].requesters[requester] - msg = d6message.d6msg(key=requester, - replyTo=self.name, - correlationId=corId, - type="rep", - dataindex=dataindex, - body=self.pubdata[dataindex].get()) - self.send(msg) - del self.pubdata[dataindex].requesters[requester] - - def subscribe( - self, - key, - dataindex, - corrId="", - callback=None, - pull=False, - interval=30, - args={}): - """Subscribe to a DATAINDEX for a given KEY.""" - self.log_debug("subscribed to %s with dataindex %s", key, dataindex) - - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=corrId, - dataindex=dataindex, - body=args) - if pull: - msg.type = 'pull' - else: - msg.type = 'sub' - - corruuid = msg.correlationId - - self.subdata[corruuid] = dataobj.subData(key, dataindex, - corruuid, callback) - self.scheduuids.add(corruuid) - self.schedule(msg, corruuid, interval) - - return corruuid - - def unsubscribe(self, key="", dataindex="", corrId=""): - """Unsubscribe self from DATAINDEX for KEY.""" - self.log_debug("unsubscribed to %s with dataindex %s", key, dataindex) - if corrId: - if corrId in self.scheduuids: - self.scheduuids.remove(corrId) - if corrId in self.subdata: - key = self.subdata[corrId].key - dataindex = self.subdata[corrId].dataindex - del self.subdata[corrId] - - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=corrId, - type='unsub', - dataindex=dataindex) - - self.send(msg) - - elif key and dataindex: - - for corruuid in self.subdata.copy().keys(): - # copy to avoid undefined behavior w changing dict during iter - - if (key == self.subdata[corruuid].key and - dataindex == self.subdata[corruuid].dataindex): - - if corruuid in self.scheduuids: - self.scheduuids.remove(corruuid) - - del self.subdata[corruuid] - - msg = d6message.d6msg(key=key, - replyTo=self.name, - correlationId=corruuid, - type='unsub', - dataindex=dataindex) - self.send(msg) - - return - - def command( - self, - key, - command, - corrId="", - callback=None, - timer=30, - args={}): - msg = d6message.d6msg(key=key, - replyTo=self.name, - type="cmd", - correlationId=corrId, - dataindex=command, - body=args) - - corruuid = msg.correlationId - - self.subdata[corruuid] = dataobj.subData(key, command, - corruuid, callback) - - self.send(msg) - - if timer: - self.timerThreads.append( - eventlet.spawn_after(timer, - self.reqtimeout, - corruuid)) - - def publish(self, dataindex, newdata, key='', use_snapshot=False): - # Note(ekcs): use_snapshot param is ignored. - # Accepted here on temporary basis for dse1+2 compatibility. - self.log_debug("publishing to dataindex %s with data %s", - dataindex, strutils.mask_password(newdata, "****")) - if dataindex not in self.pubdata: - self.pubdata[dataindex] = dataobj.pubData(dataindex) - - self.pubdata[dataindex].update(newdata) - - self.push(dataindex, type='pub') - - def receive(self, msg): - if msg.type == 'sub': - self.insub(msg) - elif msg.type == 'unsub': - self.inunsub(msg) - elif msg.type == 'pub': - self.inpubrep(msg) - elif msg.type == 'req': - self.inreq(msg) - elif msg.type == 'rep': - self.inpubrep(msg) - elif msg.type == 'pull': - self.inpull(msg) - elif msg.type == 'shut': - self.inshut(msg) - elif msg.type == 'cmd': - if hasattr(self, 'cmdhandler'): - self.incmd(msg) - else: - assert False, "{} received message of unknown type {}: {}".format( - self.name, msg.type, str(msg)) - - def _loop(self): - - # self.running will be set to False when processing a shutdown a - # message - while self._running: - if self.inbox: - msg = self.inbox.get() - self.receive(msg) - self.inbox.task_done() - else: - # in test cases some deepSix instances are initialized - # without an inbox, this prevents a busy wait state - eventlet.sleep(1) - - def subscription_list(self): - """Return a list version of subscriptions.""" - return [(x.key, x.dataindex) for x in self.subdata.values()] - - def subscriber_list(self): - """Return a list version of subscribers.""" - result = [] - for pubdata in self.pubdata.values(): - for subscriber in pubdata.subscribers: - result.append((subscriber, pubdata.dataindex)) - return result - - def log(self, msg, *args): - self.log_debug(msg, *args) - - def log_debug(self, msg, *args): - msg = "%s:: %s" % (self.name, msg) - LOG.debug(msg, *args) - - def log_info(self, msg, *args): - msg = "%s:: %s" % (self.name, msg) - LOG.info(msg, *args) - - def log_warning(self, msg, *args): - msg = "%s:: %s" % (self.name, msg) - LOG.warning(msg, *args) - - def log_error(self, msg, *args): - msg = "%s:: %s" % (self.name, msg) - LOG.error(msg, *args) - - def log_exception(self, msg, *args): - msg = "%s:: %s" % (self.name, msg) - LOG.exception(msg, *args) diff --git a/congress/tests/base.py b/congress/tests/base.py index caeb9119b..56ee5e5c0 100644 --- a/congress/tests/base.py +++ b/congress/tests/base.py @@ -33,7 +33,6 @@ from congress.db import api as db_api # Import all data models from congress.db.migration.models import head # noqa from congress.db import model_base -from congress.dse import d6cage from congress.tests import helper from congress.tests import policy_fixture @@ -85,9 +84,6 @@ class TestCase(testtools.TestCase): self.log_fixture = self.useFixture(fixtures.FakeLogger()) self.policy = self.useFixture(policy_fixture.PolicyFixture()) - # cage is a singleton so we delete it here and - # recreate it after each test - self.addCleanup(d6cage.delete_cage) def setup_config(self): """Tests that need a non-default config can override this method.""" diff --git a/congress/tests/datasources/test_driver.py b/congress/tests/datasources/test_driver.py deleted file mode 100644 index e1a1cfcc6..000000000 --- a/congress/tests/datasources/test_driver.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2013 VMware, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -from oslo_log import log as logging - -from congress.datasources import datasource_driver - - -LOG = logging.getLogger(__name__) - - -def d6service(name, keys, inbox, datapath, args): - """Create dataservice instance. - - This method is called by d6cage to create a dataservice - instance. There are a couple of parameters we found useful - to add to that call, so we included them here instead of - modifying d6cage (and all the d6cage.createservice calls). - """ - return TestDriver(name, keys, inbox, datapath, args) - - -class TestDriver(datasource_driver.PollingDataSourceDriver): - def __init__(self, name='', keys='', inbox=None, datapath=None, args=None): - if args is None: - args = self._empty_openstack_credentials() - super(TestDriver, self).__init__(name, keys, inbox, datapath, args) - self.msg = None - self.state = {} - self._init_end_start_poll() - - def receive_msg(self, msg): - LOG.info("TestDriver: received msg %s", msg) - self.msg = msg - - def get_msg_data(self): - msgstr = "" - if self.msg is None: - return msgstr - # only support list and set now - if isinstance(self.msg.body.data, (list, set)): - for di in self.msg.body.data: - msgstr += str(di) - else: - msgstr = str(self.msg.body.data) - LOG.info("TestDriver: current received msg: %s", msgstr) - return msgstr - - def update_from_datasource(self): - pass - - def prepush_processor(self, data, dataindex, type=None): - # don't change data before transfer - return data diff --git a/congress/tests/dse/test_dse.py b/congress/tests/dse/test_dse.py deleted file mode 100644 index 21954d222..000000000 --- a/congress/tests/dse/test_dse.py +++ /dev/null @@ -1,180 +0,0 @@ -# Copyright (c) 2013 VMware, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from __future__ import print_function -from __future__ import division -from __future__ import absolute_import - -from congress.datalog import compile -import congress.dse.d6cage -from congress.tests import base -import congress.tests.helper as helper - - -class TestDSE(base.TestCase): - - def test_cage(self): - """Test basic DSE functionality.""" - cage = congress.dse.d6cage.d6Cage() - cage.loadModule("TestDriver", - helper.data_module_path( - "../tests/datasources/test_driver.py")) - args = helper.datasource_openstack_args() - args['poll_time'] = 0 - cage.createservice(name="test1", moduleName="TestDriver", args=args) - cage.createservice(name="test2", moduleName="TestDriver", args=args) - test1 = cage.service_object('test1') - test2 = cage.service_object('test2') - test1.subscribe('test2', 'p', callback=test1.receive_msg) - test2.publish('p', 42) - helper.retry_check_for_message_to_arrive(test1) - self.assertEqual(42, test1.msg.body.data) - - def test_policy(self): - """Test basic DSE functionality with policy engine.""" - cage = congress.dse.d6cage.d6Cage() - cage.loadModule("TestDriver", - helper.data_module_path( - "../tests/datasources/test_driver.py")) - cage.loadModule("TestPolicy", helper.policy_module_path()) - cage.createservice(name="data", moduleName="TestDriver", - args=helper.datasource_openstack_args()) - cage.createservice(name="policy", moduleName="TestPolicy", - args={'d6cage': cage, 'rootdir': '', - 'log_actions_only': True}) - data = cage.services['data']['object'] - policy = cage.services['policy']['object'] - policy.subscribe('data', 'p', callback=policy.receive_msg) - data.publish('p', 42) - helper.retry_check_for_message_to_arrive(policy) - self.assertEqual(42, policy.msg.body.data) - - def test_policy_data(self): - """Test policy properly inserts data and processes it normally.""" - cage = congress.dse.d6cage.d6Cage() - cage.loadModule("TestDriver", - helper.data_module_path( - "../tests/datasources/test_driver.py")) - cage.loadModule("TestPolicy", helper.policy_module_path()) - cage.createservice(name="data", moduleName="TestDriver", - args=helper.datasource_openstack_args()) - cage.createservice(name="policy", moduleName="TestPolicy", - args={'d6cage': cage, 'rootdir': '', - 'log_actions_only': True}) - data = cage.services['data']['object'] - policy = cage.services['policy']['object'] - # turn off module-schema syntax checking - policy.create_policy('data') - policy.set_schema('data', compile.Schema({'p': ((1, None),)})) - policy.subscribe('data', 'p', callback=policy.receive_data) - formula = policy.parse1('p(1)') - # sending a single Insert. (Default for Event is Insert.) - data.publish('p', [compile.Event(formula)]) - helper.retry_check_db_equal(policy, 'data:p(x)', 'data:p(1)') - - def test_policy_tables(self): - """Test basic DSE functionality with policy engine and the API.""" - cage = congress.dse.d6cage.d6Cage() - cage.loadModule("TestDriver", - helper.data_module_path( - "../tests/datasources/test_driver.py")) - cage.loadModule("TestPolicy", helper.policy_module_path()) - cage.createservice(name="data", moduleName="TestDriver", - args=helper.datasource_openstack_args()) - # using regular testdriver as API for now - cage.createservice(name="api", moduleName="TestDriver", - args=helper.datasource_openstack_args()) - cage.createservice(name="policy", moduleName="TestPolicy", - args={'d6cage': cage, 'rootdir': '', - 'log_actions_only': True}) - data = cage.services['data']['object'] - api = cage.services['api']['object'] - policy = cage.services['policy']['object'] - policy.create_policy('data') - policy.set_schema('data', compile.Schema({'q': (1,)})) - policy.subscribe('api', 'policy-update', - callback=policy.receive_policy_update) - # simulate API call for insertion of policy statements - formula = policy.parse1('p(x) :- data:q(x)') - api.publish('policy-update', [compile.Event(formula)]) - helper.retry_check_nonempty_last_policy_change(policy) - # simulate data source publishing to q - formula = policy.parse1('q(1)') - data.publish('q', [compile.Event(formula)]) - helper.retry_check_db_equal(policy, 'data:q(x)', 'data:q(1)') - # check that policy did the right thing with data - e = helper.db_equal(policy.select('p(x)'), 'p(1)') - self.assertTrue(e, 'Policy insert') - # check that publishing into 'p' does not work - formula = policy.parse1('p(3)') - data.publish('p', [compile.Event(formula)]) - # can't actually check that the update for p does not arrive - # so instead wait a bit and check - helper.pause() - e = helper.db_equal(policy.select('p(x)'), 'p(1)') - self.assertTrue(e, 'Policy non-insert') - - def test_policy_table_publish(self): - """Policy table result publish - - Test basic DSE functionality with policy engine and table result - publish. - """ - - cage = congress.dse.d6cage.d6Cage() - cage.loadModule("TestDriver", - helper.data_module_path( - "../tests/datasources/test_driver.py")) - cage.loadModule("TestPolicy", helper.policy_module_path()) - cage.createservice(name="data", moduleName="TestDriver", - args=helper.datasource_openstack_args()) - cage.createservice(name="policy", moduleName="TestPolicy", - args={'d6cage': cage, 'rootdir': '', - 'log_actions_only': True}) - data = cage.services['data']['object'] - policy = cage.services['policy']['object'] - policy.create_policy('data') - policy.create_policy('classification') - policy.set_schema('data', compile.Schema({'q': (1,)})) - policy.insert('p(x):-data:q(x),gt(x,2)', target='classification') - data.subscribe('policy', 'classification:p', callback=data.receive_msg) - helper.retry_check_subscribers(policy, [('data', 'classification:p')]) - self.assertEqual(list(policy.policySubData.keys()), - [('p', 'classification', None)]) - policy.insert('q(1)', target='data') - # no entry here - self.assertEqual(data.get_msg_data(), '{}') - policy.insert('q(2)', target='data') - policy.insert('q(3)', target='data') - # get an update - helper.retry_check_for_message_data(data, 'insert[p(3)]') - self.assertEqual(data.get_msg_data(), 'insert[p(3)]') - # subscribe again to get a full table - data.subscribe('policy', 'classification:p', callback=data.receive_msg) - helper.retry_check_for_message_data(data, 'p(3)') - self.assertEqual(data.get_msg_data(), 'p(3)') - # get another update - policy.insert('q(4)', target='data') - helper.retry_check_for_message_data(data, 'insert[p(4)]') - self.assertEqual(data.get_msg_data(), 'insert[p(4)]') - # get another update - policy.delete('q(4)', target='data') - helper.retry_check_for_message_data(data, 'delete[p(4)]') - self.assertEqual(data.get_msg_data(), 'delete[p(4)]') - data.unsubscribe('policy', 'classification:p') - # trigger removed - helper.retry_check_no_subscribers(policy, - [('data', 'classification:p')]) - self.assertEqual(list(policy.policySubData.keys()), []) diff --git a/congress/tests/test_benchmark_updates.py b/congress/tests/test_benchmark_updates.py index cdd623719..bc4685b9a 100644 --- a/congress/tests/test_benchmark_updates.py +++ b/congress/tests/test_benchmark_updates.py @@ -23,7 +23,6 @@ from mox3 import mox from six.moves import range from congress.datalog import compile -from congress.dse import dataobj from congress import harness from congress.policy_engines import agnostic from congress.tests import base @@ -60,10 +59,10 @@ class BenchmarkDatasource(base.Benchmark): self.assertEqual(datasource.state, {}) # add a subscriber to ensure the updates end up in datasource.dataPath - pubdata = datasource.pubdata.setdefault(table_name, - dataobj.pubData(table_name)) - pubdata.addsubscriber(self.__class__.__name__, "push", "") - self.assertTrue(datasource.pubdata[table_name]) + # pubdata = datasource.pubdata.setdefault(table_name, + # dataobj.pubData(table_name)) + # pubdata.addsubscriber(self.__class__.__name__, "push", "") + # self.assertTrue(datasource.pubdata[table_name]) self.cage = cage self.engine = engine