Flink现在我有个这样的操作,就是原先有个接口是修改数据的接口,因为业务复杂,他是先删除后增加,然后删除失败回滚事务,添加失败也回滚事务,但是是手动的,现在有个批量更新的需求,需要我更新批量更新的时候,如果其中一个子事务失败了子事务回滚不影响外部,就是跳过这个错误的数据 继续更新后面的?
Flink支持基于两阶段提交的分布式事务。对于你的场景,可以在Flink作业中使用两阶段提交来确保操作的一致性。
参考示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 为了简化,这里设置并行度为1
// 假设这是从Kafka读取的数据流
DataStream<MyData> inputStream = env.addSource(new FlinkKafkaConsumer<>(...));
// 使用两阶段提交来确保事务性
inputStream
.map(data -> {
// 这里是业务逻辑,比如删除和添加操作
// 如果需要手动控制事务,可以在这里实现
// ...
return data;
})
.addSink(new TwoPhaseCommitSinkFunction<MyData>() {
// 实现beginTransaction、preCommit、commit、abort等方法来控制事务
// ...
});
env.execute();
——参考链接。
您提到的操作是一个典型的“先删除后增加”操作,在数据库中经常遇到。在Flink中,处理这样的操作通常需要考虑事务性以保证数据的一致性。
首先,确保您的Flink版本支持事务性。Flink从1.12版本开始支持事务性CEP(复杂事件处理)和Table API/SQL。
对于先删除后增加的操作,您可以遵循以下步骤:
设置事务超时时间:确保您为流设置了一个合理的事务超时时间,这样在出现问题时可以回滚事务。
开始事务:使用StreamExecutionEnvironment.startTransaction()或StreamTableEnvironment.beginTransaction()开始一个新的事务。
执行删除操作:使用Flink的SQL或Table API执行删除操作。确保这一步在事务中执行。
执行增加操作:同样在事务中执行增加操作。
提交或回滚事务:根据操作的结果,决定是提交事务还是回滚事务。如果操作成功,提交事务;否则回滚事务以保持数据的一致性。
处理异常:在整个过程中,捕获并处理可能出现的异常,确保数据的完整性。
如果您的sink支持upsert(如Elasticsearch 7.x及以上版本、HBase等),则可以将修改操作表示为一个INSERT INTO ... ON CONFLICT UPDATE的操作。对于Flink,你可以使用UpsertStreamOperator或者针对特定sink支持的Upsert语义进行配置。
看起来你正在尝试在一个Spring Boot应用中使用MyBatis框架执行数据库操作。你的问题是关于如何在保存数据之前检查是否存在相同的记录以避免重复插入的问题。
要解决这个问题,你可以通过自定义插件的方式来拦截SQL语句并在其执行前做相应的校验。下面是一个简单的示例说明如何编写这样一个插件:
你需要继承 org.apache.ibatis.plugin.Interceptor
类并且重写其中的方法:
public class DuplicateKeyInterceptor extends Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
// 获取sqlSession对象
SqlSession sqlSession = (SqlSession)invocation.getArgs()[0];
// 执行原生查询获取所有同名用户信息
List<User> users = sqlSession.selectList("com.example.demo.mapper.UserMapper.findByName", "张三");
if(users.size()>0){
throw new RuntimeException("存在相同的名字!");
}
return null;
}
@Override
protected void beforeAdvice(Invocation invtation) {}
}
接下来,你需要注册这个插件:
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public MybatisGlobalConfiguration mybatisConfig(){
MybatisGlobalConfiguration config = new MybatisGlobalConfiguration();
// 注册全局异常处理器
GlobalExceptionHandlers globalExceptionHandler = new GlobalExceptionHandlers();
globalExceptionHandler.setInterceptors(new DuplicateKeyInterceptor());
config.globalExceptionHandlers().add(globalExceptionHandler);
return config;
}
// 或者
@Configuration
@EnableTransactionManagement
public class TransactionConfig {
@Bean(name="transactionManager")
public PlatformTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource) {
JpaPlatform platform = HibernateJpa PLATFORM_FACTORY.newHibernate_PLATFORM();
JpaTransactionManager manager = new JpaTransactionManager(platform);
manager.setDataSource(dataSource);
return manager;
}
@Bean
public PlatformTransactionManager hibernateTransactionManager(JpaTransactionManager jpaTransactionManager,
DataSource dataSource) {
PlatformTransactionManager pTM = new JpaTransactionManager(jpaTransactionManager);
pTM.setDataSource(dataSource);
return pTM;
}
@Bean
public DataSource dataSource() {
EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.setName("myDB");
return builder.build();
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
factory.setPersistenceUnitName("defaultPU");
factory.setPackagesToScan("com.example.demo.entity");
return factory;
}
@Bean
public static PersistenceAnnotationDrivenPersistenceProvider persistenceAnnotationDrivenPersistenceProvider(EntityManagerFactory entityMangerFactory) {
return new PersistenceAnnotationDrivenPersistenceProvider(entityMangerFactory);
}
@Bean
public static PlatformTransactionManager annotationDrivenTransactionManager(PersistenceAnnotationDrivenPersistenceProvider provider,
EntityManagerFactory emf) {
AnnotationDrivenTransactionManager tm = new AnnotationDrivenTransactionManager();
tm.setPersistenceProvider(provider);
tm.setEntityManagerFactory(emf);
return tm;
}
}
上述代码只是一个基本的例子,具体实现还需要结合项目实际情况来进行调整。
您的问题是关于如何在一个批处理框架(如 Apache Flink)中实现类似功能的操作,该操作要求在某个子事务发生异常时,其他子事务继续正常执行并忽略受影响的部分。
在 Flink 中,您可以使用 TransactionAspectSupport.currentTransactionalStatus().setRollbackOnly(); 来标记一个事务为仅 rollback,而不是 commit。如果您想让一个子事务失败时不影响其他子事务,您可以在子事务中捕获异常并将其标记为仅 rollback。这样,主事务将继续执行剩余的子事务,而不会被子事务的异常所阻止。
下面是修改后的代码片段:
@Transactional(rollbackFor = Exception.class)
public void nestTrans1() throws InterruptedException {
for (int i = 0; i < 10; i++) {
log.info("准备执行任务");
this.saveData();
}
}
@Transactional(rollbackFor = Exception.class)
public boolean saveData() throws InterruptedException {
try {
// 做一些业务
TransactionAspectSupport.currentTransactionalStatus().setRollbackOnly();
return true;
} catch (Exception e) {
if (!Thread.currentThread().isInterrupted()) {
throw e;
}
Thread.sleep(1); // 在这里模拟耗时操作,实际场景下可能是真正的耗时操作
TransactionAspectSupport.currentTransactionalStatus().setRollbackOnly();
return false;
}
}
在这个例子中,saveData 方法会在抛出异常时被捕获,并将事务标记为仅 rollback。这样,即使 saveData 子事务失败,也不会影响其他子事务的执行。
请注意,此答案假设您已经在 Spring Boot 应用中使用了 @Transactional 注解。如果不是的话,您可能需要使用其他的事务管理方式,例如编程式事务管理。
可以使用 Flink 的 Savepoint 机制来实现事务的回滚和跳过错误数据。首先,需要将 Flink 的事务日志级别设置为 DISABLE,以便在更新失败时不会回滚整个事务。然后,可以使用 try-catch 语句来捕获异常,并在 catch 语句中使用 savepoint 方法保存当前的状态。如果捕获到异常,可以跳过错误的数据并继续更新后面的数据。
以下是一个简化的示例:
@Transactional(rollbackForException.class)
public String saveData() {
boolean flag = deleteData();
if (flag) {
try {
// 更新数据
updateData();
} catch (Exception e) {
// 保存 Savepoint
savepoint();
// 跳过错误的数据
continue;
}
return "成功";
} else {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return "异常";
}
}
注意:这个示例仅供参考,您需要根据实际的业务逻辑进行调整。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。