flume
System environment: CentOS 7 1804 x64
Java version: 1.8.0_171
Flume binary version: 1.9
Install Java:
jdk-8u171-linux-x64.tar.gz
tar -zxf jdk-8u171-linux-x64.tar.gz
mv jdk1.8.0_171 /usr/local/
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_171
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
source /etc/profile
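A quick way to confirm the new environment variables are active in the current shell:
java -version
echo $JAVA_HOME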
Flume download: https://www-eu.apache.org/dist/flume/
wget https://www-eu.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxf apache-flume-1.9.0-bin.tar.gz
ln /usr/src/apache-flume-1.9.0-bin/bin/flume-ng /usr/bin
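As a sanity check, flume-ng can print its version; if the link above does not work in your shell, the script can also be called by its full path:
flume-ng version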
A simple configuration for collecting nginx logs:
# Configure the agent name; a1 is used below
# Define basic agent information
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# Define how source r1 collects logs
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/nginx/access.log
# Define how source r2 collects logs
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx/error.log
# Define the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/nginx/logs
# Interval in seconds at which the temporary file is rolled into a target file; default 30, 0 disables time-based rolling
a1.sinks.k1.sink.rollInterval = 0
a1.sinks.k1.sink.writeFormat = text
# Generated file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.sink.fileType = DataStream
# Define the channel
a1.channels.c1.type = memory
# Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
# Maximum number of events per transaction taken from a source or given to a sink
a1.channels.c1.transactionCapacity = 100
# Connect the sources and sinks through the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
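Assuming the configuration above is saved as /usr/src/apache-flume-1.9.0-bin/conf/nginx.conf (the path and file name are only examples), the agent can be started with:
flume-ng agent --conf /usr/src/apache-flume-1.9.0-bin/conf \
  --conf-file /usr/src/apache-flume-1.9.0-bin/conf/nginx.conf \
  --name a1 -Dflume.root.logger=INFO,console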
TAILDIR + Kafka configuration
# Define basic agent information
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define how source r1 collects logs
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
# Note: the filename part of the path is a regular expression, so use .* rather than a shell-style *
a1.sources.r1.filegroups.f1 = /var/log/nginx/.*
a1.sources.r1.filegroups.f2 = /var/log/tomcat/log.log
# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
# Configure the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.kafka.bootstrap.servers = 1.1.1.1:9092
# Kafka topic
a1.sinks.k1.topic=test
# Serialization
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
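If Kafka's command-line tools are available on the broker host, a console consumer is a simple way to confirm events are arriving (the broker address matches the sink configuration above):
kafka-console-consumer.sh --bootstrap-server 1.1.1.1:9092 --topic test --from-beginning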
ElasticSearchSink

Property Name  Default
channel        -
type           -
hostNames      -
indexName      flume
indexType      logs
clusterName    elasticsearch
batchSize      100
ttl            -
serializer     org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
serializer.*   -

Example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
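With the agent running, the index can be checked over Elasticsearch's HTTP API (assuming it listens on the usual port 9200):
curl 'http://127.0.0.1:9200/_cat/indices?v'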
File Roll Sink

Property Name               Default
channel                     -
type                        -
sink.directory              -
sink.pathManager            DEFAULT
sink.pathManager.extension  -
sink.pathManager.prefix     -
sink.rollInterval           30
sink.serializer             TEXT
sink.batchSize              100

Example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
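The sink is not guaranteed to create the target directory itself, so creating it before starting the agent is a safe precaution:
mkdir -p /var/log/flume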
Logger Sink

Property Name  Default
channel        -
type           -
maxBytesToLog  16

Example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Avro Sink

Property Name              Default
channel                    -
type                       -
hostname                   -
port                       -
batch-size                 100
connect-timeout            20000
request-timeout            20000
reset-connection-interval  none
compression-type           none
compression-level          6
ssl                        false
trust-all-certs            false
truststore                 -
truststore-password        -
truststore-type            JKS
exclude-protocols          SSLv3
maxIoWorkers               2 * the number of available processors in the machine

Example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
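An Avro sink only sends; a matching Avro source must be listening on the other end. A minimal sketch of the receiving agent (the agent name a2, channel c2 and logger sink are placeholders for illustration):
a2.sources = r1
a2.channels = c2
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = 10.10.10.10
a2.sources.r1.port = 4545
a2.sources.r1.channels = c2
a2.channels.c2.type = memory
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c2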
HDFS Sink

Property Name           Default
channel                 –
type                    –
hdfs.path               –
hdfs.filePrefix         FlumeData
hdfs.fileSuffix         –
hdfs.inUsePrefix        –
hdfs.inUseSuffix        .tmp
hdfs.emptyInUseSuffix   false
hdfs.rollInterval       30
hdfs.rollSize           1024
hdfs.rollCount          10
hdfs.idleTimeout        0
hdfs.batchSize          100
hdfs.codeC              –
hdfs.fileType           SequenceFile
hdfs.maxOpenFiles       5000
hdfs.minBlockReplicas   –
hdfs.writeFormat        Writable
hdfs.threadsPoolSize    10
hdfs.rollTimerPoolSize  1
hdfs.kerberosPrincipal  –
hdfs.kerberosKeytab     –
hdfs.proxyUser          –
hdfs.round              false
hdfs.roundValue         1
hdfs.roundUnit          second
hdfs.timeZone           Local Time
hdfs.useLocalTimeStamp  false
hdfs.closeTries         0
hdfs.retryInterval      180
serializer              TEXT
serializer.*            –
Example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
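The hdfs.path above uses time escape sequences (%y-%m-%d/%H%M/%S), which need a timestamp in the event headers; unless an interceptor already adds one, the simplest fix is to let the sink use the local time:
a1.sinks.k1.hdfs.useLocalTimeStamp = true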
Taildir Source

Property Name                        Default
channel                              -
type                                 -
filegroups                           -
filegroups.<filegroupName>           -
positionFile                         ~/.flume/taildir_position.json
headers.<filegroupName>.<headerKey>  -
byteOffsetHeader                     false
skipToEnd                            false
idleTimeout                          120000
writePosInterval                     3000
batchSize                            100
maxBatchCount                        Long.MAX_VALUE
backoffSleepIncrement                1000
maxBackoffSleep                      5000
cachePatternMatching                 true
fileHeader                           false
fileHeaderKey                        file

Example:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
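The position file configured above records how far each tracked file has been read, so inspecting it is an easy way to see what the source is doing:
cat /var/log/flume/taildir_position.json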
Exec Source

Fine for testing, but not recommended by the official docs: tail -F neither stops when the destination cannot accept data (for example when the channel is full) nor records the current read position, so data completeness is not guaranteed. Use the Taildir or Spooling Directory source instead.

Property Name    Default
channel          -
type             -
command          -
shell            -
restartThrottle  10000
restart          false
logStdErr        false
batchSize        20
batchTimeout     3000
selector.type    replicating
selector.*       -
interceptors     -
interceptors.*   -

Example:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1