Changes for checkstyle 8.32
[policy/apex-pdp.git] / testsuites / integration / integration-uservice-test / src / test / java / org / onap / policy / apex / testsuites / integration / uservice / adapt / kafka / TestKafka2Kafka.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
23
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.fail;
27
28 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
29 import java.io.File;
30 import java.io.IOException;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.Before;
33 import org.junit.ClassRule;
34 import org.junit.Test;
35 import org.onap.policy.apex.service.engine.main.ApexMain;
36 import org.onap.policy.common.utils.resources.TextFileUtils;
37
38 /**
39  * The Class TestKafka2Kafka tests Kafka event sending and reception.
40  */
41 public class TestKafka2Kafka {
42     private static final long MAX_TEST_LENGTH = 300000;
43
44     private static final int EVENT_COUNT = 25;
45     private static final int EVENT_INTERVAL = 20;
46
47     /**
48      * Clear relative file root environment variable.
49      */
50     @Before
51     public void clearRelativeFileRoot() {
52         System.clearProperty("APEX_RELATIVE_FILE_ROOT");
53     }
54
55     @ClassRule
56     public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
57         // Start a cluster with 1 brokers.
58         .withBrokers(1)
59         // Disable topic auto-creation.
60         .withBrokerProperty("auto.create.topics.enable", "false");
61
62     /**
63      * Test json kafka events.
64      *
65      * @throws Exception the apex exception
66      */
67     @Test
68     public void testJsonKafkaEvents() throws Exception {
69         final String conditionedConfigFile = getConditionedConfigFile(
70             "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
71         final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
72         testKafkaEvents(args, false, "json");
73     }
74
75     /**
76      * Test XML kafka events.
77      *
78      * @throws Exception the apex exception
79      */
80     @Test
81     public void testXmlKafkaEvents() throws Exception {
82         final String conditionedConfigFile = getConditionedConfigFile(
83             "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
84         final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
85
86         testKafkaEvents(args, true, "xml");
87     }
88
89     /**
90      * Test kafka events.
91      *
92      * @param args the args
93      * @param xmlEvents the xml events
94      * @param topicSuffix the topic suffix
95      * @throws Exception on errors
96      */
97     private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
98
99         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
100         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
101
102         final KafkaEventSubscriber subscriber =
103             new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
104
105         await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
106
107         final ApexMain apexMain = new ApexMain(args);
108         await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
109
110         long initWaitEndTIme = System.currentTimeMillis() + 10000;
111
112         await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis());
113
114         final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
115             EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
116
117         await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
118
119         producer.sendEvents();
120
121         // Wait for the producer to send all its events
122         await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
123             .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
124
125         await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
126             .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
127
128         apexMain.shutdown();
129         await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
130
131         subscriber.shutdown();
132         await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
133
134         producer.shutdown();
135         await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
136
137         assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
138     }
139
140     private String getConditionedConfigFile(final String configurationFileName) {
141         try {
142             File tempConfigFile = File.createTempFile("Kafka_", ".json");
143             tempConfigFile.deleteOnExit();
144             String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
145                 .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
146             TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
147
148             return tempConfigFile.getCanonicalPath();
149         } catch (IOException e) {
150             fail("test should not throw an exception");
151             return null;
152         }
153     }
154 }