Adding to unit test coverage 96/90396/4
authorefiacor <fiachra.corcoran@est.tech>
Tue, 25 Jun 2019 11:01:10 +0000 (11:01 +0000)
committerefiacor <fiachra.corcoran@est.tech>
Tue, 25 Jun 2019 11:01:10 +0000 (11:01 +0000)
Change-Id: Id8d07c94bb70952fb317bfc7824fc0d587087468
Issue-ID: DMAAP-1203
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
datarouter-node/pom.xml
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/LogManagerTest.java
datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java
datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java
datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java
datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java
datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java [new file with mode: 0755]
datarouter-prov/src/test/resources/create.sql
datarouter-prov/src/test/resources/h2Database.properties
datarouter-prov/src/test/resources/prov_data.json [new file with mode: 0644]

index a4de6f7..93c8686 100755 (executable)
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
index b95fb36..c890ed5 100644 (file)
@@ -26,7 +26,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Timer;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -55,14 +57,9 @@ public class LogManagerTest {
     }
 
     @After
-    public void tearDown() {
+    public void tearDown() throws IOException {
         File spoolDir = new File(System.getProperty("user.dir") + "/src/test/resources/.spool");
-        for (File file : spoolDir.listFiles()) {
-            if (file.exists()) {
-                file.delete();
-            }
-        }
-        spoolDir.delete();
+        FileUtils.deleteDirectory(spoolDir);
     }
 
     @Test
index 7475b6b..ef106ab 100755 (executable)
@@ -242,7 +242,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
     private static String provName = "feeds-drtr.web.att.com";
 
     /**
-     * The standard FQDN of the ACTIVE provisioning server in this Data Router ecosystem
+     * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem
      */
     private static String activeProvName = "feeds-drtr.web.att.com";
 
@@ -719,7 +719,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
     }
 
     /**
-     * Gets the FQDN of the initially ACTIVE provisioning server (POD). Note: this used to be called isActivePOD(),
+     * Gets the FQDN of the initially ACTIVE_POD provisioning server (POD). Note: this used to be called isActivePOD(),
      * however, that is a misnomer, as the active status could shift to the standby POD without these parameters
      * changing.  Hence, the function names have been changed to more accurately reflect their purpose.
      *
@@ -730,7 +730,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
     }
 
     /**
-     * Gets the FQDN of the initially STANDBY provisioning server (POD). Note: this used to be called isStandbyPOD(),
+     * Gets the FQDN of the initially STANDBY_POD provisioning server (POD). Note: this used to be called isStandbyPOD(),
      * however, that is a misnomer, as the standby status could shift to the active POD without these parameters
      * changing.  Hence, the function names have been changed to more accurately reflect their purpose.
      *
index 8afa18a..6cb8520 100644 (file)
@@ -86,8 +86,8 @@ public class Poker extends TimerTask {
         try {\r
             thisPod = InetAddress.getLocalHost().getHostName();\r
         } catch (UnknownHostException e) {\r
-            thisPod = "*UNKNOWN*"; // not a major problem\r
-            logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN*\"", e);\r
+            thisPod = "*UNKNOWN_POD*"; // not a major problem\r
+            logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN_POD*\"", e);\r
         }\r
         provString = buildProvisioningString();\r
 \r
index 7791777..7542360 100755 (executable)
@@ -146,7 +146,7 @@ public class ProxyServlet extends BaseServlet {
      */
     public boolean isProxyServer() {
         SynchronizerTask st = SynchronizerTask.getSynchronizer();
-        return st.getState() == SynchronizerTask.STANDBY;
+        return st.getPodState() == SynchronizerTask.STANDBY_POD;
     }
 
     /**
index d3ae4fe..8c5a49a 100644 (file)
 
 package org.onap.dmaap.datarouter.provisioning;
 
+import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -50,8 +55,6 @@ import java.util.TreeSet;
 
 import javax.servlet.http.HttpServletResponse;
 
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -85,7 +88,7 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
  * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
  * the active (master) POD.</li>
  * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
- * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
+ * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
  * of this POD.</li>
  * </ol>
  * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
@@ -99,167 +102,152 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
 public class SynchronizerTask extends TimerTask {
 
     /**
-     * This is a singleton -- there is only one SynchronizerTask object in the server
+     * This is a singleton -- there is only one SynchronizerTask object in the server.
      */
     private static SynchronizerTask synctask;
 
     /**
-     * This POD is unknown -- not on the list of PODs
+     * This POD is unknown -- not on the list of PODs.
      */
