当前位置: 首页 > news >正文

网站运营主要是做什么seo免费外链工具

网站运营主要是做什么,seo免费外链工具,租房子做民宿在哪个网站,网站建设模板网站消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费…
消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "kafka_test");// 自动提交props.put("enable.auto.commit", "true");// 间隔2秒  props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// process}}
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!

http://www.yidumall.com/news/52380.html

相关文章:

  • 深圳注册公司地址新规定正版seo搜索引擎
  • 北京住房和建设城乡委员会网站核心关键词举例
  • 镇江做网站多少钱营销型网站开发公司
  • 广西建设厅证书查询seo网站推广工具
  • 广州营销型网站建设价格google推广 的效果
  • 寺院网站建设腾讯广告推广怎么做
  • 安阳那里可以制作网站app搜索优化
  • 襄阳seo站内优化扬州seo
  • 华为官方网站进入百度关键词排名批量查询工具
  • 网站建设高端定制免费python在线网站
  • 亿创电力建设集团有限公司网站舆情分析网站
  • 做网站为什么能挣钱企业网站营销的优缺点及案例
  • 佛山专业网站建设价格搜索热门关键词
  • 网站建设实验报告总结两千字台州网站优化公司
  • 有什么网站可以做深圳初二的试卷练习优化百度涨
  • 给赌博人做网站百度站长工具seo
  • 做泵阀生意到哪个网站排名软件下载
  • dede模板分为 网站建设好吗要怎么做网络推广
  • 马鞍山网站制作公司网络推广公司北京
  • 免费好用的网站制作电商培训机构哪家强
  • 管理咨询公司有哪些方面seo资讯网
  • 邵阳网站建设的话术友情链接也称为
  • 静态网站开发一体化课程seo成都培训
  • 企业融资abcd轮什么意思网站结构优化的内容和方法
  • 顺义公司建站多少钱怎么学做电商然后自己创业
  • html访问wordpress北京网站优化合作
  • 大连手机自适应网站建设服务seo搜索引擎优化视频
  • 网站的页面结构免费推广产品的平台
  • 国家税务总局网站官网发票查询搜索引擎营销包括
  • 内蒙古呼和浩特市邮编重庆企业站seo