Merge "Default should be an object on TOSCA properties"
[policy/models.git] / models-interactions / model-simulators / src / main / java / org / onap / policy / simulators / CdsSimulator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2020 Nordix Foundation.
4  *  Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5  *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.simulators;
24
25 import com.google.protobuf.InvalidProtocolBufferException;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Server;
28 import io.grpc.netty.NettyServerBuilder;
29 import io.grpc.stub.StreamObserver;
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.time.Instant;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import lombok.Getter;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
42 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
43 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput.Builder;
44 import org.onap.policy.common.utils.resources.ResourceUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class CdsSimulator implements Runnable {
49
50     private static final Logger LOGGER = LoggerFactory.getLogger(CdsSimulator.class);
51
52     @Getter
53     private final int port;
54
55     private final Server server;
56
57     private final String resourceLocation;
58
59     private AtomicInteger countOfEvents = new AtomicInteger(1);
60
61     /**
62      * Constructs the object, but does not start it.
63      *
64      * @param host host name of the server
65      * @param port port of the server
66      */
67     public CdsSimulator(String host, int port) {
68         this(host, port, "org/onap/policy/simulators/cds/", 0, 0);
69     }
70
71     /**
72      * Constructs the object, but does not start it.
73      *
74      * @param host host name of the server
75      * @param port port of the server
76      * @param countOfSuccesfulEvents number of successive successful events
77      * @param requestedResponseDelayMs time for the request to be processed
78      */
79     public CdsSimulator(String host, int port, String resourceLocation, int countOfSuccesfulEvents,
80         long requestedResponseDelayMs) {
81         this.port = port;
82         this.resourceLocation = resourceLocation;
83
84         BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
85
86             @Override
87             public StreamObserver<ExecutionServiceInput> process(
88                 final StreamObserver<ExecutionServiceOutput> responseObserver) {
89
90                 return new StreamObserver<ExecutionServiceInput>() {
91
92                     @Override
93                     public void onNext(final ExecutionServiceInput executionServiceInput) {
94                         LOGGER.info("Received request input to CDS: {}", executionServiceInput);
95                         try {
96                             Builder builder = getResponse(executionServiceInput, countOfSuccesfulEvents);
97                             TimeUnit.MILLISECONDS.sleep(requestedResponseDelayMs);
98                             responseObserver.onNext(builder.build());
99                         } catch (InvalidProtocolBufferException e) {
100                             throw new SimulatorRuntimeException("Cannot convert ExecutionServiceOutput output", e);
101                         } catch (InterruptedException e) {
102                             Thread.currentThread().interrupt();
103                             throw new SimulatorRuntimeException("Execution Interrupted", e);
104                         }
105                     }
106
107                     @Override
108                     public void onError(final Throwable throwable) {
109                         responseObserver.onError(throwable);
110                     }
111
112                     @Override
113                     public void onCompleted() {
114                         responseObserver.onCompleted();
115                     }
116                 };
117             }
118         };
119
120         server = NettyServerBuilder.forAddress(new InetSocketAddress(host, port)).addService(testCdsBlueprintServerImpl)
121             .build();
122     }
123
124     /**
125      * Start the server.
126      *
127      * @throws IOException IO exception.
128      */
129     public void start() throws IOException {
130         server.start();
131         // The grpc server uses daemon threads by default. Hence the application will exit as soon the main thread
132         // completes. So, wrap the server in a non-daemon thread and call awaitTermination to keep the thread alive
133         // until the server is terminated.
134         new Thread(this).start();
135     }
136
137     /**
138      * Stop the server.
139      */
140     public void stop() {
141         server.shutdown();
142     }
143
144     /**
145      * Constructs the ResponseString on the basis of request.
146      *
147      * @param executionServiceInput service input
148      * @param countOfSuccesfulEvents number of successive successful events
149      * @return builder for ExecutionServiceOutput response
150      * @throws InvalidProtocolBufferException when response string cannot be converted
151      */
152     public Builder getResponse(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents)
153         throws InvalidProtocolBufferException {
154         String resourceName = "DefaultResponseEvent";
155         if (!StringUtils.isBlank(executionServiceInput.getActionIdentifiers().getActionName())) {
156             ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
157             resourceName = actionIdentifiers.getBlueprintName() + "-" + actionIdentifiers.getActionName();
158         }
159         if (countOfSuccesfulEvents > 0 && countOfEvents.getAndIncrement() % countOfSuccesfulEvents == 0) {
160             // generating the failure response
161             resourceName = resourceName + "-error.json";
162         } else {
163             resourceName = resourceName + ".json";
164         }
165         LOGGER.info("Fetching response from {}", resourceName);
166         String responseString = ResourceUtils.getResourceAsString(resourceLocation + resourceName);
167         Builder builder = ExecutionServiceOutput.newBuilder();
168         if (null == responseString) {
169             LOGGER.info("Expected response file {} not found in {}", resourceName, resourceLocation);
170             ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
171             builder.setCommonHeader(executionServiceInput.getCommonHeader());
172             builder.setActionIdentifiers(actionIdentifiers);
173             builder.setPayload(executionServiceInput.getPayload());
174             builder.setStatus(Status.newBuilder().setCode(500).setMessage("failure")
175                 .setErrorMessage("failed to get  get cba file name(" + actionIdentifiers.getBlueprintName()
176                     + "), version(" + actionIdentifiers.getBlueprintVersion() + ") from db : file check failed.")
177                 .setEventType(EventType.EVENT_COMPONENT_FAILURE).setTimestamp(Instant.now().toString()));
178         } else {
179             LOGGER.debug("Returning response from CDS Simulator: {}", responseString);
180             JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
181         }
182         return builder;
183     }
184
185     @Override
186     public void run() {
187         try {
188             server.awaitTermination();
189         } catch (InterruptedException e) {
190             LOGGER.info("gRPC server is terminated");
191             Thread.currentThread().interrupt();
192         }
193     }
194 }