Flink hbase短路读取datanode报错?java.net.SocketTimeoutException: read(2) error: Resource temporarily unavailable
2024-05-20 08:22:25,735 WARN [RpcServer.default.RWQ.Fifo.read.handler=144,queue=12,port=16020] hdfs.BlockReaderFactory: BlockReaderFactory(fileName=/hbase/data/default/alerts/6aedb1697e29f1e4be88996849fbb716/data/e20886e05d7b4fbcac8e209800a76709, block=BP-1719712434-10.80.10.150-1522218330932:blk_1589343399_515603961): I/O error requesting file descriptors. Disabling domain socket DomainSocket(fd=753,path=/var/lib/hadoop-hdfs/dn_socket)
java.net.SocketTimeoutException: read(2) error: Resource temporarily unavailable
at org.apache.hadoop.net.unix.DomainSocket.readArray0(Native Method)
at org.apache.hadoop.net.unix.DomainSocket.access$000(DomainSocket.java:45)
at org.apache.hadoop.net.unix.DomainSocket$DomainInputStream.read(DomainSocket.java:532)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2292)
at org.apache.hadoop.hdfs.BlockReaderFactory.requestFileDescriptors(BlockReaderFactory.java:542)
at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:490)
at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:782)
at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:716)
at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
at org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1161)
at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1086)
at org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1439)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1402)
at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:89)
at org.apache.hadoop.hbase.io.hfile.HFileBlock.positionalReadWithExtra(HFileBlock.java:805)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readAtOffset(HFileBlock.java:1565)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockDataInternal(HFileBlock.java:1769)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockData(HFileBlock.java:1594)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.readBlock(HFileReaderImpl.java:1488)
at org.apache.hadoop.hbase.io.hfile.HFileBlockIndex$CellBasedKeyBlockIndexReader.loadDataBlockWithScanInfo(HFileBlockIndex.java:340)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:852)
at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:802)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seekAtOrAfter(StoreFileScanner.java:326)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:227)
at org.apache.hadoop.hbase.regionserver.StoreFileScanner.enforceSeek(StoreFileScanner.java:470)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.pollRealKV(KeyValueHeap.java:369)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.(KeyValueHeap.java:103)
at org.apache.hadoop.hbase.regionserver.KeyValueHeap.(KeyValueHeap.java:81)
at org.apache.hadoop.hbase.regionserver.StoreScanner.resetKVHeap(StoreScanner.java:407)
at org.apache.hadoop.hbase.regionserver.StoreScanner.(StoreScanner.java:257)
at org.apache.hadoop.hbase.regionserver.MobStoreScanner.(MobStoreScanner.java:44)
at org.apache.hadoop.hbase.regionserver.HMobStore.createScanner(HMobStore.java:159)
at org.apache.hadoop.hbase.regionserver.HStore.getScanner(HStore.java:1943)
at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.initializeScanners(HRegion.java:6181)
at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.(HRegion.java:6161)
at org.apache.hadoop.hbase.regionserver.HRegion.instantiateRegionScanner(HRegion.java:2809)
在使用Apache Flink进行大数据处理时,如果涉及到与HBase的集成,并且你尝试实现短路读取(short-circuit reads)以直接从Hadoop的DataNode读取数据以提高性能,可能会遇到一些错误。短路读取是一种优化手段,它允许HBase客户端直接通过网络从DataNode读取数据,绕过NameNode的HTTP服务器,从而减少网络延迟和增加吞吐量。
当你遇到与短路读取相关的错误时,可以从以下几个方面进行排查和解决:
Hadoop配置 (hdfs-site.xml):
dfs.domain.socket.path:指定HDFS域套接字的路径,这是DataNode上用于短路读取的UNIX域套接字的路径。
dfs.client.read.shortcircuit:启用或禁用短路读取。
dfs.client.use.legacy.blockreader.local:在某些版本的Hadoop中,这个配置用于启用或禁用旧的本地块读取器,这可能会影响短路读取。
HBase配置 (hbase-site.xml):
确保HBase配置为使用Hadoop的短路读取功能,这通常通过正确设置与HDFS交互的配置项来间接实现。
在Apache Flink中,如果你尝试从HBase读取数据,并且遇到了与HBase DataNode的java.net.SocketTimeoutException错误,这通常表明Flink任务试图从HBase DataNode读取数据时,由于某种原因连接超时了。错误Resource temporarily unavailable可能意味着资源暂时不可用,这可能是由于网络问题、HBase集群过载、配置问题或其他因素引起的。
以下是一些可能的解决步骤和代码示例(注意,这里不直接提供完整的代码示例,因为具体的代码取决于你的Flink和HBase集成方式):
检查网络连接:
确保Flink集群能够成功访问HBase集群的所有DataNode。你可以使用ping命令或其他网络工具来测试网络连接。
检查HBase集群状态:
登录到HBase的Master和RegionServer节点,检查它们的状态和日志。确认没有资源耗尽(如内存、磁盘空间、文件句柄等)或其他类型的错误。
调整Flink和HBase的配置:
根据你的环境和需求,可能需要调整Flink和HBase的配置参数。例如,增加Flink的taskmanager.network.memory.fraction或taskmanager.network.memory.min以提供更多的网络缓冲区内存。对于HBase,可能需要调整hbase.regionserver.handler.count或hbase.regionserver.lease.period等参数。
增加超时设置:
如果问题是由于网络延迟或HBase集群负载较重导致的,你可以尝试增加Flink或HBase中的超时设置。例如,在Flink的flink-conf.yaml中设置更长的超时时间。
代码优化:
如果你的Flink作业正在执行复杂的HBase操作或大量的并发读取,考虑优化你的代码以减少对HBase的压力。例如,通过批量读取、缓存常用数据或优化查询语句来减少网络I/O和HBase的负载。
升级或修复版本:
如果你使用的是较旧的Flink或HBase版本,考虑升级到最新稳定版本。新版本可能包含对性能和稳定性的改进。
查看日志和监控:
详细检查Flink和HBase的日志文件以获取更多关于错误的上下文信息。使用监控工具(如Prometheus、Grafana等)来监控集群的性能和资源使用情况。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。