Add CDS simulator to policy-models
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / GrpcOperation.java
index 820f4de..0a882ce 100644 (file)
@@ -51,6 +51,7 @@ import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.policy.TargetType;
 
 /**
@@ -202,13 +203,26 @@ public class GrpcOperation extends OperationPartial {
     @Override
     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
+        /*
+         * construct the request first so that we don't have to clean up the "client" if
+         * an exception is thrown
+         */
+        ExecutionServiceInput request = constructRequest(params);
+
         CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+
         client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
                         config.getCdsServerProperties());
 
-        ExecutionServiceInput request = constructRequest(params);
         client.sendRequest(request);
-        return future;
+
+        // arrange to shutdown the client when the request completes
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        controller.wrap(future).whenCompleteAsync(controller.delayedComplete(), params.getExecutor())
+                        .whenCompleteAsync((arg1, arg2) -> client.close(), getBlockingExecutor());
+
+        return controller;
     }
 
     /**