您的当前位置:首页 > 大数据

Apache Beam 实战指南 | 大数据管道 (pipeline) 设计及实战

2020-08-12 05:58:11

作者:张海涛

关于 Apache Beam 实战指南系列文章

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发者经常要用到不同的技术、框架、API、开发语言和 SDK 来应对复杂应用的开发,这大大增加了选择合适工具和框架的难度,开发者想要将所有的大数据组件熟练运用几乎是一项不可能完成的任务。

面对这种情况,Google 在 2016 年 2 月宣布将大数据流水线产品(Google DataFlow)贡献给 Apache 基金会孵化,2017 年 1 月 Apache 对外宣布开源 Apache Beam,2017 年 5 月迎来了它的第一个稳定版本 2.0.0。在国内,大部分开发者对于 Beam 还缺乏了解,社区中文资料也比较少。

一.概述

其他行业问咱们 IT 具体干什么的,很多 IT 人员会自嘲自己就是“搬砖”(此处将复制代码称为搬砖)的民工。过了两天 GitHub 出现自动写代码的人工智能,IT 程序员深深叹了一口气说道“完了要失业了,代码没得搬了”。其实从入行 IT 那一刻起,不管我们做前端、服务端、底层架构等任何岗位,其实我们都是为数据服务的服务人员(注:不是说从民工转岗到服务员了):把数据从后端搬到前端,把前端数据再写入数据库。尽管编程语言从 C、C++、C#、JAVA、Python 不停变化,为了适应时代背景框架也是千变万化,我们拼命从“亚马逊热带雨林”一直学到“地中海”。

然后 Apache Beam 这个一统“地中海”的框架出现了。Apache Beam 不光统一了数据源,还统一了流批计算。在这个数据传输过程中有一条核心的技术就是管道(Pipeline),不管是 Strom,Flink ,Beam 它都是核心。在这条管道中可以对数据进行过滤、净化、清洗、合并、分流以及各种实时计算操作。

本文会详细介绍如何设计 Apache Beam 管道、管道设计工具介绍、源码和案例分析,普及和提升大家对 Apache Beam 管道的认知。

二.怎样设计好自己的管道

设计管道注意事项

图 2-1 简单管道

1. 你输入的数据存储在那里

首先要确定你要构造几条数据源,在 Beam 可以构建多条,构建之前可以选择自己的 SDK 的 IO。

2. 你的数据类型是什么样的

Beam 提供的是键值对的数据类型,你的数据可能是日志文本、格式化设备事件、数据库的行,所以在 PCollection 就应该确定数据集的类型。

3. 你想怎么处理数据

对数据进行转换、过滤处理、窗口计算、SQL 处理等。 在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操作。

4. 你打算把数据最后输出到哪里去

在管道末尾进行 Write 写入操作,把数据最后写入你自己想存放或最后流向的地方。

管道的几种玩法

1. 分支管道:多次转换,处理相同的数据集

图 2-2-1 多次转换处理相同数据示意图

描述:例如上图 2-1-1 图所示,从一个数据库的表读取或转换数据集,然后从数据集中分别找找以字母“A”开头的数据放入一个分支数据集中,如果以字母“B”开头的数据放入另一个分支数据集中,最终两个数据集进行隔离处理。

数据集:

// 为了演示显示内存数据集 final List LINES = Arrays.asList( "Aggressive", "Bold", "Apprehensive", "Brilliant");

示例代码:

?PCollection dbRowCollection = ...;// 这个地方可以读取任何数据源。 PCollection aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn(){ @ProcessElement public void processElement(ProcessContext c) { if(c.element().startsWith("A")){// 查找以 "A" 开头的数据 c.output(c.element()); System.out.append("A 开头的单词有:"+c.element()+"\r"); } } })); PCollection bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn(){ @ProcessElement public void processElement(ProcessContext c) { if(c.element().startsWith("B")){// 查找以 "A" 开头的数据 c.output(c.element()); System.out.append("B 开头的单词有:"+c.element()+"\r"); } } }));

最终结果展示:

A 开头的单词有:Aggressive B 开头的单词有:Bold A 开头的单词有:Apprehensive B 开头的单词有:Brilliant

原示例代码地址 : pipelineTest2_1

2. 分支管道:一次转换,输出多个数据集

图 2-2-2 一次转换多个输出示意图

描述:根据图 2-2-1 和图 2-2-2 图中可以看出,他们以不同的方式执行着相同的操作,图 2-2-1 中的管道包含两个转换,用于处理同一输入中的元素 PCollection。一个转换使用以下逻辑:

if(以'A'开头){outputToPCollectionA}

另一个转换为

if(以'B'开头){outputToPCollectionB}

因为每个转换读取整个输入 PCollection,所以输入中的每个元素都会 PCollection 被处理两次。

图 2-2-2 中的管道以不同的方式执行相同的操作 - 只有一个转换使用以下逻辑:

if(以'A'开头){outputToPCollectionA} else if(以'B'开头){outputToPCollectionB}

其中输入中的每个元素都 PCollection 被处理一次。

数据集:同 2-1-1 数据集

示例代码:

?// 定义两个 TupleTag,每个输出一个。 final TupleTag startsWithATag = new TupleTag(){}; final TupleTag startsWithBTag = new TupleTag(){}; PCollectionTuple mixedCollection = dbRowCollection.apply(ParDo .of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { if (c.element().startsWith("A")) { // 返回首字母带有 "A" 的数据集。 c.output(c.element()); } else if(c.element().startsWith("B")) { // // 返回首字母带有 "B" 的数据集。 c.output(startsWithBTag, c.element()); } } }) // Specify main output. In this example, it is the output // with tag startsWithATag. .withOutputTags(startsWithATag, // Specify the output with tag startsWithBTag, as a TupleTagList. TupleTagList.of(startsWithBTag))); // Get subset of the output with tag startsWithATag. mixedCollection.get(startsWithATag).apply(...); // Get subset of the output with tag startsWithBTag. mixedCollection.get(startsWithBTag).apply(...);

如果每个元素的转换计算非常耗时,则使用其他输出会更有意义,因为一次性过滤全部数据,比全部数据过滤两次从性能上和转换上都存在一定程度上提升,数据量越大越明显。

最终结果展示:

?复制代码A 开头的单词有:ApprehensiveA 开头的单词有:AggressiveB 开头的单词有:BrilliantB 开头的单词有:Bold

原示例代码地址 : pipelineTest2_2

3. 合并管道:多个数据集,合并成一个管道输出

图 2-2-3 多数据集合并输出图

描述:

上图 2-2-3 是接图 2-2-1 的继续,把带“A” 的数据和带“B” 字母开头的数据进行合并到一个管道。

这个地方注意点是 Flatten 用法必须两个数据的数据类型相同。

数据集:

// 为了演示显示内存数据集 final List LINESa = Arrays.asList( "Aggressive", "Apprehensive"); final List LINESb = Arrays.asList( "Bold", "Brilliant");

示例代码:

// 将两个 PCollections 与 Flatten 合并 PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection); PCollection mergedCollectionWithFlatten = collectionList .apply(Flatten.pCollections()); // 继续合并新的 PCollection mergedCollectionWithFlatten.apply(...);

结果展示:

合并单词单词有: Aggressive Brilliant Apprehensive Bold

原示例代码地址 : pipelineTest2_3

4. 合并管道:多个数据源,链接合并一个管道输出

图 2-2-4 多数据源合并输出图

描述:

你的管道可以从一个或多个源读取或输入。如果你的管道从多个源读取并且这些源中的数据相关联,则将输入连接在一起会很有用。在上面的图 2-2-4 所示的示例中,管道从数据库表中读取名称和地址,并从 Kafka 主题中读取名称和订单号。然后管道 CoGroupByKey 用于连接此信息,其中键是名称 ; 结果 PCollection 包含名称,地址和订单的所有组合。

示例代码:

PCollection> userAddress = pipeline.apply(JdbcIO.>read()...); PCollection> userOrder = pipeline.apply(KafkaIO.read()...); final TupleTag addressTag = new TupleTag(); final TupleTag orderTag = new TupleTag(); // 将集合值合并到 CoGbkResult 集合中。 PCollection> joinedCollection = KeyedPCollectionTuple.of(addressTag, userAddress) .and(orderTag, userOrder) .apply(CoGroupByKey.create()); joinedCollection.apply(...);

管道的设计工具

对于管道的设计不光用代码去实现,也可以用视图工具。现在存在的有两种一种是拓蓝公司出品叫 Talend Big Data Studio,另一种就是免费开源的视图设计工具 kettle-beam 。

三.怎样创建你的管道

Apache Beam 程序从头到尾就是处理数据的管道。本小节使用 Apache Beam SDK 中的类构建管道,一个完整的 Apache Beam 管道构建流程如下:

首先创建一个 Pipeline 对象。

不管是数据做任何操作,如“ 读取”或“ 创建”及转换都要为管道创建 PCollection 一个或多个的数 据集(PCollection****)。

在 Apache Beam 的管道中你可以对数据集 PCollection 做任何操作,例如转换数据格式,过滤,分组,分析或以其他方式处理数据中的每一个元素。每个转换都会创建一个新输出数据集 PCollection,当然你可以在处理完成之前进行做任何的转换处理。

把你认为最终处理完成的数据集写或以其他方式输出最终的存储地方。

最后运行管道。

创建管道对象

每一个 Apache Beam 程序都会从创建管道(Pipeline)对象开始。

在 Apache Beam SDK,每一个管道都是一个独立的实体,管道的数据集也都封装着它的数据和对应的数据类型(在 Apache Beam 中有对应的数据转换类型包)。最后把数据进行用于各种转换操作。

在创建的管道的时候需要设置管道选项 PipelineOptions,有两种创建方式第一种是无参数和一种有参数的。具体两种有什么不同呢? 无参数的可以在程序中指定相应的管道选项参数,如显示设置执行大数据引擎参数。有参数的就可以在提交 Apache Beam jar 程序的时候进行用 Shell 脚本的方式后期设置管道对应的参数。

具体示例如下:

无参数

// 首先定义管道的选项 PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显示设置执行大数据引擎 // 创建管道实体对象 Pipeline p = Pipeline.create(options);

有参数

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

提交设置参数的格式如下: