Overview: use Flume to store data in HDFS in Avro format.
The first Flume agent pushes data into a Kafka channel, and the second Flume agent drains the Kafka channel into HDFS. A Morphline interceptor parses the data along the way, and the result is finally loaded into a Hive table.
morphline.conf
morphlines: [
  {
    id: morphline
    importCommands: ["org.kitesdk.**"]

    commands: [
      { readLine { charset: UTF-8 } }

      # Parse out the fields
      {
        split {
          inputField: message
          outputFields: [date, time, soft, version]
          separator: " "
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }
      {
        split {
          inputField: soft
          outputFields: [mes, plat]
          separator: ":"
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }
      {
        split {
          inputField: mes
          outputFields: [state, status, name]
          separator: ","
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }

      # Add the timestamp and the schema URL to the event headers;
      # without the timestamp the sink reports that it cannot be found
      {
        addValues {
          timestamp: "@{date} @{time}"
          flume.avro.schema.url: "file:/etc/flume-ng/conf/practices/dpkg.avsc"
        }
      }

      # Convert the timestamp added above to epoch milliseconds
      {
        convertTimestamp {
          field: timestamp
          inputFormats: ["yyyy-MM-dd HH:mm:ss"]
          outputFormat: unixTimeInMillis
        }
      }

      # For testing only
      {
        logInfo {
          format: "timestamp: {}, record: {}"
          args: ["@{timestamp}", "@{}"]
        }
      }

      # Convert the record to Avro using the custom schema
      {
        toAvro {
          schemaFile: /etc/flume-ng/conf/practices/dpkg.avsc
        }
      }

      # containerlessBinary strips the schema header from each record; codec sets the compression codec
      {
        writeAvroToByteArray {
          format: containerlessBinary
          codec: snappy
        }
      }
    ]
  }
]
schema
{ "type" : "record", "name" : "soft", "fields" : [ {"name":"date","type":"string"}, {"name":"time","type":"string"}, {"name":"status","type":["null","string"]}, {"name":"state","type":["null","string"]}, {"name":"name","type":"string"}, {"name":"plat","type":["null","string"]}, {"name":"version","type":["string","null"]} ]}
First Flume agent
agent.sources = r1
agent.channels = c1
agent.sources.r1.type = exec
agent.sources.r1.command = cat /home/training/Desktop/a/dpkg.log
agent.sources.r1.channels = c1
agent.sources.r1.interceptors = i1 i2
agent.sources.r1.interceptors.i2.type = regex_filter
agent.sources.r1.interceptors.i2.regex = (.*)installed(.*)
agent.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.r1.interceptors.i1.morphlineFile = /etc/flume-ng/conf/practices/morphline.conf
agent.sources.r1.interceptors.i1.morphlineId = morphline
# Kafka channel
agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.brokerList = localhost:9092
agent.channels.c1.zookeeperConnect = localhost:2181
agent.channels.c1.topic = test
agent.channels.c1.kafka.consumer.group.id = format-consumer
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.parseAsFlumeEvent = true
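With morphline.conf and the schema in place, the first agent can be started with the standard flume-ng launcher. A sketch, assuming the configuration above is saved as /etc/flume-ng/conf/practices/first.properties (the file name is an assumption):

# (Optional) create the Kafka topic used by the channel if it does not exist yet;
# the exact script name and flags depend on the Kafka distribution
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# Start the first agent; --name must match the property prefix ("agent")
flume-ng agent \
  --conf /etc/flume-ng/conf \
  --conf-file /etc/flume-ng/conf/practices/first.properties \
  --name agent \
  -Dflume.root.logger=INFO,console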
Second Flume agent
a1.sinks = k1
a1.channels = c1
# Kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = localhost:9092
a1.channels.c1.zookeeperConnect = localhost:2181
a1.channels.c1.topic = test
a1.channels.c1.kafka.consumer.group.id = flume1
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.parseAsFlumeEvent = true
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/ds=%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
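The second agent is launched the same way. A sketch, assuming its configuration is saved as /etc/flume-ng/conf/practices/second.properties (the file name is an assumption); the agent name must match the a1 prefix above:

# Start the second agent; --name must match the property prefix ("a1")
flume-ng agent \
  --conf /etc/flume-ng/conf \
  --conf-file /etc/flume-ng/conf/practices/second.properties \
  --name a1 \
  -Dflume.root.logger=INFO,console

# Once events have flowed through, .avro files should appear under the dated directories
hdfs dfs -ls /flume/test/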
Create a Hive external table that uses the schema stored in HDFS
CREATE EXTERNAL TABLE format
PARTITIONED BY (ds string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.url'='hdfs:///flume/schema/dpkg.avsc')
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/flume/test';
Register the partition
alter table format add partition (ds = '20180805');
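If several dated directories have already landed under /flume/test, the partitions can also be registered in one pass instead of one ALTER TABLE per day; a sketch using the hive CLI:

hive -e "MSCK REPAIR TABLE format;"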
Query the results
select * from format;
Note the flume.avro.schema.url setting in the morphline:
{ addValues { timestamp: "@{date} @{time}" flume.avro.schema.url: "file:/etc/flume-ng/conf/practices/dpkg.avsc" } }
Without this header, the second Flume agent may fail with a "could not find schema for event" error.