请问下,flink是docker部署的,怎么使用sql gateway?
阿里云 Flink 是基于 Docker 部署的,可以通过在容器中运行 SQL Gateway 来实现使用 SQL 界面访问 Flink 集群。
下面是一些基本步骤:
docker pull flinksql/sql-gateway
然后,使用以下命令在容器中启动 SQL Gateway:
docker run -d -p 8083:8083 flinksql/sql-gateway
这将在容器中启动 SQL Gateway,将其映射到主机的 8083 端口上。
例如,您可以在配置文件中添加以下配置:
flink:
url: "http://<flink-master>:8081"
其中 <flink-master>
是 Flink 集群的主节点的 IP 地址或域名。
例如,您可以在浏览器中访问以下 URL:
http://<sql-gateway>:8083
其中 <sql-gateway>
是 SQL Gateway 容器所在主机的 IP 地址或域名。
要在Flink上使用SQL Gateway,你需要在Docker中部署Apache Flink集群。以下是一些步骤:
在Docker中创建一个新的网络,以便将所有组件连接起来。
docker network create flink-network 下载并运行Apache Flink镜像,并将其加入上述网络:
docker run
--name flink-jobmanager
--network flink-network
-p 8081:8081
-d
apache/flink:1.13.0 jobmanager docker run
--name flink-taskmanager
--network flink-network
-e JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
-d
apache/flink:1.13.0 taskmanager 下载并运行Apache ZooKeeper镜像,也将其加入上述网络:
docker run
--name zookeeper
--network flink-network
-p 2181:2181
-d
zookeeper:3.6.3 下载并运行Apache Kafka镜像,并将其加入上述网络:
docker run
--name kafka
--network flink-network
-p 9092:9092
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
-d
confluentinc/cp-kafka:5.5.1 在Flink中启用SQL Gateway:
docker exec -it flink-jobmanager ./bin/sql-client.sh embedded 运行SQL Gateway后,在控制台提示符下,您可以执行以下操作:
使用以下命令测试连接的Flink群集:
!connect jdbc:flink://flink-jobmanager:8081 执行Flink SQL查询:
SELECT * FROM test_table; 需要注意的是,这些步骤只是一些示例性步骤,每个组件的版本、配置细节和运行环境都会有所不同。在实际情况中,您需要根据您的环境和要求调整配置和设置。
services:
sql-gateway:
image: ververica/sql-gateway:latest
ports:
- "8082:8082"
environment:
- SQL_GATEWAY_JOB_MANAGER_ADDRESS=http://jobmanager:8081
networks:
default:
name: flink-network
rest.port: 8081
rest.address: jobmanager
rest.bind-address: 0.0.0.0
rest.mode: bind
重新启动docker-compose
通过http://localhost:8082/访问SQL Gateway Web UI
在SQL Gateway中使用以下语句创建一个表:
CREATE TABLE orders (
user_id INT,
amount FLOAT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'orders',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'kafka:9092',
'format.type' = 'json'
);
SELECT user_id, SUM(amount)
FROM orders
WHERE order_time BETWEEN TIMESTAMP '2023-04-01 00:00:00' AND TIMESTAMP '2023-04-10 00:00:00'
GROUP BY user_id
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。