Spark

6.824 2020 Lecture 15: Spark

Resilient Distributed Datasets(弹性分布式数据集): A Fault-Tolerant Abstraction for In-Memory Cluster Computing Zaharia et al., NSDI 2012

为什么要研究Spark?

  • 广泛用于数据中心的计算
  • 生成MapReduce任务进数据流
  • 比MapReduce更好的支持迭代的应用
  • 成功的研究:ACM的博士论文奖

三个主要的话题

  • 编程模型
  • 执行策略
  • 容错

先来看page-rank 来自Spark源码库的SparkPageRank.scala,代码Sec3.2.2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
     1      val lines = spark.read.textFile("in").rdd
     2      val links1 = lines.map{ s =>
     3        val parts = s.split("\s+")
     4        (parts(0), parts(1))
     5      }
     6      val links2 = links1.distinct()
     7      val links3 = links2.groupByKey()
     8      val links4 = links3.cache()
     9      var ranks = links4.mapValues(v => 1.0)
    10  
    11      for (i <- 1 to 10) {
    12        val jj = links4.join(ranks)
    13        val contribs = jj.values.flatMap{
    14          case (urls, rank) =>
    15            urls.map(url => (url, rank / urls.size))
    16        }
    17        ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    18      }
    19  
    20      val output = ranks.collect()
    21      output.foreach(tup => println(s"${tup._1} has rank:  ${tup._2} ."))

page-rank的输入每个link一行,来自网页爬虫 from-url to-url 输出是巨大的!

page-rank的输出时每个页面的”importance”

  • 是基于其他”importance”的网页是否指向这个网页
  • 真实的模型计算了人们可能访问网页的概率
  • 用户模型:
    • 从当前页面,有85%的可能点击一个具体的链接
    • 15%的可能随机访问一个页面

page-rank算法

  • 迭代的,本质是模拟用户多轮的点击链接
  • 权重(概率)逐渐收敛
  • page-rank算法,使用MapReduce实现则不方便、慢

比如输入文件”in”:

1
2
3
4
5
  u1 u3
  u1 u1
  u2 u3
  u2 u2
  u3 u1

我们在Spark中运行page-rank算法(本地机器,非集群):

1
2
3
4
  ./bin/run-example SparkPageRank in 10
  u2 has rank:  0.2610116705534049 .
  u3 has rank:  0.9999999999999998 .
  u1 has rank:  1.7389883294465944 .

通过看结果,u1是最重要的(important)页面

在Scala交互行中,运行page-rank的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    ./bin/spark-shell

    val lines = spark.read.textFile("in").rdd
      -- what is lines? does it contain the content of file "in"?
    lines.collect()
      -- lines yields a list of strings, one per line of input
      -- if we run lines.collect() again, it re-reads file "in"
    val links1 = lines.map{ s => val parts = s.split("\s+"); (parts(0), parts(1)) }
    links1.collect()
      -- map, split, tuple -- acts on each line in turn
      -- parses each string "x y" into tuple ( "x", "y" )
    val links2 = links1.distinct()
      -- distinct() sorts or hashes to bring duplicates together
    val links3 = links2.groupByKey()
      -- groupByKey() sorts or hashes to bring instances of each key together
    val links4 = links3.cache()
      -- cache() == persist in memory
    var ranks = links4.mapValues(v => 1.0)

    -- now for first loop iteration
    val jj = links4.join(ranks)
      -- the join brings each page's link list and current rank together
    val contribs = jj.values.flatMap{ case (urls, rank) => urls.map(url => (url, rank / urls.size)) }
      -- for each link, the "from" page's rank divided by number of its links
    ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      -- sum up the links that lead to each page

    -- second loop iteration
    val jj2 = links4.join(ranks)
      -- join() brings together equal keys; must sort or hash
    val contribs2 = jj2.values.flatMap{ case (urls, rank) => urls.map(url => (url, rank / urls.size)) }
    ranks = contribs2.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      -- reduceByKey() brings together equal keys

    -- the loop &c just creates a lineage graph.
    -- it does not do any real work.

    val output = ranks.collect()
      -- collect() is an action.
      -- it causes the whole computation to execute!
    output.foreach(tup => println(s"${tup._1} has rank:  ${tup._2} ."))

到最后一个collect(), 代码只是创建了一个线状图,还没开始处理数据

线状图长什么样?

线状图

这就是描述转换步骤的图——数据量图。完整的计算步骤。 注意,虽然代码有循环,但是图里是没有循环的。 每次迭代都是新的ranks/contribs

对于多不的计算,编程模型比MapReduce更加方便

scala代码运行在”驱动”(driver)机器上

spark运行时

  • “驱动”构建了线状图
  • “驱动”编译java字节码,发生给worker机器
  • “驱动”管理执行过程和数据移动

执行过程是怎样的?

[diagram: driver, partitioned input file, workers]

  • 从HDFS(类似GFS)获取输入
  • 输入数据文件已经被划分到了多个存储服务器,第一块有钱100w行,之类的
  • 分区比机器数多,为了负载均衡
  • 每个worker机器拿走一个分区,顺序地应用线性图
  • 当计算的各分区是独立的:1. 读取后不需要机器间通信,2. worker对输入流应用一连串的变换

为什么这比MapReduce更有效?

数据直接从一个变化到下一个变换,MapReduce需要多重的map+reduce,特别是耗时的GFS的存储和再读取

那distinct()? groupByKey()? join()? reduceByKey()? 这些操作需要所有分区的数据,而非一个分区,因为所有的记录必须全部一起考虑,这就是论文中说的”wide”(宽)依赖(相对于”narrow”(窄))

宽依赖要如何实现?

[diagram]

  1. 很想MapReduce的中间输出
  2. driver知道宽依赖在哪个环节,比如page-rank的map()和distinct()z之间
  3. 数据到一个新分区前要混合,比如把所有指定的key都放到一起
  4. 上一个变换以后:
    1. 通过混合规则分割输出(特别是一些key)
    2. 放进内存的buckets中,每个下游分区对应一个
  5. 在进行下一个的变换前 (等待是一部完成——由driver管理)
    1. 每个worker从自己的bucket,抓取上一个worker数据
    2. 现在数据已经通过不同的key进行分区的

宽依赖的成本很高!因为所有数据通过网络传输,还有阻塞(等待上一步全部完成)

要是数据重复使用? 比如links4? 默认的,可以重新计算,比如重新从输入文件读取。 persist() and cache()可以将links的数据存在内存中以便重复使用

重复使用内存中persist的数据也是相对于MapReduce的一大优点

spark可以基于全部线状图做优化

  • 记录流,一个时间一个记录,通过窄变换
    • 增加了局部性,对CPU数据缓存有利
    • 避免了把分区的数据数据一次性加载进内存
  • 注意一些操作是没必要的,因为之前的数据已经以同样的方式做了分区,比如links4.join(ranks)

容错怎么样? 要是一台机器挂了?

  • 它的内存和计算过程会丢失
  • 驱动会重新把挂的机器执行的分区交给其他机器做变换
    • 通常每个分区都负责多个分区,所以负载可以分散,因此从新计算是非常快的
  • 对于窄依赖,只要把丢失的分区重新执行即可

要是执行宽依赖时失败了?

  • 重新计算失败的分区,需要所有的分区
  • 所以所有的分区要重头开始执行
  • spark支持对HDFS的checkpoints,driver只要从上一个checkpoint开始计算
  • 对于page-rank,可能每10次迭代做一次checkpoint

有什么局限性?

  • 为批量数据设计的批处理
  • 所以的记录都是同一种处理方式
  • 变换是函数式的,即输入到输出:不能原地修改数据

总结:

  • spark提升MapReduce的表达能力和性能
  • 给出了完整数据量视角的框架,非常有用:性能优化、故障恢复
  • 性能的关键是什么:1. 变换的数据留着了内存,而不是写入GFS然后在读;2. 重复利用内存中的数据
  • spark非常成功,广泛应用