Using Flume to Collect Data from RabbitMQ into Hadoop (Windows Environment)
My previous company used ELK for log storage: applications sent their logs to RabbitMQ, and Logstash then collected them into Elasticsearch.
Here I use Flume to store the logs in Hadoop instead. Conveniently, the log source can reuse the logs already flowing through that RabbitMQ; all it takes is one extra binding on the exchange.
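For example, assuming the logs are published to a topic exchange named logstash (the exchange shown in the sample output further down) and the Flume source reads from queue_log, the extra binding can be created with rabbitmqadmin roughly as below; the queue name, exchange name and routing key are only illustrations, and the same thing can of course be done in the management UI:
rabbitmqadmin declare queue name=queue_log durable=true
rabbitmqadmin declare binding source=logstash destination=queue_log destination_type=queue routing_key=#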
1. Download Flume and Install the rabbitmq-flume-plugin
Download apache-flume-1.8.0-bin.tar.gz and extract it to D:\flume\flume-1.8.0.
Reading logs from RabbitMQ requires the separately installed rabbitmq-flume-plugin.
Latest release download: rabbitmq-flume-plugin-standalone-1.0.3.jar.
After downloading, copy the jar into D:\flume\flume-1.8.0\lib and you are done.
2. Read Logs from RabbitMQ and Save Them to a File
Create the configuration file D:\flume\flume-1.8.0\conf\rabbitmq-flume-logger.properties:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# The channel can be defined as follows.
a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10
# Each sink's type must be defined
a1.sinks.k1.type = logger
# Specify the channel the sink should use
a1.sinks.k1.channel = c1
# Each channel's type is defined.
a1.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
a1.channels.c1.capacity = 100
Start the Flume agent:
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-logger.properties
For some reason only the first 16 characters of each log body made it into the output. Since this was just a test of reading from RabbitMQ, I did not look into it at the time (see the note after the sample output).
03 九月 2019 14:48:10,241 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95) - Event: { headers:{exchange=logstash, routing-key=service.netcore} body: 7B 0A 20 20 22 6C 65 76 65 6C 22 3A 20 22 45 72 {. "level": "Er }
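Looking back, this seems to be the logger sink's own default rather than anything on the RabbitMQ side: as far as I know the logger sink only prints the first 16 bytes of an event body unless its maxBytesToLog setting is raised, for example:
a1.sinks.k1.maxBytesToLog = 2048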
3. Read Logs from RabbitMQ and Save Them to Hadoop
See here for how to install Hadoop on Windows.
Create the configuration file D:\flume\flume-1.8.0\conf\rabbitmq-flume-hadoop.properties:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/liujiajia/flume/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
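# rollCount = 0 disables rolling by event count; files roll at 128 MB (rollSize) or every 60 seconds (rollInterval)
# useLocalTimeStamp lets the %y-%m-%d/%H escapes in hdfs.path use the agent's local clock (see Appendix 3)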
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
Start the Flume agent:
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-hadoop.properties
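Once events start flowing you should find events-* files under the time-bucketed directories; a listing along these lines should show them (the date and hour in the path are only an example of what the %y-%m-%d/%H pattern produces):
hadoop fs -ls /user/liujiajia/flume/19-09-03/14/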
4. Export to RabbitMQ
Create the configuration file D:\flume\flume-1.8.0\conf\rabbitmq-flume-rabbitmq.properties:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink
a1.sinks.k1.host = 192.168.0.1
a1.sinks.k1.port = 5672
a1.sinks.k1.virtual-host = /
a1.sinks.k1.username = username
a1.sinks.k1.password = password
a1.sinks.k1.exchange = flume.sink
a1.sinks.k1.routing-key = service.flume
a1.sinks.k1.publisher-confirms = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
Start the Flume agent:
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-rabbitmq.properties
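To confirm that events really are published, the simplest check I know of is to bind a throwaway queue to the target exchange and fetch a message from it; the queue name below is made up for the test, and the exact rabbitmqadmin options differ slightly between versions:
rabbitmqadmin declare queue name=flume_sink_test
rabbitmqadmin declare binding source=flume.sink destination=flume_sink_test destination_type=queue routing_key=service.flume
rabbitmqadmin get queue=flume_sink_test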
Appendix 1. Error: Could not configure sink k1 due to: No channel configured for sink: k1
This error came up while configuring the export to RabbitMQ. The full log:
Could not configure sink k1 due to: No channel configured for sink: k1
org.apache.flume.conf.ConfigurationException: No channel configured for sink: k1
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:680)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:194)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:93)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The sink's channel is clearly configured, so why the error? A quick search showed others had hit the same problem.
The cause, surprisingly, is that the key in a1.sinks.k1.channel is channel without a trailing s (unlike a1.sources.r1.channels).
Tracing it back to the source, the example in the rabbitmq-flume-plugin documentation itself is written incorrectly.
=.=|||
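In short, sources take the plural key and sinks the singular one. A minimal correct pairing looks like this (the commented-out plural form is the one that triggers the error above):
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# wrong: a1.sinks.k1.channels = c1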
Appendix 2. Warning "Class path contains multiple SLF4J bindings." when starting the flume-ng agent
Including Hadoop libraries found in (D:\hadoop-3.2.0) for DFS access
WARN: HBASE_HOME not found
WARN: HIVE_HOME not found
Running FLUME agent :
class: org.apache.flume.node.Application
arguments: -n $agent_name -f "D:\flume\flume-1.8.0\conf\rabbitmq-flumn-haddop.properties"
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/flume/flume-1.8.0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/hadoop-3.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
This is caused by duplicate jars on the classpath. It does not affect anything, but if you want the warning gone, just delete slf4j-log4j12-1.6.1.jar from the Flume lib directory.
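On Windows that is simply:
del D:\flume\flume-1.8.0\lib\slf4j-log4j12-1.6.1.jar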
Appendix 3. Error when writing to Hadoop: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
The error message is as follows:
process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
... 3 more
Error writing to channel for com.aweber.flume.source.rabbitmq.Consumer@5d8a78d8, message rejected org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
The error is thrown in the process method of HDFSEventSink.java while it rebuilds the target path (String realPath = BucketPath.escapeString(...)).
/**
* Pull events out of channel and send it to HDFS. Take at most batchSize
* events per Transaction. Find the corresponding bucket for the event.
* Ensure the file is open. Serialize the data and write it to the file on
* HDFS. <br/>
* This method is not thread safe.
*/
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Set<BucketWriter> writers = new LinkedHashSet<>();
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null) {
break;
}
// reconstruct the path name by substituting place holders
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
// Callback to remove the reference to the bucket writer from the
// sfWriters map so that all buffers used by the HDFS file
// handles are garbage collected.
WriterCallback closeCallback = new WriterCallback() {
@Override
public void run(String bucketPath) {
LOG.info("Writer callback called.");
synchronized (sfWritersLock) {
sfWriters.remove(bucketPath);
}
}
};
synchronized (sfWritersLock) {
bucketWriter = sfWriters.get(lookupPath);
// we haven't seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
}
// Write the data to HDFS
try {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
}
// track the buckets getting written in this transaction
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
}
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
transaction.commit();
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}
The exception itself comes from the replaceShorthand method of BucketPath.java.
As the code shows, when local system time is not used, the value of the timestamp header on the MQ message is taken instead, and that value is checked for null (Preconditions.checkNotNull); if it is null, the exception is thrown.
This feels like a bug to me, because the documentation says that when the timestamp header is missing, the system time should be used by default.
/**
* Not intended as a public API
*/
@VisibleForTesting
protected static String replaceShorthand(char c, Map<String, String> headers,
TimeZone timeZone, boolean needRounding, int unit, int roundDown,
boolean useLocalTimestamp, long ts) {
String timestampHeader = null;
try {
if (!useLocalTimestamp) {
timestampHeader = headers.get("timestamp");
Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
"the Flume event headers, but it was null");
ts = Long.valueOf(timestampHeader);
} else {
timestampHeader = String.valueOf(ts);
}
} catch (NumberFormatException e) {
throw new RuntimeException("Flume wasn't able to parse timestamp header"
+ " in the event to resolve time based bucketing. Please check that"
+ " you're correctly populating timestamp header (for example using"
+ " TimestampInterceptor source interceptor).", e);
}
if (needRounding) {
ts = roundDown(roundDown, unit, ts, timeZone);
}
// It's a date
String formatString = "";
switch (c) {
case '%':
return "%";
case 'a':
formatString = "EEE";
break;
case 'A':
formatString = "EEEE";
break;
case 'b':
formatString = "MMM";
break;
case 'B':
formatString = "MMMM";
break;
case 'c':
formatString = "EEE MMM d HH:mm:ss yyyy";
break;
case 'd':
formatString = "dd";
break;
case 'e':
formatString = "d";
break;
case 'D':
formatString = "MM/dd/yy";
break;
case 'H':
formatString = "HH";
break;
case 'I':
formatString = "hh";
break;
case 'j':
formatString = "DDD";
break;
case 'k':
formatString = "H";
break;
case 'l':
formatString = "h";
break;
case 'm':
formatString = "MM";
break;
case 'M':
formatString = "mm";
break;
case 'n':
formatString = "M";
break;
case 'p':
formatString = "a";
break;
case 's':
return "" + (ts / 1000);
case 'S':
formatString = "ss";
break;
case 't':
// This is different from unix date (which would insert a tab character
// here)
return timestampHeader;
case 'y':
formatString = "yy";
break;
case 'Y':
formatString = "yyyy";
break;
case 'z':
formatString = "ZZZ";
break;
default:
// LOG.warn("Unrecognized escape in event format string: %" + c);
return "";
}
SimpleDateFormat format = getSimpleDateFormat(formatString);
if (timeZone != null) {
format.setTimeZone(timeZone);
} else {
format.setTimeZone(TimeZone.getDefault());
}
Date date = new Date(ts);
return format.format(date);
}
The flag is assigned in the configure method of HDFSEventSink.java, where its value comes from the hdfs.useLocalTimeStamp setting. Setting that option to true avoids the error (as in the configs above); another option is sketched after the code below.
// read configuration and setup thresholds
@Override
public void configure(Context context) {
this.context = context;
filePath = Preconditions.checkNotNull(
context.getString("hdfs.path"), "hdfs.path is required");
fileName = context.getString("hdfs.filePrefix", defaultFileName);
this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
String tzName = context.getString("hdfs.timeZone");
timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
String codecName = context.getString("hdfs.codeC");
fileType = context.getString("hdfs.fileType", defaultFileType);
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
defaultThreadPoolSize);
rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
defaultRollTimerPoolSize);
String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
String kerbKeytab = context.getString("hdfs.kerberosKeytab");
String proxyUser = context.getString("hdfs.proxyUser");
tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
if (tryCount <= 0) {
LOG.warn("Retry count value : " + tryCount + " is not " +
"valid. The sink will try to close the file until the file " +
"is eventually closed.");
tryCount = defaultTryCount;
}
retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval);
if (retryInterval <= 0) {
LOG.warn("Retry Interval value: " + retryInterval + " is not " +
"valid. If the first close of a file fails, " +
"it may remain open and will not be renamed.");
tryCount = 1;
}
Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
if (codecName == null) {
codeC = null;
compType = CompressionType.NONE;
} else {
codeC = getCodec(codecName);
// TODO : set proper compression type
compType = CompressionType.BLOCK;
}
// Do not allow user to set fileType DataStream with codeC together
// To prevent output file with compress extension (like .snappy)
if (fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) && codecName != null) {
throw new IllegalArgumentException("fileType: " + fileType +
" which does NOT support compressed output. Please don't set codeC" +
" or change the fileType if compressed output is desired.");
}
if (fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
+ " when fileType is: " + fileType);
}
// get the appropriate executor
this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);
needRounding = context.getBoolean("hdfs.round", false);
if (needRounding) {
String unit = context.getString("hdfs.roundUnit", "second");
if (unit.equalsIgnoreCase("hour")) {
this.roundUnit = Calendar.HOUR_OF_DAY;
} else if (unit.equalsIgnoreCase("minute")) {
this.roundUnit = Calendar.MINUTE;
} else if (unit.equalsIgnoreCase("second")) {
this.roundUnit = Calendar.SECOND;
} else {
LOG.warn("Rounding unit is not valid, please set one of" +
"minute, hour, or second. Rounding will be disabled");
needRounding = false;
}
this.roundValue = context.getInteger("hdfs.roundValue", 1);
if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
"Round value" +
"must be > 0 and <= 60");
} else if (roundUnit == Calendar.HOUR_OF_DAY) {
Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
"Round value" +
"must be > 0 and <= 24");
}
}
this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
if (useLocalTime) {
clock = new SystemClock();
}
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}
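An alternative to hdfs.useLocalTimeStamp, suggested by the exception message itself, is to have the source populate the timestamp header with Flume's timestamp interceptor. A sketch of the extra source configuration (not something I tested here) would be:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp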