2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.fail;
26 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
29 import java.io.IOException;
31 import org.junit.Before;
32 import org.junit.ClassRule;
33 import org.junit.Test;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
35 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
36 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
37 import org.onap.policy.apex.model.utilities.TextFileUtils;
38 import org.onap.policy.apex.service.engine.main.ApexMain;
41 * The Class TestKafka2Kafka tests Kafka event sending and reception.
43 public class TestKafka2Kafka {
44 private static final long MAX_TEST_LENGTH = 300000;
46 private static final int EVENT_COUNT = 25;
47 private static final int EVENT_INTERVAL = 20;
50 * Clear relative file root environment variable.
53 public void clearRelativeFileRoot() {
54 System.clearProperty("APEX_RELATIVE_FILE_ROOT");
58 public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
59 // Start a cluster with 1 brokers.
61 // Disable topic auto-creation.
62 .withBrokerProperty("auto.create.topics.enable", "false");
65 * Test json kafka events.
67 * @throws MessagingException the messaging exception
68 * @throws ApexException the apex exception
71 public void testJsonKafkaEvents() throws MessagingException, ApexException {
72 final String conditionedConfigFile = getConditionedConfigFile(
73 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
75 { "-rfr", "target", "-c", conditionedConfigFile };
76 testKafkaEvents(args, false, "json");
80 * Test XML kafka events.
82 * @throws MessagingException the messaging exception
83 * @throws ApexException the apex exception
86 public void testXmlKafkaEvents() throws MessagingException, ApexException {
87 final String conditionedConfigFile = getConditionedConfigFile(
88 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
90 { "-rfr", "target", "-c", conditionedConfigFile };
92 testKafkaEvents(args, true, "xml");
98 * @param args the args
99 * @param xmlEvents the xml events
100 * @param topicSuffix the topic suffix
101 * @throws MessagingException the messaging exception
102 * @throws ApexException the apex exception
104 private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
105 throws MessagingException, ApexException {
107 sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
108 sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
110 final KafkaEventSubscriber subscriber = new KafkaEventSubscriber("apex-out-" + topicSuffix,
111 sharedKafkaTestResource);
113 final ApexMain apexMain = new ApexMain(args);
114 ThreadUtilities.sleep(3000);
116 final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
117 EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
119 producer.sendEvents();
121 final long testStartTime = System.currentTimeMillis();
123 // Wait for the producer to send all tis events
124 while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
125 && producer.getEventsSentCount() < EVENT_COUNT) {
126 ThreadUtilities.sleep(EVENT_INTERVAL);
129 while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
130 && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
131 ThreadUtilities.sleep(EVENT_INTERVAL);
134 ThreadUtilities.sleep(3000);
137 subscriber.shutdown();
140 assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
143 private String getConditionedConfigFile(final String configurationFileName) {
145 File tempConfigFile = File.createTempFile("Kafka_", ".json");
146 tempConfigFile.deleteOnExit();
147 String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
148 .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
149 TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
151 return tempConfigFile.getCanonicalPath();
152 } catch (IOException e) {
153 fail("test should not throw an exception");