Using Flume with Morphline (flume + kafka + morphline + hdfs + avro + hive)
Published: 2019-05-23


Overview: use Flume to store data in HDFS in Avro format.

The first Flume agent pushes data into a Kafka channel; the second Flume agent drains the Kafka channel into HDFS. In between, a Morphline parses the data. Finally, the data is loaded into a Hive table.

morphline.conf

morphlines: [
  {
    id: morphline
    importCommands: ["org.kitesdk.**"]
    commands: [
      {
        readLine {
          charset: UTF-8
        }
      }
      # Parse out the fields
      {
        split {
          inputField: message
          outputFields: [date, time, soft, version]
          separator: " "
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }
      {
        split {
          inputField: soft
          outputFields: [mes, plat]
          separator: ":"
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }
      {
        split {
          inputField: mes
          outputFields: [state, status, name]
          separator: ","
          isRegex: false
          addEmptyStrings: false
          trim: true
        }
      }
      # Add the timestamp to the header; without it, a "timestamp not found" error is reported
      {
        addValues {
          timestamp: "@{date} @{time}"
          flume.avro.schema.url: "file:/etc/flume-ng/conf/practices/dpkg.avsc"
        }
      }
      # Convert the timestamp above into unix millis
      {
        convertTimestamp {
          field: timestamp
          inputFormats: ["yyyy-MM-dd HH:mm:ss"]
          outputFormat: unixTimeInMillis
        }
      }
      # For debugging only
      {
        logInfo {
          format: "timestamp: {}, record: {}"
          args: ["@{timestamp}", "@{}"]
        }
      }
      # Convert the record to Avro using the custom schema
      {
        toAvro {
          schemaFile: /etc/flume-ng/conf/practices/dpkg.avsc
        }
      }
      # containerlessBinary strips the schema header; codec selects the compression codec
      {
        writeAvroToByteArray {
          format: containerlessBinary
          codec: snappy
        }
      }
    ]
  }
]
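The three split commands above can be sketched in plain Python. The sample line below is hypothetical; its layout is inferred from the outputFields declared in the morphline (the third space-separated field packs state, status, and name, plus the platform after a colon):

```python
# Plain-Python sketch of the three Morphline split commands.
# The sample line is made up; the field layout is inferred from
# the split order in morphline.conf.
line = "2018-08-05 12:00:00 status,installed,vim:amd64 2.0.1"

record = {}
# split 1: message -> [date, time, soft, version] on " "
record["date"], record["time"], soft, record["version"] = line.split(" ")
# split 2: soft -> [mes, plat] on ":"
mes, record["plat"] = soft.split(":")
# split 3: mes -> [state, status, name] on ","
record["state"], record["status"], record["name"] = mes.split(",")

print(record)
```

After the three splits, the record carries exactly the seven fields the Avro schema below expects.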

schema

{
    "type": "record",
    "name": "soft",
    "fields": [
        {"name": "date", "type": "string"},
        {"name": "time", "type": "string"},
        {"name": "status", "type": ["null", "string"]},
        {"name": "state", "type": ["null", "string"]},
        {"name": "name", "type": "string"},
        {"name": "plat", "type": ["null", "string"]},
        {"name": "version", "type": ["string", "null"]}
    ]
}
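As a quick sanity check (stdlib only, no Avro library assumed), the schema can be loaded as plain JSON to list which fields are nullable unions — these are the fields toAvro may legitimately leave empty:

```python
import json

# Load the .avsc content (inlined here) as plain JSON and list the
# fields whose type is a union containing "null".
schema = json.loads("""
{ "type": "record", "name": "soft", "fields": [
  {"name": "date", "type": "string"},
  {"name": "time", "type": "string"},
  {"name": "status", "type": ["null", "string"]},
  {"name": "state", "type": ["null", "string"]},
  {"name": "name", "type": "string"},
  {"name": "plat", "type": ["null", "string"]},
  {"name": "version", "type": ["string", "null"]}
]}
""")
nullable = [f["name"] for f in schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
print(nullable)  # status, state, plat and version may be null
```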

First Flume agent

agent.sources = r1
agent.channels = c1
agent.sources.r1.type = exec
agent.sources.r1.command = cat /home/training/Desktop/a/dpkg.log
agent.sources.r1.channels = c1
agent.sources.r1.interceptors = i1 i2
agent.sources.r1.interceptors.i2.type = regex_filter
agent.sources.r1.interceptors.i2.regex = (.*)installed(.*)
agent.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.r1.interceptors.i1.morphlineFile = /etc/flume-ng/conf/practices/morphline.conf
agent.sources.r1.interceptors.i1.morphlineId = morphline
# Kafka channel
agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.brokerList = localhost:9092
agent.channels.c1.zookeeperConnect = localhost:2181
agent.channels.c1.topic = test
agent.channels.c1.kafka.consumer.group.id = format-consumer
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.parseAsFlumeEvent = true
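The i2 interceptor keeps only events whose body matches (.*)installed(.*) — i.e. dpkg "installed" records. A minimal sketch of that filter, with made-up sample lines:

```python
import re

# Sketch of the regex_filter interceptor: matching events pass through
# (excludeEvents defaults to false). The sample lines are hypothetical.
pattern = re.compile(r"(.*)installed(.*)")
events = [
    "2018-08-05 12:00:00 status,installed,vim:amd64 2.0.1",
    "2018-08-05 12:00:01 startup archives unpack",
]
kept = [e for e in events if pattern.match(e)]
print(kept)
```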

Second Flume agent

a1.sinks = k1
a1.channels = c1
# Kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = localhost:9092
a1.channels.c1.zookeeperConnect = localhost:2181
a1.channels.c1.topic = test
a1.channels.c1.kafka.consumer.group.id = flume1
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.parseAsFlumeEvent = true
# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/test/ds=%Y%m%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
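The %Y%m%d escapes in hdfs.path are expanded from the event's timestamp header — the unix-millis value the morphline's convertTimestamp command wrote. A sketch of that expansion (the millis value is hypothetical, and UTC is used here; Flume uses the sink's configured time zone):

```python
from datetime import datetime, timezone

# The HDFS sink expands %Y%m%d from the event's "timestamp" header,
# which the morphline's convertTimestamp set to unix millis.
ts_millis = 1533427200000  # hypothetical: 2018-08-05 00:00:00 UTC
dt = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
path = "/flume/test/ds=" + dt.strftime("%Y%m%d")
print(path)  # /flume/test/ds=20180805
```

This is why the Hive partition added below is ds = '20180805': the directory name written by the sink must match the partition value.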

Loading into a Hive table

Create a Hive external table that uses the schema stored in HDFS:

CREATE EXTERNAL TABLE format
    PARTITIONED BY (ds string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    WITH SERDEPROPERTIES ('avro.schema.url'='hdfs:///flume/schema/dpkg.avsc')
    STORED AS
    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '/flume/test';

Register the partition:

alter table format add partition (ds = '20180805');

Query the results:

select * from format;


Note the flume.avro.schema.url entry in the morphline's addValues command:

{
  addValues {
    timestamp: "@{date} @{time}"
    flume.avro.schema.url: "file:/etc/flume-ng/conf/practices/dpkg.avsc"
  }
}

Without this header, running the second Flume agent may fail with a "could not find schema for event" error, because the AvroEventSerializer looks up the schema URL from the event headers.
