import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

/**
 * One input record for the Table API group-window example.
 *
 * @param id        user id; the example query sums it per window
 * @param name      user name
 * @param timestamp event time in seconds (the example multiplies it by 1000L when
 *                  assigning Flink timestamps, which are milliseconds)
 */
final case class User(id: Int, name: String, timestamp: Long)
object SqlTest {

  /**
   * Table API example: event-time tumbling group window.
   *
   * Registers two sample users as `testTable`, sums `id` over 1-hour tumbling windows
   * on the `event_time` rowtime attribute, and prints the result as a retract stream.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Windows are driven by event time, not processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val users = env
      .fromElements(User(1, "nie", 1511658000), User(2, "hu", 1511658000))
      // Input timestamps are seconds; scale to milliseconds. This also sets the watermarks.
      .assignAscendingTimestamps(user => user.timestamp * 1000L)

    // `event_time` is declared as the rowtime (event-time) attribute.
    tableEnv.registerDataStream("testTable", users, 'id, 'name, 'event_time.rowtime)

    val hourlyIdSum = tableEnv
      .scan("testTable")
      // 1-hour tumbling window on event time, aliased as 'test
      .window(Tumble over 1.hour on 'event_time as 'test)
      // aggregate once per window
      .groupBy('test)
      // sum of `id` within each window
      .select('id.sum)

    hourlyIdSum.toRetractStream[Row].print()
    env.execute("windowTest")
  }
}
1、Tumbling Window
--event_time
//首先需要将时间类型(TimeCharacteristic)指定为EventTime
//然后给字段取别名的时候,EventTime对应rowtime
tEnv.registerDataStream("testTable",stream,'id, 'name, 'event_time.rowtime)
//后面窗口的on关键字后面接事件时间
window(Tumble over 1.hour on 'event_time as 'test)
--process_time
//首先需要将时间类型(TimeCharacteristic)指定为ProcessingTime(默认值)
//然后给字段取别名的时候,ProcessingTime对应proctime
tEnv.registerDataStream("testTable",stream,'id, 'name, 'process_time.proctime)
//后面窗口的on关键字后面接处理时间
window(Tumble over 1.hour on 'process_time as 'test)
--rowcount
//over后接具体行数;注意:此时on后面的process_time不决定窗口大小,只用于确定数据的先后顺序
window(Tumble over 100.rows on 'process_time as 'test)
2、Sliding Windows
--event_time
//首先需要将时间类型(TimeCharacteristic)指定为EventTime
//然后给字段取别名的时候,EventTime对应rowtime
tEnv.registerDataStream("testTable",stream,'id, 'name, 'event_time.rowtime)
//后面窗口的on关键字后面接事件时间
window(Slide over 10.minutes every 5.seconds on 'event_time as 'test) // 10-minute window that slides (is evaluated) every 5 seconds
3、Session Windows
--event_time
//首先需要将时间类型(TimeCharacteristic)指定为EventTime
//然后给字段取别名的时候,EventTime对应rowtime
tEnv.registerDataStream("testTable",stream,'id, 'name, 'event_time.rowtime)
//后面窗口的on关键字后面接事件时间
window(Session WithGap 10.minutes on 'event_time as 'test) //基于eventtime创建Session Window,Session Gap为10min
Over Window与标准SQL中的开窗函数(OVER子句)功能类似,也是一种数据聚合计算的方式;但与Group Window不同的是,
Over Window不会把输入数据按窗口大小划分成组,而是针对每一条输入记录,基于其前后相邻记录的范围计算一次聚合结果,并为每条记录输出一行
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

/**
 * One input record for the over-window examples.
 *
 * @param id        user id; used as the partition key
 * @param name      user name
 * @param age       user age
 * @param timestamp event time in seconds (the examples multiply it by 1000L when
 *                  assigning Flink timestamps, which are milliseconds)
 */
final case class User(id: Int, name: String, age: Int, timestamp: Long)
object SqlTest {

  /**
   * Table API example: event-time over window.
   *
   * Registers three sample users as `testTable`. For every row it emits the row's `name`
   * and the sum of `id` over an unbounded preceding range (UNBOUNDED_RANGE) of the same
   * `id` partition, ordered by `event_time`. The result is printed as a retract stream.
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
    // Use event time so the rowtime attribute drives the over window.
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = streamEnv
      .fromElements(
        User(1, "nie", 22, 1511658000),
        User(2, "hu", 20, 1511658000),
        User(2, "xiao", 19, 1511658000))
      // Input timestamps are seconds; scale to milliseconds. This also sets the watermarks.
      .assignAscendingTimestamps(_.timestamp * 1000L)
    // Fields are mapped by position, so 'age must be listed. That way 'event_time.rowtime
    // takes the position of the Long `timestamp` field. Flink only lets a rowtime attribute
    // replace a Long/Timestamp field, so placing it on the Int `age` field is rejected.
    tEnv.registerDataStream("testTable", stream, 'id, 'name, 'age, 'event_time.rowtime)
    val result = tEnv
      .scan("testTable")
      .window(Over partitionBy 'id orderBy 'event_time preceding UNBOUNDED_RANGE as 'test)
      .select('name, 'id.sum over 'test)
    result.toRetractStream[Row].print()
    streamEnv.execute("windowTest")
  }
}
1、Tumble Window
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

/**
 * One input record for the SQL group-window examples.
 *
 * @param id        user id; used as a grouping key
 * @param name      user name
 * @param age       user age; summed by the example queries
 * @param timestamp event time in seconds (the examples multiply it by 1000L when
 *                  assigning Flink timestamps, which are milliseconds)
 */
final case class User(id: Int, name: String, age: Int, timestamp: Long)
object SqlTest {

  /**
   * SQL example: event-time tumbling window.
   *
   * Registers three sample users as `testTable`. It then groups rows into 10-minute
   * TUMBLE windows on the `event_time` rowtime attribute and, per window and `id`,
   * prints `id` and the sum of `age` as a retract stream.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Windows are driven by event time, not processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val users = env
      .fromElements(
        User(1, "nie", 22, 1511658000),
        User(2, "hu", 20, 1511658000),
        User(2, "xiao", 19, 1511658000))
      // Input timestamps are seconds; scale to milliseconds. This also sets the watermarks.
      .assignAscendingTimestamps(user => user.timestamp * 1000L)

    // `event_time` is declared as the rowtime attribute in the position of `timestamp`.
    tableEnv.registerDataStream("testTable", users, 'id, 'name, 'age, 'event_time.rowtime)

    // 10-minute window. Unlike the Table API, SQL writes the window as a TUMBLE(...)
    // term inside GROUP BY.
    val tumbleSql =
      "select id,sum(age) from testTable group by tumble(event_time, INTERVAL '10' MINUTE),id"
    val perWindowAgeSum = tableEnv.sqlQuery(tumbleSql)

    perWindowAgeSum.toRetractStream[Row].print()
    env.execute("windowTest")
  }
}
2、HOP Windows
// HOP(time_attr, slide, size): 10-minute windows that slide every 5 minutes. Each sample
// row falls into two windows, so the query yields 4 rows (2 windows x 2 ids).
val result = tEnv.sqlQuery("select id,sum(age) from testTable group by HOP(event_time, INTERVAL '5' MINUTE,INTERVAL '10' MINUTE),id")
3、Session Windows
// Session gap of 5 hours: if no data arrives within 5 hours, the window is considered
// finished and its result is computed.
val result = tEnv.sqlQuery("select id,sum(age) from testTable group by SESSION(event_time, INTERVAL '5' HOUR),id")
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

/**
 * One input record for the SQL over-window example.
 *
 * @param id        user id; used as the partition key
 * @param name      user name
 * @param age       user age; summed by the example query
 * @param timestamp event time in seconds (the example multiplies it by 1000L when
 *                  assigning Flink timestamps, which are milliseconds)
 */
final case class User(id: Int, name: String, age: Int, timestamp: Long)
object SqlTest {

  /**
   * SQL example: event-time OVER window with a ROWS frame.
   *
   * Registers three sample users as `testTable`. For each row it emits `id`, `name` and
   * the sum of `age` over that row plus up to 10 preceding rows of the same `id`
   * partition, ordered by `event_time`. The result is printed as a retract stream.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Use event time so `event_time` can order the OVER window.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val users = env
      .fromElements(
        User(1, "nie", 22, 1511658000),
        User(2, "hu", 20, 1511658000),
        User(2, "xiao", 19, 1511658000))
      // Input timestamps are seconds; scale to milliseconds. This also sets the watermarks.
      .assignAscendingTimestamps(user => user.timestamp * 1000L)

    // `event_time` is declared as the rowtime attribute in the position of `timestamp`.
    tableEnv.registerDataStream("testTable", users, 'id, 'name, 'age, 'event_time.rowtime)

    val overSql =
      "select id,name,sum(age)over(partition by id order by event_time " +
        "ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) from testTable"
    val runningAgeSum = tableEnv.sqlQuery(overSql)

    runningAgeSum.toRetractStream[Row].print()
    env.execute("windowTest")
  }
}
结果如下:
上一篇:6.5 TIM输入捕获
下一篇:【玩转c++】继承深度解析