APEX standalone support for ToscaPolicy format
[policy/apex-pdp.git] / testsuites / integration / integration-uservice-test / src / test / java / org / onap / policy / apex / testsuites / integration / uservice / adapt / kafka / TestKafka2Kafka.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
24
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.fail;
28
29 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
30 import java.io.File;
31 import java.io.IOException;
32 import java.util.concurrent.TimeUnit;
33 import org.junit.Before;
34 import org.junit.ClassRule;
35 import org.junit.Test;
36 import org.onap.policy.apex.service.engine.main.ApexMain;
37 import org.onap.policy.common.utils.resources.TextFileUtils;
38
39 /**
40  * The Class TestKafka2Kafka tests Kafka event sending and reception.
41  */
42 public class TestKafka2Kafka {
43     private static final long MAX_TEST_LENGTH = 300000;
44
45     private static final int EVENT_COUNT = 25;
46     private static final int EVENT_INTERVAL = 20;
47
48     /**
49      * Clear relative file root environment variable.
50      */
51     @Before
52     public void clearRelativeFileRoot() {
53         System.clearProperty("APEX_RELATIVE_FILE_ROOT");
54     }
55
56     @ClassRule
57     public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
58         // Start a cluster with 1 brokers.
59         .withBrokers(1)
60         // Disable topic auto-creation.
61         .withBrokerProperty("auto.create.topics.enable", "false");
62
63     /**
64      * Test json kafka events.
65      *
66      * @throws Exception the apex exception
67      */
68     @Test
69     public void testJsonKafkaEvents() throws Exception {
70         final String conditionedConfigFile = getConditionedConfigFile(
71             "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
72         final String[] args = {"-rfr", "target", "-p", conditionedConfigFile};
73         testKafkaEvents(args, false, "json");
74     }
75
76     /**
77      * Test XML kafka events.
78      *
79      * @throws Exception the apex exception
80      */
81     @Test
82     public void testXmlKafkaEvents() throws Exception {
83         final String conditionedConfigFile = getConditionedConfigFile(
84             "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
85         final String[] args = {"-rfr", "target", "-p", conditionedConfigFile};
86
87         testKafkaEvents(args, true, "xml");
88     }
89
90     /**
91      * Test kafka events.
92      *
93      * @param args the args
94      * @param xmlEvents the xml events
95      * @param topicSuffix the topic suffix
96      * @throws Exception on errors
97      */
98     private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
99
100         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
101         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
102
103         final KafkaEventSubscriber subscriber =
104             new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
105
106         await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
107
108         final ApexMain apexMain = new ApexMain(args);
109         await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
110
111         long initWaitEndTIme = System.currentTimeMillis() + 10000;
112
113         await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis());
114
115         final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
116             EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
117
118         await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
119
120         producer.sendEvents();
121
122         // Wait for the producer to send all its events
123         await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
124             .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
125
126         await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
127             .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
128
129         apexMain.shutdown();
130         await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
131
132         subscriber.shutdown();
133         await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
134
135         producer.shutdown();
136         await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
137
138         assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
139     }
140
141     private String getConditionedConfigFile(final String configurationFileName) {
142         try {
143             File tempConfigFile = File.createTempFile("Kafka_", ".json");
144             tempConfigFile.deleteOnExit();
145             String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
146                 .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
147             TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
148
149             return tempConfigFile.getCanonicalPath();
150         } catch (IOException e) {
151             fail("test should not throw an exception");
152             return null;
153         }
154     }
155 }