Overview
考试周结束,放松(摸鱼)了一个星期,带着负罪感做点正事…首先是两门高度相似的专业课出分了,一门浑水摸鱼的课(Computer System Design and Implementation)拿了A,另外一门复习得很认真的课(Advanced Distributed System)却拿了个B…嗯对,这篇文章就是对这门拿B的课的一个小结。
虽然很无奈,但是这也许就是有心栽花花不开吧~
Contents
并行(parallel)与分布式(distributed)的区别?当我们一定要对此做区分的时候,一般从两个角度来看:
- 数据通讯方式:并行是shared memory,分布式需要通过网络
- 时间:并行是shared memory,分布式需要通过网络
- failure 模型:并行是all-or-nothing的,分布式存在partial failure
从以下这些角度去看待分布式系统
- consistency in distributed system
- crash recovery & logging
- concurrency control
- two phase lock & snapshot isolation
- consensus
- two phase commit
- paxos
- distributed file system
- NFS
- GFS
- Chubby ?
- data-parallel programming
- MapReduce
- dryad
- graph computing
- Pregel & GraphLab
- PowerLyra & BiGraph
- GraphChi
- multicore & NUMA: phoenix & TMR
- Improving in-memory Computing with New Hardware Features
Consistency in Distributed System
consistency是一个被滥用的词,在database和distributed system 中都有设计,但是可能是完全不同的概念,一般有这些含义:consensus、consistent model、consistent database、serializable、cache coherence
consistency in distributed system指的是其中的consistent model。一个分布式系统往往会有很多进程,以分布式数据库为例,在时刻$t$,client A把X由3更新到4,之后在时刻$t^`$,client A和client B分别询问两个不同的replica关于X的值,可能得到两个不同的答案(3 or 4),困惑就产生了。我们的初衷是不管这个分布式数据库内部是如何实现的,但是对外提供了一个假象:X只有单一的副本,这就是一个单机的数据库!
Consistency determines what responses are acceptable following an update of a replica.
Which shared memory behaviors are allowed—so that programmers know what to expect and implementors know the limits to what they can provide.
没有对的或者错的consistency model,它是ease of programmability 和 efficiency之间的tradeoff。在分布式系统中的难点在于
- data replication(caching)
- concurrency(no shared clock)
- failure(machine or network)
Strict Consistency
它是最严格的模型,所有的replica都在一瞬间、全部更新X完成。当然这在现实中是不可能的。当某个数据项(X)发生变化后,不同的 client (client A 和 B)在紧随其后的任意时刻,看到的 X 都是一样的、都是新值。
If one of the processes executes x ≔ 5 at real time t and this is the latest write operation, then at a real time t′ > t, every process trying to read x will receive the value 5.
我们定义分布式系统中的一个操作排列叫history,如果没有overlap则是sequential history,否则是concurrent history
Linearizability
相比strict consistency弱一些,不要求write是瞬间完成的,比如数据库使用同步赋值,write需要一些时间才能完成,但是对于系统的要求依然如strict consistency,和其的区别在于:write 需要时间,但 X 值的改变在时长内的某个瞬间发生。
最简洁的定义为:对于history$H_1$,如果存在一个sequential history$H_2$与$H_1$等价,则称$H_1$满足linearizability
Sequential Consistency
在linearizability的基础上再放松一点,everyone sees same read/write order. strict consistency是不现实的,因为没有global wall-clock,所以使用了total order of ops来替代。注意!sequential consistency是strict的一个特例。即用timestamp来表示total order
consistency rules:
- each CPUS’ ops appears in order
- All CPUs see results according to total order(ie. reads see most recent writes)
Release Consistency
其实就是lock consistency。
为什么需要它呢?因为sequential consistency有很多问题:
- false sharing. 两个变量在同一个page,两个Node分别RW各自的变量->pingpong. 最早出现在cache中,只是粒度是cache line,不过本质是一样的
- 解决方法:编译时把两个变量放在不同的page,但是会导致占用的内存和传输大小变大
- 真正的解决方法:twin+diff,记录page的增量修改。 sending a diff is cheaper.
update protocol VS invalidate protocol:
modification/invalidation of modification, Invalidations are smaller than the updates, In invalidate-base protocol, there can be significant overhead due to access misses
Eventual Consistency
sequential:pessimistic conflict handling
- conflicting writes is common case
- Updates cannot take effect unless they are serialized first
eventual:optimistic conflict handling
- Conflicting writes is rare case
- Let updates happen, worry about whether they can be serialized later
为了更好的performance!之前的缺点是性能低下,需要全局同步,从可用性来讲,节点中只要有人断开或partition就会一直block,于是eventual consistency支持disconnected操作,存在可能的应用程序异常行为(陈旧读,冲突写)。
解决方案是 Update Function:
Have update be a function, not a new value. Read current state of storage, decide best change. Function must be deterministic, Otherwise nodes will get different answers.
如何让每个节点的 update order 达成一致呢?不能用一个全局顺序,否则回退到 sequential consistency,发送操作时携带 update ID = time + node ID, 把本机更新该执行操作的时候的时间记录下来,然每个机器的时间会不一样,仅用这个时间来确定 所有(本机和其他)执行的 update order 是可行的。称为 Rollback & Replay.
Challenge of Clock Time: 物理时钟和逻辑时钟(Lamport时钟)
简单来讲就是同步状态时可能破坏因果依赖。某台机器的 timestamp 很小 = 7,在看到其他大 timestamp = 10 的 add 操作 后执行了删除,会导致同步操作是 先 delete 后add,在发操作进行 sync 同步的时候,同时去修改本地时间 max(自己的本地时间,看到的其他机器执行时间 + 1)
logical clock: T = Max(T, T’+1)
De-centralized | Centralized |
---|---|
连续 ID | 加入 CSN,安排一个节点比其他节点多一个操作,添加 CSN 称为 final time,再发给别人 |
但是要是 前面 ID 的一直没来就会卡着,出现网络 failure 就凉了 | 在比较时,有 CSN 的 操作优先,所有人同步到有 CSN 的机器视角 |
不能保证所有节点的最终状态一致,不能把慢的消息去掉或者赶到前面去 | 系统性能会不会卡在 CSN 这呢?还行,因为系统正常执行是高效的,状态同步可以认为是 offline 的,影响最终确定状态的性能 |
Casual Consistency
一篇论文:COPS: Scalable Causal Consistency for Wide-Area Storage
不是提出了 casual consistency,而是关注数据量大,分布广下的场景,关注 casual consistency 的可扩展性。
一台机器不能保存所有数据了,不同机器间对操作的 log 如何确定顺序,不同机器间的数据可能存在依赖关系,如何维护?
如何维护:
- Bayou系统里在最终状态更新的时候,使用 log 来 replay 其他机器的操作,最后会把操作线性执行,影响 scalability
- COPS 希望提高并行性,显式记录每个操作的因果依赖,无依赖的操作无需线性
- dependency 会成为瓶颈吗?
- 会越来越多
- 不会,dependency 是树的形式,可以剪枝,实际上仅依赖有一条
- 这个依赖满足,这个之前的一定也满足
Crash Recovery & Logging
为什么要crash recovery?
crash recovery 指的是数据库事务非正常退出(kill -9 pid),这回导致磁盘上的page状态与redo log的内容不一致,因为buffer pool中的page是异步刷入磁盘的,这就需要我们在机器crash recovery时将两者恢复到一致的状态。All or Nothing!
除了要将磁盘页面恢复到一个一致状态以外,还需要考虑到退出时的活跃事务处理:哪些事务需要提交,哪些事务需要回滚等等,这些也属于crash recovery的工作职责。
如何crash recovery?
redo log & undo log。redo log指的是将磁盘上的数据页面恢复至最新状态,而回访的起始位点就是实例推出前记录的checkpoint lsn。因为checkpoint lsn保证这之前的redo log对应的page更改一定被持久化了。
Solution: Logging
logging会记三种。do、undo、redo,在do的时候记录log,用记录的log可以回到old state。
redo log:用于重做已经commit但是还在buffer里即没有刷入磁盘时,就已经crash,导致recovery时数据不一致,所以需要重做一遍
undo log:用于撤销没有commit,但是已经从buffer中被刷入磁盘的数据,因为crash意味着该事务abort了,就不应该持久化,所以需要undo
如何做checkpoint?
找一个合理的时间点,将之前的 log 全部落盘,把已经 commit txn 的 log 去除,写入ongoing 的 txn 及指向其最近的操作记录指针,atomically 更新 checkpoint root
如何从checkpoint中recovery?
UNDO-REDO logging
- 读 cp,获取 ongoing 的 txn
- 读 log,判断 txn 是否 commit,已经 commit 称为 winner,反之称为 loser
- UNDO loser 以去除 cp 之前的 没有 commit txn 的已执行操作
- REDO winner 以恢复cp 之后的已经 commit 的 txn 的未执行操作
- 那么 cp 没记录且没有 commit 的 txn 呢?不用管,cp 中没有这个 txn,且没有 commit
REDO-ONLY logging
- 在做 cp 的时候,仅仅把已经 commit 的 txn 落盘,而对于 uncommitted 的 txn 不能写磁盘(因为不能 UNDO),但是要在 cp 中记录
- recovery 的时候对于所有 uncommitted 的 txn 都要 REDO
- 这种情况下,CMT 标记可以 delay 写,cp 的状态(磁盘的状态)都是 legal 的,而 REDO-UNDO中 磁盘可能出现 illegal 状态,对于 short txn 较好,而对于 long txn 可能需要 REDO 很长一段已经做了的操作
UNDO-ONLY logging
logging 规则,在写 CMT log 前要把前面的操作都写入操作,因为可以 UNDO,但是不能 REDO,知道这个 txn 的前面的操作都写入了才能写 CMT log
Concurrency Control
为什么需要concurrency control?
在下层(middle ware,数据库,分布式系统)处解决并发控制问题,能为上层应用提供更好的支持即通用性,但是也更难,获取的信息更少,需要支持的更多。在上层做,应用程序需要负责保护数据隔离,好做,但是难移植,但是对于特殊场景下的问题,在上层做可能较为容易,如对于 Phantom 问题,上层较容易发现,可以有自己的约定来解决。
下层并发控制希望并发事务 能够 等价到某种串行执行顺序,as if serially,对应的串行执行顺序可以是$ T_1 $先也可以是$T_2$ 先,但是不能四不像。
事务的ACID
- Atomicity,在 fail时,all-or-nothing
- Consistency,维护状态不变量invariant
- Isolation,并发执行不冲突(interfere)
- Durability,survive failure
serializability
理想语义状态,一系列事务的执行可以等价到 serial 顺序
conflict serializability: 两个schedule是 equivalent 等价的,即这两个调度包含同样的 op,且对于冲突op安排了相同的顺序order。
冲突op意味着:两个op访问同一个数据,至少有一个是write。
如果某一个 schedule 可以等价到某个 serial schedule,那么该调度就是 serializable
如何保证 serializable schedule 呢?
global lock:正确,但太慢
在 R/W X 前拿到 X 对应的 锁,完成R/W后放开,即 short-duration lock
错误,并没有指定并发事务对于操作的顺序;若有 write 操作,则能读取到 uncommitted value
在 W X前拿X的锁,直到事务完成才放开,即 long-duration locks on WRITE
解决 Dirty Read 的问题;但是出现Non-Repeatable read 问题,重复读可能会有不同的值,显然在 串行执行中不会出现;改进方法是,对于 read 也用 long-duration lock
two phase lock
growing 阶段只拿锁,shrinking 阶段只能放锁
谁拿到第一把冲突锁确定了 并发执行所等价的串行执行 的顺序
2PL 解决串行化调度的问题,但是会有 死锁
Avoidance,将锁排序,按锁的顺序拿锁,而不是按访问的顺序拿锁
但是并不是总能知道要拿哪些锁?
Detection,探测死锁 cycle,abort 相关事务
探测可以是主动的(建立事务依赖图),也可以是被动的(timeout到了就abort)abort 的目标选择,abort 哪个好
隔离级别
read uncommitted
- 允许读到uncommitted的值,但是不能写uncommitted value
- 读的时候不拿锁,写不变
read committed
- 读的时候拿锁,但是读完就放掉
- RU 和 RC 规定拿锁的时段
- unrepeatable read
- 读的数据的的内容在一个transaction中前后不一致
repeatable read
读的时候拿锁,直到事务结束放掉,但是对于 index 锁(range锁)是读完就放掉
RR 规定放锁的范围
phantom
读的数据的的数量在一个transaction中前后不一致
如果有 insert 新数据的操作,会出现 Phantom。因为原有的 lock 只对于本来就有的数据。
解决方法:谓词锁,predicate locking;对于 B树索引,加range lock;在数据层忽略
serializable
- 每种读都拿锁,直到事务结束放掉
Multi Version Concurrency Control
serializable太慢了,怎么办呢?就这么做,容忍可以读到old value
- 每个 data item 有多个版本
- 在执行过程中缓存write的op
- read时选择合适的版本
- 在commit时,系统需要validate现在的状态是否适合使该事务中的W让别人可见
Snapshot Isolation
这是一种弱于serializability,但是强于repeatable read的一种隔离级别。现在MySQL中虽然说的是repeatable read,但是实际上是snapshot isolation!!!(坑爹)
rules:
- 数据有 snapshot(旧数据),Read 从快照中读。
- Write 进行缓存
- 在 Commit 时检查 Write-Write 冲突,若有则 abort
可以发现效率应该较高,只检查 W-W 冲突,检查范围较小,其他事务读取过某数据难以记录及检查。不检查 R-W 冲突,所以仍旧会存在 anomaly (盲写),但是较少
snapshot isolation不等于serializability
会有write skew的问题!$T_1 $读X写Y,$T_2$读Y写X,不可以等价到串行化结果,但是在SI下可以提交。
解决方法:在协议上,将SI退化,把 Read 也加入到多版本控制集合,在 RW冲突下也 abort;或者在上层应用上解决,上层发现该问题并处理。
可是什么是serializable snapshot isolation?
make snapshot isolation serializable!
Consensus
two phase commit
从单个单机操作,到今天是数据在多机上,同时有多个操作。Transaction coordinator 不存储数据,负责分发client 的操作请求,中转 storage server 的数据。
one phase commit会有问题!是一个分布式的事务提交,有 node 可能 crash,有 node 可能不满足事务所需的数据要求,例如转账钱不够
two phase:
- voting
- committing
参与者需要 log 自己的 vote,发生 crash 也要记住。the coordinator需要log自己的 decision,crash 要记住,发生网络延迟可能有参与者没收到会重新要求 the coordinator 发一遍。事务是commit 还是 abort 在 commit 阶段 TC 决定 outcome 后就确定了,即使有 参与者 在 commit 前 crash,recovery 后也要记住。
timeout
participant timeout
只会在第一轮到第二轮之间,参与者vote 但是没有收到 decision。如果参与者 vote NO,参与者可以 abort;如果参与者 vote YES,由于一个参与者的 YES 最后结果可能仍旧不是 Commit,参与者只能等,指需要的资源会一直被 lock
the coordinator timeout
在第一轮中出现,TC 收不到参与者的 vote。如果有人 vote no,可以直接决定 abort;如果迟迟收不到全 YES,也可以直接 abort,虽然有可能最后收到了全部vote 都是 YES。最终决定在 the coordinator,但是为了 性能,这个 timeout 应该长一点。
termination协议
- 参与者$N_1$投了 YES,但是Timeout 收不到 TC 的决定
- 可以询问其他 node $N_2$,先问有没有决定,再问 $N_2$ 的 vote
- 如果 $N_2$ 已经收到过 决定了,那就是$N_1$和 the coordinator 的网络有问题,用$N_2$的决定即可
- 如果 $N_2$ 还没投,看实现,可能是 the coordinator 发的东西 $N_2$ 没收到或者忘了回
- 看实现,可以让$N_2$ 直接投个 NO,$N_2$要是一定要投 YES 就继续问别人
- 如果 $N_2$ 投了 NO,$N_1$ 则可以 abort
- 如果 $N_2$ 投了 YES,继续问别人
- 如果参与者互相询问,发现所有参与者都是 YES,但都没有收到决定
- 仍旧不能COMMIT
- 即使都是YES,the coordinator可能也做出 Abort 决定,如the coordinator fail,the coordinator网络 partition 导致 Timeout
reboot 恢复
- 参与者 和 TC 都要记住自己的选择
- 当然有很多 tradeoff 和实现细节
- the coordinator 可以只 log COMMIT,参与者可以只 log YES
- 参与者 reboot 的时候发现 YES 可以用 Termination 协议来决定行为
two phase lock与 two phase commit的关系
- two phase lock 是保证参与者在执行 transaction 时,访问对象的时候不发生事务间的冲突
- two phase commit 是用于 transaction 最后的提交阶段能在分布式场景下完成
- 事务提交是 two phase commit 的一个应用
Why need consensus?
关注two phase commit的 fault tolerance阶段。因为two phase commit在有机器挂掉的情况下整个系统都无法前进,需要等待机器重启(网络partition),需要很长时间,需要记录状态。
因此我们需要在有机器 fault 的情况下仍然能够达成一致(consensus)!之前都没有考虑 crash 的情况,需要对 coordinator、接下来进行的操作达成一致。
因此诞生了paxos协议。容错的基础是 replication,replication 的基础是各个作为容错的机器在提供正常服务的时候状态等价,在two phase commit且有 coordinator crash 的情况下有两个issue:
- 其他存活的机器要能发现,且选举新的 coordinator
- 当crash的机器reboot后,需要加入容错集群,并跟上状态
曾经的做法是RSM:
- primary backup 以保证其他机器状态与 primary 一致(没有考虑有机器挂了的情况)
- failure handling 存在很多挑战
- detect,自己网断了还是primary网断了,对谁failure达成一致
- 如何竞争成为新的唯一的primary,对谁成为新的primary达成一致
- 成员人数不满的情况下对sequential operation不管新旧达成一致
- 达成一致需要 consensus 协议
- 需要最少三轮消息传递,开销很大,仅在出错导致状态改变需要使状态重回一致时使用
Consensus: Paxos
所有人对同一个值达成一致。前提要求:
- 2P+1节点中最多能挂P个节点
- 传递的消息可以丢失、乱序或者重复,但是不会被篡改
- 节点不会成叛徒,说假话
- 达成一致的 value 一定是由某个 proposer 提出的
分布式系统是一个异步系统
- 消息什么时候有返回是未知的
- 故不能判断消息是丢了,还是还在发送中
- 所以不能判断节点是挂了还是活着(但是不能连接)
live lock in paxos
paxos 实际上不保证 termination,极端情况有live lock
- A 用编号 N 发了 prepare 获得 majority 的promise,在 A 发 accept request前
- B 用 N + 1 发了 prepare 获得 majority 的promise成为了 新的leader,那么 A 的 accept request 会被拒绝,然而在 B发 accept request前
- A 用编号 N +2 发了 prepare 获得 majority 的promise,那么在 B 的 accept request会被拒绝
- …
paxos algorithm
- phase 0: client 发送 request,对于 client 的每个请求,都会产生一个 paxos 实例
- phase 1:
- step a: leader proposer 的 prepare request 用编号N
- step b: acceptor 的 prepare reply (加上 promise)
- 编号N是收到过的最大编号
- 返回这个 acceptor 曾经接受过的编号最大 proposal 及 value (如果有的话)
- 给出 promise——忽略以后编号小于 N 的proposal
- 见过比 N 大的编号,忽略
- 编号N是收到过的最大编号
- phase 2:
- a:得到 enough promise,发送 accept request
- accept request 包括
- 发 prepare 用的编号 N
- prepare reply 中存在的编号最大的value V(如果有的话),没有就自己寻一个V
- accept request 包括
- b: accept reply to proposer / learner
- 收到的 accept request 中的编号仍旧是当前的 promise
- 记录 v
- 发 accept OK 给 proposer 和 learner
- 否则忽略
- 收到的 accept request 中的编号仍旧是当前的 promise
- a:得到 enough promise,发送 accept request
- phase 3: learner 返回 client
understanding paxos
令$N_h$为见过的最大编号
令$N_a$为已经接受的value的最大编号
- 为什么要多 acceptor
- 一个 acceptor 挂了
- 为什么不直接接受第一个 proposal,而要多 proposal
- 发第一个proposal 的 leader 可能会挂
- multiple leader 可能导致这个 proposal 不会被 majority 接受
- 允许多 proposal 可以使得曾经挂了的节点重启后追上,获取到已经确定的结果
- 挂了之后可能以较小的 编号发送 prepare,但是会被ignore
- 然后会以较大的编号再发 prepare,acceptor 会把已经通过的确定的提案 value 返回
- 这个重启节点就会再以这个 value 发 accept request,acceptor 会通过
- 从而重启节点获得了结果,而最终 value 早就确定也不会改变
- 如果有多个活跃的leader会怎么样?
- 可以有多个 proposer 都通过了 prepare 阶段
- 但是最终只会有一个 value 通过 accept 阶段
- value 是什么时候最终被 chosen 的
- majority 的 acceptor 在持有合法 promise 的情况下收到 accept request<N, V> ,这个 value 就确认 chosen 了
- 即使后面还有其他 leader 去 prepare 也只会选择这个 chosen V 来继续
- 如果 acceptor在send promise之后 fail
- 必须记住(持久化)$N_h$,见过的最大编号
- 极端情况
- 如果完成 prepare 阶段后,有 P 个 acceptor 见过编号 3,P+1个acceptor 见过编号 4
- 按照正常情况,编号3的 accept request 不会收到 majority 返回(最多P个)
- 如果很不幸 P+1 个见过编号4 的acceptor 挂了一个,然后重启的时候没记住 $N_h$,接受了编号3的accept,就错了
- 如果 acceptor 在接受 accept request 之后挂了
- 必须持久化 $N_h$,$N_a$(已经接受的最大编号及相应的Va-value)
- 同样极端情况,是 P 个 acceptor 接受了 编号 3,P+1 个接受了编号4
- 正常情况,以后的所有 proposer 只要收到 majority 的 promise,同时会发现最大的已接受最大提案编号为 4 所对应的 value
- 若很不幸 P+1 个接受编号 4 的 acceptor 中挂了一个,重启时没记住 Na,那么它返回的 promise 为空
- 以后的 proposer 会发送一个编号更大,但是携带的value是 编号为3的那个提案所对应的
- 如果leader 在发送 accept 后挂了?
- 重发 Mn(自己的编号为 n 的提案)
- 也可能是 leader 连不上多数 acceptor 要求重发
- 当 leader proposer 挂了之后其他 proposer 如何发现这个问题
- 这是一个实现上的细节问题,实际上其他 proposer 无法判断到底leader是挂了还是网络分区了
- 按照 Paxos 的协议实际上 每个 proposer 都会去发 propose(=prepare)
- 在实际实现中,可能只有 leader 去发,但是其他 proposer会在接到client的请求后设置timeout,超时就认为 leader 挂了,自己作为 leader 开始发 propose了
- 什么叫同时存在两个 active 的 leader?
- 就是可能有多个 proposer 通过了 prepare 阶段,实际上只有一个可能通过第二阶段
- 他们各自都认为自己的 proposal 通过了第一阶段,还没到第二阶段,故是active 的,等到进入第二阶段被 reject 就重来
- 在真的实现中,acceptor 第二阶段发的 reject 消息可以携带现在最大 编号等信息
- 实现中,acceptor 和 proposer 可以在同一个物理实体中,是可以进行通信的,proposer 可以通过同物理进程下的 acceptor 发现已经有 value 被 accept 了
Consensus: Raft
比 Paxos 易于理解,易于实现。都是follower,设置不同的随机 timeout 来成为 candidator 去发包,收到 majority 回包的 成功成为 leader,多个 candidator 就会看谁发得快,谁的包先到别的 follower 那,先到的先拿到票
To be continue…
Distributed File System
从这里开始进入特定的系统,而不是基础的概念和技术
remote file access
- FTP Telnet
- 希望transparent:NAS network attached storage
- File Service Type:
- upload,download都以文件为单位去操作,存在容量、贷款、一致性的问题
- remote access:传输文件操作,性能问题、拥塞
文件共享的semantics
设计的选择从cache、场景、stateful/stateless的角度来看
- stateful:保留client的信息
- 短请求
- 更好的性能
- 实现client端数据的刷新
- 实现lock
- 问题是:集群网络不好,client没有断开就掉了,会留下僵尸状态;client暴增的时候server会裂开
- stateless:
- 扩展性好
- 没有状态信息的问题,无法lock,不知道什么时候能delete file,因为无法确定有没有client打开了它
NFS——network file system
强调transparency与易用性,每个人都是client也都是server,支持diskless工作站,恩地没有磁盘,并且是stateless的,长请求(据说现在已经转向stateful了…)
在使用过程中,向上提供open操作,实际使用lookup,模拟出fd,然而不保证真的有这个文件,read可能读不到,一般比本地读要慢。cache使用timestamp方式处理,总是在一定时间后invalidate。
存在的问题有:
- 文件一致性
- 没有reference count:所有的长亲贵都是独立的
- 没有锁
GFS——Google file system
用廉价存储支持大规模可扩展的数据密集型应用。设计的场景是:文件很大,生命周期很长,大量的机器 failure 很常见;文件访问一般是append,顺序写入,随机写非常少,尤其是并发写。
因此,app和fs是协同设计的,app只能通过特定的client去访问,修改了文件系统接口,没有VFS,只有User-level API,增加了append操作,允许并发原子append
结构一般为N个chunk 和 1个 master。master存储metadata,以在一台机器的内存里。多节点replica,对于读很好,但是随机写的情况很少
开源实现HDFS,不能random写,只能append
Data-parallel Programming
MapReduce
并行编程,如何使用大规模普通机器去计算大数据。并行有两种方式:
- 程序的并行:困难,往往步骤之间是有依赖关系的
- 数据的并行:每个数据都在做相同操作,数据之间的操作没有依赖
并行计算分为集群计算(Cluster computing)和网格计算(Grid computing)
MapReduce架构
分为两层,大量应用。spark pytorch都是这样的。
- 接口层:这是指编程模型,即使用编程模型的业务应用程序员
- 平台/框架:实现接口,解决分布式问题,系统/分布式程序员
编程模型
- 功能足够且接口简单,就实现两个接口,想法来自于函数式编程
- Map接口:
- 输入划分(partition)完的 chunk/shard 数据
- 输出中间结果 (k,v) pair
- Maper 还要 partition 中间文件。中间文件存在Mapper本地
- Reduce接口:
- Reducer先排序中间结果,把相同key聚集
- 输入排序后聚集的数据 (k, set of v),一个 key 的所有 value
- 所有相同的 key 要聚集在一起给 reduce,由框架的shuffle完成
- 不同的reducer处理不同的key:hash(key) % R
- 输出reduce结果:merge所有key的结果
局部性
网络开销是分布式一大问题。输入输出的文件在 GFS 上,希望把执行 map 的机器也放在这些机器上运行,map要用的输入文件就在本地。可以由 GFS Maaster 和 MapReduce Master 协调完成。
容错
- worker failure
- 如果真的挂了,策略是 re-execute。甚至换人做,重复做也可以,只要能记住输入对应的中间文件。partition 的时候跳过 损坏的 和 重复的
- 如果 logging 会有很大开销
- master failure
- 放在 GFS Master 一起,主从备份
- 需要备份状态
- straggler
- 这个任务做得很慢
- backup 执行,相当于 re-execute,其他人都做完了在等可以来 backup 执行。第一个做完的 win,其他人就停
- 这也是写在本地的原因,要是发过去,可能会很多人都给 reduce 发同一份数据
中间结果存在mapper还是reducer?
- 存在mapper的好处
- 可以提前做聚集,来减少网络传输开销
- 如果 map failure 了,发送数据是否成功且完整会影响 reduce 执行,不如选择在 map 这保存之后再发送
- 如果 reduce failure 了,可以重复去 mapper 读取数据,而不需要 map 重新做
- 存在reducer(直接发过去)的好处
- pipeline,reduce 可以再整个计算完成前提前提供中间结果
后续的优化
如谷歌的网页搜索,希望做增量计算,而mapreduce不支持,所有有新的框架:caffeine
Dryad
微软强大的系统框架,但是不成功,不告诉你系统的细节和代码,还想收费,大大领先 mapreduce,但是少人知道。
与 Mapreduce 相似的目标,但是有区别。以 graph、DAG 的方式描述业务逻辑,表达依赖关系,而 mapreduce 不管你业务逻辑,只有 map - reduce 一种,灵活调度。表达能力比 MR 强很多,MR业务逻辑都要表达成 MR,而 这是 DAG Directed Acyclic Graph ,有向无环图,每个程序都能被表示成 dataflow graph
容错
也是重做,可以用 顶点 追溯执行,针对 DAG 图
优点
更灵活的 op,只是程序员需要掌握更多东西;根据DAG自动优化(编译器),比MR更先进
Tensorflow
- 参数服务器
- 将单个 op 作为 数据流图的节点
- 同步异步训练
- straggler backup
Graph Computing
图计算比原本的 Data Parallel 要复杂很多,数据之间有依赖关系,例如page rank。如何用framework来支持这样的计算呢?图计算!
图并行算法,特征是图结构描述数据依赖,一个顶点更新与一个范围(邻居)内的数据有关,迭代更新
从修改现有框架开始
page rank可以在MapReduce中实现,但是很麻烦,数据冗余度也会很大,同时mapreduce不支持迭代更新,每次mapreduce只更新一个迭代,需要多次运行mapreduce实例,在这个过程中每次中间数据都会被持久化,没有任何优化,所以需要有专门的图计算框架!
Pregel, Google 2010
使用BSP(bulk synchronous parallel),节点直接只有消息传递,即使是同一台机器。机器之间的message-passing是bulk化的,即batch。superstep=迭代次数,需要保证同步,所有节点都受到消息,批量传递,提高网络利用效率。
从节点的角度计算,接受上次superstep的消息,计算user-define计算function,发送消息到neighbor
GraphLab
因为Pregel中的BSP是个同步模型,每个superstep所有的节点都会更新,但是不同节点达到收敛所需的迭代次数不同,所有节点都要同步根棍也会更新这些节点,浪费!同时性能也会被运行最慢的节点限制,造成木桶效应。
因此GraphLab提出了异步模型,只有邻居变化时才更新。苏联是分布式的图结构,但是是基于shared memory的,图数据如何划分呢?vertex-cut和edge-cut。
支持动态收敛,可以一部分节点先收敛后停止运行。因此执行模型是节点计算时去pull邻居的数据。而不是邻居send数据给邻居,在Pregel里邻居主动push就会不支持动态收敛,异步执行,每个superstep都要求所有人发消息
一致性模型:race-free
三种consistency
- full:锁当前vertex、相关的边和所有邻居vertex
- edge:锁当前vertex、相关的边
- vertex:只锁当前的vertex
PowerLyra:Differentiated Graph Computation and Partitioning on Skewed Graphs
之前的框架妖魔vertex-cut,要么edge-cut,one size fits all。于是powerLyra对 low-degree vertex 用 edge-cut,对 high-degree vertex 用 vertex-cut。
edge-cut:
- 是切分边,把边切断放到不同的分区,从而分散了不同的点
- 对于一个点来说有较好的局部性
- 适合低度数的点,只要用cache把入边对应的点缓存了,就有单向局部性
vertex-cut:
- 是切分点,把点切成好几份(mirror)放到不同的分区,从而分散了一个点不同的边
- 能缓解 幂律分布下高度数节点导致的负载不均衡情况
- 适合高度数的点,会产生mirror,需要用GAS消息传递
GAS模型
将一个顶点的计算分成了三个阶段。
gather:收集数据
apply:更新节点本身的数据
scatter:发送信息给邻居
为什么要分成三个阶段?之前的计算是都面向顶点的,所有计算都要在顶点上完成,但是图计算的复杂度不在顶点上,而在边上,一般来说边的数量远大于点的数量,比如Gather阶段就是面向边的规模,可以通过分散边来提高并行。
缺点:
- 虽然提供了细粒度的阶段来提高并行
- 但是由于限制了每个阶段的参与者,降低了功能性
- Gather只能来自邻居
- 原本的 Pregal 的计算可以在该阶段获取距离为2的节点信息来计算
- 而GAS限制了这个操作
GraphChi:Graph processing on Single machine
单机上的好处:更容易debug,而且分布式图算法更难设计实现。基于磁盘IO的挑战:访问时延是数量级差距,读数据容易Random Access,主要贡献在于如何解决磁盘上的随机访问开销。
因此提出了PSW(parallel sliding window):分别为load、compute、write三个阶段
load:划分成 P 个 sub-graph
- 把顶点划分成P 个不相交集合,称为 interval
- 把入边放到 所在 interval 相连的 shard 中
- 即shard中的边的 target 在相连的 interval 中
- 要求所有 shard 中的入边按照 source 进行排序
- 从而使得所有 shard 中的边都按照 边的source 全局排序,而不是target
- 每个 interval 的部分计算需要的局部性边(出边)在其他interval 的 shard 中都是连续的
- 使得每次 sub-graph 加载都是 P 次连续读
- 需要P^2连续读取
compute:
- 所需出入边都被读取到内存了
- 可以直接执行
write:
P^2写回
半异步:
- 在计算不同个interval的时候
- 后面的 interval 计算能看到前面的计算结果
- 既不是同步的,也不是快照的完全异步
Multicore & NUMA: phoenix & TMR
TMR(Tiled MapReduce)
在单机中使用MapReduce,数据量可能不需要这么大,需要关注并发程度。
多核刚出现的时候,pheonix就把 MapReduce 应用到单机多核框架下,利用thread模式,共享内存空间,但是仍然存在问题:
- 大量的内存使用
- 需要保留整个输入数据,限制数据规模
- 输入数据占用几乎全部空间:大小和效率
- 数据局部性差
- 虽然对于intermediate buffer有局部性考虑。key存的是指针
- 输入数据局部性较好
- map,当前处理的数据如果与之前的数据需要有交互如比较,局部性就很差
- reduce,完全需要随机的访问
- 强阶段间的barrier
- 阶段改变的时候 CPU 会 idle 浪费
- 如某个 reduce 需要很久,其他 cpu 都要等着
因此提出了TMR:借用 Tiling 技术(常用于编译器),一种数据划分的方式,对于整个任务(输入数据)的划分。对小批量数据进行原本的 Map-Reduce,论文中称为 Map-Conbine操作,最后对划分的各个结果进行 reduce,要求reduce满足交换律和结合律,基本都满足。在sub-job中,执行一个iteration输入数据的 map 和 combine(与原本的reduce功能一致),生成 iteration buffer 的部分结果,与 input 同构,最后 reduce 生成 final buffer。
同时提出了三个优化:
- 内存复用
- 数据结构复用
- key 的指针不能用了(输入的小块内存下一个MA会换)
- 局部性优化
- 小批的working-set数据自然有更好的局部性
- 在NUMA下跨CPU的cache 访问很慢,接近内存访问
- 如何避免 remote cache access呢?提供新的调度器
- 对不同的CPU(多core)执行不同的输入数据块
- 各有一套 intermediate buffer 和 interation buffer
- 完全避免了远程缓存访问,数据压根就不一样
- 反正最后都要等每块输入计算完成,整体上是差不多的
- 流水线化
- Reduce结束后CPU不需要IDLE
- 可以进入下一块输入的 map 阶段
Improving in-memory Computing with New Hardware Features
关注新硬件提升性能。业务需要high throughput,关注单位时间能完成的请求,峰值并发量;low latency,关注单一请求完成的时延,用户体验
一般的解决方式是分布式数据中心。硬件平台有了新的硬件: HTM NVM GPU RDMA,重构上层系统软件以更好地利用新硬件的特性
HTM
提供硬件事务支持,自动保证 all-or-nothing,解决事务间的冲突。Intel 下的叫做 RTM,提供了三条新的指令,xbegin、xend、xabort,是Restricted 的,由于基于cache-coherent实现,要是访问数据规模超过缓存就会abort,不能有system call,如 pf、io。
RDMA
DMA,用于CPU托付DMA这个硬件去访问内存;RDMA,1的CPU 托付 2的网卡 去访问 2 的内存,不经过 2 的CPU:
- one-side,提供一侧的 READ、WRITE
- 从远端取数据的时延在 3us 以内
- two-side 的可以用 IPoIB 模拟 TCP/IP 的接口,无需修改上层应用,100us
- 作为对比,CPU直接访存的时延 100 ns = 0.1 us,数量级差距在10倍,访问网络相比磁盘(10^3倍)更快
- READ、WRITE编程接口,可以直接访问远程内存,不经过远端CPU,原本是做不到的
- 从远端取数据的时延在 3us 以内
- two-side,提供两侧的 SEND 、RECV
hardware offloading
即软件做的事情交给硬件做。例如事务检查、远程内存访问,但是硬件往往需要软件打 patch ,稀释了硬件的性能优势。比如 RDMA 只提供访存,不提供大的一致性,只提供 cache line 级别的一致性,分次读取一大块远程内存数据可能是不一致的,所以跨 cache line 才会不一致,硬件以 cache line 为单位读取。
解决方法是硬件组合来规避:HTM + RDMA ->DRTM
DRTM:用 RDMA 及 HTM 来支持key-value store的both Read 和 Write
key-value store:软件去利用新硬件的时候需要做更多的限制,或者组合不同的硬件来规避单一硬件存在的问题。
RDMA的问题:
- 只提供 cache line 级别访问的一致性
- 一般读取的东西比较大,会超过 cacheline
- 解决方案:
- 直观做法是加 checksum,计算复杂需要CPU,红利就没了, Pilaf(ATC’13)
- 比较好的是需要在 cache line 加 version 以保证跨 cacheline 的多次读取是一致的,FaRM(NSDI’14)
- 上述两个工作都只利用RDMA去 Read,而把 write 只允许发送到 key 所在的机器 local write
- 设计更好的数据结构优化 cache 以提高 read,可能让 write 更麻烦
- Pilaf 使用 Cuckoo hash(一种多哈希),优化读
- 但是复杂的写使得 HTM 难以被利用,容易超出 cache 范围自动 abort
- 在本地检查 WW 冲突
- 能不能用RDMA及HTM来支持both Read和Write?于是有了DRTM
简单的hash方式,使得可以利用HTM做 race detection
- 远程RDMA读或写 会 打断本地的 Txn
允许One-sided RMDA 去 write
设计 cache,只本地缓存 key 和 value 的位置信息,不包括 value 的值
- 虽然缓存仍旧需要远程读
- 但是规避了 version 来保护一致性的需要
- 多线程可以同时填充这个 cache
找到新的合适的硬件去优化分布式系统
- 了解其编程接口、性能特性
- 尽可能发挥硬件性能的软件重构以 offloading 功能到硬件
- 用软件的方式解决硬件操作可能的限制
- 一种硬件有大限制就用硬件的组合