Remove cassandra repository

Change-Id: I914176d60bfce91fbe449702f7e78bb2f78706ce
This commit is contained in:
Michael James Hoppal 2016-01-28 10:59:07 -07:00
parent 5b1ffc3e2b
commit 22e18453dd
7 changed files with 1 additions and 257 deletions

View File

@ -59,9 +59,3 @@ ip_address = 192.168.10.4
port = 8086
user = mon_persister
password = password
# Uncomment, set cluster_ip_addresses, and change the repositories to point to the cassandra classes
#[cassandra]
# Comma separated list of Cassandra node IP addresses. No spaces.
#cluster_ip_addresses: 10.10.10.3
#keyspace: monasca

View File

@ -16,7 +16,7 @@
"""Persister Module
The Persister reads metrics and alarms from Kafka and then stores them
in into either Influxdb or Cassandra
in into Influxdb
Start the perister as stand-alone process by running 'persister.py
--config-file <config file>'

View File

@ -1,22 +0,0 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'),
cfg.StrOpt('keyspace')]
cassandra_group = cfg.OptGroup(name='cassandra')
cfg.CONF.register_group(cassandra_group)
cfg.CONF.register_opts(cassandra_opts, cassandra_group)

View File

@ -1,37 +0,0 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from oslo_config import cfg
import six
from repositories.abstract_repository import AbstractRepository
@six.add_metaclass(abc.ABCMeta)
class AbstractCassandraRepository(AbstractRepository):
def __init__(self):
super(AbstractCassandraRepository, self).__init__()
self.conf = cfg.CONF
self._cassandra_cluster = Cluster(
self.conf.cassandra.cluster_ip_addresses.split(','))
self.cassandra_session = self._cassandra_cluster.connect(
self.conf.cassandra.keyspace)
self._batch_stmt = BatchStatement()

View File

@ -1,70 +0,0 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from cassandra.query import BatchStatement
from oslo_log import log
from repositories.cassandra.abstract_repository import AbstractCassandraRepository
from repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__)
class AlarmStateHistCassandraRepository(AbstractCassandraRepository):
def __init__(self):
super(AlarmStateHistCassandraRepository, self).__init__()
self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
'insert into alarm_state_history (tenant_id, alarm_id, '
'metrics, new_state, '
'old_state, reason, reason_data, '
'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
def process_message(self, message):
(alarm_id, metrics, new_state, old_state, state_change_reason,
sub_alarms_json_snake_case, tenant_id,
time_stamp) = parse_alarm_state_hist_message(
message)
alarm_state_hist = (
tenant_id.encode('utf8'),
alarm_id.encode('utf8'),
json.dumps(metrics, ensure_ascii=False).encode(
'utf8'),
new_state.encode('utf8'),
old_state.encode('utf8'),
state_change_reason.encode('utf8'),
"{}".encode('utf8'),
sub_alarms_json_snake_case.encode('utf8'),
time_stamp
)
LOG.debug(alarm_state_hist)
return alarm_state_hist
def write_batch(self, alarm_state_hists):
for alarm_state_hist in alarm_state_hists:
self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
alarm_state_hist)
self.cassandra_session.execute(self._batch_stmt)
self._batch_stmt = BatchStatement()

View File

@ -1,120 +0,0 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
from cassandra.query import BatchStatement
from oslo_log import log
import urllib
from repositories.cassandra.abstract_repository import AbstractCassandraRepository
from repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__)
class MetricCassandraRepository(AbstractCassandraRepository):
def __init__(self):
super(MetricCassandraRepository, self).__init__()
self._insert_measurement_stmt = self.cassandra_session.prepare(
'insert into measurements (tenant_id,'
'region, metric_hash, time_stamp, value,'
'value_meta) values (?, ?, ?, ?, ?, ?)')
self._insert_metric_map_stmt = self.cassandra_session.prepare(
'insert into metric_map (tenant_id,'
'region, metric_hash, '
'metric_set) values'
'(?,?,?,?)')
def process_message(self, message):
(dimensions, metric_name, region, tenant_id, time_stamp, value,
value_meta) = parse_measurement_message(message)
metric_hash, metric_set = create_metric_hash(metric_name,
dimensions)
measurement = (tenant_id.encode('utf8'),
region.encode('utf8'),
metric_hash,
time_stamp,
value,
json.dumps(value_meta, ensure_ascii=False).encode(
'utf8'))
LOG.debug(measurement)
return MetricMeasurementInfo(
tenant_id.encode('utf8'),
region.encode('utf8'),
metric_hash,
metric_set,
measurement)
def write_batch(self, metric_measurement_infos):
for metric_measurement_info in metric_measurement_infos:
self._batch_stmt.add(self._insert_measurement_stmt,
metric_measurement_info.measurement)
metric_map = (metric_measurement_info.tenant_id,
metric_measurement_info.region,
metric_measurement_info.metric_hash,
metric_measurement_info.metric_set)
self._batch_stmt.add(self._insert_metric_map_stmt,
metric_map)
self.cassandra_session.execute(self._batch_stmt)
self._batch_stmt = BatchStatement()
class MetricMeasurementInfo(object):
def __init__(self, tenant_id, region, metric_hash, metric_set,
measurement):
self.tenant_id = tenant_id
self.region = region
self.metric_hash = metric_hash
self.metric_set = metric_set
self.measurement = measurement
def create_metric_hash(metric_name, dimensions):
metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name)
hash_string = metric_name_part
metric_set = set()
metric_set.add(metric_name_part)
for dim_name in sorted(dimensions.iterkeys()):
dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
dimensions[dim_name]))
metric_set.add(dimension)
hash_string += dimension
sha1_hash = hashlib.sha1(hash_string).hexdigest()
return bytearray.fromhex(sha1_hash), metric_set

View File

@ -5,7 +5,6 @@ oslo.service
six>=1.9.0
babel
influxdb==2.8.0
cassandra-driver==3.0.0
iso8601
simport
monasca-common