Flume configuration to upload files with the same name
I have 10 files whose data varies in length. I would like to store the data in HDFS in the same files, with the same filenames, but Flume is splitting the data and saving it as FlumeData.timestamp. I am using the configuration below:
a1.sources = r1
a1.sinks = k2
a1.channels = c1

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
a1.channels.c1.trackerDir = /mnt/flume/track
a1.channels.c1.transactionCapacity = 10000000
a1.channels.c1.capacity = 500000000
a1.channels.c1.maxFileSize = 10000000
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /mnt/flume/backup
a1.channels.c1.checkpointInterval = 2000
a1.channels.c1.minimumRequiredSpace = 9000000

a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/netlog/
a1.sources.r1.fileHeader = true
a1.sources.r1.bufferMaxLineLength = 500
a1.sources.r1.bufferMaxLines = 10000
a1.sources.r1.batchSize = 100000
#a1.sources.r1.deletePolicy = immediate

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.filePrefix = %{file}
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.batchSize = 100000
a1.sinks.k2.hdfs.rollSize = 10000000
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.path = /user/flume

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
Kindly suggest how I can store the same 10 files with the same filenames and the data within them. The file size can vary from 2 MB to 15 MB.
The error I see in the logs is:
lib/native org.apache.flume.node.application --conf-file conf/flume-conf.properties --name a1 2014-12-03 20:49:47,545 (lifecyclesupervisor-1-0) [info - org.apache.flume.node.pollingpropertiesfileconfigurationprovider.start(pollingpropertiesfileconfigurationprovider.java:61)] configuration provider starting 2014-12-03 20:49:47,550 (lifecyclesupervisor-1-0) [debug - org.apache.flume.node.pollingpropertiesfileconfigurationprovider.start(pollingpropertiesfileconfigurationprovider.java:78)] configuration provider started 2014-12-03 20:49:47,555 (conf-file-poller-0) [debug - org.apache.flume.node.pollingpropertiesfileconfigurationprovider$filewatcherrunnable.run(pollingpropertiesfileconfigurationprovider.java:126)] checking file:conf/flume-conf.properties changes 2014-12-03 20:49:47,555 (conf-file-poller-0) [info - org.apache.flume.node.pollingpropertiesfileconfigurationprovider$filewatcherrunnable.run(pollingpropertiesfileconfigurationprovider.java:133)] reloading configuration file:conf/flume-conf.properties 2014-12-03 20:49:47,571 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,571 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1020)] created context k2: hdfs.batchsize 2014-12-03 20:49:47,572 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,572 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,572 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,572 (conf-file-poller-0) [info - 
org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,572 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,573 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,573 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,573 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:930)] added sinks: k2 agent: a1 2014-12-03 20:49:47,573 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,573 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,575 (conf-file-poller-0) [info - org.apache.flume.conf.flumeconfiguration$agentconfiguration.addproperty(flumeconfiguration.java:1016)] processing:k2 2014-12-03 20:49:47,576 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration$agentconfiguration.isvalid(flumeconfiguration.java:313)] starting validation of configuration agent: a1, initial-configuration: agentconfiguration[a1] sources: {r1={ parameters:{buffermaxlinelength=500, channels=c1, spooldir=/usr/local/netlog/, buffermaxlines=10000, fileheader=true, batchsize=100000, type=spooldir} }} channels: {c1={ parameters:{trackerdir=/mnt/flume/track, maxfilesize=10000000, datadirs=/mnt/flume/data, type=file, transactioncapacity=10000000, capacity=500000000, checkpointdir=/mnt/flume/checkpoint} }} sinks: {k2={ parameters:{hdfs.batchsize=100000, 
hdfs.idletimeout=0, hdfs.fileprefix=%{file}, hdfs.path=/user/flume, hdfs.writeformat=text, hdfs.rollsize=0, hdfs.rollcount=0, channel=c1, hdfs.rollinterval=0, hdfs.filetype=datastream, type=hdfs} }} 2014-12-03 20:49:47,583 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration$agentconfiguration.validatechannels(flumeconfiguration.java:468)] created channel c1 2014-12-03 20:49:47,593 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration$agentconfiguration.validatesinks(flumeconfiguration.java:674)] creating sink: k2 using hdfs 2014-12-03 20:49:47,596 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration$agentconfiguration.isvalid(flumeconfiguration.java:371)] post validation configuration a1 agentconfiguration created without configuration stubs basic syntactical validation performed[a1] sources: {r1={ parameters:{buffermaxlinelength=500, channels=c1, spooldir=/usr/local/netlog/, buffermaxlines=10000, fileheader=true, batchsize=100000, type=spooldir} }} channels: {c1={ parameters:{trackerdir=/mnt/flume/track, maxfilesize=10000000, datadirs=/mnt/flume/data, type=file, transactioncapacity=10000000, capacity=500000000, checkpointdir=/mnt/flume/checkpoint} }} sinks: {k2={ parameters:{hdfs.batchsize=100000, hdfs.idletimeout=0, hdfs.fileprefix=%{file}, hdfs.path=/user/flume, hdfs.writeformat=text, hdfs.rollsize=0, hdfs.rollcount=0, channel=c1, hdfs.rollinterval=0, hdfs.filetype=datastream, type=hdfs} }} 2014-12-03 20:49:47,597 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration.validateconfiguration(flumeconfiguration.java:135)] channels:c1 2014-12-03 20:49:47,597 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration.validateconfiguration(flumeconfiguration.java:136)] sinks k2 2014-12-03 20:49:47,597 (conf-file-poller-0) [debug - org.apache.flume.conf.flumeconfiguration.validateconfiguration(flumeconfiguration.java:137)] sources r1 2014-12-03 20:49:47,597 (conf-file-poller-0) [info - 
org.apache.flume.conf.flumeconfiguration.validateconfiguration(flumeconfiguration.java:140)] post-validation flume configuration contains configuration agents: [a1] 2014-12-03 20:49:47,598 (conf-file-poller-0) [info - org.apache.flume.node.abstractconfigurationprovider.loadchannels(abstractconfigurationprovider.java:150)] creating channels 2014-12-03 20:49:47,629 (conf-file-poller-0) [info - org.apache.flume.channel.defaultchannelfactory.create(defaultchannelfactory.java:40)] creating instance of channel c1 type file 2014-12-03 20:49:47,635 (conf-file-poller-0) [info - org.apache.flume.node.abstractconfigurationprovider.loadchannels(abstractconfigurationprovider.java:205)] created channel c1 2014-12-03 20:49:47,636 (conf-file-poller-0) [info - org.apache.flume.source.defaultsourcefactory.create(defaultsourcefactory.java:39)] creating instance of source r1, type spooldir 2014-12-03 20:49:47,654 (conf-file-poller-0) [info - org.apache.flume.sink.defaultsinkfactory.create(defaultsinkfactory.java:40)] creating instance of sink: k2, type: hdfs 2014-12-03 20:49:48,108 (conf-file-poller-0) [info - org.apache.flume.sink.hdfs.hdfseventsink.authenticate(hdfseventsink.java:555)] hadoop security enabled: false 2014-12-03 20:49:48,111 (conf-file-poller-0) [info - org.apache.flume.node.abstractconfigurationprovider.getconfiguration(abstractconfigurationprovider.java:119)] channel c1 connected [r1, k2] 2014-12-03 20:49:48,125 (conf-file-poller-0) [info - org.apache.flume.node.application.startallcomponents(application.java:138)] starting new configuration:{ sourcerunners:{r1=eventdrivensourcerunner: { source:spool directory source r1: { spooldir: /usr/local/netlog/ } }} sinkrunners:{k2=sinkrunner: { policy:org.apache.flume.sink.defaultsinkprocessor@1f87c88 countergroup:{ name:null counters:{} } }} channels:{c1=filechannel c1 { datadirs: [/mnt/flume/data] }} } 2014-12-03 20:49:48,130 (conf-file-poller-0) [info - 
org.apache.flume.node.application.startallcomponents(application.java:145)] starting channel c1 2014-12-03 20:49:48,130 (lifecyclesupervisor-1-0) [info - org.apache.flume.channel.file.filechannel.start(filechannel.java:259)] starting filechannel c1 { datadirs: [/mnt/flume/data] }... 2014-12-03 20:49:48,147 (lifecyclesupervisor-1-0) [info - org.apache.flume.channel.file.log.<init>(log.java:328)] encryption not enabled 2014-12-03 20:49:48,149 (lifecyclesupervisor-1-0) [info - org.apache.flume.channel.file.log.replay(log.java:373)] replay started 2014-12-03 20:49:48,150 (lifecyclesupervisor-1-0) [info - org.apache.flume.channel.file.log.replay(log.java:385)] found nextfileid 0, [] 2014-12-03 20:49:48,155 (lifecyclesupervisor-1-0) [error - org.apache.flume.channel.file.log.replay(log.java:481)] failed initialize log on [channel=c1] java.io.eofexception @ java.io.randomaccessfile.readint(randomaccessfile.java:786) @ java.io.randomaccessfile.readlong(randomaccessfile.java:819) @ org.apache.flume.channel.file.eventqueuebackingstorefactory.get(eventqueuebackingstorefactory.java:79) @ org.apache.flume.channel.file.log.replay(log.java:417) @ org.apache.flume.channel.file.filechannel.start(filechannel.java:279) @ org.apache.flume.lifecycle.lifecyclesupervisor$monitorrunnable.run(lifecyclesupervisor.java:251) @ java.util.concurrent.executors$runnableadapter.call(executors.java:511) @ java.util.concurrent.futuretask.runandreset(futuretask.java:308) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.access$301(scheduledthreadpoolexecutor.java:180) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor.java:294) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745) 2014-12-03 20:49:48,160 (lifecyclesupervisor-1-0) [error - 
org.apache.flume.channel.file.filechannel.start(filechannel.java:290)] failed start file channel [channel=c1] java.io.eofexception @ java.io.randomaccessfile.readint(randomaccessfile.java:786) @ java.io.randomaccessfile.readlong(randomaccessfile.java:819) @ org.apache.flume.channel.file.eventqueuebackingstorefactory.get(eventqueuebackingstorefactory.java:79) @ org.apache.flume.channel.file.log.replay(log.java:417) @ org.apache.flume.channel.file.filechannel.start(filechannel.java:279) @ org.apache.flume.lifecycle.lifecyclesupervisor$monitorrunnable.run(lifecyclesupervisor.java:251) @ java.util.concurrent.executors$runnableadapter.call(executors.java:511) @ java.util.concurrent.futuretask.runandreset(futuretask.java:308) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.access$301(scheduledthreadpoolexecutor.java:180) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor.java:294) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745) 2014-12-03 20:49:48,162 (conf-file-poller-0) [info - org.apache.flume.node.application.startallcomponents(application.java:173)] starting sink k2 2014-12-03 20:49:48,163 (conf-file-poller-0) [info - org.apache.flume.node.application.startallcomponents(application.java:184)] starting source r1 2014-12-03 20:49:48,163 (lifecyclesupervisor-1-3) [info - org.apache.flume.source.spooldirectorysource.start(spooldirectorysource.java:77)] spooldirectorysource source starting directory: /usr/local/netlog/ 2014-12-03 20:49:48,185 (lifecyclesupervisor-1-3) [debug - org.apache.flume.client.avro.reliablespoolingfileeventreader.<init>(reliablespoolingfileeventreader.java:132)] initializing reliablespoolingfileeventreader directory=/usr/local/netlog, metadir=.flumespool, deserializer=line 2014-12-03 20:49:48,204 
(lifecyclesupervisor-1-3) [debug - org.apache.flume.client.avro.reliablespoolingfileeventreader.<init>(reliablespoolingfileeventreader.java:154)] created , deleted canary file: /usr/local/netlog/flume-spooldir-perm-check-5019906964160509405.canary 2014-12-03 20:49:48,218 (lifecyclesupervisor-1-3) [debug - org.apache.flume.source.spooldirectorysource.start(spooldirectorysource.java:110)] spooldirectorysource source started 2014-12-03 20:49:48,343 (lifecyclesupervisor-1-3) [info - org.apache.flume.instrumentation.monitoredcountergroup.register(monitoredcountergroup.java:119)] monitored counter group type: source, name: r1: registered new mbean. 2014-12-03 20:49:48,343 (lifecyclesupervisor-1-3) [info - org.apache.flume.instrumentation.monitoredcountergroup.start(monitoredcountergroup.java:95)] component type: source, name: r1 started 2014-12-03 20:49:48,344 (lifecyclesupervisor-1-1) [info - org.apache.flume.instrumentation.monitoredcountergroup.register(monitoredcountergroup.java:119)] monitored counter group type: sink, name: k2: registered new mbean. 2014-12-03 20:49:48,347 (lifecyclesupervisor-1-1) [info - org.apache.flume.instrumentation.monitoredcountergroup.start(monitoredcountergroup.java:95)] component type: sink, name: k2 started 2014-12-03 20:49:48,356 (sinkrunner-pollingrunner-defaultsinkprocessor) [debug - org.apache.flume.sinkrunner$pollingrunner.run(sinkrunner.java:143)] polling sink runner starting 2014-12-03 20:49:48,357 (sinkrunner-pollingrunner-defaultsinkprocessor) [error - org.apache.flume.sinkrunner$pollingrunner.run(sinkrunner.java:160)] unable deliver event. exception follows. java.lang.illegalstateexception: channel closed [channel=c1]. 
due java.io.eofexception: null @ org.apache.flume.channel.file.filechannel.createtransaction(filechannel.java:329) @ org.apache.flume.channel.basicchannelsemantics.gettransaction(basicchannelsemantics.java:122) @ org.apache.flume.sink.hdfs.hdfseventsink.process(hdfseventsink.java:376) @ org.apache.flume.sink.defaultsinkprocessor.process(defaultsinkprocessor.java:68) @ org.apache.flume.sinkrunner$pollingrunner.run(sinkrunner.java:147) @ java.lang.thread.run(thread.java:745) caused by: java.io.eofexception @ java.io.randomaccessfile.readint(randomaccessfile.java:786) @ java.io.randomaccessfile.readlong(randomaccessfile.java:819) @ org.apache.flume.channel.file.eventqueuebackingstorefactory.get(eventqueuebackingstorefactory.java:79) @ org.apache.flume.channel.file.log.replay(log.java:417) @ org.apache.flume.channel.file.filechannel.start(filechannel.java:279) @ org.apache.flume.lifecycle.lifecyclesupervisor$monitorrunnable.run(lifecyclesupervisor.java:251) @ java.util.concurrent.executors$runnableadapter.call(executors.java:511) @ java.util.concurrent.futuretask.runandreset(futuretask.java:308) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.access$301(scheduledthreadpoolexecutor.java:180) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor.java:294) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) ... 1 more 2014-12-03 20:49:48,659 (pool-4-thread-1) [error - org.apache.flume.source.spooldirectorysource$spooldirectoryrunnable.run(spooldirectorysource.java:256)] fatal: spool directory source r1: { spooldir: /usr/local/netlog/ }: uncaught exception in spooldirectorysource thread. restart or reconfigure flume continue processing. java.lang.illegalstateexception: channel closed [channel=c1]. 
due java.io.eofexception: null @ org.apache.flume.channel.file.filechannel.createtransaction(filechannel.java:329) @ org.apache.flume.channel.basicchannelsemantics.gettransaction(basicchannelsemantics.java:122) @ org.apache.flume.channel.channelprocessor.processeventbatch(channelprocessor.java:181) @ org.apache.flume.source.spooldirectorysource$spooldirectoryrunnable.run(spooldirectorysource.java:235) @ java.util.concurrent.executors$runnableadapter.call(executors.java:511) @ java.util.concurrent.futuretask.runandreset(futuretask.java:308) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.access$301(scheduledthreadpoolexecutor.java:180) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor.java:294) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) @ java.lang.thread.run(thread.java:745) caused by: java.io.eofexception @ java.io.randomaccessfile.readint(randomaccessfile.java:786) @ java.io.randomaccessfile.readlong(randomaccessfile.java:819) @ org.apache.flume.channel.file.eventqueuebackingstorefactory.get(eventqueuebackingstorefactory.java:79) @ org.apache.flume.channel.file.log.replay(log.java:417) @ org.apache.flume.channel.file.filechannel.start(filechannel.java:279) @ org.apache.flume.lifecycle.lifecyclesupervisor$monitorrunnable.run(lifecyclesupervisor.java:251) ... 7 more 2014-12-03 20:49:53,359 (sinkrunner-pollingrunner-defaultsinkprocessor) [error - org.apache.flume.sinkrunner$pollingrunner.run(sinkrunner.java:160)] unable deliver event. exception follows. java.lang.illegalstateexception: channel closed [channel=c1]. 
due java.io.eofexception: null @ org.apache.flume.channel.file.filechannel.createtransaction(filechannel.java:329) @ org.apache.flume.channel.basicchannelsemantics.gettransaction(basicchannelsemantics.java:122) @ org.apache.flume.sink.hdfs.hdfseventsink.process(hdfseventsink.java:376) @ org.apache.flume.sink.defaultsinkprocessor.process(defaultsinkprocessor.java:68) @ org.apache.flume.sinkrunner$pollingrunner.run(sinkrunner.java:147) @ java.lang.thread.run(thread.java:745) caused by: java.io.eofexception @ java.io.randomaccessfile.readint(randomaccessfile.java:786) @ java.io.randomaccessfile.readlong(randomaccessfile.java:819) @ org.apache.flume.channel.file.eventqueuebackingstorefactory.get(eventqueuebackingstorefactory.java:79) @ org.apache.flume.channel.file.log.replay(log.java:417) @ org.apache.flume.channel.file.filechannel.start(filechannel.java:279) @ org.apache.flume.lifecycle.lifecyclesupervisor$monitorrunnable.run(lifecyclesupervisor.java:251) @ java.util.concurrent.executors$runnableadapter.call(executors.java:511) @ java.util.concurrent.futuretask.runandreset(futuretask.java:308) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.access$301(scheduledthreadpoolexecutor.java:180) @ java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor.java:294) @ java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1142) @ java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:617) ... 1 more
Thanks in advance.
OK, you need a few more properties for the HDFS sink:
a1.sinks.k2.hdfs.filePrefix = [your prefix]
a1.sinks.k2.hdfs.fileSuffix = .[your suffix]
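The suffix could be .tsv or .csv, for instance, while the prefix can be anything. You can also use variables such as the date and time in it, but that requires the timestamp interceptor. You can also create your own interceptor and generate your own variables for the file name. If you omit this, Flume will add its own sequence between the prefix and the suffix.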
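For the date and time variables mentioned above, a minimal sketch could look like the following. The interceptor name i1, the netlog prefix and the .csv suffix are placeholders, not values from the question, and the sketch assumes the Flume release in use expands %Y-style escapes in hdfs.filePrefix the same way it does in hdfs.path.

```properties
# Timestamp interceptor: adds a "timestamp" header to every event,
# which the %Y, %m, %d escapes below are resolved from.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Placeholder prefix and suffix (assumptions for illustration only).
a1.sinks.k2.hdfs.filePrefix = netlog-%Y-%m-%d
a1.sinks.k2.hdfs.fileSuffix = .csv
```

Without a timestamp header on the events, the sink has nothing to resolve the date escapes from, which is why the interceptor is needed.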
In addition to our previous comments, the properties that disable rollovers are the following:
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
To access the name of the original file from the source, add the following to the HDFS sink config:
a1.sinks.k2.hdfs.filePrefix = %{file}
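One thing to watch: with fileHeader = true the spooling directory source stores the absolute path of the file under the file header, so %{file} expands to something containing slashes. A possible variation, assuming the Flume release in use supports the basenameHeader option of the spooling directory source, is to use the bare file name instead:

```properties
# Assumption: basenameHeader is available in the Flume version in use.
# It adds a "basename" header holding only the file name, without the path.
a1.sources.r1.basenameHeader = true

# Use the bare file name as the HDFS file prefix.
a1.sinks.k2.hdfs.filePrefix = %{basename}
```

Flume still appends its own counter after the prefix, so the resulting HDFS name starts with the original file name rather than matching it exactly.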
To simplify the channel config, use the following:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
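Putting the suggestions together, a complete agent definition might look like the sketch below. The capacities and batch sizes are illustrative assumptions, not tuned values; they are chosen only so that the source and sink batch sizes do not exceed the memory channel's transactionCapacity, which the batch size of 100000 from the question would.

```properties
# Sketch only: sizes below are illustrative assumptions.
a1.sources = r1
a1.channels = c1
a1.sinks = k2

# Memory channel: a single transaction cannot hold more events than
# transactionCapacity, so the batch sizes further down stay within it.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /usr/local/netlog/
a1.sources.r1.fileHeader = true
a1.sources.r1.batchSize = 1000

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1
a1.sinks.k2.hdfs.path = /user/flume
a1.sinks.k2.hdfs.filePrefix = %{file}
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.batchSize = 1000
# Disable time-, size- and count-based rolling.
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
```

Keep in mind that a memory channel does not survive an agent restart: events still in the channel at that point are lost, which the file channel in the question was meant to prevent.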