
What is E-MapReduce Spark + Log Service?



Spark + LogService



Connecting Spark to LogService


The following example shows how Spark Streaming consumes log data from LogService and counts the number of log entries.

Method 1: Receiver Based DStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils

    val logServiceProject = args(0)            // project name in LogService
    val logStoreName = args(1)                 // logstore name in LogService
    val loghubConsumerGroupName = args(2)      // jobs with the same consumer group name jointly consume the logstore's data
    val loghubEndpoint = args(3)               // data API endpoint of Alibaba Cloud Log Service
    val accessKeyId = "<accessKeyId>"          // AccessKeyId used to access Log Service
    val accessKeySecret = "<accessKeySecret>"  // AccessKeySecret used to access Log Service
    val numReceivers = args(4).toInt           // number of Receivers started to read data from the logstore
    val batchInterval = Milliseconds(args(5).toInt * 1000) // interval between batches in Spark Streaming
    val conf = new SparkConf().setAppName("Test Loghub Streaming")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      logServiceProject,
      logStoreName,
      loghubConsumerGroupName,
      loghubEndpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    // Print the number of log entries received in each batch.
    loghubStream.foreachRDD(rdd => println(rdd.count()))
    ssc.start()
    ssc.awaitTermination()
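
The snippet above reads six positional arguments without checking them, so a missing or non-numeric argument only fails at runtime with an unhelpful exception. A minimal sketch of validating them first is shown here. LoghubArgs and its parse method are hypothetical helpers made up for this illustration; they are not part of the E-MapReduce SDK.

```scala
import scala.util.Try

// Hypothetical holder for the six positional arguments of the Receiver based example.
// Not part of the E-MapReduce SDK.
final case class LoghubArgs(
    project: String,
    logstore: String,
    consumerGroup: String,
    endpoint: String,
    numReceivers: Int,
    batchSeconds: Int) {
  // Batch interval in milliseconds, matching args(5).toInt * 1000 in the example.
  def batchMillis: Long = batchSeconds.toLong * 1000L
}

object LoghubArgs {
  private val usage =
    "usage: <project> <logstore> <consumerGroup> <endpoint> <numReceivers> <batchSeconds>"

  // Returns Right(parsed arguments) or Left(error message).
  def parse(args: Array[String]): Either[String, LoghubArgs] =
    if (args.length < 6) Left(usage)
    else {
      val numbers = for {
        n <- Try(args(4).toInt).toOption
        b <- Try(args(5).toInt).toOption
        if n > 0 && b > 0
      } yield (n, b)
      numbers match {
        case Some((n, b)) => Right(LoghubArgs(args(0), args(1), args(2), args(3), n, b))
        case None         => Left("numReceivers and batchSeconds must be positive integers")
      }
    }
}
```

With this in place, the job could stop early on a Left value, and on a Right value pass Milliseconds(parsed.batchMillis) to the StreamingContext.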


Method 2: Direct API Based DStream

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{DirectLoghubInputDStream, LoghubUtils}

    val logServiceProject = args(0)
    val logStoreName = args(1)
    val loghubConsumerGroupName = args(2)
    val loghubEndpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val zkConnect = args(7)
    val checkpointPath = args(8)

    // Builds a new StreamingContext; only called when no checkpoint exists at checkpointPath.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
      val ssc = new StreamingContext(conf, batchInterval)
      val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
      val loghubStream = LoghubUtils.createDirectStream(
        ssc,
        logServiceProject,
        logStoreName,
        loghubConsumerGroupName,
        accessKeyId,
        accessKeySecret,
        loghubEndpoint,
        zkParas,
        LogHubCursorPosition.END_CURSOR)
      ssc.checkpoint(checkpointPath)
      val stream = loghubStream.checkpoint(batchInterval)
      stream.foreachRDD(rdd => {
        println(rdd.count())
        // Commit once inside the action, after the batch has been processed.
        loghubStream.asInstanceOf[DirectLoghubInputDStream].commitAsync()
      })
      ssc
    }

    // Recover from the checkpoint if one exists, otherwise create a new context.
    val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()

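Starting with E-MapReduce SDK 1.4.0, a Direct API based implementation is available. This approach avoids storing the Loghub data a second time in the Write Ahead Log, so at-least-once delivery is achieved without enabling the WAL feature of Spark Streaming. The Direct API implementation is currently experimental. Note the following:
  • A commit must be performed once within the DStream action.
  • Within one Spark Streaming application, multiple actions on the logstore data source are not supported.
  • The Direct API approach requires a ZooKeeper service.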
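
The requirement to commit inside the action is what gives at-least-once behavior: a batch that fails before its commit is read again on the next attempt. The following is a small in-memory simulation of that idea, not SDK code. FakeShard and AtLeastOnceSketch are hypothetical names invented for this sketch, and the real Direct API keeps its offsets through ZooKeeper and Log Service rather than in a local variable.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for one logstore shard. Not part of the SDK.
final class FakeShard(records: Vector[String]) {
  private var committed = 0 // index of the first record that has not been committed

  // Read up to `max` records, always starting at the committed offset.
  def fetch(max: Int): Vector[String] = records.slice(committed, committed + max)

  // Advance the committed offset once a batch has been fully processed.
  def commit(n: Int): Unit = committed += n

  def committedOffset: Int = committed
}

object AtLeastOnceSketch {
  // Process one batch into `sink`. The offset is committed only if every record
  // was processed; `failAt` simulates a crash at the given index in the batch.
  def runBatch(shard: FakeShard, max: Int, sink: ArrayBuffer[String], failAt: Option[Int]): Boolean = {
    val batch = shard.fetch(max)
    try {
      batch.zipWithIndex.foreach { case (record, i) =>
        if (failAt.contains(i)) throw new RuntimeException("simulated failure")
        sink += record
      }
      shard.commit(batch.size)
      true
    } catch {
      case _: RuntimeException => false // nothing committed, so the batch will be re-read
    }
  }

  def main(args: Array[String]): Unit = {
    val shard = new FakeShard(Vector("a", "b", "c", "d"))
    val sink = ArrayBuffer.empty[String]
    runBatch(shard, 2, sink, failAt = Some(1)) // writes "a", then fails before the commit
    runBatch(shard, 2, sink, failAt = None)    // re-reads the same batch: "a", "b"
    runBatch(shard, 2, sink, failAt = None)    // reads "c", "d"
    println(sink.mkString(","))                // prints a,a,b,c,d
    println(shard.committedOffset)             // prints 4
  }
}
```

The record "a" reaches the sink twice, which is the duplicate that at-least-once delivery allows. Downstream processing therefore needs to tolerate repeated records.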


MetaService support


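In the examples above, the AK is passed to the interface explicitly. Starting with E-MapReduce SDK 1.3.2, however, Spark Streaming can process LogService data without an AK, based on MetaService. For details, see the description of the LoghubUtils class in the E-MapReduce SDK: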
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)


Notes

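  • The E-MapReduce SDK supports three LogService consumption modes: "BEGIN_CURSOR", "END_CURSOR", and "SPECIAL_TIMER_CURSOR". The default is "END_CURSOR".
      • BEGIN_CURSOR: consume from the head of the log. If a checkpoint record exists, consume from the checkpoint.
      • END_CURSOR: consume from the tail of the log. If a checkpoint record exists, consume from the checkpoint.
      • SPECIAL_TIMER_CURSOR: consume from a specified point in time, given in seconds. If a checkpoint record exists, consume from the checkpoint.
  • All three modes are affected by checkpoint records: if a checkpoint record exists, consumption starts from the checkpoint regardless of the mode specified. Based on the "SPECIAL_TIMER_CURSOR" mode, the E-MapReduce SDK lets users force consumption to start at a specified point in time. In the LoghubUtils#createStream interface, the following parameters must be used together:
      • cursorPosition: LogHubCursorPosition.SPECIAL_TIMER_CURSOR
      • forceSpecial: true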
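
The interaction between consumption mode, checkpoint record, and forceSpecial can be written out as a small decision function. This is only a model of the rule as described in these notes, assuming that forceSpecial has no effect unless the mode is SPECIAL_TIMER_CURSOR. All types and names below (CursorMode, StartPosition, CursorRule) are hypothetical and are not the SDK's LogHubCursorPosition or LoghubUtils.

```scala
// Hypothetical model of the documented start-position rule. Not SDK code.
sealed trait CursorMode
case object BeginCursor extends CursorMode
case object EndCursor extends CursorMode
case object SpecialTimerCursor extends CursorMode

sealed trait StartPosition
case object LogHead extends StartPosition
case object LogTail extends StartPosition
final case class AtCheckpoint(cursor: String) extends StartPosition
final case class AtTime(epochSeconds: Int) extends StartPosition

object CursorRule {
  // checkpoint: the saved checkpoint record, if any.
  // startTime: point in time in seconds, used only by SpecialTimerCursor.
  def resolve(
      mode: CursorMode,
      checkpoint: Option[String],
      startTime: Int,
      forceSpecial: Boolean): StartPosition = {
    // Assumption: forcing applies only when both parameters are set together.
    val forced = mode == SpecialTimerCursor && forceSpecial
    checkpoint match {
      // A checkpoint wins over every mode unless the start time is forced.
      case Some(cp) if !forced => AtCheckpoint(cp)
      case _ =>
        mode match {
          case BeginCursor        => LogHead
          case EndCursor          => LogTail
          case SpecialTimerCursor => AtTime(startTime)
        }
    }
  }
}
```

For example, resolve(SpecialTimerCursor, Some("cp"), 1500000000, forceSpecial = false) yields AtCheckpoint("cp"), while the same call with forceSpecial = true yields AtTime(1500000000).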
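E-MapReduce machines (except the Master node) cannot connect to the public network. When configuring the LogService endpoint, use the internal network endpoint provided by Log Service; otherwise requests to Log Service will fail. For more about LogService, see the documentation.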

Appendix


For the complete sample code, see:

nicenelly 2017-10-27 16:06:54