--- /dev/null
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+package org.onap.dmaap.datarouter.node;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Hashtable;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@SuppressStaticInitializationFor("org.onap.dmaap.datarouter.node.NodeConfigManager")
+public class DeliveryTest {
+
+ @Mock
+ private DeliveryQueue deliveryQueue;
+
+ private File nDir = new File("tmp/n");
+ private File sDir = new File("tmp/s");
+
+ @Before
+ public void setUp() throws IOException {
+ nDir.mkdirs();
+ sDir.mkdirs();
+ File newNDir = new File("tmp/n/0");
+ newNDir.mkdirs();
+ File newNFile = new File("tmp/n/0/testN.txt");
+ newNFile.createNewFile();
+ File newSDir = new File("tmp/s/0/1");
+ newSDir.mkdirs();
+ File newSpoolFile = new File("tmp/s/0/1/testSpool.txt");
+ newSpoolFile.createNewFile();
+ }
+
+ @Test
+ public void Validate_Reset_Queue_Calls_Reset_Queue_On_Delivery_Queue_Object() throws IllegalAccessException {
+ NodeConfigManager config = mockNodeConfigManager();
+ Delivery delivery = new Delivery(config);
+ Hashtable<String, DeliveryQueue> dqs = new Hashtable<>();
+ dqs.put("spool/s/0/1", deliveryQueue);
+ FieldUtils.writeDeclaredField(delivery, "dqs", dqs, true);
+ delivery.resetQueue("spool/s/0/1");
+ verify(deliveryQueue, times(1)).resetQueue();
+ }
+
+ @After
+ public void tearDown() {
+ nDir.delete();
+ sDir.delete();
+ File tmpDir = new File("tmp");
+ tmpDir.delete();
+ }
+
+ private NodeConfigManager mockNodeConfigManager() {
+ PowerMockito.mockStatic(NodeConfigManager.class);
+ NodeConfigManager config = mock(NodeConfigManager.class);
+ PowerMockito.when(config.isConfigured()).thenReturn(true);
+ PowerMockito.when(config.getAllDests()).thenReturn(createDestInfoObjects());
+ PowerMockito.when(config.getFreeDiskStart()).thenReturn(0.49);
+ PowerMockito.when(config.getFreeDiskStop()).thenReturn(0.5);
+ PowerMockito.when(config.getDeliveryThreads()).thenReturn(0);
+ PowerMockito.when(config.getSpoolBase()).thenReturn("tmp");
+ return config;
+ }
+
+ private DestInfo[] createDestInfoObjects() {
+ DestInfo[] destInfos = new DestInfo[1];
+ DestInfo destInfo = new DestInfo("node.datarouternew.com", "spool/s/0/1", "1", "logs/", "/subs/1", "user1", "Basic dXNlcjE6cGFzc3dvcmQx", false, true);
+ destInfos[0] = destInfo;
+ return destInfos;
+ }
+}
DB db = new DB();\r
@SuppressWarnings("resource")\r
Connection conn = db.getConnection();\r
- PreparedStatement ps = conn.prepareStatement(SELECT_SQL);\r
- ps.setLong(1, from);\r
- ps.setLong(2, to);\r
- ResultSet rs = ps.executeQuery();\r
- while (rs.next()) {\r
- String id = rs.getString("PUBLISH_ID");\r
- int feed = rs.getInt("FEEDID");\r
- long etime = rs.getLong("EVENT_TIME");\r
- String type = rs.getString("TYPE");\r
- String fid = rs.getString("FEED_FILEID");\r
- long clen = rs.getLong("CONTENT_LENGTH");\r
- String date = sdf.format(new Date(getPstart(id)));\r
- String key = date + "," + feed;\r
- Counters c = map.get(key);\r
- if (c == null) {\r
- c = new Counters(date, feed);\r
- map.put(key, c);\r
+ try(PreparedStatement ps = conn.prepareStatement(SELECT_SQL)) {\r
+ ps.setLong(1, from);\r
+ ps.setLong(2, to);\r
+ try(ResultSet rs = ps.executeQuery()) {\r
+ while (rs.next()) {\r
+ String id = rs.getString("PUBLISH_ID");\r
+ int feed = rs.getInt("FEEDID");\r
+ long etime = rs.getLong("EVENT_TIME");\r
+ String type = rs.getString("TYPE");\r
+ String fid = rs.getString("FEED_FILEID");\r
+ long clen = rs.getLong("CONTENT_LENGTH");\r
+ String date = sdf.format(new Date(getPstart(id)));\r
+ String key = date + "," + feed;\r
+ Counters c = map.get(key);\r
+ if (c == null) {\r
+ c = new Counters(date, feed);\r
+ map.put(key, c);\r
+ }\r
+ c.addEvent(etime, type, id, fid, clen);\r
+ }\r
}\r
- c.addEvent(etime, type, id, fid, clen);\r
+\r
+ db.release(conn);\r
}\r
- rs.close();\r
- ps.close();\r
- db.release(conn);\r
} catch (SQLException e) {\r
e.printStackTrace();\r
}\r
logger.debug("Query time: " + (System.currentTimeMillis()-start) + " ms");\r
- try {\r
- PrintWriter os = new PrintWriter(outfile);\r
+ try (PrintWriter os = new PrintWriter(outfile)){\r
os.println("date,feedid,minsize,maxsize,avgsize,minlat,maxlat,avglat,fanout");\r
for (String key : new TreeSet<String>(map.keySet())) {\r
Counters c = map.get(key);\r
os.println(c.toString());\r
}\r
- os.close();\r
} catch (FileNotFoundException e) {\r
System.err.println("File cannot be written: "+outfile);\r
}\r
DB db = new DB();\r
@SuppressWarnings("resource")\r
Connection conn = db.getConnection();\r
- PreparedStatement ps = conn.prepareStatement(SELECT_SQL);\r
+ try(PreparedStatement ps = conn.prepareStatement(SELECT_SQL)){\r
ps.setLong(1, from);\r
ps.setLong(2, to);\r
- ResultSet rs = ps.executeQuery();\r
- PrintWriter os = new PrintWriter(outfile);\r
- os.println("recordid,feedid,uri,size,min,max,avg,fanout");\r
- Counters c = null;\r
- while (rs.next()) {\r
- long etime = rs.getLong("EVENT_TIME");\r
- String type = rs.getString("TYPE");\r
- String id = rs.getString("PUBLISH_ID");\r
- String fid = rs.getString("FEED_FILEID");\r
- int feed = rs.getInt("FEEDID");\r
- long clen = rs.getLong("CONTENT_LENGTH");\r
- if (c != null && !id.equals(c.id)) {\r
- String line = id + "," + c.toString();\r
- os.println(line);\r
- c = null;\r
+ try(ResultSet rs = ps.executeQuery()) {\r
+ try(PrintWriter os = new PrintWriter(outfile)) {\r
+ os.println("recordid,feedid,uri,size,min,max,avg,fanout");\r
+ Counters c = null;\r
+ while (rs.next()) {\r
+ long etime = rs.getLong("EVENT_TIME");\r
+ String type = rs.getString("TYPE");\r
+ String id = rs.getString("PUBLISH_ID");\r
+ String fid = rs.getString("FEED_FILEID");\r
+ int feed = rs.getInt("FEEDID");\r
+ long clen = rs.getLong("CONTENT_LENGTH");\r
+ if (c != null && !id.equals(c.id)) {\r
+ String line = id + "," + c.toString();\r
+ os.println(line);\r
+ c = null;\r
+ }\r
+ if (c == null) {\r
+ c = new Counters(id, feed, clen, fid);\r
+ }\r
+ if (feed != c.feedid)\r
+ System.err.println("Feed ID mismatch, " + feed + " <=> " + c.feedid);\r
+ if (clen != c.clen)\r
+ System.err.println("Cont Len mismatch, " + clen + " <=> " + c.clen);\r
+ c.addEvent(type, etime);\r
+ }\r
}\r
- if (c == null) {\r
- c = new Counters(id, feed, clen, fid);\r
- }\r
- if (feed != c.feedid)\r
- System.err.println("Feed ID mismatch, " + feed + " <=> " + c.feedid);\r
- if (clen != c.clen)\r
- System.err.println("Cont Len mismatch, " + clen + " <=> " + c.clen);\r
-// if (fid != c.fileid)\r
-// System.err.println("File ID mismatch, "+fid+" <=> "+c.fileid);\r
- c.addEvent(type, etime);\r
+ db.release(conn);\r
+ }\r
}\r
- rs.close();\r
- ps.close();\r
- db.release(conn);\r
- os.close();\r
} catch (FileNotFoundException e) {\r
System.err.println("File cannot be written: " + outfile);\r
} catch (SQLException e) {\r
public void run() {\r
Map<String, Counters> map = new HashMap<String, Counters>();\r
long start = System.currentTimeMillis();\r
+\r
try {\r
DB db = new DB();\r
@SuppressWarnings("resource")\r
Connection conn = db.getConnection();\r
- PreparedStatement ps = conn.prepareStatement(SELECT_SQL);\r
- ps.setLong(1, from);\r
- ps.setLong(2, to);\r
- ResultSet rs = ps.executeQuery();\r
- while (rs.next()) {\r
- String date = rs.getString("DATE");\r
- int sub = rs.getInt("DELIVERY_SUBID");\r
- int res = rs.getInt("RESULT");\r
- int count = rs.getInt("COUNT");\r
- String key = date + "," + sub;\r
- Counters c = map.get(key);\r
- if (c == null) {\r
- c = new Counters(date, sub);\r
- map.put(key, c);\r
+ try(PreparedStatement ps = conn.prepareStatement(SELECT_SQL)) {\r
+ ps.setLong(1, from);\r
+ ps.setLong(2, to);\r
+ try(ResultSet rs = ps.executeQuery()) {\r
+ while (rs.next()) {\r
+ String date = rs.getString("DATE");\r
+ int sub = rs.getInt("DELIVERY_SUBID");\r
+ int res = rs.getInt("RESULT");\r
+ int count = rs.getInt("COUNT");\r
+ String key = date + "," + sub;\r
+ Counters c = map.get(key);\r
+ if (c == null) {\r
+ c = new Counters(date, sub);\r
+ map.put(key, c);\r
+ }\r
+ c.addCounts(res, count);\r
+ }\r
}\r
- c.addCounts(res, count);\r
}\r
- rs.close();\r
- ps.close();\r
\r
- ps = conn.prepareStatement(SELECT_SQL2);\r
- ps.setLong(1, from);\r
- ps.setLong(2, to);\r
- rs = ps.executeQuery();\r
- while (rs.next()) {\r
- String date = rs.getString("DATE");\r
- int sub = rs.getInt("DELIVERY_SUBID");\r
- int count = rs.getInt("COUNT");\r
- String key = date + "," + sub;\r
- Counters c = map.get(key);\r
- if (c == null) {\r
- c = new Counters(date, sub);\r
- map.put(key, c);\r
- }\r
- c.addDlxCount(count);\r
- }\r
- rs.close();\r
- ps.close();\r
+ try( PreparedStatement ps2 = conn.prepareStatement(SELECT_SQL2)) {\r
+ ps2.setLong(1, from);\r
+ ps2.setLong(2, to);\r
+ try(ResultSet rs2 = ps2.executeQuery()) {\r
+ while (rs2.next()) {\r
+ String date = rs2.getString("DATE");\r
+ int sub = rs2.getInt("DELIVERY_SUBID");\r
+ int count = rs2.getInt("COUNT");\r
+ String key = date + "," + sub;\r
+ Counters c = map.get(key);\r
+ if (c == null) {\r
+ c = new Counters(date, sub);\r
+ map.put(key, c);\r
+ }\r
+ c.addDlxCount(count);\r
+ }\r
+ }\r
+ }\r
\r
db.release(conn);\r
} catch (SQLException e) {\r
e.printStackTrace();\r
}\r
logger.debug("Query time: " + (System.currentTimeMillis() - start) + " ms");\r
- try {\r
- PrintWriter os = new PrintWriter(outfile);\r
+ try (PrintWriter os = new PrintWriter(outfile)){\r
os.println("date,subid,count100,count200,count300,count400,count500,countminus1,countdlx");\r
for (String key : new TreeSet<String>(map.keySet())) {\r
Counters c = map.get(key);\r
os.println(c.toString());\r
}\r
- os.close();\r
} catch (FileNotFoundException e) {\r
System.err.println("File cannot be written: " + outfile);\r
}\r
import java.util.Map;\r
import java.util.TreeSet;\r
\r
+import org.apache.log4j.Logger;\r
import org.onap.dmaap.datarouter.provisioning.utils.DB;\r
\r
/**\r
public class VolumeReport extends ReportBase {\r
private static final String SELECT_SQL = "select EVENT_TIME, TYPE, FEEDID, CONTENT_LENGTH, RESULT" +\r
" from LOG_RECORDS where EVENT_TIME >= ? and EVENT_TIME <= ? LIMIT ?, ?";\r
-\r
+ private Logger loggerVolumeReport=Logger.getLogger("org.onap.dmaap.datarouter.reports");\r
private class Counters {\r
public int filespublished, filesdelivered, filesexpired;\r
public long bytespublished, bytesdelivered, bytesexpired;\r
final long stepsize = 6000000L;\r
boolean go_again = true;\r
for (long i = 0; go_again; i += stepsize) {\r
- PreparedStatement ps = conn.prepareStatement(SELECT_SQL);\r
- ps.setLong(1, from);\r
- ps.setLong(2, to);\r
- ps.setLong(3, i);\r
- ps.setLong(4, stepsize);\r
- ResultSet rs = ps.executeQuery();\r
- go_again = false;\r
- while (rs.next()) {\r
- go_again = true;\r
- long etime = rs.getLong("EVENT_TIME");\r
- String type = rs.getString("TYPE");\r
- int feed = rs.getInt("FEEDID");\r
- long clen = rs.getLong("CONTENT_LENGTH");\r
- String key = sdf.format(new Date(etime)) + ":" + feed;\r
- Counters c = map.get(key);\r
- if (c == null) {\r
- c = new Counters();\r
- map.put(key, c);\r
- }\r
- if (type.equalsIgnoreCase("pub")) {\r
- c.filespublished++;\r
- c.bytespublished += clen;\r
- } else if (type.equalsIgnoreCase("del")) {\r
- // Only count successful deliveries\r
- int statusCode = rs.getInt("RESULT");\r
- if (statusCode >= 200 && statusCode < 300) {\r
- c.filesdelivered++;\r
- c.bytesdelivered += clen;\r
+ try (PreparedStatement ps = conn.prepareStatement(SELECT_SQL)) {\r
+ ps.setLong(1, from);\r
+ ps.setLong(2, to);\r
+ ps.setLong(3, i);\r
+ ps.setLong(4, stepsize);\r
+ try(ResultSet rs = ps.executeQuery()) {\r
+ go_again = false;\r
+ while (rs.next()) {\r
+ go_again = true;\r
+ long etime = rs.getLong("EVENT_TIME");\r
+ String type = rs.getString("TYPE");\r
+ int feed = rs.getInt("FEEDID");\r
+ long clen = rs.getLong("CONTENT_LENGTH");\r
+ String key = sdf.format(new Date(etime)) + ":" + feed;\r
+ Counters c = map.get(key);\r
+ if (c == null) {\r
+ c = new Counters();\r
+ map.put(key, c);\r
+ }\r
+ if (type.equalsIgnoreCase("pub")) {\r
+ c.filespublished++;\r
+ c.bytespublished += clen;\r
+ } else if (type.equalsIgnoreCase("del")) {\r
+ // Only count successful deliveries\r
+ int statusCode = rs.getInt("RESULT");\r
+ if (statusCode >= 200 && statusCode < 300) {\r
+ c.filesdelivered++;\r
+ c.bytesdelivered += clen;\r
+ }\r
+ } else if (type.equalsIgnoreCase("exp")) {\r
+ c.filesexpired++;\r
+ c.bytesexpired += clen;\r
+ }\r
}\r
- } else if (type.equalsIgnoreCase("exp")) {\r
- c.filesexpired++;\r
- c.bytesexpired += clen;\r
}\r
+\r
+ }\r
+ catch (SQLException sqlException)\r
+ {\r
+ loggerVolumeReport.error("SqlException",sqlException);\r
}\r
- rs.close();\r
- ps.close();\r
}\r
+\r
db.release(conn);\r
} catch (SQLException e) {\r
e.printStackTrace();\r
}\r
logger.debug("Query time: " + (System.currentTimeMillis() - start) + " ms");\r
- try {\r
- PrintWriter os = new PrintWriter(outfile);\r
+ try (PrintWriter os = new PrintWriter(outfile)) {\r
os.println("date,feedid,filespublished,bytespublished,filesdelivered,bytesdelivered,filesexpired,bytesexpired");\r
- for (String key : new TreeSet<String>(map.keySet())) {\r
+ for(String key :new TreeSet<String>(map.keySet()))\r
+ {\r
Counters c = map.get(key);\r
String[] p = key.split(":");\r
os.println(String.format("%s,%s,%s", p[0], p[1], c.toString()));\r
}\r
- os.close();\r
- } catch (FileNotFoundException e) {\r
+ }\r
+ catch (FileNotFoundException e) {\r
System.err.println("File cannot be written: " + outfile);\r
}\r
}\r