diff --git a/taskflow/jobs/backends/__init__.py b/taskflow/jobs/backends/__init__.py index b720024b..0299636a 100644 --- a/taskflow/jobs/backends/__init__.py +++ b/taskflow/jobs/backends/__init__.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import logging import six @@ -53,3 +54,14 @@ def fetch(name, conf, namespace=BACKEND_NAMESPACE, **kwargs): return mgr.driver except RuntimeError as e: raise exc.NotFound("Could not find jobboard %s" % (board), e) + + +@contextlib.contextmanager +def backend(name, conf, namespace=BACKEND_NAMESPACE, **kwargs): + """Fetches a jobboard backend, connects to it and allows it to be used in + a context manager statement with the jobboard being closed upon completion. + """ + jb = fetch(name, conf, namespace=namespace, **kwargs) + jb.connect() + with contextlib.closing(jb): + yield jb diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 070b3ea3..0f3d4455 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -132,6 +132,18 @@ class JobBoard(object): this must be the same name that was used for claiming this job. """ + @abc.abstractmethod + def connect(self): + """Opens the connection to any backend system.""" + + @abc.abstractmethod + def close(self): + """Close the connection to any backend system. + + Once closed the jobboard can no longer be used (unless reconnection + occurs). + """ + # Jobboard events POSTED = 'POSTED' # new job is/has been posted