Code improvements

Allow multiple publishers for different applications
Included all necessary python application packages in setup.py
Improved diagnostics and documentation in the R forecaster
Changed installation method for the application to align with the requirements of Opendev and Zuul
Refactoring of exn python connector code
Improvements to the forecaster state variables, main prediction logic and the handling of influxdb connections
Improvement in the prediction file handling of empty data
Configuration file improvements

Change-Id: Ic8aaa0728a4b936cd4c6e1ed5a0e01ba8f0fb002
This commit is contained in:
Andreas Tsagkaropoulos 2024-05-14 17:04:53 +03:00
parent c40f480ce9
commit 59968e9fc4
37 changed files with 895 additions and 574 deletions

2
.hadolint.yaml Normal file
View File

@ -0,0 +1,2 @@
ignored:
- DL3008

View File

@ -5,7 +5,7 @@
replicaCount: 1 replicaCount: 1
image: image:
repository: "quay.io/nebulous/exponential-smoothing-predictor-java-spring-boot-demo" repository: "quay.io/nebulous/exponential-smoothing-predictor"
pullPolicy: IfNotPresent pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion. # Overrides the image tag whose default is the chart appVersion.
tag: "" tag: ""

View File

@ -9,64 +9,65 @@ FROM python:3.11 as source
RUN mkdir /src RUN mkdir /src
COPY ./src/ /src/ COPY ./src/ /src/
#COPY src/requirements.txt /src/
WORKDIR /src WORKDIR /src
RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist
FROM ubuntu:noble #FROM ubuntu:noble
FROM python:3.11-slim
RUN mkdir -p /home/r_predictions RUN mkdir -p /home/r_predictions
#RUN apt-get update #RUN apt-get update
ENV LOG_FILE=exponential_smoothing.log
ENV TZ=Europe/Athens ENV TZ=Europe/Athens
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install --no-install-recommends -y \ RUN apt-get update && apt-get install --no-install-recommends -y \
libcurl4-openssl-dev=8.5.0-2ubuntu1 \ libcurl4-openssl-dev \
build-essential=12.10ubuntu1 \ build-essential \
r-base-core=4.3.2-1build1 \ r-base-core \
r-base-dev=4.3.2-1build1 \ r-base-dev \
r-cran-digest=0.6.34-1 \ r-cran-digest \
r-cran-boot=1.3-28.1-1 \ r-cran-boot \
r-cran-class=7.3-22-2 \ r-cran-class \
r-cran-cluster=2.1.6-1 \ r-cran-cluster \
r-cran-codetools=0.2-19-1 \ r-cran-codetools \
r-cran-foreign=0.8.86-1 \ r-cran-foreign \
r-cran-kernsmooth=2.23-22-1 \ r-cran-kernsmooth \
r-cran-lattice=0.22-5-1 \ r-cran-lattice \
r-cran-littler=0.3.19-1 \ r-cran-littler \
r-cran-mass=7.3-60-2 \ r-cran-mass \
r-cran-matrix=1.6-4-1 \ r-cran-matrix \
r-cran-mgcv=1.9-1-1 \ r-cran-mgcv \
r-cran-nlme=3.1.164-1 \ r-cran-nlme \
r-cran-nnet=7.3-19-2 \ r-cran-nnet \
r-cran-pkgkitten=0.2.3-1 \ r-cran-pkgkitten \
r-cran-rcpp=1.0.11-1 \ r-cran-rcpp \
r-cran-rpart=4.1.23-1 \ r-cran-rpart \
r-cran-spatial=7.3-17-1 \ r-cran-spatial \
r-cran-survival=3.5-7-1 \ r-cran-survival \
r-doc-html=4.3.2-1build1 \ r-doc-html \
r-recommended=4.3.2-1build1 \ r-recommended \
python3=3.11.4-5ubuntu1 \
python3-pip=23.3+dfsg-1 \
python3.11-venv=3.11.7-2 \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
COPY ./src/r_predictors/r_commands.R /home/r_predictions/ COPY --from=source /src/r_predictors/r_commands.R /home/r_predictions/
RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries
COPY --from=source ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/ COPY --from=source /src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/
COPY ./src/requirements.txt /home/r_predictions/ COPY --from=source /src/requirements.txt /home/r_predictions/
COPY ./src/prepare_python_dependencies.sh /home/r_predictions/ COPY --from=source /src/prepare_python_dependencies.sh /home/r_predictions/
RUN bash -x /home/r_predictions/prepare_python_dependencies.sh RUN bash -x /home/r_predictions/prepare_python_dependencies.sh
COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/ COPY --from=source /src/r_predictors/forecasting_real_workload.R /home/r_predictions/
COPY --from=source /src/r_predictors/prediction_configuration.properties /home/r_predictions/
#The two commented lines below only serve for experiments with predictive functionality
#below two commented lines only serve for experiments with predictive functionality
#COPY ./default_application.csv /home/r_predictions #COPY ./default_application.csv /home/r_predictions
#RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119 #RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119
WORKDIR /home/r_predictions/esm_forecaster-0.1.0 #WORKDIR /home/r_predictions/esm_forecaster-0.1.0
CMD ["/bin/sh","-c",". /home/forecasting_env/bin/activate && python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] CMD ["/usr/local/bin/start_exsmoothing","/home/r_predictions/prediction_configuration.properties"," > $LOG_FILE 2>&1 "]

View File

@ -0,0 +1,6 @@
from . import core
from . import handler
from . import settings
from . import connector

View File

@ -0,0 +1,84 @@
import logging
import os
from proton.reactor import Container
from exn.core import state_publisher, schedule_publisher
from exn.core.context import Context
from .core.manager import Manager
from .settings import base
from .handler import connector_handler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger(__name__)
class EXN:
context = None
container = None
def __init__(self, component=None,
handler:connector_handler.ConnectorHandler = None,
publishers=None,
consumers=None,
**kwargs):
# Load .env file
# Validate and set connector
if not component:
_logger.error("Component cannot be empty or None")
raise ValueError("Component cannot be empty or None")
self.component = component
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
self.handler = handler
# Validate attributes
if not self.url:
_logger.error("URL cannot be empty or None")
raise ValueError("URL cannot be empty or None")
if not self.port:
_logger.error("PORT cannot be empty or None")
raise ValueError("PORT cannot be empty or None")
if not self.username:
_logger.error("USERNAME cannot be empty or None")
raise ValueError("USERNAME cannot be empty or None")
if not self.password:
_logger.error("PASSWORD cannot be empty or None")
raise ValueError("PASSWORD cannot be empty or None")
self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}")
if not publishers:
publishers = []
if not consumers:
consumers = []
compiled_publishers = publishers
if kwargs.get("enable_state",False):
compiled_publishers.append(state_publisher.Publisher())
if kwargs.get("enable_health",False):
compiled_publishers.append(schedule_publisher.Publisher(
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
'health',
'health',
topic=True))
for c in consumers:
self.context.register_consumers(c)
for p in compiled_publishers:
self.context.register_publisher(p)
def start(self):
self.context.start(Manager(f"{self.url}:{self.port}"),self.handler)
def stop(self):
self.context.stop()

View File

@ -1,6 +1,7 @@
from . import context from . import context
from . import handler
from . import publisher from . import publisher
from . import consumer from . import consumer
from . import state_publisher from . import state_publisher

View File

@ -0,0 +1,43 @@
import logging
from proton import Event
from .handler import Handler
from . import link
from proton.handlers import MessagingHandler
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.DEBUG)
class Consumer(link.Link, MessagingHandler):
application = None
def __init__(self, key, address, handler: Handler, application=None, topic=False, fqdn=False):
super(Consumer, self).__init__(key, address, topic, fqdn)
self.application = application
self.handler = handler
self.handler._consumer = self
def should_handle(self, event: Event):
should = event.link.name == self._link.name and \
(self.application is None or event.message.subject == self.application)
_logger.debug(f"[{self.key}] checking if link is the same {event.link.name}={self._link.name} "
f" and application {self.application}={event.message.subject} == {should}")
return should
def on_start(self, event: Event) -> None:
_logger.debug(f"[{self.key}] on_start")
def on_message(self, event):
_logger.debug(f"[{self.key}] handling event with address => {event.message.address}")
try:
if self.should_handle(event):
self.handler.on_message(self.key, event.message.address, event.message.body, event.message)
except Exception as e:
_logger.error(f"Received message: {e}")

View File

