博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink window Function - ProcessAllWindowFunction
阅读量:5459 次
发布时间:2019-06-15

本文共 1977 字,大约阅读时间需要 6 分钟。

package window.non_keyed import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import org.apache.flink.api.scala._ import scala.collection.mutable /**  * @author: create by maoxiangyi  * @version: v1.0  * @description: window  * @date:2019 /6/4  */ object ProcessAllWindowWordCount {
def main(args: Array[String]): Unit = {
//设置环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() //设置数据源 env.addSource(new SourceFunction[String] {
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
while (true) {
ctx.collect("hello hadoop hello storm hello spark") Thread.sleep(1000) } } override def cancel(): Unit = {} }) //计算逻辑 .flatMap(_.split(" ")) .map((_, 1)) .timeWindowAll(Time.seconds(10), Time.seconds(10)) .process(new ProcessAllWindowFunction[(String, Int), mutable.Map[String, Int], TimeWindow] {
override def process(context: Context, elements: Iterable[(String, Int)], out: Collector[mutable.Map[String, Int]]): Unit = {
val wordCountMap = mutable.Map[String, Int]() elements.foreach(kv => {
wordCountMap.put(kv._1, wordCountMap.get(kv._1).getOrElse(0) + kv._2) }) out.collect(wordCountMap) } }).flatMap(new FlatMapFunction[mutable.Map[String, Int], (String, Int)] {
override def flatMap(value: mutable.Map[String, Int], out: Collector[(String, Int)]): Unit = {
value.foreach(out.collect(_)) } }) .print() //提交任务 env.execute("word count") } }

转载于:https://www.cnblogs.com/maoxiangyi/p/10978243.html

你可能感兴趣的文章
js实现文字无间断左右滚动和图片左右滚动
查看>>
题目11:软件工程等名词解释
查看>>
自己写平方根squareroot函数
查看>>
关于RTSP-Over-HTTP
查看>>
SQL SERVER 2005如何建立自动备份的维护计划
查看>>
深入剖析C#的多态
查看>>
SQL2008 用户'sa'登录失败(错误18456)图文解决方法
查看>>
json属性名必须加引号的讨论
查看>>
Winform--数据库链接(EF CodeFirst)
查看>>
TCP的发送缓冲区和接收缓冲区
查看>>
SQL Server的导出导入方式有
查看>>
Unity3D_(Shuriken粒子系统)制作简单的烟花爆炸效果
查看>>
3. Longest Substring Without Repeating Characters
查看>>
织梦添加搜索功能
查看>>
JDK的安装和环境变量配置
查看>>
jmeter学习记录--05--Beanshell2
查看>>
HDU1402 HDU4609 FFT快速DFT
查看>>
DataGridView添加一行数据、全选、取消全选、清空数据、删除选中行
查看>>
抽象工厂模式
查看>>
数据库连接数使用情况监控
查看>>