/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2020 Nordix Foundation.
- * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
-import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput.Builder;
import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdsSimulator implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsSimulator.class);
-public class CdsSimulator {
@Getter
private final int port;
@Override
public StreamObserver<ExecutionServiceInput> process(
- final StreamObserver<ExecutionServiceOutput> responseObserver) {
+ final StreamObserver<ExecutionServiceOutput> responseObserver) {
return new StreamObserver<ExecutionServiceInput>() {
@Override
public void onNext(final ExecutionServiceInput executionServiceInput) {
+ LOGGER.info("Received request input to CDS: {}", executionServiceInput);
try {
- String responseString = getResponseString(executionServiceInput, countOfSuccesfulEvents);
- Builder builder = ExecutionServiceOutput.newBuilder();
- JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ var builder = getResponse(executionServiceInput, countOfSuccesfulEvents);
TimeUnit.MILLISECONDS.sleep(requestedResponseDelayMs);
responseObserver.onNext(builder.build());
} catch (InvalidProtocolBufferException e) {
throw new SimulatorRuntimeException("Cannot convert ExecutionServiceOutput output", e);
- } catch (IOException e) {
- throw new SimulatorRuntimeException("Cannot read ExecutionServiceOutput from file", e);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new SimulatorRuntimeException("Execution Interrupted", e);
}
}
};
server = NettyServerBuilder.forAddress(new InetSocketAddress(host, port)).addService(testCdsBlueprintServerImpl)
- .build();
+ .build();
}
+ /**
+ * Start the server.
+ *
+ * @throws IOException IO exception.
+ */
public void start() throws IOException {
server.start();
+ // The grpc server uses daemon threads by default. Hence the application will exit as soon the main thread
+ // completes. So, wrap the server in a non-daemon thread and call awaitTermination to keep the thread alive
+ // until the server is terminated.
+ new Thread(this).start();
}
+ /**
+ * Stop the server.
+ */
public void stop() {
server.shutdown();
}
*
* @param executionServiceInput service input
* @param countOfSuccesfulEvents number of successive successful events
- * @return responseString
+ * @return builder for ExecutionServiceOutput response
+ * @throws InvalidProtocolBufferException when response string cannot be converted
*/
- public String getResponseString(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents) {
- String resourceName = "DefaultResponseEvent";
+ public Builder getResponse(ExecutionServiceInput executionServiceInput, int countOfSuccesfulEvents)
+ throws InvalidProtocolBufferException {
+ var resourceName = "DefaultResponseEvent";
if (!StringUtils.isBlank(executionServiceInput.getActionIdentifiers().getActionName())) {
- ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
+ var actionIdentifiers = executionServiceInput.getActionIdentifiers();
resourceName = actionIdentifiers.getBlueprintName() + "-" + actionIdentifiers.getActionName();
}
if (countOfSuccesfulEvents > 0 && countOfEvents.getAndIncrement() % countOfSuccesfulEvents == 0) {
} else {
resourceName = resourceName + ".json";
}
- String responseString = ResourceUtils.getResourceAsString(resourceLocation + resourceName);
- if (responseString == null) {
- responseString = ResourceUtils.getResourceAsString(resourceLocation
- + "DefaultResponseEvent.json");
+ LOGGER.info("Fetching response from {}", resourceName);
+ var responseString = ResourceUtils.getResourceAsString(resourceLocation + resourceName);
+ var builder = ExecutionServiceOutput.newBuilder();
+ if (null == responseString) {
+ LOGGER.info("Expected response file {} not found in {}", resourceName, resourceLocation);
+ var actionIdentifiers = executionServiceInput.getActionIdentifiers();
+ builder.setCommonHeader(executionServiceInput.getCommonHeader());
+ builder.setActionIdentifiers(actionIdentifiers);
+ builder.setPayload(executionServiceInput.getPayload());
+ builder.setStatus(Status.newBuilder().setCode(500).setMessage("failure")
+ .setErrorMessage("failed to get get cba file name(" + actionIdentifiers.getBlueprintName()
+ + "), version(" + actionIdentifiers.getBlueprintVersion() + ") from db : file check failed.")
+ .setEventType(EventType.EVENT_COMPONENT_FAILURE).setTimestamp(Instant.now().toString()));
+ } else {
+ LOGGER.debug("Returning response from CDS Simulator: {}", responseString);
+ JsonFormat.parser().ignoringUnknownFields().merge(responseString, builder);
+ }
+ return builder;
+ }
+
+ @Override
+ public void run() {
+ try {
+ server.awaitTermination();
+ } catch (InterruptedException e) {
+ LOGGER.info("gRPC server is terminated");
+ Thread.currentThread().interrupt();
}
- return responseString;
}
}