Add watchdog filter for monitoring Heka

This change adds filter + output plugins that will allow Pacemaker to
check that the Heka process is alive.

At periodic intervals, the watchdog filter emits a message containing
the current timestamp. The output filter catches the message and write
the timestamp value to some file. If Heka is wedged (eg channels are
full) then the file won't be updated anymore. Pacemaker should be able
to detect it and respawn the process.

Change-Id: If2a71c9084e3c8da0d92fea5c295b36e56e0c86f
Implements: blueprint lma-aggregator-in-ha-mode
This commit is contained in:
Simon Pasquier 2015-08-14 14:38:14 +02:00
parent e21bec1b98
commit d5c5103ec0
6 changed files with 128 additions and 0 deletions

View File

@ -0,0 +1,34 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
define heka::output::sandbox (
$config_dir,
$filename,
$message_matcher = 'FALSE',
$config = {},
$ensure = present,
) {
include heka::params
validate_hash($config)
file { "${config_dir}/output-${title}.toml":
ensure => $ensure,
content => template('heka/output/sandbox.toml.erb'),
mode => '0600',
owner => $heka::params::user,
group => $heka::params::user,
}
}

View File

@ -0,0 +1,12 @@
[<%= @title %>_output]
type = "SandboxOutput"
filename = "<%= @filename %>"
message_matcher = "<%= @message_matcher %>"
<% if @config.size() > 0 %>
[<%= @title %>_output.config]
<% @config.each do |k,v| %>
<% next if v.nil? or v == :undef -%>
<%= k %> = <%= v.is_a?(String) ? "'#{v}'" : v %>
<% end %>
<% end %>

View File

@ -0,0 +1,24 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require 'math'
local payload_name = read_config('payload_name') or error('payload_name is required')
local payload = read_config('payload')
-- Very simple filter that emits a fixed message or the current timestamp (in
-- second) every ticker interval. It can be used to check the liveness of the
-- Heka service.
function timer_event(ns)
inject_payload("txt", payload_name, payload or math.floor(ns / 1e9))
end

View File

@ -0,0 +1,27 @@
-- Copyright 2015 Mirantis, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
require "io"
local path = read_config('path') or error('path required')
local field = read_config('field') or 'Payload'
-- Very simple output sandbox that writes the value of one of the message's
-- fields ('Payload' by default) to a file.
function process_message()
local fh = io.open(path, "w")
io.output(fh)
io.write(read_message(field))
io.close()
return 0
end

View File

@ -188,9 +188,36 @@ class lma_collector (
require => File[$plugins_dir]
}
file { "${plugins_dir}/outputs":
ensure => directory,
source => 'puppet:///modules/lma_collector/plugins/outputs',
recurse => remote,
notify => Class['lma_collector::service'],
require => File[$plugins_dir]
}
if size($lma_collector::params::additional_packages) > 0 {
package { $lma_collector::params::additional_packages:
ensure => present,
}
}
heka::filter::sandbox { 'watchdog':
config_dir => $config_dir,
filename => "${plugins_dir}/filters/watchdog.lua",
message_matcher => 'FALSE',
ticker_interval => $lma_collector::params::watchdog_interval,
config => {
payload_name => $lma_collector::params::watchdog_payload_name
}
}
heka::output::sandbox { 'watchdog':
config_dir => $config_dir,
filename => "${plugins_dir}/outputs/lastfile.lua",
message_matcher => "Fields[payload_name] == '${lma_collector::params::watchdog_payload_name}'",
config => {
path => $lma_collector::params::watchdog_file,
}
}
}

View File

@ -26,6 +26,10 @@ class lma_collector::params {
$aggregator_address = '127.0.0.1'
$aggregator_port = 5565
$watchdog_file = "/tmp/${service_name}.watchdog"
$watchdog_payload_name = "${service_name}.watchdog"
$watchdog_interval = 1
$tags = {}
$syslog_pattern = '<%PRI%>%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg%\n'