* ============LICENSE_START=======================================================
* Copyright (C) 2020 Nordix Foundation.
* Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput.Builder;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdsSimulator implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsSimulator.class);
-public class CdsSimulator {
@Getter
private final int port;
private final Server server;
+ private final String resourceLocation;
+
+ private AtomicInteger countOfEvents = new AtomicInteger(1);
+
/**
* Constructs the object, but does not start it.
*
* @param port port of the server
*/
public CdsSimulator(String host, int port) {
+ this(host, port, "org/onap/policy/simulators/cds/", 0, 0);
+ }
+
+ /**
+ * Constructs the object, but does not start it.
+ *
+ * @param host host name of the server
+ * @param port port of the server
+ * @param countOfSuccesfulEvents number of successive successful events
+ * @param requestedResponseDelayMs time for the request to be processed
+ */
+ public CdsSimulator(String host, int port, String resourceLocation, int countOfSuccesfulEvents,
+ long requestedResponseDelayMs) {
this.port = port;
+ this.resourceLocation = resourceLocation;
BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
@Override
public StreamObserver<ExecutionServiceInput> process(
- final StreamObserver<ExecutionServiceOutput> responseObserver) {
+ final StreamObserver<ExecutionServiceOutput> responseObserver) {
return new StreamObserver<ExecutionServiceInput>() {
@Override
public void onNext(final ExecutionServiceInput executionServiceInput) {
+ LOGGER.info("Received request input to CDS: {}", executionServiceInput);
try {
- String responseString = IOUtils.toString(
- getClass().getResource("cds/CreateSubscriptionResponseEvent.json"),
- StandardCharsets.UTF_8);
- Builder builder = ExecutionServiceOutput.newBuilder();
- JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ Builder builder = getResponse(executionServiceInput, countOfSuccesfulEvents);
+ TimeUnit.MILLISECONDS.sleep(requestedResponseDelayMs);
responseObserver.onNext(builder.build());
-
} catch (InvalidProtocolBufferException e) {
throw new SimulatorRuntimeException("Cannot convert ExecutionServiceOutput output", e);
-
- } catch (IOException e) {
- throw new SimulatorRuntimeException("Cannot read ExecutionServiceOutput from file", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SimulatorRuntimeException("Execution Interrupted", e);
}
}
};
server = NettyServerBuilder.forAddress(new InetSocketAddress(host, port)).addService(testCdsBlueprintServerImpl)
- .build();
+ .build();
}
+ /**
+ * Start the server.
+ *
+ * @throws IOException IO exception.
+ */
public void start() throws IOException {
server.start();
+ // The grpc server uses daemon threads by default. Hence the application will exit as soon the main thread
+ // completes. So, wrap the server in a non-daemon thread and call awaitTermination to keep the thread alive
+ // until the server is terminated.
+ new Thread(this).start();
}
+ /**
+ * Stop the server.
+ */
public void stop() {
server.shutdown();
}
+
+ /**
+ * Constructs the ResponseString on the basis of request.
+ *
+ * @param executionServiceInput service input
+ * @param countOfSuccesfulEvents number of successive successful events
+ * @return builder for ExecutionServiceOutput response
+ * @throws InvalidProtocolBufferException when response string cannot be converted
+ */
+ public Builder getResponse(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents)
+ throws InvalidProtocolBufferException {
+ String resourceName = "DefaultResponseEvent";
+ if (!StringUtils.isBlank(executionServiceInput.getActionIdentifiers().getActionName())) {
+ ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
+ resourceName = actionIdentifiers.getBlueprintName() + "-" + actionIdentifiers.getActionName();
+ }
+ if (countOfSuccesfulEvents > 0 && countOfEvents.getAndIncrement() % countOfSuccesfulEvents == 0) {
+ // generating the failure response
+ resourceName = resourceName + "-error.json";
+ } else {
+ resourceName = resourceName + ".json";
+ }
+ LOGGER.info("Fetching response from {}", resourceName);
+ String responseString = ResourceUtils.getResourceAsString(resourceLocation + resourceName);
+ Builder builder = ExecutionServiceOutput.newBuilder();
+ if (null == responseString) {
+ LOGGER.info("Expected response file {} not found in {}", resourceName, resourceLocation);
+ ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
+ builder.setCommonHeader(executionServiceInput.getCommonHeader());
+ builder.setActionIdentifiers(actionIdentifiers);
+ builder.setPayload(executionServiceInput.getPayload());
+ builder.setStatus(Status.newBuilder().setCode(500).setMessage("failure")
+ .setErrorMessage("failed to get get cba file name(" + actionIdentifiers.getBlueprintName()
+ + "), version(" + actionIdentifiers.getBlueprintVersion() + ") from db : file check failed.")
+ .setEventType(EventType.EVENT_COMPONENT_FAILURE).setTimestamp(Instant.now().toString()));
+ } else {
+ LOGGER.debug("Returning response from CDS Simulator: {}", responseString);
+ JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ }
+ return builder;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.awaitTermination();
+ } catch (InterruptedException e) {
+ LOGGER.info("gRPC server is terminated");
+ Thread.currentThread().interrupt();
+ }
+ }
}