
PyFlink: using a window fails with "expects a time attribute"

Pyflink v1.1.2 Python 3.6.8

The code is as follows:

import os

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import CsvTableSink, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Json, Kafka, Rowtime, Schema
from pyflink.table.window import Slide

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
st_env = StreamTableEnvironment.create(s_env)
result_file = "/tmp/slide_time_window_streaming.csv"
if os.path.exists(result_file):
    os.remove(result_file)
st_env \
    .connect(  # declare the external system to connect to
        Kafka()
        .version("0.11")
        .topic("user")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ) \
    .with_format(  # declare a format for this system
        Json()
        .fail_on_missing_field(True)
        .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    a: {"
            "      type: 'string'"
            "    },"
            "    b: {"
            "      type: 'string'"
            "    },"
            "    c: {"
            "      type: 'string'"
            "    },"
            "    time: {"
            "      type: 'string',"
            "      format: 'date-time'"
            "    }"
            "  }"
            "}"
        )
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("rowtime", DataTypes.TIMESTAMP())
        .rowtime(
            Rowtime()
            .timestamps_from_field("time")
            .watermarks_periodic_bounded(60000))
        .field("a", DataTypes.STRING())
        .field("b", DataTypes.STRING())
        .field("c", DataTypes.STRING())
    ) \
    .in_append_mode() \
    .register_table_source("source")

st_env.register_table_sink("result",
                           CsvTableSink(["a", "b"],
                                        [DataTypes.STRING(),
                                         DataTypes.STRING()],
                                        result_file))

# Sliding window: 1 hour long, advancing every 10 minutes, keyed on "rowtime"
st_env.scan("source").window(Slide.over("1.hours").every("10.minutes").on("rowtime").alias("w")) \
    .group_by("w, a") \
    .select("a, max(b)").insert_into("result")

st_env.execute("slide time window streaming")

Running it produces the following error:


py4j.protocol.Py4JJavaError: An error occurred while calling o69.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
	at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
	at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Asked by 夜色清晨 on 2020-10-20 18:18:08
1 answer
  • 下一站是幸福

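    Debugging approach

    Add the following to the code to print the table schema and check whether the time column is marked as a time attribute: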

    System.out.println(orders.getSchema());

    root
     |-- user: BIGINT
     |-- product: STRING
     |-- amount: INT
     |-- rowtime: TIMESTAMP(3) ROWTIME
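
The check described here (is the column flagged ROWTIME or PROCTIME in the printed schema?) can also be done programmatically. The following is only a sketch in plain Python: `time_attribute_columns` is a hypothetical helper, not a Flink or PyFlink API, and it assumes the schema text has the `|-- name: TYPE` shape shown above.

```python
def time_attribute_columns(schema_text):
    """Map column name -> 'ROWTIME' or 'PROCTIME' for columns flagged as time attributes.

    Hypothetical helper: it only parses text shaped like the printed schema above.
    """
    result = {}
    # Every column entry starts with "|--"; the part before the first one is "root".
    for entry in schema_text.split("|--")[1:]:
        name, _, type_text = entry.partition(":")
        # Some versions print the marker as *ROWTIME*, so drop the asterisks.
        tokens = type_text.replace("*", " ").split()
        for kind in ("ROWTIME", "PROCTIME"):
            if kind in tokens:
                result[name.strip()] = kind
    return result


printed = ("root |-- user: BIGINT |-- product: STRING "
           "|-- amount: INT |-- rowtime: TIMESTAMP(3) ROWTIME")
print(time_attribute_columns(printed))
```

If the column used in `.on(...)` is missing from the returned mapping, the planner does not see it as a time attribute, which is the situation the exception message describes.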

    Solution

    Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());

    The corresponding OrderStream class is:

    // *************************************************************************
    //     USER DATA TYPES
    // *************************************************************************
     
    /*
     * Simple POJO.
     */
     
     
    import java.sql.Timestamp;
    import org.apache.flink.streaming.api.windowing.time.Time;
     
    public class OrderStream
    {
        public Long user;
        public String product;
        public int amount;
        public Long rowtime;
     
        public OrderStream()
        {
        }
     
        public OrderStream(Long user, String product, int amount,Long rowtime)
        {
            this.user = user;
            this.product = product;
            this.amount = amount;
            this.rowtime=rowtime;
        }
     
        @Override
        public String toString() {
            return "OrderStream{" +
                    "user=" + user +
                    ", product='" + product + '\'' +
                    ", amount=" + amount +
                    ", rowtime="  + rowtime +
                    '}';
        }
    }
    

    The corresponding main program is:

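        // Requires (Flink 1.11+): java.time.Duration, java.util.Arrays,
        // org.apache.flink.api.common.eventtime.WatermarkStrategy,
        // and a static import of org.apache.flink.table.api.Expressions.$
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
                new OrderStream(1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
                new OrderStream(1L, "beer", 3, 1505529000L),   // 2017-09-16 10:30:00
                new OrderStream(3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
                new OrderStream(3L, "rubber", 2, 1505527800L), // 2017-09-16 10:10:00
                new OrderStream(1L, "diaper", 4, 1505528400L), // 2017-09-16 10:20:00
                new OrderStream(1L, "diaper", 4, 1505528400L)  // 2017-09-16 10:20:00
        ))
        // .rowtime() on an existing field takes the record timestamp, so the stream
        // needs timestamps and watermarks assigned before the conversion.
        // Assumptions: rowtime holds epoch seconds (as the date comments suggest),
        // hence the conversion to milliseconds; the 60 s bound mirrors the
        // question's watermarks_periodic_bounded(60000).
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderStream>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                        .withTimestampAssigner((order, recordTs) -> order.rowtime * 1000L));

        // Declare the existing rowtime field as the event-time attribute.
        Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"), $("rowtime").rowtime());
        System.out.println(orders.getSchema());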
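
One detail worth checking in the sample data: Flink interprets a long event timestamp as milliseconds since the epoch, while the values above only match their date comments when read as seconds (and in UTC+8). The sketch below uses plain Python, not Flink, to show the difference; `as_utc` is a hypothetical helper introduced for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def as_utc(epoch_value, unit):
    """Interpret epoch_value as 'seconds' or 'milliseconds' since 1970-01-01 UTC."""
    if unit == "seconds":
        return EPOCH + timedelta(seconds=epoch_value)
    if unit == "milliseconds":
        return EPOCH + timedelta(milliseconds=epoch_value)
    raise ValueError("unit must be 'seconds' or 'milliseconds'")


sample = 1505529000  # first OrderStream timestamp from the answer

print(as_utc(sample, "seconds"))              # 2017-09-16 02:30:00+00:00 (10:30:00 in UTC+8)
print(as_utc(sample, "milliseconds"))         # 1970-01-18 10:12:09+00:00
print(as_utc(sample * 1000, "milliseconds"))  # 2017-09-16 02:30:00+00:00
```

So if these values are used as event time without multiplying by 1000, all rows land in January 1970 and the windows will not line up with the intended dates.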
    

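    Additional notes from other users:

    Hunter x Hunter: $("rowtime2").proctime() adds a new, automatically generated field rowtime2 that holds the processing time.
    Hunter x Hunter: $("rowtime2").rowtime() uses the already existing field rowtime2 as the event time.

    Note that a time attribute is not an ordinary column of a database table.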
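
To see why a group window insists on a time attribute, it helps to look at how a row is placed into windows: the assignment is pure arithmetic on the row's timestamp. The sketch below is plain Python, not Flink code. It follows the sliding-window arithmetic as I understand Flink's behavior, and `sliding_windows` is a hypothetical name. It uses the question's window (1 hour long, sliding every 10 minutes).

```python
def sliding_windows(timestamp_ms, size_ms, slide_ms, offset_ms=0):
    """Return the [start, end) windows (epoch ms) that contain timestamp_ms."""
    # Start of the latest window that still contains the timestamp.
    last_start = timestamp_ms - (timestamp_ms - offset_ms + slide_ms) % slide_ms
    windows = []
    start = last_start
    # Walk backwards one slide at a time while the window still covers the timestamp.
    while start > timestamp_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


HOUR_MS = 60 * 60 * 1000
TEN_MINUTES_MS = 10 * 60 * 1000

event_ms = 1505529000 * 1000  # sample timestamp from the answer, converted to ms
for start, end in sliding_windows(event_ms, HOUR_MS, TEN_MINUTES_MS):
    print(start, end)
```

Each row falls into size / slide = 6 overlapping windows here. Without a column that the planner recognizes as event time or processing time, there is no timestamp to feed into this computation, so the query is rejected during validation.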

    2021-04-02 22:00:27