Improved notification on item archive/restore 81/73681/3
authorvempo <vitaliy.emporopulo@amdocs.com>
Tue, 27 Nov 2018 13:12:27 +0000 (15:12 +0200)
committervempo <vitaliy.emporopulo@amdocs.com>
Tue, 27 Nov 2018 13:30:22 +0000 (15:30 +0200)
Addressed code review comments, added unit-tests.

Change-Id: I2a74e9969540cb636658aa012f82a81e0fd2eac2
Issue-ID: SDC-1667
Signed-off-by: vempo <vitaliy.emporopulo@amdocs.com>
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/main/java/org/openecomp/sdcrests/item/rest/services/ItemsImpl.java
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/main/java/org/openecomp/sdcrests/item/rest/services/catalog/notification/AsyncNotifier.java
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/main/java/org/openecomp/sdcrests/item/rest/services/catalog/notification/NotifierFactory.java
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/main/java/org/openecomp/sdcrests/item/rest/services/catalog/notification/http/HttpTaskProducer.java
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/test/java/org/openecomp/sdcrests/item/rest/services/catalog/notification/AsyncNotifierTest.java
openecomp-be/api/openecomp-sdc-rest-webapp/item-rest/item-rest-services/src/test/java/org/openecomp/sdcrests/item/rest/services/catalog/notification/http/HttpTaskProducerTest.java
openecomp-be/lib/openecomp-core-lib/openecomp-nosqldb-lib/openecomp-nosqldb-core/src/test/java/org/openecomp/core/nosqldb/util/ConfigurationManagerTest.java

