Flink流处理API大合集:掌握所有flink流处理技术,看这一篇就够了

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 一个flink应用程序开发的步骤大致为五个步骤:构建执行环境、获取数据源、操作数据源、输出到外部系统、触发程序执行。

前言


在之前的文章中有提到过,一个flink应用程序开发的步骤大致为五个步骤:构建执行环境、获取数据源、操作数据源、输出到外部系统、触发程序执行。由这五个模块组成了一个flink任务,接下来围绕着每个模块对应的API进行梳理。

以下所有的代码案例都已收录在本人的Gitee仓库,有需要的同学点击链接直接获取:

Gitee地址:https://gitee.com/xiaoZcode/flink_test


。.png

一、构建流执行环境(Environment)


getExecutionEnvironment()


创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。它会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。


代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();


createLocalEnvironment()


返回本地执行环境,需要在调用时指定默认的并行度。


代码如下:


LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

1

createRemoteEnvironment()


返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。


代码如下:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//xxx.jar");

二、加载数据源(Source)


案例场景:


工业物联网的背景下,收集传感器的温度值,将收集到不同传感器的温度值进行计算分析操作。

注:以下代码都围绕此场景进行编写,获取更完整源代码请移步文章开头部分。


创建传感器对象:SensorReading


public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
    public SensorReading() {
    }
    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Long getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
    public Double getTemperature() {
        return temperature;
    }
    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }
    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}

从集合读取数据


public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为 1
        env.setParallelism(1);
        //从集合中读取数据
        DataStream<SensorReading> dataStream  = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_2", 1547718199L, 35.0),
                new SensorReading("sensor_3", 1547718199L, 38.8),
                new SensorReading("sensor_4", 1547718199L, 39.8)
        ));
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
        //打印输出
        dataStream.print("data");
        integerDataStream.print("int");
        //执行程序
        env.execute();
    }
}

从文件读取数据


从文件中获取数据源的核心代码部分:


DataStream<String> dataStream = env.readTextFile("xxx ");
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> dataStream = env.readTextFile("sensor.txt");
         dataStream.print();
         env.execute();
    }
}

从Kafka读取数据


首先需要引入Kafka的以来到工程中


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
  <version>1.10.1</version>
</dependency>


public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","localhost:9092");
        properties.setProperty("group.id","consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset","latest");
        DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));
        dataStream.print();
        env.execute();
    }
}

自定义数据源Source


除了从集合、文件以及Kafka中获取数据外,还给我们提供了一个自定义source的方式,需要传入sourceFunction函数。核心代码如下:


DataStream<SensorReading> dataStream = env.addSource( new MySensor());
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
        dataStream.print();
        env.execute();
    }
    // 实现自定义数据源
    public static class MySensorSource implements SourceFunction<SensorReading>{
        // 定义一个标记位,控制数据产生
        private boolean running = true;
        @Override
        public void run(SourceContext<SensorReading> ctv) throws Exception {
            // 随机数
            Random random=new Random();
            //设置10个初始温度
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                sensorTempMap.put("sensor_"+(i+1), 60 + random.nextGaussian() * 20); // 正态分布
            }
            while (running){
                for (String sensorId: sensorTempMap.keySet()) {
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId,newTemp);
                    ctv.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
                }
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            running=false;
        }
    }
}

三、转换算子(Transform)


获取到指定的数据源后,还要对数据源进行分析计算等操作,


基本转换算子:Map、flatMap、Filter

,,.png


public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 1. map 把String转换成长度生成
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
    //   2. flatmap 按逗号切分字段
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields=value.split(",");
                for (String field : fields){
                    out.collect(field);
                }
            }
        });
    //    3. filter ,筛选sensor_1 开头对id对应的数据
        DataStream<String> filterStream=inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
    //    打印输出
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");
    // 执行程序
        env.execute();
    }
}

KeyBy、滚动聚合算子【sum()、min()、max()、minBy()、maxBy()】


KeyBy:DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

如上算子可以针对 KeyedStream 的每一个支流做聚合。

,.png


