2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2020 Nordix Foundation.
5 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.onap.policy.apex.service.engine.main.ApexMain;
import org.onap.policy.common.utils.resources.TextFileUtils;
/**
 * The class TestKafka2Kafka tests Kafka event sending and reception.
 */
42 public class TestKafka2Kafka {
43 private static final long MAX_TEST_LENGTH = 300000;
45 private static final int EVENT_COUNT = 25;
46 private static final int EVENT_INTERVAL = 20;
49 * Clear relative file root environment variable.
52 public void clearRelativeFileRoot() {
53 System.clearProperty("APEX_RELATIVE_FILE_ROOT");
57 public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
58 // Start a cluster with 1 brokers.
60 // Disable topic auto-creation.
61 .withBrokerProperty("auto.create.topics.enable", "false");
64 * Test json kafka events.
66 * @throws Exception the apex exception
69 public void testJsonKafkaEvents() throws Exception {
70 final String conditionedConfigFile = getConditionedConfigFile(
71 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
72 final String[] args = {"-rfr", "target", "-p", conditionedConfigFile};
73 testKafkaEvents(args, false, "json");
77 * Test XML kafka events.
79 * @throws Exception the apex exception
82 public void testXmlKafkaEvents() throws Exception {
83 final String conditionedConfigFile = getConditionedConfigFile(
84 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
85 final String[] args = {"-rfr", "target", "-p", conditionedConfigFile};
87 testKafkaEvents(args, true, "xml");
93 * @param args the args
94 * @param xmlEvents the xml events
95 * @param topicSuffix the topic suffix
96 * @throws Exception on errors
98 private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
100 sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
101 sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
103 final KafkaEventSubscriber subscriber =
104 new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
106 await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
108 final ApexMain apexMain = new ApexMain(args);
109 await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
111 long initWaitEndTIme = System.currentTimeMillis() + 10000;
113 await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis());
115 final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
116 EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
118 await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
120 producer.sendEvents();
122 // Wait for the producer to send all its events
123 await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
124 .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
126 await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
127 .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
130 await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
132 subscriber.shutdown();
133 await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
136 await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
138 assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
141 private String getConditionedConfigFile(final String configurationFileName) {
143 File tempConfigFile = File.createTempFile("Kafka_", ".json");
144 tempConfigFile.deleteOnExit();
145 String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
146 .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
147 TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
149 return tempConfigFile.getCanonicalPath();
150 } catch (IOException e) {
151 fail("test should not throw an exception");