[DMAAP-48] Initial code import
[dmaap/datarouter.git] / datarouter-node / src / main / java / com / att / research / datarouter / node / NodeServlet.java
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java
new file mode 100644 (file)
index 0000000..e0ec1f5
--- /dev/null
@@ -0,0 +1,380 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import javax.servlet.*;\r
+import javax.servlet.http.*;\r
+import java.util.*;\r
+import java.util.regex.*;\r
+import java.io.*;\r
+import java.nio.file.*;\r
+import org.apache.log4j.Logger;\r
+\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
+import com.att.research.datarouter.node.eelf.EelfMsgs;\r
+\r
+import java.net.*;\r
+\r
+/**\r
+ *     Servlet for handling all http and https requests to the data router node\r
+ *     <p>\r
+ *     Handled requests are:\r
+ *     <br>\r
+ *     GET http://<i>node</i>/internal/fetchProv - fetch the provisioning data\r
+ *     <br>\r
+ *     PUT/DELETE https://<i>node</i>/internal/publish/<i>fileid</i> - n2n transfer\r
+ *     <br>\r
+ *     PUT/DELETE https://<i>node</i>/publish/<i>feedid</i>/<i>fileid</i> - publsh request\r
+ */\r
+public class NodeServlet extends HttpServlet   {\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet");\r
+       private static NodeConfigManager        config;\r
+       private static Pattern  MetaDataPattern;\r
+       private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25");\r
+       //Adding EELF Logger Rally:US664892  \r
+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet");\r
+\r
+       static {\r
+               try {\r
+                       String ws = "\\s*";\r
+                       // assume that \\ and \" have been replaced by X\r
+                       String string = "\"[^\"]*\"";\r
+                       //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";\r
+                       String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";\r
+                       String value = "(?:" + string + "|" + number + "|null|true|false)";\r
+                       String item = string + ws + ":" + ws + value + ws;\r
+                       String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;\r
+                       MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);\r
+               } catch (Exception e) {\r
+               }\r
+       }\r
+       /**\r
+        *      Get the NodeConfigurationManager\r
+        */\r
+       public void init() {\r
+               config = NodeConfigManager.getInstance();\r
+               logger.info("NODE0101 Node Servlet Configured");\r
+       }\r
+       private boolean down(HttpServletResponse resp) throws IOException {\r
+               if (config.isShutdown() || !config.isConfigured()) {\r
+                       resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);\r
+                       logger.info("NODE0102 Rejecting request: Service is being quiesced");\r
+                       return(true);\r
+               }\r
+               return(false);\r
+       }\r
+       /**\r
+        *      Handle a GET for /internal/fetchProv\r
+        */\r
+       protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doGet");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               if (down(resp)) {\r
+                       return;\r
+               }\r
+               String path = req.getPathInfo();\r
+               String qs = req.getQueryString();\r
+               String ip = req.getRemoteAddr();\r
+               if (qs != null) {\r
+                       path = path + "?" + qs;\r
+               }\r
+               if ("/internal/fetchProv".equals(path)) {\r
+                       config.gofetch(ip);\r
+                       resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                       return;\r
+               } else if (path.startsWith("/internal/resetSubscription/")) {\r
+                       String subid = path.substring(28);\r
+                       if (subid.length() != 0 && subid.indexOf('/') == -1) {\r
+                               NodeMain.resetQueue(subid, ip);\r
+                               resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                               return;\r
+                       }\r
+               }\r
+               if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) {\r
+                       if (path.startsWith("/internal/logs/")) {\r
+                               String f = path.substring(15);\r
+                               File fn = new File(config.getLogDir() + "/" + f);\r
+                               if (f.indexOf('/') != -1 || !fn.isFile()) {\r
+                                       logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+                                       resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+                                       return;\r
+                               }\r
+                               byte[] buf = new byte[65536];\r
+                               resp.setContentType("text/plain");\r
+                               resp.setContentLength((int)fn.length());\r
+                               resp.setStatus(200);\r
+                               InputStream is = new FileInputStream(fn);\r
+                               OutputStream os = resp.getOutputStream();\r
+                               int i;\r
+                               while ((i = is.read(buf)) > 0) {\r
+                                       os.write(buf, 0, i);\r
+                               }\r
+                               is.close();\r
+                               return;\r
+                       }\r
+                       if (path.startsWith("/internal/rtt/")) {\r
+                               String xip = path.substring(14);\r
+                               long st = System.currentTimeMillis();\r
+                               String status = " unknown";\r
+                               try {\r
+                                       Socket s = new Socket(xip, 443);\r
+                                       s.close();\r
+                                       status = " connected";\r
+                               } catch (Exception e) {\r
+                                       status = " error " + e.toString();\r
+                               }\r
+                               long dur = System.currentTimeMillis() - st;\r
+                               resp.setContentType("text/plain");\r
+                               resp.setStatus(200);\r
+                               byte[] buf = (dur + status + "\n").getBytes();\r
+                               resp.setContentLength(buf.length);\r
+                               resp.getOutputStream().write(buf);\r
+                               return;\r
+                       }\r
+               }\r
+               logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+               resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+               return;\r
+       }\r
+       /**\r
+        *      Handle all PUT requests\r
+        */\r
+       protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doPut");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               common(req, resp, true);\r
+       }\r
+       /**\r
+        *      Handle all DELETE requests\r
+        */\r
+       protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doDelete");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               common(req, resp, false);\r
+       }\r
+       private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException {\r
+               if (down(resp)) {\r
+                       return;\r
+               }\r
+               if (!req.isSecure()) {\r
+                       logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");\r
+                       return;\r
+               }\r
+               String fileid = req.getPathInfo();\r
+               if (fileid == null) {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               String feedid = null;\r
+               String user = null;\r
+               String credentials = req.getHeader("Authorization");\r
+               if (credentials == null) {\r
+                       logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");\r
+                       return;\r
+               }\r
+               String ip = req.getRemoteAddr();\r
+               String lip = req.getLocalAddr();\r
+               String pubid = null;\r
+               String xpubid = null;\r
+               String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;\r
+               Target[]        targets = null;\r
+               if (fileid.startsWith("/publish/")) {\r
+                       fileid = fileid.substring(9);\r
+                       int i = fileid.indexOf('/');\r
+                       if (i == -1 || i == fileid.length() - 1) {\r
+                               logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                               resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  Possible missing fileid.");\r
+                               return;\r
+                       }\r
+                       feedid = fileid.substring(0, i);\r
+                       fileid = fileid.substring(i + 1);\r
+                       pubid = config.getPublishId();\r
+                       xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+                       targets = config.getTargets(feedid);\r
+               } else if (fileid.startsWith("/internal/publish/")) {\r
+                       if (!config.isAnotherNode(credentials, ip)) {\r
+                               logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);\r
+                               resp.sendError(HttpServletResponse.SC_FORBIDDEN);\r
+                               return;\r
+                       }\r
+                       fileid = fileid.substring(18);\r
+                       pubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+                       targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING"));\r
+               } else {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               if (fileid.indexOf('/') != -1) {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               String qs = req.getQueryString();\r
+               if (qs != null) {\r
+                       fileid = fileid + "?" + qs;\r
+               }\r
+               String hp = config.getMyName();\r
+               int xp = config.getExtHttpsPort();\r
+               if (xp != 443) {\r
+                       hp = hp + ":" + xp;\r
+               }\r
+               String logurl = "https://" + hp + "/internal/publish/" + fileid;\r
+               if (feedid != null) {\r
+                       logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;\r
+                       String reason = config.isPublishPermitted(feedid, credentials, ip);\r
+                       if (reason != null) {\r
+                               logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason);\r
+                               resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason);\r
+                               return;\r
+                       }\r
+                       user = config.getAuthUser(feedid, credentials);\r
+                       String newnode = config.getIngressNode(feedid, user, ip);\r
+                       if (newnode != null) {\r
+                               String port = "";\r
+                               int iport = config.getExtHttpsPort();\r
+                               if (iport != 443) {\r
+                                       port = ":" + iport;\r
+                               }\r
+                               String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;\r
+                               logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto);\r
+                               resp.sendRedirect(redirto);\r
+                               return;\r
+                       }\r
+                       resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid);\r
+               }\r
+               String fbase = config.getSpoolDir() + "/" + pubid;\r
+               File data = new File(fbase);\r
+               File meta = new File(fbase + ".M");\r
+               OutputStream dos = null;\r
+               Writer mw = null;\r
+               InputStream is = null;\r
+               try {\r
+                       StringBuffer mx = new StringBuffer();\r
+                       mx.append(req.getMethod()).append('\t').append(fileid).append('\n');\r
+                       Enumeration hnames = req.getHeaderNames();\r
+                       String ctype = null;\r
+                       while (hnames.hasMoreElements()) {\r
+                               String hn = (String)hnames.nextElement();\r
+                               String hnlc = hn.toLowerCase();\r
+                               if ((isput && ("content-type".equals(hnlc) ||\r
+                                   "content-language".equals(hnlc) ||\r
+                                   "content-md5".equals(hnlc) ||\r
+                                   "content-range".equals(hnlc))) ||\r
+                                   "x-att-dr-meta".equals(hnlc) ||\r
+                                   (feedid == null && "x-att-dr-received".equals(hnlc)) ||\r
+                                   (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) {\r
+                                       Enumeration hvals = req.getHeaders(hn);\r
+                                       while (hvals.hasMoreElements()) {\r
+                                               String hv = (String)hvals.nextElement();\r
+                                               if ("content-type".equals(hnlc)) {\r
+                                                       ctype = hv;\r
+                                               }\r
+                                               if ("x-att-dr-meta".equals(hnlc)) {\r
+                                                       if (hv.length() > 4096) {\r
+                                                               logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip);\r
+                                                               resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");\r
+                                                               return;\r
+                                                       }\r
+                                                       if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {\r
+                                                               logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip);\r
+                                                               resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");\r
+                                                               return;\r
+                                                       }\r
+                                               }\r
+                                               mx.append(hn).append('\t').append(hv).append('\n');\r
+                                       }\r
+                               }\r
+                       }\r
+                       mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n');\r
+                       String metadata = mx.toString();\r
+                       byte[] buf = new byte[1024 * 1024];\r
+                       int i;\r
+                       try {\r
+                               is = req.getInputStream();\r
+                               dos = new FileOutputStream(data);\r
+                               while ((i = is.read(buf)) > 0) {\r
+                                       dos.write(buf, 0, i);\r
+                               }\r
+                               is.close();\r
+                               is = null;\r
+                               dos.close();\r
+                               dos = null;\r
+                       } catch (IOException ioe) {\r
+                               long exlen = -1;\r
+                               try {\r
+                                       exlen = Long.parseLong(req.getHeader("Content-Length"));\r
+                               } catch (Exception e) {\r
+                               }\r
+                               StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());\r
+                               throw ioe;\r
+                       }\r
+                       Path dpath = Paths.get(fbase);\r
+                       for (Target t: targets) {\r
+                               DestInfo di = t.getDestInfo();\r
+                               if (di == null) {\r
+                                       // TODO: unknown destination\r
+                                       continue;\r
+                               }\r
+                               String dbase = di.getSpool() + "/" + pubid;\r
+                               Files.createLink(Paths.get(dbase), dpath);\r
+                               mw = new FileWriter(meta);\r
+                               mw.write(metadata);\r
+                               if (di.getSubId() == null) {\r
+                                       mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n");\r
+                               }\r
+                               mw.close();\r
+                               meta.renameTo(new File(dbase + ".M"));\r
+                       }\r
+                       resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                       resp.getOutputStream().close();\r
+                       StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);\r
+               } catch (IOException ioe) {\r
+                       logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);\r
+                       throw ioe;\r
+               } finally {\r
+                       if (is != null) { try { is.close(); } catch (Exception e) {}}\r
+                       if (dos != null) { try { dos.close(); } catch (Exception e) {}}\r
+                       if (mw != null) { try { mw.close(); } catch (Exception e) {}}\r
+                       try { data.delete(); } catch (Exception e) {}\r
+                       try { meta.delete(); } catch (Exception e) {}\r
+               }\r
+       }\r
+       \r
+       private int getIdFromPath(HttpServletRequest req) {\r
+               String path = req.getPathInfo();\r
+               if (path == null || path.length() < 2)\r
+                       return -1;\r
+               try {\r
+                       return Integer.parseInt(path.substring(1));\r
+               } catch (NumberFormatException e) {\r
+                       return -1;\r
+               }\r
+       }\r
+}\r