大佬有序列化flinkcdc数据到这个格式的序列化方法么?我这边用java来解析有些卡住了。
自定义反序列化器
package com.yyds;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
* 自定义序列化器
*/
public class MyDeserialization implements DebeziumDeserializationSchema<String> {
/**
*封装为json字符串
* {
* "database":"",
* "tableName":"",
* "type":"c u d",
* "before":{
* "":"",
* "":"",
* "":""
* },
* "after":{
* "":"",
* "":"",
* "":""
* }
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
JSONObject res = new JSONObject();
// 获取数据库和表名称
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct)sourceRecord.value();
// 获取before数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if(before != null){
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(),beforeValue);
}
}
// 获取after数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if(after != null){
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(),afterValue);
}
}
//获取操作类型 READ DELETE UPDATE CREATE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if("create".equals(type)){
type = "insert";
}
// 将字段写到json对象中
res.put("database",database);
res.put("tableName",tableName);
res.put("before",beforeJson);
res.put("after",afterJson);
res.put("type",type);
//输出数据
collector.collect(res.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。