1 package it.demo.kafka.springkafka.listener; 2 3 import org.springframework.beans.BeansException; 4 import org.springframework.context.ApplicationContext; 5 import org.springframework.context.ApplicationContextAware; 6 import org.springframework.integration.endpoint.EventDrivenConsumer; 7 import org.springframework.integration.endpoint.SourcePollingChannelAdapter; 8 import org.springframework.integration.kafka.support.ConsumerConfiguration; 9 import org.springframework.integration.kafka.support.KafkaConsumerContext;10 11 import com.yammer.metrics.Metrics;12 13 public class KafkaConsumerStarter implements ApplicationContextAware14 {15 private ApplicationContext appContext;16 17 private SourcePollingChannelAdapter kafkaInboundChannelAdapter;18 19 private KafkaConsumerContext kafkaConsumerContext;20 21 public void initIt() throws Exception22 {23 kafkaInboundChannelAdapter = appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);24 kafkaInboundChannelAdapter.start();25 26 kafkaConsumerContext = appContext.getBean("consumerContext", KafkaConsumerContext.class);27 }28 29 public void cleanUp() throws Exception30 {31 if (kafkaInboundChannelAdapter != null)32 {33 kafkaInboundChannelAdapter.stop();34 }35 36 Thread.sleep(1000);37 38 Metrics.defaultRegistry().shutdown();39 }40 41 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException42 {43 this.appContext = applicationContext;44 }45 46 }