@ -0,0 +1,109 @@
import logging
from proton.reactor import Container
from . import link
from .manager import Manager
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class Context:
base = None
handler = None
publishers = {}
consumers = {}
_manager = None
def __init__(self, base):
self.base = base
def start(self, manager:Manager, handler):
self._manager = manager
def on_ready():
_logger.debug("[context] on_ready" )
for key,publisher in self.publishers.items():
self._manager.start_publisher(self,publisher)
for key,consumer in self.consumers.items():
self._manager.start_consumer(self,consumer)
handler.ready(context=self)
self._manager._on_ready=on_ready
self._manager.start()
def stop(self):
if self._manager is not None and self._manager.started:
for key,publisher in self.publishers:
publisher._link.close()
for key,consumer in self.consumers:
consumer._link.close()
self._manager.close()
def register_publisher(self, publisher):
if publisher.key in self.publishers:
_logger.warning("[context] Trying to register publisher that already exists")
return
_logger.info(f"[context] registering publisher {publisher.key} {publisher.address}" )
self.publishers[publisher.key] = publisher
if self._manager is not None and self._manager.started:
self._manager.start_publisher(self,publisher)
def get_publisher(self, key):
if key in self.publishers:
return self.publishers[key]
return None
def has_publisher(self, key):
return key in self.publishers
def has_consumer(self, key):
return key in self.consumers
def register_consumers(self, consumer):
if consumer.key in self.consumers:
_logger.warning("[context] Trying to register consumer that already exists")
return
self.consumers[consumer.key] = consumer
if self._manager is not None and self._manager.started:
self._manager.start_consumer(self,consumer)
def unregister_consumer(self, key):
if not key in self.consumers:
_logger.warning("[context] Trying to unregister consumer that does not exists")
return
consumer = self.consumers.pop(key)
if self._manager is not None and self._manager.started:
consumer._link.close()
def unregister_publisher(self, key):
if not key in self.consumers:
_logger.warning("[context] Trying to unregister publisher that does not exists")
return
publisher = self.publishers.pop(key)
if self._manager is not None and self._manager.started:
publisher._link.close()
def build_address_from_link(self, link: link.Link):
if link.fqdn:
address = link.address
if link.topic and not link.address.startswith("topic://"):
address = f"topic://{address}"
return address
address = f"{self.base}.{link.address}"
if link.topic:
address = f"topic://{address}"
return address

View File

@ -0,0 +1,11 @@
import logging
from proton import Message
_logger = logging.getLogger(__name__)
class Handler:
def on_message(self, key, address, body, message: Message, context=None):
_logger.info(f"You should really override this... {key}=>{address}")

View File

@ -1,16 +1,18 @@
from proton import Link as pLink from proton import Link as pLink
class Link: class Link:
fqdn=False fqdn=False
def __init__(self, key, address, topic=False, fqdn=False):
def __init__(self, key, address, topic=False, fqdn=False):
super().__init__()
self.key = key self.key = key
self.address = address self.address = address
self._link = None
self.topic= topic self.topic= topic
self.fqdn= fqdn self.fqdn= fqdn
self._link = None
def set(self, link:pLink): def set(self, link:pLink):
# The proton container creates a sender # The proton container creates a sender

View File

@ -0,0 +1,71 @@
import logging
from proton import Event, Connection,Session
from proton.handlers import MessagingHandler
from proton.reactor import Container
from .consumer import Consumer
from .publisher import Publisher
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class SessionPerConsumer(object):
def session(self, connection: Connection) -> Session:
session = connection.session()
session.open()
return session
class Manager(MessagingHandler):
uri = None
started = False
container = None
connection = None
_on_ready = None
def __init__(self, uri):
super(Manager, self).__init__()
self.uri = uri
def start(self):
_logger.info(f"[manager] starting")
self.container = Container(self)
self.container.run()
def on_start(self, event: Event) -> None:
self.connection = self.container.connect(self.uri)
self.connection._session_policy=SessionPerConsumer()
self.started=True
_logger.debug(f"[manager] on_start")
if self._on_ready is not None:
self._on_ready()
def on_message(self, event: Event) -> None:
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
f" properly ")
def close(self):
_logger.info(f"[manager] closing")
if self.container:
self.container.stop()
if self.connection:
self.connection.close()
def start_publisher(self, context, publisher: Publisher):
address = context.build_address_from_link(publisher)
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
publisher.set(self.container.create_sender(self.connection, address))
if hasattr(publisher, "delay"):
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
self.container.schedule(publisher.delay, handler=publisher)
def start_consumer(self, context, consumer: Consumer):
address = context.build_address_from_link(consumer)
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
consumer.set(self.container.create_receiver(self.connection, address , handler=consumer))

View File

@ -10,12 +10,15 @@ _logger = logging.getLogger(__name__)
class Publisher(link.Link): class Publisher(link.Link):
def send(self, body=None): def send(self, body=None, application=None):
if not body: if not body:
body = {} body = {}
_logger.debug(f"{self.address} Sending {body} ") _logger.info(f"[{self.key}] sending to {self._link.target.address} for application={application} - {body} ")
msg = self._prepare_message(body) msg = self._prepare_message(body)
if application:
msg.subject = application
self._link.send(msg) self._link.send(msg)
def _prepare_message(self, body=None): def _prepare_message(self, body=None):

View File

@ -0,0 +1,24 @@
import logging
from proton.handlers import MessagingHandler
from .publisher import Publisher
_logger = logging.getLogger(__name__)
class Publisher(Publisher, MessagingHandler):
send_next = False
delay = 15
def __init__(self, delay, key, address, application=None, topic=False, fqdn=False):
super(Publisher, self).__init__(key, address, topic,fqdn)
self.delay = delay
self.application = application
def on_timer_task(self, event):
_logger.debug(f"[manager] on_timer_task")
self.send()
event.reactor.schedule(self.delay, self)
def send(self, body=None, application=None):
super(Publisher, self).send(body, self.application)

View File

@ -27,19 +27,19 @@ class Publisher(publisher.Publisher):
self.send({"state": message_type,"message": None}) self.send({"state": message_type,"message": None})
def starting(self): def starting(self):
self._send_message(States.STARTING) self._send_message(States.STARTING.value)
def started(self): def started(self):
self._send_message(States.STARTED) self._send_message(States.STARTED.value)
def ready(self): def ready(self):
self._send_message(States.READY) self._send_message(States.READY.value)
def stopping(self): def stopping(self):
self._send_message(States.STOPPING) self._send_message(States.STOPPING.value)
def stopped(self): def stopped(self):
self._send_message(States.STOPPED) self._send_message(States.STOPPED.value)
def custom(self, state): def custom(self, state):
self._send_message(state) self._send_message(state)

View File

@ -0,0 +1,2 @@
from . import connector_handler

View File

@ -0,0 +1,12 @@
import logging
_logger = logging.getLogger(__name__)
class ConnectorHandler:
def ready(self, context):
pass

View File

@ -0,0 +1,2 @@
NEBULOUS_BASE_NAME="eu.nebulouscloud"
NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15

View File

@ -1,10 +1,10 @@
EXPONENTIAL_SMOOTHING_VERSION="0.1.0" EXPONENTIAL_SMOOTHING_VERSION="0.1.0"
python3 -m venv /home/forecasting_env ##python3 -m venv /home/forecasting_env
. /home/forecasting_env/bin/activate ##. /home/forecasting_env/bin/activate
pip3 install --no-cache-dir -r /home/r_predictions/requirements.txt pip3 install --no-cache-dir -r /home/r_predictions/requirements.txt
cd /home/r_predictions cd /home/r_predictions
# Install the module itself (provided that the tar.gz file of the module has already been copied inside the container) # Install the module itself (provided that the tar.gz file of the module has already been copied inside the container)
pip install esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz pip install esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz #--break-system-packages
tar -xzvf esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz ##tar -xzvf esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz

View File

