开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinkcdc监控sqlserver,数据库的表中该字段是空值,而cdc中该字段的值是'N,不一致

版本
1.flinkcdc

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>3.0.0</version>
    </dependency>

sqlserver的版本: 2019
遇到的问题
sqlserver查询BOSS_UNDER_PROCESSING的值是空值,
但是flinkcdc获得的数据变成 "BOSS_UNDER_PROCESSING": "'N",
该字段在sqlserver的设置是:BOSS_UNDER_PROCESSING nchar(1) DEFAULT N'N' NULL,
img

{
"before": null,
"after": {
"ORIGREC": 338,
"APPRDATE": null,
"APPRDISP": null,
"APPROVEDBY": null,
"APPRSTS": null,
"AUTOAPPRVTEST": null,
"AUTORELEASE": null,
"BATCHNO": null,
"CASENUMBER": null,
"CHARGENO": null,
"COCNO": null,
"COLLDATE": null,
"COMMENTS": null,
"CREATEDBY": null,
"DATEPRODAV": null,
"DATERECV": null,
"DEPT": "Changzhou",
"DESIREDTAT": null,
"DISPSTS": null,
"DRAWDATE": null,
"DRAWNBY": null,
"DUEDATE": null,
"EXTERNAL_ID": null,
"FLDSTS": "Draft",
"FLOWNAME": null,
"FOLDERFLAG": null,
"FOLDERNO": "37B10AAE-F",
"FSTEPCODE": null,
"GUID": null,
"INVENTORYID": null,
"INVESTIGATIONCODE": null,
"INVOICENUMBER": null,
"INVSTIGLAYOUTCODE": null,
"LABDUEDATE": null,
"LOGDATE": null,
"MANAGER": null,
"MATCODE": null,
"NOOFRETAINS": null,
"NOTES": null,
"ORIGSTS": "N",
"PREPRUNFOLDER": null,
"PRODGROUP": null,
"PROGRAMCODE": null,
"PROJECTNO": null,
"RASCLIENTID": null,
"RASNO": null,
"RASPROJECTNO": null,
"RECEIVEDBY": null,
"RESAMPNO": null,
"RUNFOLDER": null,
"SP_CODE": null,
"SPECNO": null,
"STABNO": null,
"STUDYTYPE": null,
"SUBMITTER": null,
"TAGCOMMENTS": null,
"WFSAMPLETYPE": null,
"WORKFLOWCODE": null,
"BATCHID": null,
"SUBMITTINGORG": null,
"FOLDERNAME": null,
"METADATA_GUID": null,
"EMFOLDERNO": null,
"BILL_RASCLIENTID": null,
"BILL_RASPROJECTNO": null,
"DISP_STUDYNO": null,
"SAMPLE_POINT_O": null,
"ORIGREC_ARC": null,
"P_BATCH_O": null,
"PRELOG_DT": null,
"SCHED_OCCURRENCE_O": null,
"F_AREA_NAME": null,
"F_PLANT": null,
"F_PROCESS_SAMPLE_TYPE": null,
"VESSEL_O": null,
"F_SPECIAL_INSTRUCTIONS": null,
"NEED_PRELIMINARY": "N",
"REP_LANG": null,
"SHIP_TO_CLIENTID": null,
"SUPPLIER_CLIENTID": null,
"MFG_CLIENTID": null,
"BUYER_CLIENTID": null,
"AGENT_CLIENTID": null,
"RETURN_ADDRESSNO": null,
"PO_NUMBER": null,
"ORDER_TEMPLATE_CODE": null,
"SERVICE_LEVEL": null,
"SAMPLE_PRODUCTION_STAGE": null,
"SALES_REP_ID": null,
"BOSS_HEADER_ID": null,
"BOSS_LINE_ID": null,
"BOSS_ORDER_STATUS": null,
"BOSS_LINE_STATUS": null,
"SOURCE": "STARLIMS",
"ORGID": null,
"COUNTRY_ORIGIN": null,
"COUNTRY_DESTINATION": null,
"SHIPTO_ADDRESSNO": null,
"BILLTO_ADDRESSNO": null,
"BOSS_TRANSACTION_ID": null,
"BOSS_TRANSACTION_NO": null,
"BOSS_TRANSACTION_DATE": null,
"BUYER_ADDRESSNO": null,
"BOSS_BATCH_SOURCE": null,
"MATTYPE": null,
"RET_ADDRESS_OPT": null,
"APPLICANT_ADDRESSNO": null,
"SUPP_ADDRESSNO": null,
"MFG_ADDRESSNO": null,
"AGENT_ADDRESSNO": null,
"RET_ADDRESS_INSTRC": null,
"FOLDERPRICE": null,
"PRICELISTID": null,
"BILL_PRICELISTID": null,
"BUYER_RASPROJECTNO": null,
"BUYER_PRICELISTID": null,
"AGENT_RASPROJECTNO": null,
"AGENT_PRICELISTID": null,
"BOSS_ORDER_NUMBER": null,
"INDUSTRY_SEGMENT": null,
"TEST_DESCRIPTION": null,
"TEST_METHOD": null,
"METHOD_YEAR": null,
"REFERENCE_ID": null,
"SAMPLE_DESCRIPTION": null,
"ON_HOLD": "N",
"SGS_BILLING_NOTES": null,
"SALES_AFFILIATE_ID": null,
"ASSIGNED_TO": null,
"SERVICE_LEVEL_ID": null,
"CLIENTNOTES": null,
"EXTERNAL_PARENT_ID": null,
"EXTERNAL_CREATEDBY": null,
"UNCERTAINTY_ID": null,
"CREATE_BOSS_ORDER": "Y",
"TEMPLATE_ID": null,
"STA_LANGID": null,
"FIRST_REPORTNO": null,
"DUEDATE_MODE": "Forward",
"DUEDATE_OVERRIDE": "N",
"CONFORMANCE_STATEMENT": null,
"SGS_WITNESS": null,
"LAB_NOTES": null,
"REPORT_NOTES": null,
"SPLIT_LANGID": null,
"SPLIT_COMMENTS": null,
"SPLIT_TYPE": null,
"SPLIT_LANGNAME": null,
"QRCODE_REPORT": null,
"SEND_REPORT_FORMAT": "\u0027PDF",
"SELF_REFERENCE": "\u0027N",
"COMMITTEDBY": null,
"FLD_RPT_STS": null,
"NEED_FINAL_REPORT": null,
"BOSS_LAST_REQ_STATUS": null,
"BOSS_UNDER_PROCESSING": "\u0027N",
"OUTSOURCEFROM": null,
"LAST_UPDATED": null,
"RPT_STS_UPTDATE": null,
"SGS_CLIENT_INFO_UPDATED": null,
"LAST_ACTIVATION_DATE": null,
"BOSS_LAST_REQ_DATE": null,
"BOSS_LAST_REQ_ERROR": null,
"PRICE_REVIEW_STATUS": "\u0027N/A",
"SGS_PO_DATE": null,
"SGS_INVOICE_TMP": null,
"SGS_PEST_EVAL_STATUS": null,
"SGS_CREATED_FROM": null,
"RPT_SAMP_TYPE": "\u0027All",
"SGSMARTTRFNO": null,
"CLIENT_IF_INT": null
},
"source": {
"version": "1.9.7.Final",
"connector": "sqlserver",
"name": "sqlserver_transaction_log_source",
"ts_ms": 1722929207790,
"snapshot": "last",
"db": "loya-demo",
"sequence": null,
"schema": "dbo",
"table": "FOLDERS",
"change_lsn": null,
"commit_lsn": "00000052:000022d0:0002",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1722929208600,
"transaction": null
}

问题相关的代码

Properties mysqlProperties = new Properties();
mysqlProperties.setProperty("converters", "dateConverters");
mysqlProperties.setProperty("dateConverters.type", "com.sgs.bigdata.utils.MySqlDateTimeConverter");
Map config = new HashMap();
config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
// 数据表
SourceFunction sourceFunction = SqlServerSource.builder()
.hostname(hostname)
.port(port)
.database(database) // monitor sqlserver database
.tableList(tableName) // monitor products table
.username(username)
.password(password)
.startupOptions(StartupOptions.initial())
.deserializer(jdd)
.debeziumProperties(mysqlProperties)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.print("<<<");

不知道怎么解决?

展开
收起
游客rnexmgen6a6va 2024-08-07 10:10:09 69 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载