Flume

System environment: CentOS 7 1804 x64

Java version: 1.8.0_171

Flume binary version: 1.9.0

Install Java:

jdk-8u171-linux-x64.tar.gz
tar -zxf jdk-8u171-linux-x64.tar.gz
mv jdk1.8.0_171 /usr/local/

vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_171
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

source /etc/profile
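
Verify that the new environment variables are active:

# Should report java version "1.8.0_171"
java -version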

Flume download page: https://www-eu.apache.org/dist/flume/

wget https://www-eu.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxf apache-flume-1.9.0-bin.tar.gz
ln -s /usr/src/apache-flume-1.9.0-bin/bin/flume-ng /usr/bin/flume-ng
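
Confirm the install:

# Should report Flume 1.9.0
flume-ng version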

A simple configuration for collecting nginx logs:

# The agent name; this example uses a1
# Declare the agent's sources, sinks, and channels
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1 

# Define how source r1 collects logs
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/nginx/access.log

# Define how source r2 collects logs
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx/error.log

# Define the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/nginx/logs
# Seconds between rolling the temporary file over to a new target file.
# Default 30; 0 disables time-based rolling.
a1.sinks.k1.sink.rollInterval = 0

# Output serializer; TEXT (the default) writes events as plain text.
# (writeFormat/fileType are HDFS-sink properties and have no effect on file_roll.)
a1.sinks.k1.sink.serializer = TEXT



# Define the channel
a1.channels.c1.type = memory
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# Maximum number of events per transaction between the channel and a source/sink
a1.channels.c1.transactionCapacity = 100

# Wire the sources and the sink together through the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
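
To run the agent, save the config above to a file (the name nginx-agent.conf below is arbitrary), create the sink directory, and start flume-ng in the foreground:

# file_roll does not create the target directory by itself
mkdir -p /tmp/nginx/logs

# -n must match the agent name used in the config (a1)
flume-ng agent -c /usr/src/apache-flume-1.9.0-bin/conf -f nginx-agent.conf -n a1 -Dflume.root.logger=INFO,console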

TAILDIR + Kafka configuration

# Declare the agent's sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1 


# Define how source r1 collects logs
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
# Note: the filename part of each filegroup is a regex, so the wildcard is .* (it starts with a dot)
a1.sources.r1.filegroups.f1 = /var/log/nginx/.*
a1.sources.r1.filegroups.f2 = /var/log/tomcat/log.log


# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100



a1.sinks.k1.channel = c1

# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Kafka broker address(es) and port
a1.sinks.k1.kafka.bootstrap.servers = 1.1.1.1:9092

# Kafka topic to publish to (plain "topic" without the kafka. prefix is the deprecated spelling)
a1.sinks.k1.kafka.topic = test

# Legacy Kafka 0.8 producer property; the 1.9 KafkaSink handles serialization
# itself, so this line has no effect and can be dropped
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
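
To check that events are arriving, a console consumer on the Kafka side can tail the topic (assuming the broker address above and Kafka's bin directory on PATH):

kafka-console-consumer.sh --bootstrap-server 1.1.1.1:9092 --topic test --from-beginning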

ElasticSearchSink

Property      Default
channel       -
type          -
hostNames     -
indexName     flume
indexType     logs
clusterName   elasticsearch
batchSize     100
ttl           -
serializer    org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
serializer.*  -

Example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
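
Note that this sink does not ship with its client libraries: the elasticsearch and lucene-core jars matching the target cluster version must be placed in Flume's lib directory (paths below are illustrative):

cp elasticsearch-*.jar lucene-core-*.jar /usr/src/apache-flume-1.9.0-bin/lib/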

File Roll Sink

Property                    Default
channel                     -
type                        -
sink.directory              -
sink.pathManager            DEFAULT
sink.pathManager.extension  -
sink.pathManager.prefix     -
sink.rollInterval           30
sink.serializer             TEXT
sink.batchSize              100

Example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
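
As in the nginx example earlier, the directory must exist before the agent starts:

mkdir -p /var/log/flume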

Logger Sink

Property       Default
channel        -
type           -
maxBytesToLog  16

Example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
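
The logger sink writes events at INFO level, so they only appear if console logging is enabled when the agent is launched:

flume-ng agent -c conf -f example.conf -n a1 -Dflume.root.logger=INFO,console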

Avro Sink

Property                   Default
channel                    -
type                       -
hostname                   -
port                       -
batch-size                 100
connect-timeout            20000
request-timeout            20000
reset-connection-interval  none
compression-type           none
compression-level          6
ssl                        false
trust-all-certs            false
truststore                 -
truststore-password        -
truststore-type            JKS
exclude-protocols          SSLv3
maxIoWorkers               2 * the number of available processors in the machine

Example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
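
An avro sink is normally paired with an avro source on the receiving agent. A minimal counterpart running on 10.10.10.10 might look like this (the agent name a2 is illustrative):

a2.sources = r1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1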

HDFS Sink

Property                Default
channel                 -
type                    -
hdfs.path               -
hdfs.filePrefix         FlumeData
hdfs.fileSuffix         -
hdfs.inUsePrefix        -
hdfs.inUseSuffix        .tmp
hdfs.emptyInUseSuffix   false
hdfs.rollInterval       30
hdfs.rollSize           1024
hdfs.rollCount          10
hdfs.idleTimeout        0
hdfs.batchSize          100
hdfs.codeC              -
hdfs.fileType           SequenceFile
hdfs.maxOpenFiles       5000
hdfs.minBlockReplicas   -
hdfs.writeFormat        Writable
hdfs.threadsPoolSize    10
hdfs.rollTimerPoolSize  1
hdfs.kerberosPrincipal  -
hdfs.kerberosKeytab     -
hdfs.proxyUser          -
hdfs.round              false
hdfs.roundValue         1
hdfs.roundUnit          second
hdfs.timeZone           Local Time
hdfs.useLocalTimeStamp  false
hdfs.closeTries         0
hdfs.retryInterval      180
serializer              TEXT
serializer.*            -

Example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
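
The %y-%m-%d/%H%M/%S escapes in hdfs.path are expanded from a timestamp header on each event; unless an upstream interceptor supplies one, the agent fails at runtime, so either add a timestamp interceptor or let the sink use the agent's local time:

a1.sinks.k1.hdfs.useLocalTimeStamp = true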

Taildir Source

Property                             Default
channels                             -
type                                 -
filegroups                           -
filegroups.<filegroupName>           -
positionFile                         ~/.flume/taildir_position.json
headers.<filegroupName>.<headerKey>  -
byteOffsetHeader                     false
skipToEnd                            false
idleTimeout                          120000
writePosInterval                     3000
batchSize                            100
maxBatchCount                        Long.MAX_VALUE
backoffSleepIncrement                1000
maxBackoffSleep                      5000
cachePatternMatching                 true
fileHeader                           false
fileHeaderKey                        file

Example:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
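
The positionFile is what lets Taildir resume where it left off after a restart; it is a JSON list of (inode, pos, file) entries and can be inspected directly:

cat /var/log/flume/taildir_position.json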

Exec Source is fine for testing, but the official docs recommend against it for real pipelines: tail -F does not stop when events cannot be delivered (for example, when the channel is full), and the source keeps no record of its read position, so data completeness cannot be guaranteed. Prefer TAILDIR or spooldir.

Property         Default
channels         -
type             -
command          -
shell            -
restartThrottle  10000
restart          false
logStdErr        false
batchSize        20
batchTimeout     3000
selector.type    replicating
selector.*       -
interceptors     -
interceptors.*   -

Example:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
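
The optional shell property runs the command through a shell, which enables wildcards, pipes, and loops; a sketch following the documented usage:

a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = for i in /path/*.txt; do cat $i; done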