@ -20,8 +20,8 @@ library(purrr)
# #
# This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - dataset path, metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be ommitted under some circumstances. # This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - dataset path, metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be ommitted under some circumstances.
# #
# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset, until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g 300 seconds). # To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset, until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g 300 seconds). Beware, this number_of_seconds_to_aggregate_on variable is changed (probably increased) at runtime from its initial configuration so that the forecaster does not consume too much time trying to create predictions. This means that more observations will be necessary to guarantee accurate predictions
# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. In case that this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the minimum value between the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is also needed(TODO:perhaps this behaviour should be changed). # Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. In case that this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the maximum value between the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is needed as well.
#Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created, to produce predictions. The user of the script can opt to try finding the best parameters manually, using the `try_to_optimize_parameters` configuration parameter. #Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created, to produce predictions. The user of the script can opt to try finding the best parameters manually, using the `try_to_optimize_parameters` configuration parameter.
find_smape <- function(actual, forecast) { find_smape <- function(actual, forecast) {
@ -92,7 +92,14 @@ beta_value_argument <- as.double(args[5])
#mydata <- read.csv(configuration_properties$input_data_file, sep=",", header=TRUE) #mydata <- read.csv(configuration_properties$input_data_file, sep=",", header=TRUE)
#mydata <- read.csv(dataset_to_process, sep=",", header=TRUE) #mydata <- read.csv(dataset_to_process, sep=",", header=TRUE)
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE) if (file.info(dataset_to_process)$size > 0) {
# File is not empty, proceed with reading
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
} else {
# File is empty, handle accordingly (e.g., show a message or skip the reading process)
print(paste("The file ",dataset_to_process," is empty. Please provide a non-empty file."))
stop()
}
#sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process) #sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process)
current_time <- get_current_epoch_time() current_time <- get_current_epoch_time()
if (!realtime_mode){ if (!realtime_mode){
@ -293,13 +300,13 @@ if (try_to_optimize_parameters){
#Creation of forecasting model #Creation of forecasting model
if (try_to_optimize_parameters){ if (try_to_optimize_parameters){
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=optimal_alpha,beta=optimal_beta,gamma=optimal_gamma) holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=optimal_alpha,beta=optimal_beta,gamma=optimal_gamma)
ets_forecasting_model <- tryCatch({ ets_forecasting_model <- tryCatch({
ets(mydata_trainseries,alpha = optimal_alpha,beta = optimal_beta,gamma = optimal_gamma) #phi is left to be optimized ets(mydata_trainseries,alpha = optimal_alpha,beta = optimal_beta,gamma = optimal_gamma) #phi is left to be optimized
}, error = function(e) { }, error = function(e) {
NULL NULL
}) })
@ -332,6 +339,19 @@ if (try_to_optimize_parameters){
}, error = function(e) { }, error = function(e) {
NULL NULL
}) })
if (length(mydata_trainseries)<3){
print("Possible issue expected with a very small trainseries (length is less than 3). The contents of the trainseries are the following:")
print(mydata_trainseries)
print("This trainseries originated from the following aggregated data:")
print(mydata.train)
print(paste("The number of seconds to aggregate on is:",number_of_seconds_to_aggregate_on))
print("The above aggregated data was based on these training data points: ")
print(training_datapoints)
print("These training data points originate from these data points:")
print(data_points)
print(paste("by using the first", number_of_data_points_used_for_training, "data points"))
}
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,gamma=FALSE) holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,gamma=FALSE)
} }
} }

View File

@ -5,15 +5,15 @@
APP_NAME=default_application APP_NAME=default_application
METHOD=exponential_smoothing METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=localhost INFLUXDB_HOSTNAME=nebulous-influxdb
INFLUXDB_PORT=8086 INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic INFLUXDB_USERNAME=my-user
INFLUXDB_PASSWORD=password INFLUXDB_PASSWORD=my-password
INFLUXDB_DBNAME=morphemic INFLUXDB_ORG=my-org
INFLUXDB_ORG=morphemic INFLUXDB_ORG_ID=9c929742d57cca02
broker_address=localhost broker_address=nebulous-activemq
broker_port=61613 broker_port=5672
broker_username=admin broker_username=admin
broker_password=admin broker_password=admin
prediction_method=Holt-Winters prediction_method=Holt-Winters

View File

@ -1,14 +1,14 @@
#Fri Jan 12 17:06:48 UTC 2024 #Tue May 14 12:59:33 UTC 2024
APP_NAME=default_application APP_NAME=default_application
METHOD=exponential_smoothing METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=localhost INFLUXDB_HOSTNAME=nebulous-influxdb
INFLUXDB_PORT=8086 INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic INFLUXDB_USERNAME=my-user
INFLUXDB_PASSWORD=password INFLUXDB_PASSWORD=my-password
INFLUXDB_DBNAME=morphemic INFLUXDB_ORG=my-org
INFLUXDB_ORG=morphemic INFLUXDB_ORG_ID=9c929742d57cca02
broker_address=localhost broker_address=nebulous-activemq
broker_port=61610 broker_port=5672
broker_username=morphemic broker_username=morphemic
broker_password=morphemic broker_password=morphemic
prediction_method=Holt-Winters prediction_method=Holt-Winters

View File

@ -1,9 +1,7 @@
python-slugify==8.0.1
jproperties==2.1.1 jproperties==2.1.1
requests==2.31.0 requests==2.31.0
msgpack==1.0.7
numpy==1.26.3 numpy==1.26.3
pandas==2.1.4
python-dotenv==1.0.0
python-qpid-proton==0.39.0 python-qpid-proton==0.39.0
influxdb-client==1.39.0 influxdb-client==1.39.0
python-dotenv==1.0.0
python-dateutil==2.8.2

View File

