import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.lcm.v1.Event
import java.time.Duration
-@SpringBootTest(classes = [EventsPublisher, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
class LcmEventsPublisherSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(kafkaTemplate)
+ EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
eventSchema : eventSchema,
eventSchemaVersion: eventSchemaVersion]
and: 'consumer has a subscription'
- kafkaConsumer.subscribe([testTopic] as List<String>)
+ legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is published'
lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record key matches the expected event key'