Environment: PyFlink v1.1.2, Python 3.6.8
The code is as follows:
import os

# imports used by this snippet
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
from pyflink.table.descriptors import Kafka, Json, Schema, Rowtime
from pyflink.table.window import Slide

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
st_env = StreamTableEnvironment.create(s_env)

result_file = "/tmp/slide_time_window_streaming.csv"
if os.path.exists(result_file):
    os.remove(result_file)

st_env \
    .connect(  # declare the external system to connect to
        Kafka()
        .version("0.11")
        .topic("user")
        .start_from_earliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    ) \
    .with_format(  # declare a format for this system
        Json()
        .fail_on_missing_field(True)
        .json_schema(
            "{"
            " type: 'object',"
            " properties: {"
            " a: {"
            " type: 'string'"
            " },"
            " b: {"
            " type: 'string'"
            " },"
            " c: {"
            " type: 'string'"
            " },"
            " time: {"
            " type: 'string',"
            " format: 'date-time'"
            " }"
            " }"
            "}"
        )
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("rowtime", DataTypes.TIMESTAMP())
        .rowtime(
            Rowtime()
            .timestamps_from_field("time")
            .watermarks_periodic_bounded(60000))
        .field("a", DataTypes.STRING())
        .field("b", DataTypes.STRING())
        .field("c", DataTypes.STRING())
    ) \
    .in_append_mode() \
    .register_table_source("source")

st_env.register_table_sink("result",
                           CsvTableSink(["a", "b"],
                                        [DataTypes.STRING(),
                                         DataTypes.STRING()],
                                        result_file))

st_env.scan("source") \
    .window(Slide.over("1.hours").every("10.minutes").on("rowtime").alias("w")) \
    .group_by("w, a") \
    .select("a, max(b)") \
    .insert_into("result")

st_env.execute("slide time window streaming")
After running it, the following error occurs:
py4j.protocol.Py4JJavaError: An error occurred while calling o69.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Debugging approach
Add the following to the code to print the table's schema:
System.out.println(orders.getSchema());
root
 |-- user: BIGINT
 |-- product: STRING
 |-- amount: INT
 |-- rowtime: TIMESTAMP(3) *ROWTIME*
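The same check works from PyFlink; a minimal sketch that reuses the st_env and the "source" table registered in the code above:

# Print the schema of the registered source table. For the group window to
# validate, "rowtime" must show up as an event-time (rowtime) attribute rather
# than as a plain TIMESTAMP column.
st_env.scan("source").print_schema()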
Solution
Declare the existing rowtime field as the event-time attribute when converting the DataStream into a Table:
Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"),$("rowtime").rowtime());
The corresponding OrderStream POJO is:
// *************************************************************************
// USER DATA TYPES
// *************************************************************************

/**
 * Simple POJO.
 */
public class OrderStream {

    public Long user;
    public String product;
    public int amount;
    public Long rowtime;

    public OrderStream() {
    }

    public OrderStream(Long user, String product, int amount, Long rowtime) {
        this.user = user;
        this.product = product;
        this.amount = amount;
        this.rowtime = rowtime;
    }

    @Override
    public String toString() {
        return "OrderStream{" +
                "user=" + user +
                ", product='" + product + '\'' +
                ", amount=" + amount +
                ", rowtime=" + rowtime +
                '}';
    }
}
The corresponding main program is:
// imports assumed for Flink 1.11
import java.util.Arrays;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
        new OrderStream(1L, "beer", 3, 1505529000L),    // 2017-09-16 10:30:00
        new OrderStream(1L, "beer", 3, 1505529000L),    // 2017-09-16 10:30:00
        new OrderStream(3L, "rubber", 2, 1505527800L),  // 2017-09-16 10:10:00
        new OrderStream(3L, "rubber", 2, 1505527800L),  // 2017-09-16 10:10:00
        new OrderStream(1L, "diaper", 4, 1505528400L),  // 2017-09-16 10:20:00
        new OrderStream(1L, "diaper", 4, 1505528400L)   // 2017-09-16 10:20:00
));

Table orders = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"), $("rowtime").rowtime());
System.out.println(orders.getSchema());
Additional notes from community members:
Hunter x Hunter: $("rowtime2").proctime() means that a new field rowtime2 is generated automatically (a processing-time attribute).
Hunter x Hunter: $("rowtime2").rowtime() means that the already existing field rowtime2 is used as the event-time attribute.
Note that a time attribute is not an ordinary field of a database table.
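Applying this to the PyFlink job from the question: the column passed to on("rowtime") has to be a real event-time attribute. One commonly suggested way to get that in PyFlink 1.11 is to register the Kafka source with SQL DDL and a WATERMARK clause instead of the connect() descriptor. The sketch below is untested and makes several assumptions: the default blink planner, the universal Kafka connector jar on the classpath (options may differ for a 0.11 broker), and ISO-8601 timestamps in the time field; the sink registration and the rest of the job stay as in the original code.

# Hedged sketch: declare the source with DDL so that `rowtime` becomes a true
# event-time attribute; the connector/format options below are assumptions.
st_env.execute_sql("""
    CREATE TABLE source (
        a STRING,
        b STRING,
        c STRING,
        `time` TIMESTAMP(3),
        rowtime AS `time`,  -- computed column used as the event-time attribute
        -- a processing-time attribute would instead be declared as: pt AS PROCTIME()
        WATERMARK FOR rowtime AS rowtime - INTERVAL '60' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")

# With `rowtime` carrying the rowtime attribute, the original sliding-window
# query validates instead of throwing the ValidationException above.
st_env.scan("source") \
    .window(Slide.over("1.hours").every("10.minutes").on("rowtime").alias("w")) \
    .group_by("w, a") \
    .select("a, max(b)") \
    .insert_into("result")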