开发者社区> 问答> 正文

E-MapReduce Spark + MNS是什么?



Spark + MNS



Spark 接入 MNS


下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。

  1.     val conf = new SparkConf().setAppName("Test MNS Streaming")
  2.     val batchInterval = Seconds(10)
  3.     val ssc = new StreamingContext(conf, batchInterval)
  4.     val queuename = "queuename"
  5.     val accessKeyId = "<accessKeyId>"
  6.     val accessKeySecret = "<accessKeySecret>"
  7.     val endpoint = "http://xxx.yyy.zzzz/abc"
  8.     val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  9.       StorageLevel.MEMORY_ONLY)
  10.     mnsStream.foreachRDD( rdd => {
  11.       rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  12.         .map(word => (word, 1))
  13.         .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  14.     })
  15.     ssc.start()
  16.     ssc.awaitTermination()


支持MetaService


上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,SparkStreaming可以基于MetaService实现免AK处理MNS数据。具体可以参考E-MapReduce SDK中的MnsUtils类说明:
  1. MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
  2. MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)


附录


完整示例代码请看:

展开
收起
nicenelly 2017-10-30 16:04:00 1676 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载