// 可以参考 MySQLDialect 及 flinkStreamSql 实现 SqlServerDialect。
// (SqlServerDialect implemented with reference to MySQLDialect and flinkStreamSql.)
package org.apache.flink.connector.jdbc.dialect;
/**
* SqlServerDialect
*
* @author zhanjian@pcuuu.com
* @date 2021/4/20 10:13
*/
public class SqlServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;

// MAX/MIN fractional-second precision of the TIMESTAMP type according to SQL Server docs:
// datetime2 supports 0 to 7 digits of fractional seconds.
// https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql
// (The previous values 6/1 were copied from the MySQL dialect and were wrong for SQL Server.)
private static final int MAX_TIMESTAMP_PRECISION = 7;
private static final int MIN_TIMESTAMP_PRECISION = 0;

// MAX/MIN precision of the DECIMAL type according to SQL Server docs:
// decimal/numeric precision ranges from 1 to 38.
// https://learn.microsoft.com/en-us/sql/t-sql/data-types/decimal-and-numeric-transact-sql
// (The previous max of 65 was MySQL's limit, not SQL Server's.)
private static final int MAX_DECIMAL_PRECISION = 38;
private static final int MIN_DECIMAL_PRECISION = 1;
/**
 * Returns the human-readable name of this dialect, used for logging and
 * dialect lookup.
 *
 * @return the dialect name, always {@code "SqlServer"}
 */
@Override
public String dialectName() {
    final String name = "SqlServer";
    return name;
}
/**
 * Decides whether this dialect is responsible for the given JDBC URL.
 *
 * <p>The previous prefix check ({@code jdbc:jtds:}) also matched jTDS Sybase
 * URLs ({@code jdbc:jtds:sybase:...}), which this dialect cannot serve; the
 * check is narrowed to the sqlserver sub-protocol. URLs of the Microsoft JDBC
 * driver ({@code jdbc:sqlserver://...}) are accepted as well, since they target
 * the same database (the configured driver class may override the jTDS default).
 *
 * @param url the JDBC connection URL
 * @return true if the URL points at a SQL Server instance
 */
@Override
public boolean canHandle(String url) {
    return url.startsWith("jdbc:jtds:sqlserver:") || url.startsWith("jdbc:sqlserver:");
}
/**
 * Creates the converter that maps JDBC result-set rows of the given row type
 * to Flink's internal row representation.
 *
 * @param rowType the logical row type of the table
 * @return a SQL Server specific row converter
 */
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
    SqlServerConverter converter = new SqlServerConverter(rowType);
    return converter;
}
/**
 * Returns the JDBC driver class used when the user does not configure one
 * explicitly — the jTDS driver, matching the {@code jdbc:jtds:} URL scheme.
 *
 * @return the fully-qualified jTDS driver class name
 */
@Override
public Optional<String> defaultDriverName() {
    String jtdsDriverClass = "net.sourceforge.jtds.jdbc.Driver";
    return Optional.of(jtdsDriverClass);
}
/**
 * Returns the identifier unchanged (identity quoting).
 *
 * <p>NOTE(review): SQL Server normally quotes identifiers with brackets
 * ({@code [name]}), which this method does not do — reserved words or names
 * containing spaces will therefore break the generated SQL. However,
 * {@code buildDualQueryStatement} builds named-parameter placeholders as
 * {@code ":" + quoteIdentifier(col)}, so switching to bracket quoting here
 * alone would corrupt the {@code :name} placeholders; both spots must be
 * changed together. Confirm before altering.
 *
 * @param identifier the raw identifier
 * @return the identifier, unquoted
 */
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/**
 * Builds a SQL Server {@code MERGE} upsert: the incoming row (bound via the
 * source query from {@link #buildDualQueryStatement}) is matched against the
 * target table on the unique-key columns; matched rows are updated, unmatched
 * rows are inserted.
 *
 * @param tableName target table
 * @param fieldNames all columns to write
 * @param uniqueKeyFields columns forming the match condition
 * @return the MERGE statement, always present
 */
