APEX changes to support policy disable/enable and some improvements 83/110483/6
authora.sreekumar <ajith.sreekumar@bell.ca>
Thu, 23 Jul 2020 09:36:32 +0000 (10:36 +0100)
committera.sreekumar <ajith.sreekumar@bell.ca>
Fri, 24 Jul 2020 16:13:47 +0000 (17:13 +0100)
1) Do not stop all the marshallers/unmarshallers while updating the
engine. Stop and remove only those that are part of the policies that
are undeployed.
2) Do not reinitilaize any marshaller/unmarshaller that is already
initilaized as part of the policies which were already deployed.
Initialize only the ones as part of any newly deployed policy.
3) EngineParameters could be different in different policies. Aggregate
these parameters and make it available in the engine for any running policy.
4) Enable support for running policies with same model and different
configurations.

Change-Id: If74807a0515a741ef4e53bd0a93e43b05872f6b5
Issue-ID: POLICY-2536
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
model/policy-model/src/main/java/org/onap/policy/apex/model/policymodel/handling/PolicyModelMerger.java
model/policy-model/src/test/java/org/onap/policy/apex/model/policymodel/handling/PolicyModelMergerTest.java
services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java

index 37c3b23..90d5b7f 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -115,9 +116,9 @@ public final class PolicyModelMerger {
 
         if (failOnDuplicateKeys) {
             StringBuilder errorMessage = new StringBuilder();
-            checkForDuplicateContextItem(mergedSchemasMap, copyOverSchemasMap, errorMessage, "schema");
+            checkForDuplicateItem(mergedSchemasMap, copyOverSchemasMap, errorMessage, "schema");
             checkForDuplicateItem(mergedEventMap, copyOverEventMap, errorMessage, "event");
-            checkForDuplicateContextItem(mergedAlbumsMap, copyOverAlbumsMap, errorMessage, "album");
+            checkForDuplicateItem(mergedAlbumsMap, copyOverAlbumsMap, errorMessage, "album");
             checkForDuplicateItem(mergedTaskMap, copyOverTaskMap, errorMessage, "task");
             checkForDuplicateItem(mergedPolicyMap, copyOverPolicyMap, errorMessage, "policy");
             if (errorMessage.length() > 0) {
@@ -145,19 +146,10 @@ public final class PolicyModelMerger {
     }
 
     private static <V> void checkForDuplicateItem(Map<AxArtifactKey, V> mergedItemsMap,
-        Map<AxArtifactKey, V> copyOverItemsMap, StringBuilder errorMessage, String itemType) {
-        for (AxArtifactKey key : copyOverItemsMap.keySet()) {
-            if (mergedItemsMap.containsKey(key)) {
-                errorMessage.append("\n Duplicate " + itemType + " found - ").append(key.getId());
-            }
-        }
-    }
-
-    private static <V> void checkForDuplicateContextItem(Map<AxArtifactKey, V> mergedItemsMap,
         Map<AxArtifactKey, V> copyOverItemsMap, StringBuilder errorMessage, String itemType) {
         for (Entry<AxArtifactKey, V> entry : copyOverItemsMap.entrySet()) {
             V item = mergedItemsMap.get(entry.getKey());
-            // same context schema name with different definitions cannot occur in multiple policies
+            // same item with different definitions cannot occur in multiple policies
             if (null != item) {
                 if (item.equals(entry.getValue())) {
                     LOGGER.info("Same {} - {} is used by multiple policies.", itemType, entry.getKey().getId());
index 560ddc6..2bdf749 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ import org.junit.Test;
 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxTaskLogic;
 
 /**
  * Test model merging.
@@ -79,8 +81,12 @@ public class PolicyModelMergerTest {
         assertNotNull(mergedPolicyModel);
 
         final AxPolicyModel rightPolicyModel3 = new SupportApexPolicyModelCreator().getModel();
-        assertThatThrownBy(
-            () -> PolicyModelMerger.getMergedPolicyModel(leftPolicyModel, rightPolicyModel3, true, true))
-                .hasMessageContaining("Duplicate policy found");
+        AxArtifactKey taskArtifactKey = new AxArtifactKey("task", "0.0.1");
+        // fail when concepts in two policies have same name but different definition
+        // here make up some change so as to update the definition of the task in second policy
+        rightPolicyModel3.getTasks().getTaskMap().get(taskArtifactKey)
+            .setTaskLogic(new AxTaskLogic(taskArtifactKey, "logicName", "logicFlavour", "logicImpl"));
+        assertThatThrownBy(() -> PolicyModelMerger.getMergedPolicyModel(leftPolicyModel, rightPolicyModel3, true, true))
+            .hasMessage("\n Same task - task:0.0.1 with different definitions used in different policies");
     }
 }
index 4d84fa3..1e2447b 100644 (file)
@@ -36,6 +36,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
 import org.onap.policy.apex.model.basicmodel.service.ModelService;
 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
@@ -88,6 +89,7 @@ public class ApexActivator {
 
     // The engine service
     private EngineService apexEngineService;
+    private AxArtifactKey engineKey;
 
     /**
      * Instantiate the activator for the Apex engine as a complete service.
@@ -112,8 +114,9 @@ public class ApexActivator {
             int totalInstanceCount = apexParametersMap.values().stream()
                 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
             apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
+            engineKey = apexParameters.getEngineServiceParameters().getEngineKey();
             instantiateEngine(apexParameters);
-            setUpModelMarhsallerAndUnmarshaller(apexParameters);
+            setUpModelMarshallerAndUnmarshaller(apexParameters);
         } catch (final Exception e) {
             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
@@ -122,7 +125,7 @@ public class ApexActivator {
         LOGGER.debug("Apex engine started as a service");
     }
 
-    private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
+    private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
         policyModelsMap = new LinkedHashMap<>();
         Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
         Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
@@ -149,8 +152,7 @@ public class ApexActivator {
                 try {
                     final String policyModelString = TextFileUtils
                         .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
-                    AxPolicyModel policyModel = EngineServiceImpl
-                        .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+                    AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString);
                     policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
                 } catch (ApexException | IOException e) {
                     throw new ApexRuntimeException("Failed to create the apex model.", e);
@@ -160,10 +162,10 @@ public class ApexActivator {
         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
 
         // Set the policy model in the engine
-        apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
-            true);
+        apexEngineService.updateModel(engineKey, finalPolicyModel, true);
 
-        setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
+        handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap);
+        setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
             outputParametersMap);
 
         // Wire up pairings between marhsallers and unmarshallers
@@ -197,9 +199,10 @@ public class ApexActivator {
         return finalPolicyModel;
     }
 
-    private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
+    private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
         Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
         throws ApexEventException {
+
         // Producer parameters specify what event marshalers to handle events leaving Apex are
         // set up and how they are set up
         for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
@@ -220,9 +223,25 @@ public class ApexActivator {
         }
     }
 
+    private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
+        Map<String, EventHandlerParameters> outputParametersMap) {
+        // stop and remove any marshaller/unmarshaller that is part of a policy that is undeployed
+        marshallerMap.entrySet().stream()
+            .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
+            .forEach(marshallerEntry -> marshallerEntry.getValue().stop());
+        marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey));
+        unmarshallerMap.entrySet().stream()
+            .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey()))
+            .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
+        unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
+
+        // If a marshaller/unmarshaller is already initialized, they don't need to be reinitialized during model update.
+        outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
+        inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
+    }
+
     private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
-        if (null != apexEngineService
-            && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
+        if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
             throw new ApexException("Apex Engine already initialized.");
         }
         // Create engine with specified thread count
@@ -284,9 +303,8 @@ public class ApexActivator {
      */
     public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
         try {
-            shutdownMarshallerAndUnmarshaller();
             ApexParameters apexParameters = apexParamsMap.values().iterator().next();
-            setUpModelMarhsallerAndUnmarshaller(apexParameters);
+            setUpModelMarshallerAndUnmarshaller(apexParameters);
         } catch (final Exception e) {
             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
index 0e38230..459cdff 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modification Copyright (C) 2019-2020 Nordix Foundation.
+ *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import lombok.Getter;
 import lombok.Setter;
+import org.onap.policy.apex.core.engine.EngineParameters;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
 import org.onap.policy.apex.model.basicmodel.service.ModelService;
@@ -40,6 +42,7 @@ import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
 import org.onap.policy.apex.service.parameters.ApexParameterHandler;
 import org.onap.policy.apex.service.parameters.ApexParameters;
 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.common.parameters.ParameterService;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
 import org.slf4j.ext.XLogger;
 import org.slf4j.ext.XLoggerFactory;
@@ -61,6 +64,11 @@ public class ApexMain {
     @Getter
     private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
 
+    //engineParameters are aggregated in case of multiple policies
+    private EngineParameters aggregatedEngineParameters;
+
+    private ApexParameterHandler apexParameterHandler = new ApexParameterHandler();
+
     @Getter
     @Setter(lombok.AccessLevel.PRIVATE)
     private volatile boolean alive = false;
@@ -73,13 +81,14 @@ public class ApexMain {
     public ApexMain(final String[] args) {
         LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . .");
         apexParametersMap = new LinkedHashMap<>();
+        aggregatedEngineParameters = new EngineParameters();
         try {
             apexParametersMap.put(new ToscaPolicyIdentifier(), populateApexParameters(args));
         } catch (ApexException e) {
             LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
             return;
         }
-
+        apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next());
         // Now, create the activator for the Apex service
         activator = new ApexActivator(apexParametersMap);
 
@@ -105,6 +114,7 @@ public class ApexMain {
      */
     public ApexMain(Map<ToscaPolicyIdentifier, String[]> policyArgumentsMap) throws ApexException {
         apexParametersMap = new LinkedHashMap<>();
+        aggregatedEngineParameters = new EngineParameters();
         for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) {
             try {
                 apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
@@ -117,6 +127,11 @@ public class ApexMain {
             LOGGER.error(APEX_SERVICE_FAILED_MSG);
             return;
         }
+
+        // Set the aggregated engineParameters which will be used later while creating the engine
+        ApexParameters apexParameters = apexParametersMap.values().iterator().next();
+        apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
+        apexParameterHandler.registerParameters(apexParameters);
         // Now, create the activator for the Apex service
         activator = new ApexActivator(apexParametersMap);
 
@@ -143,7 +158,10 @@ public class ApexMain {
      * @throws ApexException on errors
      */
     public void updateModel(Map<ToscaPolicyIdentifier, String[]> policyArgsMap) throws ApexException {
+        // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine
+        boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey);
         apexParametersMap.clear();
+        aggregatedEngineParameters = new EngineParameters();
         AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class);
         Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>();
         for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) {
@@ -155,14 +173,19 @@ public class ApexMain {
                     policyArgsEntry.getKey().getVersion(), e);
             }
         }
+        // Set the aggregated engineParameters
+        ApexParameters apexParameters = apexParametersMap.values().iterator().next();
+        apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
+        ParameterService.clear();
+        apexParameterHandler.registerParameters(apexParameters);
         try {
-            if (albumsMap.isEmpty()) {
+            if (albumsMap.isEmpty() && !isAnyPolicyDeployed) {
                 // clear context since none of the policies' context albums has to be retained
                 // this could be because all policies have a major version change,
                 // or a full set of new policies are received in the update message
                 activator.terminate();
-                // ParameterService is cleared when activator is terminated. Register the engine parameters in this case
-                new ApexParameterHandler().registerParameters(apexParametersMap.values().iterator().next());
+                // ParameterService is cleared when activator is terminated. Register the parameters again in this case
+                apexParameterHandler.registerParameters(apexParameters);
                 activator = new ApexActivator(apexParametersMap);
                 activator.initialize();
                 setAlive(true);
@@ -218,10 +241,6 @@ public class ApexMain {
         ApexParameters axParameters;
         // Read the parameters
         try {
-            ApexParameterHandler apexParameterHandler = new ApexParameterHandler();
-            // In case of multiple policies received from PAP, do not clear ParameterService if parameters of one policy
-            // already registered
-            apexParameterHandler.setKeepParameterServiceFlag(null != apexParametersMap && !apexParametersMap.isEmpty());
             axParameters = apexParameterHandler.getParameters(arguments);
         } catch (final Exception e) {
             LOGGER.error("Cannot create APEX Parameters from the arguments provided.", e);
@@ -244,9 +263,17 @@ public class ApexMain {
                 ehParameterEntry.getValue().setName(ehParameterEntry.getKey());
             }
         }
+        aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters());
         return axParameters;
     }
 
+    private void aggregateEngineParameters(EngineParameters engineParameters) {
+        aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters());
+        aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap());
+        aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap()
+            .putAll(engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap());
+    }
+
     /**
      * Shut down Execution.
      *
index 02d3973..f88733f 100644 (file)
@@ -2,19 +2,20 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019 Nordix Foundation.
+ *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
@@ -24,7 +25,6 @@ package org.onap.policy.apex.service.parameters;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import java.io.FileReader;
-import lombok.Setter;
 import org.onap.policy.apex.core.engine.EngineParameters;
 import org.onap.policy.apex.service.engine.main.ApexCommandLineArguments;
 import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
@@ -46,9 +46,6 @@ import org.slf4j.ext.XLoggerFactory;
 public class ApexParameterHandler {
     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexParameterHandler.class);
 
-    @Setter
-    private boolean keepParameterServiceFlag;
-
     /**
      * Read the parameters from the parameter file.
      *
@@ -57,11 +54,8 @@ public class ApexParameterHandler {
      * @throws ParameterException on parameter exceptions
      */
     public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException {
-        // when populating parameters for multiple policies, do not clear the ParameterService already registered
-        // otherwise clear all existing parameters
-        if (!keepParameterServiceFlag) {
-            ParameterService.clear();
-        }
+
+        ParameterService.clear();
 
         ApexParameters parameters = null;
 
@@ -70,11 +64,11 @@ public class ApexParameterHandler {
             // Register the adapters for our carrier technologies and event protocols with GSON
             // @formatter:off
             final Gson gson = new GsonBuilder()
-                            .registerTypeAdapter(EngineParameters.class, 
+                            .registerTypeAdapter(EngineParameters.class,
                                             new EngineServiceParametersJsonAdapter())
-                            .registerTypeAdapter(CarrierTechnologyParameters.class, 
+                            .registerTypeAdapter(CarrierTechnologyParameters.class,
                                             new CarrierTechnologyParametersJsonAdapter())
-                            .registerTypeAdapter(EventProtocolParameters.class, 
+                            .registerTypeAdapter(EventProtocolParameters.class,
                                             new EventProtocolParametersJsonAdapter())
                             .create();
             // @formatter:on
@@ -118,13 +112,6 @@ public class ApexParameterHandler {
             LOGGER.info(returnMessage);
         }
 
-        // engine parameters in multiple policies are expected to be same.
-        // no need to do registration if already registered
-        if (!keepParameterServiceFlag) {
-            // Register the parameters with the parameter service
-            registerParameters(parameters);
-        }
-
         return parameters;
     }