--- /dev/null
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ * * http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ * * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import javax.servlet.*;\r
+import javax.servlet.http.*;\r
+import java.util.*;\r
+import java.util.regex.*;\r
+import java.io.*;\r
+import java.nio.file.*;\r
+import org.apache.log4j.Logger;\r
+\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
+import com.att.research.datarouter.node.eelf.EelfMsgs;\r
+\r
+import java.net.*;\r
+\r
+/**\r
+ * Servlet for handling all http and https requests to the data router node\r
+ * <p>\r
+ * Handled requests are:\r
+ * <br>\r
+ * GET http://<i>node</i>/internal/fetchProv - fetch the provisioning data\r
+ * <br>\r
+ * PUT/DELETE https://<i>node</i>/internal/publish/<i>fileid</i> - n2n transfer\r
+ * <br>\r
+ * PUT/DELETE https://<i>node</i>/publish/<i>feedid</i>/<i>fileid</i> - publsh request\r
+ */\r
+public class NodeServlet extends HttpServlet {\r
+ private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet");\r
+ private static NodeConfigManager config;\r
+ private static Pattern MetaDataPattern;\r
+ private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25");\r
+ //Adding EELF Logger Rally:US664892 \r
+ private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet");\r
+\r
+ static {\r
+ try {\r
+ String ws = "\\s*";\r
+ // assume that \\ and \" have been replaced by X\r
+ String string = "\"[^\"]*\"";\r
+ //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";\r
+ String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";\r
+ String value = "(?:" + string + "|" + number + "|null|true|false)";\r
+ String item = string + ws + ":" + ws + value + ws;\r
+ String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;\r
+ MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);\r
+ } catch (Exception e) {\r
+ }\r
+ }\r
+ /**\r
+ * Get the NodeConfigurationManager\r
+ */\r
+ public void init() {\r
+ config = NodeConfigManager.getInstance();\r
+ logger.info("NODE0101 Node Servlet Configured");\r
+ }\r
+ private boolean down(HttpServletResponse resp) throws IOException {\r
+ if (config.isShutdown() || !config.isConfigured()) {\r
+ resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);\r
+ logger.info("NODE0102 Rejecting request: Service is being quiesced");\r
+ return(true);\r
+ }\r
+ return(false);\r
+ }\r
+ /**\r
+ * Handle a GET for /internal/fetchProv\r
+ */\r
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+ NodeUtils.setIpAndFqdnForEelf("doGet");\r
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+ if (down(resp)) {\r
+ return;\r
+ }\r
+ String path = req.getPathInfo();\r
+ String qs = req.getQueryString();\r
+ String ip = req.getRemoteAddr();\r
+ if (qs != null) {\r
+ path = path + "?" + qs;\r
+ }\r
+ if ("/internal/fetchProv".equals(path)) {\r
+ config.gofetch(ip);\r
+ resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+ return;\r
+ } else if (path.startsWith("/internal/resetSubscription/")) {\r
+ String subid = path.substring(28);\r
+ if (subid.length() != 0 && subid.indexOf('/') == -1) {\r
+ NodeMain.resetQueue(subid, ip);\r
+ resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+ return;\r
+ }\r
+ }\r
+ if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) {\r
+ if (path.startsWith("/internal/logs/")) {\r
+ String f = path.substring(15);\r
+ File fn = new File(config.getLogDir() + "/" + f);\r
+ if (f.indexOf('/') != -1 || !fn.isFile()) {\r
+ logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+ return;\r
+ }\r
+ byte[] buf = new byte[65536];\r
+ resp.setContentType("text/plain");\r
+ resp.setContentLength((int)fn.length());\r
+ resp.setStatus(200);\r
+ InputStream is = new FileInputStream(fn);\r
+ OutputStream os = resp.getOutputStream();\r
+ int i;\r
+ while ((i = is.read(buf)) > 0) {\r
+ os.write(buf, 0, i);\r
+ }\r
+ is.close();\r
+ return;\r
+ }\r
+ if (path.startsWith("/internal/rtt/")) {\r
+ String xip = path.substring(14);\r
+ long st = System.currentTimeMillis();\r
+ String status = " unknown";\r
+ try {\r
+ Socket s = new Socket(xip, 443);\r
+ s.close();\r
+ status = " connected";\r
+ } catch (Exception e) {\r
+ status = " error " + e.toString();\r
+ }\r
+ long dur = System.currentTimeMillis() - st;\r
+ resp.setContentType("text/plain");\r
+ resp.setStatus(200);\r
+ byte[] buf = (dur + status + "\n").getBytes();\r
+ resp.setContentLength(buf.length);\r
+ resp.getOutputStream().write(buf);\r
+ return;\r
+ }\r
+ }\r
+ logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+ return;\r
+ }\r
+ /**\r
+ * Handle all PUT requests\r
+ */\r
+ protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+ NodeUtils.setIpAndFqdnForEelf("doPut");\r
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+ common(req, resp, true);\r
+ }\r
+ /**\r
+ * Handle all DELETE requests\r
+ */\r
+ protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+ NodeUtils.setIpAndFqdnForEelf("doDelete");\r
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+ common(req, resp, false);\r
+ }\r
+ private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException {\r
+ if (down(resp)) {\r
+ return;\r
+ }\r
+ if (!req.isSecure()) {\r
+ logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");\r
+ return;\r
+ }\r
+ String fileid = req.getPathInfo();\r
+ if (fileid == null) {\r
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");\r
+ return;\r
+ }\r
+ String feedid = null;\r
+ String user = null;\r
+ String credentials = req.getHeader("Authorization");\r
+ if (credentials == null) {\r
+ logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");\r
+ return;\r
+ }\r
+ String ip = req.getRemoteAddr();\r
+ String lip = req.getLocalAddr();\r
+ String pubid = null;\r
+ String xpubid = null;\r
+ String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;\r
+ Target[] targets = null;\r
+ if (fileid.startsWith("/publish/")) {\r
+ fileid = fileid.substring(9);\r
+ int i = fileid.indexOf('/');\r
+ if (i == -1 || i == fileid.length() - 1) {\r
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>. Possible missing fileid.");\r
+ return;\r
+ }\r
+ feedid = fileid.substring(0, i);\r
+ fileid = fileid.substring(i + 1);\r
+ pubid = config.getPublishId();\r
+ xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+ targets = config.getTargets(feedid);\r
+ } else if (fileid.startsWith("/internal/publish/")) {\r
+ if (!config.isAnotherNode(credentials, ip)) {\r
+ logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);\r
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN);\r
+ return;\r
+ }\r
+ fileid = fileid.substring(18);\r
+ pubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+ targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING"));\r
+ } else {\r
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");\r
+ return;\r
+ }\r
+ if (fileid.indexOf('/') != -1) {\r
+ logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting <feed-publishing-url>/<fileid>.");\r
+ return;\r
+ }\r
+ String qs = req.getQueryString();\r
+ if (qs != null) {\r
+ fileid = fileid + "?" + qs;\r
+ }\r
+ String hp = config.getMyName();\r
+ int xp = config.getExtHttpsPort();\r
+ if (xp != 443) {\r
+ hp = hp + ":" + xp;\r
+ }\r
+ String logurl = "https://" + hp + "/internal/publish/" + fileid;\r
+ if (feedid != null) {\r
+ logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;\r
+ String reason = config.isPublishPermitted(feedid, credentials, ip);\r
+ if (reason != null) {\r
+ logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason);\r
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason);\r
+ return;\r
+ }\r
+ user = config.getAuthUser(feedid, credentials);\r
+ String newnode = config.getIngressNode(feedid, user, ip);\r
+ if (newnode != null) {\r
+ String port = "";\r
+ int iport = config.getExtHttpsPort();\r
+ if (iport != 443) {\r
+ port = ":" + iport;\r
+ }\r
+ String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;\r
+ logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto);\r
+ resp.sendRedirect(redirto);\r
+ return;\r
+ }\r
+ resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid);\r
+ }\r
+ String fbase = config.getSpoolDir() + "/" + pubid;\r
+ File data = new File(fbase);\r
+ File meta = new File(fbase + ".M");\r
+ OutputStream dos = null;\r
+ Writer mw = null;\r
+ InputStream is = null;\r
+ try {\r
+ StringBuffer mx = new StringBuffer();\r
+ mx.append(req.getMethod()).append('\t').append(fileid).append('\n');\r
+ Enumeration hnames = req.getHeaderNames();\r
+ String ctype = null;\r
+ while (hnames.hasMoreElements()) {\r
+ String hn = (String)hnames.nextElement();\r
+ String hnlc = hn.toLowerCase();\r
+ if ((isput && ("content-type".equals(hnlc) ||\r
+ "content-language".equals(hnlc) ||\r
+ "content-md5".equals(hnlc) ||\r
+ "content-range".equals(hnlc))) ||\r
+ "x-att-dr-meta".equals(hnlc) ||\r
+ (feedid == null && "x-att-dr-received".equals(hnlc)) ||\r
+ (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) {\r
+ Enumeration hvals = req.getHeaders(hn);\r
+ while (hvals.hasMoreElements()) {\r
+ String hv = (String)hvals.nextElement();\r
+ if ("content-type".equals(hnlc)) {\r
+ ctype = hv;\r
+ }\r
+ if ("x-att-dr-meta".equals(hnlc)) {\r
+ if (hv.length() > 4096) {\r
+ logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip);\r
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");\r
+ return;\r
+ }\r
+ if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {\r
+ logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip);\r
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");\r
+ return;\r
+ }\r
+ }\r
+ mx.append(hn).append('\t').append(hv).append('\n');\r
+ }\r
+ }\r
+ }\r
+ mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n');\r
+ String metadata = mx.toString();\r
+ byte[] buf = new byte[1024 * 1024];\r
+ int i;\r
+ try {\r
+ is = req.getInputStream();\r
+ dos = new FileOutputStream(data);\r
+ while ((i = is.read(buf)) > 0) {\r
+ dos.write(buf, 0, i);\r
+ }\r
+ is.close();\r
+ is = null;\r
+ dos.close();\r
+ dos = null;\r
+ } catch (IOException ioe) {\r
+ long exlen = -1;\r
+ try {\r
+ exlen = Long.parseLong(req.getHeader("Content-Length"));\r
+ } catch (Exception e) {\r
+ }\r
+ StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());\r
+ throw ioe;\r
+ }\r
+ Path dpath = Paths.get(fbase);\r
+ for (Target t: targets) {\r
+ DestInfo di = t.getDestInfo();\r
+ if (di == null) {\r
+ // TODO: unknown destination\r
+ continue;\r
+ }\r
+ String dbase = di.getSpool() + "/" + pubid;\r
+ Files.createLink(Paths.get(dbase), dpath);\r
+ mw = new FileWriter(meta);\r
+ mw.write(metadata);\r
+ if (di.getSubId() == null) {\r
+ mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n");\r
+ }\r
+ mw.close();\r
+ meta.renameTo(new File(dbase + ".M"));\r
+ }\r
+ resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+ resp.getOutputStream().close();\r
+ StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);\r
+ } catch (IOException ioe) {\r
+ logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);\r
+ throw ioe;\r
+ } finally {\r
+ if (is != null) { try { is.close(); } catch (Exception e) {}}\r
+ if (dos != null) { try { dos.close(); } catch (Exception e) {}}\r
+ if (mw != null) { try { mw.close(); } catch (Exception e) {}}\r
+ try { data.delete(); } catch (Exception e) {}\r
+ try { meta.delete(); } catch (Exception e) {}\r
+ }\r
+ }\r
+ \r
+ private int getIdFromPath(HttpServletRequest req) {\r
+ String path = req.getPathInfo();\r
+ if (path == null || path.length() < 2)\r
+ return -1;\r
+ try {\r
+ return Integer.parseInt(path.substring(1));\r
+ } catch (NumberFormatException e) {\r
+ return -1;\r
+ }\r
+ }\r
+}\r