@Override
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    // Column list and matching T2-prefixed value list for the INSERT branch.
    String insertColumns =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(","));
    String insertValues =
            Arrays.stream(fieldNames)
                    .map(field -> "T2." + quoteIdentifier(field))
                    .collect(Collectors.joining(","));

    StringBuilder mergeSql = new StringBuilder();
    mergeSql.append("MERGE INTO ")
            .append(tableName)
            .append(" T1 USING (")
            .append(buildDualQueryStatement(fieldNames))
            .append(") T2 ON (")
            .append(buildConnectionConditions(uniqueKeyFields))
            .append(") ");

    // Only emit the MATCHED branch when there are non-key columns to update.
    String updateClause = buildUpdateConnection(fieldNames, uniqueKeyFields, true);
    if (StringUtils.isNotEmpty(updateClause)) {
        mergeSql.append(" WHEN MATCHED THEN UPDATE SET ").append(updateClause);
    }

    mergeSql.append(" WHEN NOT MATCHED THEN INSERT (")
            .append(insertColumns)
            .append(") VALUES (")
            .append(insertValues)
            .append(")")
            .append(";"); // SQL Server requires MERGE to be terminated by a semicolon.
    return Optional.of(mergeSql.toString());
}
/**
 * Builds the assignment list for the {@code WHEN MATCHED THEN UPDATE SET}
 * branch, covering every column that is not part of the unique key.
 *
 * <p>With {@code allReplace} the source value always wins
 * ({@code T1.col =T2.col}); otherwise the existing value is kept when the
 * source is NULL ({@code T1.col =ISNULL(T2.col,T1.col)}).
 *
 * @param fieldNames all columns to write
 * @param uniqueKeyFields key columns, excluded from the update
 * @param allReplace whether NULL source values overwrite existing values
 * @return comma-joined assignment expressions (empty if every column is a key)
 */
private String buildUpdateConnection(
        String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
    List<String> keyColumns = Arrays.asList(uniqueKeyFields);
    return Arrays.stream(fieldNames)
            .filter(field -> !keyColumns.contains(field))
            .map(
                    field -> {
                        String target =
                                quoteIdentifier("T1") + "." + quoteIdentifier(field);
                        String source =
                                quoteIdentifier("T2") + "." + quoteIdentifier(field);
                        if (allReplace) {
                            return target + " =" + source;
                        }
                        return target + " =ISNULL(" + source + "," + target + ")";
                    })
            .collect(Collectors.joining(","));
}
/**
 * Builds the search condition of the MERGE {@code ON (...)} clause, equating
 * each unique-key column of the target (T1) and source (T2).
 *
 * <p>Fix: multiple conditions must be combined with {@code AND} — the previous
 * comma-joined list ({@code T1.a=T2.a,T1.b=T2.b}) is not a valid boolean
 * search condition and made every composite-key MERGE fail to parse.
 *
 * @param uniqueKeyFields the unique-key columns
 * @return the AND-joined equality conditions
 */
private String buildConnectionConditions(String[] uniqueKeyFields) {
    return Arrays.stream(uniqueKeyFields)
            .map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col))
            .collect(Collectors.joining(" AND "));
}
/**
 * Builds the MERGE source query that binds one incoming row via named
 * parameters, e.g. {@code SELECT :A A, :B B} (SQL Server allows SELECT without
 * a FROM clause; there is no DUAL table as the original Oracle-style comment
 * suggested).
 *
 * <p>Fix: the {@code :name} placeholder is built from the bare column name
 * instead of {@code quoteIdentifier(col)} — a placeholder is a parameter name,
 * not an identifier, and must stay parseable ({@code :[A]} would not be) even
 * if identifier quoting ever changes. The column alias stays quoted. With the
 * current identity quoting the generated SQL is unchanged.
 *
 * @param column destination columns
 * @return the parameterized SELECT projection
 */
public String buildDualQueryStatement(String[] column) {
    String projection =
            Arrays.stream(column)
                    .map(col -> ":" + col + " " + quoteIdentifier(col))
                    .collect(Collectors.joining(", "));
    return "SELECT " + projection;
}
/** @return the largest DECIMAL precision this dialect accepts. */
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
/** @return the smallest DECIMAL precision this dialect accepts. */
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
/** @return the largest TIMESTAMP fractional-second precision this dialect accepts. */
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
/** @return the smallest TIMESTAMP fractional-second precision this dialect accepts. */
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
/**
 * Lists the Flink logical type roots this dialect rejects at table creation.
 *
 * <p>NOTE(review): this list was copied from the MySQL dialect (the original
 * comment linked https://dev.mysql.com/doc/refman/8.0/en/data-types.html);
 * confirm it matches what the jTDS/SQL Server type mapping actually supports.
 *
 * @return the unsupported logical type roots
 */
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
}
/*
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。
*/