2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
4 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.models.simulators;
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.lang.reflect.InvocationTargetException;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.Properties;
31 import java.util.concurrent.atomic.AtomicReference;
32 import lombok.AccessLevel;
34 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
35 import org.onap.policy.common.endpoints.event.comm.TopicSink;
36 import org.onap.policy.common.endpoints.event.comm.TopicSource;
37 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
38 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
39 import org.onap.policy.common.endpoints.parameters.TopicParameters;
40 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
41 import org.onap.policy.common.gson.GsonMessageBodyHandler;
42 import org.onap.policy.common.parameters.BeanValidationResult;
43 import org.onap.policy.common.utils.coder.Coder;
44 import org.onap.policy.common.utils.coder.CoderException;
45 import org.onap.policy.common.utils.coder.StandardCoder;
46 import org.onap.policy.common.utils.network.NetworkUtil;
47 import org.onap.policy.common.utils.resources.ResourceUtils;
48 import org.onap.policy.common.utils.services.ServiceManagerContainer;
49 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
50 import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
51 import org.onap.policy.models.sim.dmaap.rest.CambriaMessageBodyHandler;
52 import org.onap.policy.models.sim.dmaap.rest.TextMessageBodyHandler;
53 import org.onap.policy.simulators.CdsSimulator;
54 import org.onap.policy.simulators.TopicServer;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * This class runs all simulators specified in the parameter file.
61 public class Main extends ServiceManagerContainer {
62 private static final Logger logger = LoggerFactory.getLogger(Main.class);
64 private static final String CANNOT_CONNECT = "cannot connect to port ";
66 @Getter(AccessLevel.PROTECTED)
67 private static Main instance;
71 * Runs the simulators.
73 * @param paramFile parameter file name
75 public Main(String paramFile) {
76 super(Main.class.getPackage().getName());
78 SimulatorParameters params = readParameters(paramFile);
79 BeanValidationResult result = params.validate("simulators");
80 if (!result.isValid()) {
81 logger.error("invalid parameters:\n{}", result.getResult());
82 throw new IllegalArgumentException("invalid simulator parameters");
85 DmaapSimParameterGroup dmaapProv = params.getDmaapProvider();
86 String dmaapName = (dmaapProv != null ? dmaapProv.getName() : null);
89 if (dmaapProv != null) {
90 String provName = dmaapName.replace("simulator", "provider");
91 AtomicReference<DmaapSimProvider> provRef = new AtomicReference<>();
92 addAction(provName, () -> provRef.set(buildDmaapProvider(dmaapProv)), () -> provRef.get().shutdown());
95 CdsServerParameters cdsServer = params.getGrpcServer();
98 if (cdsServer != null) {
99 AtomicReference<CdsSimulator> cdsSim = new AtomicReference<>();
100 addAction(cdsServer.getName(), () -> cdsSim.set(buildCdsSimulator(cdsServer)), () -> cdsSim.get().stop());
103 // REST server simulators
105 for (ClassRestServerParameters restsim : params.getRestServers()) {
106 AtomicReference<HttpServletServer> ref = new AtomicReference<>();
107 addAction(restsim.getName(),
108 () -> ref.set(buildRestServer(dmaapName, restsim)),
109 () -> ref.get().shutdown());
112 // NOTE: topics must be started AFTER the (dmaap) rest servers
115 Map<String, TopicSink> sinks = new HashMap<>();
116 for (TopicParameters topicParams : params.getTopicSinks()) {
117 String topic = topicParams.getTopic();
118 addAction("Sink " + topic,
119 () -> sinks.put(topic, startSink(topicParams)),
120 () -> sinks.get(topic).shutdown());
124 Map<String, TopicSource> sources = new HashMap<>();
125 for (TopicParameters topicParams : params.getTopicSources()) {
126 String topic = topicParams.getTopic();
127 addAction("Source " + topic,
128 () -> sources.put(topic, startSource(topicParams)),
129 () -> sources.get(topic).shutdown());
132 // topic server simulators
133 for (TopicServerParameters topicsim : params.getTopicServers()) {
134 AtomicReference<TopicServer<?>> ref = new AtomicReference<>();
135 addAction(topicsim.getName(),
136 () -> ref.set(buildTopicServer(topicsim, sinks, sources)),
137 () -> ref.get().shutdown());
143 * The main method. The arguments are validated, thus adding the NOSONAR.
145 * @param args the arguments, the first of which is the name of the parameter file
147 public static void main(final String[] args) { // NOSONAR
149 * Only one argument is used and is validated implicitly by the constructor (i.e.,
150 * file-not-found), thus sonar is disabled.
154 if (args.length != 1) {
155 throw new IllegalArgumentException("arg(s): parameter-file-name");
158 instance = new Main(args[0]);
161 } catch (RuntimeException e) {
162 logger.error("failed to start simulators", e);
166 private SimulatorParameters readParameters(String paramFile) {
168 var paramsJson = getResourceAsString(paramFile);
169 if (paramsJson == null) {
170 throw new IllegalArgumentException(new FileNotFoundException(paramFile));
173 String hostName = NetworkUtil.getHostname();
174 logger.info("replacing 'HOST_NAME' with {} in {}", hostName, paramFile);
176 paramsJson = paramsJson.replace("${HOST_NAME}", hostName);
178 return makeCoder().decode(paramsJson, SimulatorParameters.class);
180 } catch (CoderException e) {
181 throw new IllegalArgumentException("cannot decode " + paramFile, e);
185 private DmaapSimProvider buildDmaapProvider(DmaapSimParameterGroup params) {
186 var prov = new DmaapSimProvider(params);
187 DmaapSimProvider.setInstance(prov);
192 private CdsSimulator buildCdsSimulator(CdsServerParameters params) throws IOException {
193 var cdsSimulator = new CdsSimulator(params.getHost(), params.getPort(), params.getResourceLocation(),
194 params.getSuccessRepeatCount(), params.getRequestedResponseDelayMs());
195 cdsSimulator.start();
200 private TopicSink startSink(TopicParameters params) {
201 TopicSink sink = TopicEndpointManager.getManager().addTopicSinks(List.of(params)).get(0);
206 private TopicSource startSource(TopicParameters params) {
207 TopicSource source = TopicEndpointManager.getManager().addTopicSources(List.of(params)).get(0);
212 private HttpServletServer buildRestServer(String dmaapName, ClassRestServerParameters params) {
214 var props = getServerProperties(dmaapName, params);
215 HttpServletServer testServer = makeServer(props);
216 testServer.waitedStart(5000);
218 String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
219 String hostName = props.getProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
221 if (!isTcpPortOpen(hostName, testServer.getPort())) {
222 throw new IllegalStateException(CANNOT_CONNECT + testServer.getPort());
227 } catch (InterruptedException e) {
228 Thread.currentThread().interrupt();
229 throw new IllegalStateException("interrupted while building " + params.getName(), e);
233 private TopicServer<?> buildTopicServer(TopicServerParameters params, Map<String, TopicSink> sinks,
234 Map<String, TopicSource> sources) {
236 // find the desired sink
237 TopicSink sink = sinks.get(params.getSink());
239 throw new IllegalArgumentException("invalid sink topic " + params.getSink());
242 // find the desired source
243 TopicSource source = sources.get(params.getSource());
244 if (source == null) {
245 throw new IllegalArgumentException("invalid source topic " + params.getSource());
248 // create the topic server
249 return (TopicServer<?>) Class.forName(params.getProviderClass())
250 .getDeclaredConstructor(TopicSink.class, TopicSource.class).newInstance(sink, source);
252 } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException
253 | SecurityException | ClassNotFoundException e) {
254 throw new IllegalArgumentException("cannot create TopicServer: " + params.getName(), e);
259 * Creates a set of properties, suitable for building a REST server, from the
262 * @param params parameters from which to build the properties
263 * @return a set of properties representing the given parameters
265 private static Properties getServerProperties(String dmaapName, ClassRestServerParameters params) {
266 final var props = new Properties();
267 props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
269 final String svcpfx = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + params.getName();
271 props.setProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, params.getName());
272 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, params.getHost());
273 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
274 Integer.toString(params.getPort()));
275 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
276 Boolean.toString(params.isHttps()));
277 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
278 params.getProviderClass());
279 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
280 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "false");
281 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "true");
283 if (dmaapName != null && dmaapName.equals(params.getName())) {
284 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER,
285 String.join(",", CambriaMessageBodyHandler.class.getName(),
286 GsonMessageBodyHandler.class.getName(),
287 TextMessageBodyHandler.class.getName()));
289 props.setProperty(svcpfx + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, String.join(",",
290 GsonMessageBodyHandler.class.getName(), TextMessageBodyHandler.class.getName()));
296 // the following methods may be overridden by junit tests
298 protected String getResourceAsString(String resourceName) {
299 return ResourceUtils.getResourceAsString(resourceName);
302 protected Coder makeCoder() {
303 return new StandardCoder();
306 protected HttpServletServer makeServer(Properties props) {
307 return HttpServletServerFactoryInstance.getServerFactory().build(props).get(0);
310 protected boolean isTcpPortOpen(String hostName, int port) throws InterruptedException {
311 return NetworkUtil.isTcpPortOpen(hostName, port, 100, 200L);