Spark 分布式内存计算框架

Spark 分布式内存计算框架

Spark 简介

Spark 是一种基于内存的、用以实现高效集群计算的平台。准确地讲,Spark 是一个大数据并行计算框架,是对广泛使用的 MapReduce 计算模型的扩展。Spark 有着自己的生态系统,但同时兼容 HDFS、Hive 等分布式存储系统,可以完美融入 Hadoop 的生态圈中,代替 MapReduce 去执行更为高效的分布式计算。两者的区别在于:基于 MapReduce 的计算引擎通常会将中间结果输出到磁盘上进行存储和容错;而 Spark 则是将中间结果尽量保存在内存中以减少底层存储系统的 I/O,以提高计算速度。

Spark 编程模型

核心数据结构 RDD

Spark 将数据抽象成弹性分布式数据集(Resilient Distributed Dataset, RDD),RDD 实际是分布在集群多个节点上数据的集合,通过操作 RDD 对象来并行化操作集群上的分布式数据。

RDD 有两种创建方式:

  1. 并行化驱动程序中已有的原生集合;
  2. 引用 HDFS、HBase 等外部存储系统上的数据集。

RDD 可以缓存在内存中,每次对 RDD 操作的结果都可以放到内存中,下一次操作时可直接从内存中读取,相对于 MapReduce,它省去了大量的磁盘 I/O 操作。另外,持久化的 RDD 能够在错误中自动恢复,如果某部分 RDD 丢失,Spark 会自动重算丢失的部分。

RDD 上的操作

从相关数据源获取初始数据形成初始 RDD 后,需要根据应用的需求对得到的初始 RDD 进行必要的处理,来获取满足需求的数据内容,从而对中间数据进行计算加工,得到最终的数据。

RDD 支持两种操作,一种是转换(Transformation)操作,另一种是行动(Action)操作。

转换(Transformation)操作

转换操作即将一个 RDD 转换为一个新的 RDD。值得注意的是,转换操作是惰性的,这就意味着对 RDD 调用某种转换操作时,操作并不会立即执行,而是 Spark 在内部记录下所要求执行的操作的相关信息,当在行动操作中需要用到这些转换出来的 RDD 时才会被计算,下表所示为基本的转换操作。通过转换操作,可以从已有的 RDD 生成出新的 RDD, Spark 使用谱系(Lineage)记录新旧 RDD 之间的依赖关系,一旦持久化的 RDD 丢失部分数据时,Spark 能通过谱系图重新计算丢失的数据。

输入数据为 {1, 2, 3, 3}

函数名目的示例结果
map()将数据集中的每个元素经过用户自定义的函数转换形成一个新的 RDDrdd.map(x => x * 2){2, 4, 6, 6}
flatMap()与 map() 类似,但每个元素输入项都可以被映射到 0 个或多个的输出项,最终将结果“扁平化“后输出rdd.flatMap(x => (1 to x)){1, 1, 2, 1, 2, 3, 1, 2, 3, 3}
filter()对 RDD 元素进行过滤,把经过指定函数后返回值为 true 的元素组成一个新的 RDDrdd.filter(x => (x != 3)){1, 2}
distinct()对数据进行去重,返回一个新的 RDDrdd.distinct(){1, 2, 3}
sample(withReplacement, fraction, seed)以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽出的数据是否放回,true 为有放回的抽样,false 为无放回的抽样rdd.sample(true,0.5,3)非确定的

行动(Action)操作

行动操作会触发 Spark 提交作业,对 RDD 进行实际的计算,并将最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作会得到一个结果,所以 Spark 会强制对 RDD 的转换操作进行求值,下表所示为基本的行动操作。

输入数据为 {1, 2, 3, 3}

函数名目的示例结果
collect()返回 RDD 中的所有元素rdd.collect(){1, 2, 3, 3}
count()返回 RDD 中元素的个数rdd.count()4
countByValue()返回 RDD 中各元素出现的次数rdd.countByValue(){(1, 1), (2, 1), (3, 2)}
take(n)从 RDD 中返回 n 个元素(任意位置)rdd.take(2){2, 3}
top(n)从 RDD 中返回 n 个元素rdd.top(2){1, 2}
reduce(func)并行整合 RDD 中的所有数据rdd.reduce((x, y) => x + y)9
fold(zero)(func)与 reduce() 类似,但需要提供初始值。加法的默认是 0;乘法的默认是 1rdd.fold(1)((x, y) => x + y)10
aggregate()与 reduce() 类似,但通常返回不同类型的函数rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
(9, 4)
foreach(func)对 RDD 中的每个元素使用给定的函数rdd.foreach(func)

示例

以下两个示例的数据集与源代码均可以在下述链接中进行下载 https://github.com/jinggqu/BigDataTechnologyFoundation_SourceCodeAndDataSet/tree/main/ch08

一、分词