@ -12,56 +12,105 @@ import os, sys
import multiprocessing import multiprocessing
import traceback import traceback
from subprocess import PIPE, run from subprocess import PIPE, run
from runtime.exn import core from exn import core
import logging import logging
from runtime.exn import connector from exn import connector
from exn.core.handler import Handler
from exn.handler.connector_handler import ConnectorHandler
from runtime.operational_status.ApplicationState import ApplicationState
from runtime.predictions.Prediction import Prediction from runtime.predictions.Prediction import Prediction
from runtime.operational_status.State import State from runtime.operational_status.EsPredictorState import EsPredictorState
from runtime.utilities.PredictionPublisher import PredictionPublisher from runtime.utilities.PredictionPublisher import PredictionPublisher
from runtime.utilities.Utilities import Utilities from runtime.utilities.Utilities import Utilities
print_with_time = Utilities.print_with_time print_with_time = Utilities.print_with_time
def sanitize_prediction_statistics(prediction_confidence_interval, prediction_value, metric_name, application_state):
print_with_time("Inside the sanitization process with an interval of " + prediction_confidence_interval +" and a prediction of " + str(prediction_value))
lower_value_prediction_confidence_interval = float(prediction_confidence_interval.split(",")[0])
upper_value_prediction_confidence_interval = float(prediction_confidence_interval.split(",")[1])
"""if (not application_name in EsPredictorState.individual_application_state):
print_with_time("There is an issue with the application name"+application_name+" not existing in individual application states")
return prediction_confidence_interval,prediction_value_produced"""
lower_bound_value = application_state.lower_bound_value
upper_bound_value = application_state.upper_bound_value
print("Lower_bound_value is "+str(lower_bound_value))
confidence_interval_modified = False
new_prediction_confidence_interval = prediction_confidence_interval
if (not (metric_name in lower_bound_value)) or (not (metric_name in upper_bound_value)):
print_with_time(f"Lower value is unmodified - {lower_value_prediction_confidence_interval} and upper value is unmodified - {upper_value_prediction_confidence_interval}")
return new_prediction_confidence_interval,prediction_value
if (lower_value_prediction_confidence_interval < lower_bound_value[metric_name]):
lower_value_prediction_confidence_interval = lower_bound_value[metric_name]
confidence_interval_modified = True
elif (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]):
lower_value_prediction_confidence_interval = upper_bound_value[metric_name]
confidence_interval_modified = True
if (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]):
upper_value_prediction_confidence_interval = upper_bound_value[metric_name]
confidence_interval_modified = True
elif (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]):
upper_value_prediction_confidence_interval = lower_bound_value[metric_name]
confidence_interval_modified = True
if confidence_interval_modified:
new_prediction_confidence_interval = str(lower_value_prediction_confidence_interval)+","+str(upper_value_prediction_confidence_interval)
print_with_time("The confidence interval "+prediction_confidence_interval+"was modified, becoming "+str(new_prediction_confidence_interval)+", taking into account the values of the metric")
if (prediction_value<lower_bound_value[metric_name]):
print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(lower_bound_value))
prediction_value = lower_bound_value
elif (prediction_value > upper_bound_value[metric_name]):
print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(upper_bound_value))
prediction_value = upper_bound_value
return new_prediction_confidence_interval,prediction_value
def predict_attribute(application_state, attribute, configuration_file_location,next_prediction_time):
def predict_attribute(attribute, configuration_file_location,next_prediction_time):
prediction_confidence_interval_produced = False prediction_confidence_interval_produced = False
prediction_value_produced = False prediction_value_produced = False
prediction_valid = False prediction_valid = False
#os.chdir(os.path.dirname(configuration_file_location)) #os.chdir(os.path.dirname(configuration_file_location))
State.prediction_data_filename = Utilities.get_prediction_data_filename(configuration_file_location,attribute) application_state.prediction_data_filename = application_state.get_prediction_data_filename(configuration_file_location,attribute)
from sys import platform from sys import platform
if State.testing_prediction_functionality: if EsPredictorState.testing_prediction_functionality:
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data") print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute) print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute)
# Windows # Windows
if platform == "win32": if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute] os.chdir("exponential-smoothing-predictor/src/r_predictors")
command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute]
# linux # linux
elif platform == "linux" or platform == "linux2": elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)] os.chdir("/home/r_predictions")
command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)]
#Choosing the solution of linux #Choosing the solution of linux
else: else:
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)] command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)]
else: else:
print_with_time("The current directory is "+os.path.abspath(os.getcwd())) print_with_time("The current directory is "+os.path.abspath(os.getcwd()))
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute+" "+next_prediction_time) print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute+" "+next_prediction_time)
# Windows # Windows
if platform == "win32": if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute, next_prediction_time] os.chdir("exponential-smoothing-predictor/src/r_predictors")
command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute, next_prediction_time]
# Linux # Linux
elif platform == "linux" or platform == "linux2": elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"] os.chdir("/home/r_predictions")
command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"]
#Choosing the solution of linux #Choosing the solution of linux
else: else:
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)] os.chdir("/home/r_predictions")
command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)]
process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""): if (process_output.stdout==""):
@ -91,6 +140,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
elif (string.startswith("smape:")): elif (string.startswith("smape:")):
prediction_smape = string.replace("smape:", "") prediction_smape = string.replace("smape:", "")
if (prediction_confidence_interval_produced and prediction_value_produced): if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,application_state)
prediction_valid = True prediction_valid = True
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval) print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
else: else:
@ -101,7 +151,8 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
return output_prediction return output_prediction
def predict_attributes(attributes,next_prediction_time): def predict_attributes(application_state,next_prediction_time):
attributes = application_state.metrics_to_predict
pool = multiprocessing.Pool(len(attributes)) pool = multiprocessing.Pool(len(attributes))
print_with_time("Prediction thread pool size set to " + str(len(attributes))) print_with_time("Prediction thread pool size set to " + str(len(attributes)))
attribute_predictions = {} attribute_predictions = {}
@ -109,7 +160,7 @@ def predict_attributes(attributes,next_prediction_time):
for attribute in attributes: for attribute in attributes:
print_with_time("Starting " + attribute + " prediction thread") print_with_time("Starting " + attribute + " prediction thread")
start_time = time.time() start_time = time.time()
attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)]) attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[application_state,attribute, EsPredictorState.configuration_file_location, str(next_prediction_time)])
#attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get() #attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get()
for attribute in attributes: for attribute in attributes:
@ -138,43 +189,45 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi
return prediction_time return prediction_time
def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction): def calculate_and_publish_predictions(application_state,maximum_time_required_for_prediction):
while Bootstrap.start_forecasting: start_forecasting = application_state.start_forecasting
print_with_time("Using " + State.configuration_file_location + " for configuration details...")
State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction)
for attribute in State.metrics_to_predict: while start_forecasting:
if ((State.previous_prediction is not None) and (State.previous_prediction[attribute] is not None) and (State.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)): print_with_time("Using " + EsPredictorState.configuration_file_location + " for configuration details...")
maximum_time_required_for_prediction = State.previous_prediction[attribute].last_prediction_time_needed application_state.next_prediction_time = update_prediction_time(application_state.epoch_start, application_state.prediction_horizon,maximum_time_required_for_prediction)
for attribute in application_state.metrics_to_predict:
if ((application_state.previous_prediction is not None) and (application_state.previous_prediction[attribute] is not None) and (application_state.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)):
maximum_time_required_for_prediction = application_state.previous_prediction[attribute].last_prediction_time_needed
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval #Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = State.next_prediction_time - prediction_horizon - time.time() wait_time = application_state.next_prediction_time - application_state.prediction_horizon - time.time()
print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(State.next_prediction_time - prediction_horizon).strftime('%Y-%m-%d %H:%M:%S')) print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(application_state.next_prediction_time - application_state.prediction_horizon).strftime('%Y-%m-%d %H:%M:%S'))
if (wait_time>0): if (wait_time>0):
time.sleep(wait_time) time.sleep(wait_time)
if(not Bootstrap.start_forecasting): if(not start_forecasting):
break break
Utilities.load_configuration() Utilities.load_configuration()
Utilities.update_monitoring_data() application_state.update_monitoring_data()
first_prediction = None first_prediction = None
for prediction_index in range(0,State.total_time_intervals_to_predict): for prediction_index in range(0, EsPredictorState.total_time_intervals_to_predict):
prediction_time = int(State.next_prediction_time)+prediction_index*prediction_horizon prediction_time = int(application_state.next_prediction_time)+prediction_index*application_state.prediction_horizon
try: try:
print_with_time ("Initiating predictions for all metrics for next_prediction_time, which is "+str(State.next_prediction_time)) print_with_time ("Initiating predictions for all metrics for next_prediction_time, which is "+str(application_state.next_prediction_time))
prediction = predict_attributes(State.metrics_to_predict,prediction_time) prediction = predict_attributes(application_state,prediction_time)
if (prediction_time == int(State.next_prediction_time)): if (prediction_time == int(application_state.next_prediction_time)):
first_prediction = prediction first_prediction = prediction
except Exception as e: except Exception as e:
print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. However, "+str(prediction_index)+" predictions were produced (out of the configured "+State.total_time_intervals_to_predict+"). The encountered exception trace follows:") print_with_time("Could not create a prediction for some or all of the metrics for time point " + str(application_state.next_prediction_time) +", proceeding to next prediction time. However, " + str(prediction_index) +" predictions were produced (out of the configured " + str(EsPredictorState.total_time_intervals_to_predict) + "). The encountered exception trace follows:")
print(e) print(traceback.format_exc())
#continue was here, to continue while loop, replaced by break #continue was here, to continue while loop, replaced by break
break break
for attribute in State.metrics_to_predict: for attribute in application_state.metrics_to_predict:
if(not prediction[attribute].prediction_valid): if(not prediction[attribute].prediction_valid):
#continue was here, to continue while loop, replaced by break #continue was here, to continue while loop, replaced by break
break break
if (State.disconnected or State.check_stale_connection()): if (EsPredictorState.disconnected or EsPredictorState.check_stale_connection()):
logging.info("Possible problem due to disconnection or a stale connection") logging.info("Possible problem due to disconnection or a stale connection")
#State.connection.connect() #State.connection.connect()
message_not_sent = True message_not_sent = True
@ -183,16 +236,13 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
"metricValue": float(prediction[attribute].value), "metricValue": float(prediction[attribute].value),
"level": 3, "level": 3,
"timestamp": current_time, "timestamp": current_time,
"probability": 0.95, "probability": 0.95, #This is the default second parameter of the prediction intervals (first is 80%) created as part of the HoltWinters forecasting mode in R
"confidence_interval": [float(prediction[attribute].lower_confidence_interval_value) , float( "confidence_interval": [float(prediction[attribute].lower_confidence_interval_value) , float(
prediction[attribute].upper_confidence_interval_value)], prediction[attribute].upper_confidence_interval_value)],
"predictionTime": prediction_time, "predictionTime": prediction_time,
"refersTo": "todo",
"cloud": "todo",
"provider": "todo",
} }
training_models_message_body = { training_models_message_body = {
"metrics": State.metrics_to_predict, "metrics": application_state.metrics_to_predict,
"forecasting_method": "exponentialsmoothing", "forecasting_method": "exponentialsmoothing",
"timestamp": current_time, "timestamp": current_time,
} }
@ -200,7 +250,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
try: try:
#for publisher in State.broker_publishers: #for publisher in State.broker_publishers:
# if publisher. # if publisher.
for publisher in State.broker_publishers: for publisher in EsPredictorState.broker_publishers:
#if publisher.address=="eu.nebulouscloud.monitoring.preliminary_predicted.exponentialsmoothing"+attribute: #if publisher.address=="eu.nebulouscloud.monitoring.preliminary_predicted.exponentialsmoothing"+attribute:
if publisher.key=="publisher_"+attribute: if publisher.key=="publisher_"+attribute:
@ -211,16 +261,16 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
#State.connection.send_to_topic('training_models',training_models_message_body) #State.connection.send_to_topic('training_models',training_models_message_body)
message_not_sent = False message_not_sent = False
print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s\n\n%s\n\n" % (attribute, id, attribute, prediction_message_body)) print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s:\n\n%s\n\n" % (attribute, EsPredictorState.forecaster_name, attribute, prediction_message_body))
except ConnectionError as exception: except ConnectionError as exception:
#State.connection.disconnect() #State.connection.disconnect()
#State.connection = messaging.morphemic.Connection('admin', 'admin') #State.connection = messaging.morphemic.Connection('admin', 'admin')
#State.connection.connect() #State.connection.connect()
logging.error("Error sending intermediate prediction"+str(exception)) logging.error("Error sending intermediate prediction"+str(exception))
State.disconnected = False EsPredictorState.disconnected = False
if (first_prediction is not None): if (first_prediction is not None):
State.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8) application_state.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8)
#State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from) #State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
#State.number_of_days_to_use_data_from = 1 + int( #State.number_of_days_to_use_data_from = 1 + int(
@ -230,10 +280,10 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
#class Listener(messaging.listener.MorphemicListener): #class Listener(messaging.listener.MorphemicListener):
class BootStrap(ConnectorHandler):
pass
class ConsumerHandler(Handler):
class Bootstrap(connector.ConnectorHandler):
start_forecasting = None # Whether the component should start (or keep on) forecasting
prediction_thread = None prediction_thread = None
def ready(self, context): def ready(self, context):
@ -244,55 +294,102 @@ class Bootstrap(connector.ConnectorHandler):
context.publishers['state'].stopping() context.publishers['state'].stopping()
context.publishers['state'].stopped() context.publishers['state'].stopped()
context.publishers['publisher_cpu_usage'].send({ #context.publishers['publisher_cpu_usage'].send({
'hello': 'world' # 'hello': 'world'
}) #})
def on_message(self, key, address, body, context, **kwargs): def on_message(self, key, address, body, context, **kwargs):
application_name = "default_application" address = address.replace("topic://"+EsPredictorState.GENERAL_TOPIC_PREFIX,"")
address = address.replace("topic://eu.nebulouscloud.","") if (address).startswith(EsPredictorState.MONITORING_DATA_PREFIX):
if (address).startswith(State.MONITORING_DATA_PREFIX): address = address.replace(EsPredictorState.MONITORING_DATA_PREFIX, "", 1)
address = address.replace(State.MONITORING_DATA_PREFIX+".","",1)
logging.info("New monitoring data arrived at topic "+address) logging.info("New monitoring data arrived at topic "+address)
logging.info(body) if address == 'metric_list':
application_name = body["name"]
message_version = body["version"]
application_state = None
individual_application_state = {}
application_already_defined = application_name in EsPredictorState.individual_application_state
if ( application_already_defined and
( message_version == EsPredictorState.individual_application_state[application_state].message_version )
):
individual_application_state = EsPredictorState.individual_application_state
application_state = individual_application_state[application_name]
elif (address).startswith(State.FORECASTING_CONTROL_PREFIX): print_with_time("Using existing application definition for "+application_name)
address = address.replace(State.FORECASTING_CONTROL_PREFIX+".","",1) else:
logging.info("The address is " + address) if (application_already_defined):
print_with_time("Updating application "+application_name+" based on new metrics list message")
else:
print_with_time("Creating new application "+application_name)
application_state = ApplicationState(application_name,message_version)
metric_list_object = body["metric_list"]
lower_bound_value = application_state.lower_bound_value
upper_bound_value = application_state.upper_bound_value
for metric_object in metric_list_object:
lower_bound_value[metric_object["name"]]=float(metric_object["lower_bound"])
upper_bound_value[metric_object["name"]]=float(metric_object["upper_bound"])
if address == 'metrics_to_predict': application_state.lower_bound_value.update(lower_bound_value)
application_state.upper_bound_value.update(upper_bound_value)
State.initial_metric_list_received = True application_state.initial_metric_list_received = True
print_with_time("Inside message handler for metrics_to predict")
individual_application_state[application_name] = application_state
EsPredictorState.individual_application_state.update(individual_application_state)
#body = json.loads(body) #body = json.loads(body)
#for element in body: #for element in body:
# State.metrics_to_predict.append(element["metric"]) # State.metrics_to_predict.append(element["metric"])
elif address == 'test.exponentialsmoothing':
State.testing_prediction_functionality = True elif (address).startswith(EsPredictorState.FORECASTING_CONTROL_PREFIX):
address = address.replace(EsPredictorState.FORECASTING_CONTROL_PREFIX, "", 1)
logging.info("The address is " + address)
if address == 'test.exponentialsmoothing':
EsPredictorState.testing_prediction_functionality = True
elif address == 'start_forecasting.exponentialsmoothing': elif address == 'start_forecasting.exponentialsmoothing':
try: try:
State.metrics_to_predict = body["metrics"] application_name = body["name"]
print_with_time("Received request to start predicting the following metrics: "+ ",".join(State.metrics_to_predict)) message_version = 0
State.broker_publishers = [] if (not "version" in body):
for metric in State.metrics_to_predict: logging.info("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1")
State.broker_publishers.append (PredictionPublisher(metric)) message_version = 1
State.publishing_connector = connector.EXN('publishing_exsmoothing', handler=Bootstrap(),#consumers=list(State.broker_consumers), else:
consumers=[], message_version = body["version"]
publishers=State.broker_publishers, if (application_name in EsPredictorState.individual_application_state) and (message_version <= EsPredictorState.individual_application_state[application_name].message_version):
url="localhost", application_state = EsPredictorState.individual_application_state[application_name]
port="5672", else:
username="admin", EsPredictorState.individual_application_state[application_name] = ApplicationState(application_name,message_version)
password="admin" application_state = EsPredictorState.individual_application_state[application_name]
)
thread = threading.Thread(target=State.publishing_connector.start,args=()) if (not application_state.start_forecasting) or ((application_state.metrics_to_predict is not None) and (len(application_state.metrics_to_predict)<=len(body["metrics"]))):
application_state.metrics_to_predict = body["metrics"]
print_with_time("Received request to start predicting the following metrics: "+ ",".join(application_state.metrics_to_predict)+" for application "+application_name+", proceeding with the prediction process")
else:
application_state.metrics_to_predict = body["metrics"]
print_with_time("Received request to start predicting the following metrics: "+ body["metrics"]+" for application "+application_name+"but it was perceived as a duplicate")
return
application_state.broker_publishers = []
for metric in application_state.metrics_to_predict:
EsPredictorState.broker_publishers.append (PredictionPublisher(application_name,metric))
EsPredictorState.publishing_connector = connector.EXN('publishing_'+EsPredictorState.forecaster_name+'-'+application_name, handler=BootStrap(), #consumers=list(State.broker_consumers),
consumers=[],
publishers=EsPredictorState.broker_publishers,
url=EsPredictorState.broker_address,
port=EsPredictorState.broker_port,
username=EsPredictorState.broker_username,
password=EsPredictorState.broker_password
)
#EsPredictorState.publishing_connector.start()
thread = threading.Thread(target=EsPredictorState.publishing_connector.start, args=())
thread.start() thread.start()
except Exception as e: except Exception as e:
print_with_time("Could not load json object to process the start forecasting message \n"+str(body)) print_with_time("Could not load json object to process the start forecasting message \n"+str(body))
print(traceback.format_exc())
return return
#if (not State.initial_metric_list_received): #if (not State.initial_metric_list_received):
@ -301,49 +398,53 @@ class Bootstrap(connector.ConnectorHandler):
# return # return
try: try:
Bootstrap.start_forecasting = True application_state = EsPredictorState.individual_application_state[application_name]
State.epoch_start = body["epoch_start"] application_state.start_forecasting = True
prediction_horizon = int(body["prediction_horizon"]) application_state.epoch_start = body["epoch_start"]
State.next_prediction_time = update_prediction_time(State.epoch_start,prediction_horizon,State.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily application_state.prediction_horizon = int(body["prediction_horizon"])
print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(State.epoch_start)+", and "+str(prediction_horizon)+ " seconds respectively") application_state.next_prediction_time = update_prediction_time(application_state.epoch_start,application_state.prediction_horizon,EsPredictorState.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily
print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(application_state.epoch_start)+", and "+str(application_state.prediction_horizon)+ " seconds respectively")
except Exception as e: except Exception as e:
print_with_time("Problem while retrieving epoch start and/or prediction_horizon") print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
print(traceback.format_exc())
return return
with open(State.configuration_file_location, "r+b") as f: with open(EsPredictorState.configuration_file_location, "r+b") as f:
State.configuration_details.load(f, "utf-8") EsPredictorState.configuration_details.load(f, "utf-8")
# Do stuff with the p object... # Do stuff with the p object...
initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"] initial_seconds_aggregation_value, metadata = EsPredictorState.configuration_details["number_of_seconds_to_aggregate_on"]
initial_seconds_aggregation_value = int(initial_seconds_aggregation_value) initial_seconds_aggregation_value = int(initial_seconds_aggregation_value)
if (prediction_horizon<initial_seconds_aggregation_value): if (application_state.prediction_horizon<initial_seconds_aggregation_value):
print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value)) print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(application_state.prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value))
State.configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon) EsPredictorState.configuration_details["number_of_seconds_to_aggregate_on"] = str(application_state.prediction_horizon)
f.seek(0) f.seek(0)
f.truncate(0) f.truncate(0)
State.configuration_details.store(f, encoding="utf-8") EsPredictorState.configuration_details.store(f, encoding="utf-8")
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction maximum_time_required_for_prediction = EsPredictorState.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())): if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction]) self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[application_state,maximum_time_required_for_prediction])
self.prediction_thread.start() self.prediction_thread.start()
#waitfor(first period) #waitfor(first period)
elif address == 'stop_forecasting.exponentialsmoothing': elif address == 'stop_forecasting.exponentialsmoothing':
#waitfor(first period) #waitfor(first period)
application_name = body["name"]
application_state = EsPredictorState.individual_application_state[application_name]
print_with_time("Received message to stop predicting some of the metrics") print_with_time("Received message to stop predicting some of the metrics")
metrics_to_remove = json.loads(body)["metrics"] metrics_to_remove = json.loads(body)["metrics"]
for metric in metrics_to_remove: for metric in metrics_to_remove:
if (State.metrics_to_predict.__contains__(metric)): if (application_state.metrics_to_predict.__contains__(metric)):
print_with_time("Stopping generating predictions for metric "+metric) print_with_time("Stopping generating predictions for metric "+metric)
State.metrics_to_predict.remove(metric) application_state.metrics_to_predict.remove(metric)
if len(State.metrics_to_predict)==0: if len(application_state.metrics_to_predict)==0:
Bootstrap.start_forecasting = False EsPredictorState.individual_application_state[application_name].start_forecasting = False
self.prediction_thread.join() self.prediction_thread.join()
else: else:
@ -354,16 +455,15 @@ class Bootstrap(connector.ConnectorHandler):
def get_dataset_file(attribute): def get_dataset_file(attribute):
pass pass
def main():
if __name__ == "__main__": EsPredictorState.configuration_file_location = sys.argv[1]
os.chdir("exponential-smoothing-predictor/src/r_predictors")
State.configuration_file_location = sys.argv[1]
Utilities.load_configuration() Utilities.load_configuration()
Utilities.update_influxdb_organization_id()
# Subscribe to retrieve the metrics which should be used # Subscribe to retrieve the metrics which should be used
id = "exponentialsmoothing" id = "exponentialsmoothing"
State.disconnected = True EsPredictorState.disconnected = True
#while(True): #while(True):
# State.connection = messaging.morphemic.Connection('admin', 'admin') # State.connection = messaging.morphemic.Connection('admin', 'admin')
@ -378,39 +478,42 @@ if __name__ == "__main__":
current_consumers = [] current_consumers = []
for topic in topics_to_subscribe: for topic in topics_to_subscribe:
current_consumer = core.consumer.Consumer('monitoring_'+topic, topic, topic=True,fqdn=True) current_consumer = core.consumer.Consumer(key='monitoring_'+topic,address=topic,handler=ConsumerHandler(), topic=True,fqdn=True)
State.broker_consumers.append(current_consumer) EsPredictorState.broker_consumers.append(current_consumer)
current_consumers.append(current_consumer) current_consumers.append(current_consumer)
State.subscribing_connector = connector.EXN('slovid', handler=Bootstrap(), EsPredictorState.subscribing_connector = connector.EXN(EsPredictorState.forecaster_name, handler=BootStrap(),
#consumers=list(State.broker_consumers), #consumers=list(State.broker_consumers),
consumers=State.broker_consumers, consumers=EsPredictorState.broker_consumers,
url="localhost", url=EsPredictorState.broker_address,
port="5672", port=EsPredictorState.broker_port,
username="admin", username=EsPredictorState.broker_username,
password="admin" password=EsPredictorState.broker_password
) )
#connector.start() #connector.start()
thread = threading.Thread(target=State.subscribing_connector.start,args=()) thread = threading.Thread(target=EsPredictorState.subscribing_connector.start, args=())
thread.start() thread.start()
State.disconnected = False; EsPredictorState.disconnected = False;
print_with_time("Checking (EMS) broker connectivity state, possibly ready to start") print_with_time("Checking (EMS) broker connectivity state, possibly ready to start")
if (State.disconnected or State.check_stale_connection()): if (EsPredictorState.disconnected or EsPredictorState.check_stale_connection()):
try: try:
#State.connection.disconnect() #required to avoid the already connected exception #State.connection.disconnect() #required to avoid the already connected exception
#State.connection.connect() #State.connection.connect()
State.disconnected = True EsPredictorState.disconnected = True
print_with_time("Possible problem in the connection") print_with_time("Possible problem in the connection")
except Exception as e: except Exception as e:
print_with_time("Encountered exception while trying to connect to broker") print_with_time("Encountered exception while trying to connect to broker")
print(traceback.format_exc()) print(traceback.format_exc())
State.disconnected = True EsPredictorState.disconnected = True
time.sleep(5) time.sleep(5)
continue continue
State.disconnection_handler.acquire() EsPredictorState.disconnection_handler.acquire()
State.disconnection_handler.wait() EsPredictorState.disconnection_handler.wait()
State.disconnection_handler.release() EsPredictorState.disconnection_handler.release()
#State.connector.stop() #State.connector.stop()
if __name__ == "__main__":
main()

View File

@ -1 +0,0 @@
from . import connector

View File

@ -1,155 +0,0 @@
import logging
import os
from dotenv import load_dotenv
from proton.handlers import MessagingHandler
from proton.reactor import Container
from .core import context as core_context, state_publisher, schedule_publisher
from .settings import base
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger(__name__)
class ConnectorHandler:
def __init__(self):
self.initialized = False
def set_ready(self,ready, ctx:core_context.Context):
self.initialized = ready
self.ready(ctx)
def ready(self, ctx:core_context.Context):
pass
def on_message(self, key, address, body, context, **kwargs):
pass
class CoreHandler(MessagingHandler):
def __init__(self,
context,
handler: ConnectorHandler,
publishers = [],
consumers = [],
):
super(CoreHandler, self).__init__()
self.context=context
self.publishers=publishers
self.consumers=consumers
self.handler = handler
self.conn = None
def on_start(self, event) -> None:
self.conn = event.container.connect(self.context.connection)
for publisher in self.publishers:
_logger.info(f"{publisher.address} registering sender")
address = self.context.build_address_from_link(publisher)
publisher.set(event.container.create_sender(self.conn,address))
self.context.register_publisher(publisher)
_logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}")
if hasattr(publisher, "delay"):
_logger.debug(f"{self.context.base} Registering timer")
event.reactor.schedule(publisher.delay, self)
for consumer in self.consumers:
address = self.context.build_address_from_link(consumer)
_logger.info(f"{self.context.base} Registering consumer {address}")
consumer.set(event.container.create_receiver(self.conn, address))
self.context.register_consumers(consumer)
def on_sendable(self, event):
if not self.handler.initialized:
self.handler.set_ready(True, self.context)
def on_timer_task(self, event):
_logger.debug(f"{self.context.base} On timer")
for publisher in self._delay_publishers():
publisher.send()
event.reactor.schedule(publisher.delay, self)
def on_message(self, event):
try:
for consumer in self.consumers:
if consumer.should_handle(event):
_logger.debug(f"Received message: {event.message.address}")
self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event)
except Exception as e:
_logger.error(f"Received message: {e}")
def close(self):
if self.conn:
self.conn.close()
else:
_logger.warning(f"{self.context.base} No open connection")
def _delay_publishers(self):
return [p for p in self.publishers if hasattr(p,'delay')]
class EXN:
def __init__(self, component=None,
handler:ConnectorHandler = None,
publishers=[],
consumers=[],
**kwargs):
# Load .env file
load_dotenv()
# Validate and set connector
if not component:
_logger.error("Component cannot be empty or None")
raise ValueError("Component cannot be empty or None")
self.component = component
self.handler = handler
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
# Validate attributes
if not self.url:
_logger.error("URL cannot be empty or None")
raise ValueError("URL cannot be empty or None")
if not self.port:
_logger.error("PORT cannot be empty or None")
raise ValueError("PORT cannot be empty or None")
if not self.username:
_logger.error("USERNAME cannot be empty or None")
raise ValueError("USERNAME cannot be empty or None")
if not self.password:
_logger.error("PASSWORD cannot be empty or None")
raise ValueError("PASSWORD cannot be empty or None")
ctx = core_context.Context(
connection=f"{self.url}:{self.port}",
base=f"{base.NEBULOUS_BASE_NAME}.{self.component}",
)
if kwargs.get("enable_state",False):
publishers.append(state_publisher.Publisher())
if kwargs.get("enable_health",False):
publishers.append(schedule_publisher.Publisher(
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
'health',
'health',
True))
core_handler = CoreHandler(
ctx,
handler,
publishers,
consumers
)
self.container = Container(core_handler)
def start(self):
self.container.run()