index a93c063..265570a 100644 (file)
@@ -67,6 +67,8 @@ import static org.openecomp.sdc.versioning.VersioningNotificationConstansts.ITEM
 @Validated
 public class ItemsImpl implements Items {
 
+    private static final String ONBOARDING_METHOD = "onboardingMethod";
+
     private ItemManager itemManager = ItemManagerFactory.getInstance().createInterface();
 
     private static ActivityLogManager activityLogManager = ActivityLogManagerFactory.getInstance().createInterface();
@@ -92,8 +94,6 @@ public class ItemsImpl implements Items {
                 .put(ItemAction.RESTORE, new ActionSideAffects(ActivityType.Restore, NotificationEventTypes.RESTORE));
     }
 
-    private static final String ONBOARDING_METHOD = "onboardingMethod";
-
 
     @Override
     public Response actOn(ItemActionRequestDto request, String itemId, String user) {
@@ -115,10 +115,10 @@ public class ItemsImpl implements Items {
 
         actionSideAffectsMap.get(request.getAction()).execute(item, user);
         try {
-            Notifier notifier = NotifierFactory.getInstance();
-            notifier.execute(Collections.singleton(itemId), request.getAction());
+            Notifier catalogNotifier = NotifierFactory.getInstance();
+            catalogNotifier.execute(Collections.singleton(itemId), request.getAction());
         } catch (Exception e) {
-            LOGGER.error("Failed to send catalog notification on item " + itemId + " Error: " + e.getMessage());
+            LOGGER.error("Failed to send catalog notification on item {}", itemId, e);
         }
 
         return Response.ok().build();
@@ -222,7 +222,7 @@ public class ItemsImpl implements Items {
             try {
                 notifier.notifySubscribers(syncEvent, userName);
             } catch (Exception e) {
-                LOGGER.error("Failed to send sync notification to users subscribed to item '" + itemId);
+                LOGGER.error("Failed to send sync notification to users subscribed to item '{}'", itemId, e);
             }
         }
     }
index 8288010..872c61e 100644 (file)
 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
 
 import java.util.Collection;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import org.openecomp.sdc.logging.api.Logger;
 import org.openecomp.sdc.logging.api.LoggerFactory;
@@ -41,9 +43,19 @@ public class AsyncNotifier implements Notifier {
     private static final long DEFAULT_INTERVAL = 5000;
 
     private final BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer;
+    private final int numberOfRetries;
+    private final long retryInterval;
+
 
     AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer) {
+        this(taskProducer, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL);
+    }
+
+    AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer, int numOfRetries,
+            long retryInterval) {
         this.taskProducer = taskProducer;
+        this.numberOfRetries = numOfRetries;
+        this.retryInterval = retryInterval;
     }
 
     @Override
@@ -51,8 +63,7 @@ public class AsyncNotifier implements Notifier {
 
         Callable<AsyncNotifier.NextAction> worker = taskProducer.apply(itemIds, action);
 
-        RetryingTask retryingTask =
-                new RetryingTask(worker, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL, EXECUTOR_SERVICE);
+        RetryingTask retryingTask = new RetryingTask(worker, numberOfRetries, retryInterval, EXECUTOR_SERVICE);
 
         EXECUTOR_SERVICE.submit(LoggingContext.copyToCallable(retryingTask));
     }
@@ -68,19 +79,37 @@ public class AsyncNotifier implements Notifier {
         private final Callable<AsyncNotifier.NextAction> worker;
         private final long delay;
         private final ScheduledExecutorService scheduler;
-        private volatile int retries;
+        private final AtomicInteger retries;
 
         RetryingTask(Callable<AsyncNotifier.NextAction> worker, int numOfRetries, long delay,
                 ScheduledExecutorService scheduler) {
 
-            this.worker = worker;
-            this.retries = numOfRetries;
-            this.delay = delay;
-            this.scheduler = scheduler;
+            this.worker = Objects.requireNonNull(worker);
+            this.retries = new AtomicInteger(requirePositiveRetries(numOfRetries));
+            this.delay = requirePositiveDelay(delay);
+            this.scheduler = Objects.requireNonNull(scheduler);
+        }
+
+        private int requirePositiveRetries(int number) {
+
+            if (number < 1) {
+                throw new IllegalArgumentException("Number of retries must be positive");
+            }
+
+            return number;
+        }
+
+        private long requirePositiveDelay(long number) {
+
+            if (number < 1) {
+                throw new IllegalArgumentException("Delay must be positive");
+            }
+
+            return number;
         }
 
         @Override
-        public synchronized Void call() throws Exception {
+        public Void call() throws Exception {
 
             NextAction next = worker.call();
             if (next == NextAction.DONE) {
@@ -88,8 +117,8 @@ public class AsyncNotifier implements Notifier {
                 return null;
             }
 
-            retries--;
-            if (retries == 0) {
+            int attempts = retries.decrementAndGet();
+            if (attempts < 1) {
                 LOGGER.warn("Exhausted number of retries for task {}, exiting", worker);
                 return null;
             }
index c6abd34..d210dc2 100644 (file)
@@ -17,6 +17,8 @@
 package org.openecomp.sdcrests.item.rest.services.catalog.notification.http;
 
 import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 import org.openecomp.sdc.common.session.SessionContextProviderFactory;
@@ -40,6 +42,13 @@ public class HttpTaskProducer
 
     private static final String CATALOG_HTTP_PROTOCOL = "HTTP";
     private static final String CATALOG_HTTPS_PROTOCOL = "HTTPS";
+    private static final Map<ItemAction, String> ACTION_PATHS;
+
+    static {
+        ACTION_PATHS = new EnumMap<>(ItemAction.class);
+        ACTION_PATHS.put(ItemAction.ARCHIVE, "archived");
+        ACTION_PATHS.put(ItemAction.RESTORE, "restored");
+    }
 
     private final String notifyCatalogUrl;
 
@@ -81,15 +90,21 @@ public class HttpTaskProducer
         return createNotificationTask(itemIds, action);
     }
 
-    private static String getEndpoint(ItemAction action) {
+    private HttpNotificationTask createNotificationTask(Collection<String> itemIds, ItemAction action) {
+        String userId = SessionContextProviderFactory.getInstance().createInterface().get().getUser().getUserId();
+        String notificationEndpoint = notifyCatalogUrl + getApiPath(action);
+        LOGGER.debug("Catalog notification URL: {}", notificationEndpoint);
+        return new HttpNotificationTask(notificationEndpoint, userId, itemIds);
+    }
 
-        if (action == ItemAction.ARCHIVE) {
-            return "archived";
-        } else if (action == ItemAction.RESTORE) {
-            return "restored";
-        } else {
+    static String getApiPath(ItemAction action) {
+
+        String path = ACTION_PATHS.get(action);
+        if (path == null) {
             throw new IllegalArgumentException("Unsupported action: " + action.name());
         }
+
+        return path;
     }
 
     @Override
@@ -98,10 +113,4 @@ public class HttpTaskProducer
         task.call();
     }
 
-    private HttpNotificationTask createNotificationTask(Collection<String> itemIds, ItemAction action) {
-        String userId = SessionContextProviderFactory.getInstance().createInterface().get().getUser().getUserId();
-        String notificationEndpoint = notifyCatalogUrl + getEndpoint(action);
-        LOGGER.debug("Catalog notification URL: " + notificationEndpoint);
-        return new HttpNotificationTask(notificationEndpoint, userId, itemIds);
-    }
 }
index 900fc94..6bfa8b2 100644 (file)
 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.openecomp.sdcrests.item.rest.services.catalog.notification.AsyncNotifier.NextAction.DONE;
 import static org.openecomp.sdcrests.item.rest.services.catalog.notification.AsyncNotifier.NextAction.RETRY;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.openecomp.sdcrests.item.types.ItemAction;
 
 /**
  * @author evitaliy
@@ -36,6 +47,50 @@ import org.mockito.stubbing.Answer;
  */
 public class AsyncNotifierTest {
 
+    private static final String NUMBER_OF_RETRIES_MESSAGE = "Number of retries must be positive";
+    private static final String DELAY_MESSAGE = "Delay must be positive";
+
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    @Test(expected = NullPointerException.class)
+    public void errorWhenWorkerNull() {
+        new AsyncNotifier.RetryingTask(null, 10, 10, Mockito.mock(ScheduledExecutorService.class));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void errorWhenSchedulerServiceNull() {
+        new AsyncNotifier.RetryingTask(() -> DONE, 10, 10, null);
+    }
+
+    @Test
+    public void errorWhenNumberOfRetriesNegative() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(NUMBER_OF_RETRIES_MESSAGE);
+        new AsyncNotifier.RetryingTask(() -> DONE, -12, 10, Mockito.mock(ScheduledExecutorService.class));
+    }
+
+    @Test
+    public void errorWhenNumberOfRetriesZero() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(NUMBER_OF_RETRIES_MESSAGE);
+        new AsyncNotifier.RetryingTask(() -> DONE, 0, 10, Mockito.mock(ScheduledExecutorService.class));
+    }
+
+    @Test
+    public void errorWhenDelayNegative() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(DELAY_MESSAGE);
+        new AsyncNotifier.RetryingTask(() -> DONE, 1, -77, Mockito.mock(ScheduledExecutorService.class));
+    }
+
+    @Test
+    public void errorWhenDelayZero() {
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(DELAY_MESSAGE);
+        new AsyncNotifier.RetryingTask(() -> DONE, 1, 0, Mockito.mock(ScheduledExecutorService.class));
+    }
+
     @Test
     public void taskRunsOnceWhenSuccessful() throws Exception {
 
@@ -53,6 +108,20 @@ public class AsyncNotifierTest {
         assertEquals(1, counter.intValue());
     }
 
+    private ScheduledExecutorService createMockScheduledExecutor() {
+
+        ScheduledExecutorService executorServiceMock = Mockito.mock(ScheduledExecutorService.class);
+        Answer passThrough = invocation -> {
+            ((Callable<?>) invocation.getArgument(0)).call();
+            return null;
+        };
+
+        Mockito.doAnswer(passThrough).when(executorServiceMock).submit(any(Callable.class));
+        Mockito.doAnswer(passThrough).when(executorServiceMock)
+                .schedule(any(Callable.class), anyLong(), any(TimeUnit.class));
+        return executorServiceMock;
+    }
+
     @Test
     public void taskRunsTwiceWhenFailedFirstTime() throws Exception {
 
@@ -88,17 +157,26 @@ public class AsyncNotifierTest {
         assertEquals(numOfRetries, counter.intValue());
     }
 
-    private ScheduledExecutorService createMockScheduledExecutor() {
+    @Test
+    public void workerExecutedWithGivenItemIdsAndAction()
+            throws InterruptedException, ExecutionException, TimeoutException {
 
-        ScheduledExecutorService executorServiceMock = Mockito.mock(ScheduledExecutorService.class);
-        Answer passThrough = invocation -> {
-            ((Callable<?>) invocation.getArgument(0)).call();
-            return null;
+        CompletableFuture<Boolean> completed = new CompletableFuture<>();
+        Callable<AsyncNotifier.NextAction> mockTask = () -> {
+            completed.complete(true);
+            return DONE;
         };
 
-        Mockito.doAnswer(passThrough).when(executorServiceMock).submit(any(Callable.class));
-        Mockito.doAnswer(passThrough).when(executorServiceMock)
-                .schedule(any(Callable.class), anyLong(), any(TimeUnit.class));
-        return executorServiceMock;
+        final Collection<String> itemIds = Collections.singleton(UUID.randomUUID().toString());
+        final ItemAction action = ItemAction.RESTORE;
+
+        BiFunction<Collection<String>, ItemAction, Callable<AsyncNotifier.NextAction>> mockProducer = (i, a) -> {
+            assertEquals(itemIds, i);
+            assertEquals(action, a);
+            return mockTask;
+        };
+
+        new AsyncNotifier(mockProducer, 1, 1).execute(itemIds, action);
+        assertTrue(completed.get(5, TimeUnit.SECONDS));
     }
 }
\ No newline at end of file
index 3c12b37..4d8a110 100644 (file)
 package org.openecomp.sdcrests.item.rest.services.catalog.notification.http;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.openecomp.sdcrests.item.rest.services.catalog.notification.EntryNotConfiguredException;
+import org.openecomp.sdcrests.item.types.ItemAction;
 
 /**
  * @author evitaliy
@@ -32,6 +37,30 @@ public class HttpTaskProducerTest {
     @Rule
     public ExpectedException exception = ExpectedException.none();
 
+    @Test
+    public void uniquePathExistsForEveryAction() {
+
+        ItemAction[] availableActions = ItemAction.values();
+        Set<String> collectedPaths = new HashSet<>(availableActions.length);
+        for (ItemAction action : availableActions) {
+            String path = HttpTaskProducer.getApiPath(action);
+            assertFalse("Path empty for action '" + action.name() + "'", path == null || path.isEmpty());
+            collectedPaths.add(path);
+        }
+
+        assertEquals("Paths not unique for some actions", availableActions.length, collectedPaths.size());
+    }
+
+    @Test
+    public void restorePathEqualsRestored() {
+        assertEquals("restored", HttpTaskProducer.getApiPath(ItemAction.RESTORE));
+    }
+
+    @Test
+    public void archivePathEqualsArchived() {
+        assertEquals("archived", HttpTaskProducer.getApiPath(ItemAction.ARCHIVE));
+    }
+
     @Test
     public void errorWhenProtocolNotDefined() {
         HttpConfiguration config = mockConfiguration();
index 68f7804..8654339 100644 (file)
@@ -49,7 +49,7 @@ public class ConfigurationManagerTest {
     public void testGetInstanceSystemProperty() throws Throwable {
 
         expectedException.expect(IOException.class);
-        expectedException.expectMessage((NON_EXISTENT));
+        expectedException.expectMessage(containsString(NON_EXISTENT));
 
         try (ConfigurationSystemPropertyUpdater updater = new ConfigurationSystemPropertyUpdater(NON_EXISTENT)) {
             ConfigurationManager.getInstance();