Enhance gRPC Simulator:
[policy/models.git] / models-sim / policy-models-simulators / src / main / java / org / onap / policy / models / simulators / Main.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
4  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.models.simulators;
23
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.lang.reflect.InvocationTargetException;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Properties;
31 import java.util.concurrent.atomic.AtomicReference;
32 import lombok.AccessLevel;
33 import lombok.Getter;
34 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
35 import org.onap.policy.common.endpoints.event.comm.TopicSink;
36 import org.onap.policy.common.endpoints.event.comm.TopicSource;
37 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
38 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
39 import org.onap.policy.common.endpoints.parameters.TopicParameters;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.onap.policy.common.gson.GsonMessageBodyHandler;
42 import org.onap.policy.common.parameters.BeanValidationResult;
43 import org.onap.policy.common.utils.coder.Coder;
44 import org.onap.policy.common.utils.coder.CoderException;
45 import org.onap.policy.common.utils.coder.StandardCoder;
46 import org.onap.policy.common.utils.network.NetworkUtil;
47 import org.onap.policy.common.utils.resources.ResourceUtils;
48 import org.onap.policy.common.utils.services.ServiceManagerContainer;
49 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
50 import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
51 import org.onap.policy.models.sim.dmaap.rest.CambriaMessageBodyHandler;
52 import org.onap.policy.models.sim.dmaap.rest.TextMessageBodyHandler;
53 import org.onap.policy.simulators.CdsSimulator;
54 import org.onap.policy.simulators.TopicServer;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * This class runs all simulators specified in the parameter file.
60  */
61 public class Main extends ServiceManagerContainer {
62     private static final Logger logger = LoggerFactory.getLogger(Main.class);
63
64     private static final String CANNOT_CONNECT = "cannot connect to port ";
65
66     @Getter(AccessLevel.PROTECTED)
67     private static Main instance;
68
69
70     /**
71      * Runs the simulators.
72      *
73      * @param paramFile parameter file name
74      */
75     public Main(String paramFile) {
76         super(Main.class.getPackage().getName());
77
78         SimulatorParameters params = readParameters(paramFile);
79         BeanValidationResult result = params.validate("simulators");
80         if (!result.isValid()) {
81             logger.error("invalid parameters:\n{}", result.getResult());
82             throw new IllegalArgumentException("invalid simulator parameters");
83         }
84
85         DmaapSimParameterGroup dmaapProv = params.getDmaapProvider();
86         String dmaapName = dmaapProv.getName();
87         String provName = dmaapName.replace("simulator", "provider");
88
89         // dmaap provider
90         AtomicReference<DmaapSimProvider> provRef = new AtomicReference<>();
91         addAction(provName, () -> provRef.set(buildDmaapProvider(dmaapProv)), () -> provRef.get().shutdown());
92
93         CdsServerParameters cdsServer = params.getGrpcServer();
94
95         // Cds Simulator
96         AtomicReference<CdsSimulator> cdsSim = new AtomicReference<>();
97         addAction(cdsServer.getName(), () -> cdsSim.set(buildCdsSimulator(cdsServer)), () -> cdsSim.get().stop());
98
99         // REST server simulators
100         // @formatter:off
101         for (ClassRestServerParameters restsim : params.getRestServers()) {
102             AtomicReference<HttpServletServer> ref = new AtomicReference<>();
103             addAction(restsim.getName(),
104                 () -> ref.set(buildRestServer(dmaapName, restsim)),
105                 () -> ref.get().shutdown());
106         }
107
108         // NOTE: topics must be started AFTER the (dmaap) rest servers
109
110         // topic sinks
111         Map<String, TopicSink> sinks = new HashMap<>();
112         for (TopicParameters topicParams : params.getTopicSinks()) {
113             String topic = topicParams.getTopic();
114             addAction("Sink " + topic,
115                 () -> sinks.put(topic, startSink(topicParams)),
116                 () -> sinks.get(topic).shutdown());
117         }
118
119         // topic sources
120         Map<String, TopicSource> sources = new HashMap<>();
121         for (TopicParameters topicParams : params.getTopicSources()) {
122             String topic = topicParams.getTopic();
123             addAction("Source " + topic,
124                 () -> sources.put(topic, startSource(topicParams)),
125                 () -> sources.get(topic).shutdown());
126         }
127
128         // topic server simulators
129         for (TopicServerParameters topicsim : params.getTopicServers()) {
130             AtomicReference<TopicServer<?>> ref = new AtomicReference<>();
131             addAction(topicsim.getName(),
132                 () -> ref.set(buildTopicServer(topicsim, sinks, sources)),
133                 () -> ref.get().shutdown());
134         }
135         // @formatter:on
136     }
137
138     /**
139      * The main method. The arguments are validated, thus adding the NOSONAR.
140      *
141      * @param args the arguments, the first of which is the name of the parameter file
142      */
143     public static void main(final String[] args) { // NOSONAR
144         /*
145          * Only one argument is used and is validated implicitly by the constructor (i.e.,
146          * file-not-found), thus sonar is disabled.
147          */
148
149         try {
150             if (args.length != 1) {
151                 throw new IllegalArgumentException("arg(s): parameter-file-name");
152             }
153
154             instance = new Main(args[0]);
155             instance.start();
156
157         } catch (RuntimeException e) {
158             logger.error("failed to start simulators", e);
159         }
160     }
161
162     private SimulatorParameters readParameters(String paramFile) {
163         try {
164             String paramsJson = getResourceAsString(paramFile);
165             if (paramsJson == null) {
166                 throw new IllegalArgumentException(new FileNotFoundException(paramFile));
167             }
168
169             String hostName = NetworkUtil.getHostname();
170             logger.info("replacing 'HOST_NAME' with {} in {}", hostName, paramFile);
171
172             paramsJson = paramsJson.replace("${HOST_NAME}", hostName);
173
174             return makeCoder().decode(paramsJson, SimulatorParameters.class);
175
176         } catch (CoderException e) {
177             throw new IllegalArgumentException("cannot decode " + paramFile, e);
178         }
179     }
180
181     private DmaapSimProvider buildDmaapProvider(DmaapSimParameterGroup params) {
182         DmaapSimProvider prov = new DmaapSimProvider(params);
183         DmaapSimProvider.setInstance(prov);
184         prov.start();
185         return prov;
186     }
187
188     private CdsSimulator buildCdsSimulator(CdsServerParameters params) throws IOException {
189         CdsSimulator cdsSimulator = new CdsSimulator(params.getHost(), params.getPort(), params.getResourceLocation(),
190             params.getSuccessRepeatCount(), params.getRequestedResponseDelayMs());
191         cdsSimulator.start();
192         return cdsSimulator;
193     }
194
195
196     private TopicSink startSink(TopicParameters params) {
197         TopicSink sink = TopicEndpointManager.getManager().addTopicSinks(List.of(params)).get(0);
198         sink.start();
199         return sink;
200     }
201
202     private TopicSource startSource(TopicParameters params) {
203         TopicSource source = TopicEndpointManager.getManager().addTopicSources(List.of(params)).get(0);
204         source.start();
205         return source;
206     }
207
208     private HttpServletServer buildRestServer(String dmaapName, ClassRestServerParameters params) {
209         try {
210             Properties props = getServerProperties(dmaapName, params);
211             HttpServletServer testServer = makeServer(props);
212             testServer.waitedStart(5000);
213
214             String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
215             String hostName = props.getProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
216
217             if (!isTcpPortOpen(hostName, testServer.getPort())) {
218                 throw new IllegalStateException(CANNOT_CONNECT + testServer.getPort());
219             }
220
221             return testServer;
222
223         } catch (InterruptedException e) {
224             Thread.currentThread().interrupt();
225             throw new IllegalStateException("interrupted while building " + params.getName(), e);
226         }
227     }
228
229     private TopicServer<?> buildTopicServer(TopicServerParameters params, Map<String, TopicSink> sinks,
230                     Map<String, TopicSource> sources) {
231         try {
232             // find the desired sink
233             TopicSink sink = sinks.get(params.getSink());
234             if (sink == null) {
235                 throw new IllegalArgumentException("invalid sink topic " + params.getSink());
236             }
237
238             // find the desired source
239             TopicSource source = sources.get(params.getSource());
240             if (source == null) {
241                 throw new IllegalArgumentException("invalid source topic " + params.getSource());
242             }
243
244             // create the topic server
245             return (TopicServer<?>) Class.forName(params.getProviderClass())
246                             .getDeclaredConstructor(TopicSink.class, TopicSource.class).newInstance(sink, source);
247
248         } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException
249                         | SecurityException | ClassNotFoundException e) {
250             throw new IllegalArgumentException("cannot create TopicServer: " + params.getName(), e);
251         }
252     }
253
254     /**
255      * Creates a set of properties, suitable for building a REST server, from the
256      * parameters.
257      *
258      * @param params parameters from which to build the properties
259      * @return a set of properties representing the given parameters
260      */
261     private static Properties getServerProperties(String dmaapName, ClassRestServerParameters params) {
262         final Properties props = new Properties();
263         props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
264
265         final String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
266
267         props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
268         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, params.getHost());
269         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
270                         Integer.toString(params.getPort()));
271         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
272                         Boolean.toString(params.isHttps()));
273         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
274                         params.getProviderClass());
275         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
276         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
277         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "true");
278
279         if (dmaapName.equals(params.getName())) {
280             props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
281                             String.join(",", CambriaMessageBodyHandler.class.getName(),
282                                             GsonMessageBodyHandler.class.getName(),
283                                             TextMessageBodyHandler.class.getName()));
284         } else {
285             props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, String.join(",",
286                             GsonMessageBodyHandler.class.getName(), TextMessageBodyHandler.class.getName()));
287         }
288
289         return props;
290     }
291
292     // the following methods may be overridden by junit tests
293
294     protected String getResourceAsString(String resourceName) {
295         return ResourceUtils.getResourceAsString(resourceName);
296     }
297
298     protected Coder makeCoder() {
299         return new StandardCoder();
300     }
301
302     protected HttpServletServer makeServer(Properties props) {
303         return HttpServletServerFactoryInstance.getServerFactory().build(props).get(0);
304     }
305
306     protected boolean isTcpPortOpen(String hostName, int port) throws InterruptedException {
307         return NetworkUtil.isTcpPortOpen(hostName, port, 100, 200L);
308     }
309 }