开发者社区> 问答> 正文

CDN 实时日志分析 Connector 定义是什么?

已解决

CDN 实时日志分析 Connector 定义是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 15:44:53 453 0
1 条回答
写回答
取消 提交回答
  • 推荐回答

    我们完成了需求分析和 UDF 的定义,我们开始进行作业的开发了,按照通用的作业结构,需要定义 Source connector 来读取 Kafka 数据,定义 Sink connector 来将计算结果存储到 MySQL。最后是编写统计逻辑。

    在这特别说明一下,在 PyFlink 中也支持 SQL DDL 的编写,我们用一个简单的 DDL 描述,就完成了 Source Connector的开发。其中 connector.type 填写 kafka。SinkConnector 也一样,用一行DDL描述即可,其中 connector.type 填写 jdbc。描述 connector 的逻辑非常简单,我们再看看核心统计逻辑是否也一样简单:)

    kafka_source_ddl = """
    CREATE TABLE cdn_access_log (
     uuid VARCHAR,
     client_ip VARCHAR,
     request_time BIGINT,
     response_size BIGINT,
     uri VARCHAR
    ) WITH (
     'connector.type' = 'kafka',
     'connector.version' = 'universal',
     'connector.topic' = 'access_log',
     'connector.properties.zookeeper.connect' = 'localhost:2181',
     'connector.properties.bootstrap.servers' = 'localhost:9092',
     'format.type' = 'csv',
     'format.ignore-parse-errors' = 'true'
    )
    """
    
    mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
     province VARCHAR,
     access_count BIGINT,
     total_download BIGINT,
     download_speed DOUBLE
    ) WITH (
     'connector.type' = 'jdbc',
     'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
     'connector.table' = 'access_statistic',
     'connector.username' = 'root',
     'connector.password' = 'root',
     'connector.write.flush.interval' = '1s'
    )
    """
    
    2021-12-07 15:45:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
Kubernetes下日志实时采集、存储与计算实践 立即下载
日志数据采集与分析对接 立即下载