Merge "Exception not propagated by processResponse"
[policy/models.git] / models-sim / policy-models-simulators / src / main / java / org / onap / policy / models / simulators / Main.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.models.simulators;
22
23 import java.io.FileNotFoundException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.concurrent.atomic.AtomicReference;
28 import lombok.AccessLevel;
29 import lombok.Getter;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
31 import org.onap.policy.common.endpoints.event.comm.TopicSink;
32 import org.onap.policy.common.endpoints.event.comm.TopicSource;
33 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
34 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
35 import org.onap.policy.common.endpoints.parameters.TopicParameters;
36 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
37 import org.onap.policy.common.gson.GsonMessageBodyHandler;
38 import org.onap.policy.common.parameters.BeanValidationResult;
39 import org.onap.policy.common.utils.coder.Coder;
40 import org.onap.policy.common.utils.coder.CoderException;
41 import org.onap.policy.common.utils.coder.StandardCoder;
42 import org.onap.policy.common.utils.network.NetworkUtil;
43 import org.onap.policy.common.utils.resources.ResourceUtils;
44 import org.onap.policy.common.utils.services.ServiceManagerContainer;
45 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
46 import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
47 import org.onap.policy.models.sim.dmaap.rest.CambriaMessageBodyHandler;
48 import org.onap.policy.models.sim.dmaap.rest.TextMessageBodyHandler;
49 import org.onap.policy.simulators.TopicServer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * This class runs all simulators specified in the parameter file.
55  */
56 public class Main extends ServiceManagerContainer {
57     private static final Logger logger = LoggerFactory.getLogger(Main.class);
58
59     private static final String CANNOT_CONNECT = "cannot connect to port ";
60
61     @Getter(AccessLevel.PROTECTED)
62     private static Main instance;
63
64
65     /**
66      * Runs the simulators.
67      *
68      * @param paramFile parameter file name
69      */
70     public Main(String paramFile) {
71         super(Main.class.getPackage().getName());
72
73         SimulatorParameters params = readParameters(paramFile);
74         BeanValidationResult result = params.validate("simulators");
75         if (!result.isValid()) {
76             logger.error("invalid parameters:\n{}", result.getResult());
77             throw new IllegalArgumentException("invalid simulator parameters");
78         }
79
80         DmaapSimParameterGroup dmaapProv = params.getDmaapProvider();
81         String dmaapName = dmaapProv.getName();
82         String provName = dmaapName.replace("simulator", "provider");
83
84         // dmaap provider
85         AtomicReference<DmaapSimProvider> provRef = new AtomicReference<>();
86         addAction(provName, () -> provRef.set(buildDmaapProvider(dmaapProv)), () -> provRef.get().shutdown());
87
88         // @formatter:off
89
90         // REST server simulators
91         for (ClassRestServerParameters restsim : params.getRestServers()) {
92             AtomicReference<HttpServletServer> ref = new AtomicReference<>();
93             addAction(restsim.getName(),
94                 () -> ref.set(buildRestServer(dmaapName, restsim)),
95                 () -> ref.get().shutdown());
96         }
97
98         // NOTE: topics must be started AFTER the (dmaap) rest servers
99
100         // topic sinks
101         AtomicReference<List<TopicSink>> sinkRef = new AtomicReference<>();
102         addAction("topic sinks", () -> sinkRef.set(buildSinks(params.getTopicSinks())),
103             () -> shutdownSinks(sinkRef.get()));
104
105         // topic sources
106         AtomicReference<List<TopicSource>> sourceRef = new AtomicReference<>();
107         addAction("topic sources", () -> sourceRef.set(buildSources(params.getTopicSources())),
108             () -> shutdownSources(sourceRef.get()));
109
110         // topic server simulators
111         for (TopicServerParameters topicsim : params.getTopicServers()) {
112             AtomicReference<TopicServer<?>> ref = new AtomicReference<>();
113             addAction(topicsim.getName(),
114                 () -> ref.set(buildTopicServer(topicsim, sinkRef.get(), sourceRef.get())),
115                 () -> ref.get().shutdown());
116         }
117
118         // @formatter:on
119     }
120
121     /**
122      * The main method.
123      *
124      * @param args the arguments, the first of which is the name of the parameter file
125      */
126     public static void main(final String[] args) {
127         try {
128             if (args.length != 1) {
129                 throw new IllegalArgumentException("arg(s): parameter-file-name");
130             }
131
132             instance = new Main(args[0]);
133             instance.start();
134
135         } catch (RuntimeException e) {
136             logger.error("failed to start simulators", e);
137         }
138     }
139
140     private SimulatorParameters readParameters(String paramFile) {
141         try {
142             String paramsJson = getResourceAsString(paramFile);
143             if (paramsJson == null) {
144                 throw new IllegalArgumentException(new FileNotFoundException(paramFile));
145             }
146
147             String hostName = NetworkUtil.getHostname();
148             logger.info("replacing 'HOST_NAME' with {} in {}", hostName, paramFile);
149
150             paramsJson = paramsJson.replace("${HOST_NAME}", hostName);
151
152             return makeCoder().decode(paramsJson, SimulatorParameters.class);
153
154         } catch (CoderException e) {
155             throw new IllegalArgumentException("cannot decode " + paramFile, e);
156         }
157     }
158
159     private DmaapSimProvider buildDmaapProvider(DmaapSimParameterGroup params) {
160         DmaapSimProvider prov = new DmaapSimProvider(params);
161         DmaapSimProvider.setInstance(prov);
162         prov.start();
163
164         return prov;
165     }
166
167     protected List<TopicSink> buildSinks(List<TopicParameters> params) {
168         return TopicEndpointManager.getManager().addTopicSinks(params);
169     }
170
171     private void shutdownSinks(List<TopicSink> sinks) {
172         sinks.forEach(TopicSink::shutdown);
173     }
174
175     protected List<TopicSource> buildSources(List<TopicParameters> params) {
176         return TopicEndpointManager.getManager().addTopicSources(params);
177     }
178
179     private void shutdownSources(List<TopicSource> sources) {
180         sources.forEach(TopicSource::shutdown);
181     }
182
183     private HttpServletServer buildRestServer(String dmaapName, ClassRestServerParameters params) {
184         try {
185             Properties props = getServerProperties(dmaapName, params);
186             HttpServletServer testServer = makeServer(props);
187             testServer.waitedStart(5000);
188
189             String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
190             String hostName = props.getProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
191
192             if (!isTcpPortOpen(hostName, testServer.getPort())) {
193                 throw new IllegalStateException(CANNOT_CONNECT + testServer.getPort());
194             }
195
196             return testServer;
197
198         } catch (InterruptedException e) {
199             Thread.currentThread().interrupt();
200             throw new IllegalStateException("interrupted while building " + params.getName(), e);
201         }
202     }
203
204     private TopicServer<?> buildTopicServer(TopicServerParameters params, List<TopicSink> sinks,
205                     List<TopicSource> sources) {
206         try {
207             // find the desired sink
208             TopicSink sink = sinks.stream().filter(sink2 -> sink2.getTopic().equals(params.getSink())).findAny()
209                             .orElseThrow(() -> new IllegalArgumentException("invalid sink topic " + params.getSink()));
210
211             // find the desired source
212             TopicSource source = sources.stream().filter(source2 -> source2.getTopic().equals(params.getSource()))
213                             .findAny().orElseThrow(() -> new IllegalArgumentException(
214                                             "invalid source topic " + params.getSource()));
215
216             // create the topic server
217             return (TopicServer<?>) Class.forName(params.getProviderClass())
218                             .getDeclaredConstructor(TopicSink.class, TopicSource.class).newInstance(sink, source);
219
220         } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException
221                         | SecurityException | ClassNotFoundException e) {
222             throw new IllegalArgumentException("cannot create TopicServer: " + params.getName(), e);
223         }
224     }
225
226     /**
227      * Creates a set of properties, suitable for building a REST server, from the
228      * parameters.
229      *
230      * @param params parameters from which to build the properties
231      * @return a set of properties representing the given parameters
232      */
233     private static Properties getServerProperties(String dmaapName, ClassRestServerParameters params) {
234         final Properties props = new Properties();
235         props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
236
237         final String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
238
239         props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
240         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, params.getHost());
241         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
242                         Integer.toString(params.getPort()));
243         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
244                         Boolean.toString(params.isHttps()));
245         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
246                         params.getProviderClass());
247         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
248         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
249         props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "true");
250
251         if (dmaapName.equals(params.getName())) {
252             props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
253                             String.join(",", CambriaMessageBodyHandler.class.getName(),
254                                             GsonMessageBodyHandler.class.getName(),
255                                             TextMessageBodyHandler.class.getName()));
256         } else {
257             props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, String.join(",",
258                             GsonMessageBodyHandler.class.getName(), TextMessageBodyHandler.class.getName()));
259         }
260
261         return props;
262     }
263
264     // the following methods may be overridden by junit tests
265
266     protected String getResourceAsString(String resourceName) {
267         return ResourceUtils.getResourceAsString(resourceName);
268     }
269
270     protected Coder makeCoder() {
271         return new StandardCoder();
272     }
273
274     protected HttpServletServer makeServer(Properties props) {
275         return HttpServletServerFactoryInstance.getServerFactory().build(props).get(0);
276     }
277
278     protected boolean isTcpPortOpen(String hostName, int port) throws InterruptedException {
279         return NetworkUtil.isTcpPortOpen(hostName, port, 100, 200L);
280     }
281 }