本文共 1784 字,大约阅读时间需要 5 分钟。
Spark Streaming任务在运行过程中无论是Driver还是Executor异常被中断,都有可能导致计算任务失败,进而导致数据丢失等一系列问题,为此Spark Streaming提供了高可用解决方案。
Driver是Spark Streaming程序的"大脑",一旦Driver挂掉,那么整个任务都会失败。Spark Streaming HA将Driver元数据写到checkpoint目录下,如果后面因为某些原因导致Driver节点挂掉,那么可以通过读取checkpoint目录下的数据再次重启driver,然后继续运行计算程序,不会中断,不会丢失数据
代码
object WordCountHA { val checkpointDirectory="/spark/wordcount"; def CreateContextFunc(): StreamingContext = { val conf = new SparkConf() conf.setAppName("wordcount") conf.set("spark.streaming.receiver.writeAheadLog.enable","true"); val ssc = new StreamingContext(conf, Seconds(2)) ssc.checkpoint(checkpointDirectory) //接受数据,形成了一个DStream val dataStream: DStream[String] = ssc.socketTextStream("192.168.61.146",18888) val lineDSteram: DStream[String] = dataStream.flatMap(_.split(",")) val wordAndOneDStream: DStream[(String, Int)] = lineDSteram.map((_,1)) val result = wordAndOneDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val lastCount = state.getOrElse(0) //在scala编程里面,最后一行代码就是返回值。 Some(currentCount + lastCount) }) result.print() ssc } def main(args: Array[String]): Unit = { val ssc = StreamingContext.getOrCreate(checkpointDirectory, CreateContextFunc _) ssc.start() ssc.awaitTermination() ssc.stop() }
提交脚本重要参数
standalonespark-submit --deploy-mode cluster --supervise
yarn
spark-submit --conf spark.yarn.maxAppAttempts=1 --deploy-mode cluster ...# spark.yarn.maxAppAttempts表示spark job最大重试次数
为了防止Executor异常退出导致数据丢失,Spark Streaming提供了WAL(预写日志)机制。
Receiver只要接收到数据,会立即将数据写入一份到高可用文件系统(一般是HDFS)上的checkpoint目录中,这样无论程序怎么挂掉,数据都不会丢失。设置方法
val conf = new SparkConf()conf.set("spark.streaming.receiver.writeAheadLog.enable ","true");
转载地址:http://orcmb.baihongyu.com/