当前位置: 首页 > news >正文

php建设网站怎么用上海seo排名

php建设网站怎么用,上海seo排名,手机网站制作推广,做淘宝客网站违法吗Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构(log-based)的消息引擎 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据…

Kafka 消费组位移

  • 消费者 API
  • 命令行

Kafka : 基于日志结构(log-based)的消息引擎

  • 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据
  • 位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据

重设位移的两个维度 :

  • 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值
  • 时间维度 : 将位移调整成 > 该时间的最小位移

7 种重设策略 :

维度策略含义
位移维度Earliest位移调整当前最早位移处
Latest位移调整到当前最新位移处
Current位移调整到当前最新提交位移处
Specified-Offset位移调整成指定位移
Shift-By-N位移调整到当前位移 + N 处
时间维度DataTime位移调整到大于给定时间的最小位移处
Duration位移调整到离当前时间指定间隔的位移处

Earliest 策略 : 将位移调整到主题当前最早位移处

  • 不一定是 0,很久的消息会被 Kafka 自动删除,所以当前最早位移可能是 > 0 的值
  • 当要重新消费主题的所有消息,就用 Earliest 策略

Latest 策略 : 把位移重设成最新末端位移

  • 当向某个主题发送 15 条消息,最新末端位移是 15
  • 当要跳过所有历史消息,从最新的消息处开始消费,就用 Latest 策略

Current 策略 : 将位移调整成消费者当前提交的最新位移

  • 修改消费者代码,并重启消费者,结果发现代码有问题,回滚前的代码变更,还把位移重设为消费者重启时的位置

Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处

  • 消费者处理某条错误消息时 , 能手动跳过此消息的处理
  • 生产中 , 出现 corrupted 消息无法被消费时,消费者会抛出异常,无法继续工作

Specified-Offset 策略 : 指定位移的绝对数值

Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离

  • 把位移重设成当前位移的前 100 条位移处,指定 N 为 -100

DateTime : 指定一个时间,将位移重置到该时间之后的最早位移处

  • 使用场景 : 重新消费昨天的数据,并使用该策略重设位移到昨天 0 点

Duration 策略 : 相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式是 PnDTnHnMnS

  • Java 8 引入 Duration : 以字母 P 开头,后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)
  • 将位移调回到 15 分钟前,就指定 PT0H15M0S

消费者 API

Earliest 策略 :

Properties consumerProperties = new Properties();
// 禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 重设的消费者组的组 ID
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic = "test";  // 要重设位移的 Kafka 主题 
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo ->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
} 

Latest 策略 :

consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));

Current 策略 : 获取当前提交的最新位移 :

consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).forEach(tp -> {long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});

Specified-Offset 策略 :

long targetOffset = 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);
}

实现 Shift-By-N 策略

for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset = consumer.committed(tp).offset() + 123L; consumer.seek(tp, targetOffset);
}

DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点

long ts = LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> ts));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

Duration 策略 : 将位移调回 30 分钟前

Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000  * 60));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

命令行

Earliest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-earliest –execute

Latest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-latest --execute

Current 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-current --execute

Specified-Offset 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-offset <offset> --execute

Shift-By-N 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--shift-by <offset_N> --execute

DateTime 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--by-duration PT0H30M0S --execute
http://www.yidumall.com/news/29214.html

相关文章:

  • 做民宿怎么登录网站seo会被取代吗
  • 东莞建站网站模板优化关键词排名提升
  • 百度网站建设是什么意思2024小学生时事新闻十条
  • 可以做网站头像的图片广告推广精准引流
  • 厦门广告设计制作公司搜索引擎营销优化诊断训练
  • 建网站电话公司广告推广
  • 徐州 商城网站建设高级seo优化招聘
  • 网站模板整站舟山百度seo
  • 黄石建网站百度新闻下载安装
  • 网站首页图片做多大影视剪辑培训机构排名
  • 典型的电子商务网站有哪些韩国搜索引擎排名
  • 山东专业网站建设友情链接对网站的作用
  • 苏州做网站的公司智慧营销系统平台
  • 北京建设专职查询网站网络推广运营主要做什么
  • centos 7.2 做网站seo北京优化
  • 网站标识代码怎么加手游推广渠道平台
  • 怎么做电商平台网站短视频seo排名加盟
  • 建站最好的创建网页步骤
  • 什么蓝色 适合公司网站主色免费发广告网站
  • 鞍山贴吧最新消息北京建站优化
  • 个人做跨境电商的平台网站有哪些seo排名赚靠谱吗
  • 域名查ipseo优化信
  • 微网站建设正规公司如何自己制作一个网站
  • 网站不更新天津seo公司
  • 长沙做网站品牌深圳网络推广服务是什么
  • 济南历山北路网站建设百度下载app下载
  • 如何在手机使用wordpress网站seo优化方案设计
  • 长春火车站疫情防控咨询电话号码企业营销策划是做什么的
  • 网站开发源代码app推广方式有哪些
  • 外包网站问些什么问题无锡网站建设seo