Remove major and minor code smells in dr-node 87/87887/3
authoreconwar <conor.ward@est.tech>
Wed, 29 May 2019 13:19:49 +0000 (13:19 +0000)
committereconwar <conor.ward@est.tech>
Wed, 29 May 2019 13:19:49 +0000 (13:19 +0000)
Change-Id: I8c408472050adfa2889093e40d294bf542974a80
Issue-ID: DMAAP-1195
Signed-off-by: econwar <conor.ward@est.tech>
27 files changed:
ci_scripts/onap-style-java.xml
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DRNodeCadiFilter.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Delivery.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTaskHelper.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfo.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java [new file with mode: 0644]
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/IsFrom.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/LogManager.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfig.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeConfigManager.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeMain.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeServlet.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/NodeUtils.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathFinder.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PathUtil.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/ProvData.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/PublishId.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RateLimitedOperation.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/RedirManager.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/StatusLog.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/SubnetMatcher.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/Target.java
datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/TaskList.java
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/DeliveryTest.java
datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/NodeServletTest.java

index 7f9e471..89b2568 100644 (file)
@@ -52,7 +52,7 @@
         <property name="eachLine" value="true"/>
     </module>
     <module name="SuppressionFilter">
-        <property name="file" value="ci_scripts/suppressTestFiles.xml" />
+        <property name="file" value="ci_scripts/suppressedFilesForStyleCheck.xml" />
     </module>
     <module name="TreeWalker">
         <module name="OuterTypeFilename"/>
         </module>
         <module name="AbbreviationAsWordInName">
             <property name="ignoreFinal" value="false"/>
-            <property name="allowedAbbreviationLength" value="1"/>
+            <property name="allowedAbbreviationLength" value="3"/>
         </module>
         <module name="OverloadMethodsDeclarationOrder"/>
         <module name="VariableDeclarationUsageDistance"/>
index 991d866..245dbcc 100644 (file)
  * SPDX-License-Identifier: Apache-2.0
  * ============LICENSE_END=========================================================
  */
+
 package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.onap.aaf.cadi.PropAccess;
