Adds fluentd-plugin-udp_msgpack for 1st time.

fluentd-plugin-udp_msgpack initial version.

Change-Id: I0fcf0eac5722732889428089bae1c01e358b8561
Implements: add_fluentd-plugin-udp_msgpack
This commit is contained in:
t.goto
2014-08-19 18:45:05 +09:00
parent 34c00218a5
commit ded4c3eda3
5 changed files with 136 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
source 'https://rubygems.org'
# Specify your gem's dependencies in fluent-plugin-udp_msgpack.gemspec
gemspec

View File

@@ -0,0 +1,41 @@
# Msgpack decoder from udp message for Fluentd
This plugin receives messages from udp and decode it with Msgpack.
I'm using it to decode OpenStack Ceilometer metering information and pass it to elasticsearch.
## Installation
Put ruby file to plugin directory
Gemfile will be made pretty soon, so you can install it with `gem`
cp fluent-plugin-udp_msgpack.rb /etc/td-agent/plugin/
## Configuration
* `port`
* `bind`
* `body_size_limit`
* `tag`
## Example
<source>
type udpmsgpack
port 10000
bind 0.0.0.0
body_size_limit 1m
tag ceilometer_meter
</source>
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b my-new-feature`)
3. Commit your changes (`git commit -am 'Add some feature'`)
4. Push to the branch (`git push origin my-new-feature`)
5. Create new Pull Request

View File

@@ -0,0 +1,18 @@
#!/usr/lib64/fluent/ruby/bin/rake
require "bundler/gem_tasks"
require "rake/testtask"
Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.pattern = 'test/**/test_*.rb'
test.verbose = true
end
task :default => :test

View File

@@ -0,0 +1,20 @@
# -*- encoding: utf-8 -*-
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
Gem::Specification.new do |gem|
gem.name = "fluent-plugin-udp_msgpack"
gem.version = "0.0.1"
gem.authors = ["Tomoya Goto"]
gem.email = ["tomoya.goto@ctc-g.co.jp"]
gem.description = %q{Receive message from udp and decode with msgpack}
gem.summary = %q{Receive message from udp and decode with msgpack. I'm using it to accept openstack-ceilometer metering.}
gem.homepage = "https://github.com/stackforge/rack/tree/master/tools"
gem.files = `git ls-files`.split($/)
gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
gem.require_paths = ["lib"]
gem.add_development_dependency "fluentd"
gem.add_runtime_dependency "fluentd"
end

View File

@@ -0,0 +1,53 @@
module Fluent
class UDPInput < Fluent::Input
Plugin.register_input('udpmsgpack', self)
include DetachMultiProcessMixin
require 'socket'
require 'msgpack'
def initialize
super
end
config_param :port, :integer, :default => 5160
config_param :body_size_limit, :size, :default => 10240
config_param :tag, :string, :default => "udp_msgpack"
config_param :bind, :string, :default => '0.0.0.0'
def configure(conf)
super
end
def start
@udp_s = UDPSocket.new
detach_multi_process do
super
@udp_s.bind(@bind, @port)
$log.debug "listening UDP on #{@bind}:#{@port}"
@thread = Thread.new(&method(:run))
end
end
def shutdown
@udp_s.close
@thread.join
end
def run
loop do
text, sender = @udp_s.recvfrom(@body_size_limit)
text = MessagePack::unpack(text)
Engine.emit(@tag, Engine.now, text)
end
rescue
$log.error "unexpected error", :error=>$!.to_s
$log.error_backtrace
end
end
end