这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink.
package main;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
public class Main {
public static void main(String[] args) {
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment(),
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
);
tEnv.executeSql("CREATE TABLE gen_stuff (\n" +
"\tstuff_id int,\n" +
"\tstuff_base_id int,\n" +
"\tstuff_name varchar(20)\n" +
") WITH (\n" +
" 'connector' = 'datagen'," +
"'rows-per-second'='10000000'," +
"'fields.stuff_id.kind'='sequence'," +
"'fields.stuff_id.start'='1'," +
"'fields.stuff_id.end'='10000000'," +
"'fields.stuff_name.length'='15'" +
")"
);
tEnv.executeSql("CREATE TABLE result_stuff (\n" +
"\tstuff_id int,\n" +
"\tstuff_base_id int,\n" +
"\tstuff_name varchar(20)\n" +
") WITH (\n" +
"\t'connector' = 'jdbc',\n" +
"\t'url' =
'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" +
"\t'table-name' = 'result_stuff',\n" +
"\t'username' = 'root',\n" +
"\t'password' = ''\n" +
")"
);
tEnv.executeSql("insert into result_stuff select stuff_id,
stuff_base_id, stuff_name from gen_stuff");
}
}
然而,mysql 每秒大约只多 10000 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。
请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。
我使用的和 jdbc 有关的依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
(作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) *来自志愿者整理的flink邮件归档
每秒10000多条不算少了吧,如果还想再高一些,可以提高一下sink.buffer-flush.max-rows配置,默认是100*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。