0%

并行与分布式系统

Overview

考试周结束,放松(摸鱼)了一个星期,带着负罪感做点正事…首先是两门高度相似的专业课出分了,一门浑水摸鱼的课(Computer System Design and Implementation)拿了A,另外一门复习得很认真的课(Advanced Distributed System)却拿了个B…嗯对,这篇文章就是对这门拿B的课的一个小结。

虽然很无奈,但是这也许就是有心栽花花不开吧~

Contents

并行(parallel)与分布式(distributed)的区别?当我们一定要对此做区分的时候,一般从两个角度来看:

  1. 数据通讯方式:并行是shared memory,分布式需要通过网络
  2. 时间:并行是shared memory,分布式需要通过网络
  3. failure 模型:并行是all-or-nothing的,分布式存在partial failure

从以下这些角度去看待分布式系统

  • consistency in distributed system
  • crash recovery & logging
  • concurrency control
    • two phase lock & snapshot isolation
  • consensus
    • two phase commit
    • paxos
  • distributed file system
    • NFS
    • GFS
    • Chubby ?
  • data-parallel programming
    • MapReduce
    • dryad
  • graph computing
    • Pregel & GraphLab
    • PowerLyra & BiGraph
    • GraphChi
  • multicore & NUMA: phoenix & TMR
  • Improving in-memory Computing with New Hardware Features

Consistency in Distributed System

consistency是一个被滥用的词,在database和distributed system 中都有设计,但是可能是完全不同的概念,一般有这些含义:consensus、consistent model、consistent database、serializable、cache coherence

consistency in distributed system指的是其中的consistent model。一个分布式系统往往会有很多进程,以分布式数据库为例,在时刻$t$,client A把X由3更新到4,之后在时刻$t^`$,client A和client B分别询问两个不同的replica关于X的值,可能得到两个不同的答案(3 or 4),困惑就产生了。我们的初衷是不管这个分布式数据库内部是如何实现的,但是对外提供了一个假象:X只有单一的副本,这就是一个单机的数据库!

Consistency determines what responses are acceptable following an update of a replica.

Which shared memory behaviors are allowed—so that programmers know what to expect and implementors know the limits to what they can provide.

没有对的或者错的consistency model,它是ease of programmability 和 efficiency之间的tradeoff。在分布式系统中的难点在于

  1. data replication(caching)
  2. concurrency(no shared clock)
  3. failure(machine or network)

Strict Consistency

它是最严格的模型,所有的replica都在一瞬间、全部更新X完成。当然这在现实中是不可能的。当某个数据项(X)发生变化后,不同的 client (client A 和 B)在紧随其后的任意时刻,看到的 X 都是一样的、都是新值。

If one of the processes executes x ≔ 5 at real time t and this is the latest write operation, then at a real time t′ > t, every process trying to read x will receive the value 5.

我们定义分布式系统中的一个操作排列叫history,如果没有overlap则是sequential history,否则是concurrent history

Linearizability

相比strict consistency弱一些,不要求write是瞬间完成的,比如数据库使用同步赋值,write需要一些时间才能完成,但是对于系统的要求依然如strict consistency,和其的区别在于:write 需要时间,但 X 值的改变在时长内的某个瞬间发生。

最简洁的定义为:对于history$H_1$,如果存在一个sequential history$H_2$与$H_1$等价,则称$H_1$满足linearizability

Sequential Consistency

在linearizability的基础上再放松一点,everyone sees same read/write order. strict consistency是不现实的,因为没有global wall-clock,所以使用了total order of ops来替代。注意!sequential consistency是strict的一个特例。即用timestamp来表示total order

consistency rules:

  1. each CPUS’ ops appears in order
  2. All CPUs see results according to total order(ie. reads see most recent writes)

Release Consistency

其实就是lock consistency。

为什么需要它呢?因为sequential consistency有很多问题:

  1. false sharing. 两个变量在同一个page,两个Node分别RW各自的变量->pingpong. 最早出现在cache中,只是粒度是cache line,不过本质是一样的
  2. 解决方法:编译时把两个变量放在不同的page,但是会导致占用的内存和传输大小变大
  3. 真正的解决方法:twin+diff,记录page的增量修改。 sending a diff is cheaper.

update protocol VS invalidate protocol:

modification/invalidation of modification, Invalidations are smaller than the updates, In invalidate-base protocol, there can be significant overhead due to access misses

Eventual Consistency

sequential:pessimistic conflict handling

  • conflicting writes is common case
  • Updates cannot take effect unless they are serialized first

eventual:optimistic conflict handling

  • Conflicting writes is rare case
  • Let updates happen, worry about whether they can be serialized later

为了更好的performance!之前的缺点是性能低下,需要全局同步,从可用性来讲,节点中只要有人断开或partition就会一直block,于是eventual consistency支持disconnected操作,存在可能的应用程序异常行为(陈旧读,冲突写)。

解决方案是 Update Function:

Have update be a function, not a new value. Read current state of storage, decide best change. Function must be deterministic, Otherwise nodes will get different answers.

如何让每个节点的 update order 达成一致呢?不能用一个全局顺序,否则回退到 sequential consistency,发送操作时携带 update ID = time + node ID, 把本机更新该执行操作的时候的时间记录下来,然每个机器的时间会不一样,仅用这个时间来确定 所有(本机和其他)执行的 update order 是可行的。称为 Rollback & Replay.

Challenge of Clock Time: 物理时钟和逻辑时钟(Lamport时钟)

简单来讲就是同步状态时可能破坏因果依赖。某台机器的 timestamp 很小 = 7,在看到其他大 timestamp = 10 的 add 操作 后执行了删除,会导致同步操作是 先 delete 后add,在发操作进行 sync 同步的时候,同时去修改本地时间 max(自己的本地时间,看到的其他机器执行时间 + 1)

logical clock: T = Max(T, T’+1)

De-centralized Centralized
连续 ID 加入 CSN,安排一个节点比其他节点多一个操作,添加 CSN 称为 final time,再发给别人
但是要是 前面 ID 的一直没来就会卡着,出现网络 failure 就凉了 在比较时,有 CSN 的 操作优先,所有人同步到有 CSN 的机器视角
不能保证所有节点的最终状态一致,不能把慢的消息去掉或者赶到前面去 系统性能会不会卡在 CSN 这呢?还行,因为系统正常执行是高效的,状态同步可以认为是 offline 的,影响最终确定状态的性能

Casual Consistency

一篇论文:COPS: Scalable Causal Consistency for Wide-Area Storage

不是提出了 casual consistency,而是关注数据量大,分布广下的场景,关注 casual consistency 的可扩展性。

一台机器不能保存所有数据了,不同机器间对操作的 log 如何确定顺序,不同机器间的数据可能存在依赖关系,如何维护?

如何维护:

  • Bayou系统里在最终状态更新的时候,使用 log 来 replay 其他机器的操作,最后会把操作线性执行,影响 scalability
  • COPS 希望提高并行性,显式记录每个操作的因果依赖,无依赖的操作无需线性
  • dependency 会成为瓶颈吗?
    • 会越来越多
    • 不会,dependency 是树的形式,可以剪枝,实际上仅依赖有一条
    • 这个依赖满足,这个之前的一定也满足

Crash Recovery & Logging

为什么要crash recovery?

crash recovery 指的是数据库事务非正常退出(kill -9 pid),这回导致磁盘上的page状态与redo log的内容不一致,因为buffer pool中的page是异步刷入磁盘的,这就需要我们在机器crash recovery时将两者恢复到一致的状态。All or Nothing

除了要将磁盘页面恢复到一个一致状态以外,还需要考虑到退出时的活跃事务处理:哪些事务需要提交,哪些事务需要回滚等等,这些也属于crash recovery的工作职责。

如何crash recovery?

redo log & undo log。redo log指的是将磁盘上的数据页面恢复至最新状态,而回访的起始位点就是实例推出前记录的checkpoint lsn。因为checkpoint lsn保证这之前的redo log对应的page更改一定被持久化了。

Solution: Logging

logging会记三种。do、undo、redo,在do的时候记录log,用记录的log可以回到old state。

redo log:用于重做已经commit但是还在buffer里即没有刷入磁盘时,就已经crash,导致recovery时数据不一致,所以需要重做一遍

undo log:用于撤销没有commit,但是已经从buffer中被刷入磁盘的数据,因为crash意味着该事务abort了,就不应该持久化,所以需要undo

如何做checkpoint?

找一个合理的时间点,将之前的 log 全部落盘,把已经 commit txn 的 log 去除,写入ongoing 的 txn 及指向其最近的操作记录指针,atomically 更新 checkpoint root

如何从checkpoint中recovery?

UNDO-REDO logging

  1. 读 cp,获取 ongoing 的 txn
  2. 读 log,判断 txn 是否 commit,已经 commit 称为 winner,反之称为 loser
  3. UNDO loser 以去除 cp 之前的 没有 commit txn 的已执行操作
  4. REDO winner 以恢复cp 之后的已经 commit 的 txn 的未执行操作
  5. 那么 cp 没记录且没有 commit 的 txn 呢?不用管,cp 中没有这个 txn,且没有 commit

REDO-ONLY logging

  1. 在做 cp 的时候,仅仅把已经 commit 的 txn 落盘,而对于 uncommitted 的 txn 不能写磁盘(因为不能 UNDO),但是要在 cp 中记录
  2. recovery 的时候对于所有 uncommitted 的 txn 都要 REDO
  3. 这种情况下,CMT 标记可以 delay 写,cp 的状态(磁盘的状态)都是 legal 的,而 REDO-UNDO中 磁盘可能出现 illegal 状态,对于 short txn 较好,而对于 long txn 可能需要 REDO 很长一段已经做了的操作

UNDO-ONLY logging

logging 规则,在写 CMT log 前要把前面的操作都写入操作,因为可以 UNDO,但是不能 REDO,知道这个 txn 的前面的操作都写入了才能写 CMT log

Concurrency Control

为什么需要concurrency control?

在下层(middle ware,数据库,分布式系统)处解决并发控制问题,能为上层应用提供更好的支持即通用性,但是也更难,获取的信息更少,需要支持的更多。在上层做,应用程序需要负责保护数据隔离,好做,但是难移植,但是对于特殊场景下的问题,在上层做可能较为容易,如对于 Phantom 问题,上层较容易发现,可以有自己的约定来解决。

下层并发控制希望并发事务 能够 等价到某种串行执行顺序,as if serially,对应的串行执行顺序可以是$ T_1 $先也可以是$T_2$ 先,但是不能四不像。

事务的ACID

  • Atomicity,在 fail时,all-or-nothing
  • Consistency,维护状态不变量invariant
  • Isolation,并发执行不冲突(interfere)
  • Durability,survive failure

serializability

理想语义状态,一系列事务的执行可以等价到 serial 顺序

conflict serializability: 两个schedule是 equivalent 等价的,即这两个调度包含同样的 op,且对于冲突op安排了相同的顺序order。

冲突op意味着:两个op访问同一个数据,至少有一个是write。

如果某一个 schedule 可以等价到某个 serial schedule,那么该调度就是 serializable

如何保证 serializable schedule 呢?

  1. global lock:正确,但太慢

  2. 在 R/W X 前拿到 X 对应的 锁,完成R/W后放开,即 short-duration lock

    错误,并没有指定并发事务对于操作的顺序;若有 write 操作,则能读取到 uncommitted value

  3. 在 W X前拿X的锁,直到事务完成才放开,即 long-duration locks on WRITE

    解决 Dirty Read 的问题;但是出现Non-Repeatable read 问题,重复读可能会有不同的值,显然在 串行执行中不会出现;改进方法是,对于 read 也用 long-duration lock

  4. two phase lock

    1. growing 阶段只拿锁,shrinking 阶段只能放锁

      谁拿到第一把冲突锁确定了 并发执行所等价的串行执行 的顺序

    2. 2PL 解决串行化调度的问题,但是会有 死锁

      1. Avoidance,将锁排序,按锁的顺序拿锁,而不是按访问的顺序拿锁

        但是并不是总能知道要拿哪些锁?

      2. Detection,探测死锁 cycle,abort 相关事务

        探测可以是主动的(建立事务依赖图),也可以是被动的(timeout到了就abort)abort 的目标选择,abort 哪个好

隔离级别

  • read uncommitted

    • 允许读到uncommitted的值,但是不能写uncommitted value
    • 读的时候不拿锁,写不变
  • read committed

    • 读的时候拿锁,但是读完就放掉
    • RU 和 RC 规定拿锁的时段
    • unrepeatable read
      • 读的数据的的内容在一个transaction中前后不一致
  • repeatable read

    • 读的时候拿锁,直到事务结束放掉,但是对于 index 锁(range锁)是读完就放掉

    • RR 规定放锁的范围

    • phantom

      • 读的数据的的数量在一个transaction中前后不一致

      • 如果有 insert 新数据的操作,会出现 Phantom。因为原有的 lock 只对于本来就有的数据。

        解决方法:谓词锁,predicate locking;对于 B树索引,加range lock;在数据层忽略

  • serializable

    • 每种读都拿锁,直到事务结束放掉

Multi Version Concurrency Control

serializable太慢了,怎么办呢?就这么做,容忍可以读到old value

  1. 每个 data item 有多个版本
  2. 在执行过程中缓存write的op
  3. read时选择合适的版本
  4. 在commit时,系统需要validate现在的状态是否适合使该事务中的W让别人可见

Snapshot Isolation

这是一种弱于serializability,但是强于repeatable read的一种隔离级别。现在MySQL中虽然说的是repeatable read,但是实际上是snapshot isolation!!!(坑爹)

rules:

  1. 数据有 snapshot(旧数据),Read 从快照中读。
  2. Write 进行缓存
  3. 在 Commit 时检查 Write-Write 冲突,若有则 abort

可以发现效率应该较高,只检查 W-W 冲突,检查范围较小,其他事务读取过某数据难以记录及检查。不检查 R-W 冲突,所以仍旧会存在 anomaly (盲写),但是较少

snapshot isolation不等于serializability

会有write skew的问题!$T_1 $读X写Y,$T_2$读Y写X,不可以等价到串行化结果,但是在SI下可以提交。

解决方法:在协议上,将SI退化,把 Read 也加入到多版本控制集合,在 RW冲突下也 abort;或者在上层应用上解决,上层发现该问题并处理。

可是什么是serializable snapshot isolation?

make snapshot isolation serializable!

Consensus

two phase commit

从单个单机操作,到今天是数据在多机上,同时有多个操作。Transaction coordinator 不存储数据,负责分发client 的操作请求,中转 storage server 的数据。

one phase commit会有问题!是一个分布式的事务提交,有 node 可能 crash,有 node 可能不满足事务所需的数据要求,例如转账钱不够

two phase:

  1. voting
  2. committing

参与者需要 log 自己的 vote,发生 crash 也要记住。the coordinator需要log自己的 decision,crash 要记住,发生网络延迟可能有参与者没收到会重新要求 the coordinator 发一遍。事务是commit 还是 abort 在 commit 阶段 TC 决定 outcome 后就确定了,即使有 参与者 在 commit 前 crash,recovery 后也要记住。

timeout

  • participant timeout

    只会在第一轮到第二轮之间,参与者vote 但是没有收到 decision。如果参与者 vote NO,参与者可以 abort;如果参与者 vote YES,由于一个参与者的 YES 最后结果可能仍旧不是 Commit,参与者只能等,指需要的资源会一直被 lock

  • the coordinator timeout

    在第一轮中出现,TC 收不到参与者的 vote。如果有人 vote no,可以直接决定 abort;如果迟迟收不到全 YES,也可以直接 abort,虽然有可能最后收到了全部vote 都是 YES。最终决定在 the coordinator,但是为了 性能,这个 timeout 应该长一点。

termination协议

  1. 参与者$N_1$投了 YES,但是Timeout 收不到 TC 的决定
  2. 可以询问其他 node $N_2$,先问有没有决定,再问 $N_2$ 的 vote
    1. 如果 $N_2$ 已经收到过 决定了,那就是$N_1$和 the coordinator 的网络有问题,用$N_2$的决定即可
    2. 如果 $N_2$ 还没投,看实现,可能是 the coordinator 发的东西 $N_2$ 没收到或者忘了回
      1. 看实现,可以让$N_2$ 直接投个 NO,$N_2$要是一定要投 YES 就继续问别人
    3. 如果 $N_2$ 投了 NO,$N_1$ 则可以 abort
    4. 如果 $N_2$ 投了 YES,继续问别人
  3. 如果参与者互相询问,发现所有参与者都是 YES,但都没有收到决定
    1. 仍旧不能COMMIT
    2. 即使都是YES,the coordinator可能也做出 Abort 决定,如the coordinator fail,the coordinator网络 partition 导致 Timeout

reboot 恢复

  • 参与者 和 TC 都要记住自己的选择
  • 当然有很多 tradeoff 和实现细节
    • the coordinator 可以只 log COMMIT,参与者可以只 log YES
    • 参与者 reboot 的时候发现 YES 可以用 Termination 协议来决定行为

two phase lock与 two phase commit的关系

  • two phase lock 是保证参与者在执行 transaction 时,访问对象的时候不发生事务间的冲突
  • two phase commit 是用于 transaction 最后的提交阶段能在分布式场景下完成
    • 事务提交是 two phase commit 的一个应用

Why need consensus?

关注two phase commit的 fault tolerance阶段。因为two phase commit在有机器挂掉的情况下整个系统都无法前进,需要等待机器重启(网络partition),需要很长时间,需要记录状态。

因此我们需要在有机器 fault 的情况下仍然能够达成一致(consensus)!之前都没有考虑 crash 的情况,需要对 coordinator、接下来进行的操作达成一致。

因此诞生了paxos协议。容错的基础是 replication,replication 的基础是各个作为容错的机器在提供正常服务的时候状态等价,在two phase commit且有 coordinator crash 的情况下有两个issue:

  1. 其他存活的机器要能发现,且选举新的 coordinator
  2. 当crash的机器reboot后,需要加入容错集群,并跟上状态

曾经的做法是RSM:

  • primary backup 以保证其他机器状态与 primary 一致(没有考虑有机器挂了的情况)
  • failure handling 存在很多挑战
    • detect,自己网断了还是primary网断了,对谁failure达成一致
    • 如何竞争成为新的唯一的primary,对谁成为新的primary达成一致
    • 成员人数不满的情况下对sequential operation不管新旧达成一致
  • 达成一致需要 consensus 协议
    • 需要最少三轮消息传递,开销很大,仅在出错导致状态改变需要使状态重回一致时使用

Consensus: Paxos

所有人对同一个值达成一致。前提要求:

  1. 2P+1节点中最多能挂P个节点
  2. 传递的消息可以丢失、乱序或者重复,但是不会被篡改
  3. 节点不会成叛徒,说假话
  4. 达成一致的 value 一定是由某个 proposer 提出的

分布式系统是一个异步系统

  • 消息什么时候有返回是未知的
    • 故不能判断消息是丢了,还是还在发送中
    • 所以不能判断节点是挂了还是活着(但是不能连接)

live lock in paxos

paxos 实际上不保证 termination,极端情况有live lock

  • A 用编号 N 发了 prepare 获得 majority 的promise,在 A 发 accept request前
  • B 用 N + 1 发了 prepare 获得 majority 的promise成为了 新的leader,那么 A 的 accept request 会被拒绝,然而在 B发 accept request前
  • A 用编号 N +2 发了 prepare 获得 majority 的promise,那么在 B 的 accept request会被拒绝

paxos algorithm

  • phase 0: client 发送 request,对于 client 的每个请求,都会产生一个 paxos 实例
  • phase 1:
    • step a: leader proposer 的 prepare request 用编号N
    • step b: acceptor 的 prepare reply (加上 promise)
      • 编号N是收到过的最大编号
        • 返回这个 acceptor 曾经接受过的编号最大 proposal 及 value (如果有的话)
        • 给出 promise——忽略以后编号小于 N 的proposal
      • 见过比 N 大的编号,忽略
  • phase 2:
    • a:得到 enough promise,发送 accept request
      • accept request 包括
        • 发 prepare 用的编号 N
        • prepare reply 中存在的编号最大的value V(如果有的话),没有就自己寻一个V
    • b: accept reply to proposer / learner
      • 收到的 accept request 中的编号仍旧是当前的 promise
        • 记录 v
        • 发 accept OK 给 proposer 和 learner
      • 否则忽略
  • phase 3: learner 返回 client

understanding paxos

令$N_h$为见过的最大编号

令$N_a$为已经接受的value的最大编号

  1. 为什么要多 acceptor
    • 一个 acceptor 挂了
  2. 为什么不直接接受第一个 proposal,而要多 proposal
    • 发第一个proposal 的 leader 可能会挂
    • multiple leader 可能导致这个 proposal 不会被 majority 接受
    • 允许多 proposal 可以使得曾经挂了的节点重启后追上,获取到已经确定的结果
      • 挂了之后可能以较小的 编号发送 prepare,但是会被ignore
      • 然后会以较大的编号再发 prepare,acceptor 会把已经通过的确定的提案 value 返回
      • 这个重启节点就会再以这个 value 发 accept request,acceptor 会通过
      • 从而重启节点获得了结果,而最终 value 早就确定也不会改变
  3. 如果有多个活跃的leader会怎么样?
    • 可以有多个 proposer 都通过了 prepare 阶段
    • 但是最终只会有一个 value 通过 accept 阶段
  4. value 是什么时候最终被 chosen 的
    • majority 的 acceptor 在持有合法 promise 的情况下收到 accept request<N, V> ,这个 value 就确认 chosen 了
    • 即使后面还有其他 leader 去 prepare 也只会选择这个 chosen V 来继续
  5. 如果 acceptor在send promise之后 fail
    • 必须记住(持久化)$N_h$,见过的最大编号
    • 极端情况
      • 如果完成 prepare 阶段后,有 P 个 acceptor 见过编号 3,P+1个acceptor 见过编号 4
      • 按照正常情况,编号3的 accept request 不会收到 majority 返回(最多P个)
      • 如果很不幸 P+1 个见过编号4 的acceptor 挂了一个,然后重启的时候没记住 $N_h$,接受了编号3的accept,就错了
  6. 如果 acceptor 在接受 accept request 之后挂了
    • 必须持久化 $N_h$,$N_a$(已经接受的最大编号及相应的Va-value)
    • 同样极端情况,是 P 个 acceptor 接受了 编号 3,P+1 个接受了编号4
    • 正常情况,以后的所有 proposer 只要收到 majority 的 promise,同时会发现最大的已接受最大提案编号为 4 所对应的 value
    • 若很不幸 P+1 个接受编号 4 的 acceptor 中挂了一个,重启时没记住 Na,那么它返回的 promise 为空
      • 以后的 proposer 会发送一个编号更大,但是携带的value是 编号为3的那个提案所对应的
  7. 如果leader 在发送 accept 后挂了?
    • 重发 Mn(自己的编号为 n 的提案)
    • 也可能是 leader 连不上多数 acceptor 要求重发
  8. 当 leader proposer 挂了之后其他 proposer 如何发现这个问题
    • 这是一个实现上的细节问题,实际上其他 proposer 无法判断到底leader是挂了还是网络分区了
    • 按照 Paxos 的协议实际上 每个 proposer 都会去发 propose(=prepare)
    • 在实际实现中,可能只有 leader 去发,但是其他 proposer会在接到client的请求后设置timeout,超时就认为 leader 挂了,自己作为 leader 开始发 propose了
  9. 什么叫同时存在两个 active 的 leader?
    • 就是可能有多个 proposer 通过了 prepare 阶段,实际上只有一个可能通过第二阶段
    • 他们各自都认为自己的 proposal 通过了第一阶段,还没到第二阶段,故是active 的,等到进入第二阶段被 reject 就重来
    • 在真的实现中,acceptor 第二阶段发的 reject 消息可以携带现在最大 编号等信息
      • 实现中,acceptor 和 proposer 可以在同一个物理实体中,是可以进行通信的,proposer 可以通过同物理进程下的 acceptor 发现已经有 value 被 accept 了

Consensus: Raft

比 Paxos 易于理解,易于实现。都是follower,设置不同的随机 timeout 来成为 candidator 去发包,收到 majority 回包的 成功成为 leader,多个 candidator 就会看谁发得快,谁的包先到别的 follower 那,先到的先拿到票

To be continue…

Distributed File System

从这里开始进入特定的系统,而不是基础的概念和技术

remote file access

  • FTP Telnet
  • 希望transparent:NAS network attached storage
  • File Service Type:
    • upload,download都以文件为单位去操作,存在容量、贷款、一致性的问题
    • remote access:传输文件操作,性能问题、拥塞

文件共享的semantics

设计的选择从cache、场景、stateful/stateless的角度来看

  • stateful:保留client的信息
    1. 短请求
    2. 更好的性能
    3. 实现client端数据的刷新
    4. 实现lock
    5. 问题是:集群网络不好,client没有断开就掉了,会留下僵尸状态;client暴增的时候server会裂开
  • stateless:
    • 扩展性好
    • 没有状态信息的问题,无法lock,不知道什么时候能delete file,因为无法确定有没有client打开了它

NFS——network file system

强调transparency与易用性,每个人都是client也都是server,支持diskless工作站,恩地没有磁盘,并且是stateless的,长请求(据说现在已经转向stateful了…)

在使用过程中,向上提供open操作,实际使用lookup,模拟出fd,然而不保证真的有这个文件,read可能读不到,一般比本地读要慢。cache使用timestamp方式处理,总是在一定时间后invalidate。

存在的问题有:

  • 文件一致性
  • 没有reference count:所有的长亲贵都是独立的
  • 没有锁

GFS——Google file system

用廉价存储支持大规模可扩展的数据密集型应用。设计的场景是:文件很大,生命周期很长,大量的机器 failure 很常见;文件访问一般是append,顺序写入,随机写非常少,尤其是并发写。

因此,app和fs是协同设计的,app只能通过特定的client去访问,修改了文件系统接口,没有VFS,只有User-level API,增加了append操作,允许并发原子append

结构一般为N个chunk 和 1个 master。master存储metadata,以在一台机器的内存里。多节点replica,对于读很好,但是随机写的情况很少

开源实现HDFS,不能random写,只能append

Data-parallel Programming

MapReduce

并行编程,如何使用大规模普通机器去计算大数据。并行有两种方式:

  • 程序的并行:困难,往往步骤之间是有依赖关系的
  • 数据的并行:每个数据都在做相同操作,数据之间的操作没有依赖

并行计算分为集群计算(Cluster computing)和网格计算(Grid computing)

MapReduce架构

分为两层,大量应用。spark pytorch都是这样的。

  • 接口层:这是指编程模型,即使用编程模型的业务应用程序员
  • 平台/框架:实现接口,解决分布式问题,系统/分布式程序员

编程模型

  • 功能足够且接口简单,就实现两个接口,想法来自于函数式编程
  • Map接口:
    • 输入划分(partition)完的 chunk/shard 数据
    • 输出中间结果 (k,v) pair
    • Maper 还要 partition 中间文件。中间文件存在Mapper本地
  • Reduce接口:
    • Reducer先排序中间结果,把相同key聚集
    • 输入排序后聚集的数据 (k, set of v),一个 key 的所有 value
      • 所有相同的 key 要聚集在一起给 reduce,由框架的shuffle完成
      • 不同的reducer处理不同的key:hash(key) % R
    • 输出reduce结果:merge所有key的结果

局部性

网络开销是分布式一大问题。输入输出的文件在 GFS 上,希望把执行 map 的机器也放在这些机器上运行,map要用的输入文件就在本地。可以由 GFS Maaster 和 MapReduce Master 协调完成。

容错

  • worker failure
    • 如果真的挂了,策略是 re-execute。甚至换人做,重复做也可以,只要能记住输入对应的中间文件。partition 的时候跳过 损坏的 和 重复的
    • 如果 logging 会有很大开销
  • master failure
    • 放在 GFS Master 一起,主从备份
    • 需要备份状态
  • straggler
    • 这个任务做得很慢
    • backup 执行,相当于 re-execute,其他人都做完了在等可以来 backup 执行。第一个做完的 win,其他人就停
    • 这也是写在本地的原因,要是发过去,可能会很多人都给 reduce 发同一份数据

中间结果存在mapper还是reducer?

  • 存在mapper的好处
    • 可以提前做聚集,来减少网络传输开销
    • 如果 map failure 了,发送数据是否成功且完整会影响 reduce 执行,不如选择在 map 这保存之后再发送
    • 如果 reduce failure 了,可以重复去 mapper 读取数据,而不需要 map 重新做
  • 存在reducer(直接发过去)的好处
    • pipeline,reduce 可以再整个计算完成前提前提供中间结果

后续的优化

​ 如谷歌的网页搜索,希望做增量计算,而mapreduce不支持,所有有新的框架:caffeine

Dryad

微软强大的系统框架,但是不成功,不告诉你系统的细节和代码,还想收费,大大领先 mapreduce,但是少人知道。

与 Mapreduce 相似的目标,但是有区别。以 graph、DAG 的方式描述业务逻辑,表达依赖关系,而 mapreduce 不管你业务逻辑,只有 map - reduce 一种,灵活调度。表达能力比 MR 强很多,MR业务逻辑都要表达成 MR,而 这是 DAG Directed Acyclic Graph ,有向无环图,每个程序都能被表示成 dataflow graph

容错

也是重做,可以用 顶点 追溯执行,针对 DAG 图

优点

更灵活的 op,只是程序员需要掌握更多东西;根据DAG自动优化(编译器),比MR更先进

Tensorflow

  • 参数服务器
  • 将单个 op 作为 数据流图的节点
  • 同步异步训练
  • straggler backup

Graph Computing

图计算比原本的 Data Parallel 要复杂很多,数据之间有依赖关系,例如page rank。如何用framework来支持这样的计算呢?图计算!

图并行算法,特征是图结构描述数据依赖,一个顶点更新与一个范围(邻居)内的数据有关,迭代更新

从修改现有框架开始

page rank可以在MapReduce中实现,但是很麻烦,数据冗余度也会很大,同时mapreduce不支持迭代更新,每次mapreduce只更新一个迭代,需要多次运行mapreduce实例,在这个过程中每次中间数据都会被持久化,没有任何优化,所以需要有专门的图计算框架!

Pregel, Google 2010

使用BSP(bulk synchronous parallel),节点直接只有消息传递,即使是同一台机器。机器之间的message-passing是bulk化的,即batch。superstep=迭代次数,需要保证同步,所有节点都受到消息,批量传递,提高网络利用效率。

从节点的角度计算,接受上次superstep的消息,计算user-define计算function,发送消息到neighbor

GraphLab

因为Pregel中的BSP是个同步模型,每个superstep所有的节点都会更新,但是不同节点达到收敛所需的迭代次数不同,所有节点都要同步根棍也会更新这些节点,浪费!同时性能也会被运行最慢的节点限制,造成木桶效应。

因此GraphLab提出了异步模型,只有邻居变化时才更新。苏联是分布式的图结构,但是是基于shared memory的,图数据如何划分呢?vertex-cut和edge-cut。

支持动态收敛,可以一部分节点先收敛后停止运行。因此执行模型是节点计算时去pull邻居的数据。而不是邻居send数据给邻居,在Pregel里邻居主动push就会不支持动态收敛,异步执行,每个superstep都要求所有人发消息

一致性模型:race-free

三种consistency

  • full:锁当前vertex、相关的边和所有邻居vertex
  • edge:锁当前vertex、相关的边
  • vertex:只锁当前的vertex

PowerLyra:Differentiated Graph Computation and Partitioning on Skewed Graphs

之前的框架妖魔vertex-cut,要么edge-cut,one size fits all。于是powerLyra对 low-degree vertex 用 edge-cut,对 high-degree vertex 用 vertex-cut。

edge-cut:

  • 是切分边,把边切断放到不同的分区,从而分散了不同的点
  • 对于一个点来说有较好的局部性
  • 适合低度数的点,只要用cache把入边对应的点缓存了,就有单向局部性

vertex-cut:

  • 是切分点,把点切成好几份(mirror)放到不同的分区,从而分散了一个点不同的边
  • 能缓解 幂律分布下高度数节点导致的负载不均衡情况
  • 适合高度数的点,会产生mirror,需要用GAS消息传递

GAS模型

将一个顶点的计算分成了三个阶段。

gather:收集数据

apply:更新节点本身的数据

scatter:发送信息给邻居

为什么要分成三个阶段?之前的计算是都面向顶点的,所有计算都要在顶点上完成,但是图计算的复杂度不在顶点上,而在边上,一般来说边的数量远大于点的数量,比如Gather阶段就是面向边的规模,可以通过分散边来提高并行。

缺点:

  • 虽然提供了细粒度的阶段来提高并行
  • 但是由于限制了每个阶段的参与者,降低了功能性
    • Gather只能来自邻居
    • 原本的 Pregal 的计算可以在该阶段获取距离为2的节点信息来计算
    • 而GAS限制了这个操作

GraphChi:Graph processing on Single machine

单机上的好处:更容易debug,而且分布式图算法更难设计实现。基于磁盘IO的挑战:访问时延是数量级差距,读数据容易Random Access,主要贡献在于如何解决磁盘上的随机访问开销。

因此提出了PSW(parallel sliding window):分别为load、compute、write三个阶段

load:划分成 P 个 sub-graph

  • 把顶点划分成P 个不相交集合,称为 interval
  • 入边放到 所在 interval 相连的 shard 中
    • 即shard中的边的 target 在相连的 interval 中
  • 要求所有 shard 中的入边按照 source 进行排序
    • 从而使得所有 shard 中的边都按照 边的source 全局排序,而不是target
    • 每个 interval 的部分计算需要的局部性边(出边)在其他interval 的 shard 中都是连续的
    • 使得每次 sub-graph 加载都是 P 次连续读
  • 需要P^2连续读取

compute:

  • 所需出入边都被读取到内存了
  • 可以直接执行

write:

​ P^2写回

半异步:

  • 在计算不同个interval的时候
  • 后面的 interval 计算能看到前面的计算结果
  • 既不是同步的,也不是快照的完全异步

Multicore & NUMA: phoenix & TMR

TMR(Tiled MapReduce)

在单机中使用MapReduce,数据量可能不需要这么大,需要关注并发程度。

多核刚出现的时候,pheonix就把 MapReduce 应用到单机多核框架下,利用thread模式,共享内存空间,但是仍然存在问题:

  1. 大量的内存使用
    1. 需要保留整个输入数据,限制数据规模
    2. 输入数据占用几乎全部空间:大小和效率
  2. 数据局部性差
    1. 虽然对于intermediate buffer有局部性考虑。key存的是指针
    2. 输入数据局部性较好
    3. map,当前处理的数据如果与之前的数据需要有交互如比较,局部性就很差
    4. reduce,完全需要随机的访问
  3. 强阶段间的barrier
    1. 阶段改变的时候 CPU 会 idle 浪费
    2. 如某个 reduce 需要很久,其他 cpu 都要等着

因此提出了TMR:借用 Tiling 技术(常用于编译器),一种数据划分的方式,对于整个任务(输入数据)的划分。对小批量数据进行原本的 Map-Reduce,论文中称为 Map-Conbine操作,最后对划分的各个结果进行 reduce,要求reduce满足交换律和结合律,基本都满足。在sub-job中,执行一个iteration输入数据的 map 和 combine(与原本的reduce功能一致),生成 iteration buffer 的部分结果,与 input 同构,最后 reduce 生成 final buffer。

同时提出了三个优化:

  • 内存复用
    • 数据结构复用
    • key 的指针不能用了(输入的小块内存下一个MA会换)
  • 局部性优化
    • 小批的working-set数据自然有更好的局部性
    • 在NUMA下跨CPU的cache 访问很慢,接近内存访问
    • 如何避免 remote cache access呢?提供新的调度器
    • 对不同的CPU(多core)执行不同的输入数据块
      • 各有一套 intermediate buffer 和 interation buffer
      • 完全避免了远程缓存访问,数据压根就不一样
      • 反正最后都要等每块输入计算完成,整体上是差不多的
  • 流水线化
    • Reduce结束后CPU不需要IDLE
    • 可以进入下一块输入的 map 阶段

Improving in-memory Computing with New Hardware Features

关注新硬件提升性能。业务需要high throughput,关注单位时间能完成的请求,峰值并发量;low latency,关注单一请求完成的时延,用户体验

一般的解决方式是分布式数据中心。硬件平台有了新的硬件: HTM NVM GPU RDMA,重构上层系统软件以更好地利用新硬件的特性

HTM

提供硬件事务支持,自动保证 all-or-nothing,解决事务间的冲突。Intel 下的叫做 RTM,提供了三条新的指令,xbegin、xend、xabort,是Restricted 的,由于基于cache-coherent实现,要是访问数据规模超过缓存就会abort,不能有system call,如 pf、io。

RDMA

DMA,用于CPU托付DMA这个硬件去访问内存;RDMA,1的CPU 托付 2的网卡 去访问 2 的内存,不经过 2 的CPU:

  • one-side,提供一侧的 READ、WRITE
    • 从远端取数据的时延在 3us 以内
      • two-side 的可以用 IPoIB 模拟 TCP/IP 的接口,无需修改上层应用,100us
      • 作为对比,CPU直接访存的时延 100 ns = 0.1 us,数量级差距在10倍,访问网络相比磁盘(10^3倍)更快
    • READ、WRITE编程接口,可以直接访问远程内存,不经过远端CPU,原本是做不到的
  • two-side,提供两侧的 SEND 、RECV

hardware offloading

即软件做的事情交给硬件做。例如事务检查、远程内存访问,但是硬件往往需要软件打 patch ,稀释了硬件的性能优势。比如 RDMA 只提供访存,不提供大的一致性,只提供 cache line 级别的一致性,分次读取一大块远程内存数据可能是不一致的,所以跨 cache line 才会不一致,硬件以 cache line 为单位读取。

解决方法是硬件组合来规避:HTM + RDMA ->DRTM

DRTM:用 RDMA 及 HTM 来支持key-value store的both Read 和 Write

key-value store:软件去利用新硬件的时候需要做更多的限制,或者组合不同的硬件来规避单一硬件存在的问题。

RDMA的问题:

  • 只提供 cache line 级别访问的一致性
  • 一般读取的东西比较大,会超过 cacheline
  • 解决方案:
    • 直观做法是加 checksum,计算复杂需要CPU,红利就没了, Pilaf(ATC’13)
    • 比较好的是需要在 cache line 加 version 以保证跨 cacheline 的多次读取是一致的,FaRM(NSDI’14)
  • 上述两个工作都只利用RDMA去 Read,而把 write 只允许发送到 key 所在的机器 local write
    • 设计更好的数据结构优化 cache 以提高 read,可能让 write 更麻烦
    • Pilaf 使用 Cuckoo hash(一种多哈希),优化读
    • 但是复杂的写使得 HTM 难以被利用,容易超出 cache 范围自动 abort
    • 在本地检查 WW 冲突
  • 能不能用RDMA及HTM来支持both Read和Write?于是有了DRTM

简单的hash方式,使得可以利用HTM做 race detection

  • 远程RDMA读或写 会 打断本地的 Txn

允许One-sided RMDA 去 write

设计 cache,只本地缓存 key 和 value 的位置信息,不包括 value 的值

  • 虽然缓存仍旧需要远程读
  • 但是规避了 version 来保护一致性的需要
  • 多线程可以同时填充这个 cache

找到新的合适的硬件去优化分布式系统

  1. 了解其编程接口、性能特性
  2. 尽可能发挥硬件性能的软件重构以 offloading 功能到硬件
  3. 用软件的方式解决硬件操作可能的限制
  4. 一种硬件有大限制就用硬件的组合