Fix bugs in context flushing 83/74283/1
authorliamfallon <liam.fallon@est.tech>
Wed, 5 Dec 2018 17:38:40 +0000 (17:38 +0000)
committerliamfallon <liam.fallon@est.tech>
Wed, 5 Dec 2018 17:41:06 +0000 (17:41 +0000)
The context flushing support in the context distributor can throw a
concurrent exception when a lot of context is active and a flush takes
place. This fix introduces proper synchronized access to the album maps.

In addition, there was an incorrect cast in the call to the persist() method
of the persistor.

Change-Id: I6594ffed83a0b34251a0c5c7893d4abb17437fd4
Issue-ID: POLICY-1380
Signed-off-by: liamfallon <liam.fallon@est.tech>
context/context-management/src/main/java/org/onap/policy/apex/context/impl/distribution/AbstractDistributor.java

index 56368ae..63ad77a 100644 (file)
@@ -59,8 +59,8 @@ public abstract class AbstractDistributor implements Distributor {
     private AxArtifactKey key = null;
 
     // The context albums for this context set indexed by their keys
-    private static Map<AxArtifactKey, ContextAlbum> albumMaps =
-            Collections.synchronizedMap(new HashMap<AxArtifactKey, ContextAlbum>());
+    private static Map<AxArtifactKey, ContextAlbum> albumMaps = Collections
+                    .synchronizedMap(new HashMap<AxArtifactKey, ContextAlbum>());
 
     // Lock manager for this distributor
     private static LockManager lockManager = null;
@@ -109,6 +109,7 @@ public abstract class AbstractDistributor implements Distributor {
 
     /**
      * Set the static lock manager.
+     *
      * @param incomingLockManager the lock manager value
      */
     private static void setLockManager(final LockManager incomingLockManager) {
@@ -117,6 +118,7 @@ public abstract class AbstractDistributor implements Distributor {
 
     /**
      * Set the static flush timer.
+     *
      * @param incomingFlushTimer the flush timer value
      */
     private static void setFlushTimer(final DistributorFlushTimerTask incomingFlushTimer) {
@@ -149,7 +151,6 @@ public abstract class AbstractDistributor implements Distributor {
      */
     public abstract Map<String, Object> getContextAlbumMap(AxArtifactKey contextAlbumKey);
 
-
     /*
      * (non-Javadoc)
      *
@@ -183,8 +184,8 @@ public abstract class AbstractDistributor implements Distributor {
         // Check if the context album is valid
         final AxValidationResult result = album.validate(new AxValidationResult());
         if (!result.isValid()) {
-            final String resultString =
-                    "context album definition for " + album.getKey().getId() + " is invalid" + result;
+            final String resultString = "context album definition for " + album.getKey().getId() + " is invalid"
+                            + result;
             LOGGER.warn(resultString);
             throw new ContextException(resultString);
         }
@@ -193,28 +194,30 @@ public abstract class AbstractDistributor implements Distributor {
         final AxContextSchema schema = ModelService.getModel(AxContextSchemas.class).get(album.getItemSchema());
         if (schema == null) {
             final String resultString = "schema \"" + album.getItemSchema().getId() + "\" for context album "
-                    + album.getKey().getId() + " does not exist";
+                            + album.getKey().getId() + " does not exist";
             LOGGER.warn(resultString);
             throw new ContextException(resultString);
         }
 
-        // Check if the map has already been instantiated
-        if (!albumMaps.containsKey(album.getKey())) {
-            // Instantiate the album map for this context album that we'll distribute using the distribution mechanism
-            final Map<String, Object> newContextAlbumMap = getContextAlbumMap(album.getKey());
-
-            // The distributed context album will have content from another process instance if the album exists in
-            // another process,
-            // if not, we have to try to read the content from persistence
-            if (newContextAlbumMap.isEmpty()) {
-                // Read entries from persistence, (Not implemented yet)
+        synchronized (albumMaps) {
+            // Check if the map has already been instantiated
+            if (!albumMaps.containsKey(album.getKey())) {
+                // Instantiate the album map for this context album that we'll distribute using the distribution
+                // mechanism
+                final Map<String, Object> newContextAlbumMap = getContextAlbumMap(album.getKey());
+
+                // The distributed context album will have content from another process instance if the album exists in
+                // another process, if not, we have to try to read the content from persistence
+                if (newContextAlbumMap.isEmpty()) {
+                    // Read entries from persistence, (Not implemented yet)
+                }
+
+                // Create the context album and put the context album object onto the distributor
+                albumMaps.put(album.getKey(), new ContextAlbumImpl(album, this, newContextAlbumMap));
             }
 
-            // Create the context album and put the context album object onto the distributor
-            albumMaps.put(album.getKey(), new ContextAlbumImpl(album, this, newContextAlbumMap));
+            return albumMaps.get(album.getKey());
         }
-
-        return albumMaps.get(album.getKey());
     }
 
     /*
@@ -226,14 +229,16 @@ public abstract class AbstractDistributor implements Distributor {
      */
     @Override
     public void removeContextAlbum(final AxContextAlbum contextAlbum) throws ContextException {
-        // Check if the map already exists, if not return
-        if (!albumMaps.containsKey(contextAlbum.getKey())) {
-            LOGGER.warn("map remove failed, supplied map is null");
-            throw new ContextException("map update failed, supplied map is null");
-        }
+        synchronized (albumMaps) {
+            // Check if the map already exists, if not return
+            if (!albumMaps.containsKey(contextAlbum.getKey())) {
+                LOGGER.warn("map remove failed, supplied map is null");
+                throw new ContextException("map update failed, supplied map is null");
+            }
 
-        // Remove the map from the distributor
-        albumMaps.remove(contextAlbum.getKey());
+            // Remove the map from the distributor
+            albumMaps.remove(contextAlbum.getKey());
+        }
     }
 
     /*
@@ -243,11 +248,13 @@ public abstract class AbstractDistributor implements Distributor {
      */
     @Override
     public void flush() throws ContextException {
-        // Flush all the maps
-        for (final Entry<AxArtifactKey, ContextAlbum> distributorMapEntry : albumMaps.entrySet()) {
-            // Let the persistor write each of the entries
-            for (final Object contextItem : distributorMapEntry.getValue().values()) {
-                persistor.writeContextItem((AxContextSchema) contextItem);
+        synchronized (albumMaps) {
+            // Flush all the maps
+            for (final Entry<AxArtifactKey, ContextAlbum> distributorMapEntry : albumMaps.entrySet()) {
+                // Let the persistor write each of the entries
+                for (final Object contextItem : distributorMapEntry.getValue().values()) {
+                    persistor.writeContextItem(contextItem);
+                }
             }
         }
     }
@@ -260,15 +267,17 @@ public abstract class AbstractDistributor implements Distributor {
      */
     @Override
     public void flushContextAlbum(final ContextAlbum contextAlbum) throws ContextException {
-        // Check if the map already exists, if not return
-        if (!albumMaps.containsKey(contextAlbum.getKey())) {
-            LOGGER.warn("map flush failed, supplied map is null");
-            throw new ContextException("map flush failed, supplied map is null");
-        }
+        synchronized (albumMaps) {
+            // Check if the map already exists, if not return
+            if (!albumMaps.containsKey(contextAlbum.getKey())) {
+                LOGGER.warn("map flush failed, supplied map is null");
+                throw new ContextException("map flush failed, supplied map is null");
+            }
 
-        // Let the persistor flush the items on the map
-        for (final Object contextItem : albumMaps.get(contextAlbum.getKey()).values()) {
-            persistor.writeContextItem(contextItem);
+            // Let the persistor flush the items on the map
+            for (final Object contextItem : albumMaps.get(contextAlbum.getKey()).values()) {
+                persistor.writeContextItem(contextItem);
+            }
         }
     }
 
@@ -329,7 +338,9 @@ public abstract class AbstractDistributor implements Distributor {
             setLockManager(null);
         }
 
-        albumMaps.clear();
+        synchronized (albumMaps) {
+            albumMaps.clear();
+        }
 
         // Turn off the flush timer
         flushTimer.cancel();