public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });
        // 分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);
        //滚动聚合,取当前最大的温度值
        // DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        resultStream.print();
        env.execute();
    }
}

Reduce


KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。


public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        // 分组
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // reduce 聚合,取最大的温度,以及当前最新对时间戳
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
        resultStream.print();
        env.execute();
    }
}


分流【Split 、Select】、合流【Connect 、CoMap、union】


Split


DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

1.png


Select


SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个DataStream。


2.png

Connect


DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。


3.png


CoMap、CoFlatMap


ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。

4.png



Union


DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

5.png



DataStream<SensorReading> unionStream = xxxstream.union(xxx);

1

Connect 与 Union 区别:


Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。

Connect 只能操作两个流,Union 可以操作多个。

public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        // 1。分流 按照温度值30度为界进行分流
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // 通过条件选择对应流数据
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high","low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");
        // 2。合流 connect,先将高温流转换为二元组,与低温流合并后,输出状态信息。
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        // 只能是两条流进行合并,但是两条流的数据类型可以不一致
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }
            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();
        // 3。union联合多条流 限制就是每条流数据类型必须一致
        DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
        union.print("union stream");
        env.execute();
    }
}


四、数据输出(Sink)


Flink官方提供了一部分框架的Sink,用户也可以自定义实现Sink。flink将任务进行输出的操作核心代码:stream.addSink(new MySink(xxxx))。


Kafka

引入Kafka依赖:


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
 <version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // 转换成SensorReading类型
        DataStream<String> dataStream=inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2])).toString();
            }
        });
        //输出到外部系统
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092","sinktest",new SimpleStringSchema()));
        env.execute();
    }
}

Redis

引入Redis依赖:


<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        // jedis配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));
        env.execute();
    }
    // 自定义RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading>{
        //自定义保存数据到Redis的命令,存成hash表Hset
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
        }
        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }
        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

Elasticsearch

引入依赖:


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
 <version>1.10.1</version>
</dependency>
public class SinkTest3_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env;
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        // 定义ES的链接配置
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost",9200));
        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());
        env.execute();
    }
    //实现自定义的ES写入操作
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // 定义写入的数据source
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id",element.getId());
            dataSource.put("temp",element.getTemperature().toString());
            dataSource.put("ts",element.getTimestamp().toString());
            // 创建请求作为向ES发起的写入命令
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);
            // 用indexer发送请求
            indexer.add(indexRequest);
        }
    }
}

自定义Sink(JDBC)

引入依赖:


<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }
    // 实现自定义SinkFunction
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        //声明连接和预编译
        Connection connection=null;
        PreparedStatement insert=null;
        PreparedStatement update=null;
        @Override
        public void open(Configuration parameters) throws Exception {
            connection= DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
            insert=connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
            update=connection.prepareStatement("update sensor_temp set temp = ? where id = ? ");
        }
        // 每来一条数据,调用链接,执行sql
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
           // 直接执行更新
            update.setDouble(1,value.getTemperature());
            update.setString(2,value.getId());
            update.execute();
            if (update.getUpdateCount() == 0){
                insert.setString(1,value.getId());
                insert.setDouble(2,value.getTemperature());
                insert.execute();
            }
        }
        // 关闭连接流
        @Override
        public void close() throws Exception {
            connection.close();
            insert.close();
            update.close();
        }
    }
}


五、数据类型、UDF 函数、富函数


Flink支持的数据类型


Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String等


DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);


Java 和 Scala 元组(Tuples)

DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
 new Tuple2("Adam", 17),
 new Tuple2("Sarah", 23) );
personStream.filter(p -> p.f1 > 18);

Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的

ArrayList,HashMap,Enum 等等


UDF 函数


Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction 等等。


富函数(Rich Functions)


“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。RichMapFunction、RichFlatMapFunction、RichFilterFunction


Rich Function 有一个生命周期的概念。典型的生命周期方法有:


open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前open()会被调用。

close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函 数执行的并行度,任务的名字,以及state 状态。

public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        DataStream<Tuple2<String,Integer>> resultStream=dataStream.map(new MyMapper());
        resultStream.print();
        env.execute();
    }
    public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(),value.getId().length());
        }
    }
    // 继承富函数
    public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // getRuntimeContext().getState()
            return new Tuple2<String,Integer>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化工作,一般是定义状态,或者创建数据库链接
            System.out.println("open");
            // super.open(parameters);
        }
        @Override
        public void close() throws Exception {
            // 关闭链接,收尾状态
            System.out.println("close");
            // super.close();
        }
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
缓存 监控 API
淘宝 API 接口使用的技术要点与注意事项
在数字化商业环境中,淘宝API为开发者提供了强大的工具,用于与淘宝平台交互,获取商品信息及处理交易等。本文总结了正确使用API的关键技术要点:注册认证、理解接口文档、遵守调用限制、确保参数准确性、保护数据安全、处理异常、性能优化、版本兼容、合规性及日志监控,帮助开发者实现高效、安全的程序开发。
|
3月前
|
监控 安全 测试技术
深入理解后端技术中的API设计原则
在当今数字化时代,后端技术已成为构建高效、可扩展和安全应用程序的关键因素。本文将探讨后端开发中的API设计原则,包括RESTful架构、版本控制以及安全性等方面,旨在帮助开发者提升API设计的质量和用户体验。通过对这些原则的深入理解,可以更好地满足业务需求并提高系统的可靠性。
68 0
|
3月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
420 31
Apache Flink 流批融合技术介绍
|
2月前
|
安全 物联网 API
API技术之身份认证
【10月更文挑战第17天】身份认证是API安全的核心,确保API可信可控。
API技术之身份认证
|
2月前
|
JSON 前端开发 测试技术
API接口 |产品经理一定要懂的10%技术知识
作为产品经理,掌握约10%的技术知识对处理API相关工作至关重要。这包括理解API的基本概念及其作为数据交换的桥梁作用;熟悉JSON和XML两种主要数据格式及其特点;了解常见HTTP请求方法(GET、POST、PUT、DELETE)及响应状态码;关注API安全性,如认证授权和数据加密;掌握API版本管理和错误处理技巧;重视性能优化,以提升用户体验;参与API联调测试,确保稳定可靠;并与前后端团队紧密协作,选择合适的第三方API服务,推动产品高效开发。
|
2月前
|
XML API 网络架构
API协议 的十种技术特点及适用场景
本文介绍了十种常见的API协议技术,包括REST、GraphQL、gRPC、SOAP、WebSocket、AMF和XML-RPC等,每种技术都有其特点和适用场景,如REST适用于轻量级Web服务开发,gRPC适合高性能分布式系统,而WebSocket则适用于需要低延迟交互的应用。
|
2月前
|
SQL Java API
深入探索Java的持久化技术——JPA(Java Persistence API)
【10月更文挑战第10天】深入探索Java的持久化技术——JPA(Java Persistence API)
61 0
|
2月前
|
Java API 数据库
深入探索Java的持久化技术——JPA(Java Persistence API)
【10月更文挑战第10天】深入探索Java的持久化技术——JPA(Java Persistence API)
47 0
|
3月前
|
缓存 测试技术 API
电商平台 API 接入技术要点深度剖析
本文介绍了高效使用电商平台API的关键步骤。首先,深入理解API文档,明确功能权限与参数格式要求;其次,选择合适的接入方式,如HTTP/HTTPS协议和RESTful API;接着,实施身份验证与授权机制,确保数据安全传输;此外,还需关注性能优化、安全防护、监控与日志记录,以提升系统稳定性和响应速度;最后,进行充分测试与调试,并关注API版本更新,确保长期兼容性。
|
2月前
|
安全 NoSQL 测试技术
商品详情API接口的技术实现
本文介绍了电商平台上商品详情API接口的设计与实现过程,涵盖需求分析、接口定义、数据模型设计及技术选型等方面。通过合理的后端框架、数据库设计和安全措施,确保接口高效、稳定和安全。最后,通过详尽的测试与部署步骤,实现优质购物体验。旨在为技术人员提供实用参考。