-    public static final int UNKNOWN = 0;
+    public static final int UNKNOWN_POD = 0;
     /**
-     * This POD is active -- on the list of PODs, and the DNS CNAME points to us
+     * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
      */
-    public static final int ACTIVE = 1;
+    public static final int ACTIVE_POD = 1;
     /**
-     * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us
+     * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
      */
-    public static final int STANDBY = 2;
-    private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"};
+    public static final int STANDBY_POD = 2;
+
+    private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
     private static final long ONE_HOUR = 60 * 60 * 1000L;
 
+    private long nextMsg = 0;    // only display the "Current podState" msg every 5 mins.
+
     private final EELFLogger logger;
     private final Timer rolex;
     private final String spooldir;
-    private int state;
+    private int podState;
     private boolean doFetch;
     private long nextsynctime;
     private AbstractHttpClient httpclient = null;
 
-    /**
-     * Get the singleton SynchronizerTask object.
-     *
-     * @return the SynchronizerTask
-     */
-    public static synchronized SynchronizerTask getSynchronizer() {
-        if (synctask == null) {
-            synctask = new SynchronizerTask();
-        }
-        return synctask;
-    }
-
     @SuppressWarnings("deprecation")
     private SynchronizerTask() {
         logger = EELFManager.getInstance().getLogger("InternalLog");
         rolex = new Timer();
         spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
-        state = UNKNOWN;
+        podState = UNKNOWN_POD;
         doFetch = true;        // start off with a fetch
         nextsynctime = 0;
 
-        logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
+        logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
         try {
             Properties props = (new DB()).getProperties();
             String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
             String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
             String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
             KeyStore keyStore = KeyStore.getInstance(type);
-            try(FileInputStream instream = new FileInputStream(new File(store))) {
+            try (FileInputStream instream = new FileInputStream(new File(store))) {
                 keyStore.load(instream, pass.toCharArray());
 
             }
-                store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
-                pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
-                KeyStore trustStore = null;
-                if (store != null && store.length() > 0) {
-                    trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
-                    try(FileInputStream instream = new FileInputStream(new File(store))){
-                        trustStore.load(instream, pass.toCharArray());
+            store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
+            pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
+            KeyStore trustStore = null;
+            if (store != null && store.length() > 0) {
+                trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+                try (FileInputStream instream = new FileInputStream(new File(store))) {
+                    trustStore.load(instream, pass.toCharArray());
 
-                    }
                 }
+            }
 
             // We are connecting with the node name, but the certificate will have the CNAME
             // So we need to accept a non-matching certificate name
-            String keystorepass = props.getProperty(
-                Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
-           try(AbstractHttpClient hc = new DefaultHttpClient()) {
-               SSLSocketFactory socketFactory =
-                       (trustStore == null)
-                               ? new SSLSocketFactory(keyStore, keystorepass)
-                               : new SSLSocketFactory(keyStore, keystorepass, trustStore);
-               socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
-               Scheme sch = new Scheme("https", 443, socketFactory);
-               hc.getConnectionManager().getSchemeRegistry().register(sch);
-            httpclient = hc;
-           }
-            // Run once every 5 seconds to check DNS, etc.
-            long interval = 0;
-            try {
-                String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
-                interval = Long.parseLong(s);
-            } catch (NumberFormatException e) {
-                interval = 5000L;
+            String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
+            try (AbstractHttpClient hc = new DefaultHttpClient()) {
+                SSLSocketFactory socketFactory =
+                        (trustStore == null)
+                                ? new SSLSocketFactory(keyStore, keystorepass)
+                                : new SSLSocketFactory(keyStore, keystorepass, trustStore);
+                socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+                Scheme sch = new Scheme("https", 443, socketFactory);
+                hc.getConnectionManager().getSchemeRegistry().register(sch);
+                httpclient = hc;
             }
-            rolex.scheduleAtFixedRate(this, 0L, interval);
+            setSynchTimer(props);
         } catch (Exception e) {
             logger.warn("PROV5005: Problem starting the synchronizer: " + e);
         }
     }
 
+    private void setSynchTimer(Properties props) {
+        // Run once every 5 seconds to check DNS, etc.
+        long interval;
+        try {
+            String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
+            interval = Long.parseLong(s);
+        } catch (NumberFormatException e) {
+            interval = 5000L;
+        }
+        rolex.scheduleAtFixedRate(this, 0L, interval);
+    }
+
+    /**
+     * Get the singleton SynchronizerTask object.
+     *
+     * @return the SynchronizerTask
+     */
+    public static synchronized SynchronizerTask getSynchronizer() {
+        if (synctask == null) {
+            synctask = new SynchronizerTask();
+        }
+        return synctask;
+    }
+
     /**
-     * What is the state of this POD?
+     * What is the podState of this POD?.
      *
-     * @return one of ACTIVE, STANDBY, UNKNOWN
+     * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
      */
-    public int getState() {
-        return state;
+    public int getPodState() {
+        return podState;
     }
 
     /**
-     * Is this the active POD?
+     * Is this the active POD?.
      *
      * @return true if we are active (the master), false otherwise
      */
     public boolean isActive() {
-        return state == ACTIVE;
+        return podState == ACTIVE_POD;
     }
 
     /**
      * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
      * should re-synchronize with the master.
      */
-    public void doFetch() {
+    void doFetch() {
         doFetch = true;
     }
 
     /**
      * Runs once a minute in order to <ol>
      * <li>lookup DNS names,</li>
-     * <li>determine the state of this POD,</li>
-     * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
+     * <li>determine the podState of this POD,</li>
+     * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
      * <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
-     * </ol>
+     * </ol>.
      */
     @Override
     public void run() {
         try {
-            state = lookupState();
-            if (state == STANDBY) {
+            podState = lookupState();
+            if (podState == STANDBY_POD) {
                 // Only copy provisioning data FROM the active server TO the standby
                 if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
-                    logger.debug("Initiating a sync...");
-                    JSONObject jo = readProvisioningJSON();
-                    if (jo != null) {
-                        doFetch = false;
-                        syncFeeds(jo.getJSONArray("feeds"));
-                        syncSubs(jo.getJSONArray("subscriptions"));
-                        syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
-                        syncParams(jo.getJSONObject("parameters"));
-                        // The following will not be present in a version=1.0 provfeed
-                        JSONArray ja = jo.optJSONArray("ingress");
-                        if (ja != null) {
-                            syncIngressRoutes(ja);
-                        }
-                        JSONObject j2 = jo.optJSONObject("egress");
-                        if (j2 != null) {
-                            syncEgressRoutes(j2);
-                        }
-                        ja = jo.optJSONArray("routing");
-                        if (ja != null) {
-                            syncNetworkRoutes(ja);
-                        }
-                    }
+                    syncProvisioningData();
                     logger.info("PROV5013: Sync completed.");
                     nextsynctime = System.currentTimeMillis() + ONE_HOUR;
                 }
@@ -278,7 +266,7 @@ public class SynchronizerTask extends TimerTask {
                 remote.andNot(local);
                 if (!remote.isEmpty()) {
                     logger.debug(" Replicating logs: " + remote);
-                    replicateDRLogs(remote);
+                    replicateDataRouterLogs(remote);
                 }
             }
         } catch (Exception e) {
@@ -286,14 +274,39 @@ public class SynchronizerTask extends TimerTask {
         }
     }
 
+    private void syncProvisioningData() {
+        logger.debug("Initiating a sync...");
+        JSONObject jo = readProvisioningJson();
+        if (jo != null) {
+            doFetch = false;
+            syncFeeds(jo.getJSONArray("feeds"));
+            syncSubs(jo.getJSONArray("subscriptions"));
+            syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
+            syncParams(jo.getJSONObject("parameters"));
+            // The following will not be present in a version=1.0 provfeed
+            JSONArray ja = jo.optJSONArray("ingress");
+            if (ja != null) {
+                syncIngressRoutes(ja);
+            }
+            JSONObject j2 = jo.optJSONObject("egress");
+            if (j2 != null) {
+                syncEgressRoutes(j2);
+            }
+            ja = jo.optJSONArray("routing");
+            if (ja != null) {
+                syncNetworkRoutes(ja);
+            }
+        }
+    }
+
     /**
-     * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2
-     * (STANDBY) to indicate the state of this server.
+     * This method is used to lookup the CNAME that points to the active server.
+     * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
      *
-     * @return the current state
+     * @return the current podState
      */
-    private int lookupState() {
-        int newstate = UNKNOWN;
+    int lookupState() {
+        int newPodState = UNKNOWN_POD;
         try {
             InetAddress myaddr = InetAddress.getLocalHost();
             if (logger.isTraceEnabled()) {
@@ -303,9 +316,9 @@ public class SynchronizerTask extends TimerTask {
             Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
             if (pods.contains(thisPod)) {
                 InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
-                newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
+                newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
                 if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
-                    logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]);
+                    logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
                     nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
                 }
             } else {
@@ -315,15 +328,13 @@ public class SynchronizerTask extends TimerTask {
             logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
         }
 
-        if (newstate != state) {
-            logger
-                .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
+        if (newPodState != podState) {
+            logger.info(String.format("PROV5001: Server podState changed from %s to %s",
+                    stnames[podState], stnames[newPodState]));
         }
-        return newstate;
+        return newPodState;
     }
 
-    private static long nextMsg = 0;    // only display the "Current state" msg every 5 mins.
-
     /**
      * Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
      */
@@ -396,7 +407,7 @@ public class SynchronizerTask extends TimerTask {
                 try {
                     v = "" + jo.getInt(k);
                 } catch (JSONException e1) {
-                    logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e);
+                    logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
                     JSONArray ja = jo.getJSONArray(k);
                     for (int i = 0; i < ja.length(); i++) {
                         if (i > 0) {
@@ -455,7 +466,7 @@ public class SynchronizerTask extends TimerTask {
                 NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
                 coll.add(nr);
             } catch (JSONException e) {
-                logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n));
+                logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
             }
         }
         if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
@@ -477,29 +488,11 @@ public class SynchronizerTask extends TimerTask {
                 Syncable newobj = newmap.get(n);
                 Syncable oldobj = oldmap.get(n);
                 if (oldobj == null) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("  Inserting record: " + newobj);
-                    }
-                    newobj.doInsert(conn);
-                    changes = true;
+                    changes = insertRecord(conn, newobj);
                 } else if (newobj == null) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("  Deleting record: " + oldobj);
-                    }
-                    oldobj.doDelete(conn);
-                    changes = true;
+                    changes = deleteRecord(conn, oldobj);
                 } else if (!newobj.equals(oldobj)) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("  Updating record: " + newobj);
-                    }
-                    newobj.doUpdate(conn);
-
-                    /**Rally US708115
-                     * Change Ownership of FEED - 1610, Syncronised with secondary DB.
-                     * */
-                    checkChnageOwner(newobj, oldobj);
-
-                    changes = true;
+                    changes = updateRecord(conn, newobj, oldobj);
                 }
             }
             db.release(conn);
@@ -509,6 +502,30 @@ public class SynchronizerTask extends TimerTask {
         return changes;
     }
 
+    private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("  Updating record: " + newobj);
+        }
+        boolean changes = newobj.doUpdate(conn);
+        checkChangeOwner(newobj, oldobj);
+
+        return changes;
+    }
+
+    private boolean deleteRecord(Connection conn, Syncable oldobj) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("  Deleting record: " + oldobj);
+        }
+        return oldobj.doDelete(conn);
+    }
+
+    private boolean insertRecord(Connection conn, Syncable newobj) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("  Inserting record: " + newobj);
+        }
+        return newobj.doInsert(conn);
+    }
+
     private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
         Map<String, Syncable> map = new HashMap<>();
         for (Syncable v : c) {
@@ -517,18 +534,18 @@ public class SynchronizerTask extends TimerTask {
         return map;
     }
 
-    /**Change owner of FEED/SUBSCRIPTION*/
     /**
+     * Change owner of FEED/SUBSCRIPTION.
      * Rally US708115 Change Ownership of FEED - 1610
      */
-    private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
+    private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
         if (newobj instanceof Feed) {
             Feed oldfeed = (Feed) oldobj;
             Feed newfeed = (Feed) newobj;
 
             if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
-                logger.info("PROV5013 -  Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed
-                    .getPublisher());
+                logger.info("PROV5013 -  Previous publisher: "
+                                    + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
                 oldfeed.setPublisher(newfeed.getPublisher());
                 oldfeed.changeOwnerShip();
             }
@@ -537,8 +554,8 @@ public class SynchronizerTask extends TimerTask {
             Subscription newsub = (Subscription) newobj;
 
             if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
-                logger.info("PROV5013 -  Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub
-                    .getSubscriber());
+                logger.info("PROV5013 -  Previous subscriber: "
+                                    + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
                 oldsub.setSubscriber(newsub.getSubscriber());
                 oldsub.changeOwnerShip();
             }
@@ -551,26 +568,26 @@ public class SynchronizerTask extends TimerTask {
      *
      * @return the provisioning data (as a JONObject)
      */
-    private synchronized JSONObject readProvisioningJSON() {
+    private synchronized JSONObject readProvisioningJson() {
         String url = URLUtilities.generatePeerProvURL();
         HttpGet get = new HttpGet(url);
         try {
             HttpResponse response = httpclient.execute(get);
             int code = response.getStatusLine().getStatusCode();
             if (code != HttpServletResponse.SC_OK) {
-                logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code);
+                logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
                 return null;
             }
             HttpEntity entity = response.getEntity();
             String ctype = entity.getContentType().getValue().trim();
-            if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype
-                .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
-                logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype);
+            if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
+                        && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
+                logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
                 return null;
             }
             return new JSONObject(new JSONTokener(entity.getContent()));
         } catch (Exception e) {
-            logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e);
+            logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
             return null;
         } finally {
             get.releaseConnection();
@@ -583,7 +600,7 @@ public class SynchronizerTask extends TimerTask {
      *
      * @return the bitset
      */
-    private RLEBitSet readRemoteLoglist() {
+    RLEBitSet readRemoteLoglist() {
         RLEBitSet bs = new RLEBitSet();
         String url = URLUtilities.generatePeerLogsURL();
 
@@ -594,7 +611,7 @@ public class SynchronizerTask extends TimerTask {
         //End of fix.
 
         HttpGet get = new HttpGet(url);
-        try {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
             HttpResponse response = httpclient.execute(get);
             int code = response.getStatusLine().getStatusCode();
             if (code != HttpServletResponse.SC_OK) {
@@ -603,13 +620,12 @@ public class SynchronizerTask extends TimerTask {
             }
             HttpEntity entity = response.getEntity();
             String ctype = entity.getContentType().getValue().trim();
-            if (!"text/plain".equals(ctype)) {
+            if (!TEXT_CT.equals(ctype)) {
                 logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
                 return bs;
             }
             InputStream is = entity.getContent();
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            int ch = 0;
+            int ch;
             while ((ch = is.read()) >= 0) {
                 bos.write(ch);
             }
@@ -630,12 +646,12 @@ public class SynchronizerTask extends TimerTask {
      *
      * @param bs the bitset (an RELBitSet) of log records to fetch
      */
-    private void replicateDRLogs(RLEBitSet bs) {
+    void replicateDataRouterLogs(RLEBitSet bs) {
         String url = URLUtilities.generatePeerLogsURL();
         HttpPost post = new HttpPost(url);
         try {
             String t = bs.toString();
-            HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
+            HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create(TEXT_CT));
             post.setEntity(body);
             if (logger.isDebugEnabled()) {
                 logger.debug("Requesting records: " + t);
@@ -644,13 +660,13 @@ public class SynchronizerTask extends TimerTask {
             HttpResponse response = httpclient.execute(post);
             int code = response.getStatusLine().getStatusCode();
             if (code != HttpServletResponse.SC_OK) {
-                logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code);
+                logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
                 return;
             }
             HttpEntity entity = response.getEntity();
             String ctype = entity.getContentType().getValue().trim();
-            if (!"text/plain".equals(ctype)) {
-                logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype);
+            if (!TEXT_CT.equals(ctype)) {
+                logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
                 return;
             }
 
@@ -661,7 +677,7 @@ public class SynchronizerTask extends TimerTask {
             Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
             logger.info("Approximately " + bs.cardinality() + " records replicated.");
         } catch (Exception e) {
-            logger.warn("PROV5012: replicateDRLogs failed, exception: " + e);
+            logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
         } finally {
             post.releaseConnection();
         }
index bad6e2c..e2076b9 100644 (file)
@@ -50,7 +50,7 @@ public class DrServletTestBase {
         FieldUtils.writeDeclaredStaticField(DB.class, "props", props, true);
         FieldUtils.writeDeclaredStaticField(BaseServlet.class, "startmsgFlag", false, true);
         SynchronizerTask synchronizerTask = mock(SynchronizerTask.class);
-        when(synchronizerTask.getState()).thenReturn(SynchronizerTask.UNKNOWN);
+        when(synchronizerTask.getPodState()).thenReturn(SynchronizerTask.UNKNOWN_POD);
         FieldUtils.writeDeclaredStaticField(BaseServlet.class, "synctask", synchronizerTask, true);
     }
 
diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java
new file mode 100755 (executable)
index 0000000..79d8389
--- /dev/null
@@ -0,0 +1,212 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+package org.onap.dmaap.datarouter.provisioning;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import com.att.eelf.configuration.EELFManager;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.client.AbstractHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
+import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
+import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.net.ssl.*")
+@PrepareForTest({BaseServlet.class, URLUtilities.class})
+public class SynchronizerTaskTest {
+
+    @Mock
+    private AbstractHttpClient httpClient;
+
+    @Mock
+    private HttpEntity httpEntity;
+
+    @Mock
+    private StatusLine statusLine;
+
+    @Mock
+    private CloseableHttpResponse response;
+
+    @Mock
+    private ByteArrayOutputStream byteArrayOutputStream;
+
+    private SynchronizerTask synchronizerTask;
+
+    private ExecutorService executorService;
+
+    private static EntityManagerFactory emf;
+    private static EntityManager em;
+
+    @BeforeClass
+    public static void init() {
+        emf = Persistence.createEntityManagerFactory("dr-unit-tests");
+        em = emf.createEntityManager();
+        System.setProperty(
+                "org.onap.dmaap.datarouter.provserver.properties",
+                "src/test/resources/h2Database.properties");
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        em.clear();
+        em.close();
+        emf.close();
+    }
+
+
+    @Before
+    public void setUp() throws IllegalAccessException, UnknownHostException {
+        SSLSocketFactory sslSocketFactory = mock(SSLSocketFactory.class);
+        doNothing().when(sslSocketFactory).setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+
+        PowerMockito.mockStatic(BaseServlet.class);
+        PowerMockito.mockStatic(URLUtilities.class);
+        when(BaseServlet.getPods()).thenReturn(new String[] {InetAddress.getLocalHost().getHostName(), "stand-by-prov"});
+        when(URLUtilities.generatePeerProvURL()).thenReturn("https://stand-by-prov/internal/prov");
+        when(URLUtilities.generatePeerLogsURL()).thenReturn("https://stand-by-prov/internal/drlogs");
+
+        synchronizerTask = Mockito.spy(SynchronizerTask.getSynchronizer());
+        doReturn(2).when(synchronizerTask).lookupState();
+
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(synchronizerTask);
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        executorService.shutdown();
+        executorService.awaitTermination(2, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void Given_Synch_Task_readRemoteLoglist_Called_And_Valid_BitSet_Returned_Success() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+        Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain"));
+        Mockito.when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("1-55251".getBytes()));
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        Assert.assertNotNull(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Resonse_Code_Failure() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404);
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        Assert.assertNotNull(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Content_Type_Failure() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+        Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type"));
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        Assert.assertNotNull(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Valid_BitSet_Returned_Success() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+        Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain"));
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        synchronizerTask.replicateDataRouterLogs(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Content_Type_Failure() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+        Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type"));
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        synchronizerTask.replicateDataRouterLogs(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Resonse_Code_Failure() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404);
+        RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+        synchronizerTask.replicateDataRouterLogs(rleBitSet);
+    }
+
+    @Test
+    public void Given_Synch_Task_Is_Started_And_LogFileLoader_Is_Idle_Then_Standby_Pod_Synch_Is_Successful() throws Exception {
+        mockHttpClientForGetRequest();
+        Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+        Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "application/vnd.dmaap-dr.provfeed-full; version=1.0"));
+        mockResponseFromGet();
+    }
+
+
+    private void mockHttpClientForGetRequest() throws Exception {
+        FieldUtils.writeField(synchronizerTask, "httpclient", httpClient, true);
+        Mockito.when(httpClient.execute(anyObject())).thenReturn(response);
+        Mockito.when(response.getEntity()).thenReturn(httpEntity);
+        Mockito.when(response.getStatusLine()).thenReturn(statusLine);
+
+    }
+
+    private void mockResponseFromGet() throws IOException {
+        InputStream in = getClass().getClassLoader().getResourceAsStream("prov_data.json");
+        Mockito.when(httpEntity.getContent()).thenReturn(in);
+    }
+}
index 7c10672..a811847 100755 (executable)
@@ -185,6 +185,9 @@ VALUES (1,1,'user',null,2);
 insert into INGRESS_ROUTES(SEQUENCE, FEEDID , USERID, SUBNET, NODESET)
 VALUES (2,1,'user',null,2);
 
+insert into NODESETS(SETID, NODEID)
+VALUES (1,1);
+
 insert into NODESETS(SETID, NODEID)
 VALUES (2,2);
 
index fee9c68..9c63aea 100755 (executable)
@@ -29,4 +29,11 @@ org.onap.dmaap.datarouter.provserver.https.relaxation      = false
 org.onap.dmaap.datarouter.provserver.accesslog.dir         = unit-test-logs
 org.onap.dmaap.datarouter.provserver.spooldir              = unit-test-logs/spool
 org.onap.dmaap.datarouter.provserver.localhost             = 127.0.0.1
-org.onap.dmaap.datarouter.provserver.passwordencryption    = PasswordEncryptionKey#@$%^&1234#
\ No newline at end of file
+org.onap.dmaap.datarouter.provserver.passwordencryption    = PasswordEncryptionKey#@$%^&1234#
+
+org.onap.dmaap.datarouter.provserver.keystore.type       = jks
+org.onap.dmaap.datarouter.provserver.keymanager.password = FZNkU,B%NJzcT1v7;^v]M#ZX
+org.onap.dmaap.datarouter.provserver.keystore.path       = aaf_certs/org.onap.dmaap-dr.jks
+org.onap.dmaap.datarouter.provserver.keystore.password   = FZNkU,B%NJzcT1v7;^v]M#ZX
+org.onap.dmaap.datarouter.provserver.truststore.path     = aaf_certs/org.onap.dmaap-dr.trust.jks
+org.onap.dmaap.datarouter.provserver.truststore.password = +mzf@J.D^;3!![*Xr.z$c#?b
\ No newline at end of file
diff --git a/datarouter-prov/src/test/resources/prov_data.json b/datarouter-prov/src/test/resources/prov_data.json
new file mode 100644 (file)
index 0000000..3253631
--- /dev/null
@@ -0,0 +1,129 @@
+{
+  "feeds": [
+    {
+      "suspend": false,
+      "groupid": 0,
+      "description": "Default feed provisioned for PM File collector",
+      "version": "m1.0",
+      "authorization": {
+        "endpoint_addrs": [
+
+        ],
+        "classification": "unclassified",
+        "endpoint_ids": [
+          {
+            "password": "dradmin",
+            "id": "dradmin"
+          }
+        ]
+      },
+      "last_mod": 1560871903000,
+      "deleted": false,
+      "feedid": 1,
+      "name": "Default PM Feed",
+      "business_description": "Default Feed",
+      "aaf_instance": "legacy",
+      "publisher": "dradmin",
+      "links": {
+        "subscribe": "https://dmaap-dr-prov/subscribe/1",
+        "log": "https://dmaap-dr-prov/feedlog/1",
+        "publish": "https://dmaap-dr-prov/publish/1",
+        "self": "https://dmaap-dr-prov/feed/1"
+      },
+      "created_date": 1560871903000
+    }
+  ],
+  "groups": [
+    {
+      "authid": "GROUP-0000-c2754bb7-92ef-4869-9c6b-1bc1283be4c0",
+      "name": "Test Group",
+      "description": "Test Description of Group .",
+      "classification": "publisher/subscriber",
+      "members": "{id=attuid, name=User1}, {id=attuid, name=User 2]"
+    }
+  ],
+  "subscriptions": [
+    {
+      "suspend": false,
+      "delivery": {
+        "use100": true,
+        "password": "PASSWORD",
+        "user": "LOGIN",
+        "url": "https://dcae-pm-mapper:8443/delivery"
+      },
+      "subscriber": "dradmin",
+      "groupid": 0,
+      "metadataOnly": false,
+      "privilegedSubscriber": true,
+      "subid": 1,
+      "last_mod": 1560872889000,
+      "feedid": 1,
+      "follow_redirect": false,
+      "decompress": true,
+      "aaf_instance": "legacy",
+      "links": {
+        "feed": "https://dmaap-dr-prov/feed/1",
+        "log": "https://dmaap-dr-prov/sublog/1",
+        "self": "https://dmaap-dr-prov/subs/1"
+      },
+      "created_date": 1560872889000
+    }
+  ],
+  "parameters": {
+    "ACTIVE_POD": "dmaap-dr-prov",
+    "DELIVERY_FILE_PROCESS_INTERVAL": 10,
+    "DELIVERY_INIT_RETRY_INTERVAL": 10,
+    "DELIVERY_MAX_AGE": 86400,
+    "DELIVERY_MAX_RETRY_INTERVAL": 3600,
+    "DELIVERY_RETRY_RATIO": 2,
+    "LOGROLL_INTERVAL": 30,
+    "NODES": [
+      "dmaap-dr-node"
+    ],
+    "PROV_ACTIVE_NAME": "dmaap-dr-prov",
+    "PROV_AUTH_ADDRESSES": [
+      "dmaap-dr-prov",
+      "dmaap-dr-node"
+    ],
+    "PROV_AUTH_SUBJECTS": [
+      ""
+    ],
+    "PROV_DOMAIN": "",
+    "PROV_MAXFEED_COUNT": 10000,
+    "PROV_MAXSUB_COUNT": 100000,
+    "PROV_NAME": "dmaap-dr-prov",
+    "PROV_REQUIRE_CERT": "false",
+    "PROV_REQUIRE_SECURE": "true",
+    "STANDBY_POD": "",
+    "_INT_VALUES": [
+      "LOGROLL_INTERVAL",
+      "PROV_MAXFEED_COUNT",
+      "PROV_MAXSUB_COUNT",
+      "DELIVERY_INIT_RETRY_INTERVAL",
+      "DELIVERY_MAX_RETRY_INTERVAL",
+      "DELIVERY_RETRY_RATIO",
+      "DELIVERY_MAX_AGE",
+      "DELIVERY_FILE_PROCESS_INTERVAL"
+    ]
+  },
+  "ingress": [
+    {
+      "feedid": 1,
+      "subnet": "",
+      "user": "",
+      "node": [
+        "stub_from."
+      ]
+    }
+  ],
+  "egress": {
+    "1": "stub_to."
+  },
+  "routing": [
+    {
+      "from": 1,
+      "to": 3,
+      "via": 2
+    }
+  ]
+}
\ No newline at end of file