flink中,我截图这里是变量配置,我创建临时表时with里面引用${secret_values.密钥名},然后对临时表查询报错,创建临时表引用${secret_values.密钥名},不能进行select吗?
java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.sqlserver.preview.ResultStore$ResultRetrievalThread.run(ResultStore.java:119)
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b10c4197a85de7d3ef99fb230b085255)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
... 5 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b10c4197a85de7d3ef99fb230b085255)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$7(ClusterClientJobClientAdapter.java:133)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$27(RestClusterClient.java:758)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.com
在 Flink SQL 中使用了变量配置,并且在创建临时表时使用了该变量。在 with 子句中引用变量的语法格式为 ${variable_name},其中 variable_name 是您定义的变量名称。
对于您的具体问题,如果您需要在创建临时表时使用一个名为 secret_values.密钥 的变量,可以按照以下步骤进行配置:
在 Flink SQL 中使用 SET 命令定义变量:
Copy
SET secret_values.密钥 = 'your_secret_key';
在创建临时表时使用变量:
scheme
Copy
CREATE TEMPORARY TABLE my_table (
...
) WITH (
'connector' = 'your_connector',
'key' = '${secret_values.密钥}',
...
);
在上述代码中,${secret_values.密钥} 就是引用了名为 secret_values.密钥 的变量。注意,变量名称和变量值都需要使用单引号或双引号括起来。
根据提供的信息,您在Flink中创建临时表时,在WITH
语句中引用了${secret_values.密钥名}
变量,并且查询临时表时遇到了报错。根据错误信息分析,这可能是由于未经授权或密码错误导致的身份验证问题。
具体地,错误信息中给出了errorCode:Unauthorized
和errorMessage:Authorize failed
,这表明身份验证失败。建议按照以下步骤进行排查:
1. 检查变量配置:确保${secret_values.密钥名}
变量的值正确,没有拼写错误,并且在变量配置中应用了正确的密钥名和对应的密码。
2. 复制密码重新新建变量:如果变量配置中的密码是从其他地方复制粘贴得到的,请尝试重新新建一个变量,直接在变量配置中手动输入密码。
3. 检查密钥权限:确认所使用的密钥(${secret_values.密钥名}
)是否具有访问相应资源的权限。确保该密钥已被授权执行所需操作。
4. 检查连接参数:如果表连接的外部系统需要其他参数(如用户名、地址等),请确保这些参数正确设置。
errorCode:Unauthorized, errorMessage:Authorize failed.你复制明文的密码到变量重新新建一下。此回答整理自钉群“实时计算Flink产品交流群”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。