Fixing ConcurrentModificationException during multiple policy deployment in APEX 00/109700/5
authora.sreekumar <ajith.sreekumar@bell.ca>
Tue, 30 Jun 2020 16:50:04 +0000 (17:50 +0100)
committera.sreekumar <ajith.sreekumar@bell.ca>
Wed, 1 Jul 2020 11:35:51 +0000 (12:35 +0100)
Change-Id: Ib39e798d733727bdc676755b66adf2c499e618af
Issue-ID: POLICY-2655
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java

index 9b4176a..4d84fa3 100644 (file)
@@ -2,6 +2,7 @@
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
+ *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.policy.apex.service.engine.main;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Stream;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
 import org.onap.policy.apex.model.basicmodel.service.ModelService;
 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
@@ -116,12 +122,13 @@ public class ApexActivator {
         LOGGER.debug("Apex engine started as a service");
     }
 
-    private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
+    private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
         policyModelsMap = new LinkedHashMap<>();
         Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
         Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
-
-        for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
+        Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet =
+            new LinkedHashSet<>(apexParametersMap.entrySet());
+        apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
             ApexParameters apexParams = apexParamsEntry.getValue();
             boolean duplicateInputParameterExist =
                 apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
@@ -131,7 +138,7 @@ public class ApexActivator {
                 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
                     apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
                 apexParametersMap.remove(apexParamsEntry.getKey());
-                continue;
+                return;
             }
             inputParametersMap.putAll(apexParams.getEventInputParameters());
             outputParametersMap.putAll(apexParams.getEventOutputParameters());
@@ -139,14 +146,17 @@ public class ApexActivator {
             if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
                 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
                     apexParams.getEngineServiceParameters().getPolicyModelFileName());
-
-                final String policyModelString =
-                    TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
-                AxPolicyModel policyModel = EngineServiceImpl
-                    .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
-                policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
+                try {
+                    final String policyModelString = TextFileUtils
+                        .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+                    AxPolicyModel policyModel = EngineServiceImpl
+                        .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+                    policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
+                } catch (ApexException | IOException e) {
+                    throw new ApexRuntimeException("Failed to create the apex model.", e);
+                }
             }
-        }
+        });
         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
 
         // Set the policy model in the engine
@@ -164,26 +174,26 @@ public class ApexActivator {
     }
 
     private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
-        Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
-        ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
-        AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
-        Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
-            policyModelsMap.entrySet().stream().skip(1);
-        Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
-            policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
+        // Doing a deep copy so that original values in policyModelsMap is retained after reduction operation
+        Set<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
+            .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
+        Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry =
+            policyModelsEntries.stream().reduce((entry1, entry2) -> {
                 try {
                     entry1.setValue(
                         PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
                 } catch (ApexModelException exc) {
-                    LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+                    LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
                         entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
                     apexParametersMap.remove(entry2.getKey());
                     policyModelsMap.remove(entry2.getKey());
                 }
                 return entry1;
-            }));
-        AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
-        policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
+            });
+        AxPolicyModel finalPolicyModel = null;
+        if (finalPolicyModelEntry.isPresent()) {
+            finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
+        }
         return finalPolicyModel;
     }
 
@@ -233,7 +243,7 @@ public class ApexActivator {
      * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
      * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
      * we'll also find its paired marshaler
-     * 
+     *
      * @param inputParametersMap the apex parameters
      */
     private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {