当前位置: 首页 > news >正文

足彩网站怎样做推广营销型制作网站公司

足彩网站怎样做推广,营销型制作网站公司,公司网站横幅是做的吗,中国住房和城乡建设厅网站一、Flink 窗口 理解 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击…

一、Flink 窗口 理解

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.

时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。

二、数据准备

准备一个WaterSensor类方便演示

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

三、时间滚动窗口

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.

时间滚动窗口代码

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述

消费结果:
在这里插入图片描述

四、时间滑动窗口

与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。

时间滑动窗口代码

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述
消费结果
在这里插入图片描述

五、时间会话窗口

会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。

时间会话窗口代码

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime + "  " + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

运行结果

在hadoop100 服务器
输入nc -lk 999 启动socket
在这里插入图片描述
消费结果
在这里插入图片描述
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。

六、基于元素个数的滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.

基于元素个数的滚动窗口代码

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);out.collect("窗口:" + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

运行结果
在这里插入图片描述
在这里插入图片描述

七、基于元素个数的滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
//                .countWindow(2).countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list  = toList(elements);out.collect("窗口:" + "key:" + key + "  " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T>  list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}

运行结果
在这里插入图片描述
在这里插入图片描述

http://www.yidumall.com/news/55551.html

相关文章:

  • 获取网站物理路径网络推广平台有哪些
  • 福田网站制作比较好的网络营销工具体系
  • 网站制作真人游戏娱乐平台怎么做优化大师客服
  • 获取网站全站代码外链代发免费
  • 可以做婚礼视频的网站有哪些国内最好用免费建站系统
  • 做的响应式网站用什么测试天津优化网络公司的建议
  • 无锡华诚建设监理有限公司网站怎么让某个关键词排名上去
  • 便宜虚拟主机做网站备份重庆seo排名扣费
  • 农产品网站建设长沙服务好的网络营销
  • 苏州中国建设银行招聘信息网站seo网站优化策划书
  • 网页设计制作工资专业搜索引擎seo技术公司
  • 网站建设工具沈阳seo优化排名公司
  • 如何让WordPress上传媒体北京seo关键词优化外包
  • 如何做有后台的网站手机网站智能建站
  • 福建省建设厅网站信用评分信息推广
  • 建立自己的购物网站谷歌play商店
  • 一个网站只有一个核心关键词fifa最新世界排名
  • asp.net免费网站潍坊关键词优化平台
  • 巢湖做网站的公司2024免费网站推广大全
  • iis配置网站php免费com域名申请注册
  • wordpress加速访问网站seo报价
  • 国外手机主题网站百度网站权重查询
  • 企业做网站需要提供什么资料电脑网页制作
  • 最火的网站开发框架seo综合查询平台官网
  • wordpress menu_walker优化关键词步骤
  • 个人做健康网站好吗免费seo软件推荐
  • 西安哪里做网站最大搜索排名提升
  • 武汉做优化网站公司搜索大全
  • 陕西省人民政府地址关键词优化哪家好
  • wordpress 时间调用东莞seo优化团队