7afef538b913248c67c3277ae72d44e7d7ce85cf
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.fail;
25
26 import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
27
28 import java.io.File;
29 import java.io.IOException;
30
31 import org.junit.Before;
32 import org.junit.ClassRule;
33 import org.junit.Test;
34 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
35 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
36 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
37 import org.onap.policy.apex.model.utilities.TextFileUtils;
38 import org.onap.policy.apex.service.engine.main.ApexMain;
39
40 /**
41  * The Class TestKafka2Kafka tests Kafka event sending and reception.
42  */
43 public class TestKafka2Kafka {
44     private static final long MAX_TEST_LENGTH = 300000;
45
46     private static final int EVENT_COUNT = 25;
47     private static final int EVENT_INTERVAL = 20;
48
49     /**
50      * Clear relative file root environment variable.
51      */
52     @Before
53     public void clearRelativeFileRoot() {
54         System.clearProperty("APEX_RELATIVE_FILE_ROOT");
55     }
56
57     @ClassRule
58     public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
59                     // Start a cluster with 1 brokers.
60                     .withBrokers(1)
61                     // Disable topic auto-creation.
62                     .withBrokerProperty("auto.create.topics.enable", "false");
63
64     /**
65      * Test json kafka events.
66      *
67      * @throws MessagingException the messaging exception
68      * @throws ApexException the apex exception
69      */
70     @Test
71     public void testJsonKafkaEvents() throws MessagingException, ApexException {
72         final String conditionedConfigFile = getConditionedConfigFile(
73                         "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
74         final String[] args =
75             { "-rfr", "target", "-c", conditionedConfigFile };
76         testKafkaEvents(args, false, "json");
77     }
78
79     /**
80      * Test XML kafka events.
81      *
82      * @throws MessagingException the messaging exception
83      * @throws ApexException the apex exception
84      */
85     @Test
86     public void testXmlKafkaEvents() throws MessagingException, ApexException {
87         final String conditionedConfigFile = getConditionedConfigFile(
88                         "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
89         final String[] args =
90             { "-rfr", "target", "-c", conditionedConfigFile };
91
92         testKafkaEvents(args, true, "xml");
93     }
94
95     /**
96      * Test kafka events.
97      *
98      * @param args the args
99      * @param xmlEvents the xml events
100      * @param topicSuffix the topic suffix
101      * @throws MessagingException the messaging exception
102      * @throws ApexException the apex exception
103      */
104     private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
105                     throws MessagingException, ApexException {
106
107         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
108         sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
109
110         final KafkaEventSubscriber subscriber = new KafkaEventSubscriber("apex-out-" + topicSuffix,
111                         sharedKafkaTestResource);
112
113         final ApexMain apexMain = new ApexMain(args);
114         ThreadUtilities.sleep(3000);
115
116         final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
117                         EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
118
119         producer.sendEvents();
120
121         final long testStartTime = System.currentTimeMillis();
122
123         // Wait for the producer to send all tis events
124         while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
125                         && producer.getEventsSentCount() < EVENT_COUNT) {
126             ThreadUtilities.sleep(EVENT_INTERVAL);
127         }
128
129         while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
130                         && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
131             ThreadUtilities.sleep(EVENT_INTERVAL);
132         }
133
134         ThreadUtilities.sleep(3000);
135
136         apexMain.shutdown();
137         subscriber.shutdown();
138         producer.shutdown();
139
140         assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
141     }
142
143     private String getConditionedConfigFile(final String configurationFileName) {
144         try {
145             File tempConfigFile = File.createTempFile("Kafka_", ".json");
146             tempConfigFile.deleteOnExit();
147             String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
148                             .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
149             TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
150
151             return tempConfigFile.getCanonicalPath();
152         } catch (IOException e) {
153             fail("test should not throw an exception");
154             return null;
155         }
156     }
157 }