b35b961e48f990a584635ca25b60225a4dbcf89c
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.fail;
26
27 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
28
29 import java.io.File;
30 import java.io.IOException;
31
32 import org.junit.Before;
33 import org.junit.ClassRule;
34 import org.junit.Test;
35 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
38 import org.onap.policy.apex.service.engine.main.ApexMain;
39 import org.onap.policy.common.utils.resources.TextFileUtils;
40
41 /**
42  * The Class TestKafka2Kafka tests Kafka event sending and reception.
43  */
44 public class TestKafka2Kafka {
45     private static final long MAX_TEST_LENGTH = 300000;
46
47     private static final int EVENT_COUNT = 25;
48     private static final int EVENT_INTERVAL = 20;
49
50     /**
51      * Clear relative file root environment variable.
52      */
53     @Before
54     public void clearRelativeFileRoot() {
55         System.clearProperty("APEX_RELATIVE_FILE_ROOT");
56     }
57
58     @ClassRule
59     public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
60             // Start a cluster with 1 brokers.
61             .withBrokers(1)
62             // Disable topic auto-creation.
63             .withBrokerProperty("auto.create.topics.enable", "false");
64
65     /**
66      * Test json kafka events.
67      *
68      * @throws MessagingException the messaging exception
69      * @throws ApexException the apex exception
70      */
71     @Test
72     public void testJsonKafkaEvents() throws MessagingException, ApexException {
73         final String conditionedConfigFile = getConditionedConfigFile(
74                 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
75         final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
76         testKafkaEvents(args, false, "json");
77     }
78
79     /**
80      * Test XML kafka events.
81      *
82      * @throws MessagingException the messaging exception
83      * @throws ApexException the apex exception
84      */
85     @Test
86     public void testXmlKafkaEvents() throws MessagingException, ApexException {
87         final String conditionedConfigFile = getConditionedConfigFile(
88                 "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
89         final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
90
91         testKafkaEvents(args, true, "xml");
92     }
93
94     /**
95      * Test kafka events.
96      *
97      * @param args the args
98      * @param xmlEvents the xml events
99      * @param topicSuffix the topic suffix
100      * @throws MessagingException the messaging exception
101      * @throws ApexException the apex exception
102      */
103     private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
104             throws MessagingException, ApexException {
105
106         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
107         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
108
109         final KafkaEventSubscriber subscriber =
110                 new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
111
112         final ApexMain apexMain = new ApexMain(args);
113         ThreadUtilities.sleep(3000);
114
115         final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
116                 EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
117
118         producer.sendEvents();
119
120         final long testStartTime = System.currentTimeMillis();
121
122         // Wait for the producer to send all tis events
123         while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
124                 && producer.getEventsSentCount() < EVENT_COUNT) {
125             ThreadUtilities.sleep(EVENT_INTERVAL);
126         }
127
128         while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
129                 && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
130             ThreadUtilities.sleep(EVENT_INTERVAL);
131         }
132
133         ThreadUtilities.sleep(3000);
134
135         apexMain.shutdown();
136         subscriber.shutdown();
137         producer.shutdown();
138
139         assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
140     }
141
142     private String getConditionedConfigFile(final String configurationFileName) {
143         try {
144             File tempConfigFile = File.createTempFile("Kafka_", ".json");
145             tempConfigFile.deleteOnExit();
146             String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
147                     .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
148             TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
149
150             return tempConfigFile.getCanonicalPath();
151         } catch (IOException e) {
152             fail("test should not throw an exception");
153             return null;
154         }
155     }
156 }