当前位置: 首页 > news >正文

网站建设接单搜索引擎调词平台多少钱

网站建设接单,搜索引擎调词平台多少钱,多少钱一度电,石家庄大型网站建设动态初始化Kafka消费者实例一.Kafka 环境搭建二.动态初始化消费者1.Topic定义2.方法处理器工厂3.参数解析器(Copy SpringBoot 源码)4.消费接口和消费实现5.动态初始化1.关键类简介2.动态初始化实现一.Kafka 环境搭建 参考:Kafka搭建和测试 …

动态初始化Kafka消费者实例

  • 一.Kafka 环境搭建
  • 二.动态初始化消费者
    • 1.Topic定义
    • 2.方法处理器工厂
    • 3.参数解析器(Copy SpringBoot 源码)
    • 4.消费接口和消费实现
    • 5.动态初始化
      • 1.关键类简介
      • 2.动态初始化实现

一.Kafka 环境搭建

参考:Kafka搭建和测试

二.动态初始化消费者

1.Topic定义

动态初始化,即不通过注解和配置文件实现消费者的初始化,定义一个Topic对象,用于设置消费者参数

package com.demo.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author * @date 2023-02-08 15:06* @since 1.8*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Topic {private String id;private String topic;private Integer partitions;private String group = "test";private String clientPrefix;
}

2.方法处理器工厂

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** @author * @date 2023-02-08 14:18* @since 1.8*/
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {private ApplicationContext applicationContext;private Validator validator;private List<HandlerMethodArgumentResolver> customMethodArgumentResolvers = new ArrayList<>();private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {this.validator = validator;this.applicationContext = applicationContext;}public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {this.handlerMethodFactory = kafkaHandlerMethodFactory1;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {if (this.handlerMethodFactory == null) {this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();}return this.handlerMethodFactory;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();if (this.validator != null) {defaultFactory.setValidator(this.validator);}defaultFactory.setBeanFactory(this.applicationContext);this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());defaultFactory.setConversionService(this.defaultFormattingConversionService);GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);defaultFactory.setMessageConverter(messageConverter);List<HandlerMethodArgumentResolver> customArgumentsResolver =new ArrayList<>(Collections.unmodifiableList(this.customMethodArgumentResolvers));// Has to be at the end - look at PayloadMethodArgumentResolver documentationcustomArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);defaultFactory.afterPropertiesSet();return defaultFactory;}@Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);}private static class BytesToStringConverter implements Converter<byte[], String> {private final Charset charset;BytesToStringConverter(Charset charset) {this.charset = charset;}@Overridepublic String convert(byte[] source) {return new String(source, this.charset);}}private final class BytesToNumberConverter implements ConditionalGenericConverter {BytesToNumberConverter() {}@Override@Nullablepublic Set<ConvertiblePair> getConvertibleTypes() {HashSet<ConvertiblePair> pairs = new HashSet<>();pairs.add(new ConvertiblePair(byte[].class, long.class));pairs.add(new ConvertiblePair(byte[].class, int.class));pairs.add(new ConvertiblePair(byte[].class, short.class));pairs.add(new ConvertiblePair(byte[].class, byte.class));pairs.add(new ConvertiblePair(byte[].class, Long.class));pairs.add(new ConvertiblePair(byte[].class, Integer.class));pairs.add(new ConvertiblePair(byte[].class, Short.class));pairs.add(new ConvertiblePair(byte[].class, Byte.class));return pairs;}@Override@Nullablepublic Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {byte[] bytes = (byte[]) source;if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONARreturn ByteBuffer.wrap(bytes).getLong();}else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONARreturn ByteBuffer.wrap(bytes).getInt();}else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); // NOSONARreturn ByteBuffer.wrap(bytes).getShort();}else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); // NOSONARreturn ByteBuffer.wrap(bytes).get();}return null;}@Overridepublic boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {if (sourceType.getType().equals(byte[].class)) {Class<?> target = targetType.getType();return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)|| target.equals(Short.class) || target.equals(Byte.class);}else {return false;}}}
}

3.参数解析器(Copy SpringBoot 源码)

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;import java.util.List;/*** @author * @date 2023-02-08 14:36* @since 1.8*/
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {super(messageConverter, validator);}@Overridepublic Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONARObject resolved = super.resolveArgument(parameter, message);/** Replace KafkaNull list elements with null.*/if (resolved instanceof List) {List<?> list = ((List<?>) resolved);for (int i = 0; i < list.size(); i++) {if (list.get(i) instanceof KafkaNull) {list.set(i, null);}}}return resolved;}@Overrideprotected boolean isEmptyPayload(Object payload) {return payload == null || payload instanceof KafkaNull;}}

4.消费接口和消费实现

当前接口和实现为了用于做统一的数据处理,可以在实现类内再根据Topic去调用对应的数据解析方法

接口:

package com.demo.manual;import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 13:46* @since 1.8*/
public interface Handler {void deal(ConsumerRecord<String, String> cRecord);
}

实现:

package com.demo.manual;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 11:49* @since 1.8*/
@Slf4j
public class ManualHandler implements Handler{@Overridepublic void deal(ConsumerRecord<String, String> cRecord) {log.info("  Topic:{} Partition:{} Content:{}",cRecord.topic(),cRecord.partition(),cRecord.value());}
}

5.动态初始化

1.关键类简介

此处通过接口调用,实现创建、暂停和恢复消费;可根据实际应用场景进行设计

