瀏覽代碼

更改kafka

dongjh 1 年之前
父節點
當前提交
7268b7a84b

+ 85 - 0
module_ems/src/main/java/org/jeecg/modules/kafka/config/KafkaConsumerConfig.java

@@ -0,0 +1,85 @@
+package org.jeecg.modules.kafka.config;
+/**
+ * author jinsq
+ *
+ * @date 2019/5/22 15:10
+ */
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * kafka消费者配置
+ * 不需要,KafkaTemplate会自动获取配置文件,只要配置文件是严格按照spring.kafka.consumer格式书写
+ * @author Lvjiapeng
+ *
+ */
+//@Configuration
+//@EnableKafka
+public class KafkaConsumerConfig {
+
+//    @Value("${spring.kafka.bootstrap-servers}")
+//    private String servers;
+//    @Value("${spring.kafka.consumer.enable-auto-commit}")
+//    private boolean enableAutoCommit;
+////    @Value("${spring.kafka.consumer.session.timeout}")
+////    private String sessionTimeout;
+//    @Value("${spring.kafka.consumer.auto-commit-interval}")
+//    private String autoCommitInterval;
+//    @Value("${spring.kafka.consumer.group-id}")
+//    private String groupId;
+//    @Value("${spring.kafka.consumer.auto-offset-reset}")
+//    private String autoOffsetReset;
+////    @Value("${spring.kafka.consumer.concurrency}")
+////    private int concurrency;
+//    @Value("${spring.kafka.consumer.key-deserializer}")
+//    private int keyDeserializer;
+//    @Value("${spring.kafka.consumer.value-deserializer}")
+//    private int valueDeserializer;
+//
+//    public Map<String, Object> consumerConfigs() {
+//        Map<String, Object> propsMap = new HashMap<>();
+//        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+//        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+//        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
+////        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
+//        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+//        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+//        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+//        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+//        return propsMap;
+//    }
+//
+//    public ConsumerFactory<String, String> consumerFactory() {
+//        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+//    }
+//
+//    @Bean
+//    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+////        factory.setConcurrency(concurrency);
+//        factory.getContainerProperties().setPollTimeout(1500);
+//        return factory;
+//    }
+
+    /**
+     * kafka监听
+     * @return
+     */
+//    @Bean
+//    public RawDataListener listener() {
+//        return new RawDataListener();
+//    }
+
+}

+ 68 - 0
module_ems/src/main/java/org/jeecg/modules/kafka/config/KafkaProducerConfig.java

@@ -0,0 +1,68 @@
+package org.jeecg.modules.kafka.config;
+/**
+ * author jinsq
+ *
+ * @date 2019/5/22 15:09
+ */
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * kafka生产配置
+ * 不需要,KafkaTemplate会自动获取配置文件,只要配置文件是严格按照spring.kafka.producer格式书写
+ * @author Lvjiapeng
+ *
+ */
+//@Configuration
+//@EnableKafka
+public class KafkaProducerConfig {
+//    @Value("${spring.kafka.bootstrap-servers}")
+//    private String servers;
+//    @Value("${spring.kafka.producer.retries}")
+//    private int retries;
+//    @Value("${spring.kafka.producer.batch-size}")
+//    private int batchSize;
+////    @Value("${spring.kafka.producer.linger}")
+////    private int linger;
+//    @Value("${spring.kafka.producer.buffer-memory}")
+//    private int bufferMemory;
+//    @Value("${spring.kafka.producer.key-serializer}")
+//    private String keySerializer;
+//    @Value("${spring.kafka.producer.value-serializer}")
+//    private int valueSerializer;
+//    @Value("${spring.kafka.producer.acks}")
+//    private int acks;
+//
+//    public Map<String, Object> producerConfigs() {
+//        Map<String, Object> props = new HashMap<>();
+//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+//        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+////        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
+//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+//        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+//        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+//        props.put(ProducerConfig.ACKS_CONFIG, acks);
+//        return props;
+//    }
+//
+//    public ProducerFactory<String, String> producerFactory() {
+//        return new DefaultKafkaProducerFactory<>(producerConfigs());
+//    }
+//
+//    @Bean
+//    public KafkaTemplate<String, String> kafkaTemplate() {
+//        return new KafkaTemplate<String, String>(producerFactory());
+//    }
+}