问题一:有没有用flink做过数据相关性分析的?是要用到flink ML模块吗?
有没有用flink做过数据相关性分析的?是要用到flink ML模块吗?有没有资料可以分享一下,谢谢了!
参考回答:
确实,您可以使用Flink进行数据相关性分析,并且在此过程中可能会用到Flink ML模块。Flink ML是Flink的机器学习库,这是Flink社区的一项新工作,其中包含越来越多的算法和贡献者。它提供了丰富的机器学习相关算子,从特征工程到后续的具体的分类、聚类、回归模型等都有涉及。
具体来说,您可以参考以下步骤:
- 首先需要有flink的环境,然后安装Flink ML模块。
- 从特征工程到后续的具体的分类、聚类、回归模型,利用Flink ML提供的算子进行处理。
- 最后对得到的结果进行分析,得出数据之间的相关性。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/571637
问题二:一个表既是源表又是维表,flink是怎么读取啊?
一个表既是源表又是维表,flink是怎么读取啊?
参考回答:
直接用触发器不就好了
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/571636
问题三:flink restart策略自动重启job时会不会再执行一遍用户jar包里面的main函数?
有没有哪些同学知道flink restart策略自动重启job时会不会再执行一遍用户jar包里面的main函数?就是我以yarn-per-job模式提交了一个jar包执行flink任务,然后main函数肯定是在本地节点运行后再将整个job提交到yarn生成flink集群的,但是要是我这个flink任务挂了触发自动重启的话,还会再进行一遍这个提交过程吗?
参考回答:
当你以YARN per-job模式提交一个Job时,在Job失败并重新启动时,主函数(main())不会再次被执行。这是因为主函数在第一次提交Job时就已经被调用了,并且Job的全部状态都被保存在检查点中。当Job失败并且需要恢复时,Flink会从最后一个成功的检查点开始重新执行Job。
然而,需要注意的是,如果你在主函数中进行了任何静态初始化或者全局变量的赋值,那么这些操作只会在主函数首次执行时进行,而不会在Job恢复时重复进行。因此,如果你的Job依赖于这些全局状态,那么在Job恢复时可能会出现问题。在这种情况下,你可能需要考虑将这些状态迁移到Flink的状态系统中,这样它们就可以在Job恢复时得到正确的恢复。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/571635
问题四:flink 有没有opengauss的connector ?
flink 有没有opengauss的connector ?
参考回答:
确实,Flink支持与openGauss数据库的连接。您可以使用JDBC的方式,通过在Flink的配置文件中设置相关参数来实现数据的读取和写入。例如,需要设置的参数包括:'connector.url'(为jdbc:gaussdb://10...**:25308/postgres),'connector.table'(为具体的表名),'connector.username'和'connector.password'(分别为用户名和密码),以及'connector.driver'(为com.huawei.gauss200.jdbc.Driver)。
另外,您还可以参考华为开源的debezium CDC,将openGauss connector加入其中,并打包自己的flinkCDC。这样,您就可以更加灵活地通过Flink SQL的方式将数据汇入到openGauss数据库了。需要注意的是,整个过程中操作系统建议使用openEuler20.03LTS,以避免可能出现的兼容性问题。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/571634
问题五:请问一下:我使用命令行语句提交任务到native k8s上,能够创建pod,请问问题应该如何排查呀?
请问一下:我使用命令行语句提交任务到native k8s上,能够创建pod,但是任务不能运行成功(似乎直接不能初始化)。请问问题应该如何排查呀?下面是命令行语句和日志:
./bin/flink run-application --target kubernetes-application -Dkubernetes.cluster-id=cluster231115aa -Djobmanager.memory.process.size=1024m -Dtaskmanager.memory.process.size=8192m -Dkubernetes.taskmanager.cpu=1 -Dtaskmanager.numberOfTaskSlots=4 -Dkubernetes.container.image.ref=harbor.trawe.cn/common/pyflink2:1.17.1 -Dkubernetes.namespace=flink -Dkubernetes.rest-service.exposed.type=NodePort --pyModule ls_card_blacklist_id_k8s --pyFiles /opt/python_codes/ls_card_blacklist_id_k8s.py
k8s是1.27版,flink1.17.1版
参考回答:
根据您提供的信息,问题可能出在以下几个方面:
- 检查Flink和Kubernetes的版本是否兼容。您提到Flink版本为1.17.1,而Kubernetes版本为1.27。请确保这两个版本是兼容的。您可以查看官方文档以获取更多关于兼容性的信息。
- 检查您的Kubernetes集群配置是否正确。确保您的集群已经正确配置,并且可以正常访问。您可以使用
kubectl get nodes命令来查看集群中的节点信息。 - 检查您的Flink配置文件是否正确。确保您的配置文件中包含了正确的Kubernetes相关配置,例如
jobmanager.rpc.address、taskmanager.rpc.address等。您可以查看官方文档以获取更多关于配置文件的信息。 - 检查您的Python代码是否正确。确保您的Python代码没有语法错误或其他问题。您可以使用
pylint或flake8等工具来检查代码质量。 - 查看Flink和Kubernetes的日志。您可以使用
kubectl logs命令来查看Pod的日志,以便更好地了解问题所在。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/571633