关键类说明
KafkaListenerEndpointRegistrySpring 的 Kafka 监听容器,可以通过 Id 获取 Listener 实例,从而暂停或恢复消费监听
ConcurrentKafkaListenerContainerFactoryListener 工厂,定义代码可参考上面链接的(2.3 节)
ConsumerAwareListenerErrorHandler消费异常处理器,定义代码可参考上面链接的(2.3 节)
ApplicationContextSpring 的上下文容器,MessageHandlerMethodFactory 初始化用
MethodKafkaListenerEndpointKafka 配置节点,详细逻辑可参考源码

SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法

package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

	/*** 此处为相关源码,仅供参考 寻找带有 @KafkaListener 注解的类并初始化/@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class<?> targetClass = AopUtils.getTargetClass(bean);Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();final List<Method> multiMethods = new ArrayList<>();Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {Set<KafkaListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());}else {// Non-empty set of methodsfor (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}

2.动态初始化实现

package com.demo.controller;import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author * @date 2023-02-07 13:40* @since 1.8*/
@Slf4j
@RestController
@RequestMapping("/listener")
public class ListenerController {@AutowiredKafkaListenerEndpointRegistry registry;@Autowired@Qualifier("batchTestContainerFactory")ConcurrentKafkaListenerContainerFactory<String,String> batchTestContainerFactory;@AutowiredConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;@AutowiredApplicationContext applicationContext;MessageHandlerMethodFactory factory;@PostConstructprivate void init(){factory = new MessageHandlerMethodFactory(null,applicationContext);}static Map<String, Topic> map = new ConcurrentHashMap<>();static {map.put("test_manual_1_id",new Topic("test_manual_1_id","test-topic-new.1",2,"mygroup","test_manual_1_batch"));map.put("test_manual_2_id",new Topic("test_manual_2_id","test-topic-new.2",1,"mygroup","test_manual_2_batch"));}/*** 停止消费 自行选择停止时是否需要从监听容器内移除实例,容器为 Map 实现* Map<String, MessageListenerContainer>* @param id*/@GetMapping("/close")public void close(String id){MessageListenerContainer container = registry.unregisterListenerContainer(id);container.destroy();}/*** 开始消费 若果是已注册的则判断是否暂停,暂停则恢复* 如果不存在,则定义一个消费者,注册到容器内并启动* @param id* @throws NoSuchMethodException*/@GetMapping("/open")public void open(String id) throws NoSuchMethodException {MessageListenerContainer container = registry.getListenerContainer(id);if (null!=container){if (!container.isRunning()){container.start();container.resume();}} else {//TODO 新建一个对应 Topic 的实例Topic topic = map.get(id);if (null==topic){return;}ManualHandler bean = new ManualHandler();MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMessageHandlerMethodFactory(factory);endpoint.setBean(bean);Method[] methods = bean.getClass().getDeclaredMethods();endpoint.setMethod(checkProxy(methods[0],bean));endpoint.setId(topic.getId());endpoint.setTopics(topic.getTopic());endpoint.setGroupId(topic.getGroup());endpoint.setClientIdPrefix(topic.getClientPrefix());endpoint.setConcurrency(topic.getPartitions());endpoint.setErrorHandler(consumerAwareListenerErrorHandler);registry.registerListenerContainer(endpoint,batchTestContainerFactory);container = registry.getListenerContainer(id);container.start();}}/*** Copy Spring 源码* @param methodArg* @param bean* @return*/private Method checkProxy(Method methodArg, Object bean) {Method method = methodArg;if (AopUtils.isJdkDynamicProxy(bean)) {try {// Found a @KafkaListener method on the target class for this JDK proxy ->// is it also present on the proxy itself?method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();for (Class<?> iface : proxiedInterfaces) {try {method = iface.getMethod(method.getName(), method.getParameterTypes());break;}catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {// NOSONAR}}}catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex);}catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format("@KafkaListener method '%s' found on bean target class '%s', " +"but not found in any interface(s) for bean JDK proxy. Either " +"pull the method up to an interface or switch to subclass (CGLIB) " +"proxies by setting proxy-target-class/proxyTargetClass " +"attribute to 'true'", method.getName(),method.getDeclaringClass().getSimpleName()), ex);}}return method;}
}
http://www.yidumall.com/news/42452.html

相关文章:

  • 高师院校语言类课程体系改革与建设 教学成果奖申报网站seo推广优化服务
  • 网络工程师自学网站广州网络营销推广
  • 营销型网站源码线上推广活动有哪些
  • 泰安网站设计百度关键词优化工具
  • 网站开发与制作中期报告搜索引擎广告形式有哪些
  • 企业seo排名服务裤子seo标题优化关键词
  • 做英语网站百度秒收录软件
  • 莆田做网站的公司网推怎么做
  • 建设银行境外购物网站中公教育培训机构官网
  • 怎么自己做网页初学者网络优化工程师有多累
  • 有合作做时时彩网站的吗谷歌外贸网站推广
  • 连云港公司做网站广东今日最新疫情通报
  • 网上还有什么网站做批发软文发布
  • wordpress城市插件无锡seo优化公司
  • 营销型网站九大特点百度搜索下载app
  • 贵阳网站建设王道下拉惠淘宝优化
  • 郑州计算机培训机构哪个最好seo排名优化公司价格
  • php动态网站开发视频关键词林俊杰无损下载
  • 网站制作深圳社群运营
  • 做网站怎么自定义背景图片seo推广平台
  • 网站设计一个版块seo推广培训课程
  • 国内免费无版权图片素材网站全网搜索引擎
  • 淘宝网站开发注册网站流程
  • 深圳做夜场做网站杭州seo网站优化公司
  • 国外jquery特效网站营销型网站案例
  • 公司网站要什么做营销推广的工具有哪些
  • 旅社网站建设国产搜什么关键词最好看
  • 我的网站要换新域名如何做seo免费培训
  • 用dw做销售网站网络推广外包公司哪家好
  • .net做的网站代码app推广接单平台