X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Fcom%2Fatt%2Fresearch%2Fdatarouter%2Fprovisioning%2FSubscriptionServlet.java;fp=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Fcom%2Fatt%2Fresearch%2Fdatarouter%2Fprovisioning%2FSubscriptionServlet.java;h=0bb4717556b51a09635c7fd887566ee05b669bc3;hb=aaf2df8b908fcb48043d2cd51803d8fd99f18b43;hp=0000000000000000000000000000000000000000;hpb=6ec9a9ce6c1062efa99da501fe8c6ea116aebf6f;p=dmaap%2Fdatarouter.git
diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscriptionServlet.java b/datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscriptionServlet.java
new file mode 100644
index 00000000..0bb47175
--- /dev/null
+++ b/datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscriptionServlet.java
@@ -0,0 +1,476 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package com.att.research.datarouter.provisioning;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Vector;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.research.datarouter.authz.AuthorizationResponse;
+import com.att.research.datarouter.provisioning.beans.EventLogRecord;
+import com.att.research.datarouter.provisioning.beans.Subscription;
+import com.att.research.datarouter.provisioning.eelf.EelfMsgs;
+
+/**
+ * This servlet handles provisioning for the <subscriptionURL> which is generated by the provisioning
+ * server to handle the inspection, modification, and deletion of a particular subscription to a feed.
+ * It supports DELETE to delete a subscription, GET to retrieve information about the subscription,
+ * and PUT to modify the subscription. In DR 3.0, POST is also supported in order to reset the subscription
+ * timers for individual subscriptions.
+ *
+ * @author Robert Eby
+ * @version $Id$
+ */
+@SuppressWarnings("serial")
+public class SubscriptionServlet extends ProxyServlet {
+ public static final String SUBCNTRL_CONTENT_TYPE = "application/vnd.att-dr.subscription-control";
+ //Adding EELF Logger Rally:US664892
+ private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.provisioning.SubscriptionServlet");
+
+ /**
+ * DELETE on the <subscriptionUrl> -- delete a subscription.
+ * See the Deleting a Subscription section in the Provisioning API
+ * document for details on how this method should be invoked.
+ */
+ @Override
+ public void doDelete(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ setIpAndFqdnForEelf("doDelete");
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_SUBID, req.getHeader(BEHALF_HEADER),getIdFromPath(req)+"");
+ EventLogRecord elr = new EventLogRecord(req);
+ String message = isAuthorizedForProvisioning(req);
+ if (message != null) {
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ if (isProxyServer()) {
+ super.doDelete(req, resp);
+ return;
+ }
+ String bhdr = req.getHeader(BEHALF_HEADER);
+ if (bhdr == null) {
+ message = "Missing "+BEHALF_HEADER+" header.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ int subid = getIdFromPath(req);
+ if (subid < 0) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ Subscription sub = Subscription.getSubscriptionById(subid);
+ if (sub == null) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_NOT_FOUND);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, message);
+ return;
+ }
+ // Check with the Authorizer
+ AuthorizationResponse aresp = authz.decide(req);
+ if (! aresp.isAuthorized()) {
+ message = "Policy Engine disallows access.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+
+ // Delete Subscription
+ if (doDelete(sub)) {
+ active_subs--;
+ // send response
+ elr.setResult(HttpServletResponse.SC_NO_CONTENT);
+ eventlogger.info(elr);
+ resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
+ provisioningDataChanged();
+ } else {
+ // Something went wrong with the DELETE
+ elr.setResult(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, DB_PROBLEM_MSG);
+ }
+ }
+ /**
+ * GET on the <subscriptionUrl> -- get information about a subscription.
+ * See the Retreiving Information about a Subscription section in the Provisioning API
+ * document for details on how this method should be invoked.
+ */
+ @Override
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ setIpAndFqdnForEelf("doGet");
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_SUBID, req.getHeader(BEHALF_HEADER),getIdFromPath(req)+"");
+ EventLogRecord elr = new EventLogRecord(req);
+ String message = isAuthorizedForProvisioning(req);
+ if (message != null) {
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ if (isProxyServer()) {
+ super.doGet(req, resp);
+ return;
+ }
+ String bhdr = req.getHeader(BEHALF_HEADER);
+ if (bhdr == null) {
+ message = "Missing "+BEHALF_HEADER+" header.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ int subid = getIdFromPath(req);
+ if (subid < 0) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ Subscription sub = Subscription.getSubscriptionById(subid);
+ if (sub == null) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_NOT_FOUND);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, message);
+ return;
+ }
+ // Check with the Authorizer
+ AuthorizationResponse aresp = authz.decide(req);
+ if (! aresp.isAuthorized()) {
+ message = "Policy Engine disallows access.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+
+ // send response
+ elr.setResult(HttpServletResponse.SC_OK);
+ eventlogger.info(elr);
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.setContentType(SUBFULL_CONTENT_TYPE);
+ resp.getOutputStream().print(sub.asJSONObject(true).toString());
+ }
+ /**
+ * PUT on the <subscriptionUrl> -- modify a subscription.
+ * See the Modifying a Subscription section in the Provisioning API
+ * document for details on how this method should be invoked.
+ */
+ @Override
+ public void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ setIpAndFqdnForEelf("doPut");
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_SUBID, req.getHeader(BEHALF_HEADER),getIdFromPath(req)+"");
+ EventLogRecord elr = new EventLogRecord(req);
+ String message = isAuthorizedForProvisioning(req);
+ if (message != null) {
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ if (isProxyServer()) {
+ super.doPut(req, resp);
+ return;
+ }
+ String bhdr = req.getHeader(BEHALF_HEADER);
+ if (bhdr == null) {
+ message = "Missing "+BEHALF_HEADER+" header.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ int subid = getIdFromPath(req);
+ if (subid < 0) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ Subscription oldsub = Subscription.getSubscriptionById(subid);
+ if (oldsub == null) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_NOT_FOUND);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, message);
+ return;
+ }
+ // Check with the Authorizer
+ AuthorizationResponse aresp = authz.decide(req);
+ if (! aresp.isAuthorized()) {
+ message = "Policy Engine disallows access.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ // check content type is SUB_CONTENT_TYPE, version 1.0
+ ContentHeader ch = getContentHeader(req);
+ String ver = ch.getAttribute("version");
+ if (!ch.getType().equals(SUB_BASECONTENT_TYPE) || !(ver.equals("1.0") || ver.equals("2.0"))) {
+ message = "Incorrect content-type";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, message);
+ return;
+ }
+ JSONObject jo = getJSONfromInput(req);
+ if (jo == null) {
+ message = "Badly formed JSON";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ if (intlogger.isDebugEnabled())
+ intlogger.debug(jo.toString());
+ Subscription sub = null;
+ try {
+ sub = new Subscription(jo);
+ } catch (InvalidObjectException e) {
+ message = e.getMessage();
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ sub.setSubid(oldsub.getSubid());
+ sub.setFeedid(oldsub.getFeedid());
+ sub.setSubscriber(bhdr); // set from X-ATT-DR-ON-BEHALF-OF header
+
+ String subjectgroup = (req.getHeader("X-ATT-DR-ON-BEHALF-OF-GROUP")); //Adding for group feature:Rally US708115
+ if (!oldsub.getSubscriber().equals(sub.getSubscriber()) && subjectgroup == null) {
+ message = "This subscriber must be modified by the same subscriber that created it.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+
+ // Update SUBSCRIPTIONS table entries
+ if (doUpdate(sub)) {
+ // send response
+ elr.setResult(HttpServletResponse.SC_OK);
+ eventlogger.info(elr);
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.setContentType(SUBFULL_CONTENT_TYPE);
+ resp.getOutputStream().print(sub.asLimitedJSONObject().toString());
+
+ /**Change Owner ship of Subscriber Adding for group feature:Rally US708115*/
+ if (jo.has("changeowner") && subjectgroup != null) {
+ Boolean changeowner = (Boolean) jo.get("changeowner");
+ if (changeowner != null && changeowner.equals(true)) {
+ sub.setSubscriber(req.getHeader(BEHALF_HEADER));
+ sub.changeOwnerShip();
+ }
+ }
+ /***End of change ownership*/
+
+ provisioningDataChanged();
+ } else {
+ // Something went wrong with the UPDATE
+ elr.setResult(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, DB_PROBLEM_MSG);
+ }
+ }
+ /**
+ * POST on the <subscriptionUrl> -- control a subscription.
+ * See the Resetting a Subscription's Retry Schedule section in the Provisioning API
+ * document for details on how this method should be invoked.
+ */
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+// OLD pre-3.0 code
+// String message = "POST not allowed for the subscriptionURL.";
+// EventLogRecord elr = new EventLogRecord(req);
+// elr.setMessage(message);
+// elr.setResult(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+// eventlogger.info(elr);
+// resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, message);
+
+ setIpAndFqdnForEelf("doPost");
+ eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF, req.getHeader(BEHALF_HEADER));
+ EventLogRecord elr = new EventLogRecord(req);
+ String message = isAuthorizedForProvisioning(req);
+ if (message != null) {
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ if (isProxyServer()) {
+ super.doPost(req, resp);
+ return;
+ }
+ String bhdr = req.getHeader(BEHALF_HEADER);
+ if (bhdr == null) {
+ message = "Missing "+BEHALF_HEADER+" header.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ final int subid = getIdFromPath(req);
+ if (subid < 0 || Subscription.getSubscriptionById(subid) == null) {
+ message = "Missing or bad subscription number.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ // check content type is SUBCNTRL_CONTENT_TYPE, version 1.0
+ ContentHeader ch = getContentHeader(req);
+ String ver = ch.getAttribute("version");
+ if (!ch.getType().equals(SUBCNTRL_CONTENT_TYPE) || !ver.equals("1.0")) {
+ message = "Incorrect content-type";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, message);
+ return;
+ }
+ // Check with the Authorizer
+ AuthorizationResponse aresp = authz.decide(req);
+ if (! aresp.isAuthorized()) {
+ message = "Policy Engine disallows access.";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_FORBIDDEN);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN, message);
+ return;
+ }
+ JSONObject jo = getJSONfromInput(req);
+ if (jo == null) {
+ message = "Badly formed JSON";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ return;
+ }
+ try {
+ // Only the active POD sends notifications
+ boolean active = SynchronizerTask.getSynchronizer().isActive();
+ boolean b = jo.getBoolean("failed");
+ if (active && !b) {
+ // Notify all nodes to reset the subscription
+ SubscriberNotifyThread t = new SubscriberNotifyThread();
+ t.resetSubscription(subid);
+ t.start();
+ }
+ // send response
+ elr.setResult(HttpServletResponse.SC_ACCEPTED);
+ eventlogger.info(elr);
+ resp.setStatus(HttpServletResponse.SC_ACCEPTED);
+ } catch (JSONException e) {
+ message = "Badly formed JSON";
+ elr.setMessage(message);
+ elr.setResult(HttpServletResponse.SC_BAD_REQUEST);
+ eventlogger.info(elr);
+ resp.sendError(HttpServletResponse.SC_BAD_REQUEST, message);
+ }
+ }
+
+ /**
+ * A Thread class used to serially send reset notifications to all nodes in the DR network,
+ * when a POST is received for a subscription.
+ */
+ public class SubscriberNotifyThread extends Thread {
+ public static final String URL_TEMPLATE = "http://%s/internal/resetSubscription/%d";
+ private List urls = new Vector();
+
+ public SubscriberNotifyThread() {
+ setName("SubscriberNotifyThread");
+ }
+ public void resetSubscription(int subid) {
+ for (String nodename : BaseServlet.getNodes()) {
+ String u = String.format(URL_TEMPLATE, nodename, subid);
+ urls.add(u);
+ }
+ }
+ public void run() {
+ try {
+ while (!urls.isEmpty()) {
+ String u = urls.remove(0);
+ try {
+ URL url = new URL(u);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ conn.getContentLength(); // Force the GET through
+ conn.disconnect();
+ } catch (IOException e) {
+ intlogger.info("IOException Error accessing URL: "+u+": " + e.getMessage());
+ }
+ }
+ } catch (Exception e) {
+ intlogger.warn("Caught exception in SubscriberNotifyThread: "+e);
+ e.printStackTrace();
+ }
+ }
+ }
+}