- getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
- .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
- .port(env.getProperty("pnf.dmaap.port", Integer.class))
- .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
- .path(env.getProperty("pnf.dmaap.consumerId")).build());
- getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
- .scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
- .port(env.getProperty("pnf.dmaap.port", Integer.class))
- .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
- .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build());
+ try {
+ consumerForPnfReady = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+ consumerForPnfUpdate = new KafkaConsumerImpl(env.getProperty("pnf.kafka.kafkaBootstrapServers"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ pnfReadyTopic = env.getProperty("pnf.kafka.pnfReadyTopicName");
+ pnfUpdateTopic = env.getProperty("pnf.kafka.pnfUpdateTopicName");
+ consumerGroup = env.getProperty("pnf.kafka.consumerGroup");
+ consumerId = env.getProperty("pnf.kafka.consumerId");
+ consumerIdUpdate = env.getProperty("pnf.kafka.consumerIdUpdate");