View File

@ -1,17 +0,0 @@
import datetime
from proton import Message, Event
from . import link
import logging
_logger = logging.getLogger(__name__)
class Consumer(link.Link):
def on_message(self, body, **kwargs):
_logger.debug(f"{self.address} Got {body} ")
def should_handle(self, event: Event):
if event.link == self._link:
return True

View File

@ -1,63 +0,0 @@
from . import link
class Context:
def __init__(self, connection, base):
self.connection = connection
self.base = base
self.publishers = {}
self.consumers = {}
def get_publisher(self, key):
if key in self.publishers:
return self.publishers[key]
return None
def has_publisher(self, key):
return key in self.publishers
def has_consumer(self, key):
return key in self.consumers
def register_publisher(self, publisher):
self.publishers[publisher.key] = publisher
def register_consumers(self, consumer):
self.consumers[consumer.key] = consumer
def build_address_from_link(self, link: link.Link):
if link.fqdn:
address = link.address
if link.topic and not link.address.startswith("topic://"):
address = f"topic://{address}"
return address
address = f"{self.base}.{link.address}"
if link.topic:
address = f"topic://{address}"
return address
def match_address(self, l: link.Link, event):
if not event \
or not event.message \
or not event.message.address:
return False
address = self.build_address_from_link(l)
return address == event.message.address
def build_address(self, *actions, topic=False):
if len(actions) <= 0:
return self.base
address = f"{self.base}.{'.'.join(actions)}"
if topic:
address = f"topic://{address}"
return address

View File

@ -1,14 +0,0 @@
import logging
from . import publisher
_logger = logging.getLogger(__name__)
class Publisher(publisher.Publisher):
send_next = False
delay = 15
def __init__(self, delay, key, address, topic=False):
super(Publisher, self).__init__(key, address, topic)
self.delay = delay

View File

@ -1,2 +0,0 @@
NEBULOUS_BASE_NAME="eu.nebulous"
NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT=15

View File

@ -0,0 +1,117 @@
import logging
import time
import traceback
import requests
import json
from runtime.operational_status.EsPredictorState import EsPredictorState
from runtime.utilities.InfluxDBConnector import InfluxDBConnector
from runtime.utilities.Utilities import Utilities
from dateutil import parser
class ApplicationState:
#Forecaster variables
def get_prediction_data_filename(self,configuration_file_location,metric_name):
from jproperties import Properties
p = Properties()
with open(configuration_file_location, "rb") as f:
p.load(f, "utf-8")
path_to_datasets, metadata = p["path_to_datasets"]
#application_name, metadata = p["application_name"]
path_to_datasets = Utilities.fix_path_ending(path_to_datasets)
return "" + str(path_to_datasets) + str(self.application_name) + "_"+metric_name+ ".csv"
def __init__(self,application_name, message_version):
self.message_version = message_version
self.application_name = application_name
self.influxdb_bucket = EsPredictorState.application_name_prefix+application_name+"_bucket"
token = EsPredictorState.influxdb_token
list_bucket_url = 'http://' + EsPredictorState.influxdb_hostname + ':8086/api/v2/buckets?name='+self.influxdb_bucket
create_bucket_url = 'http://' + EsPredictorState.influxdb_hostname + ':8086/api/v2/buckets'
headers = {
'Authorization': 'Token {}'.format(token),
'Content-Type': 'application/json',
'Accept': 'application/json'
}
data = {
'name': self.influxdb_bucket,
'orgID': EsPredictorState.influxdb_organization_id,
'retentionRules': [
{
'type': 'expire',
'everySeconds': 2592000 #30 days (30*24*3600)
}
]
}
response = requests.get(list_bucket_url, headers=headers)
logging.info("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
if ((response.status_code==200) and ("buckets" in response.json()) and (len(response.json()["buckets"])>0)):
logging.info("The bucket already existed for the particular application, skipping its creation...")
else:
logging.info("The response in the request to list a bucket is "+str(response.json()))
logging.info("The bucket did not exist for the particular application, creation in process...")
response = requests.post(create_bucket_url, headers=headers, data=json.dumps(data))
logging.info("The response for creating a new bucket is "+str(response.status_code))
self.start_forecasting = False # Whether the component should start (or keep on) forecasting
self.prediction_data_filename = application_name+".csv"
self.dataset_file_name = "exponential_smoothing_dataset_"+application_name+".csv"
self.metrics_to_predict = []
self.epoch_start = 0
self.next_prediction_time = 0
self.prediction_horizon = 120
self.previous_prediction = None
self.initial_metric_list_received = False
self.lower_bound_value = {}
self.upper_bound_value = {}
def update_monitoring_data(self):
#query(metrics_to_predict,number_of_days_for_which_data_was_retrieved)
#save_new_file()
Utilities.print_with_time("Starting dataset creation process...")
try:
"""
Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic")
influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password")
influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic")
influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
application_name = "default_application"
"""
for metric_name in self.metrics_to_predict:
time_interval_to_get_data_for = str(EsPredictorState.number_of_days_to_use_data_from) + "d"
print_data_from_db = True
query_string = 'from(bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
influx_connector = InfluxDBConnector()
print("performing query")
current_time = time.time()
result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization)
elapsed_time = time.time()-current_time
print("performed query, it took "+str(elapsed_time) + " seconds")
#print(result.to_values())
with open(self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name), 'w') as file:
for table in result:
#print header row
file.write("Timestamp,ems_time,"+metric_name+"\r\n")
for record in table.records:
dt = parser.isoparse(str(record.get_time()))
epoch_time = int(dt.timestamp())
metric_value = record.get_value()
if(print_data_from_db):
file.write(str(epoch_time)+","+str(epoch_time)+","+str(metric_value)+"\r\n")
# Write the string data to the file
except Exception as e:
Utilities.print_with_time("Could not create new dataset as an exception was thrown")
print(traceback.format_exc())

View File

@ -4,43 +4,46 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this # License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/. # file, You can obtain one at https://mozilla.org/MPL/2.0/.
import threading import threading, logging
from influxdb_client import InfluxDBClient
from jproperties import Properties from jproperties import Properties
class State: class EsPredictorState:
"""
The name of the predictor
"""
forecaster_name = "exponentialsmoothing"
"""
A dictionary containing statistics on the application state of individual applications
"""
individual_application_state = {}
""" """
Fail-safe default values introduced below Fail-safe default values introduced below
""" """
application_name_prefix = "nebulous_"
prediction_data_filename = "default_application.csv" GENERAL_TOPIC_PREFIX = "eu.nebulouscloud."
MONITORING_DATA_PREFIX = "monitoring" MONITORING_DATA_PREFIX = "monitoring."
FORECASTING_CONTROL_PREFIX = "forecasting" FORECASTING_CONTROL_PREFIX = "forecasting."
#Used to create the dataset from the InfluxDB #Used to create the dataset from the InfluxDB
application_name = "default_application" influxdb_organization = "my-org"
influxdb_bucket = "nebulous" influxdb_organization_id = "e0033247dcca0c54"
influxdb_organization = "nebulous" influxdb_token = "my-super-secret-auth-token"
influxdb_token = "tzIfpbU9b77quyvN0yHIbWltSh1c1371-o9nl_wJYaeo5TWdk5txyxXhp2iaLVMvOvf020HnEEAkE0yy5AllKQ==" influxdb_password = "my-password"
influxdb_dbname = "nebulous" influxdb_username = "my-user"
influxdb_password = "adminadmin"
influxdb_username = "admin"
influxdb_port = 8086 influxdb_port = 8086
influxdb_hostname = "localhost" influxdb_hostname = "localhost"
path_to_datasets = "./datasets" path_to_datasets = "./datasets"
dataset_file_name = "exponential_smoothing_dataset.csv"
number_of_days_to_use_data_from = 365 number_of_days_to_use_data_from = 365
#Forecaster variables
metrics_to_predict = [] configuration_file_location="exponential-smoothing-predictor/prediction_configuration.properties"
epoch_start = 0
next_prediction_time = 0
previous_prediction = None
configuration_file_location="prediction_configuration.properties"
configuration_details = Properties() configuration_details = Properties()
prediction_processing_time_safety_margin_seconds = 20 prediction_processing_time_safety_margin_seconds = 20
disconnected = True disconnected = True
disconnection_handler = threading.Condition() disconnection_handler = threading.Condition()
initial_metric_list_received = False
testing_prediction_functionality = False testing_prediction_functionality = False
total_time_intervals_to_predict = 8 total_time_intervals_to_predict = 8
@ -59,4 +62,6 @@ class State:
@staticmethod @staticmethod
#TODO inspect State.connection #TODO inspect State.connection
def check_stale_connection(): def check_stale_connection():
return (not State.subscribing_connector) return (not EsPredictorState.subscribing_connector)

View File

@ -2,7 +2,7 @@ from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime from datetime import datetime
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
from runtime.operational_status.State import State from runtime.operational_status.EsPredictorState import EsPredictorState
#import influxdb_client, os, time #import influxdb_client, os, time
#from influxdb_client import InfluxDBClient, Point, WritePrecision #from influxdb_client import InfluxDBClient, Point, WritePrecision
@ -28,32 +28,32 @@ from runtime.operational_status.State import State
# time.sleep(1) # separate points by 1 second # time.sleep(1) # separate points by 1 second
data = [ #data = [
{ # {
"measurement": "temperature", # "measurement": "temperature",
"tags": {"location": "Prague"}, # "tags": {"location": "Prague"},
"fields": {"temperature": 25.3} # "fields": {"temperature": 25.3}
} # }
] #]
class InfluxDBConnector: class InfluxDBConnector:
client = InfluxDBClient(url="http://"+State.influxdb_hostname+":"+str(State.influxdb_port), token=State.influxdb_token, org=State.influxdb_organization) client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token, org=EsPredictorState.influxdb_organization)
write_api = client.write_api(write_options=SYNCHRONOUS) write_api = client.write_api(write_options=SYNCHRONOUS)
def InfluxDBConnector(self): def InfluxDBConnector(self):
pass pass
def write_data(self,data): def write_data(self,data,bucket):
self.write_api.write(bucket=State.influxdb_bucket, org=State.influxdb_organization, record=data, write_precision=WritePrecision.S) self.write_api.write(bucket=bucket, org=EsPredictorState.influxdb_organization, record=data, write_precision=WritePrecision.S)
def get_data(self): def get_data(self):
query_api = self.client.query_api() query_api = self.client.query_api()
query = """from(bucket: "nebulous") query = """from(bucket: "nebulous")
|> range(start: -1m) |> range(start: -1m)
|> filter(fn: (r) => r._measurement == "temperature")""" |> filter(fn: (r) => r._measurement == "temperature")"""
tables = query_api.query(query, org=State.influxdb_organization) tables = query_api.query(query, org=EsPredictorState.influxdb_organization)
for table in tables: for table in tables:
for record in table.records: for record in table.records:

View File

@ -1,11 +1,11 @@
from runtime.exn import core from exn import core
class PredictionPublisher(core.publisher.Publisher): class PredictionPublisher(core.publisher.Publisher):
metric_name = "" metric_name = ""
def __init__(self,metric_name): def __init__(self,application_name,metric_name):
super().__init__('publisher_'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+metric_name, True,True) super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+metric_name, True,True)
self.metric_name = metric_name self.metric_name = metric_name
def send(self, body={}): def send(self, body={}, application=""):
super(PredictionPublisher, self).send(body) super(PredictionPublisher, self).send(body)

View File

@ -7,17 +7,14 @@
import pathlib import pathlib
#from morphemic.dataset import DatasetMaker #from morphemic.dataset import DatasetMaker
import datetime import datetime
import time,os import logging,os
from dateutil import parser from dateutil import parser
from influxdb_client import InfluxDBClient
from runtime.operational_status.State import State from runtime.operational_status.EsPredictorState import EsPredictorState
from runtime.utilities.InfluxDBConnector import InfluxDBConnector from runtime.utilities.InfluxDBConnector import InfluxDBConnector
class DatasetMaker:
pass
class Utilities: class Utilities:
@staticmethod @staticmethod
@ -27,85 +24,39 @@ class Utilities:
@staticmethod @staticmethod
def load_configuration(): def load_configuration():
with open(State.configuration_file_location,'rb') as config_file: with open(EsPredictorState.configuration_file_location, 'rb') as config_file:
State.configuration_details.load(config_file) EsPredictorState.configuration_details.load(config_file)
#prediction_horizon = configuration_details.get("prediction_horizon") #prediction_horizon = configuration_details.get("prediction_horizon")
State.dataset_file_name = State.configuration_details.get("input_data_file").data EsPredictorState.number_of_days_to_use_data_from = int(EsPredictorState.configuration_details.get("number_of_days_to_use_data_from").data)
State.number_of_days_to_use_data_from = int(State.configuration_details.get("number_of_days_to_use_data_from").data) EsPredictorState.prediction_processing_time_safety_margin_seconds = int(EsPredictorState.configuration_details.get("prediction_processing_time_safety_margin_seconds").data)
State.prediction_processing_time_safety_margin_seconds = int(State.configuration_details.get("prediction_processing_time_safety_margin_seconds").data) EsPredictorState.testing_prediction_functionality = EsPredictorState.configuration_details.get("testing_prediction_functionality").data.lower() == "true"
State.testing_prediction_functionality = State.configuration_details.get("testing_prediction_functionality").data.lower() == "true" EsPredictorState.path_to_datasets = EsPredictorState.configuration_details.get("path_to_datasets").data
State.path_to_datasets = State.configuration_details.get("path_to_datasets").data EsPredictorState.broker_address = EsPredictorState.configuration_details.get("broker_address").data
State.broker_address = State.configuration_details.get("broker_address").data EsPredictorState.broker_port = int(EsPredictorState.configuration_details.get("broker_port").data)
State.broker_port = int(State.configuration_details.get("broker_port").data) EsPredictorState.broker_username = EsPredictorState.configuration_details.get("broker_username").data
State.broker_username = State.configuration_details.get("broker_username").data EsPredictorState.broker_password = EsPredictorState.configuration_details.get("broker_password").data
State.broker_password = State.configuration_details.get("broker_password").data
State.influxdb_hostname = State.configuration_details.get("INFLUXDB_HOSTNAME").data EsPredictorState.influxdb_hostname = EsPredictorState.configuration_details.get("INFLUXDB_HOSTNAME").data
State.influxdb_port = int(State.configuration_details.get("INFLUXDB_PORT").data) EsPredictorState.influxdb_port = int(EsPredictorState.configuration_details.get("INFLUXDB_PORT").data)
State.influxdb_username = State.configuration_details.get("INFLUXDB_USERNAME").data EsPredictorState.influxdb_username = EsPredictorState.configuration_details.get("INFLUXDB_USERNAME").data
State.influxdb_password = State.configuration_details.get("INFLUXDB_PASSWORD").data EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data
State.influxdb_dbname = State.configuration_details.get("INFLUXDB_DBNAME").data EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data
State.influxdb_org = State.configuration_details.get("INFLUXDB_ORG").data
State.application_name = State.configuration_details.get("APP_NAME").data
#This method accesses influx db to retrieve the most recent metric values. #This method accesses influx db to retrieve the most recent metric values.
@staticmethod
def update_monitoring_data():
#query(metrics_to_predict,number_of_days_for_which_data_was_retrieved)
#save_new_file()
Utilities.print_with_time("Starting dataset creation process...")
try:
"""
Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic")
influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password")
influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic")
influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
application_name = "default_application"
"""
metric_names = ["cpu_usage","ram_usage"]
for metric_name in State.metrics_to_predict:
time_interval_to_get_data_for = str(State.number_of_days_to_use_data_from)+"d"
print_data_from_db = True
query_string = 'from(bucket: "'+State.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
influx_connector = InfluxDBConnector()
print("performing query")
current_time = time.time()
result = influx_connector.client.query_api().query(query_string,State.influxdb_organization)
elapsed_time = time.time()-current_time
print("performed query, it took "+str(elapsed_time) + " seconds")
#print(result.to_values())
with open(Utilities.get_prediction_data_filename(State.configuration_file_location,metric_name), 'w') as file:
for table in result:
#print header row
file.write("Timestamp,ems_time,"+metric_name+"\r\n")
for record in table.records:
dt = parser.isoparse(str(record.get_time()))
epoch_time = int(dt.timestamp())
metric_value = record.get_value()
if(print_data_from_db):
file.write(str(epoch_time)+","+str(epoch_time)+","+str(metric_value)+"\r\n")
# Write the string data to the file
except Exception as e:
Utilities.print_with_time("Could not create new dataset as an exception was thrown")
print(e)
@staticmethod @staticmethod
def get_prediction_data_filename(configuration_file_location,metric_name): def update_influxdb_organization_id():
from jproperties import Properties client = InfluxDBClient(url="http://" + EsPredictorState.influxdb_hostname + ":" + str(EsPredictorState.influxdb_port), token=EsPredictorState.influxdb_token)
p = Properties() org_api = client.organizations_api()
with open(configuration_file_location, "rb") as f: # List all organizations
p.load(f, "utf-8") organizations = org_api.find_organizations()
path_to_datasets, metadata = p["path_to_datasets"]
application_name, metadata = p["application_name"]
path_to_datasets = Utilities.fix_path_ending(path_to_datasets)
return "" + str(path_to_datasets) + str(application_name) + "_"+metric_name+ ".csv"
# Find the organization by name and print its ID
for org in organizations:
if org.name == EsPredictorState.influxdb_organization:
logging.info(f"Organization Name: {org.name}, ID: {org.id}")
EsPredictorState.influxdb_organization_id = org.id
break
@staticmethod @staticmethod
def fix_path_ending(path): def fix_path_ending(path):
if (path[-1] is os.sep): if (path[-1] is os.sep):

View File

@ -18,7 +18,7 @@ setup(
author_email="atsagkaropoulos@mail.ntua.gr", author_email="atsagkaropoulos@mail.ntua.gr",
# Packages # Packages
packages=["r_predictors","runtime","runtime.exn","runtime.operational_status","runtime.utilities","runtime.predictions"], packages=["r_predictors","runtime","exn","exn.core","exn.handler","exn.settings","runtime.operational_status","runtime.utilities","runtime.predictions"],
# Include additional files into the package # Include additional files into the package
include_package_data=True, include_package_data=True,
@ -37,4 +37,10 @@ setup(
"python-slugify", "python-slugify",
"jproperties" "jproperties"
], ],
#package_dir={'': '.'},
entry_points={
'console_scripts': [
'start_exsmoothing = runtime.Predictor:main',
],
}
) )

View File

@ -14,9 +14,9 @@
container_images: container_images:
- context: exponential-smoothing-predictor - context: exponential-smoothing-predictor
registry: quay.io registry: quay.io
repository: quay.io/nebulous/exponential-smoothing-predictor-exponential-smoothing-predictor repository: quay.io/nebulous/exponential-smoothing-predictor
namespace: nebulous namespace: nebulous
repo_shortname: exponential-smoothing-predictor-exponential-smoothing-predictor repo_shortname: exponential-smoothing-predictor
repo_description: "" repo_description: ""
- job: - job: