From a001829f55cab0167853e738d2014bdcfea2e9a6 Mon Sep 17 00:00:00 2001 From: Robert Putt Date: Sat, 3 Mar 2018 00:47:43 +0000 Subject: [PATCH] Update swift / worker base Change-Id: I512a3ecfb52c908e26b3b7a1e679d3303ee39338 --- python_nemesis/swift.py | 13 +++++++++++++ python_nemesis/worker/__init__.py | 11 +++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/python_nemesis/swift.py b/python_nemesis/swift.py index 5b404f9..4cd9272 100644 --- a/python_nemesis/swift.py +++ b/python_nemesis/swift.py @@ -30,3 +30,16 @@ def upload_to_swift(filename, file_id): container = config.swift.container.encode('utf-8') file_id = str(file_id).encode('utf-8') swift_session.put_object(container, file_id, upload_file) + + +def download_from_swift(file_uuid): + config = current_app.config['cfg'] + auth_version = config.swift.auth_version + swift_session = swiftclient.Connection(authurl=config.swift.auth_uri, + user=config.swift.user, + key=config.swift.password, + tenant_name=config.swift.project, + auth_version=auth_version) + + container = config.swift.container.encode('utf-8') + print(swift_session.get_object(container, file_uuid)) diff --git a/python_nemesis/worker/__init__.py b/python_nemesis/worker/__init__.py index 6dca6b7..03c0b9d 100644 --- a/python_nemesis/worker/__init__.py +++ b/python_nemesis/worker/__init__.py @@ -14,6 +14,8 @@ from flask import current_app import oslo_messaging +from python_nemesis.extensions import log +from python_nemesis.swift import download_from_swift class NewFileEndpoint(object): @@ -21,7 +23,13 @@ class NewFileEndpoint(object): event_type='nemsis.new_file') def info(self, ctxt, publisher_id, event_type, payload, metadata): - print(payload) + file_uuid = payload['file_uuid'] + file_id = payload['file_id'] + log.logger.info("Fetched file_id %s to work on from the queue." + % file_id) + + log.logger.info("Downloading file from Swift for analysis.") + download_from_swift(file_uuid) def run_worker(): @@ -39,7 +47,6 @@ def run_worker(): server = oslo_messaging.get_notification_listener(transport, targets, endpoints, - executor='threading', pool=pool) server.start()