当前位置: 首页 > news >正文

做网站的分页查询网易搜索引擎

做网站的分页查询,网易搜索引擎,江门做网站,泰安选择企业建站公司该文档演示了fink windows的操作DEMO 环境准备: kafka本地运行:kafka部署自动生成名字代码:随机名自动生成随机IP代码:随机IPFlink 1.18 测试数据 自动向kafka推送数据 import cn.hutool.core.date.DateUtil; import com.alibab…

该文档演示了fink windows的操作DEMO

环境准备:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP
  • Flink 1.18

测试数据

自动向kafka推送数据

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)) {int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i);CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();Random random = new Random();int randomNumber = random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(Producer<String, String> producer) {String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TOPIC_NAME, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg() {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
//        DateTime begin = DateUtil.beginOfDay(new Date());
//        String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");String timeStr = DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}

注意:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP

FLINK 数据


/**** @author wfg*/
@Slf4j
public class DataSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);if (data != null) {collector.collect(new Tuple2<>(data.getUserName(), 1));}}
}

基于时间窗口

*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute("flink window example");}
}

基于滑动时间窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute("flink window example");}
}

基于事件数量窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute("flink window example");}
}

基于事件数量滑动窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute("flink window example");}
}

基于会话时间窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和.sum(1).print();env.execute("flink window example");}
}

滚动窗口(Tumbling Window)

滚动窗口(Tumbling Window)

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//滚动窗口(Tumbling Window) 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute("flink window example");}
}

基于事件时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute("flink window example");}
}

滑动窗口(Sliding Window)

基于处理时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于处理时间的 30 秒滑动窗口,滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

基于事件时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滑动窗口,滑动间隔为 10 秒  data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

注意:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP
http://www.yidumall.com/news/76325.html

相关文章:

  • 洮南网站建设哪家专业sem优化师是什么意思
  • 十堰网站建设兼职中国十大seo
  • 淮安新网站制作好网站制作公司
  • 重庆招生院校网站百度seo网站在线诊断
  • 网站开发毕业设计代做博为峰软件测试培训学费
  • 韶关网站建设的公司鄂州seo
  • 云浮网站建设公司常用的关键词有哪些
  • 网站建站方案书seo营销推广公司
  • 无锡有哪些做网站的公司老域名
  • 个人网站 建站郑州百度推广外包
  • ic网站建设企业网站设计素材
  • 短视频公司网站建设方案1688的网站特色
  • wordpress 修改urlseo和sem是什么意思啊
  • 在网站建设中功能描述书的功能什么是市场营销
  • 视频转文字网页兰州网络seo公司
  • 长沙有什么好玩的室内场所seo怎么优化
  • 河北省两学一做网站it培训机构哪个好一点
  • 网站建设优化一年赚几十万网站制作费用一览表
  • 临沂高端网站建设最专业的seo公司
  • 想要去网站做友情链接怎么发邮件seo流程
  • 网站流量不够怎么办线上推广软件
  • 合肥设网站软文素材网站
  • 网络营销的基本内容有哪些seo门户网价格是多少钱
  • 苏州网站建设问问q778925409强涵最新军事新闻今日最新消息
  • 抚顺做网站b站在线观看
  • 广州网站建设公司排行品牌seo培训咨询
  • 在百度上做公司做网站seo业务培训
  • 南宁做网站的公司有哪些百度seo是啥意思
  • 怎么使用网站模板网络推广的方式
  • 免费ppt模板下载网址不需要会员seo自动刷外链工具