From 2341f2acafecacc5f66d5331eec8ad2ca62505f4 Mon Sep 17 00:00:00 2001 From: Robert Putt Date: Thu, 8 Mar 2018 18:48:30 +0000 Subject: [PATCH] Use Oslo Config global CONF object rather than Flask config. Update anything using configuration items to use the global CONF object from Oslo Config rather than using the Flask config as the Oslo Config CONF object can be used outside of the Flask application context. Change-Id: Ie7eafd322ab1a6f57ff1e8e5b66e27079caa4aba --- python_nemesis/api/v1/__init__.py | 4 ++- python_nemesis/db/utilities.py | 21 +++++++++++++++ python_nemesis/plugins/null_plugin.py | 1 + python_nemesis/swift.py | 39 ++++++++++++++------------- python_nemesis/worker/__init__.py | 34 ++++++++++++++++------- python_nemesis/worker_app.py | 2 +- 6 files changed, 71 insertions(+), 30 deletions(-) diff --git a/python_nemesis/api/v1/__init__.py b/python_nemesis/api/v1/__init__.py index bacd1d4..38029cf 100644 --- a/python_nemesis/api/v1/__init__.py +++ b/python_nemesis/api/v1/__init__.py @@ -17,6 +17,7 @@ from flask import jsonify from flask import request import magic import os +from oslo_config.cfg import CONF from python_nemesis.db.utilities import add_request from python_nemesis.db.utilities import create_or_renew_by_hash from python_nemesis.db.utilities import get_file_by_sha512_hash @@ -102,7 +103,8 @@ def post_file(): file_dict = file.to_dict() # Upload to swift and remove the local temp file. - upload_to_swift(filename, file_uuid) + container = CONF.swift.container.encode('utf-8') + upload_to_swift(container, filename, file_uuid) os.remove(filename) # Send message to worker queue with file details. diff --git a/python_nemesis/db/utilities.py b/python_nemesis/db/utilities.py index ff783e1..412a44b 100644 --- a/python_nemesis/db/utilities.py +++ b/python_nemesis/db/utilities.py @@ -91,3 +91,24 @@ def create_or_renew_by_hash(hashes, file_size, file_type=None): file_type, current_user.user_id) return file + + +def get_file_by_id(file_id): + try: + result = db.session.query(Files). \ + filter(Files.file_id == file_id).one() + except NoResultFound: + result = None + + return result + + +def update_status_by_file_id(file_id, new_status): + update_file = get_file_by_id(file_id) + + if update_file: + update_file.status = new_status + db.session.commit() + return True + else: + return False diff --git a/python_nemesis/plugins/null_plugin.py b/python_nemesis/plugins/null_plugin.py index 264d5c3..2c030f7 100644 --- a/python_nemesis/plugins/null_plugin.py +++ b/python_nemesis/plugins/null_plugin.py @@ -14,6 +14,7 @@ class NemesisPlugin(object): + plugin_name = 'null_plugin' def __init__(self, file_path): self.file_path = file_path diff --git a/python_nemesis/swift.py b/python_nemesis/swift.py index 4cd9272..eac7cae 100644 --- a/python_nemesis/swift.py +++ b/python_nemesis/swift.py @@ -12,34 +12,35 @@ # License for the specific language governing permissions and limitations # under the License. -from flask import current_app import os +from oslo_config.cfg import CONF import swiftclient.client as swiftclient -def upload_to_swift(filename, file_id): - config = current_app.config['cfg'] - auth_version = config.swift.auth_version - swift_session = swiftclient.Connection(authurl=config.swift.auth_uri, - user=config.swift.user, - key=config.swift.password, - tenant_name=config.swift.project, +def get_swift_session(): + auth_version = CONF.swift.auth_version + swift_session = swiftclient.Connection(authurl=CONF.swift.auth_uri, + user=CONF.swift.user, + key=CONF.swift.password, + tenant_name=CONF.swift.project, auth_version=auth_version) + return swift_session + +def upload_to_swift(container, filename, file_id): + swift_session = get_swift_session() with open(os.path.join(filename), 'rb') as upload_file: - container = config.swift.container.encode('utf-8') file_id = str(file_id).encode('utf-8') swift_session.put_object(container, file_id, upload_file) -def download_from_swift(file_uuid): - config = current_app.config['cfg'] - auth_version = config.swift.auth_version - swift_session = swiftclient.Connection(authurl=config.swift.auth_uri, - user=config.swift.user, - key=config.swift.password, - tenant_name=config.swift.project, - auth_version=auth_version) +def download_from_swift(container, file_uuid): + swift_session = get_swift_session() + obj = swift_session.get_object(container, file_uuid) + with open('/tmp/%s' % file_uuid, 'wb') as download_file: + download_file.write(obj[1]) - container = config.swift.container.encode('utf-8') - print(swift_session.get_object(container, file_uuid)) + +def delete_from_swift(container, file_uuid): + swift_session = get_swift_session() + swift_session.delete_object(container, file_uuid) diff --git a/python_nemesis/worker/__init__.py b/python_nemesis/worker/__init__.py index 03c0b9d..fb4e5c6 100644 --- a/python_nemesis/worker/__init__.py +++ b/python_nemesis/worker/__init__.py @@ -12,10 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -from flask import current_app +from oslo_config.cfg import CONF import oslo_messaging +from python_nemesis.db.utilities import update_status_by_file_id from python_nemesis.extensions import log +from python_nemesis.swift import delete_from_swift from python_nemesis.swift import download_from_swift +from python_nemesis.worker_app import create_worker_app class NewFileEndpoint(object): @@ -23,18 +26,30 @@ class NewFileEndpoint(object): event_type='nemsis.new_file') def info(self, ctxt, publisher_id, event_type, payload, metadata): - file_uuid = payload['file_uuid'] - file_id = payload['file_id'] - log.logger.info("Fetched file_id %s to work on from the queue." - % file_id) + with create_worker_app().app_context(): + file_uuid = payload['file_uuid'] + file_id = payload['file_id'] - log.logger.info("Downloading file from Swift for analysis.") - download_from_swift(file_uuid) + container = CONF.swift.container.encode('utf-8') + log.logger.info("Fetched file_id %s to work on from the queue." + % file_id) + + log.logger.info("Downloading file from Swift for analysis.") + download_from_swift(container, file_uuid) + log.logger.info("Fetched file to /tmp/%s" % file_uuid) + + log.logger.info("Running analysis plugins.") + log.logger.info("Updating file analysis ") + + log.logger.info("Cleaning up analysis subject.") + delete_from_swift('incoming_files', file_uuid) + + log.logger.info("Setting file status to complete.") + update_status_by_file_id(file_id, 'complete') def run_worker(): - cfg = current_app.config['cfg'] - transport = oslo_messaging.get_notification_transport(cfg) + transport = oslo_messaging.get_notification_transport(CONF) targets = [ oslo_messaging.Target(topic='nemesis_notifications') @@ -47,6 +62,7 @@ def run_worker(): server = oslo_messaging.get_notification_listener(transport, targets, endpoints, + executor='threading', pool=pool) server.start() diff --git a/python_nemesis/worker_app.py b/python_nemesis/worker_app.py index 7b5d01e..378a8a1 100644 --- a/python_nemesis/worker_app.py +++ b/python_nemesis/worker_app.py @@ -13,7 +13,6 @@ from python_nemesis.base_app import configure_app from python_nemesis.base_app import configure_extensions from python_nemesis.base_app import create_app -from python_nemesis.worker import run_worker def create_worker_app(): @@ -24,6 +23,7 @@ def create_worker_app(): if __name__ == "__main__": # pragma: no cover + from python_nemesis.worker import run_worker app = create_worker_app() with app.app_context(): run_worker()