I have a Flume component listening to a syslog stream. I made a custom interceptor to modify each event, but it is not working. What did I do wrong? Thank you, Andrea
The interceptor is compiled into a JAR file, which is in the @FLUME_HOME/bin directory.
Interceptor class:
    package com.test.flume;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.interceptor.Interceptor;

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class SqlFlumeInterceptor implements Interceptor {

        // Unused so far; not final because the constructor never assigns it.
        private String headerKey;

        private SqlFlumeInterceptor(Context ctx) {
        }

        @Override
        public void initialize() {
        }

        @Override
        public Event intercept(Event event) {
            addPreposition(event);
            return event;
        }

        // Replaces the event body with a fixed marker string.
        private void addPreposition(Event event) {
            System.out.println("event processed");
            event.setBody("modified event".getBytes());
        }

        @Override
        public List<Event> intercept(List<Event> events) {
            for (Iterator<Event> iterator = events.iterator(); iterator.hasNext(); ) {
                Event next = iterator.next();
                // Drop the event if the single-event intercept returns null.
                if (intercept(next) == null) {
                    iterator.remove();
                }
            }
            return events;
        }

        @Override
        public void close() {
        }

        // Flume instantiates this builder from the "type" property of the interceptor.
        public static class CounterInterceptorBuilder implements Interceptor.Builder {

            private Context ctx;

            @Override
            public Interceptor build() {
                return new SqlFlumeInterceptor(ctx);
            }

            @Override
            public void configure(Context context) {
                this.ctx = context;
            }
        }
    }
flume.config file:
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = file-sink
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 41414
    a1.sources.r1.host = 192.168.1.2
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.test.flume.SqlFlumeInterceptor$CounterInterceptorBuilder

    # Describe the file_roll sink
    a1.sinks.file-sink.type = file_roll
    a1.sinks.file-sink.sink.directory = /opt/apache-flume-1.5.2-bin/logs/pluto.log
    a1.sinks.file-sink.sink.rollInterval = 0
    # Note: the next two lines use the prefix "ai", not "a1", so agent a1
    # does not pick them up (they are absent from the debug log further down).
    ai.sinks.file-sink.batchSize = 100
    ai.sinks.file-sink.fileHeader = true

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.file-sink.channel = c1
The system logs the events to the file without modifying them. The pertinent debug log:
    2015-04-27 21:39:17,625 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: a1, initial-configuration: AgentConfiguration[a1]
    SOURCES: {r1={ parameters:{port=41414, host=192.168.1.2, interceptors=i1, interceptors.i1.type=com.test.flume.SqlFlumeInterceptor$CounterInterceptorBuilder, channels=c1, type=syslogtcp} }}
    CHANNELS: {c1={ parameters:{transactionCapacity=100, capacity=1000, type=memory} }}
    SINKS: {file-sink={ parameters:{sink.rollInterval=0, type=file_roll, channel=c1, sink.directory=/opt/apache-flume-1.5.2-bin/logs/pluto.log} }}
Please place the interceptor JAR file in the @FLUME_HOME/lib directory, not in @FLUME_HOME/bin.
Otherwise Flume will not load the JAR.
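If you want to confirm that the JAR is actually visible before restarting the agent, a small standalone check like the one below may help. It is only a sketch: `ClasspathCheck` and `isLoadable` are hypothetical names of my own, not part of Flume, and the default class name is the builder from the question. It relies on the assumption that Flume resolves the interceptor `type` by class name at startup, so a class that cannot be found by name on the agent's classpath cannot be used as an interceptor.

```java
public class ClasspathCheck {

    /** Returns true if the named class can be located on the current classpath. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Nested classes use the binary name with '$', exactly as in the Flume config.
        String name = args.length > 0
                ? args[0]
                : "com.test.flume.SqlFlumeInterceptor$CounterInterceptorBuilder";
        System.out.println(name
                + (isLoadable(name) ? " is loadable" : " was NOT found on the classpath"));
    }
}
```

Run it with a classpath that mirrors what the agent uses, for example one that includes the JARs under @FLUME_HOME/lib. If it reports that the class was not found while the JAR still sits in @FLUME_HOME/bin, that matches the behaviour described in the question.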