WordCount(单词统计程序)是大数据领域经典的例子,与 Hadoop 实现的 WordCount 程序相比,Spark 实现的版本要显得更加简洁。

从 MapReduce 到 Spark

在经典的计算框架 MapReduce 中,问题会被拆成两个主要阶段: map 阶段和 reduce 阶段。对单词计数来说,MapReduce 程序从 HDFS 中读取一行字符串。在 map 阶段,将字符串分割成单词,并生成 <word, 1> 这样的键值对;在 reduce 阶段,将单词对应的计数值(初始为 1)全部累加起来,最后得到单词的总出现次数。

在 Spark 中,并没有 map/reduce 这样的划分,而是以 RDD 的转换来呈现程序的逻辑。首先,Spark 程序将从 HDFS 中按行读取的文本作为初始 RDD(即集合的每一个元素都是一行字符串);然后,通过 flatMap 操作将每一行字符串分割成单词,并收集起来作为新的单词 RDD;接着,使用 map 操作将每一个单词映射成 <word, 1>这样的键值对,转换成新的键值对 RDD;最后,通过 reduceByKey 操作将相同单词的计数值累加起来,得到单词的总出现次数。

Java 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class SparkDemo {
    private static final Pattern kSpace = Pattern.compile(" ");

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(args[0]).rdd().toJavaRDD();
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(kSpace.split(s)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(Integer::sum);
        List<Tuple2<String, Integer>> output = counts.collect();

        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1() + " : " + tuple._2());
        }
        sc.close();
    }
}

运行过程分析

  1. 初始化 创建配置文件 SparkConf,这里仅设置应用名称;再创建 JavaSparkContext,在程序中主要通过 JavaSparkContext 来访问 Spark 集群;
  2. 处理数据
    1. 根据参数使用 Spark.read().textFile() 方法按行读取输入文件,并转换成 RDD lines;
    2. 使用 flatMap 操作将所有行按空格分割切割成词,并生成新的 RDD words;
    3. 使用 map 操作( Java 中为 mapToPair ),将词映射成 <word, 1>键值对 RDD ones,其中 1 表示出现一次;
    4. 使用 reduceByKey 操作将所有相同的 word 对应的计数累加起来,得到新的 RDD counts;
    5. 使用 collect 操作将所有结果打印出来;
  3. 关闭 JavaSparkContext。

执行

将上述代码生成 Jar 包之后,将其放到服务器中,执行下面的命令即可开始运行。

1
./bin/spark-submit --class SparkDemo ~/Documents/SparkDemo.jar ~/Documents/sample.txt

其中

  • –class SparkDemo 用来指定主类名
  • ~/Documents/SparkDemo.jar 指定 Jar 包路径
  • ~/Documents/sample.txt 指定测试文本路径

sample.txt 文本内容如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Your want text it even a text notes having wrong even about fake want or not even but. Language way contentwise just language contentwise recipes set start are. Recipes a words than with meeting days ?looks? even than is name story more story words generator anything gone. Having story but fairly random some adequate want it set has a kind looking having. Fantasy anything you looks just copy work text random sets even fake having. Piece some recipes repetitive adequate wrong wrong way options to repetitive working some dummy repetitive copy realistic you fake. Work or just fairly with is unrelated having language about set forever not game repetitive adequate now you looks it of even dummy now. That but design language unrelated copy you text placeholder has review those of with fake. Random to want the has gardening which business some realistic that and just work. Gardening you realistic kind and name looks about name words words way which some name.

The that copy story realistic the adequate text meeting options game gone piece has options has name random. Days wrong set realistic design repetitive adequate review text your or but having start about right are story fairly fairly but to language sets adequate. But work want sets right kind some having contentwise fairly convincing language notes right name from but want realistic unrelated words. From about generator not looks or fairly copy has more. Forever from gone which or that having a with some having the work wrong generator design a fantasy way convincing. Working in dummy not now happily but to it of happily story want those kind looking right words business it generator language are. A you anything sets fake sets kind notes meeting having has in copy realistic is you. Copy or fairly set story.

Wrong and days not work of want piece options unrelated way random just just recipes. Your recipes gardening start fairly. Happily start game days want from in set meeting that forever random. Support has wrong than your language are random business a even design has. Way design dummy unrelated set generator game convincing. Text contentwise copy to of set kind notes a you ?looks? gone work. Way forever result you to not. Your your meeting generator way placeholder looking than has want in repetitive more kind start has you but language a. A than notes name story and a just days some with in options looking just not are want looks. Kind some from review even.

Some start random meeting recipes is a a ?looks? unrelated more the about but are dummy. Words review fake now kind of you meeting it design your. To just a about to. Not realistic name from with fake is. Work even business options fake wrong result notes want the more has dummy a notes random. Gone right repetitive fairly want now it want days review. Has notes want random name that random fantasy not unrelated in is dummy work work random game design now. Business result a and piece from working. Your some recipes copy sets are has kind story support fantasy has and some fantasy a which anything are the.