-import org.onap.aaf.cadi.filter.CadiFilter;
-
+import java.io.IOException;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
+import org.onap.aaf.cadi.PropAccess;
+import org.onap.aaf.cadi.filter.CadiFilter;
 
 
 public class DRNodeCadiFilter extends CadiFilter {
+
     private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeServlet.class);
 
     DRNodeCadiFilter(boolean init, PropAccess access) throws ServletException {
@@ -41,23 +42,16 @@ public class DRNodeCadiFilter extends CadiFilter {
     }
 
     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+            throws IOException, ServletException {
         HttpServletRequest httpRequest = (HttpServletRequest) request;
         String path = httpRequest.getPathInfo();
         if (!(path.startsWith("/internal"))) {
-            if (!(httpRequest.getMethod().equalsIgnoreCase("POST"))) {
-                if (httpRequest.getMethod().equalsIgnoreCase("DELETE") && path.startsWith("/delete")) {
+            if (!("POST".equalsIgnoreCase(httpRequest.getMethod()))) {
+                if ("DELETE".equalsIgnoreCase(httpRequest.getMethod()) && path.startsWith("/delete")) {
                     chain.doFilter(request, response);
                 } else {
-                    String feedId = getFeedId(request, response);
-                    String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
-                    if (aafDbInstance != null && !aafDbInstance.equals("") && !aafDbInstance.equalsIgnoreCase("legacy")) {
-                        logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
-                        super.doFilter(request, response, chain);
-                    } else {
-                        logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
-                        chain.doFilter(request, response);
-                    }
+                    doFilterWithFeedId(request, response, chain);
                 }
             }
         } else {
@@ -72,7 +66,8 @@ public class DRNodeCadiFilter extends CadiFilter {
         if (fileid == null) {
             logger.error("NODE0105 Rejecting bad URI for PUT " + req.getPathInfo() + " from " + req.getRemoteAddr());
             try {
-                resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
+                resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+                        "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
             } catch (IOException e) {
                 logger.error("NODE0541 DRNodeCadiFilter.getFeedId: ", e);
             }
@@ -82,19 +77,34 @@ public class DRNodeCadiFilter extends CadiFilter {
 
         if (fileid.startsWith("/publish/")) {
             fileid = fileid.substring(9);
-            int i = fileid.indexOf('/');
-            if (i == -1 || i == fileid.length() - 1) {
-                logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+            int index = fileid.indexOf('/');
+            if (index == -1 || index == fileid.length() - 1) {
+                logger.error("NODE0105 Rejecting bad URI for PUT (publish) of " + req.getPathInfo() + " from " + req
+                        .getRemoteAddr());
                 try {
-                    resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  Possible missing fileid.");
+                    resp.sendError(HttpServletResponse.SC_NOT_FOUND,
+                            "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  "
+                                    + "Possible missing fileid.");
                 } catch (IOException e) {
                     logger.error("NODE0542 DRNodeCadiFilter.getFeedId: ", e);
                 }
                 return null;
             }
-            feedid = fileid.substring(0, i);
+            feedid = fileid.substring(0, index);
         }
         return feedid;
     }
 
+    private void doFilterWithFeedId(ServletRequest request, ServletResponse response, FilterChain chain)
+            throws IOException, ServletException {
+        String feedId = getFeedId(request, response);
+        String aafDbInstance = NodeConfigManager.getInstance().getAafInstance(feedId);
+        if (aafDbInstance != null && !"".equals(aafDbInstance) && !"legacy".equalsIgnoreCase(aafDbInstance)) {
+            logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "AAF Instance -" + aafDbInstance);
+            super.doFilter(request, response, chain);
+        } else {
+            logger.info("DRNodeCadiFilter - doFilter: FeedId - " + feedId + ":" + "Legacy Feed");
+            chain.doFilter(request, response);
+        }
+    }
 }
index 4c21b34..df73c1e 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
-import java.util.*;
-import java.io.*;
-
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Objects;
 
 /**
  * Main control point for delivering files to destinations.
- * <p>
- * The Delivery class manages assignment of delivery threads to delivery
- * queues and creation and destruction of delivery queues as
- * configuration changes.  DeliveryQueues are assigned threads based on a
- * modified round-robin approach giving priority to queues with more work
- * as measured by both bytes to deliver and files to deliver and lower
- * priority to queues that already have delivery threads working.
- * A delivery thread continues to work for a delivery queue as long as
- * that queue has more files to deliver.
+ *
+ * <p>The Delivery class manages assignment of delivery threads to delivery queues and creation and destruction of
+ * delivery queues as configuration changes. DeliveryQueues are assigned threads based on a modified round-robin
+ * approach giving priority to queues with more work as measured by both bytes to deliver and files to deliver and lower
+ * priority to queues that already have delivery threads working. A delivery thread continues to work for a delivery
+ * queue as long as that queue has more files to deliver.
  */
 public class Delivery {
-    private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
-
-    private static class DelItem implements Comparable<DelItem> {
-        private String pubid;
-        private String spool;
-
-        public int compareTo(DelItem x) {
-            int i = pubid.compareTo(x.pubid);
-            if (i == 0) {
-                i = spool.compareTo(x.spool);
-            }
-            return (i);
-        }
-
-        public String getPublishId() {
-            return (pubid);
-        }
-
-        public String getSpool() {
-            return (spool);
-        }
-
-        public DelItem(String pubid, String spool) {
-            this.pubid = pubid;
-            this.spool = spool;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            DelItem delItem = (DelItem) o;
-            return Objects.equals(pubid, delItem.pubid) &&
-                    Objects.equals(getSpool(), delItem.getSpool());
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(pubid, getSpool());
-        }
-    }
 
+    private static final String TOTAL = " total=";
+    private static final String YELLOW = " yellow=";
+    private static EELFLogger logger = EELFManager.getInstance().getLogger(Delivery.class);
     private double fdstart;
     private double fdstop;
     private int threads;
     private int curthreads;
     private NodeConfigManager config;
-    private Hashtable<String, DeliveryQueue> dqs = new Hashtable<String, DeliveryQueue>();
+    private Hashtable<String, DeliveryQueue> dqs = new Hashtable<>();
     private DeliveryQueue[] queues = new DeliveryQueue[0];
     private int qpos = 0;
     private long nextcheck;
-    private Runnable cmon = new Runnable() {
-        public void run() {
-            checkconfig();
-        }
-    };
 
     /**
      * Constructs a new Delivery system using the specified configuration manager.
@@ -110,10 +62,37 @@ public class Delivery {
      */
     public Delivery(NodeConfigManager config) {
         this.config = config;
+        Runnable cmon = this::checkconfig;
         config.registerConfigTask(cmon);
         checkconfig();
     }
 
+    /**
+     * Reset the retry timer for a delivery queue.
+     */
+    public synchronized void resetQueue(String spool) {
+        if (spool != null) {
+            DeliveryQueue dq = dqs.get(spool);
+            if (dq != null) {
+                dq.resetQueue();
+            }
+        }
+    }
+
+    /**
+     * Mark the task in spool a success.
+     */
+    public synchronized boolean markTaskSuccess(String spool, String pubId) {
+        boolean succeeded = false;
+        if (spool != null) {
+            DeliveryQueue dq = dqs.get(spool);
+            if (dq != null) {
+                succeeded = dq.markTaskSuccess(pubId);
+            }
+        }
+        return succeeded;
+    }
+
     private void cleardir(String dir) {
         if (dqs.get(dir) != null) {
             return;
@@ -131,12 +110,11 @@ public class Delivery {
         File spoolfile = new File(config.getSpoolBase());
         long tspace = spoolfile.getTotalSpace();
         long start = (long) (tspace * fdstart);
-        long stop = (long) (tspace * fdstop);
         long cur = spoolfile.getUsableSpace();
         if (cur >= start) {
             return;
         }
-        Vector<DelItem> cv = new Vector<DelItem>();
+        ArrayList<DelItem> cv = new ArrayList<>();
         for (String sdir : dqs.keySet()) {
             for (String meta : (new File(sdir)).list()) {
                 if (!meta.endsWith(".M") || meta.charAt(0) == '.') {
@@ -147,27 +125,21 @@ public class Delivery {
         }
         DelItem[] items = cv.toArray(new DelItem[cv.size()]);
         Arrays.sort(items);
-        logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);
-        for (DelItem item : items) {
-            long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
-            logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");
-            if (amount > 0) {
-                cur += amount;
-                if (cur >= stop) {
-                    cur = spoolfile.getUsableSpace();
-                }
-                if (cur >= stop) {
-                    logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
-                    return;
-                }
-            }
+        long stop = (long) (tspace * fdstop);
+        logger.info(
+                "NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + TOTAL + tspace);
+        if (determineFreeDiskSpace(spoolfile, tspace, stop, cur, items)) {
+            return;
         }
         cur = spoolfile.getUsableSpace();
         if (cur >= stop) {
-            logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);
+            logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
+                    + TOTAL + tspace);
             return;
         }
-        logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);
+        logger.warn(
+                "NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + YELLOW
+                        + stop + TOTAL + tspace);
     }
 
     private void cleardirs() {
@@ -206,7 +178,7 @@ public class Delivery {
         DestInfo[] alldis = config.getAllDests();
         DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];
         qpos = 0;
-        Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();
+        Hashtable<String, DeliveryQueue> ndqs = new Hashtable<>();
         for (DestInfo di : alldis) {
             String spl = di.getSpool();
             DeliveryQueue dq = dqs.get(spl);
@@ -223,11 +195,8 @@ public class Delivery {
         cleardirs();
         while (curthreads < threads) {
             curthreads++;
-            (new Thread() {
-                {
-                    setName("Delivery Thread");
-                }
-
+            (new Thread("Delivery Thread") {
+                @Override
                 public void run() {
                     dodelivery();
                 }
@@ -276,29 +245,69 @@ public class Delivery {
         }
     }
 
-    /**
-     * Reset the retry timer for a delivery queue
-     */
-    public synchronized void resetQueue(String spool) {
-        if (spool != null) {
-            DeliveryQueue dq = dqs.get(spool);
-            if (dq != null) {
-                dq.resetQueue();
+    private boolean determineFreeDiskSpace(File spoolfile, long tspace, long stop, long cur, DelItem[] items) {
+        for (DelItem item : items) {
+            long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());
+            logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId()
+                    + " to free up disk");
+            if (amount > 0) {
+                cur += amount;
+                if (cur >= stop) {
+                    cur = spoolfile.getUsableSpace();
+                }
+                if (cur >= stop) {
+                    logger.info(
+                            "NODE0503 Free disk space at or above yellow threshold.  current=" + cur + YELLOW + stop
+                                    + TOTAL + tspace);
+                    return true;
+                }
             }
         }
+        return false;
     }
 
-    /**
-     * Mark the task in spool a success
-     */
-    public synchronized boolean markTaskSuccess(String spool, String pubId) {
-        boolean succeeded = false;
-        if (spool != null) {
-            DeliveryQueue dq = dqs.get(spool);
-            if (dq != null) {
-                succeeded = dq.markTaskSuccess(pubId);
+    private static class DelItem implements Comparable<DelItem> {
+
+        private String pubid;
+        private String spool;
+
+        public DelItem(String pubid, String spool) {
+            this.pubid = pubid;
+            this.spool = spool;
+        }
+
+        public int compareTo(DelItem other) {
+            int diff = pubid.compareTo(other.pubid);
+            if (diff == 0) {
+                diff = spool.compareTo(other.spool);
             }
+            return (diff);
+        }
+
+        public String getPublishId() {
+            return (pubid);
+        }
+
+        public String getSpool() {
+            return (spool);
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+            DelItem delItem = (DelItem) object;
+            return Objects.equals(pubid, delItem.pubid)
+                    && Objects.equals(getSpool(), delItem.getSpool());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(pubid, getSpool());
         }
-        return succeeded;
     }
 }
index 3d48587..8cdafd6 100644 (file)
@@ -26,46 +26,41 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Vector;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Mechanism for monitoring and controlling delivery of files to a destination.
- * <p>
- * The DeliveryQueue class maintains lists of DeliveryTasks for a single
- * destination (a subscription or another data router node) and assigns
- * delivery threads to try to deliver them.  It also maintains a delivery
- * status that causes it to back off on delivery attempts after a failure.
- * <p>
- * If the most recent delivery result was a failure, then no more attempts
- * will be made for a period of time.  Initially, and on the first failure
- * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).
- * If, after this delay, additional failures occur, each failure will
- * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a
- * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
- * Note that this behavior applies to the delivery queue as a whole and not
- * to individual files in the queue.  If multiple files are being
- * delivered and one fails, the delay will be started.  If a second
- * delivery fails while the delay was active, it will not change the delay
- * or change the duration of any subsequent delay.
- * If, however, it succeeds, it will cancel the delay.
- * <p>
- * The queue maintains 3 collections of files to deliver: A todo list of
- * files that will be attempted, a working set of files that are being
- * attempted, and a retry set of files that were attempted and failed.
- * Whenever the todo list is empty and needs to be refilled, a scan of the
- * spool directory is made and the file names sorted.  Any files in the working set are ignored.
- * If a DeliveryTask for the file is in the retry set, then that delivery
- * task is placed on the todo list.  Otherwise, a new DeliveryTask for the
- * file is created and placed on the todo list.
- * If, when a DeliveryTask is about to be removed from the todo list, its
- * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead
- * marked as expired.
- * <p>
- * A delivery queue also maintains a skip flag.  This flag is true if the
- * failure timer is active or if no files are found in a directory scan.
+ *
+ * <p>The DeliveryQueue class maintains lists of DeliveryTasks for a single destination (a subscription or another data
+ * router node) and assigns delivery threads to try to deliver them.  It also maintains a delivery status that causes it
+ * to back off on delivery attempts after a failure.
+ *
+ * <p>If the most recent delivery result was a failure, then no more attempts will be made for a period of time.
+ * Initially, and on the first failure following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer()
+ * (milliseconds). If, after this delay, additional failures occur, each failure will multiply the delay by
+ * DeliveryQueueHelper.getFailureBackoff() up to a maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().
+ * Note that this behavior applies to the delivery queue as a whole and not to individual files in the queue.  If
+ * multiple files are being delivered and one fails, the delay will be started.  If a second delivery fails while the
+ * delay was active, it will not change the delay or change the duration of any subsequent delay. If, however, it
+ * succeeds, it will cancel the delay.
+ *
+ * <p>The queue maintains 3 collections of files to deliver: A to do list of files that will be attempted, a working
+ * set of files that are being attempted, and a retry set of files that were attempted and failed. Whenever the to do
+ * list is empty and needs to be refilled, a scan of the spool directory is made and the file names sorted.  Any files
+ * in the working set are ignored. If a DeliveryTask for the file is in the retry set, then that delivery task is placed
+ * on the to do list.  Otherwise, a new DeliveryTask for the file is created and placed on the to do list. If, when a
+ * DeliveryTask is about to be removed from the to do list, its age exceeds DeliveryQueueHelper.getExpirationTimer(),
+ * then it is instead marked as expired.
+ *
+ * <p>A delivery queue also maintains a skip flag.  This flag is true if the failure timer is active or if no files are
+ * found in a directory scan.
  */
 public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
+
     private static EELFLogger logger = EELFManager.getInstance().getLogger(DeliveryQueue.class);
     private DeliveryQueueHelper deliveryQueueHelper;
     private DestInfo destinationInfo;
@@ -78,6 +73,16 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     private File dir;
     private Vector<DeliveryTask> todo = new Vector<>();
 
+    /**
+     * Create a delivery queue for a given destination info.
+     */
+    DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
+        this.deliveryQueueHelper = deliveryQueueHelper;
+        this.destinationInfo = destinationInfo;
+        dir = new File(destinationInfo.getSpool());
+        dir.mkdirs();
+    }
+
     /**
      * Try to cancel a delivery task.
      *
@@ -106,7 +111,8 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         if (dt.isCleaned()) {
             return (0);
         }
-        StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());
+        StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(),
+                dt.getLength(), "diskFull", dt.getAttempts());
         dt.clean();
         return (dt.getLength());
     }
@@ -144,7 +150,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
             if (failduration == 0) {
                 if (destinationInfo.isPrivilegedSubscriber()) {
                     failduration = deliveryQueueHelper.getWaitForFileProcessFailureTimer();
-                } else{
+                } else {
                     failduration = deliveryQueueHelper.getInitFailureTimer();
                 }
             }
@@ -205,144 +211,106 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
                 todo = new Vector<>();
                 String[] files = dir.list();
                 Arrays.sort(files);
-                for (String fname : files) {
-                    if (!fname.endsWith(".M")) {
-                        continue;
-                    }
-                    String fname2 = fname.substring(0, fname.length() - 2);
-                    long pidtime = 0;
-                    int dot = fname2.indexOf('.');
-                    if (dot < 1) {
-                        continue;
-                    }
-                    try {
-                        pidtime = Long.parseLong(fname2.substring(0, dot));
-                    } catch (Exception e) {
-                        logger.error("Exception", e);
-                    }
-                    if (pidtime < 1000000000000L) {
-                        continue;
-                    }
-                    if (working.get(fname2) != null) {
-                        continue;
-                    }
-                    DeliveryTask dt = retry.get(fname2);
-                    if (dt == null) {
-                        dt = new DeliveryTask(this, fname2);
-                    }
-                    todo.add(dt);
-                }
+                scanForNextTask(files);
                 retry = new Hashtable<>();
             }
-            if (todoindex < todo.size()) {
-                DeliveryTask dt = todo.get(todoindex);
-                if (dt.isCleaned()) {
-                    todoindex++;
-                    continue;
-                }
-                if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
-                    retry.put(dt.getPublishId(), dt);
-                    todoindex++;
-                    continue;
-                }
-                if (dt.getDate() >= mindate) {
-                    return (dt);
-                }
-                todoindex++;
-                reportExpiry(dt);
-                continue;
+            DeliveryTask dt = getDeliveryTask(mindate);
+            if (dt != null) {
+                return dt;
             }
-            return (null);
+            return null;
         }
     }
 
     /**
-     * Create a delivery queue for a given destination info
-     */
-    DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) {
-        this.deliveryQueueHelper = deliveryQueueHelper;
-        this.destinationInfo = destinationInfo;
-        dir = new File(destinationInfo.getSpool());
-        dir.mkdirs();
-    }
-
-    /**
-     * Update the destination info for this delivery queue
+     * Update the destination info for this delivery queue.
      */
     public void config(DestInfo destinationInfo) {
         this.destinationInfo = destinationInfo;
     }
 
     /**
-     * Get the dest info
+     * Get the dest info.
      */
     public DestInfo getDestinationInfo() {
         return (destinationInfo);
     }
 
     /**
-     * Get the config manager
+     * Get the config manager.
      */
     public DeliveryQueueHelper getConfig() {
         return (deliveryQueueHelper);
     }
 
     /**
-     * Exceptional condition occurred during delivery
+     * Exceptional condition occurred during delivery.
      */
     public void reportDeliveryExtra(DeliveryTask task, long sent) {
         StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);
     }
 
     /**
-     * Message too old to deliver
+     * Message too old to deliver.
      */
     void reportExpiry(DeliveryTask task) {
-        StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
+        StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());
         markExpired(task);
     }
 
     /**
-     * Completed a delivery attempt
+     * Completed a delivery attempt.
      */
     public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {
         if (status < 300) {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                    task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid);
             if (destinationInfo.isPrivilegedSubscriber()) {
-                task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
+                task.setResumeTime(
+                        System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer());
                 markFailWithRetry(task);
             } else {
                 markSuccess(task);
             }
         } else if (status < 400 && deliveryQueueHelper.isFollowRedirects()) {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                    task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
             if (deliveryQueueHelper.handleRedirection(destinationInfo, location, task.getFileId())) {
                 markRedirect(task);
             } else {
-                StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
+                StatusLog
+                        .logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                                task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
                 markFailNoRetry(task);
             }
-        } else if (status < 500 && status != 429) {         // Status 429 is the standard response for Too Many Requests and indicates that a file needs to be delivered again at a later time.
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
-            StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
+        } else if (status < 500 && status != 429) {
+            // Status 429 is the standard response for Too Many Requests and indicates
+            // that a file needs to be delivered again at a later time.
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                    task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+            StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                    task.getCType(), task.getLength(), "notRetryable", task.getAttempts());
             markFailNoRetry(task);
         } else {
-            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
+            StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                    task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, location);
             markFailWithRetry(task);
         }
     }
 
     /**
-     * Delivery failed by reason of an exception
+     * Delivery failed by reason of an exception.
      */
     public void reportException(DeliveryTask task, Exception exception) {
-        StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
+        StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(),
+                task.getCType(), task.getLength(), destinationInfo.getAuthUser(), -1, exception.toString());
         deliveryQueueHelper.handleUnreachable(destinationInfo);
         markFailWithRetry(task);
     }
 
     /**
-     * Get the feed ID for a subscription
+     * Get the feed ID for a subscription.
      *
      * @param subid The subscription ID
      * @return The feed ID
@@ -352,22 +320,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     }
 
     /**
-     * Get the URL to deliver a message to given the file ID
+     * Get the URL to deliver a message to given the file ID.
      */
     public String getDestURL(String fileid) {
         return (deliveryQueueHelper.getDestURL(destinationInfo, fileid));
     }
 
     /**
-     * Deliver files until there's a failure or there are no more
-     * files to deliver
+     * Deliver files until there's a failure or there are no more files to deliver.
      */
     public void run() {
-        DeliveryTask t;
+        DeliveryTask task;
         long endtime = System.currentTimeMillis() + deliveryQueueHelper.getFairTimeLimit();
         int filestogo = deliveryQueueHelper.getFairFileLimit();
-        while ((t = getNext()) != null) {
-            t.run();
+        while ((task = getNext()) != null) {
+            task.run();
             if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {
                 break;
             }
@@ -375,21 +342,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
     }
 
     /**
-     * Is there no work to do for this queue right now?
+     * Is there no work to do for this queue right now.
      */
     synchronized boolean isSkipSet() {
         return (peekNext() == null);
     }
 
     /**
-     * Reset the retry timer
+     * Reset the retry timer.
      */
     void resetQueue() {
         resumetime = System.currentTimeMillis();
     }
 
     /**
-     * Get task if in queue and mark as success
+     * Get task if in queue and mark as success.
      */
     boolean markTaskSuccess(String pubId) {
         DeliveryTask task = working.get(pubId);
@@ -407,4 +374,63 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper {
         }
         return false;
     }
+
+    private void scanForNextTask(String[] files) {
+        for (String fname : files) {
+            String pubId = getPubId(fname);
+            if (pubId == null) {
+                continue;
+            }
+            DeliveryTask dt = retry.get(pubId);
+            if (dt == null) {
+                dt = new DeliveryTask(this, pubId);
+            }
+            todo.add(dt);
+        }
+    }
+
+    @Nullable
+    private DeliveryTask getDeliveryTask(long mindate) {
+        if (todoindex < todo.size()) {
+            DeliveryTask dt = todo.get(todoindex);
+            if (dt.isCleaned()) {
+                todoindex++;
+            }
+            if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) {
+                retry.put(dt.getPublishId(), dt);
+                todoindex++;
+            }
+            if (dt.getDate() >= mindate) {
+                return (dt);
+            }
+            todoindex++;
+            reportExpiry(dt);
+        }
+        return null;
+    }
+
+    @Nullable
+    private String getPubId(String fname) {
+        if (!fname.endsWith(".M")) {
+            return null;
+        }
+        String fname2 = fname.substring(0, fname.length() - 2);
+        long pidtime = 0;
+        int dot = fname2.indexOf('.');
+        if (dot < 1) {
+            return null;
+        }
+        try {
+            pidtime = Long.parseLong(fname2.substring(0, dot));
+        } catch (Exception e) {
+            logger.error("Exception", e);
+        }
+        if (pidtime < 1000000000000L) {
+            return null;
+        }
+        if (working.get(fname2) != null) {
+            return null;
+        }
+        return fname2;
+    }
 }
index 018c3af..2093d6d 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+import org.jetbrains.annotations.Nullable;
 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 import org.slf4j.MDC;
 
-import static com.att.eelf.configuration.Configuration.*;
-import static org.onap.dmaap.datarouter.node.NodeUtils.isFiletypeGzip;
-
 /**
  * A file to be delivered to a destination.
- * <p>
- * A Delivery task represents a work item for the data router - a file that
- * needs to be delivered and provides mechanisms to get information about
- * the file and its delivery data as well as to attempt delivery.
+ *
+ * <p>A Delivery task represents a work item for the data router - a file that needs to be delivered and provides
+ * mechanisms to get information about the file and its delivery data as well as to attempt delivery.
  */
 public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
-    private static EELFLogger eelfLogger = EELFManager.getInstance()
-            .getLogger(DeliveryTask.class);
+
+    private static final String DECOMPRESSION_STATUS = "Decompression_Status";
+    private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(DeliveryTask.class);
     private DeliveryTaskHelper deliveryTaskHelper;
     private String pubid;
     private DestInfo destInfo;
@@ -72,9 +82,8 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
      * Create a delivery task for a given delivery queue and pub ID
      *
      * @param deliveryTaskHelper The delivery task helper for the queue this task is in.
-     * @param pubid              The publish ID for this file.  This is used as
-     *                           the base for the file name in the spool directory and is of
-     *                           the form <milliseconds since 1970>.<fqdn of initial data router node>
+     * @param pubid The publish ID for this file.  This is used as the base for the file name in the spool directory and
+     * is of the form <milliseconds since 1970>.<fqdn of initial data router node>
      */
     DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) {
         this.deliveryTaskHelper = deliveryTaskHelper;
@@ -91,40 +100,40 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         boolean monly = destInfo.isMetaDataOnly();
         date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));
         resumeTime = System.currentTimeMillis();
-        Vector<String[]> hdrv = new Vector<>();
+        ArrayList<String[]> hdrv = new ArrayList<>();
 
         try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {
-            String s = br.readLine();
-            int i = s.indexOf('\t');
-            method = s.substring(0, i);
+            String line = br.readLine();
+            int index = line.indexOf('\t');
+            method = line.substring(0, index);
             NodeUtils.setIpAndFqdnForEelf(method);
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
-            fileid = s.substring(i + 1);
-            while ((s = br.readLine()) != null) {
-                i = s.indexOf('\t');
-                String h = s.substring(0, i);
-                String v = s.substring(i + 1);
-                if ("x-dmaap-dr-routing".equalsIgnoreCase(h)) {
-                    subid = v.replaceAll("[^ ]*/", "");
+            fileid = line.substring(index + 1);
+            while ((line = br.readLine()) != null) {
+                index = line.indexOf('\t');
+                String header = line.substring(0, index);
+                String headerValue = line.substring(index + 1);
+                if ("x-dmaap-dr-routing".equalsIgnoreCase(header)) {
+                    subid = headerValue.replaceAll("[^ ]*/", "");
                     feedid = deliveryTaskHelper.getFeedId(subid.replaceAll(" .*", ""));
                 }
-                if (length == 0 && h.toLowerCase().startsWith("content-")) {
+                if (length == 0 && header.toLowerCase().startsWith("content-")) {
                     continue;
                 }
-                if (h.equalsIgnoreCase("content-type")) {
-                    ctype = v;
+                if ("content-type".equalsIgnoreCase(header)) {
+                    ctype = headerValue;
                 }
-                if (h.equalsIgnoreCase("x-onap-requestid")) {
-                    MDC.put(MDC_KEY_REQUEST_ID, v);
+                if ("x-onap-requestid".equalsIgnoreCase(header)) {
+                    MDC.put(MDC_KEY_REQUEST_ID, headerValue);
                 }
-                if (h.equalsIgnoreCase("x-invocationid")) {
-                    MDC.put("InvocationId", v);
-                    v = UUID.randomUUID().toString();
-                    newInvocationId = v;
+                if ("x-invocationid".equalsIgnoreCase(header)) {
+                    MDC.put("InvocationId", headerValue);
+                    headerValue = UUID.randomUUID().toString();
+                    newInvocationId = headerValue;
                 }
-                hdrv.add(new String[]{h, v});
+                hdrv.add(new String[]{header, headerValue});
             }
         } catch (Exception e) {
             eelfLogger.error("Exception", e);
@@ -134,20 +143,20 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     }
 
     /**
-     * Is the object a DeliveryTask with the same publication ID?
+     * Is the object a DeliveryTask with the same publication ID.
      */
-    public boolean equals(Object o) {
-        if (!(o instanceof DeliveryTask)) {
+    public boolean equals(Object object) {
+        if (!(object instanceof DeliveryTask)) {
             return (false);
         }
-        return (pubid.equals(((DeliveryTask) o).pubid));
+        return (pubid.equals(((DeliveryTask) object).pubid));
     }
 
     /**
      * Compare the publication IDs.
      */
-    public int compareTo(DeliveryTask o) {
-        return (pubid.compareTo(o.pubid));
+    public int compareTo(DeliveryTask other) {
+        return (pubid.compareTo(other.pubid));
     }
 
     /**
@@ -165,79 +174,49 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     }
 
     /**
-     * Get the publish ID
+     * Get the publish ID.
      */
     String getPublishId() {
         return (pubid);
     }
 
     /**
-     * Attempt delivery
+     * Attempt delivery.
      */
     public void run() {
         attempts++;
         try {
             destInfo = deliveryTaskHelper.getDestinationInfo();
-            boolean expect100 = destInfo.isUsing100();
             boolean monly = destInfo.isMetaDataOnly();
             length = 0;
             if (!"DELETE".equals(method) && !monly) {
                 length = datafile.length();
             }
-            if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")){
-                    fileid = fileid.replace(".gz", "");
-            }
+            stripSuffixIfIsDecompress();
             url = deliveryTaskHelper.getDestURL(fileid);
-            URL u = new URL(url);
-            HttpURLConnection uc = (HttpURLConnection) u.openConnection();
-            uc.setConnectTimeout(60000);
-            uc.setReadTimeout(60000);
-            uc.setInstanceFollowRedirects(false);
-            uc.setRequestMethod(method);
-            uc.setRequestProperty("Content-Length", Long.toString(length));
-            uc.setRequestProperty("Authorization", destInfo.getAuth());
-            uc.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
-            for (String[] nv : hdrs) {
-                uc.addRequestProperty(nv[0], nv[1]);
-            }
-            if (length > 0) {
-                if (expect100) {
-                    uc.setRequestProperty("Expect", "100-continue");
-                }
-                uc.setDoOutput(true);
-                if (destInfo.isDecompress()) {
-                    if (isFiletypeGzip(datafile)) {
-                        sendDecompressedFile(uc);
-                    } else {
-                        uc.setRequestProperty("Decompression_Status", "UNSUPPORTED_FORMAT");
-                        sendFile(uc);
-                    }
-                } else {
-                    sendFile(uc);
-                }
-            }
-            int rc = uc.getResponseCode();
-            String rmsg = uc.getResponseMessage();
-            if (rmsg == null) {
-                String h0 = uc.getHeaderField(0);
-                if (h0 != null) {
-                    int i = h0.indexOf(' ');
-                    int j = h0.indexOf(' ', i + 1);
-                    if (i != -1 && j != -1) {
-                        rmsg = h0.substring(j + 1);
-                    }
-                }
-            }
+            URL urlObj = new URL(url);
+            HttpURLConnection urlConnection = (HttpURLConnection) urlObj.openConnection();
+            urlConnection.setConnectTimeout(60000);
+            urlConnection.setReadTimeout(60000);
+            urlConnection.setInstanceFollowRedirects(false);
+            urlConnection.setRequestMethod(method);
+            urlConnection.setRequestProperty("Content-Length", Long.toString(length));
+            urlConnection.setRequestProperty("Authorization", destInfo.getAuth());
+            urlConnection.setRequestProperty("X-DMAAP-DR-PUBLISH-ID", pubid);
+            boolean expect100 = destInfo.isUsing100();
+            int rc = deliverFileToSubscriber(expect100, urlConnection);
+            String rmsg = urlConnection.getResponseMessage();
+            rmsg = getResponseMessage(urlConnection, rmsg);
             String xpubid = null;
             InputStream is;
             if (rc >= 200 && rc <= 299) {
-                is = uc.getInputStream();
-                xpubid = uc.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
+                is = urlConnection.getInputStream();
+                xpubid = urlConnection.getHeaderField("X-DMAAP-DR-PUBLISH-ID");
             } else {
                 if (rc >= 300 && rc <= 399) {
-                    rmsg = uc.getHeaderField("Location");
+                    rmsg = urlConnection.getHeaderField("Location");
                 }
-                is = uc.getErrorStream();
+                is = urlConnection.getErrorStream();
             }
             byte[] buf = new byte[4096];
             if (is != null) {
@@ -247,20 +226,19 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
             }
             deliveryTaskHelper.reportStatus(this, rc, xpubid, rmsg);
         } catch (Exception e) {
-            eelfLogger.error("Exception "+ Arrays.toString(e.getStackTrace()),e);
+            eelfLogger.error("Exception " + Arrays.toString(e.getStackTrace()), e);
             deliveryTaskHelper.reportException(this, e);
         }
     }
 
     /**
-     * To send decompressed gzip to the subscribers
+     * To send decompressed gzip to the subscribers.
      *
      * @param httpURLConnection connection used to make request
-     * @throws IOException
      */
     private void sendDecompressedFile(HttpURLConnection httpURLConnection) throws IOException {
         byte[] buffer = new byte[8164];
-        httpURLConnection.setRequestProperty("Decompression_Status", "SUCCESS");
+        httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "SUCCESS");
         OutputStream outputStream = getOutputStream(httpURLConnection);
         if (outputStream != null) {
             int bytesRead = 0;
@@ -271,7 +249,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
                 }
                 outputStream.close();
             } catch (IOException e) {
-                httpURLConnection.setRequestProperty("Decompression_Status", "FAILURE");
+                httpURLConnection.setRequestProperty(DECOMPRESSION_STATUS, "FAILURE");
                 eelfLogger.info("Could not decompress file", e);
                 sendFile(httpURLConnection);
             }
@@ -283,44 +261,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
      * To send any file to the subscriber.
      *
      * @param httpURLConnection connection used to make request
-     * @throws IOException
      */
     private void sendFile(HttpURLConnection httpURLConnection) throws IOException {
         OutputStream os = getOutputStream(httpURLConnection);
-        if (os != null) {
-            long sofar = 0;
-            try (InputStream is = new FileInputStream(datafile)) {
-                byte[] buf = new byte[1024 * 1024];
-                while (sofar < length) {
-                    int i = buf.length;
-                    if (sofar + i > length) {
-                        i = (int) (length - sofar);
-                    }
-                    i = is.read(buf, 0, i);
-                    if (i <= 0) {
-                        throw new IOException("Unexpected problem reading data file " + datafile);
-                    }
-                    sofar += i;
-                    os.write(buf, 0, i);
+        if (os == null) {
+            return;
+        }
+        long sofar = 0;
+        try (InputStream is = new FileInputStream(datafile)) {
+            byte[] buf = new byte[1024 * 1024];
+            while (sofar < length) {
+                int len = buf.length;
+                if (sofar + len > length) {
+                    len = (int) (length - sofar);
+                }
+                len = is.read(buf, 0, len);
+                if (len <= 0) {
+                    throw new IOException("Unexpected problem reading data file " + datafile);
                 }
-                os.close();
-            } catch (IOException ioe) {
-                deliveryTaskHelper.reportDeliveryExtra(this, sofar);
-                throw ioe;
+                sofar += len;
+                os.write(buf, 0, len);
             }
+            os.close();
+        } catch (IOException ioe) {
+            deliveryTaskHelper.reportDeliveryExtra(this, sofar);
+            throw ioe;
         }
     }
 
     /**
-     * Get the outputstream that will be used to send data
+     * Get the outputstream that will be used to send data.
      *
      * @param httpURLConnection connection used to make request
      * @return AN Outpustream that can be used to send your data.
-     * @throws IOException
      */
     private OutputStream getOutputStream(HttpURLConnection httpURLConnection) throws IOException {
         OutputStream outputStream = null;
-
         try {
             outputStream = httpURLConnection.getOutputStream();
         } catch (ProtocolException pe) {
@@ -331,8 +307,52 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
         return outputStream;
     }
 
+    private void stripSuffixIfIsDecompress() {
+        if (destInfo.isDecompress() && isFiletypeGzip(datafile) && fileid.endsWith(".gz")) {
+            fileid = fileid.replace(".gz", "");
+        }
+    }
+
+    private int deliverFileToSubscriber(boolean expect100, HttpURLConnection uc) throws IOException {
+        for (String[] nv : hdrs) {
+            uc.addRequestProperty(nv[0], nv[1]);
+        }
+        if (length > 0) {
+            if (expect100) {
+                uc.setRequestProperty("Expect", "100-continue");
+            }
+            uc.setDoOutput(true);
+            if (destInfo.isDecompress()) {
+                if (isFiletypeGzip(datafile)) {
+                    sendDecompressedFile(uc);
+                } else {
+                    uc.setRequestProperty(DECOMPRESSION_STATUS, "UNSUPPORTED_FORMAT");
+                    sendFile(uc);
+                }
+            } else {
+                sendFile(uc);
+            }
+        }
+        return uc.getResponseCode();
+    }
+
+    @Nullable
+    private String getResponseMessage(HttpURLConnection uc, String rmsg) {
+        if (rmsg == null) {
+            String h0 = uc.getHeaderField(0);
+            if (h0 != null) {
+                int indexOfSpace1 = h0.indexOf(' ');
+                int indexOfSpace2 = h0.indexOf(' ', indexOfSpace1 + 1);
+                if (indexOfSpace1 != -1 && indexOfSpace2 != -1) {
+                    rmsg = h0.substring(indexOfSpace2 + 1);
+                }
+            }
+        }
+        return rmsg;
+    }
+
     /**
-     * Remove meta and data files
+     * Remove meta and data files.
      */
     void clean() {
         datafile.delete();
@@ -343,28 +363,28 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     }
 
     /**
-     * Set the resume time for a delivery task.
+     * Get the resume time for a delivery task.
      */
-    void setResumeTime(long resumeTime) {
-        this.resumeTime = resumeTime;
+    long getResumeTime() {
+        return resumeTime;
     }
 
     /**
-     * Get the resume time for a delivery task.
+     * Set the resume time for a delivery task.
      */
-    long getResumeTime() {
-        return resumeTime;
+    void setResumeTime(long resumeTime) {
+        this.resumeTime = resumeTime;
     }
 
     /**
-     * Has this delivery task been cleaned?
+     * Has this delivery task been cleaned.
      */
     boolean isCleaned() {
         return (hdrs == null);
     }
 
     /**
-     * Get length of body
+     * Get length of body.
      */
     public long getLength() {
         return (length);
@@ -378,58 +398,58 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> {
     }
 
     /**
-     * Get the most recent delivery attempt URL
+     * Get the most recent delivery attempt URL.
      */
     public String getURL() {
         return (url);
     }
 
     /**
-     * Get the content type
+     * Get the content type.
      */
     String getCType() {
         return (ctype);
     }
 
     /**
-     * Get the method
+     * Get the method.
      */
     String getMethod() {
         return (method);
     }
 
     /**
-     * Get the file ID
+     * Get the file ID.
      */
     String getFileId() {
         return (fileid);
     }
 
     /**
-     * Get the number of delivery attempts
+     * Get the number of delivery attempts.
      */
     int getAttempts() {
         return (attempts);
     }
 
     /**
-     * Get the (space delimited list of) subscription ID for this delivery task
+     * Get the (space delimited list of) subscription ID for this delivery task.
      */
     String getSubId() {
         return (subid);
     }
 
     /**
-     * Get the feed ID for this delivery task
+     * Get the feed ID for this delivery task.
      */
     String getFeedId() {
         return (feedid);
     }
 
     /**
-     * Get the followRedirects for this delivery task
+     * Get the followRedirects for this delivery task.
      */
     public boolean getFollowRedirects() {
-        return(followRedirects);
+        return (followRedirects);
     }
 }
index d4ac8bd..b9068f2 100644 (file)
@@ -26,32 +26,33 @@ package org.onap.dmaap.datarouter.node;
 
 /**
  * Interface to allow independent testing of the DeliveryTask code.
- * <p>
- * This interface represents all the configuraiton information and
- * feedback mechanisms that a delivery task needs.
+ *
+ * <p>This interface represents all the configuraiton information and feedback mechanisms that a delivery task needs.
  */
 
 public interface DeliveryTaskHelper {
+
     /**
-     * Report that a delivery attempt failed due to an exception (like can't connect to remote host)
+     * Report that a delivery attempt failed due to an exception (like can't connect to remote host).
      *
-     * @param task      The task that failed
+     * @param task The task that failed
      * @param exception The exception that occurred
      */
     void reportException(DeliveryTask task, Exception exception);
 
     /**
-     * Report that a delivery attempt completed (successfully or unsuccessfully)
+     * Report that a delivery attempt completed (successfully or unsuccessfully).
      *
-     * @param task     The task that failed
-     * @param status   The HTTP status
-     * @param xpubid   The publish ID from the far end (if any)
+     * @param task The task that failed
+     * @param status The HTTP status
+     * @param xpubid The publish ID from the far end (if any)
      * @param location The redirection location for a 3XX response
      */
     void reportStatus(DeliveryTask task, int status, String xpubid, String location);
 
     /**
-     * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.
+     * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100
+     * Continue.
      *
      * @param task The task that failed
      * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue.
@@ -59,14 +60,14 @@ public interface DeliveryTaskHelper {
     void reportDeliveryExtra(DeliveryTask task, long sent);
 
     /**
-     * Get the destination information for the delivery queue
+     * Get the destination information for the delivery queue.
      *
      * @return The destination information
      */
     DestInfo getDestinationInfo();
 
     /**
-     * Given a file ID, get the URL to deliver to
+     * Given a file ID, get the URL to deliver to.
      *
      * @param fileid The file id
      * @return The URL to deliver to
@@ -74,7 +75,7 @@ public interface DeliveryTaskHelper {
     String getDestURL(String fileid);
 
     /**
-     * Get the feed ID for a subscription
+     * Get the feed ID for a subscription.
      *
      * @param subid The subscription ID
      * @return The feed iD
index 8890fe9..f5fa6e9 100644 (file)
 package org.onap.dmaap.datarouter.node;
 
 /**
- * Information for a delivery destination that doesn't change from message to message
+ * Information for a delivery destination that doesn't change from message to message.
  */
 public class DestInfo {
+
     private String name;
     private String spool;
     private String subid;
@@ -40,114 +41,33 @@ public class DestInfo {
     private boolean privilegedSubscriber;
     private boolean decompress;
     private boolean followRedirects;
-    private String aafInstance;
-
-    public static class DestInfoBuilder {
-        private String name;
-        private String spool;
-        private String subid;
-        private String logdata;
-        private String url;
-        private String authuser;
-        private String authentication;
-        private boolean metaonly;
-        private boolean use100;
-        private boolean privilegedSubscriber;
-        private boolean followRedirects;
-        private boolean decompress;
-        private NodeConfig.ProvSubscription subscription;
-
-        public DestInfoBuilder setName(String name) {
-            this.name = name;
-            return this;
-        }
-
-        public DestInfoBuilder setSpool(String spool) {
-            this.spool = spool;
-            return this;
-        }
-
-        public DestInfoBuilder setSubid(String subid) {
-            this.subid = subid;
-            return this;
-        }
-
-        public DestInfoBuilder setLogdata(String logdata) {
-            this.logdata = logdata;
-            return this;
-        }
-
-        public DestInfoBuilder setUrl(String url) {
-            this.url = url;
-            return this;
-        }
-
-        public DestInfoBuilder setAuthuser(String authuser) {
-            this.authuser = authuser;
-            return this;
-        }
-
-        public DestInfoBuilder setAuthentication(String authentication) {
-            this.authentication = authentication;
-            return this;
-        }
-
-        public DestInfoBuilder setMetaonly(boolean metaonly) {
-            this.metaonly = metaonly;
-            return this;
-        }
-
-        public DestInfoBuilder setUse100(boolean use100) {
-            this.use100 = use100;
-            return this;
-        }
-
-        public DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
-            this.privilegedSubscriber = privilegedSubscriber;
-            return this;
-        }
-
-        public DestInfoBuilder setFollowRedirects(boolean followRedirects) {
-            this.followRedirects = followRedirects;
-            return this;
-        }
-
-        public DestInfoBuilder setDecompress(boolean decompress) {
-            this.decompress = decompress;
-            return this;
-        }
-
-        public DestInfoBuilder setSubscription(NodeConfig.ProvSubscription subscription) {
-            this.subscription = subscription;
-            return this;
-        }
-
-        public DestInfo createDestInfo() {
-            return new DestInfo(this);
-        }
-    }
 
+    /**
+     * Create a destination information object.
+     *
+     * @param destInfoBuilder DestInfo Object Builder
+     */
     public DestInfo(DestInfoBuilder destInfoBuilder) {
-        this.name = destInfoBuilder.name;
-        this.spool = destInfoBuilder.spool;
-        this.subid = destInfoBuilder.subid;
-        this.logdata = destInfoBuilder.logdata;
-        this.url = destInfoBuilder.url;
-        this.authuser = destInfoBuilder.authuser;
-        this.authentication = destInfoBuilder.authentication;
-        this.metaonly = destInfoBuilder.metaonly;
-        this.use100 = destInfoBuilder.use100;
-        this.privilegedSubscriber = destInfoBuilder.privilegedSubscriber;
-        this.followRedirects = destInfoBuilder.followRedirects;
-        this.decompress = destInfoBuilder.decompress;
+        this.name = destInfoBuilder.getName();
+        this.spool = destInfoBuilder.getSpool();
+        this.subid = destInfoBuilder.getSubid();
+        this.logdata = destInfoBuilder.getLogdata();
+        this.url = destInfoBuilder.getUrl();
+        this.authuser = destInfoBuilder.getAuthuser();
+        this.authentication = destInfoBuilder.getAuthentication();
+        this.metaonly = destInfoBuilder.isMetaonly();
+        this.use100 = destInfoBuilder.isUse100();
+        this.privilegedSubscriber = destInfoBuilder.isPrivilegedSubscriber();
+        this.followRedirects = destInfoBuilder.isFollowRedirects();
+        this.decompress = destInfoBuilder.isDecompress();
     }
 
     /**
      * Create a destination information object.
      *
-     * @param    name    n:fqdn or s:subid
-     * @param    spool    The directory where files are spooled.
-     * @param    subscription    The subscription.
+     * @param name n:fqdn or s:subid
+     * @param spool The directory where files are spooled.
+     * @param subscription The subscription.
      */
     public DestInfo(String name, String spool, NodeConfig.ProvSubscription subscription) {
         this.name = name;
@@ -164,8 +84,8 @@ public class DestInfo {
         this.decompress = subscription.isDecompress();
     }
 
-    public boolean equals(Object o) {
-        return ((o instanceof DestInfo) && ((DestInfo) o).spool.equals(spool));
+    public boolean equals(Object object) {
+        return ((object instanceof DestInfo) && ((DestInfo) object).spool.equals(spool));
     }
 
     public int hashCode() {
@@ -173,7 +93,7 @@ public class DestInfo {
     }
 
     /**
-     * Get the name of this destination
+     * Get the name of this destination.
      */
     public String getName() {
         return (name);
@@ -217,7 +137,7 @@ public class DestInfo {
     }
 
     /**
-     * Get the user for authentication
+     * Get the user for authentication.
      *
      * @return The name of the user for logging
      */
@@ -226,7 +146,7 @@ public class DestInfo {
     }
 
     /**
-     * Get the authentication header
+     * Get the authentication header.
      *
      * @return The string to use to authenticate to the recipient.
      */
@@ -235,7 +155,7 @@ public class DestInfo {
     }
 
     /**
-     * Is this a metadata only delivery?
+     * Is this a metadata only delivery.
      *
      * @return True if this is a metadata only delivery
      */
@@ -244,7 +164,7 @@ public class DestInfo {
     }
 
     /**
-     * Should I send expect 100-continue header?
+     * Should I send expect 100-continue header.
      *
      * @return True if I should.
      */
@@ -253,23 +173,23 @@ public class DestInfo {
     }
 
     /**
-     * Should we wait to receive a file processed acknowledgement before deleting file
+     * Should we wait to receive a file processed acknowledgement before deleting file.
      */
     public boolean isPrivilegedSubscriber() {
         return (privilegedSubscriber);
     }
 
     /**
-    * Should I follow redirects?
-    *
-    * @return True if I should.
-    */
+     * Should I follow redirects.
+     *
+     * @return True if I should.
+     */
     public boolean isFollowRedirects() {
         return (followRedirects);
     }
 
     /**
-     * Should i decompress the file before sending it on
+     * Should i decompress the file before sending it on.
      *
      * @return True if I should.
      */
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DestInfoBuilder.java
new file mode 100644 (file)
index 0000000..00c5cd8
--- /dev/null
@@ -0,0 +1,149 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.datarouter.node;
+
+public class DestInfoBuilder {
+
+    private String destInfoName;
+    private String destInfoSpool;
+    private String destInfoSubId;
+    private String destInfoLogData;
+    private String destInfoUrl;
+    private String destInfoAuthUser;
+    private String destInfoAuthentication;
+    private boolean destInfoMetaOnly;
+    private boolean destInfoUse100;
+    private boolean destInfoPrivilegedSubscriber;
+    private boolean destInfoFollowRedirects;
+    private boolean destInfoDecompress;
+
+    public String getName() {
+        return destInfoName;
+    }
+
+    public DestInfoBuilder setName(String name) {
+        this.destInfoName = name;
+        return this;
+    }
+
+    public String getSpool() {
+        return destInfoSpool;
+    }
+
+    public DestInfoBuilder setSpool(String spool) {
+        this.destInfoSpool = spool;
+        return this;
+    }
+
+    public String getSubid() {
+        return destInfoSubId;
+    }
+
+    public DestInfoBuilder setSubid(String subid) {
+        this.destInfoSubId = subid;
+        return this;
+    }
+
+    String getLogdata() {
+        return destInfoLogData;
+    }
+
+    DestInfoBuilder setLogdata(String logdata) {
+        this.destInfoLogData = logdata;
+        return this;
+    }
+
+    public String getUrl() {
+        return destInfoUrl;
+    }
+
+    public DestInfoBuilder setUrl(String url) {
+        this.destInfoUrl = url;
+        return this;
+    }
+
+    String getAuthuser() {
+        return destInfoAuthUser;
+    }
+
+    DestInfoBuilder setAuthuser(String authuser) {
+        this.destInfoAuthUser = authuser;
+        return this;
+    }
+
+    String getAuthentication() {
+        return destInfoAuthentication;
+    }
+
+    DestInfoBuilder setAuthentication(String authentication) {
+        this.destInfoAuthentication = authentication;
+        return this;
+    }
+
+    boolean isMetaonly() {
+        return destInfoMetaOnly;
+    }
+
+    DestInfoBuilder setMetaonly(boolean metaonly) {
+        this.destInfoMetaOnly = metaonly;
+        return this;
+    }
+
+    boolean isUse100() {
+        return destInfoUse100;
+    }
+
+    DestInfoBuilder setUse100(boolean use100) {
+        this.destInfoUse100 = use100;
+        return this;
+    }
+
+    boolean isPrivilegedSubscriber() {
+        return destInfoPrivilegedSubscriber;
+    }
+
+    DestInfoBuilder setPrivilegedSubscriber(boolean privilegedSubscriber) {
+        this.destInfoPrivilegedSubscriber = privilegedSubscriber;
+        return this;
+    }
+
+    boolean isFollowRedirects() {
+        return destInfoFollowRedirects;
+    }
+
+    DestInfoBuilder setFollowRedirects(boolean followRedirects) {
+        this.destInfoFollowRedirects = followRedirects;
+        return this;
+    }
+
+    boolean isDecompress() {
+        return destInfoDecompress;
+    }
+
+    DestInfoBuilder setDecompress(boolean decompress) {
+        this.destInfoDecompress = decompress;
+        return this;
+    }
+
+    DestInfo createDestInfo() {
+        return new DestInfo(this);
+    }
+}
\ No newline at end of file
index 534b2b3..4985268 100644 (file)
@@ -26,39 +26,40 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-
 import java.io.IOException;
-import java.util.*;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
- * Determine if an IP address is from a machine
+ * Determine if an IP address is from a machine.
  */
 public class IsFrom {
+
+    private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
     private long nextcheck;
     private String[] ips;
     private String fqdn;
-    private static EELFLogger logger = EELFManager.getInstance().getLogger(IsFrom.class);
 
     /**
-     * Configure the JVM DNS cache to have a 10 second TTL.  This needs to be called very very early or it won't have any effect.
+     * Create an IsFrom for the specified fully qualified domain name.
      */
-    public static void setDNSCache() {
-        java.security.Security.setProperty("networkaddress.cache.ttl", "10");
+    public IsFrom(String fqdn) {
+        this.fqdn = fqdn;
     }
 
     /**
-     * Create an IsFrom for the specified fully qualified domain name.
+     * Configure the JVM DNS cache to have a 10 second TTL.  This needs to be called very very early or it won't have
+     * any effect.
      */
-    public IsFrom(String fqdn) {
-        this.fqdn = fqdn;
+    public static void setDNSCache() {
+        java.security.Security.setProperty("networkaddress.cache.ttl", "10");
     }
 
     /**
-     * Check if an IP address matches.  If it has been more than
-     * 10 seconds since DNS was last checked for changes to the
-     * IP address(es) of this FQDN, check again.  Then check
-     * if the specified IP address belongs to the FQDN.
+     * Check if an IP address matches.  If it has been more than 10 seconds since DNS was last checked for changes to
+     * the IP address(es) of this FQDN, check again.  Then check if the specified IP address belongs to the FQDN.
      */
     public synchronized boolean isFrom(String ip) {
         long now = System.currentTimeMillis();
@@ -98,7 +99,7 @@ public class IsFrom {
     }
 
     /**
-     * Return the fully qualified domain name
+     * Return the fully qualified domain name.
      */
     public String toString() {
         return (fqdn);
index 6ffb760..3277408 100644 (file)
@@ -20,6 +20,7 @@
  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  * *
  ******************************************************************************/
+
 package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
@@ -35,16 +36,18 @@ import java.util.Arrays;
 import java.util.TimerTask;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Cleanup of old log files.
- * <p>
- * Periodically scan the log directory for log files that are older than the log file retention interval, and delete
+ *
+ * <p>Periodically scan the log directory for log files that are older than the log file retention interval, and delete
  * them.  In a future release, This class will also be responsible for uploading events logs to the log server to
  * support the log query APIs.
  */
 
 public class LogManager extends TimerTask {
+
     private EELFLogger logger = EELFManager.getInstance().getLogger(LogManager.class);
     private NodeConfigManager config;
     private Matcher isnodelog;
@@ -53,8 +56,55 @@ public class LogManager extends TimerTask {
     private String uploaddir;
     private String logdir;
 
+    /**
+     * Construct a log manager
+     *
+     * <p>The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
+     * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
+     */
+    public LogManager(NodeConfigManager config) {
+        this.config = config;
+        try {
+            isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
+            iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
+        } catch (Exception e) {
+            logger.error("Exception", e);
+        }
+        logdir = config.getLogDir();
+        uploaddir = logdir + "/.spool";
+        (new File(uploaddir)).mkdirs();
+        long now = System.currentTimeMillis();
+        long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
+        long when = now - now % intvl + intvl + 20000L;
+        config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
+        worker = new Uploader();
+    }
+
+    /**
+     * Trigger check for expired log files and log files to upload.
+     */
+    public void run() {
+        worker.poke();
+    }
+
     private class Uploader extends Thread implements DeliveryQueueHelper {
+
+        private static final String EXCEPTION = "Exception";
+        private static final String META = "/.meta";
         private EELFLogger logger = EELFManager.getInstance().getLogger(Uploader.class);
+        private DeliveryQueue dq;
+
+        Uploader() {
+            dq = new DeliveryQueue(this,
+                    new DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
+                            .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
+                            .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false)
+                            .setFollowRedirects(false)
+                            .setDecompress(false).createDestInfo());
+            setDaemon(true);
+            setName("Log Uploader");
+            start();
+        }
 
         public long getInitFailureTimer() {
             return (10000L);
@@ -89,6 +139,7 @@ public class LogManager extends TimerTask {
         }
 
         public void handleUnreachable(DestInfo destinationInfo) {
+            throw new UnsupportedOperationException();
         }
 
         public boolean handleRedirection(DestInfo destinationInfo, String location, String fileid) {
@@ -103,24 +154,11 @@ public class LogManager extends TimerTask {
             return (null);
         }
 
-        private DeliveryQueue dq;
-
-        public Uploader() {
-            dq = new DeliveryQueue(this,
-                new DestInfo.DestInfoBuilder().setName("LogUpload").setSpool(uploaddir).setSubid(null).setLogdata(null)
-                    .setUrl(null).setAuthuser(config.getMyName()).setAuthentication(config.getMyAuth())
-                    .setMetaonly(false).setUse100(false).setPrivilegedSubscriber(false).setFollowRedirects(false)
-                    .setDecompress(false).createDestInfo());
-            setDaemon(true);
-            setName("Log Uploader");
-            start();
-        }
-
         private synchronized void snooze() {
             try {
                 wait(10000);
             } catch (Exception e) {
-                logger.error("InterruptedException", e);
+                logger.error(EXCEPTION, e);
             }
         }
 
@@ -145,73 +183,48 @@ public class LogManager extends TimerTask {
             String curlog = StatusLog.getCurLogFile();
             curlog = curlog.substring(curlog.lastIndexOf('/') + 1);
             try {
-                Writer w = new FileWriter(uploaddir + "/.meta");
-                w.write("POST\tlogdata\nContent-Type\ttext/plain\n");
-                w.close();
+                Writer writer = new FileWriter(uploaddir + META);
+                writer.write("POST\tlogdata\nContent-Type\ttext/plain\n");
+                writer.close();
                 BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));
                 lastqueued = br.readLine();
                 br.close();
             } catch (Exception e) {
-                logger.error("Exception", e);
+                logger.error(EXCEPTION, e);
             }
             for (String fn : fns) {
                 if (!isnodelog.reset(fn).matches()) {
                     if (!iseventlog.reset(fn).matches()) {
                         continue;
                     }
-                    if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
-                        lastqueued = fn;
-                        try {
-                            String pid = config.getPublishId();
-                            Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
-                            Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));
-                        } catch (Exception e) {
-                            logger.error("Exception", e);
-                        }
-                    }
+                    lastqueued = setLastQueued(lastqueued, curlog, fn);
                 }
-                File f = new File(dir, fn);
-                if (f.lastModified() < threshold) {
-                    f.delete();
+                File file = new File(dir, fn);
+                if (file.lastModified() < threshold) {
+                    file.delete();
                 }
             }
             try (Writer w = new FileWriter(uploaddir + "/.lastqueued")) {
-                (new File(uploaddir + "/.meta")).delete();
+                (new File(uploaddir + META)).delete();
                 w.write(lastqueued + "\n");
             } catch (Exception e) {
-                logger.error("Exception", e);
+                logger.error(EXCEPTION, e);
             }
         }
-    }
 
-    /**
-     * Construct a log manager
-     * <p>
-     * The log manager will check for expired log files every 5 minutes at 20 seconds after the 5 minute boundary.
-     * (Actually, the interval is the event log rollover interval, which defaults to 5 minutes).
-     */
-    public LogManager(NodeConfigManager config) {
-        this.config = config;
-        try {
-            isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");
-            iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");
-        } catch (Exception e) {
-            logger.error("Exception", e);
+        @NotNull
+        private String setLastQueued(String lastqueued, String curlog, String fn) {
+            if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {
+                lastqueued = fn;
+                try {
+                    String pid = config.getPublishId();
+                    Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));
+                    Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + META));
+                } catch (Exception e) {
+                    logger.error(EXCEPTION, e);
+                }
+            }
+            return lastqueued;
         }
-        logdir = config.getLogDir();
-        uploaddir = logdir + "/.spool";
-        (new File(uploaddir)).mkdirs();
-        long now = System.currentTimeMillis();
-        long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 30000);
-        long when = now - now % intvl + intvl + 20000L;
-        config.getTimer().scheduleAtFixedRate(this, when - now, intvl);
-        worker = new Uploader();
-    }
-
-    /**
-     * Trigger check for expired log files and log files to upload
-     */
-    public void run() {
-        worker.poke();
     }
 }
index d455f2d..7f01821 100644 (file)
@@ -26,924 +26,933 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
 
 /**
  * Processed configuration for this node.
- * <p>
- * The NodeConfig represents a processed configuration from the Data Router provisioning server.  Each time
+ *
+ * <p>The NodeConfig represents a processed configuration from the Data Router provisioning server.  Each time
  * configuration data is received from the provisioning server, a new NodeConfig is created and the previous one
  * discarded.
  */
 public class NodeConfig {
+
+    private static final String PUBLISHER_NOT_PERMITTED = "Publisher not permitted for this feed";
     private static EELFLogger logger = EELFManager.getInstance().getLogger(NodeConfig.class);
+    private HashMap<String, String> params = new HashMap<>();
+    private HashMap<String, Feed> feeds = new HashMap<>();
+    private HashMap<String, DestInfo> nodeinfo = new HashMap<>();
+    private HashMap<String, DestInfo> subinfo = new HashMap<>();
+    private HashMap<String, IsFrom> nodes = new HashMap<>();
+    private HashMap<String, ProvSubscription> provSubscriptions = new HashMap<>();
+    private String myname;
+    private String myauth;
+    private DestInfo[] alldests;
+    private int rrcntr;
+
     /**
-     * Raw configuration entry for a data router node
+     * Process the raw provisioning data to configure this node.
+     *
+     * @param pd The parsed provisioning data
+     * @param myname My name as seen by external systems
+     * @param spooldir The directory where temporary files live
+     * @param port The port number for URLs
+     * @param nodeauthkey The keying string used to generate node authentication credentials
      */
-    public static class ProvNode {
-
-        private String cname;
-
-        /**
-         * Construct a node configuration entry.
-         *
-         * @param cname The cname of the node.
-         */
-        public ProvNode(String cname) {
-            this.cname = cname;
+    public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
+        this.myname = myname;
+        for (ProvParam p : pd.getParams()) {
+            params.put(p.getName(), p.getValue());
         }
-
-        /**
-         * Get the cname of the node
-         */
-        public String getCName() {
-            return (cname);
+        ArrayList<DestInfo> destInfos = new ArrayList<>();
+        myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
+        for (ProvNode pn : pd.getNodes()) {
+            String commonName = pn.getCName();
+            if (nodeinfo.get(commonName) != null) {
+                continue;
+            }
+            DestInfo di = new DestInfoBuilder().setName("n:" + commonName).setSpool(spooldir + "/n/" + commonName)
+                    .setSubid(null)
+                    .setLogdata("n2n-" + commonName).setUrl("https://" + commonName + ":" + port + "/internal/publish")
+                    .setAuthuser(commonName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
+                    .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
+            (new File(di.getSpool())).mkdirs();
+            String auth = NodeUtils.getNodeAuthHdr(commonName, nodeauthkey);
+            destInfos.add(di);
+            nodeinfo.put(commonName, di);
+            nodes.put(auth, new IsFrom(commonName));
         }
-    }
-
-    /**
-     * Raw configuration entry for a provisioning parameter
-     */
-    public static class ProvParam {
-
-        private String name;
-        private String value;
-
-        /**
-         * Construct a provisioning parameter configuration entry.
-         *
-         * @param name The name of the parameter.
-         * @param value The value of the parameter.
-         */
-        public ProvParam(String name, String value) {
-            this.name = name;
-            this.value = value;
+        PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
+        HashMap<String, ArrayList<Redirection>> rdtab = new HashMap<>();
+        for (ProvForceIngress pfi : pd.getForceIngress()) {
+            ArrayList<Redirection> v = rdtab.get(pfi.getFeedId());
+            if (v == null) {
+                v = new ArrayList<>();
+                rdtab.put(pfi.getFeedId(), v);
+            }
+            Redirection r = new Redirection();
+            if (pfi.getSubnet() != null) {
+                r.snm = new SubnetMatcher(pfi.getSubnet());
+            }
+            r.user = pfi.getUser();
+            r.nodes = pfi.getNodes();
+            v.add(r);
         }
-
-        /**
-         * Get the name of the parameter.
-         */
-        public String getName() {
-            return (name);
+        HashMap<String, HashMap<String, String>> pfutab = new HashMap<>();
+        for (ProvFeedUser pfu : pd.getFeedUsers()) {
+            HashMap<String, String> t = pfutab.get(pfu.getFeedId());
+            if (t == null) {
+                t = new HashMap<>();
+                pfutab.put(pfu.getFeedId(), t);
+            }
+            t.put(pfu.getCredentials(), pfu.getUser());
         }
-
-        /**
-         * Get the value of the parameter.
-         */
-        public String getValue() {
-            return (value);
+        HashMap<String, String> egrtab = new HashMap<>();
+        for (ProvForceEgress pfe : pd.getForceEgress()) {
+            if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
+                continue;
+            }
+            egrtab.put(pfe.getSubId(), pfe.getNode());
         }
-    }
-
-    /**
-     * Raw configuration entry for a data feed.
-     */
-    public static class ProvFeed {
-
-        private String id;
-        private String logdata;
-        private String status;
-        private String createdDate;
-        /*
-         * AAF changes: TDP EPIC US# 307413
-         * Passing aafInstance from to identify legacy/AAF feeds
-         */
-        private String aafInstance;
-
-        /**
-         * Construct a feed configuration entry.
-         *
-         * @param id The feed ID of the entry.
-         * @param logdata String for log entries about the entry.
-         * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or
-         * null if it is valid.
-         */
-        public ProvFeed(String id, String logdata, String status, String createdDate, String aafInstance) {
-            this.id = id;
-            this.logdata = logdata;
-            this.status = status;
-            this.createdDate = createdDate;
-            this.aafInstance = aafInstance;
+        HashMap<String, ArrayList<SubnetMatcher>> pfstab = new HashMap<>();
+        for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
+            ArrayList<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
+            if (v == null) {
+                v = new ArrayList<>();
+                pfstab.put(pfs.getFeedId(), v);
+            }
+            v.add(new SubnetMatcher(pfs.getCidr()));
         }
-
-        /**
-         * Get the created date of the data feed.
-         */
-        public String getCreatedDate()
-        {
-            return(createdDate);
+        HashMap<String, StringBuilder> feedTargets = new HashMap<>();
+        HashSet<String> allfeeds = new HashSet<>();
+        for (ProvFeed pfx : pd.getFeeds()) {
+            if (pfx.getStatus() == null) {
+                allfeeds.add(pfx.getId());
+            }
         }
-
-        /**
-         * Get the aafInstance of the data feed.
-         */
-        public String getAafInstance() {
-            return aafInstance;
+        for (ProvSubscription provSubscription : pd.getSubscriptions()) {
+            String subId = provSubscription.getSubId();
+            String feedId = provSubscription.getFeedId();
+            if (isFeedOrSubKnown(allfeeds, subId, feedId)) {
+                continue;
+            }
+            int sididx = 999;
+            try {
+                sididx = Integer.parseInt(subId);
+                sididx -= sididx % 100;
+            } catch (Exception e) {
+                logger.error("NODE0517 Exception NodeConfig: " + e);
+            }
+            String subscriptionDirectory = sididx + "/" + subId;
+            DestInfo destinationInfo = new DestInfo("s:" + subId,
+                    spooldir + "/s/" + subscriptionDirectory, provSubscription);
+            (new File(destinationInfo.getSpool())).mkdirs();
+            destInfos.add(destinationInfo);
+            provSubscriptions.put(subId, provSubscription);
+            subinfo.put(subId, destinationInfo);
+            String egr = egrtab.get(subId);
+            if (egr != null) {
+                subId = pf.getPath(egr) + subId;
+            }
+            StringBuilder sb = feedTargets.get(feedId);
+            if (sb == null) {
+                sb = new StringBuilder();
+                feedTargets.put(feedId, sb);
+            }
+            sb.append(' ').append(subId);
         }
-
-        /**
-         * Get the feed id of the data feed.
-         */
-        public String getId() {
-            return (id);
+        alldests = destInfos.toArray(new DestInfo[0]);
+        for (ProvFeed pfx : pd.getFeeds()) {
+            String fid = pfx.getId();
+            Feed f = feeds.get(fid);
+            if (f != null) {
+                continue;
+            }
+            f = new Feed();
+            feeds.put(fid, f);
+            f.createdDate = pfx.getCreatedDate();
+            f.loginfo = pfx.getLogData();
+            f.status = pfx.getStatus();
+            /*
+             * AAF changes: TDP EPIC US# 307413
+             * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
+             */
+            f.aafInstance = pfx.getAafInstance();
+            ArrayList<SubnetMatcher> v1 = pfstab.get(fid);
+            if (v1 == null) {
+                f.subnets = new SubnetMatcher[0];
+            } else {
+                f.subnets = v1.toArray(new SubnetMatcher[0]);
+            }
+            HashMap<String, String> h1 = pfutab.get(fid);
+            if (h1 == null) {
+                h1 = new HashMap();
+            }
+            f.authusers = h1;
+            ArrayList<Redirection> v2 = rdtab.get(fid);
+            if (v2 == null) {
+                f.redirections = new Redirection[0];
+            } else {
+                f.redirections = v2.toArray(new Redirection[0]);
+            }
+            StringBuilder sb = feedTargets.get(fid);
+            if (sb == null) {
+                f.targets = new Target[0];
+            } else {
+                f.targets = parseRouting(sb.toString());
+            }
         }
+    }
 
-        /**
-         * Get the log data of the data feed.
-         */
-        public String getLogData() {
-            return (logdata);
+    /**
+     * Parse a target string into an array of targets
+     *
+     * @param routing Target string
+     * @return Array of targets.
+     */
+    public Target[] parseRouting(String routing) {
+        routing = routing.trim();
+        if ("".equals(routing)) {
+            return (new Target[0]);
         }
-
-        /**
-         * Get the status of the data feed.
-         */
-        public String getStatus() {
-            return (status);
+        String[] xx = routing.split("\\s+");
+        HashMap<String, Target> tmap = new HashMap<>();
+        HashSet<String> subset = new HashSet<>();
+        ArrayList<Target> tv = new ArrayList<>();
+        for (int i = 0; i < xx.length; i++) {
+            String t = xx[i];
+            int j = t.indexOf('/');
+            if (j == -1) {
+                addTarget(subset, tv, t);
+            } else {
+                addTargetWithRouting(tmap, tv, t, j);
+            }
         }
+        return (tv.toArray(new Target[0]));
     }
 
     /**
-     * Raw configuration entry for a feed user.
+     * Check whether this is a valid node-to-node transfer
+     *
+     * @param credentials Credentials offered by the supposed node
+     * @param ip IP address the request came from
      */
-    public static class ProvFeedUser {
-
-        private String feedid;
-        private String user;
-        private String credentials;
+    public boolean isAnotherNode(String credentials, String ip) {
+        IsFrom n = nodes.get(credentials);
+        return (n != null && n.isFrom(ip));
+    }
 
-        /**
-         * Construct a feed user configuration entry
-         *
-         * @param feedid The feed id.
-         * @param user The user that will publish to the feed.
-         * @param credentials The Authorization header the user will use to publish.
-         */
-        public ProvFeedUser(String feedid, String user, String credentials) {
-            this.feedid = feedid;
-            this.user = user;
-            this.credentials = credentials;
+    /**
+     * Check whether publication is allowed.
+     *
+     * @param feedid The ID of the feed being requested.
+     * @param credentials The offered credentials
+     * @param ip The requesting IP address
+     */
+    public String isPublishPermitted(String feedid, String credentials, String ip) {
+        Feed f = feeds.get(feedid);
+        String nf = "Feed does not exist";
+        if (f != null) {
+            nf = f.status;
+        }
+        if (nf != null) {
+            return (nf);
+        }
+        String user = f.authusers.get(credentials);
+        if (user == null) {
+            return (PUBLISHER_NOT_PERMITTED);
+        }
+        if (f.subnets.length == 0) {
+            return (null);
         }
+        byte[] addr = NodeUtils.getInetAddress(ip);
+        for (SubnetMatcher snm : f.subnets) {
+            if (snm.matches(addr)) {
+                return (null);
+            }
+        }
+        return (PUBLISHER_NOT_PERMITTED);
+    }
 
-        /**
-         * Get the feed id of the feed user.
-         */
-        public String getFeedId() {
-            return (feedid);
+    /**
+     * Check whether delete file is allowed.
+     *
+     * @param subId The ID of the subscription being requested.
+     */
+    public boolean isDeletePermitted(String subId) {
+        ProvSubscription provSubscription = provSubscriptions.get(subId);
+        return provSubscription.isPrivilegedSubscriber();
+    }
+
+    /**
+     * Check whether publication is allowed for AAF Feed.
+     *
+     * @param feedid The ID of the feed being requested.
+     * @param ip The requesting IP address
+     */
+    public String isPublishPermitted(String feedid, String ip) {
+        Feed f = feeds.get(feedid);
+        String nf = "Feed does not exist";
+        if (f != null) {
+            nf = f.status;
+        }
+        if (nf != null) {
+            return nf;
+        }
+        if (f.subnets.length == 0) {
+            return null;
+        }
+        byte[] addr = NodeUtils.getInetAddress(ip);
+        for (SubnetMatcher snm : f.subnets) {
+            if (snm.matches(addr)) {
+                return null;
+            }
         }
+        return PUBLISHER_NOT_PERMITTED;
+    }
 
-        /**
-         * Get the user for the feed user.
-         */
-        public String getUser() {
-            return (user);
+    /**
+     * Get authenticated user
+     */
+    public String getAuthUser(String feedid, String credentials) {
+        return (feeds.get(feedid).authusers.get(credentials));
+    }
+
+    /**
+     * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID
+     *
+     * @param feedid The ID of the feed specified
+     */
+    public String getAafInstance(String feedid) {
+        Feed f = feeds.get(feedid);
+        return f.aafInstance;
+    }
+
+    /**
+     * Check if the request should be redirected to a different ingress node
+     */
+    public String getIngressNode(String feedid, String user, String ip) {
+        Feed f = feeds.get(feedid);
+        if (f.redirections.length == 0) {
+            return (null);
+        }
+        byte[] addr = NodeUtils.getInetAddress(ip);
+        for (Redirection r : f.redirections) {
+            if ((r.user != null && !user.equals(r.user)) || (r.snm != null && !r.snm.matches(addr))) {
+                continue;
+            }
+            for (String n : r.nodes) {
+                if (myname.equals(n)) {
+                    return (null);
+                }
+            }
+            if (r.nodes.length == 0) {
+                return (null);
+            }
+            return (r.nodes[rrcntr++ % r.nodes.length]);
         }
+        return (null);
+    }
 
-        /**
-         * Get the credentials for the feed user.
-         */
-        public String getCredentials() {
-            return (credentials);
+    /**
+     * Get a provisioned configuration parameter
+     */
+    public String getProvParam(String name) {
+        return (params.get(name));
+    }
+
+    /**
+     * Get all the DestInfos
+     */
+    public DestInfo[] getAllDests() {
+        return (alldests);
+    }
+
+    /**
+     * Get the targets for a feed
+     *
+     * @param feedid The feed ID
+     * @return The targets this feed should be delivered to
+     */
+    public Target[] getTargets(String feedid) {
+        if (feedid == null) {
+            return (new Target[0]);
+        }
+        Feed f = feeds.get(feedid);
+        if (f == null) {
+            return (new Target[0]);
         }
+        return (f.targets);
     }
 
     /**
-     * Raw configuration entry for a feed subnet
+     * Get the creation date for a feed
+     *
+     * @param feedid The feed ID
+     * @return the timestamp of creation date of feed id passed
      */
-    public static class ProvFeedSubnet {
+    public String getCreatedDate(String feedid) {
+        Feed f = feeds.get(feedid);
+        return (f.createdDate);
+    }
 
-        private String feedid;
-        private String cidr;
+    /**
+     * Get the feed ID for a subscription
+     *
+     * @param subid The subscription ID
+     * @return The feed ID
+     */
+    public String getFeedId(String subid) {
+        DestInfo di = subinfo.get(subid);
+        if (di == null) {
+            return (null);
+        }
+        return (di.getLogData());
+    }
 
-        /**
-         * Construct a feed subnet configuration entry
-         *
-         * @param feedid The feed ID
-         * @param cidr The CIDR allowed to publish to the feed.
-         */
-        public ProvFeedSubnet(String feedid, String cidr) {
-            this.feedid = feedid;
-            this.cidr = cidr;
+    /**
+     * Get the spool directory for a subscription
+     *
+     * @param subid The subscription ID
+     * @return The spool directory
+     */
+    public String getSpoolDir(String subid) {
+        DestInfo di = subinfo.get(subid);
+        if (di == null) {
+            return (null);
+        }
+        return (di.getSpool());
+    }
+
+    /**
+     * Get the Authorization value this node uses
+     *
+     * @return The Authorization header value for this node
+     */
+    public String getMyAuth() {
+        return (myauth);
+    }
+
+    private boolean isFeedOrSubKnown(HashSet<String> allfeeds, String subId, String feedId) {
+        return !allfeeds.contains(feedId) || subinfo.get(subId) != null;
+    }
+
+    private void addTargetWithRouting(HashMap<String, Target> tmap, ArrayList<Target> tv, String t, int j) {
+        String node = t.substring(0, j);
+        String rtg = t.substring(j + 1);
+        DestInfo di = nodeinfo.get(node);
+        if (di == null) {
+            tv.add(new Target(null, t));
+        } else {
+            Target tt = tmap.get(node);
+            if (tt == null) {
+                tt = new Target(di, rtg);
+                tmap.put(node, tt);
+                tv.add(tt);
+            } else {
+                tt.addRouting(rtg);
+            }
+        }
+    }
+
+    private void addTarget(HashSet<String> subset, ArrayList<Target> tv, String t) {
+        DestInfo di = subinfo.get(t);
+        if (di == null) {
+            tv.add(new Target(null, t));
+        } else {
+            if (!subset.contains(t)) {
+                subset.add(t);
+                tv.add(new Target(di, null));
+            }
         }
+    }
+
+    /**
+     * Raw configuration entry for a data router node
+     */
+    public static class ProvNode {
+
+        private String cname;
 
         /**
-         * Get the feed id of the feed subnet.
+         * Construct a node configuration entry.
+         *
+         * @param cname The cname of the node.
          */
-        public String getFeedId() {
-            return (feedid);
+        public ProvNode(String cname) {
+            this.cname = cname;
         }
 
         /**
-         * Get the CIDR of the feed subnet.
+         * Get the cname of the node
          */
-        public String getCidr() {
-            return (cidr);
+        public String getCName() {
+            return (cname);
         }
     }
 
     /**
-     * Raw configuration entry for a subscription
+     * Raw configuration entry for a provisioning parameter
      */
-    public static class ProvSubscription {
+    public static class ProvParam {
 
-        private String subid;
-        private String feedid;
-        private String url;
-        private String authuser;
-        private String credentials;
-        private boolean metaonly;
-        private boolean use100;
-        private boolean privilegedSubscriber;
-        private boolean followRedirect;
-        private boolean decompress;
+        private String name;
+        private String value;
 
         /**
-         * Construct a subscription configuration entry
+         * Construct a provisioning parameter configuration entry.
          *
-         * @param subid The subscription ID
-         * @param feedid The feed ID
-         * @param url The base delivery URL (not including the fileid)
-         * @param authuser The user in the credentials used to deliver
-         * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the
-         * Authorization header.
-         * @param metaonly Is this a meta data only subscription?
-         * @param use100 Should we send Expect: 100-continue?
-         * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
-         * @param followRedirect Is follow redirect of destination enabled?
-         * @param decompress To see if they want their information compressed or decompressed
+         * @param name The name of the parameter.
+         * @param value The value of the parameter.
          */
-        public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, boolean decompress) {
-            this.subid = subid;
-            this.feedid = feedid;
-            this.url = url;
-            this.authuser = authuser;
-            this.credentials = credentials;
-            this.metaonly = metaonly;
-            this.use100 = use100;
-            this.privilegedSubscriber = privilegedSubscriber;
-            this.followRedirect = followRedirect;
-            this.decompress = decompress;
+        public ProvParam(String name, String value) {
+            this.name = name;
+            this.value = value;
         }
 
         /**
-         * Get the subscription ID
+         * Get the name of the parameter.
          */
-        public String getSubId() {
-            return (subid);
+        public String getName() {
+            return (name);
         }
 
         /**
-         * Get the feed ID
+         * Get the value of the parameter.
          */
-        public String getFeedId() {
-            return (feedid);
+        public String getValue() {
+            return (value);
         }
+    }
 
-        /**
-         * Get the delivery URL
-         */
-        public String getURL() {
-            return (url);
-        }
+    /**
+     * Raw configuration entry for a data feed.
+     */
+    public static class ProvFeed {
 
-        /**
-         * Get the user
+        private String id;
+        private String logdata;
+        private String status;
+        private String createdDate;
+        /*
+         * AAF changes: TDP EPIC US# 307413
+         * Passing aafInstance from to identify legacy/AAF feeds
          */
-        public String getAuthUser() {
-            return (authuser);
-        }
+        private String aafInstance;
 
         /**
-         * Get the delivery credentials
+         * Construct a feed configuration entry.
+         *
+         * @param id The feed ID of the entry.
+         * @param logdata String for log entries about the entry.
+         * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or
+         * null if it is valid.
          */
-        public String getCredentials() {
-            return (credentials);
+        public ProvFeed(String id, String logdata, String status, String createdDate, String aafInstance) {
+            this.id = id;
+            this.logdata = logdata;
+            this.status = status;
+            this.createdDate = createdDate;
+            this.aafInstance = aafInstance;
         }
 
         /**
-         * Is this a meta data only subscription?
+         * Get the created date of the data feed.
          */
-        public boolean isMetaDataOnly() {
-            return (metaonly);
+        public String getCreatedDate() {
+            return (createdDate);
         }
 
         /**
-         * Should we send Expect: 100-continue?
+         * Get the aafInstance of the data feed.
          */
-        public boolean isUsing100() {
-            return (use100);
+        public String getAafInstance() {
+            return aafInstance;
         }
 
         /**
-         * Can we wait to receive a delete file call before deleting file
+         * Get the feed id of the data feed.
          */
-        public boolean isPrivilegedSubscriber() {
-            return (privilegedSubscriber);
+        public String getId() {
+            return (id);
         }
 
         /**
-         * Should i decompress the file before sending it on
-        */
-        public boolean isDecompress() {
-            return (decompress);
+         * Get the log data of the data feed.
+         */
+        public String getLogData() {
+            return (logdata);
         }
 
         /**
-         *  New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706
-         *     Get the followRedirect of this destination
+         * Get the status of the data feed.
          */
-        boolean getFollowRedirect() {
-            return(followRedirect);
+        public String getStatus() {
+            return (status);
         }
     }
 
     /**
-     * Raw configuration entry for controlled ingress to the data router node
+     * Raw configuration entry for a feed user.
      */
-    public static class ProvForceIngress {
+    public static class ProvFeedUser {
 
         private String feedid;
-        private String subnet;
         private String user;
-        private String[] nodes;
+        private String credentials;
 
         /**
-         * Construct a forced ingress configuration entry
+         * Construct a feed user configuration entry
          *
-         * @param feedid The feed ID that this entry applies to
-         * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all
-         * publisher IP addresses
-         * @param user The publishing user this entry applies to or "" if it applies to all publishing users.
-         * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to.
+         * @param feedid The feed id.
+         * @param user The user that will publish to the feed.
+         * @param credentials The Authorization header the user will use to publish.
          */
-        public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {
+        public ProvFeedUser(String feedid, String user, String credentials) {
             this.feedid = feedid;
-            this.subnet = subnet;
             this.user = user;
-            //Sonar fix
-            if(nodes == null) {
-                this.nodes = new String[0];
-            } else {
-                this.nodes = Arrays.copyOf(nodes, nodes.length);
-            }
+            this.credentials = credentials;
         }
 
         /**
-         * Get the feed ID
+         * Get the feed id of the feed user.
          */
         public String getFeedId() {
             return (feedid);
         }
 
         /**
-         * Get the subnet
-         */
-        public String getSubnet() {
-            return (subnet);
-        }
-
-        /**
-         * Get the user
+         * Get the user for the feed user.
          */
         public String getUser() {
             return (user);
         }
 
         /**
-         * Get the node
+         * Get the credentials for the feed user.
          */
-        public String[] getNodes() {
-            return (nodes);
+        public String getCredentials() {
+            return (credentials);
         }
     }
 
     /**
-     * Raw configuration entry for controlled egress from the data router
+     * Raw configuration entry for a feed subnet
      */
-    public static class ProvForceEgress {
+    public static class ProvFeedSubnet {
 
-        private String subid;
-        private String node;
+        private String feedid;
+        private String cidr;
 
         /**
-         * Construct a forced egress configuration entry
+         * Construct a feed subnet configuration entry
          *
-         * @param subid The subscription ID the subscription with forced egress
-         * @param node The node handling deliveries for this subscription
+         * @param feedid The feed ID
+         * @param cidr The CIDR allowed to publish to the feed.
          */
-        public ProvForceEgress(String subid, String node) {
-            this.subid = subid;
-            this.node = node;
+        public ProvFeedSubnet(String feedid, String cidr) {
+            this.feedid = feedid;
+            this.cidr = cidr;
         }
 
         /**
-         * Get the subscription ID
+         * Get the feed id of the feed subnet.
          */
-        public String getSubId() {
-            return (subid);
+        public String getFeedId() {
+            return (feedid);
         }
 
         /**
-         * Get the node
+         * Get the CIDR of the feed subnet.
          */
-        public String getNode() {
-            return (node);
+        public String getCidr() {
+            return (cidr);
         }
     }
 
     /**
-     * Raw configuration entry for routing within the data router network
+     * Raw configuration entry for a subscription
      */
-    public static class ProvHop {
+    public static class ProvSubscription {
 
-        private String from;
-        private String to;
-        private String via;
+        private String subid;
+        private String feedid;
+        private String url;
+        private String authuser;
+        private String credentials;
+        private boolean metaonly;
+        private boolean use100;
+        private boolean privilegedSubscriber;
+        private boolean followRedirect;
+        private boolean decompress;
 
         /**
-         * A human readable description of this entry
+         * Construct a subscription configuration entry
+         *
+         * @param subid The subscription ID
+         * @param feedid The feed ID
+         * @param url The base delivery URL (not including the fileid)
+         * @param authuser The user in the credentials used to deliver
+         * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the
+         * Authorization header.
+         * @param metaonly Is this a meta data only subscription?
+         * @param use100 Should we send Expect: 100-continue?
+         * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
+         * @param followRedirect Is follow redirect of destination enabled?
+         * @param decompress To see if they want their information compressed or decompressed
          */
-        public String toString() {
-            return ("Hop " + from + "->" + to + " via " + via);
+        public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
+                boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect,
+                boolean decompress) {
+            this.subid = subid;
+            this.feedid = feedid;
+            this.url = url;
+            this.authuser = authuser;
+            this.credentials = credentials;
+            this.metaonly = metaonly;
+            this.use100 = use100;
+            this.privilegedSubscriber = privilegedSubscriber;
+            this.followRedirect = followRedirect;
+            this.decompress = decompress;
         }
 
         /**
-         * Construct a hop entry
-         *
-         * @param from The FQDN of the node with the data to be delivered
-         * @param to The FQDN of the node that will deliver to the subscriber
-         * @param via The FQDN of the node where the from node should send the data
+         * Get the subscription ID
          */
-        public ProvHop(String from, String to, String via) {
-            this.from = from;
-            this.to = to;
-            this.via = via;
+        public String getSubId() {
+            return (subid);
         }
 
         /**
-         * Get the from node
+         * Get the feed ID
          */
-        public String getFrom() {
-            return (from);
+        public String getFeedId() {
+            return (feedid);
         }
 
         /**
-         * Get the to node
+         * Get the delivery URL
          */
-        public String getTo() {
-            return (to);
+        public String getURL() {
+            return (url);
         }
 
         /**
-         * Get the next intermediate node
+         * Get the user
          */
-        public String getVia() {
-            return (via);
+        public String getAuthUser() {
+            return (authuser);
         }
-    }
-
-    private static class Redirection {
-
-        SubnetMatcher snm;
-        String user;
-        String[] nodes;
-    }
-
-    private static class Feed {
-
-        String loginfo;
-        String status;
-        SubnetMatcher[] subnets;
-        Hashtable<String, String> authusers = new Hashtable<String, String>();
-        Redirection[] redirections;
-        Target[] targets;
-        String createdDate;
-        String aafInstance;
-    }
-
-    private Hashtable<String, String> params = new Hashtable<>();
-    private Hashtable<String, Feed> feeds = new Hashtable<>();
-    private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
-    private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
-    private Hashtable<String, IsFrom> nodes = new Hashtable<>();
-    private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
-    private String myname;
-    private String myauth;
-    private DestInfo[] alldests;
-    private int rrcntr;
 
-    /**
-     * Process the raw provisioning data to configure this node
-     *
-     * @param pd The parsed provisioning data
-     * @param myname My name as seen by external systems
-     * @param spooldir The directory where temporary files live
-     * @param port The port number for URLs
-     * @param nodeauthkey The keying string used to generate node authentication credentials
-     */
-    public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
-        this.myname = myname;
-        for (ProvParam p : pd.getParams()) {
-            params.put(p.getName(), p.getValue());
-        }
-        Vector<DestInfo> destInfos = new Vector<>();
-        myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
-        for (ProvNode pn : pd.getNodes()) {
-            String cName = pn.getCName();
-            if (nodeinfo.get(cName) != null) {
-                continue;
-            }
-            String auth = NodeUtils.getNodeAuthHdr(cName, nodeauthkey);
-            DestInfo di = new DestInfo.DestInfoBuilder().setName("n:" + cName).setSpool(spooldir + "/n/" + cName).setSubid(null)
-                .setLogdata("n2n-" + cName).setUrl("https://" + cName + ":" + port + "/internal/publish")
-                .setAuthuser(cName).setAuthentication(myauth).setMetaonly(false).setUse100(true)
-                .setPrivilegedSubscriber(false).setFollowRedirects(false).setDecompress(false).createDestInfo();
-            (new File(di.getSpool())).mkdirs();
-            destInfos.add(di);
-            nodeinfo.put(cName, di);
-            nodes.put(auth, new IsFrom(cName));
-        }
-        PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
-        Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<>();
-        for (ProvForceIngress pfi : pd.getForceIngress()) {
-            Vector<Redirection> v = rdtab.get(pfi.getFeedId());
-            if (v == null) {
-                v = new Vector<>();
-                rdtab.put(pfi.getFeedId(), v);
-            }
-            Redirection r = new Redirection();
-            if (pfi.getSubnet() != null) {
-                r.snm = new SubnetMatcher(pfi.getSubnet());
-            }
-            r.user = pfi.getUser();
-            r.nodes = pfi.getNodes();
-            v.add(r);
-        }
-        Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<>();
-        for (ProvFeedUser pfu : pd.getFeedUsers()) {
-            Hashtable<String, String> t = pfutab.get(pfu.getFeedId());
-            if (t == null) {
-                t = new Hashtable<>();
-                pfutab.put(pfu.getFeedId(), t);
-            }
-            t.put(pfu.getCredentials(), pfu.getUser());
-        }
-        Hashtable<String, String> egrtab = new Hashtable<>();
-        for (ProvForceEgress pfe : pd.getForceEgress()) {
-            if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
-                continue;
-            }
-            egrtab.put(pfe.getSubId(), pfe.getNode());
-        }
-        Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
-        for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
-            Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
-            if (v == null) {
-                v = new Vector<>();
-                pfstab.put(pfs.getFeedId(), v);
-            }
-            v.add(new SubnetMatcher(pfs.getCidr()));
-        }
-        Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
-        HashSet<String> allfeeds = new HashSet<>();
-        for (ProvFeed pfx : pd.getFeeds()) {
-            if (pfx.getStatus() == null) {
-                allfeeds.add(pfx.getId());
-            }
-        }
-        for (ProvSubscription provSubscription : pd.getSubscriptions()) {
-            String subId = provSubscription.getSubId();
-            String feedId = provSubscription.getFeedId();
-            if (!allfeeds.contains(feedId)) {
-                continue;
-            }
-            if (subinfo.get(subId) != null) {
-                continue;
-            }
-            int sididx = 999;
-            try {
-                sididx = Integer.parseInt(subId);
-                sididx -= sididx % 100;
-            } catch (Exception e) {
-                logger.error("NODE0517 Exception NodeConfig: "+e);
-            }
-            String subscriptionDirectory = sididx + "/" + subId;
-            DestInfo destinationInfo = new DestInfo("s:" + subId,
-                    spooldir + "/s/" + subscriptionDirectory, provSubscription);
-            (new File(destinationInfo.getSpool())).mkdirs();
-            destInfos.add(destinationInfo);
-            provSubscriptions.put(subId, provSubscription);
-            subinfo.put(subId, destinationInfo);
-            String egr = egrtab.get(subId);
-            if (egr != null) {
-                subId = pf.getPath(egr) + subId;
-            }
-            StringBuffer sb = feedTargets.get(feedId);
-            if (sb == null) {
-                sb = new StringBuffer();
-                feedTargets.put(feedId, sb);
-            }
-            sb.append(' ').append(subId);
-        }
-        alldests = destInfos.toArray(new DestInfo[0]);
-        for (ProvFeed pfx : pd.getFeeds()) {
-            String fid = pfx.getId();
-            Feed f = feeds.get(fid);
-            if (f != null) {
-                continue;
-            }
-            f = new Feed();
-            feeds.put(fid, f);
-            f.createdDate = pfx.getCreatedDate();
-            f.loginfo = pfx.getLogData();
-            f.status = pfx.getStatus();
-            /*
-             * AAF changes: TDP EPIC US# 307413
-             * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
-             */
-            f.aafInstance = pfx.getAafInstance();
-            Vector<SubnetMatcher> v1 = pfstab.get(fid);
-            if (v1 == null) {
-                f.subnets = new SubnetMatcher[0];
-            } else {
-                f.subnets = v1.toArray(new SubnetMatcher[0]);
-            }
-            Hashtable<String, String> h1 = pfutab.get(fid);
-            if (h1 == null) {
-                h1 = new Hashtable<String, String>();
-            }
-            f.authusers = h1;
-            Vector<Redirection> v2 = rdtab.get(fid);
-            if (v2 == null) {
-                f.redirections = new Redirection[0];
-            } else {
-                f.redirections = v2.toArray(new Redirection[0]);
-            }
-            StringBuffer sb = feedTargets.get(fid);
-            if (sb == null) {
-                f.targets = new Target[0];
-            } else {
-                f.targets = parseRouting(sb.toString());
-            }
+        /**
+         * Get the delivery credentials
+         */
+        public String getCredentials() {
+            return (credentials);
         }
-    }
 
-    /**
-     * Parse a target string into an array of targets
-     *
-     * @param routing Target string
-     * @return Array of targets.
-     */
-    public Target[] parseRouting(String routing) {
-        routing = routing.trim();
-        if ("".equals(routing)) {
-            return (new Target[0]);
-        }
-        String[] xx = routing.split("\\s+");
-        Hashtable<String, Target> tmap = new Hashtable<String, Target>();
-        HashSet<String> subset = new HashSet<String>();
-        Vector<Target> tv = new Vector<Target>();
-        Target[] ret = new Target[xx.length];
-        for (int i = 0; i < xx.length; i++) {
-            String t = xx[i];
-            int j = t.indexOf('/');
-            if (j == -1) {
-                DestInfo di = subinfo.get(t);
-                if (di == null) {
-                    tv.add(new Target(null, t));
-                } else {
-                    if (!subset.contains(t)) {
-                        subset.add(t);
-                        tv.add(new Target(di, null));
-                    }
-                }
-            } else {
-                String node = t.substring(0, j);
-                String rtg = t.substring(j + 1);
-                DestInfo di = nodeinfo.get(node);
-                if (di == null) {
-                    tv.add(new Target(null, t));
-                } else {
-                    Target tt = tmap.get(node);
-                    if (tt == null) {
-                        tt = new Target(di, rtg);
-                        tmap.put(node, tt);
-                        tv.add(tt);
-                    } else {
-                        tt.addRouting(rtg);
-                    }
-                }
-            }
+        /**
+         * Is this a meta data only subscription?
+         */
+        public boolean isMetaDataOnly() {
+            return (metaonly);
         }
-        return (tv.toArray(new Target[0]));
-    }
-
-    /**
-     * Check whether this is a valid node-to-node transfer
-     *
-     * @param credentials Credentials offered by the supposed node
-     * @param ip IP address the request came from
-     */
-    public boolean isAnotherNode(String credentials, String ip) {
-        IsFrom n = nodes.get(credentials);
-        return (n != null && n.isFrom(ip));
-    }
 
-    /**
-     * Check whether publication is allowed.
-     *
-     * @param feedid The ID of the feed being requested.
-     * @param credentials The offered credentials
-     * @param ip The requesting IP address
-     */
-    public String isPublishPermitted(String feedid, String credentials, String ip) {
-        Feed f = feeds.get(feedid);
-        String nf = "Feed does not exist";
-        if (f != null) {
-            nf = f.status;
-        }
-        if (nf != null) {
-            return (nf);
+        /**
+         * Should we send Expect: 100-continue?
+         */
+        public boolean isUsing100() {
+            return (use100);
         }
-        String user = f.authusers.get(credentials);
-        if (user == null) {
-            return ("Publisher not permitted for this feed");
+
+        /**
+         * Can we wait to receive a delete file call before deleting file
+         */
+        public boolean isPrivilegedSubscriber() {
+            return (privilegedSubscriber);
         }
-        if (f.subnets.length == 0) {
-            return (null);
+
+        /**
+         * Should i decompress the file before sending it on
+         */
+        public boolean isDecompress() {
+            return (decompress);
         }
-        byte[] addr = NodeUtils.getInetAddress(ip);
-        for (SubnetMatcher snm : f.subnets) {
-            if (snm.matches(addr)) {
-                return (null);
-            }
+
+        /**
+         * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706 Get the followRedirect of this
+         * destination
+         */
+        boolean getFollowRedirect() {
+            return (followRedirect);
         }
-        return ("Publisher not permitted for this feed");
     }
 
     /**
-     * Check whether delete file is allowed.
-     *
-     * @param subId The ID of the subscription being requested.
+     * Raw configuration entry for controlled ingress to the data router node
      */
-    public boolean isDeletePermitted(String subId) {
-        ProvSubscription provSubscription = provSubscriptions.get(subId);
-        return provSubscription.isPrivilegedSubscriber();
-    }
+    public static class ProvForceIngress {
 
-    /**
-     * Check whether publication is allowed for AAF Feed.
-     * @param feedid The ID of the feed being requested.
-     * @param ip The requesting IP address
-     */
-    public String isPublishPermitted(String feedid, String ip) {
-        Feed f = feeds.get(feedid);
-        String nf = "Feed does not exist";
-        if (f != null) {
-            nf = f.status;
+        private String feedid;
+        private String subnet;
+        private String user;
+        private String[] nodes;
+
+        /**
+         * Construct a forced ingress configuration entry
+         *
+         * @param feedid The feed ID that this entry applies to
+         * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all
+         * publisher IP addresses
+         * @param user The publishing user this entry applies to or "" if it applies to all publishing users.
+         * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to.
+         */
+        public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {
+            this.feedid = feedid;
+            this.subnet = subnet;
+            this.user = user;
+            //Sonar fix
+            if (nodes == null) {
+                this.nodes = new String[0];
+            } else {
+                this.nodes = Arrays.copyOf(nodes, nodes.length);
+            }
         }
-        if (nf != null) {
-            return(nf);
+
+        /**
+         * Get the feed ID
+         */
+        public String getFeedId() {
+            return (feedid);
         }
-        if (f.subnets.length == 0) {
-            return(null);
+
+        /**
+         * Get the subnet
+         */
+        public String getSubnet() {
+            return (subnet);
         }
-        byte[] addr = NodeUtils.getInetAddress(ip);
-        for (SubnetMatcher snm: f.subnets) {
-            if (snm.matches(addr)) {
-                return(null);
-            }
+
+        /**
+         * Get the user
+         */
+        public String getUser() {
+            return (user);
         }
-        return("Publisher not permitted for this feed");
-    }
 
-    /**
-     * Get authenticated user
-     */
-    public String getAuthUser(String feedid, String credentials) {
-        return (feeds.get(feedid).authusers.get(credentials));
+        /**
+         * Get the node
+         */
+        public String[] getNodes() {
+            return (nodes);
+        }
     }
 
     /**
-     * AAF changes: TDP EPIC US# 307413
-     * Check AAF_instance for feed ID
-     * @param feedid   The ID of the feed specified
+     * Raw configuration entry for controlled egress from the data router
      */
-    public String getAafInstance(String feedid) {
-        Feed f = feeds.get(feedid);
-        return f.aafInstance;
-    }
+    public static class ProvForceEgress {
 
-    /**
-     * Check if the request should be redirected to a different ingress node
-     */
-    public String getIngressNode(String feedid, String user, String ip) {
-        Feed f = feeds.get(feedid);
-        if (f.redirections.length == 0) {
-            return (null);
+        private String subid;
+        private String node;
+
+        /**
+         * Construct a forced egress configuration entry
+         *
+         * @param subid The subscription ID the subscription with forced egress
+         * @param node The node handling deliveries for this subscription
+         */
+        public ProvForceEgress(String subid, String node) {
+            this.subid = subid;
+            this.node = node;
         }
-        byte[] addr = NodeUtils.getInetAddress(ip);
-        for (Redirection r : f.redirections) {
-            if (r.user != null && !user.equals(r.user)) {
-                continue;
-            }
-            if (r.snm != null && !r.snm.matches(addr)) {
-                continue;
-            }
-            for (String n : r.nodes) {
-                if (myname.equals(n)) {
-                    return (null);
-                }
-            }
-            if (r.nodes.length == 0) {
-                return (null);
-            }
-            return (r.nodes[rrcntr++ % r.nodes.length]);
+
+        /**
+         * Get the subscription ID
+         */
+        public String getSubId() {
+            return (subid);
         }
-        return (null);
-    }
 
-    /**
-     * Get a provisioned configuration parameter
-     */
-    public String getProvParam(String name) {
-        return (params.get(name));
+        /**
+         * Get the node
+         */
+        public String getNode() {
+            return (node);
+        }
     }
 
     /**
-     * Get all the DestInfos
+     * Raw configuration entry for routing within the data router network
      */
-    public DestInfo[] getAllDests() {
-        return (alldests);
-    }
+    public static class ProvHop {
 
-    /**
-     * Get the targets for a feed
-     *
-     * @param feedid The feed ID
-     * @return The targets this feed should be delivered to
-     */
-    public Target[] getTargets(String feedid) {
-        if (feedid == null) {
-            return (new Target[0]);
+        private String from;
+        private String to;
+        private String via;
+
+        /**
+         * Construct a hop entry
+         *
+         * @param from The FQDN of the node with the data to be delivered
+         * @param to The FQDN of the node that will deliver to the subscriber
+         * @param via The FQDN of the node where the from node should send the data
+         */
+        public ProvHop(String from, String to, String via) {
+            this.from = from;
+            this.to = to;
+            this.via = via;
         }
-        Feed f = feeds.get(feedid);
-        if (f == null) {
-            return (new Target[0]);
+
+        /**
+         * A human readable description of this entry
+         */
+        public String toString() {
+            return ("Hop " + from + "->" + to + " via " + via);
         }
-        return (f.targets);
-    }
 
-    /**
-     * Get the creation date for a feed
-     * @param feedid The feed ID
-     * @return the timestamp of creation date of feed id passed
-     */
-    public String getCreatedDate(String feedid) {
-        Feed f = feeds.get(feedid);
-        return(f.createdDate);
-    }
+        /**
+         * Get the from node
+         */
+        public String getFrom() {
+            return (from);
+        }
 
-    /**
-     * Get the feed ID for a subscription
-     *
-     * @param subid The subscription ID
-     * @return The feed ID
-     */
-    public String getFeedId(String subid) {
-        DestInfo di = subinfo.get(subid);
-        if (di == null) {
-            return (null);
+        /**
+         * Get the to node
+         */
+        public String getTo() {
+            return (to);
         }
-        return (di.getLogData());
-    }
 
-    /**
-     * Get the spool directory for a subscription
-     *
-     * @param subid The subscription ID
-     * @return The spool directory
-     */
-    public String getSpoolDir(String subid) {
-        DestInfo di = subinfo.get(subid);
-        if (di == null) {
-            return (null);
+        /**
+         * Get the next intermediate node
+         */
+        public String getVia() {
+            return (via);
         }
-        return (di.getSpool());
     }
 
-    /**
-     * Get the Authorization value this node uses
-     *
-     * @return The Authorization header value for this node
-     */
-    public String getMyAuth() {
-        return (myauth);
+    private static class Redirection {
+
+        SubnetMatcher snm;
+        String user;
+        String[] nodes;
     }
 
+    private static class Feed {
+
+        String loginfo;
+        String status;
+        SubnetMatcher[] subnets;
+        HashMap<String, String> authusers = new HashMap<>();
+        Redirection[] redirections;
+        Target[] targets;
+        String createdDate;
+        String aafInstance;
+    }
 }
index 884f7bf..8a0b0b8 100644 (file)
@@ -26,8 +26,6 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStreamReader;
@@ -35,6 +33,7 @@ import java.io.Reader;
 import java.net.URL;
 import java.util.Properties;
 import java.util.Timer;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 
 
 /**
@@ -49,8 +48,9 @@ import java.util.Timer;
  */
 public class NodeConfigManager implements DeliveryQueueHelper {
 
-    private static EELFLogger eelfLogger = EELFManager.getInstance()
-            .getLogger(NodeConfigManager.class);
+    private static final String CHANGE_ME = "changeme";
+    private static final String NODE_CONFIG_MANAGER = "NodeConfigManager";
+    private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeConfigManager.class);
     private static NodeConfigManager base = new NodeConfigManager();
 
     private Timer timer = new Timer("Node Configuration Timer", true);
@@ -94,7 +94,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     private String eventlogsuffix;
     private String eventloginterval;
     private boolean followredirects;
-    private String [] enabledprotocols;
+    private String[] enabledprotocols;
     private String aafType;
     private String aafInstance;
     private String aafAction;
@@ -102,13 +102,6 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     private boolean cadiEnabled;
 
 
-    /**
-     * Get the default node configuration manager
-     */
-    public static NodeConfigManager getInstance() {
-        return base;
-    }
-
     /**
      * Initialize the configuration of a Data Router node
      */
@@ -120,8 +113,10 @@ public class NodeConfigManager implements DeliveryQueueHelper {
             drNodeProperties.load(new FileInputStream(System
                     .getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties")));
         } catch (Exception e) {
-            NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
-            eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e, System.getProperty("org.onap.dmaap.datarouter.node.properties", "/opt/app/datartr/etc/node.properties"));
+            NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
+            eelfLogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR, e,
+                    System.getProperty("org.onap.dmaap.datarouter.node.properties",
+                            "/opt/app/datartr/etc/node.properties"));
         }
         provurl = drNodeProperties.getProperty("ProvisioningURL", "https://dmaap-dr-prov:8443/internal/prov");
         /*
@@ -143,7 +138,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         try {
             provhost = (new URL(provurl)).getHost();
         } catch (Exception e) {
-            NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+            NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
             eelfLogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, e, provurl);
             System.exit(1);
         }
@@ -168,14 +163,14 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         logretention = Long.parseLong(drNodeProperties.getProperty("LogRetention", "30")) * 86400000L;
         eventlogprefix = logdir + "/events";
         eventlogsuffix = ".log";
-        String redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
+        redirfile = drNodeProperties.getProperty("RedirectionFile", "etc/redirections.dat");
         kstype = drNodeProperties.getProperty("KeyStoreType", "jks");
         ksfile = drNodeProperties.getProperty("KeyStoreFile", "etc/keystore");
-        kspass = drNodeProperties.getProperty("KeyStorePassword", "changeme");
-        kpass = drNodeProperties.getProperty("KeyPassword", "changeme");
+        kspass = drNodeProperties.getProperty("KeyStorePassword", CHANGE_ME);
+        kpass = drNodeProperties.getProperty("KeyPassword", CHANGE_ME);
         tstype = drNodeProperties.getProperty("TrustStoreType", "jks");
         tsfile = drNodeProperties.getProperty("TrustStoreFile");
-        tspass = drNodeProperties.getProperty("TrustStorePassword", "changeme");
+        tspass = drNodeProperties.getProperty("TrustStorePassword", CHANGE_ME);
         if (tsfile != null && tsfile.length() > 0) {
             System.setProperty("javax.net.ssl.trustStoreType", tstype);
             System.setProperty("javax.net.ssl.trustStore", tsfile);
@@ -185,7 +180,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         quiesce = new File(drNodeProperties.getProperty("QuiesceFile", "etc/SHUTDOWN"));
         myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
         if (myname == null) {
-            NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+            NodeUtils.setIpAndFqdnForEelf(NODE_CONFIG_MANAGER);
             eelfLogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
             eelfLogger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
             System.exit(1);
@@ -202,6 +197,13 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         pfetcher.request();
     }
 
+    /**
+     * Get the default node configuration manager
+     */
+    public static NodeConfigManager getInstance() {
+        return base;
+    }
+
     private void localconfig() {
         followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
         eventloginterval = getProvParam("LOGROLL_INTERVAL", "30s");
@@ -218,52 +220,53 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         try {
             initfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000);
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
+            eelfLogger.trace("Error parsing DELIVERY_INIT_RETRY_INTERVAL", e);
         }
         try {
-            waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL")) * 1000);
+            waitForFileProcessFailureTimer = (long) (Double.parseDouble(getProvParam("DELIVERY_FILE_PROCESS_INTERVAL"))
+                    * 1000);
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
+            eelfLogger.trace("Error parsing DELIVERY_FILE_PROCESS_INTERVAL", e);
         }
         try {
             maxfailuretimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000);
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
+            eelfLogger.trace("Error parsing DELIVERY_MAX_RETRY_INTERVAL", e);
         }
         try {
             expirationtimer = (long) (Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000);
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_MAX_AGE", e);
+            eelfLogger.trace("Error parsing DELIVERY_MAX_AGE", e);
         }
         try {
             failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO"));
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_RETRY_RATIO", e);
+            eelfLogger.trace("Error parsing DELIVERY_RETRY_RATIO", e);
         }
         try {
             deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS"));
         } catch (Exception e) {
-            eelfLogger.error("Error parsing DELIVERY_THREADS", e);
+            eelfLogger.trace("Error parsing DELIVERY_THREADS", e);
         }
         try {
             fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT"));
         } catch (Exception e) {
-            eelfLogger.error("Error parsing FAIR_FILE_LIMIT", e);
+            eelfLogger.trace("Error parsing FAIR_FILE_LIMIT", e);
         }
         try {
             fairtimelimit = (long) (Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000);
         } catch (Exception e) {
-            eelfLogger.error("Error parsing FAIR_TIME_LIMIT", e);
+            eelfLogger.trace("Error parsing FAIR_TIME_LIMIT", e);
         }
         try {
             fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0;
         } catch (Exception e) {
-            eelfLogger.error("Error parsing FREE_DISK_RED_PERCENT", e);
+            eelfLogger.trace("Error parsing FREE_DISK_RED_PERCENT", e);
         }
         try {
             fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0;
         } catch (Exception e) {
-            eelfLogger.error("Error parsing FREE_DISK_YELLOW_PERCENT", e);
+            eelfLogger.trace("Error parsing FREE_DISK_YELLOW_PERCENT", e);
         }
         if (fdpstart < 0.01) {
             fdpstart = 0.01;
@@ -286,14 +289,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
             config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
             localconfig();
             configtasks.startRun();
-            Runnable rr;
-            while ((rr = configtasks.next()) != null) {
-                try {
-                    rr.run();
-                } catch (Exception e) {
-                    eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
-                }
-            }
+            runTasks();
         } catch (Exception e) {
             NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
             eelfLogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
@@ -302,6 +298,17 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         }
     }
 
+    private void runTasks() {
+        Runnable rr;
+        while ((rr = configtasks.next()) != null) {
+            try {
+                rr.run();
+            } catch (Exception e) {
+                eelfLogger.error("NODE0518 Exception fetchconfig: " + e);
+            }
+        }
+    }
+
     /**
      * Process a gofetch request from a particular IP address.  If the IP address is not an IP address we would go to to
      * fetch the provisioning data, ignore the request.  If the data has been fetched very recently (default 10
@@ -381,7 +388,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
      * @return True if the IP and credentials are valid for the specified feed.
      */
     public String isPublishPermitted(String feedid, String ip) {
-        return(config.isPublishPermitted(feedid, ip));
+        return (config.isPublishPermitted(feedid, ip));
     }
 
     /**
@@ -396,12 +403,12 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     }
 
     /**
-     * AAF changes: TDP EPIC US# 307413
-     * Check AAF_instance for feed ID in NodeConfig
+     * AAF changes: TDP EPIC US# 307413 Check AAF_instance for feed ID in NodeConfig
+     *
      * @param feedid The ID of the feed specified
      */
     public String getAafInstance(String feedid) {
-        return(config.getAafInstance(feedid));
+        return (config.getAafInstance(feedid));
     }
 
     /**
@@ -597,11 +604,12 @@ public class NodeConfigManager implements DeliveryQueueHelper {
 
     /**
      * Get the creation date for a feed
+     *
      * @param feedid The feed ID
      * @return the timestamp of creation date of feed id passed
      */
     public String getCreatedDate(String feedid) {
-        return(config.getCreatedDate(feedid));
+        return (config.getCreatedDate(feedid));
     }
 
     /**
@@ -774,10 +782,11 @@ public class NodeConfigManager implements DeliveryQueueHelper {
 
     /**
      * Disable and enable protocols
-     * */
+     */
     public String[] getEnabledprotocols() {
         return enabledprotocols;
     }
+
     public void setEnabledprotocols(String[] enabledprotocols) {
         this.enabledprotocols = enabledprotocols.clone();
     }
@@ -805,34 +814,42 @@ public class NodeConfigManager implements DeliveryQueueHelper {
     public String getAafType() {
         return aafType;
     }
+
     public void setAafType(String aafType) {
         this.aafType = aafType;
     }
+
     public String getAafInstance() {
         return aafInstance;
     }
+
     public void setAafInstance(String aafInstance) {
         this.aafInstance = aafInstance;
     }
+
     public String getAafAction() {
         return aafAction;
     }
+
     public void setAafAction(String aafAction) {
         this.aafAction = aafAction;
     }
+
     /*
      * Get aafURL from SWM variable
      * */
     public String getAafURL() {
         return aafURL;
     }
+
     public void setAafURL(String aafURL) {
         this.aafURL = aafURL;
     }
 
-    public boolean getCadiEnabeld() {
+    public boolean getCadiEnabled() {
         return cadiEnabled;
     }
+
     public void setCadiEnabled(boolean cadiEnabled) {
         this.cadiEnabled = cadiEnabled;
     }
@@ -847,7 +864,7 @@ public class NodeConfigManager implements DeliveryQueueHelper {
         try {
             String type = getAafType();
             String action = getAafAction();
-            if (aafInstance == null || aafInstance.equals("")) {
+            if ("".equals(aafInstance)) {
                 aafInstance = getAafInstance();
             }
             return type + "|" + aafInstance + "|" + action;
index 7a2691e..9eaea28 100644 (file)
@@ -26,74 +26,38 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Properties;
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletException;
 import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.onap.aaf.cadi.PropAccess;
 
-import javax.servlet.DispatcherType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.EnumSet;
-import java.util.Properties;
-
 /**
  * The main starting point for the Data Router node
  */
 public class NodeMain {
 
-    private NodeMain() {
-    }
-
     private static EELFLogger nodeMainLogger = EELFManager.getInstance().getLogger(NodeMain.class);
-
-    class Inner {
-        InputStream getCadiProps() {
-            InputStream in = null;
-            try {
-                in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
-            } catch (Exception e) {
-                nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e);
-            }
-            return in;
-        }
-    }
-
-    private static class WaitForConfig implements Runnable {
-
-        private NodeConfigManager localNodeConfigManager;
-
-        WaitForConfig(NodeConfigManager ncm) {
-            this.localNodeConfigManager = ncm;
-        }
-
-        public synchronized void run() {
-            notify();
-        }
-
-        synchronized void waitForConfig() {
-            localNodeConfigManager.registerConfigTask(this);
-            while (!localNodeConfigManager.isConfigured()) {
-                nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
-                try {
-                    wait();
-                } catch (Exception exception) {
-                    nodeMainLogger
-                            .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
-                                    exception);
-                }
-            }
-            localNodeConfigManager.deregisterConfigTask(this);
-            nodeMainLogger.info("NODE0004 Node Configuration Data Received");
-        }
-    }
-
     private static Delivery delivery;
     private static NodeConfigManager nodeConfigManager;
 
+    private NodeMain() {
+    }
+
     /**
      * Reset the retry timer for a subscription
      */
@@ -123,7 +87,8 @@ public class NodeMain {
         httpConfiguration.setRequestHeaderSize(2048);
 
         // HTTP connector
-        try (ServerConnector httpServerConnector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration))) {
+        try (ServerConnector httpServerConnector = new ServerConnector(server,
+                new HttpConnectionFactory(httpConfiguration))) {
             httpServerConnector.setPort(nodeConfigManager.getHttpPort());
             httpServerConnector.setIdleTimeout(2000);
 
@@ -147,9 +112,12 @@ public class NodeMain {
 
             sslContextFactory.addExcludeProtocols("SSLv3");
             sslContextFactory.setIncludeProtocols(nodeConfigManager.getEnabledprotocols());
-            nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" + String.join(",", sslContextFactory.getExcludeProtocols()));
-            nodeMainLogger.info("NODE00004 Supported protocols node server:-" + String.join(",", sslContextFactory.getIncludeProtocols()));
-            nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" + String.join(",", sslContextFactory.getExcludeCipherSuites()));
+            nodeMainLogger.info("NODE00004 Unsupported protocols node server:-" +
+                    String.join(",", sslContextFactory.getExcludeProtocols()));
+            nodeMainLogger.info("NODE00004 Supported protocols node server:-" +
+                    String.join(",", sslContextFactory.getIncludeProtocols()));
+            nodeMainLogger.info("NODE00004 Unsupported ciphers node server:-" +
+                    String.join(",", sslContextFactory.getExcludeCipherSuites()));
 
             HttpConfiguration httpsConfiguration = new HttpConfiguration(httpConfiguration);
             httpsConfiguration.setRequestHeaderSize(8192);
@@ -174,20 +142,8 @@ public class NodeMain {
                 servletContextHandler.addServlet(new ServletHolder(new NodeServlet(delivery)), "/*");
 
                 //CADI Filter activation check
-                if (nodeConfigManager.getCadiEnabeld()) {
-                    Properties cadiProperties = new Properties();
-                    try {
-                        Inner obj = new NodeMain().new Inner();
-                        InputStream in = obj.getCadiProps();
-                        cadiProperties.load(in);
-                    } catch (IOException e1) {
-                        nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties ", e1);
-                    }
-                    cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
-                    nodeMainLogger.info("NODE00005  aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
-
-                    PropAccess access = new PropAccess(cadiProperties);
-                    servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet.of(DispatcherType.REQUEST));
+                if (nodeConfigManager.getCadiEnabled()) {
+                    enableCadi(servletContextHandler);
                 }
 
                 server.setHandler(servletContextHandler);
@@ -199,9 +155,68 @@ public class NodeMain {
             server.start();
             nodeMainLogger.info("NODE00006 Node Server started-" + server.getState());
         } catch (Exception e) {
-            nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e);
+            nodeMainLogger.info("NODE00006 Jetty failed to start. Reporting will we unavailable", e.getMessage());
         }
         server.join();
         nodeMainLogger.info("NODE00007 Node Server joined - " + server.getState());
     }
+
+    private static void enableCadi(ServletContextHandler servletContextHandler) throws ServletException {
+        Properties cadiProperties = new Properties();
+        try {
+            Inner obj = new NodeMain().new Inner();
+            InputStream in = obj.getCadiProps();
+            cadiProperties.load(in);
+        } catch (IOException e1) {
+            nodeMainLogger.error("NODE00005 Exception in NodeMain.Main() loading CADI properties " + e1.getMessage());
+        }
+        cadiProperties.setProperty("aaf_locate_url", nodeConfigManager.getAafURL());
+        nodeMainLogger.info("NODE00005  aaf_url set to - " + cadiProperties.getProperty("aaf_url"));
+
+        PropAccess access = new PropAccess(cadiProperties);
+        servletContextHandler.addFilter(new FilterHolder(new DRNodeCadiFilter(true, access)), "/*", EnumSet
+                .of(DispatcherType.REQUEST));
+    }
+
+    private static class WaitForConfig implements Runnable {
+
+        private NodeConfigManager localNodeConfigManager;
+
+        WaitForConfig(NodeConfigManager ncm) {
+            this.localNodeConfigManager = ncm;
+        }
+
+        public synchronized void run() {
+            notify();
+        }
+
+        synchronized void waitForConfig() {
+            localNodeConfigManager.registerConfigTask(this);
+            while (!localNodeConfigManager.isConfigured()) {
+                nodeMainLogger.info("NODE0003 Waiting for Node Configuration");
+                try {
+                    wait();
+                } catch (Exception exception) {
+                    nodeMainLogger
+                            .error("NodeMain: waitForConfig exception. Exception Message:- " + exception.toString(),
+                                    exception);
+                }
+            }
+            localNodeConfigManager.deregisterConfigTask(this);
+            nodeMainLogger.info("NODE0004 Node Configuration Data Received");
+        }
+    }
+
+    class Inner {
+
+        InputStream getCadiProps() {
+            InputStream in = null;
+            try {
+                in = getClass().getClassLoader().getResourceAsStream("drNodeCadi.properties");
+            } catch (Exception e) {
+                nodeMainLogger.error("Exception in Inner.getCadiProps() method ", e);
+            }
+            return in;
+        }
+    }
 }
index d665080..3f2fc09 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
+import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.jetbrains.annotations.Nullable;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-import org.slf4j.MDC;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
@@ -45,8 +40,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Enumeration;
 import java.util.regex.Pattern;
-
-import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
+import org.slf4j.MDC;
 
 /**
  * Servlet for handling all http and https requests to the data router node
@@ -61,23 +60,27 @@ import static org.onap.dmaap.datarouter.node.NodeUtils.sendResponseError;
  */
 public class NodeServlet extends HttpServlet {
 
+    private static final String FROM = " from ";
+    private static final String INVALID_REQUEST_URI = "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.";
+    private static final String IO_EXCEPTION = "IOException";
+    private static final String ON_BEHALF_OF = "X-DMAAP-DR-ON-BEHALF-OF";
     private static NodeConfigManager config;
-    private static Pattern MetaDataPattern;
+    private static Pattern metaDataPattern;
     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(NodeServlet.class);
-    private final Delivery delivery;
 
     static {
         final String ws = "\\s*";
         // assume that \\ and \" have been replaced by X
         final String string = "\"[^\"]*\"";
-        //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";
         final String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";
         final String value = "(?:" + string + "|" + number + "|null|true|false)";
         final String item = string + ws + ":" + ws + value + ws;
         final String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;
-        MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);
+        metaDataPattern = Pattern.compile(object, Pattern.DOTALL);
     }
 
+    private final Delivery delivery;
+
     NodeServlet(Delivery delivery) {
         this.delivery = delivery;
     }
@@ -91,7 +94,7 @@ public class NodeServlet extends HttpServlet {
         eelfLogger.info("NODE0101 Node Servlet Configured");
     }
 
-    private boolean down(HttpServletResponse resp) throws IOException {
+    private boolean down(HttpServletResponse resp) {
         if (config.isShutdown() || !config.isConfigured()) {
             sendResponseError(resp, HttpServletResponse.SC_SERVICE_UNAVAILABLE, eelfLogger);
             eelfLogger.info("NODE0102 Rejecting request: Service is being quiesced");
@@ -109,15 +112,10 @@ public class NodeServlet extends HttpServlet {
         NodeUtils.setRequestIdAndInvocationId(req);
         eelfLogger.info(EelfMsgs.ENTRY);
         try {
-            eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+            eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
                     getIdFromPath(req) + "");
-            try {
-                if (down(resp)) {
-                    return;
-                }
-
-            } catch (IOException ioe) {
-                eelfLogger.error("IOException", ioe);
+            if (down(resp)) {
+                return;
             }
             String path = req.getPathInfo();
             String qs = req.getQueryString();
@@ -138,7 +136,7 @@ public class NodeServlet extends HttpServlet {
                 }
             }
 
-            eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);
+            eelfLogger.info("NODE0103 Rejecting invalid GET of " + path + FROM + ip);
             sendResponseError(resp, HttpServletResponse.SC_NOT_FOUND, eelfLogger);
         } finally {
             eelfLogger.info(EelfMsgs.EXIT);
@@ -153,12 +151,12 @@ public class NodeServlet extends HttpServlet {
         NodeUtils.setIpAndFqdnForEelf("doPut");
         NodeUtils.setRequestIdAndInvocationId(req);
         eelfLogger.info(EelfMsgs.ENTRY);
-        eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+        eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
                 getIdFromPath(req) + "");
         try {
             common(req, resp, true);
         } catch (IOException ioe) {
-            eelfLogger.error("IOException", ioe);
+            eelfLogger.error(IO_EXCEPTION, ioe);
             eelfLogger.info(EelfMsgs.EXIT);
         }
     }
@@ -171,25 +169,30 @@ public class NodeServlet extends HttpServlet {
         NodeUtils.setIpAndFqdnForEelf("doDelete");
         NodeUtils.setRequestIdAndInvocationId(req);
         eelfLogger.info(EelfMsgs.ENTRY);
-        eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-DMAAP-DR-ON-BEHALF-OF"),
+        eelfLogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader(ON_BEHALF_OF),
                 getIdFromPath(req) + "");
         try {
             common(req, resp, false);
         } catch (IOException ioe) {
-            eelfLogger.error("IOException", ioe);
+            eelfLogger.error(IO_EXCEPTION, ioe);
             eelfLogger.info(EelfMsgs.EXIT);
         }
     }
 
     private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws IOException {
+        final String PUBLISH = "/publish/";
+        final String INTERNAL_PUBLISH = "/internal/publish/";
+        final String HTTPS = "https://";
+        final String USER = " user ";
         String fileid = getFileId(req, resp);
-        if (fileid == null) return;
+        if (fileid == null) {
+            return;
+        }
         String feedid = null;
         String user = null;
         String ip = req.getRemoteAddr();
         String lip = req.getLocalAddr();
         String pubid = null;
-        String xpubid = null;
         String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;
         Target[] targets = null;
         boolean isAAFFeed = false;
@@ -199,17 +202,17 @@ public class NodeServlet extends HttpServlet {
         }
         String credentials = req.getHeader("Authorization");
         if (credentials == null) {
-            eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req
+            eelfLogger.error("NODE0306 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + FROM + req
                     .getRemoteAddr());
             resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");
             eelfLogger.info(EelfMsgs.EXIT);
             return;
         }
-        if (fileid.startsWith("/publish/")) {
+        if (fileid.startsWith(PUBLISH)) {
             fileid = fileid.substring(9);
             int i = fileid.indexOf('/');
             if (i == -1 || i == fileid.length() - 1) {
-                eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+                eelfLogger.error("NODE0205 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
                         .getRemoteAddr());
                 resp.sendError(HttpServletResponse.SC_NOT_FOUND,
                         "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  Possible missing fileid.");
@@ -218,18 +221,19 @@ public class NodeServlet extends HttpServlet {
             }
             feedid = fileid.substring(0, i);
 
-            if (config.getCadiEnabeld()) {
+            if (config.getCadiEnabled()) {
                 String path = req.getPathInfo();
                 if (!path.startsWith("/internal") && feedid != null) {
                     String aafInstance = config.getAafInstance(feedid);
-                    if (!(aafInstance.equalsIgnoreCase("legacy"))) {
+                    if (!("legacy".equalsIgnoreCase(aafInstance))) {
                         isAAFFeed = true;
                         String permission = config.getPermission(aafInstance);
                         eelfLogger.info("NodeServlet.common() permission string - " + permission);
                         //Check in CADI Framework API if user has AAF permission or not
                         if (!req.isUserInRole(permission)) {
                             String message = "AAF disallows access to permission string - " + permission;
-                            eelfLogger.error("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());
+                            eelfLogger.error("NODE0307 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo()
+                                    + FROM + req.getRemoteAddr());
                             resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
                             eelfLogger.info(EelfMsgs.EXIT);
                             return;
@@ -240,9 +244,8 @@ public class NodeServlet extends HttpServlet {
 
             fileid = fileid.substring(i + 1);
             pubid = config.getPublishId();
-            xpubid = req.getHeader("X-DMAAP-DR-PUBLISH-ID");
             targets = config.getTargets(feedid);
-        } else if (fileid.startsWith("/internal/publish/")) {
+        } else if (fileid.startsWith(INTERNAL_PUBLISH)) {
             if (!config.isAnotherNode(credentials, ip)) {
                 eelfLogger.error("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);
                 resp.sendError(HttpServletResponse.SC_FORBIDDEN);
@@ -254,18 +257,18 @@ public class NodeServlet extends HttpServlet {
             user = "datartr";   // SP6 : Added usr as datartr to avoid null entries for internal routing
             targets = config.parseRouting(req.getHeader("X-DMAAP-DR-ROUTING"));
         } else {
-            eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+            eelfLogger.error("NODE0204 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
                     .getRemoteAddr());
             resp.sendError(HttpServletResponse.SC_NOT_FOUND,
-                    "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
+                    INVALID_REQUEST_URI);
             eelfLogger.info(EelfMsgs.EXIT);
             return;
         }
         if (fileid.indexOf('/') != -1) {
-            eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+            eelfLogger.error("NODE0202 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
                     .getRemoteAddr());
             resp.sendError(HttpServletResponse.SC_NOT_FOUND,
-                    "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
+                    INVALID_REQUEST_URI);
             eelfLogger.info(EelfMsgs.EXIT);
             return;
         }
@@ -278,14 +281,16 @@ public class NodeServlet extends HttpServlet {
         if (xp != 443) {
             hp = hp + ":" + xp;
         }
-        String logurl = "https://" + hp + "/internal/publish/" + fileid;
+        String logurl = HTTPS + hp + INTERNAL_PUBLISH + fileid;
         if (feedid != null) {
-            logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;
+            logurl = HTTPS + hp + PUBLISH + feedid + "/" + fileid;
             //Cadi code starts
             if (!isAAFFeed) {
                 String reason = config.isPublishPermitted(feedid, credentials, ip);
                 if (reason != null) {
-                    eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason " + PathUtil.cleanString(reason));
+                    eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+                            .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+                            .cleanString(ip) + " reason " + PathUtil.cleanString(reason));
                     resp.sendError(HttpServletResponse.SC_FORBIDDEN, reason);
                     eelfLogger.info(EelfMsgs.EXIT);
                     return;
@@ -294,9 +299,12 @@ public class NodeServlet extends HttpServlet {
             } else {
                 String reason = config.isPublishPermitted(feedid, ip);
                 if (reason != null) {
-                    eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil.cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + " from " + PathUtil.cleanString(ip) + " reason   Invalid AAF user- " + PathUtil.cleanString(reason));
+                    eelfLogger.error("NODE0111 Rejecting unauthorized publish attempt to feed " + PathUtil
+                            .cleanString(feedid) + " fileid " + PathUtil.cleanString(fileid) + FROM + PathUtil
+                            .cleanString(ip) + " reason   Invalid AAF user- " + PathUtil.cleanString(reason));
                     String message = "Invalid AAF user- " + PathUtil.cleanString(reason);
-                    eelfLogger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + PathUtil.cleanString(req.getPathInfo()) + " from " + PathUtil.cleanString(req.getRemoteAddr()));
+                    eelfLogger.info("NODE0308 Rejecting unauthenticated PUT or DELETE of " + PathUtil
+                            .cleanString(req.getPathInfo()) + FROM + PathUtil.cleanString(req.getRemoteAddr()));
                     resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
                     return;
                 }
@@ -316,25 +324,26 @@ public class NodeServlet extends HttpServlet {
                 if (iport != 443) {
                     port = ":" + iport;
                 }
-                String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;
-                eelfLogger.info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil.cleanString(redirto));  //Fortify scan fixes - log forging
+                String redirto = HTTPS + newnode + port + PUBLISH + feedid + "/" + fileid;
+                eelfLogger
+                        .info("NODE0108 Redirecting publish attempt for feed " + PathUtil.cleanString(feedid) + USER
+                                + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " to " + PathUtil
+                                .cleanString(redirto));  //Fortify scan fixes - log forging
                 resp.sendRedirect(PathUtil.cleanString(redirto));         //Fortify scan fixes-open redirect - 2 issues
                 eelfLogger.info(EelfMsgs.EXIT);
                 return;
             }
             resp.setHeader("X-DMAAP-DR-PUBLISH-ID", pubid);
         }
-        if (req.getPathInfo().startsWith("/internal/publish/")) {
+        if (req.getPathInfo().startsWith(INTERNAL_PUBLISH)) {
             feedid = req.getHeader("X-DMAAP-DR-FEED-ID");
         }
         String fbase = PathUtil.cleanString(config.getSpoolDir() + "/" + pubid);  //Fortify scan fixes-Path manipulation
         File data = new File(fbase);
         File meta = new File(fbase + ".M");
-        OutputStream dos = null;
         Writer mw = null;
-        InputStream is = null;
         try {
-            StringBuffer mx = new StringBuffer();
+            StringBuilder mx = new StringBuilder();
             mx.append(req.getMethod()).append('\t').append(fileid).append('\n');
             Enumeration hnames = req.getHeaderNames();
             String ctype = null;
@@ -364,13 +373,17 @@ public class NodeServlet extends HttpServlet {
                         }
                         if ("x-dmaap-dr-meta".equals(hnlc)) {
                             if (hv.length() > 4096) {
-                                eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip));  //Fortify scan fixes - log forging
+                                eelfLogger.error("NODE0109 Rejecting publish attempt with metadata too long for feed "
+                                        + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+                                        + PathUtil.cleanString(ip));  //Fortify scan fixes - log forging
                                 resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");
                                 eelfLogger.info(EelfMsgs.EXIT);
                                 return;
                             }
-                            if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
-                                eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip));  //Fortify scan fixes - log forging
+                            if (!metaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {
+                                eelfLogger.error("NODE0109 Rejecting publish attempt with malformed metadata for feed "
+                                        + PathUtil.cleanString(feedid) + USER + PathUtil.cleanString(user) + " ip "
+                                        + PathUtil.cleanString(ip));  //Fortify scan fixes - log forging
                                 resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");
                                 eelfLogger.info(EelfMsgs.EXIT);
                                 return;
@@ -388,28 +401,12 @@ public class NodeServlet extends HttpServlet {
             }
             mx.append("X-DMAAP-DR-RECEIVED\t").append(rcvd).append('\n');
             String metadata = mx.toString();
-            byte[] buf = new byte[1024 * 1024];
-            int i;
-            try {
-                is = req.getInputStream();
-                dos = new FileOutputStream(data);
-                while ((i = is.read(buf)) > 0) {
-                    dos.write(buf, 0, i);
-                }
-                is.close();
-                is = null;
-                dos.close();
-                dos = null;
-            } catch (IOException ioe) {
-                long exlen = -1;
-                try {
-                    exlen = Long.parseLong(req.getHeader("Content-Length"));
-                } catch (Exception e) {
-                    eelfLogger.error("NODE0529 Exception common: " + e);
-                }
-                StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
-                eelfLogger.info(EelfMsgs.EXIT);
-                throw ioe;
+            long exlen = getExlen(req);
+            String message = writeInputStreamToFile(req, data);
+            if (message != null) {
+                StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+                        message);
+                throw new IOException(message);
             }
             Path dpath = Paths.get(fbase);
             for (Target t : targets) {
@@ -418,7 +415,8 @@ public class NodeServlet extends HttpServlet {
                     // TODO: unknown destination
                     continue;
                 }
-                String dbase = PathUtil.cleanString(di.getSpool() + "/" + pubid);  //Fortify scan fixes-Path Manipulation
+                String dbase = PathUtil
+                        .cleanString(di.getSpool() + "/" + pubid);  //Fortify scan fixes-Path Manipulation
                 Files.createLink(Paths.get(dbase), dpath);
                 mw = new FileWriter(meta);
                 mw.write(metadata);
@@ -427,45 +425,28 @@ public class NodeServlet extends HttpServlet {
                 }
                 mw.close();
                 meta.renameTo(new File(dbase + ".M"));
-
             }
             resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
             try {
                 resp.getOutputStream().close();
             } catch (IOException ioe) {
-                long exlen = -1;
-                try {
-                    exlen = Long.parseLong(req.getHeader("Content-Length"));
-                } catch (Exception e) {
-                    eelfLogger.error("NODE00000 Exception common", e);
-                }
-                StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());
+                StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user,
+                        ioe.getMessage());
                 //Fortify scan fixes - log forging
-                eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid) + " user " + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe.toString(), ioe);
-
+                eelfLogger.error("NODE0110 IO Exception while closing IO stream " + PathUtil.cleanString(feedid)
+                        + USER + PathUtil.cleanString(user) + " ip " + PathUtil.cleanString(ip) + " " + ioe
+                        .toString(), ioe);
                 throw ioe;
             }
 
-            StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);
+            StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user,
+                    HttpServletResponse.SC_NO_CONTENT);
         } catch (IOException ioe) {
-            eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);
+            eelfLogger.error("NODE0110 IO Exception receiving publish attempt for feed " + feedid + USER + user
+                    + " ip " + ip + " " + ioe.toString(), ioe);
             eelfLogger.info(EelfMsgs.EXIT);
             throw ioe;
         } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (Exception e) {
-                    eelfLogger.error("NODE0530 Exception common: " + e);
-                }
-            }
-            if (dos != null) {
-                try {
-                    dos.close();
-                } catch (Exception e) {
-                    eelfLogger.error("NODE0531 Exception common: " + e);
-                }
-            }
             if (mw != null) {
                 try {
                     mw.close();
@@ -486,12 +467,39 @@ public class NodeServlet extends HttpServlet {
         }
     }
 
+    private String writeInputStreamToFile(HttpServletRequest req, File data) {
+        byte[] buf = new byte[1024 * 1024];
+        int i;
+        try (OutputStream dos = new FileOutputStream(data);
+                InputStream is = req.getInputStream()) {
+            while ((i = is.read(buf)) > 0) {
+                dos.write(buf, 0, i);
+            }
+        } catch (IOException ioe) {
+            eelfLogger.error("NODE0530 Exception common: " + ioe, ioe);
+            eelfLogger.info(EelfMsgs.EXIT);
+            return ioe.getMessage();
+        }
+        return null;
+    }
+
+    private long getExlen(HttpServletRequest req) {
+        long exlen = -1;
+        try {
+            exlen = Long.parseLong(req.getHeader("Content-Length"));
+        } catch (Exception e) {
+            eelfLogger.error("NODE0529 Exception common: " + e);
+        }
+        return exlen;
+    }
+
     private void deleteFile(HttpServletRequest req, HttpServletResponse resp, String fileid, String pubid) {
+        final String FROM_DR_MESSAGE = ".M) from DR Node: ";
         try {
             fileid = fileid.substring(8);
             int i = fileid.indexOf('/');
             if (i == -1 || i == fileid.length() - 1) {
-                eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + " from " + req
+                eelfLogger.error("NODE0112 Rejecting bad URI for DELETE of " + req.getPathInfo() + FROM + req
                         .getRemoteAddr());
                 resp.sendError(HttpServletResponse.SC_NOT_FOUND,
                         "Invalid request URI. Expecting <subId>/<pubId>.");
@@ -501,7 +509,7 @@ public class NodeServlet extends HttpServlet {
             String subscriptionId = fileid.substring(0, i);
             int subId = Integer.parseInt(subscriptionId);
             pubid = fileid.substring(i + 1);
-            String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+            String errorMessage = "Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
                     + config.getMyName() + ".";
             int subIdDir = subId - (subId % 100);
             if (!isAuthorizedToDelete(resp, subscriptionId, errorMessage)) {
@@ -509,7 +517,7 @@ public class NodeServlet extends HttpServlet {
             }
             boolean result = delivery.markTaskSuccess(config.getSpoolBase() + "/s/" + subIdDir + "/" + subId, pubid);
             if (result) {
-                eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + ".M) from DR Node: "
+                eelfLogger.info("NODE0115 Successfully deleted files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
                         + config.getMyName());
                 resp.setStatus(HttpServletResponse.SC_OK);
                 eelfLogger.info(EelfMsgs.EXIT);
@@ -519,7 +527,7 @@ public class NodeServlet extends HttpServlet {
                 eelfLogger.info(EelfMsgs.EXIT);
             }
         } catch (IOException ioe) {
-            eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + ".M) from DR Node: "
+            eelfLogger.error("NODE0117 Unable to delete files (" + pubid + ", " + pubid + FROM_DR_MESSAGE
                     + config.getMyName(), ioe);
             eelfLogger.info(EelfMsgs.EXIT);
         }
@@ -533,7 +541,7 @@ public class NodeServlet extends HttpServlet {
         }
         if (!req.isSecure()) {
             eelfLogger.error(
-                    "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req
+                    "NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + FROM + req
                             .getRemoteAddr());
             resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");
             eelfLogger.info(EelfMsgs.EXIT);
@@ -541,17 +549,18 @@ public class NodeServlet extends HttpServlet {
         }
         String fileid = req.getPathInfo();
         if (fileid == null) {
-            eelfLogger.error("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req
+            eelfLogger.error("NODE0201 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + FROM + req
                     .getRemoteAddr());
             resp.sendError(HttpServletResponse.SC_NOT_FOUND,
-                    "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");
+                    INVALID_REQUEST_URI);
             eelfLogger.info(EelfMsgs.EXIT);
             return null;
         }
         return fileid;
     }
 
-    private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage) throws IOException {
+    private boolean isAuthorizedToDelete(HttpServletResponse resp, String subscriptionId, String errorMessage)
+            throws IOException {
         try {
             boolean deletePermitted = config.isDeletePermitted(subscriptionId);
             if (!deletePermitted) {
index da84ae5..e79e2ee 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;
+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.security.KeyStore;
+import java.security.KeyStoreException;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -47,8 +54,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 import org.slf4j.MDC;
 
-import static com.att.eelf.configuration.Configuration.*;
-
 /**
  * Utility functions for the data router node
  */
@@ -63,7 +68,7 @@ public class NodeUtils {
     /**
      * Base64 encode a byte array
      *
-     * @param  raw The bytes to be encoded
+     * @param raw The bytes to be encoded
      * @return The encoded string
      */
     public static String base64Encode(byte[] raw) {
@@ -117,11 +122,7 @@ public class NodeUtils {
         KeyStore ks;
         try {
             ks = KeyStore.getInstance(kstype);
-            try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
-                ks.load(fileInputStream, kspass.toCharArray());
-            } catch (IOException ioException) {
-                eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
-                        ioException);
+            if (loadKeyStore(ksfile, kspass, ks)) {
                 return (null);
             }
         } catch (Exception e) {
@@ -142,22 +143,9 @@ public class NodeUtils {
         try {
             Enumeration<String> aliases = ks.aliases();
             while (aliases.hasMoreElements()) {
-                String s = aliases.nextElement();
-                if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {
-                    X509Certificate c = (X509Certificate) ks.getCertificate(s);
-                    if (c != null) {
-                        String subject = c.getSubjectX500Principal().getName();
-                        String[] parts = subject.split(",");
-                        if (parts.length < 1) {
-                            return (null);
-                        }
-                        subject = parts[5].trim();
-                        if (!subject.startsWith("CN=")) {
-                            return (null);
-
-                        }
-                        return (subject.substring(3));
-                    }
+                String name = getNameFromSubject(ks, aliases);
+                if (name != null) {
+                    return name;
                 }
             }
         } catch (Exception e) {
@@ -290,19 +278,51 @@ public class NodeUtils {
     /**
      * Method to check to see if file is of type gzip
      *
-     * @param   file The name of the file to be checked
-     * @return  True if the file is of type gzip
+     * @param file The name of the file to be checked
+     * @return True if the file is of type gzip
      */
-    public static boolean isFiletypeGzip(File file){
-        try(FileInputStream fileInputStream = new FileInputStream(file);
-            GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
+    public static boolean isFiletypeGzip(File file) {
+        try (FileInputStream fileInputStream = new FileInputStream(file);
+                GZIPInputStream gzip = new GZIPInputStream(fileInputStream)) {
 
             return true;
-        }catch (IOException e){
+        } catch (IOException e) {
             eelfLogger.error("NODE0403 " + file.toString() + " Not in gzip(gz) format: " + e.toString() + e);
             return false;
         }
     }
 
 
+    private static boolean loadKeyStore(String ksfile, String kspass, KeyStore ks)
+            throws NoSuchAlgorithmException, CertificateException {
+        try (FileInputStream fileInputStream = new FileInputStream(ksfile)) {
+            ks.load(fileInputStream, kspass.toCharArray());
+        } catch (IOException ioException) {
+            eelfLogger.error("IOException occurred while opening FileInputStream: " + ioException.getMessage(),
+                    ioException);
+            return true;
+        }
+        return false;
+    }
+
+
+    private static String getNameFromSubject(KeyStore ks, Enumeration<String> aliases) throws KeyStoreException {
+        String s = aliases.nextElement();
+        if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {
+            X509Certificate c = (X509Certificate) ks.getCertificate(s);
+            if (c != null) {
+                String subject = c.getSubjectX500Principal().getName();
+                String[] parts = subject.split(",");
+                if (parts.length < 1) {
+                    return null;
+                }
+                subject = parts[5].trim();
+                if (!subject.startsWith("CN=")) {
+                    return null;
+                }
+                return subject.substring(3);
+            }
+        }
+        return null;
+    }
 }
index fec2ca3..d8beab5 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Vector;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
 
 /**
  * Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to
@@ -35,16 +37,41 @@ import java.util.Vector;
 
 public class PathFinder {
 
-    private static class Hop {
+    private ArrayList<String> errors = new ArrayList<>();
+    private HashMap<String, String> routes = new HashMap<>();
 
-        boolean mark;
-        boolean bad;
-        NodeConfig.ProvHop basis;
+    /**
+     * Find routes from a specified origin to all of the nodes given a set of specified next hops.
+     *
+     * @param origin where we start
+     * @param nodes where we can go
+     * @param hops detours along the way
+     */
+    public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
+        HashSet<String> known = new HashSet<>();
+        HashMap<String, HashMap<String, Hop>> ht = new HashMap<>();
+        for (String n : nodes) {
+            known.add(n);
+            ht.put(n, new HashMap<>());
+        }
+        for (NodeConfig.ProvHop ph : hops) {
+            Hop h = getHop(known, ht, ph);
+            if (h == null) {
+                continue;
+            }
+            if (ph.getVia().equals(ph.getTo())) {
+                errors.add(ph + " gives destination as via");
+                h.bad = true;
+            }
+        }
+        for (String n : known) {
+            if (n.equals(origin)) {
+                routes.put(n, "");
+            }
+            routes.put(n, plot(origin, n, ht.get(n)) + "/");
+        }
     }
 
-    private Vector<String> errors = new Vector<String>();
-    private Hashtable<String, String> routes = new Hashtable<String, String>();
-
     /**
      * Get list of errors encountered while finding paths
      *
@@ -68,13 +95,12 @@ public class PathFinder {
         return (ret);
     }
 
-    private String plot(String from, String to, Hashtable<String, Hop> info) {
+    private String plot(String from, String to, HashMap<String, Hop> info) {
         Hop nh = info.get(from);
         if (nh == null || nh.bad) {
             return (to);
         }
         if (nh.mark) {
-            // loop detected;
             while (!nh.bad) {
                 nh.bad = true;
                 errors.add(nh.basis + " is part of a cycle");
@@ -91,55 +117,38 @@ public class PathFinder {
         return (nh.basis.getVia() + "/" + x);
     }
 
-    /**
-     * Find routes from a specified origin to all of the nodes given a set of specified next hops.
-     *
-     * @param origin where we start
-     * @param nodes where we can go
-     * @param hops detours along the way
-     */
-    public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {
-        HashSet<String> known = new HashSet<String>();
-        Hashtable<String, Hashtable<String, Hop>> ht = new Hashtable<String, Hashtable<String, Hop>>();
-        for (String n : nodes) {
-            known.add(n);
-            ht.put(n, new Hashtable<String, Hop>());
+    @Nullable
+    private Hop getHop(HashSet<String> known, HashMap<String, HashMap<String, Hop>> ht, ProvHop ph) {
+        if (!known.contains(ph.getFrom())) {
+            errors.add(ph + " references unknown from node");
+            return null;
         }
-        for (NodeConfig.ProvHop ph : hops) {
-            if (!known.contains(ph.getFrom())) {
-                errors.add(ph + " references unknown from node");
-                continue;
-            }
-            if (!known.contains(ph.getTo())) {
-                errors.add(ph + " references unknown destination node");
-                continue;
-            }
-            Hashtable<String, Hop> ht2 = ht.get(ph.getTo());
-            Hop h = ht2.get(ph.getFrom());
-            if (h != null) {
-                h.bad = true;
-                errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());
-                continue;
-            }
-            h = new Hop();
-            h.basis = ph;
-            ht2.put(ph.getFrom(), h);
-            if (!known.contains(ph.getVia())) {
-                errors.add(ph + " references unknown via node");
-                h.bad = true;
-                continue;
-            }
-            if (ph.getVia().equals(ph.getTo())) {
-                errors.add(ph + " gives destination as via");
-                h.bad = true;
-                continue;
-            }
+        if (!known.contains(ph.getTo())) {
+            errors.add(ph + " references unknown destination node");
+            return null;
         }
-        for (String n : known) {
-            if (n.equals(origin)) {
-                routes.put(n, "");
-            }
-            routes.put(n, plot(origin, n, ht.get(n)) + "/");
+        HashMap<String, Hop> ht2 = ht.get(ph.getTo());
+        Hop h = ht2.get(ph.getFrom());
+        if (h != null) {
+            h.bad = true;
+            errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());
+            return null;
         }
+        h = new Hop();
+        h.basis = ph;
+        ht2.put(ph.getFrom(), h);
+        if (!known.contains(ph.getVia())) {
+            errors.add(ph + " references unknown via node");
+            h.bad = true;
+            return null;
+        }
+        return h;
+    }
+
+    private static class Hop {
+
+        boolean mark;
+        boolean bad;
+        NodeConfig.ProvHop basis;
     }
 }
index a403441..16f8033 100644 (file)
@@ -1,65 +1,82 @@
-/**\r
- * -\r
+/*-\r
  * ============LICENSE_START=======================================================\r
- * Copyright (C) 2019 Nordix Foundation.\r
+ *  Copyright (C) 2019 Nordix Foundation.\r
  * ================================================================================\r
  * Licensed under the Apache License, Version 2.0 (the "License");\r
  * you may not use this file except in compliance with the License.\r
  * You may obtain a copy of the License at\r
- * <p>\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- * <p>\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
  * Unless required by applicable law or agreed to in writing, software\r
  * distributed under the License is distributed on an "AS IS" BASIS,\r
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
  * See the License for the specific language governing permissions and\r
  * limitations under the License.\r
- * <p>\r
+ *\r
  * SPDX-License-Identifier: Apache-2.0\r
  * ============LICENSE_END=========================================================\r
  */\r
+\r
 package org.onap.dmaap.datarouter.node;\r
 \r
 /**\r
  * FORTIFY SCAN FIXES\r
  * <p>This Utility is used for Fortify fixes. It Validates the path url formed from\r
- *  the string passed in the request parameters.</p>\r
- *\r
+ * the string passed in the request parameters.</p>\r
  */\r
 class PathUtil {\r
 \r
+    private PathUtil() {\r
+        throw new IllegalStateException("Utility Class");\r
+    }\r
+\r
     /**\r
      * This method takes String as the parameter and return the filtered path string.\r
+     *\r
      * @param aString String to clean\r
      * @return A cleaned String\r
      */\r
     static String cleanString(String aString) {\r
-        if (aString == null) return null;\r
-        String cleanString = "";\r
+        if (aString == null) {\r
+            return null;\r
+        }\r
+        StringBuilder cleanString = new StringBuilder();\r
         for (int i = 0; i < aString.length(); ++i) {\r
-            cleanString += cleanChar(aString.charAt(i));\r
+            cleanString.append(cleanChar(aString.charAt(i)));\r
         }\r
-        return cleanString;\r
+        return cleanString.toString();\r
     }\r
 \r
     /**\r
      * This method filters the valid special characters in path string.\r
+     *\r
      * @param aChar The char to be cleaned\r
      * @return The cleaned char\r
      */\r
     private static char cleanChar(char aChar) {\r
         // 0 - 9\r
         for (int i = 48; i < 58; ++i) {\r
-            if (aChar == i) return (char) i;\r
+            if (aChar == i) {\r
+                return (char) i;\r
+            }\r
         }\r
         // 'A' - 'Z'\r
         for (int i = 65; i < 91; ++i) {\r
-            if (aChar == i) return (char) i;\r
+            if (aChar == i) {\r
+                return (char) i;\r
+            }\r
         }\r
         // 'a' - 'z'\r
         for (int i = 97; i < 123; ++i) {\r
-            if (aChar == i) return (char) i;\r
+            if (aChar == i) {\r
+                return (char) i;\r
+            }\r
         }\r
+        return getValidCharacter(aChar);\r
+    }\r
+\r
+    private static char getValidCharacter(char aChar) {\r
         // other valid characters\r
         switch (aChar) {\r
             case '/':\r
@@ -82,7 +99,8 @@ class PathUtil {
                 return '_';\r
             case ' ':\r
                 return ' ';\r
+            default:\r
+                return '%';\r
         }\r
-        return '%';\r
     }\r
 }\r
index 1af7dda..bb9ddc3 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
-import java.io.*;
-import java.util.*;
-
-import org.json.*;
-import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
-
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import org.jetbrains.annotations.Nullable;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeed;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedSubnet;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvFeedUser;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceEgress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvForceIngress;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvHop;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvNode;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvParam;
+import org.onap.dmaap.datarouter.node.NodeConfig.ProvSubscription;
+import org.onap.dmaap.datarouter.node.eelf.EelfMsgs;
 
 /**
  * Parser for provisioning data from the provisioning server.
  * <p>
- * The ProvData class uses a Reader for the text configuration from the
- * provisioning server to construct arrays of raw configuration entries.
+ * The ProvData class uses a Reader for the text configuration from the provisioning server to construct arrays of raw
+ * configuration entries.
  */
 public class ProvData {
+
+    private static final String FEED_ID = "feedid";
     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(ProvData.class);
-    private NodeConfig.ProvNode[] pn;
-    private NodeConfig.ProvParam[] pp;
-    private NodeConfig.ProvFeed[] pf;
-    private NodeConfig.ProvFeedUser[] pfu;
-    private NodeConfig.ProvFeedSubnet[] pfsn;
-    private NodeConfig.ProvSubscription[] ps;
-    private NodeConfig.ProvForceIngress[] pfi;
-    private NodeConfig.ProvForceEgress[] pfe;
-    private NodeConfig.ProvHop[] ph;
-
-    private static String[] gvasa(JSONArray a, int index) {
-        return (gvasa(a.get(index)));
+    private NodeConfig.ProvNode[] provNodes;
+    private NodeConfig.ProvParam[] provParams;
+    private NodeConfig.ProvFeed[] provFeeds;
+    private NodeConfig.ProvFeedUser[] provFeedUsers;
+    private NodeConfig.ProvFeedSubnet[] provFeedSubnets;
+    private NodeConfig.ProvSubscription[] provSubscriptions;
+    private NodeConfig.ProvForceIngress[] provForceIngresses;
+    private NodeConfig.ProvForceEgress[] provForceEgresses;
+    private NodeConfig.ProvHop[] provHops;
+
+    /**
+     * Construct raw provisioing data entries from the text (JSON) provisioning document received from the provisioning
+     * server
+     *
+     * @param r The reader for the JSON text.
+     */
+    public ProvData(Reader r) throws IOException {
+        ArrayList<ProvNode> provNodes1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvParam> provParams1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvFeed> provFeeds1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvFeedUser> provFeedUsers1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvFeedSubnet> provFeedSubnets1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvSubscription> provSubscriptions1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvForceIngress> provForceIngresses1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvForceEgress> provForceEgresses1 = new ArrayList<>();
+        ArrayList<NodeConfig.ProvHop> provHops1 = new ArrayList<>();
+        try {
+            JSONTokener jtx = new JSONTokener(r);
+            JSONObject jcfg = new JSONObject(jtx);
+            char c = jtx.nextClean();
+            if (c != '\0') {
+                throw new JSONException("Spurious characters following configuration");
+            }
+            r.close();
+            addJSONFeeds(provFeeds1, provFeedUsers1, provFeedSubnets1, jcfg);
+            addJSONSubs(provSubscriptions1, jcfg);
+            addJSONParams(provNodes1, provParams1, jcfg);
+            addJSONRoutingInformation(provForceIngresses1, provForceEgresses1, provHops1, jcfg);
+        } catch (JSONException jse) {
+            NodeUtils.setIpAndFqdnForEelf("ProvData");
+            eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());
+            eelfLogger
+                    .error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
+            throw new IOException(jse.toString(), jse);
+        }
+        provNodes = provNodes1.toArray(new NodeConfig.ProvNode[provNodes1.size()]);
+        provParams = provParams1.toArray(new NodeConfig.ProvParam[provParams1.size()]);
+        provFeeds = provFeeds1.toArray(new NodeConfig.ProvFeed[provFeeds1.size()]);
+        provFeedUsers = provFeedUsers1.toArray(new NodeConfig.ProvFeedUser[provFeedUsers1.size()]);
+        provFeedSubnets = provFeedSubnets1.toArray(new NodeConfig.ProvFeedSubnet[provFeedSubnets1.size()]);
+        provSubscriptions = provSubscriptions1.toArray(new NodeConfig.ProvSubscription[provSubscriptions1.size()]);
+        provForceIngresses = provForceIngresses1.toArray(new NodeConfig.ProvForceIngress[provForceIngresses1.size()]);
+        provForceEgresses = provForceEgresses1.toArray(new NodeConfig.ProvForceEgress[provForceEgresses1.size()]);
+        provHops = provHops1.toArray(new NodeConfig.ProvHop[provHops1.size()]);
     }
 
     private static String[] gvasa(JSONObject o, String key) {
@@ -62,7 +118,7 @@ public class ProvData {
     private static String[] gvasa(Object o) {
         if (o instanceof JSONArray) {
             JSONArray a = (JSONArray) o;
-            Vector<String> v = new Vector<String>();
+            ArrayList<String> v = new ArrayList<>();
             for (int i = 0; i < a.length(); i++) {
                 String s = gvas(a, i);
                 if (s != null) {
@@ -95,237 +151,257 @@ public class ProvData {
         return (null);
     }
 
-    /**
-     * Construct raw provisioing data entries from the text (JSON)
-     * provisioning document received from the provisioning server
-     *
-     * @param r The reader for the JSON text.
-     */
-    public ProvData(Reader r) throws IOException {
-        Vector<NodeConfig.ProvNode> pnv = new Vector<NodeConfig.ProvNode>();
-        Vector<NodeConfig.ProvParam> ppv = new Vector<NodeConfig.ProvParam>();
-        Vector<NodeConfig.ProvFeed> pfv = new Vector<NodeConfig.ProvFeed>();
-        Vector<NodeConfig.ProvFeedUser> pfuv = new Vector<NodeConfig.ProvFeedUser>();
-        Vector<NodeConfig.ProvFeedSubnet> pfsnv = new Vector<NodeConfig.ProvFeedSubnet>();
-        Vector<NodeConfig.ProvSubscription> psv = new Vector<NodeConfig.ProvSubscription>();
-        Vector<NodeConfig.ProvForceIngress> pfiv = new Vector<NodeConfig.ProvForceIngress>();
-        Vector<NodeConfig.ProvForceEgress> pfev = new Vector<NodeConfig.ProvForceEgress>();
-        Vector<NodeConfig.ProvHop> phv = new Vector<NodeConfig.ProvHop>();
-        try {
-            JSONTokener jtx = new JSONTokener(r);
-            JSONObject jcfg = new JSONObject(jtx);
-            char c = jtx.nextClean();
-            if (c != '\0') {
-                throw new JSONException("Spurious characters following configuration");
-            }
-            r.close();
-            JSONArray jfeeds = jcfg.optJSONArray("feeds");
-            if (jfeeds != null) {
-                for (int fx = 0; fx < jfeeds.length(); fx++) {
-                    JSONObject jfeed = jfeeds.getJSONObject(fx);
-                    String stat = null;
-                    if (jfeed.optBoolean("suspend", false)) {
-                        stat = "Feed is suspended";
-                    }
-                    if (jfeed.optBoolean("deleted", false)) {
-                        stat = "Feed is deleted";
-                    }
-                    String fid = gvas(jfeed, "feedid");
-                    String fname = gvas(jfeed, "name");
-                    String fver = gvas(jfeed, "version");
-                    String createdDate = gvas(jfeed, "created_date");
-                    /*
-                     * START - AAF changes
-                     * TDP EPIC US# 307413
-                     * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
-                     */
-                    String aafInstance = gvas(jfeed, "aaf_instance");
-                    pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat,createdDate, aafInstance));
-                    /*
-                     * END - AAF changes
-                     */
-                    JSONObject jauth = jfeed.optJSONObject("authorization");
-                    if (jauth == null) {
-                        continue;
-                    }
-                    JSONArray jeids = jauth.optJSONArray("endpoint_ids");
-                    if (jeids != null) {
-                        for (int ux = 0; ux < jeids.length(); ux++) {
-                            JSONObject ju = jeids.getJSONObject(ux);
-                            String login = gvas(ju, "id");
-                            String password = gvas(ju, "password");
-                            pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
-                        }
-                    }
-                    JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
-                    if (jeips != null) {
-                        for (int ix = 0; ix < jeips.length(); ix++) {
-                            String sn = gvas(jeips, ix);
-                            pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn));
-                        }
-                    }
-                }
-            }
-            JSONArray jsubs = jcfg.optJSONArray("subscriptions");
-            if (jsubs != null) {
-                for (int sx = 0; sx < jsubs.length(); sx++) {
-                    JSONObject jsub = jsubs.getJSONObject(sx);
-                    if (jsub.optBoolean("suspend", false)) {
-                        continue;
-                    }
-                    String sid = gvas(jsub, "subid");
-                    String fid = gvas(jsub, "feedid");
-                    JSONObject jdel = jsub.getJSONObject("delivery");
-                    String delurl = gvas(jdel, "url");
-                    String id = gvas(jdel, "user");
-                    String password = gvas(jdel, "password");
-                    boolean monly = jsub.getBoolean("metadataOnly");
-                    boolean use100 = jdel.getBoolean("use100");
-                    boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
-                    boolean decompress = jsub.getBoolean("decompress");
-                    boolean followRedirect = jsub.getBoolean("follow_redirect");
-                    psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100, privilegedSubscriber, followRedirect, decompress));
-                }
-            }
-            JSONObject jparams = jcfg.optJSONObject("parameters");
-            if (jparams != null) {
-                for (String pname : JSONObject.getNames(jparams)) {
-                    String pvalue = gvas(jparams, pname);
-                    if (pvalue != null) {
-                        ppv.add(new NodeConfig.ProvParam(pname, pvalue));
-                    }
-                }
-                String sfx = gvas(jparams, "PROV_DOMAIN");
-                JSONArray jnodes = jparams.optJSONArray("NODES");
-                if (jnodes != null) {
-                    for (int nx = 0; nx < jnodes.length(); nx++) {
-                        String nn = gvas(jnodes, nx);
-                        if (nn.indexOf('.') == -1) {
-                            nn = nn + "." + sfx;
-                        }
-                        pnv.add(new NodeConfig.ProvNode(nn));
-                    }
-                }
-            }
-            JSONArray jingresses = jcfg.optJSONArray("ingress");
-            if (jingresses != null) {
-                for (int fx = 0; fx < jingresses.length(); fx++) {
-                    JSONObject jingress = jingresses.getJSONObject(fx);
-                    String fid = gvas(jingress, "feedid");
-                    String subnet = gvas(jingress, "subnet");
-                    String user = gvas(jingress, "user");
-                    String[] nodes = gvasa(jingress, "node");
-                    if (fid == null || "".equals(fid)) {
-                        continue;
-                    }
-                    if ("".equals(subnet)) {
-                        subnet = null;
-                    }
-                    if ("".equals(user)) {
-                        user = null;
-                    }
-                    pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes));
-                }
-            }
-            JSONObject jegresses = jcfg.optJSONObject("egress");
-            if (jegresses != null && JSONObject.getNames(jegresses) != null) {
-                for (String esid : JSONObject.getNames(jegresses)) {
-                    String enode = gvas(jegresses, esid);
-                    if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
-                        pfev.add(new NodeConfig.ProvForceEgress(esid, enode));
-                    }
-                }
-            }
-            JSONArray jhops = jcfg.optJSONArray("routing");
-            if (jhops != null) {
-                for (int fx = 0; fx < jhops.length(); fx++) {
-                    JSONObject jhop = jhops.getJSONObject(fx);
-                    String from = gvas(jhop, "from");
-                    String to = gvas(jhop, "to");
-                    String via = gvas(jhop, "via");
-                    if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
-                        continue;
-                    }
-                    phv.add(new NodeConfig.ProvHop(from, to, via));
-                }
-            }
-        } catch (JSONException jse) {
-            NodeUtils.setIpAndFqdnForEelf("ProvData");
-            eelfLogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());
-            eelfLogger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);
-            throw new IOException(jse.toString(), jse);
-        }
-        pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]);
-        pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]);
-        pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]);
-        pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]);
-        pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]);
-        ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]);
-        pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]);
-        pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]);
-        ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]);
-    }
-
     /**
      * Get the raw node configuration entries
      */
     public NodeConfig.ProvNode[] getNodes() {
-        return (pn);
+        return (provNodes);
     }
 
     /**
      * Get the raw parameter configuration entries
      */
     public NodeConfig.ProvParam[] getParams() {
-        return (pp);
+        return (provParams);
     }
 
     /**
      * Ge the raw feed configuration entries
      */
     public NodeConfig.ProvFeed[] getFeeds() {
-        return (pf);
+        return (provFeeds);
     }
 
     /**
      * Get the raw feed user configuration entries
      */
     public NodeConfig.ProvFeedUser[] getFeedUsers() {
-        return (pfu);
+        return (provFeedUsers);
     }
 
     /**
      * Get the raw feed subnet configuration entries
      */
     public NodeConfig.ProvFeedSubnet[] getFeedSubnets() {
-        return (pfsn);
+        return (provFeedSubnets);
     }
 
     /**
      * Get the raw subscription entries
      */
     public NodeConfig.ProvSubscription[] getSubscriptions() {
-        return (ps);
+        return (provSubscriptions);
     }
 
     /**
      * Get the raw forced ingress entries
      */
     public NodeConfig.ProvForceIngress[] getForceIngress() {
-        return (pfi);
+        return (provForceIngresses);
     }
 
     /**
      * Get the raw forced egress entries
      */
     public NodeConfig.ProvForceEgress[] getForceEgress() {
-        return (pfe);
+        return (provForceEgresses);
     }
 
     /**
      * Get the raw next hop entries
      */
     public NodeConfig.ProvHop[] getHops() {
-        return (ph);
+        return (provHops);
+    }
+
+    @Nullable
+    private String getFeedStatus(JSONObject jfeed) {
+        String stat = null;
+        if (jfeed.optBoolean("suspend", false)) {
+            stat = "Feed is suspended";
+        }
+        if (jfeed.optBoolean("deleted", false)) {
+            stat = "Feed is deleted";
+        }
+        return stat;
+    }
+
+    private void addJSONFeeds(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1,
+            ArrayList<ProvFeedSubnet> provFeedSubnets1,
+            JSONObject jsonConfig) {
+        JSONArray jfeeds = jsonConfig.optJSONArray("feeds");
+        if (jfeeds != null) {
+            for (int fx = 0; fx < jfeeds.length(); fx++) {
+                addJSONFeed(provFeeds1, provFeedUsers1, provFeedSubnets1, jfeeds, fx);
+            }
+        }
+    }
+
+    private void addJSONFeed(ArrayList<ProvFeed> provFeeds1, ArrayList<ProvFeedUser> provFeedUsers1,
+            ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONArray jfeeds, int feedIndex) {
+        JSONObject jfeed = jfeeds.getJSONObject(feedIndex);
+        String stat = getFeedStatus(jfeed);
+        String fid = gvas(jfeed, FEED_ID);
+        String fname = gvas(jfeed, "name");
+        String fver = gvas(jfeed, "version");
+        String createdDate = gvas(jfeed, "created_date");
+        /*
+         * START - AAF changes
+         * TDP EPIC US# 307413
+         * Passing aafInstance to ProvFeed from feeds json passed by prov to identify legacy/AAF feeds
+         */
+        String aafInstance = gvas(jfeed, "aaf_instance");
+        provFeeds1.add(new ProvFeed(fid, fname + "//" + fver, stat, createdDate, aafInstance));
+        /*
+         * END - AAF changes
+         */
+        addJSONFeedAuthArrays(provFeedUsers1, provFeedSubnets1, jfeed, fid);
+    }
+
+    private void addJSONFeedAuthArrays(ArrayList<ProvFeedUser> provFeedUsers1,
+            ArrayList<ProvFeedSubnet> provFeedSubnets1, JSONObject jfeed, String fid) {
+        JSONObject jauth = jfeed.optJSONObject("authorization");
+        if (jauth == null) {
+            return;
+        }
+        JSONArray jeids = jauth.optJSONArray("endpoint_ids");
+        if (jeids != null) {
+            for (int ux = 0; ux < jeids.length(); ux++) {
+                JSONObject ju = jeids.getJSONObject(ux);
+                String login = gvas(ju, "id");
+                String password = gvas(ju, "password");
+                provFeedUsers1.add(new ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));
+            }
+        }
+        JSONArray jeips = jauth.optJSONArray("endpoint_addrs");
+        if (jeips != null) {
+            for (int ix = 0; ix < jeips.length(); ix++) {
+                String sn = gvas(jeips, ix);
+                provFeedSubnets1.add(new ProvFeedSubnet(fid, sn));
+            }
+        }
+    }
+
+    private void addJSONSubs(ArrayList<ProvSubscription> provSubscriptions1, JSONObject jsonConfig) {
+        JSONArray jsubs = jsonConfig.optJSONArray("subscriptions");
+        if (jsubs != null) {
+            for (int sx = 0; sx < jsubs.length(); sx++) {
+                addJSONSub(provSubscriptions1, jsubs, sx);
+            }
+        }
+    }
+
+    private void addJSONSub(ArrayList<ProvSubscription> provSubscriptions1, JSONArray jsubs, int sx) {
+        JSONObject jsub = jsubs.getJSONObject(sx);
+        if (jsub.optBoolean("suspend", false)) {
+            return;
+        }
+        String sid = gvas(jsub, "subid");
+        String fid = gvas(jsub, FEED_ID);
+        JSONObject jdel = jsub.getJSONObject("delivery");
+        String delurl = gvas(jdel, "url");
+        String id = gvas(jdel, "user");
+        String password = gvas(jdel, "password");
+        boolean monly = jsub.getBoolean("metadataOnly");
+        boolean use100 = jdel.getBoolean("use100");
+        boolean privilegedSubscriber = jsub.getBoolean("privilegedSubscriber");
+        boolean decompress = jsub.getBoolean("decompress");
+        boolean followRedirect = jsub.getBoolean("follow_redirect");
+        provSubscriptions1
+                .add(new ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100,
+                        privilegedSubscriber, followRedirect, decompress));
+    }
+
+    private void addJSONParams(ArrayList<ProvNode> provNodes1, ArrayList<ProvParam> provParams1,
+            JSONObject jsonconfig) {
+        JSONObject jparams = jsonconfig.optJSONObject("parameters");
+        if (jparams != null) {
+            for (String pname : JSONObject.getNames(jparams)) {
+                addJSONParam(provParams1, jparams, pname);
+            }
+            addJSONNodesToParams(provNodes1, jparams);
+        }
+    }
+
+    private void addJSONParam(ArrayList<ProvParam> provParams1, JSONObject jparams, String pname) {
+        String pvalue = gvas(jparams, pname);
+        if (pvalue != null) {
+            provParams1.add(new ProvParam(pname, pvalue));
+        }
+    }
+
+    private void addJSONNodesToParams(ArrayList<ProvNode> provNodes1, JSONObject jparams) {
+        String sfx = gvas(jparams, "PROV_DOMAIN");
+        JSONArray jnodes = jparams.optJSONArray("NODES");
+        if (jnodes != null) {
+            for (int nx = 0; nx < jnodes.length(); nx++) {
+                String nn = gvas(jnodes, nx);
+                if (nn == null) {
+                    continue;
+                }
+                if (nn.indexOf('.') == -1) {
+                    nn = nn + "." + sfx;
+                }
+                provNodes1.add(new ProvNode(nn));
+            }
+        }
+    }
+
+    private void addJSONRoutingInformation(ArrayList<ProvForceIngress> provForceIngresses1,
+            ArrayList<ProvForceEgress> provForceEgresses1, ArrayList<ProvHop> provHops1, JSONObject jsonConfig) {
+        JSONArray jingresses = jsonConfig.optJSONArray("ingress");
+        if (jingresses != null) {
+            for (int fx = 0; fx < jingresses.length(); fx++) {
+                addJSONIngressRoute(provForceIngresses1, jingresses, fx);
+            }
+        }
+        JSONObject jegresses = jsonConfig.optJSONObject("egress");
+        if (jegresses != null && JSONObject.getNames(jegresses) != null) {
+            for (String esid : JSONObject.getNames(jegresses)) {
+                addJSONEgressRoute(provForceEgresses1, jegresses, esid);
+            }
+        }
+        JSONArray jhops = jsonConfig.optJSONArray("routing");
+        if (jhops != null) {
+            for (int fx = 0; fx < jhops.length(); fx++) {
+                addJSONRoutes(provHops1, jhops, fx);
+            }
+        }
+    }
+
+    private void addJSONIngressRoute(ArrayList<ProvForceIngress> provForceIngresses1, JSONArray jingresses, int fx) {
+        JSONObject jingress = jingresses.getJSONObject(fx);
+        String fid = gvas(jingress, FEED_ID);
+        String subnet = gvas(jingress, "subnet");
+        String user = gvas(jingress, "user");
+        String[] nodes = gvasa(jingress, "node");
+        if (fid == null || "".equals(fid)) {
+            return;
+        }
+        if ("".equals(subnet)) {
+            subnet = null;
+        }
+        if ("".equals(user)) {
+            user = null;
+        }
+        provForceIngresses1.add(new ProvForceIngress(fid, subnet, user, nodes));
+    }
+
+    private void addJSONEgressRoute(ArrayList<ProvForceEgress> provForceEgresses1, JSONObject jegresses, String esid) {
+        String enode = gvas(jegresses, esid);
+        if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) {
+            provForceEgresses1.add(new ProvForceEgress(esid, enode));
+        }
+    }
+
+    private void addJSONRoutes(ArrayList<ProvHop> provHops1, JSONArray jhops, int fx) {
+        JSONObject jhop = jhops.getJSONObject(fx);
+        String from = gvas(jhop, "from");
+        String to = gvas(jhop, "to");
+        String via = gvas(jhop, "via");
+        if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {
+            return;
+        }
+        provHops1.add(new ProvHop(from, to, via));
     }
 }
index 3d4908e..5b7248a 100644 (file)
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
  * Generate publish IDs
  */
 public class PublishId {
+
     private long nextuid;
     private String myname;
 
@@ -41,7 +42,8 @@ public class PublishId {
     }
 
     /**
-     * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes.
+     * Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log
+     * correlation purposes.
      */
     public synchronized String next() {
         long now = System.currentTimeMillis();
index 42af8ca..94b694d 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
-import java.util.*;
+import java.util.Timer;
+import java.util.TimerTask;
 
 /**
  * Execute an operation no more frequently than a specified interval
  */
 
 public abstract class RateLimitedOperation implements Runnable {
+
     private boolean marked;    // a timer task exists
     private boolean executing;    // the operation is currently in progress
     private boolean remark;    // a request was made while the operation was in progress
@@ -41,29 +43,15 @@ public abstract class RateLimitedOperation implements Runnable {
     /**
      * Create a rate limited operation
      *
-     * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin
-     * @param timer       The timer used to perform deferred executions
+     * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can
+     * begin
+     * @param timer The timer used to perform deferred executions
      */
     public RateLimitedOperation(long mininterval, Timer timer) {
         this.timer = timer;
         this.mininterval = mininterval;
     }
 
-    private class deferred extends TimerTask {
-        public void run() {
-            execute();
-        }
-    }
-
-    private synchronized void unmark() {
-        marked = false;
-    }
-
-    private void execute() {
-        unmark();
-        request();
-    }
-
     /**
      * Request that the operation be performed by this thread or at a later time by the timer
      */
@@ -90,7 +78,7 @@ public abstract class RateLimitedOperation implements Runnable {
         if (last + mininterval > now) {
             // too soon - schedule a timer
             marked = true;
-            timer.schedule(new deferred(), last + mininterval - now);
+            timer.schedule(new Deferred(), last + mininterval - now);
             return (true);
         }
         last = now;
@@ -107,4 +95,20 @@ public abstract class RateLimitedOperation implements Runnable {
         }
         return (false);
     }
+
+    private class Deferred extends TimerTask {
+
+        public void run() {
+            execute();
+        }
+
+        private void execute() {
+            unmark();
+            request();
+        }
+
+        private synchronized void unmark() {
+            marked = false;
+        }
+    }
 }
index 4cd650b..83e3c30 100644 (file)
@@ -29,27 +29,27 @@ import com.att.eelf.configuration.EELFManager;
 import java.io.BufferedReader;
 import java.io.FileOutputStream;
 import java.io.FileReader;
-import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Timer;
 
 /**
  * Track redirections of subscriptions
  */
 public class RedirManager {
+
     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(RedirManager.class);
-    private Hashtable<String, String> sid2primary = new Hashtable<String, String>();
-    private Hashtable<String, String> sid2secondary = new Hashtable<String, String>();
-    private String redirfile;
     RateLimitedOperation op;
+    private HashMap<String, String> sid2primary = new HashMap<>();
+    private HashMap<String, String> sid2secondary = new HashMap<>();
+    private String redirfile;
 
     /**
      * Create a mechanism for maintaining subscription redirections.
      *
      * @param redirfile The file to store the redirection information.
-     * @param mininterval The minimum number of milliseconds between writes to the redirection
-     * information file.
+     * @param mininterval The minimum number of milliseconds between writes to the redirection information file.
      * @param timer The timer thread used to run delayed file writes.
      */
     public RedirManager(String redirfile, long mininterval, Timer timer) {
@@ -57,10 +57,12 @@ public class RedirManager {
         op = new RateLimitedOperation(mininterval, timer) {
             public void run() {
                 try {
-                    StringBuffer sb = new StringBuffer();
-                    for (String s : sid2primary.keySet()) {
-                        sb.append(s).append(' ').append(sid2primary.get(s)).append(' ')
-                            .append(sid2secondary.get(s)).append('\n');
+                    StringBuilder sb = new StringBuilder();
+                    for (Map.Entry<String, String> entry : sid2primary.entrySet()) {
+                        String s = entry.getKey();
+                        String value = entry.getValue();
+                        sb.append(s).append(' ').append(value).append(' ')
+                                .append(sid2secondary.get(s)).append('\n');
                     }
                     try (OutputStream os = new FileOutputStream(RedirManager.this.redirfile)) {
                         os.write(sb.toString().getBytes());
@@ -74,23 +76,17 @@ public class RedirManager {
             String s;
             try (BufferedReader br = new BufferedReader(new FileReader(redirfile))) {
                 while ((s = br.readLine()) != null) {
-                    s = s.trim();
-                    String[] sx = s.split(" ");
-                    if (s.startsWith("#") || sx.length != 3) {
-                        continue;
-                    }
-                    sid2primary.put(sx[0], sx[1]);
-                    sid2secondary.put(sx[0], sx[2]);
+                    addSubRedirInfo(s);
                 }
             }
         } catch (Exception e) {
-            eelfLogger.error("Missing file is normal", e);
+            eelfLogger.debug("Missing file is normal", e);
         }
     }
 
     /**
-     * Set up redirection.  If a request is to be sent to subscription ID sid, and that is
-     * configured to go to URL primary, instead, go to secondary.
+     * Set up redirection.  If a request is to be sent to subscription ID sid, and that is configured to go to URL
+     * primary, instead, go to secondary.
      *
      * @param sid The subscription ID to be redirected
      * @param primary The URL associated with that subscription ID
@@ -103,8 +99,7 @@ public class RedirManager {
     }
 
     /**
-     * Cancel redirection.  If a request is to be sent to subscription ID sid, send it to its
-     * primary URL.
+     * Cancel redirection.  If a request is to be sent to subscription ID sid, send it to its primary URL.
      *
      * @param sid The subscription ID to remove from the table.
      */
@@ -115,8 +110,8 @@ public class RedirManager {
     }
 
     /**
-     * Look up where to send a subscription.  If the primary has changed or there is no redirection,
-     * use the primary.  Otherwise, redirect to the secondary URL.
+     * Look up where to send a subscription.  If the primary has changed or there is no redirection, use the primary.
+     * Otherwise, redirect to the secondary URL.
      *
      * @param sid The subscription ID to look up.
      * @param primary The configured primary URL.
@@ -138,4 +133,14 @@ public class RedirManager {
     public synchronized boolean isRedirected(String sid) {
         return (sid != null && sid2secondary.get(sid) != null);
     }
+
+    private void addSubRedirInfo(String s) {
+        s = s.trim();
+        String[] sx = s.split(" ");
+        if (s.startsWith("#") || sx.length != 3) {
+            return;
+        }
+        sid2primary.put(sx[0], sx[1]);
+        sid2secondary.put(sx[0], sx[2]);
+    }
 }
index 1be3408..e616558 100644 (file)
@@ -25,20 +25,27 @@ package org.onap.dmaap.datarouter.node;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import java.util.regex.*;
-import java.util.*;
-import java.io.*;
-import java.nio.file.*;
-import java.text.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Logging for data router delivery events (PUB/DEL/EXP)
  */
 public class StatusLog {
+
+    private static final String EXCEPTION = "Exception";
     private static EELFLogger eelfLogger = EELFManager.getInstance().getLogger(StatusLog.class);
     private static StatusLog instance = new StatusLog();
-    private HashSet<String> toship = new HashSet<String>();
-    private SimpleDateFormat filedate;
+    private SimpleDateFormat filedate = new SimpleDateFormat("-yyyyMMddHHmm");
+
     private String prefix = "logs/events";
     private String suffix = ".log";
     private String plainfile;
@@ -48,85 +55,75 @@ public class StatusLog {
     private long intvl;
     private NodeConfigManager config = NodeConfigManager.getInstance();
 
-    {
-        try {
-            filedate = new SimpleDateFormat("-yyyyMMddHHmm");
-        } catch (Exception e) {
-            eelfLogger.error("Exception", e);
-        }
+    private StatusLog() {
     }
 
     /**
-     * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are specified, assume seconds.
+     * Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.If no units are
+     * specified, assume seconds.
      */
     public static long parseInterval(String interval, int def) {
         try {
             Matcher m = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?").matcher(interval);
             if (m.matches()) {
-                int dur = 0;
-                String x = m.group(1);
-                if (x != null) {
-                    dur += 3600 * Integer.parseInt(x);
-                }
-                x = m.group(2);
-                if (x != null) {
-                    dur += 60 * Integer.parseInt(x);
-                }
-                x = m.group(3);
-                if (x != null) {
-                    dur += Integer.parseInt(x);
-                }
-                if (dur < 60) {
-                    dur = 60;
-                }
+                int dur = getDur(m);
                 int best = 86400;
                 int dist = best - dur;
                 if (dur > best) {
                     dist = dur - best;
                 }
-                int base = 1;
-                for (int i = 0; i < 8; i++) {
-                    int base2 = base;
-                    base *= 2;
-                    for (int j = 0; j < 4; j++) {
-                        int base3 = base2;
-                        base2 *= 3;
-                        for (int k = 0; k < 3; k++) {
-                            int cur = base3;
-                            base3 *= 5;
-                            int ndist = cur - dur;
-                            if (dur > cur) {
-                                ndist = dur - cur;
-                            }
-                            if (ndist < dist) {
-                                best = cur;
-                                dist = ndist;
-                            }
-                        }
-                    }
-                }
+                best = getBest(dur, best, dist);
                 def = best * 1000;
             }
         } catch (Exception e) {
-            eelfLogger.error("Exception", e);
+            eelfLogger.error(EXCEPTION, e);
         }
         return (def);
     }
 
-    private synchronized void checkRoll(long now) throws IOException {
-        if (now >= nexttime) {
-            if (os != null) {
-                os.close();
-                os = null;
+    private static int getBest(int dur, int best, int dist) {
+        int base = 1;
+        for (int i = 0; i < 8; i++) {
+            int base2 = base;
+            base *= 2;
+            for (int j = 0; j < 4; j++) {
+                int base3 = base2;
+                base2 *= 3;
+                for (int k = 0; k < 3; k++) {
+                    int cur = base3;
+                    base3 *= 5;
+                    int ndist = cur - dur;
+                    if (dur > cur) {
+                        ndist = dur - cur;
+                    }
+                    if (ndist < dist) {
+                        best = cur;
+                        dist = ndist;
+                    }
+                }
             }
-            intvl = parseInterval(config.getEventLogInterval(), 300000);
-            prefix = config.getEventLogPrefix();
-            suffix = config.getEventLogSuffix();
-            nexttime = now - now % intvl + intvl;
-            curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
-            plainfile = prefix + suffix;
-            notify();
         }
+        return best;
+    }
+
+    private static int getDur(Matcher m) {
+        int dur = 0;
+        String x = m.group(1);
+        if (x != null) {
+            dur += 3600 * Integer.parseInt(x);
+        }
+        x = m.group(2);
+        if (x != null) {
+            dur += 60 * Integer.parseInt(x);
+        }
+        x = m.group(3);
+        if (x != null) {
+            dur += Integer.parseInt(x);
+        }
+        if (dur < 60) {
+            dur = 60;
+        }
+        return dur;
     }
 
     /**
@@ -138,111 +135,107 @@ public class StatusLog {
         try {
             instance.checkRoll(System.currentTimeMillis());
         } catch (Exception e) {
-            eelfLogger.error("Exception", e);
+            eelfLogger.error(EXCEPTION, e);
         }
         return (instance.curfile);
     }
 
-    private synchronized void log(String s) {
-        try {
-            long now = System.currentTimeMillis();
-            checkRoll(now);
-            if (os == null) {
-                os = new FileOutputStream(curfile, true);
-                (new File(plainfile)).delete();
-                Files.createLink(Paths.get(plainfile), Paths.get(curfile));
-            }
-            os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
-            os.flush();
-        } catch (IOException ioe) {
-            eelfLogger.error("IOException", ioe);
-        }
-    }
-
     /**
      * Log a received publication attempt.
      *
-     * @param pubid  The publish ID assigned by the node
+     * @param pubid The publish ID assigned by the node
      * @param feedid The feed id given by the publisher
      * @param requrl The URL of the received request
      * @param method The method (DELETE or PUT) in the received request
-     * @param ctype  The content type (if method is PUT and clen > 0)
-     * @param clen   The content length (if method is PUT)
-     * @param srcip  The IP address of the publisher
-     * @param user   The identity of the publisher
+     * @param ctype The content type (if method is PUT and clen > 0)
+     * @param clen The content length (if method is PUT)
+     * @param srcip The IP address of the publisher
+     * @param user The identity of the publisher
      * @param status The status returned to the publisher
      */
-    public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) {
-        instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status);
+    public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+            String srcip, String user, int status) {
+        instance.log(
+                "PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip
+                        + "|" + user + "|" + status);
     }
 
     /**
      * Log a data transfer error receiving a publication attempt
      *
-     * @param pubid  The publish ID assigned by the node
+     * @param pubid The publish ID assigned by the node
      * @param feedid The feed id given by the publisher
      * @param requrl The URL of the received request
      * @param method The method (DELETE or PUT) in the received request
-     * @param ctype  The content type (if method is PUT and clen > 0)
-     * @param clen   The expected content length (if method is PUT)
-     * @param rcvd   The content length received
-     * @param srcip  The IP address of the publisher
-     * @param user   The identity of the publisher
-     * @param error  The error message from the IO exception
+     * @param ctype The content type (if method is PUT and clen > 0)
+     * @param clen The expected content length (if method is PUT)
+     * @param rcvd The content length received
+     * @param srcip The IP address of the publisher
+     * @param user The identity of the publisher
+     * @param error The error message from the IO exception
      */
-    public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) {
-        instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error);
+    public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen,
+            long rcvd, String srcip, String user, String error) {
+        instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd
+                + "|" + srcip + "|" + user + "|" + error);
     }
 
     /**
      * Log a delivery attempt.
      *
-     * @param pubid  The publish ID assigned by the node
+     * @param pubid The publish ID assigned by the node
      * @param feedid The feed ID
-     * @param subid  The (space delimited list of) subscription ID
+     * @param subid The (space delimited list of) subscription ID
      * @param requrl The URL used in the attempt
      * @param method The method (DELETE or PUT) in the attempt
-     * @param ctype  The content type (if method is PUT, not metaonly, and clen > 0)
-     * @param clen   The content length (if PUT and not metaonly)
-     * @param user   The identity given to the subscriber
+     * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+     * @param clen The content length (if PUT and not metaonly)
+     * @param user The identity given to the subscriber
      * @param status The status returned by the subscriber or -1 if an exeception occured trying to connect
      * @param xpubid The publish ID returned by the subscriber
      */
-    public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) {
+    public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+            long clen, String user, int status, String xpubid) {
         if (feedid == null) {
             return;
         }
-        instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid);
+        instance.log(
+                "DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+                        + "|" + user + "|" + status + "|" + xpubid);
     }
 
     /**
      * Log delivery attempts expired
      *
-     * @param pubid    The publish ID assigned by the node
-     * @param feedid   The feed ID
-     * @param subid    The (space delimited list of) subscription ID
-     * @param requrl   The URL that would be delivered to
-     * @param method   The method (DELETE or PUT) in the request
-     * @param ctype    The content type (if method is PUT, not metaonly, and clen > 0)
-     * @param clen     The content length (if PUT and not metaonly)
-     * @param reason   The reason the attempts were discontinued
+     * @param pubid The publish ID assigned by the node
+     * @param feedid The feed ID
+     * @param subid The (space delimited list of) subscription ID
+     * @param requrl The URL that would be delivered to
+     * @param method The method (DELETE or PUT) in the request
+     * @param ctype The content type (if method is PUT, not metaonly, and clen > 0)
+     * @param clen The content length (if PUT and not metaonly)
+     * @param reason The reason the attempts were discontinued
      * @param attempts The number of attempts made
      */
-    public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) {
+    public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype,
+            long clen, String reason, int attempts) {
         if (feedid == null) {
             return;
         }
-        instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts);
+        instance.log(
+                "EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen
+                        + "|" + reason + "|" + attempts);
     }
 
     /**
      * Log extra statistics about unsuccessful delivery attempts.
      *
-     * @param pubid  The publish ID assigned by the node
+     * @param pubid The publish ID assigned by the node
      * @param feedid The feed ID
-     * @param subid  The (space delimited list of) subscription ID
-     * @param clen   The content length
-     * @param sent   The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred.
+     * @param subid The (space delimited list of) subscription ID
+     * @param clen The content length
+     * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the
+     * number of bytes sent before an error occurred.
      */
     public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) {
         if (feedid == null) {
@@ -251,6 +244,35 @@ public class StatusLog {
         instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent);
     }
 
-    private StatusLog() {
+    private synchronized void checkRoll(long now) throws IOException {
+        if (now >= nexttime) {
+            if (os != null) {
+                os.close();
+                os = null;
+            }
+            intvl = parseInterval(config.getEventLogInterval(), 300000);
+            prefix = config.getEventLogPrefix();
+            suffix = config.getEventLogSuffix();
+            nexttime = now - now % intvl + intvl;
+            curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
+            plainfile = prefix + suffix;
+            notify();
+        }
+    }
+
+    private synchronized void log(String s) {
+        try {
+            long now = System.currentTimeMillis();
+            checkRoll(now);
+            if (os == null) {
+                os = new FileOutputStream(curfile, true);
+                (new File(plainfile)).delete();
+                Files.createLink(Paths.get(plainfile), Paths.get(curfile));
+            }
+            os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
+            os.flush();
+        } catch (IOException ioe) {
+            eelfLogger.error("IOException", ioe);
+        }
     }
 }
index 6f74df4..fd5a6bc 100644 (file)
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
  * Compare IP addresses as byte arrays to a subnet specified as a CIDR
  */
 public class SubnetMatcher {
+
     private byte[] sn;
     private int len;
     private int mask;
index eb10876..d86b2e9 100644 (file)
@@ -28,6 +28,7 @@ package org.onap.dmaap.datarouter.node;
  * A destination to deliver a message
  */
 public class Target {
+
     private DestInfo destinfo;
     private String routing;
 
@@ -35,7 +36,7 @@ public class Target {
      * A destination to deliver a message
      *
      * @param destinfo Either info for a subscription ID or info for a node-to-node transfer
-     * @param routing  For a node-to-node transfer, what to do when it gets there.
+     * @param routing For a node-to-node transfer, what to do when it gets there.
      */
     public Target(DestInfo destinfo, String routing) {
         this.destinfo = destinfo;
index 33e4f80..1eb73c6 100644 (file)
 
 package org.onap.dmaap.datarouter.node;
 
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
 
 /**
- * Manage a list of tasks to be executed when an event occurs.
- * This makes the following guarantees:
+ * Manage a list of tasks to be executed when an event occurs. This makes the following guarantees:
  * <ul>
  * <li>Tasks can be safely added and removed in the middle of a run.</li>
  * <li>No task will be returned more than once during a run.</li>
  * <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
  * <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
- * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is called.
+ * <li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is
+ * called.
  * </ul>
  */
 public class TaskList {
+
     private Iterator<Runnable> runlist;
-    private HashSet<Runnable> tasks = new HashSet<Runnable>();
+    private HashSet<Runnable> tasks = new HashSet<>();
     private HashSet<Runnable> togo;
     private HashSet<Runnable> sofar;
     private HashSet<Runnable> added;
     private HashSet<Runnable> removed;
 
-    /**
-     * Construct a new TaskList
-     */
-    public TaskList() {
-    }
-
     /**
      * Start executing the sequence of tasks.
      */
     public synchronized void startRun() {
-        sofar = new HashSet<Runnable>();
-        added = new HashSet<Runnable>();
-        removed = new HashSet<Runnable>();
-        togo = new HashSet<Runnable>(tasks);
+        sofar = new HashSet<>();
+        added = new HashSet<>();
+        removed = new HashSet<>();
+        togo = new HashSet<>(tasks);
         runlist = togo.iterator();
     }
 
@@ -69,18 +65,13 @@ public class TaskList {
         while (runlist != null) {
             if (runlist.hasNext()) {
                 Runnable task = runlist.next();
-                if (removed.contains(task)) {
-                    continue;
+                if (addTaskToSoFar(task)) {
+                    return task;
                 }
-                if (sofar.contains(task)) {
-                    continue;
-                }
-                sofar.add(task);
-                return (task);
             }
-            if (added.size() != 0) {
+            if (!added.isEmpty()) {
                 togo = added;
-                added = new HashSet<Runnable>();
+                added = new HashSet<>();
                 removed.clear();
                 runlist = togo.iterator();
                 continue;
@@ -115,4 +106,15 @@ public class TaskList {
         }
         tasks.remove(task);
     }
+
+    private boolean addTaskToSoFar(Runnable task) {
+        if (removed.contains(task)) {
+            return false;
+        }
+        if (sofar.contains(task)) {
+            return false;
+        }
+        sofar.add(task);
+        return true;
+    }
 }
index 0812007..c21bdec 100644 (file)
@@ -97,7 +97,7 @@ public class DeliveryTest {
 
   private DestInfo[] createDestInfoObjects() {
     DestInfo[] destInfos = new DestInfo[1];
-    DestInfo destInfo = new DestInfo.DestInfoBuilder().setName("node.datarouternew.com").setSpool("spool/s/0/1").setSubid("1")
+    DestInfo destInfo = new DestInfoBuilder().setName("node.datarouternew.com").setSpool("spool/s/0/1").setSubid("1")
         .setLogdata("logs/").setUrl("/subs/1").setAuthuser("user1").setAuthentication("Basic dXNlcjE6cGFzc3dvcmQx")
         .setMetaonly(false).setUse100(true).setPrivilegedSubscriber(false).setFollowRedirects(false)
         .setDecompress(false).createDestInfo();
index db71cea..a375f02 100644 (file)
@@ -219,7 +219,7 @@ public class NodeServletTest {
 
     @Test
     public void Given_Request_Is_HTTP_PUT_On_Publish_On_AAF_Feed_And_Cadi_Enabled_And_No_Permissions_Then_Forbidden_Response_Is_Generated() throws Exception {
-        when(config.getCadiEnabeld()).thenReturn(true);
+        when(config.getCadiEnabled()).thenReturn(true);
         when(config.getAafInstance("1")).thenReturn("*");
         when(request.getPathInfo()).thenReturn("/publish/1/fileName");
         setHeadersForValidRequest(true);