开发者社区> 问答> 正文

Flink 数据 sink 到 MySQL 时,事务批量写入出现死锁,该怎么解决?

目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,并开启了mysql事务来保证这一批数据的原子性。现在发现程序运行几个小时后会出现mysql死锁!想了解一下大家是怎么处理sink端 > 多条记录的操作 > sink的invoke代码 > @Override > public void invoke(Tuple5<String, String, String, String, > List<BroadBandReq>> value, Context context) throws Exception { > connection.setAutoCommit(false); > List<BroadBandReq> f4 = value.f4; > for (BroadBandReq rs: f4){ > statement.setString(1,rs.getUserId()); > statement.setString(2,rs.getPhoneNum()); > statement.setString(3,rs.getProvId()); > statement.addBatch(); > } > try { > statement.executeBatch(); > connection.commit(); > }catch (Exception e){ > LOG.info(" add data for rds ; operTag:{}, > userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, > value.f2, value.f3,f4); > connection.rollback(); > e.printStackTrace(); >             throw new Exception(e); > } >     } > > > > > java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when > trying to get lock; try restarting transaction >         at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73) >         at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18) >         at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) >         at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) >         at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) >         at > 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718) >         at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736) >         at > org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) >         at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162) >         at > com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157) >         at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) >         at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) >         at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) >         at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) >         at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) >         at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) >         at org.apache.flink.streaming.runtime.io > 
.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) >         at org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) >         at org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) >         at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) >         at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) >         at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) >         at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) >         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >         at java.lang.Thread.run(Thread.java:748) > Caused by: java.sql.BatchUpdateException: Deadlock found when trying to > get lock; try restarting transaction >         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) >         at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >         at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >         at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >         at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) >         at com.mysql.jdbc.Util.getInstance(Util.java:408) >         at > com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) >         at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772) >         at > com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) >         at > com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) >         at 
com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67) >         ... 33 more > Caused by: > com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock > found when trying to get lock; try restarting transaction >         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) >         at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >         at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >         at java.lang.reflect.Constructor.newInstance(Constructor.java:423) >         at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) >         at com.mysql.jdbc.Util.getInstance(Util.java:408) >         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952) >         at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976) >         at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912) >         at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530) >         at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683) >         at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486) >         at > com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) >         at > com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) >         at > com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) >         ... 36 more > [flink-akka.actor.default-dispatcher-783]

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:40:48 1576 0
1 条回答
写回答
取消 提交回答
  • 我们也是用幂等写入来处理类似的问题。1. 要么逐条处理数据;2. 要么保证各个事务之间不会出现冲突,比如楼上说的先 keyBy 之类的做法。

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:54:00
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像