尝试用Flink的KafkaSource运行一个简单的测试程序,内容如下:
Flink 0.9
Scala 2.10.4
Kafka 0.8.2.1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka
object TestKafka {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
.print
}
}
报错:
[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR] .addSource(new KafkaSourceString)
问题似乎是SBT和Maven配置文件不能很好地协同工作。Flink POM将Scala版本(2.10,2.11,...)称为变量,其中一些在构建配置文件中定义。 SBT未正确评估配置文件,因此包装无法正常工作。
使用以下build.sbt:
organization := "pl.japila.kafka"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
运行如下:
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
object TestKafka {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env
.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
.print
}
}
输出:
[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。