大数据Flink Sink

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 大数据Flink Sink

1 预定义Sink

1.1 基于控制台和文件的Sink

⚫ API

1.ds.print 直接输出到控制台

2.ds.printToErr() 直接输出到控制台,用红色

3.ds.writeAsText(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1)

⚫ 注意:

在输出到path的时候,可以在前面设置并行度,如果

并行度>1,则path为目录

并行度=1,则path为文件名

⚫ 代码演示:

package cn.oldlu.sink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author oldlu
 * Desc
 * 1.ds.print 直接输出到控制台
 * 2.ds.printToErr() 直接输出到控制台,用红色
 * 3.ds.collect 将分布式数据收集为本地集合
 * 4.ds.setParallelism(1).writeAsText("本地/HDFS的path",WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");
        //3.transformation
        //4.sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        //注意:
        //Parallelism=1为文件
        //Parallelism>1为文件夹
        //5.execute
        env.execute();
    }
}

2 自定义Sink

2.1 MySQL

⚫ 需求:

将Flink集合中的数据通过自定义Sink保存到MySQL

⚫ 代码实现:

package cn.oldlu.sink;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Author oldlu
 * Desc
 * 使用自定义sink将数据保存到MySQL
 */
public class SinkDemo02_MySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        //3.Transformation
        //4.Sink
        studentDS.addSink(new MySQLSink());
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            //加载驱动,开启连接
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void invoke(Student value, Context context) throws Exception {
            //给ps中的?设置具体值
            ps.setString(1,value.getName());
            ps.setInt(2,value.getAge());
            //执行sql
            ps.executeUpdate();
        }
        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

3. Connectors

3.1 JDBC

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

package cn.oldlu.connectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author oldlu
 * Desc
 */
public class ConnectorsDemo_JDBC {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/bigdata")
                                .withUsername("root")
                                .withPassword("root")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

3.2 Kafka

3.2.1 pom依赖

Flink 里已经提供了一些绑定的 Connector,例如 kafka source 和 sink,Es sink 等。读写

kafka、es、rabbitMQ 时可以直接使用相应 connector 的 api 即可,虽然该部分是 Flink 项目

源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制

的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的 connetor

相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

3.2.2 参数设置

以下参数都必须/建议设置上

1.订阅的主题

2.反序列化规则

3.消费者属性-集群地址

4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)

5.消费者属性-offset重置规则,如earliest/latest…

6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)

7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做

Checkpoint的时候提交到Checkpoint和默认主题中

3.2.3 参数说明

实际的生产环境中可能有这样一些需求,比如:

场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着

业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感

知新的 topic。

场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业

务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如

何在不重启作业情况下动态感知新扩容的 partition?

针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置

flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时

间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta

信息。

针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的

pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。

针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的

partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

注意:

开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的 offset

位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分历史数

据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准一次需

要依赖一些幂等的存储系统或者事务操作。

3.2.4 Kafka命令

● 查看当前服务器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2

–partitions 3 --topic flink_kafka

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic

flink_kafka

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic

flink_kafka --from-beginning

● 修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper

node1:2181

3.2.5 代码实现-Kafka Consumer

package cn.oldlu.connectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
 * Author oldlu
 * Desc
 * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount
 * 需要设置如下参数:
 * 1.订阅的主题
 * 2.反序列化规则
 * 3.消费者属性-集群地址
 * 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
 * 5.消费者属性-offset重置规则,如earliest/latest...
 * 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
 * 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
 */
public class ConnectorsDemo_KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset","latest");
        props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

3.2.6 代码实现-Kafka Producer

⚫ 需求:

将Flink集合中的数据通过自定义Sink保存到Kafka

⚫ 代码实现

package cn.oldlu.connectors;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
 * Author oldlu
 * Desc
 * 使用自定义sink-官方提供的flink-connector-kafka_2.12-将数据保存到Kafka
 */
public class ConnectorsDemo_KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka",  new SimpleStringSchema(),  props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

3.3 Redis

⚫ API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是

flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下

来将主要介绍RedisSink 如何使用。

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个

接口中的三个方法,如下所示

1.getCommandDescription() :

设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型

2.String getKeyFromData(T data):

设置value 中的键值对key的值

3.String getValueFromData(T data);

设置value 中的键值对value的值

⚫ 使用RedisCommand设置数据结构类型时和redis结构对应关系

⚫ 需求

将Flink集合中的数据通过自定义Sink保存到Redis

⚫ 代码实现

package cn.oldlu.connectors;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
 * Author oldlu
 * Desc
 * 需求:
 * 接收消息并做WordCount,
 * 最后将结果保存到Redis
 * 注意:存储到Redis的数据结构:使用hash也就是map
 * key            value
 * WordCount    (单词,数量)
 */
public class ConnectorsDemo_Redis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}


目录
相关文章
|
21天前
|
存储 SQL 大数据
用实时计算释放当下企业大数据潜能
本文整理自阿里云高级产品解决方案架构师王启华(敖北)老师在 Flink Forward Asia 2023 中闭门会的分享。
288 8
用实时计算释放当下企业大数据潜能
|
27天前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
37 1
|
1月前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
1月前
|
消息中间件 大数据 Kafka
"Apache Flink:重塑大数据实时处理新纪元,卓越性能与灵活性的实时数据流处理王者"
【8月更文挑战第10天】Apache Flink以卓越性能和高度灵活性在大数据实时处理领域崭露头角。它打破批处理与流处理的传统界限,采用统一模型处理有界和无界数据流,提升了开发效率和系统灵活性。Flink支持毫秒级低延迟处理,通过时间窗口、状态管理和自动并行化等关键技术确保高性能与可靠性。示例代码展示了如何使用Flink从Kafka读取实时数据并进行处理,简明扼要地呈现了Flink的强大能力。随着技术进步,Flink将在更多场景中提供高效可靠的解决方案,持续引领大数据实时处理的发展趋势。
69 7
|
19天前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
35 0
|
19天前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
33 0
|
22天前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
28 0
|
25天前
|
机器学习/深度学习 监控 大数据
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
|
1月前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
15天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
61 11

热门文章

最新文章