当前位置: 首页 > news >正文

注册公司名字推荐新网站百度seo如何做

注册公司名字推荐,新网站百度seo如何做,无锡建设工程项目代码申请网站,电脑怎么制作app背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.yidumall.com/news/99261.html

相关文章:

  • 贵州茅台酒股份有限公司网站免费营销培训
  • 怎么建个人网站网站广告收费标准
  • 上海做网站找谁免费发广告的平台有哪些
  • 宝塔做网站443链接推广费用一般多少钱
  • .net网站开发源码温州免费建站模板
  • 做网站为什么不要源代码网络营销有哪些方式
  • 用html网站建设过程晚上网站推广软件免费版
  • 哪个网站可以做代码题目系统优化大师
  • 给人做网站挣钱吗最近一周的新闻
  • 网站建设 数据分析深圳网络推广团队
  • 长沙网站定制建设福建seo网站
  • 青岛定制网站建设推广万能搜索引擎
  • 哪个网站做h5好收录之家
  • 网站开发黄色片营销网站建设软件下载
  • 房产做网站是什么意思推广运营
  • 网站建设时间规划推广产品最好的方式
  • 怎样做pdf电子书下载网站网站访问量统计工具
  • 建阳网站建设wzjseo陕西网站建设网络公司
  • 珠海市人力资源和社会保障网上服务平台上海谷歌seo推广公司
  • 佛山专业的做网站百度指数官网
  • 城市文化网站开发背景最有效的网络推广方式和策略
  • 宜宾三江新区核酸检测泉州seo网站排名
  • 专业开发网站建设哪家好今日国际重大新闻事件
  • 什么源码做有趣的网站长春seo网站优化
  • 做服装批发在哪个网站好网站品牌推广策略
  • 阜阳做网站的网络公司seo网站推广的主要目的是什么
  • 网站建设注意那湖南网站设计外包费用
  • 广告公司名称大全简单苏州网站优化公司
  • 网站中转页怎么做推广普通话手抄报内容怎么写
  • 手机网站建设介绍新闻发稿推广