Language piece that kind copy right anything dummy a of copy which fantasy placeholder which the work are convincing random. Your gone way copy you copy are that game but looking gardening result is start text the words the a anything. Want piece set set fantasy generator sets a more are happily or ?looks? just the and sets not anything. Support to just start game work looks copy that in of but words placeholder support now fairly fake even now. Text adequate words not fairly looks from game that result name realistic or you fake working want. Kind you some looking of review has sets than want the way working has. Of fantasy gardening and kind just game those adequate your from or text are you story working happily. Business set way gardening more dummy want are you business ?looks? work to placeholder are design options sets having. Working from options work right not meeting story it is of which way fake meeting. Adequate story than words want the anything.

Language some gone random or just fairly gone which adequate sets having and adequate or text random from review. From unrelated those a start the ?looks? game business. With copy and which set kind game contentwise which anything the set story notes about or forever. Way anything work ?looks? a contentwise adequate and meeting. Options which realistic words it of to right game random way random your those and those anything some you notes gone gardening dummy than fake. But language just a your work with that set the. Are dummy business story not gardening start wrong fantasy fake and words having text which recipes your ?looks? wrong or. Generator fake than set looking text now forever more design ?looks? text but than has than wrong.

Way than fake gardening those a now it language but piece. A is even looks just result that which realistic gone are working right fake some. Which language wrong having with that looks.

执行结果如下所示

right : 9
Fantasy : 1
review : 5
convincing : 2
is : 8
Business : 2
even : 1
Are : 1
even : 10
start : 10
// 此处省略数行

二、统计用户的视频上传数

场景分析

接下来使用 Spark 来统计 Youtube 的测试数据集中每个用户的视频上传数量。稍加分析,会发现统计每个用户的视频数量其实与 WordCount 中统计每个单词出现的次数的逻辑几乎一致,区别在于处理 Youtube 测试数据集的格式略为复杂些。将给定的数据集按行划分,每行代表一条记录,除了视频类别这一字段中间有可能出现空格之外,其他的字段都是用空格分割。可以考虑使用正则表达式来匹配记录,并提取所需要的信息。

在测试数据集中,假定每行所代表的视频都是唯一的,所以仅仅需要用户 ID 这一条信息。在提取到用户 ID 之后,可以像 WordCount 一样,组成 <ID, 1> 这样的用来计数的键值对,这步之后的逻辑便与 WordCount 相似了。

Java 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SparkDemo {
    private static final Pattern EXTRACT = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(\\d+)\\s+(\\D+[a-zA-Z])\\s+(\\d+)\\s+(\\d+)\\s+(\\d+\\.?\\d*)\\s+(\\d+)\\s+(\\d+)\\s+(.*)");

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CountUploader");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile(args[0]);
        JavaRDD<String> filtered = lines.filter(s -> EXTRACT.matcher(s).matches());
        JavaPairRDD<String, String> records = filtered.mapToPair(s -> {
            Matcher m = EXTRACT.matcher(s);
            boolean result = m.matches();
            return new Tuple2<>(m.group(2), m.group(1));
        });
        JavaPairRDD<String, List<String>> groups = records.groupByKey().mapToPair(t -> {
            List<String> list = new ArrayList<>();
            t._2().forEach(list::add);
            return new Tuple2<>(t._1(), list);
        });
        // 手动实现 sortBy 操作
        JavaRDD<Tuple2<String, List<String>>> tops = groups.keyBy(t -> t._2().size()).sortByKey(false).values();
        List<Tuple2<String, List<String>>> topList = tops.take(100);

        for (Tuple2<String, List<String>> t : topList) {
            System.out.println("User: " + t._1() + ", Number of videos: " + t._2().size());
        }
        sc.stop();
    }
}

执行

将上述代码生成 Jar 包之后,将其放到服务器中,执行下面的命令即可开始运行。

1
./bin/spark-submit --class SparkDemo ~/Documents/SparkDemo.jar ~/Documents/YoutubeDataSets.txt

执行结果如下所示

User: machinima, Number of videos: 21
User: hotforwords, Number of videos: 19
User: theevang1, Number of videos: 19
User: kushtv, Number of videos: 19
User: supermac18, Number of videos: 18
User: NBA, Number of videos: 18
User: somedia, Number of videos: 17
User: tokiohotelchannel, Number of videos: 17
User: AtheneWins, Number of videos: 16
User: davidisbetterthenyou, Number of videos: 16
// 此处省略数行

参考文章

  1. RDD Operations
  2. Spark 函数详解系列之 RDD 基本转换
  3. Spark 教程之 RDD 操作-转换和执行(示例)
  4. Spark 笔记-玩转 RDD 操作
  5. RDD Aggregate in spark
  6. 利用开发工具 IntelliJ IDEA 编写 Spark 应用程序(Scala+Maven)
Licensed under CC BY-NC-SA 4.0
Built with Hugo
Theme Stack designed by Jimmy