开发者社区> 问答> 正文

E-MapReduce Spark + MNS是什么?



Spark + MNS



Spark 接入 MNS


下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。

  import java.nio.charset.StandardCharsets

  // Spark Streaming job that pulls messages from an MNS queue and prints the
  // per-batch word counts on the driver.
  val conf = new SparkConf().setAppName("Test MNS Streaming")
  val batchInterval = Seconds(10)
  val ssc = new StreamingContext(conf, batchInterval)

  // Placeholders: substitute the real queue name, credentials and MNS endpoint.
  val queuename = "queuename"
  val accessKeyId = "<accessKeyId>"
  val accessKeySecret = "<accessKeySecret>"
  val endpoint = "http://xxx.yyy.zzzz/abc"

  val mnsStream = MnsUtils.createPullingStreamAsRawBytes(
    ssc, queuename, accessKeyId, accessKeySecret, endpoint, StorageLevel.MEMORY_ONLY)

  mnsStream.foreachRDD { rdd =>
    // Decode with an explicit charset so the result does not depend on each
    // JVM's platform default.
    // NOTE(review): assumes producers publish UTF-8 text - confirm.
    val lines = rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))

    // Split on runs of whitespace and drop empty tokens so that repeated or
    // leading spaces (and empty messages) are not counted as the word "".
    val counts = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // collect() brings the whole batch result to the driver; acceptable for a
    // demo, but it will not scale to a large vocabulary.
    counts.collect().foreach { case (word, cnt) =>
      println(s"word: $word, cnt: $cnt")
    }
  }

  ssc.start()
  ssc.awaitTermination()


支持MetaService


上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AK处理MNS数据。具体可以参考E-MapReduce SDK中的MnsUtils类说明:
  1. MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
  2. MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)


附录


完整示例代码请看:

展开
收起
nicenelly 2017-10-27 16:07:36 1866 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载