import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CdsSimulator {
+public class CdsSimulator implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(CdsSimulator.class);
@Override
public StreamObserver<ExecutionServiceInput> process(
- final StreamObserver<ExecutionServiceOutput> responseObserver) {
+ final StreamObserver<ExecutionServiceOutput> responseObserver) {
return new StreamObserver<ExecutionServiceInput>() {
public void onNext(final ExecutionServiceInput executionServiceInput) {
LOGGER.info("Received request input to CDS: {}", executionServiceInput);
try {
- String responseString = getResponseString(executionServiceInput, countOfSuccesfulEvents);
- Builder builder = ExecutionServiceOutput.newBuilder();
- JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ Builder builder = getResponse(executionServiceInput, countOfSuccesfulEvents);
TimeUnit.MILLISECONDS.sleep(requestedResponseDelayMs);
responseObserver.onNext(builder.build());
} catch (InvalidProtocolBufferException e) {
};
server = NettyServerBuilder.forAddress(new InetSocketAddress(host, port)).addService(testCdsBlueprintServerImpl)
- .build();
+ .build();
}
+ /**
+ * Start the server.
+ *
+ * @throws IOException IO exception.
+ */
public void start() throws IOException {
server.start();
+ // The grpc server uses daemon threads by default. Hence the application will exit as soon the main thread
+ // completes. So, wrap the server in a non-daemon thread and call awaitTermination to keep the thread alive
+ // until the server is terminated.
+ new Thread(this).start();
}
+ /**
+ * Stop the server.
+ */
public void stop() {
server.shutdown();
}
*
* @param executionServiceInput service input
* @param countOfSuccesfulEvents number of successive successful events
- * @return responseString
+ * @return builder for ExecutionServiceOutput response
+ * @throws InvalidProtocolBufferException when response string cannot be converted
*/
- public String getResponseString(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents) {
+ public Builder getResponse(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents)
+ throws InvalidProtocolBufferException {
String resourceName = "DefaultResponseEvent";
if (!StringUtils.isBlank(executionServiceInput.getActionIdentifiers().getActionName())) {
ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
}
LOGGER.info("Fetching response from {}", resourceName);
String responseString = ResourceUtils.getResourceAsString(resourceLocation + resourceName);
- if (responseString == null) {
+ Builder builder = ExecutionServiceOutput.newBuilder();
+ if (null == responseString) {
LOGGER.info("Expected response file {} not found in {}", resourceName, resourceLocation);
- responseString = ResourceUtils.getResourceAsString(resourceLocation
- + "DefaultResponseEvent.json");
+ ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
+ builder.setCommonHeader(executionServiceInput.getCommonHeader());
+ builder.setActionIdentifiers(actionIdentifiers);
+ builder.setPayload(executionServiceInput.getPayload());
+ builder.setStatus(Status.newBuilder().setCode(500).setMessage("failure")
+ .setErrorMessage("failed to get get cba file name(" + actionIdentifiers.getBlueprintName()
+ + "), version(" + actionIdentifiers.getBlueprintVersion() + ") from db : file check failed.")
+ .setEventType(EventType.EVENT_COMPONENT_FAILURE).setTimestamp(Instant.now().toString()));
+ } else {
+ LOGGER.debug("Returning response from CDS Simulator: {}", responseString);
+ JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ }
+ return builder;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.awaitTermination();
+ } catch (InterruptedException e) {
+ LOGGER.info("gRPC server is terminated");
+ Thread.currentThread().interrupt();
}
- LOGGER.debug("Returning response from CDS Simulator: {}", responseString);
- return responseString;
}
}
}
@Test
- public void testGetResponseString() throws IOException, CoderException, ParseException {
+ public void testGetResponse() throws IOException, CoderException, ParseException {
CdsSimulator cdsSimulator = new CdsSimulator(Util.LOCALHOST, sim.getPort());
String reqstr = ResourceUtils.getResourceAsString(
"org/onap/policy/simulators/cds/cds.request.json");
String responseqstr = ResourceUtils.getResourceAsString(
"org/onap/policy/simulators/cds/pm_control-create-subscription.json");
ExecutionServiceInput request = coder.decode(reqstr, ExecutionServiceInput.class);
- assertEquals(responseqstr, cdsSimulator.getResponseString(request, 0));
+ org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput.Builder esoBuilder =
+ ExecutionServiceOutput.newBuilder();
+ JsonFormat.parser().ignoringUnknownFields().merge(responseqstr, esoBuilder);
+ assertEquals(esoBuilder.toString(), cdsSimulator.getResponse(request, 0).toString());
}
}