topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}\r
avc:\r
subscription-topic: ${NCMP_CM_AVC_SUBSCRIPTION:cm-avc-subscription}\r
+ cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events}\r
lcm:\r
events:\r
topic: ${LCM_EVENTS_TOPIC:ncmp-events}\r
@Mapping(source = "eventSchemaVersion", target = "eventSchemaVersion")
@Mapping(source = "eventSource", target = "eventSource")
@Mapping(source = "eventType", target = "eventType")
+ @Mapping(source = "event", target = "event")
AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent);
@Named("avcEventId")
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
private final AvcEventMapper avcEventMapper;
+ @Value("${app.ncmp.avc.cm-events-topic}")
+ private String cmEventsTopic;
+
/**
* Sends message to the configured topic with a message key.
*
public void sendMessage(final AvcEvent incomingAvcEvent) {
// generate new event id while keeping other data
final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(incomingAvcEvent);
- log.debug("Forwarding AVC event {} to topic {} ", outgoingAvcEvent.getEventId(), "cm-events");
- kafkaTemplate.send("cm-events", outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+ log.debug("Forwarding AVC event {} to topic {} ", outgoingAvcEvent.getEventId(), cmEventsTopic);
+ kafkaTemplate.send(cmEventsTopic, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
}
}
def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
def 'Consume and forward valid message'() {
- given: 'consumer has a subscription'
- kafkaConsumer.subscribe(['cm-events'] as List<String>)
+ given: 'consumer has a subscription on a topic'
+ def cmEventsTopic = 'cm-events'
+ avcEventProducer.cmEventsTopic = cmEventsTopic
+ kafkaConsumer.subscribe([cmEventsTopic] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)