1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
28 import java.util.Arrays;
29 import java.util.HashSet;
30 import java.util.Hashtable;
31 import java.util.Vector;
32 import org.apache.log4j.Logger;
35 * Processed configuration for this node.
37 * The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
38 * configuration data is received from the provisioning server, a new NodeConfig is created and the previous one
41 public class NodeConfig {
42 private static Logger logger = Logger.getLogger("org.onap.dmaap.datarouter.node.NodeConfig");
44 * Raw configuration entry for a data router node
46 public static class ProvNode {
51 * Construct a node configuration entry.
53 * @param cname The cname of the node.
55 public ProvNode(String cname) {
60 * Get the cname of the node
62 public String getCName() {
68 * Raw configuration entry for a provisioning parameter
70 public static class ProvParam {
76 * Construct a provisioning parameter configuration entry.
78 * @param name The name of the parameter.
79 * @param value The value of the parameter.
81 public ProvParam(String name, String value) {
87 * Get the name of the parameter.
89 public String getName() {
94 * Get the value of the parameter.
96 public String getValue() {
102 * Raw configuration entry for a data feed.
104 public static class ProvFeed {
107 private String logdata;
108 private String status;
109 private String createdDate;
111 * AAF changes: TDP EPIC US# 307413
 * Passing aafInstance to identify legacy/AAF feeds
114 private String aafInstance;
117 * Construct a feed configuration entry.
119 * @param id The feed ID of the entry.
120 * @param logdata String for log entries about the entry.
121 * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or
122 * null if it is valid.
124 public ProvFeed(String id, String logdata, String status, String createdDate, String aafInstance) {
126 this.logdata = logdata;
127 this.status = status;
128 this.createdDate = createdDate;
129 this.aafInstance = aafInstance;
133 * Get the created date of the data feed.
135 public String getCreatedDate()
141 * Get the aafInstance of the data feed.
143 public String getAafInstance() {
148 * Get the feed id of the data feed.
150 public String getId() {
155 * Get the log data of the data feed.
157 public String getLogData() {
162 * Get the status of the data feed.
164 public String getStatus() {
170 * Raw configuration entry for a feed user.
172 public static class ProvFeedUser {
174 private String feedid;
176 private String credentials;
179 * Construct a feed user configuration entry
181 * @param feedid The feed id.
182 * @param user The user that will publish to the feed.
183 * @param credentials The Authorization header the user will use to publish.
185 public ProvFeedUser(String feedid, String user, String credentials) {
186 this.feedid = feedid;
188 this.credentials = credentials;
192 * Get the feed id of the feed user.
194 public String getFeedId() {
199 * Get the user for the feed user.
201 public String getUser() {
206 * Get the credentials for the feed user.
208 public String getCredentials() {
209 return (credentials);
214 * Raw configuration entry for a feed subnet
216 public static class ProvFeedSubnet {
218 private String feedid;
222 * Construct a feed subnet configuration entry
224 * @param feedid The feed ID
225 * @param cidr The CIDR allowed to publish to the feed.
227 public ProvFeedSubnet(String feedid, String cidr) {
228 this.feedid = feedid;
233 * Get the feed id of the feed subnet.
235 public String getFeedId() {
240 * Get the CIDR of the feed subnet.
242 public String getCidr() {
248 * Raw configuration entry for a subscription
250 public static class ProvSubscription {
252 private String subid;
253 private String feedid;
255 private String authuser;
256 private String credentials;
257 private boolean metaonly;
258 private boolean use100;
259 private boolean privilegedSubscriber;
260 private boolean followRedirect;
261 private boolean decompress;
264 * Construct a subscription configuration entry
266 * @param subid The subscription ID
267 * @param feedid The feed ID
268 * @param url The base delivery URL (not including the fileid)
269 * @param authuser The user in the credentials used to deliver
270 * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the
271 * Authorization header.
272 * @param metaonly Is this a meta data only subscription?
273 * @param use100 Should we send Expect: 100-continue?
274 * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
275 * @param followRedirect Is follow redirect of destination enabled?
276 * @param decompress To see if they want their information compressed or decompressed
278 public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean followRedirect, boolean decompress) {
280 this.feedid = feedid;
282 this.authuser = authuser;
283 this.credentials = credentials;
284 this.metaonly = metaonly;
285 this.use100 = use100;
286 this.privilegedSubscriber = privilegedSubscriber;
287 this.followRedirect = followRedirect;
288 this.decompress = decompress;
292 * Get the subscription ID
294 public String getSubId() {
301 public String getFeedId() {
306 * Get the delivery URL
308 public String getURL() {
315 public String getAuthUser() {
320 * Get the delivery credentials
322 public String getCredentials() {
323 return (credentials);
327 * Is this a meta data only subscription?
329 public boolean isMetaDataOnly() {
334 * Should we send Expect: 100-continue?
336 public boolean isUsing100() {
341 * Can we wait to receive a delete file call before deleting file
343 public boolean isPrivilegedSubscriber() {
344 return (privilegedSubscriber);
 * Should I decompress the file before sending it on?
350 public boolean isDecompress() {
355 * New field is added - FOLLOW_REDIRECTS feature iTrack:DATARTR-17 - 1706
356 * Get the followRedirect of this destination
358 boolean getFollowRedirect() {
359 return(followRedirect);
364 * Raw configuration entry for controlled ingress to the data router node
366 public static class ProvForceIngress {
368 private String feedid;
369 private String subnet;
371 private String[] nodes;
374 * Construct a forced ingress configuration entry
376 * @param feedid The feed ID that this entry applies to
377 * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all
378 * publisher IP addresses
379 * @param user The publishing user this entry applies to or "" if it applies to all publishing users.
380 * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to.
382 public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {
383 this.feedid = feedid;
384 this.subnet = subnet;
388 this.nodes = new String[0];
390 this.nodes = Arrays.copyOf(nodes, nodes.length);
397 public String getFeedId() {
404 public String getSubnet() {
411 public String getUser() {
418 public String[] getNodes() {
424 * Raw configuration entry for controlled egress from the data router
426 public static class ProvForceEgress {
428 private String subid;
432 * Construct a forced egress configuration entry
434 * @param subid The subscription ID the subscription with forced egress
435 * @param node The node handling deliveries for this subscription
437 public ProvForceEgress(String subid, String node) {
443 * Get the subscription ID
445 public String getSubId() {
452 public String getNode() {
458 * Raw configuration entry for routing within the data router network
460 public static class ProvHop {
467 * A human readable description of this entry
469 public String toString() {
470 return ("Hop " + from + "->" + to + " via " + via);
474 * Construct a hop entry
476 * @param from The FQDN of the node with the data to be delivered
477 * @param to The FQDN of the node that will deliver to the subscriber
478 * @param via The FQDN of the node where the from node should send the data
480 public ProvHop(String from, String to, String via) {
489 public String getFrom() {
496 public String getTo() {
501 * Get the next intermediate node
503 public String getVia() {
508 private static class Redirection {
515 private static class Feed {
519 SubnetMatcher[] subnets;
520 Hashtable<String, String> authusers = new Hashtable<String, String>();
521 Redirection[] redirections;
527 private Hashtable<String, String> params = new Hashtable<>();
528 private Hashtable<String, Feed> feeds = new Hashtable<>();
529 private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
530 private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
531 private Hashtable<String, IsFrom> nodes = new Hashtable<>();
532 private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
533 private String myname;
534 private String myauth;
535 private DestInfo[] alldests;
539 * Process the raw provisioning data to configure this node
541 * @param pd The parsed provisioning data
542 * @param myname My name as seen by external systems
543 * @param spooldir The directory where temporary files live
544 * @param port The port number for URLs
545 * @param nodeauthkey The keying string used to generate node authentication credentials
547 public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
548 this.myname = myname;
549 for (ProvParam p : pd.getParams()) {
550 params.put(p.getName(), p.getValue());
552 Vector<DestInfo> destInfos = new Vector<>();
553 myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
554 for (ProvNode pn : pd.getNodes()) {
555 String cName = pn.getCName();
556 if (nodeinfo.get(cName) != null) {
559 String auth = NodeUtils.getNodeAuthHdr(cName, nodeauthkey);
560 DestInfo di = new DestInfo("n:" + cName, spooldir + "/n/" + cName, null, "n2n-" + cName,
561 "https://" + cName + ":" + port + "/internal/publish", cName, myauth, false, true, false, false, false);
562 (new File(di.getSpool())).mkdirs();
564 nodeinfo.put(cName, di);
565 nodes.put(auth, new IsFrom(cName));
567 PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[0]), pd.getHops());
568 Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<>();
569 for (ProvForceIngress pfi : pd.getForceIngress()) {
570 Vector<Redirection> v = rdtab.get(pfi.getFeedId());
573 rdtab.put(pfi.getFeedId(), v);
575 Redirection r = new Redirection();
576 if (pfi.getSubnet() != null) {
577 r.snm = new SubnetMatcher(pfi.getSubnet());
579 r.user = pfi.getUser();
580 r.nodes = pfi.getNodes();
583 Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<>();
584 for (ProvFeedUser pfu : pd.getFeedUsers()) {
585 Hashtable<String, String> t = pfutab.get(pfu.getFeedId());
587 t = new Hashtable<>();
588 pfutab.put(pfu.getFeedId(), t);
590 t.put(pfu.getCredentials(), pfu.getUser());
592 Hashtable<String, String> egrtab = new Hashtable<>();
593 for (ProvForceEgress pfe : pd.getForceEgress()) {
594 if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
597 egrtab.put(pfe.getSubId(), pfe.getNode());
599 Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
600 for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
601 Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
604 pfstab.put(pfs.getFeedId(), v);
606 v.add(new SubnetMatcher(pfs.getCidr()));
608 Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
609 HashSet<String> allfeeds = new HashSet<>();
610 for (ProvFeed pfx : pd.getFeeds()) {
611 if (pfx.getStatus() == null) {
612 allfeeds.add(pfx.getId());
615 for (ProvSubscription provSubscription : pd.getSubscriptions()) {
616 String subId = provSubscription.getSubId();
617 String feedId = provSubscription.getFeedId();
618 if (!allfeeds.contains(feedId)) {
621 if (subinfo.get(subId) != null) {
626 sididx = Integer.parseInt(subId);
627 sididx -= sididx % 100;
628 } catch (Exception e) {
629 logger.error("NODE0517 Exception NodeConfig: "+e);
631 String subscriptionDirectory = sididx + "/" + subId;
632 DestInfo destinationInfo = new DestInfo("s:" + subId,
633 spooldir + "/s/" + subscriptionDirectory, provSubscription);
634 (new File(destinationInfo.getSpool())).mkdirs();
635 destInfos.add(destinationInfo);
636 provSubscriptions.put(subId, provSubscription);
637 subinfo.put(subId, destinationInfo);
638 String egr = egrtab.get(subId);
640 subId = pf.getPath(egr) + subId;
642 StringBuffer sb = feedTargets.get(feedId);
644 sb = new StringBuffer();
645 feedTargets.put(feedId, sb);
647 sb.append(' ').append(subId);
649 alldests = destInfos.toArray(new DestInfo[0]);
650 for (ProvFeed pfx : pd.getFeeds()) {
651 String fid = pfx.getId();
652 Feed f = feeds.get(fid);
658 f.createdDate = pfx.getCreatedDate();
659 f.loginfo = pfx.getLogData();
660 f.status = pfx.getStatus();
662 * AAF changes: TDP EPIC US# 307413
663 * Passing aafInstance from ProvFeed to identify legacy/AAF feeds
665 f.aafInstance = pfx.getAafInstance();
666 Vector<SubnetMatcher> v1 = pfstab.get(fid);
668 f.subnets = new SubnetMatcher[0];
670 f.subnets = v1.toArray(new SubnetMatcher[0]);
672 Hashtable<String, String> h1 = pfutab.get(fid);
674 h1 = new Hashtable<String, String>();
677 Vector<Redirection> v2 = rdtab.get(fid);
679 f.redirections = new Redirection[0];
681 f.redirections = v2.toArray(new Redirection[0]);
683 StringBuffer sb = feedTargets.get(fid);
685 f.targets = new Target[0];
687 f.targets = parseRouting(sb.toString());
693 * Parse a target string into an array of targets
695 * @param routing Target string
696 * @return Array of targets.
698 public Target[] parseRouting(String routing) {
699 routing = routing.trim();
700 if ("".equals(routing)) {
701 return (new Target[0]);
703 String[] xx = routing.split("\\s+");
704 Hashtable<String, Target> tmap = new Hashtable<String, Target>();
705 HashSet<String> subset = new HashSet<String>();
706 Vector<Target> tv = new Vector<Target>();
707 Target[] ret = new Target[xx.length];
708 for (int i = 0; i < xx.length; i++) {
710 int j = t.indexOf('/');
712 DestInfo di = subinfo.get(t);
714 tv.add(new Target(null, t));
716 if (!subset.contains(t)) {
718 tv.add(new Target(di, null));
722 String node = t.substring(0, j);
723 String rtg = t.substring(j + 1);
724 DestInfo di = nodeinfo.get(node);
726 tv.add(new Target(null, t));
728 Target tt = tmap.get(node);
730 tt = new Target(di, rtg);
739 return (tv.toArray(new Target[0]));
743 * Check whether this is a valid node-to-node transfer
745 * @param credentials Credentials offered by the supposed node
746 * @param ip IP address the request came from
748 public boolean isAnotherNode(String credentials, String ip) {
749 IsFrom n = nodes.get(credentials);
750 return (n != null && n.isFrom(ip));
754 * Check whether publication is allowed.
756 * @param feedid The ID of the feed being requested.
757 * @param credentials The offered credentials
758 * @param ip The requesting IP address
760 public String isPublishPermitted(String feedid, String credentials, String ip) {
761 Feed f = feeds.get(feedid);
762 String nf = "Feed does not exist";
769 String user = f.authusers.get(credentials);
771 return ("Publisher not permitted for this feed");
773 if (f.subnets.length == 0) {
776 byte[] addr = NodeUtils.getInetAddress(ip);
777 for (SubnetMatcher snm : f.subnets) {
778 if (snm.matches(addr)) {
782 return ("Publisher not permitted for this feed");
786 * Check whether delete file is allowed.
788 * @param subId The ID of the subscription being requested.
790 public boolean isDeletePermitted(String subId) {
791 ProvSubscription provSubscription = provSubscriptions.get(subId);
792 return provSubscription.isPrivilegedSubscriber();
796 * Check whether publication is allowed for AAF Feed.
797 * @param feedid The ID of the feed being requested.
798 * @param ip The requesting IP address
800 public String isPublishPermitted(String feedid, String ip) {
801 Feed f = feeds.get(feedid);
802 String nf = "Feed does not exist";
809 if (f.subnets.length == 0) {
812 byte[] addr = NodeUtils.getInetAddress(ip);
813 for (SubnetMatcher snm: f.subnets) {
814 if (snm.matches(addr)) {
818 return("Publisher not permitted for this feed");
822 * Get authenticated user
824 public String getAuthUser(String feedid, String credentials) {
825 return (feeds.get(feedid).authusers.get(credentials));
829 * AAF changes: TDP EPIC US# 307413
830 * Check AAF_instance for feed ID
831 * @param feedid The ID of the feed specified
833 public String getAafInstance(String feedid) {
834 Feed f = feeds.get(feedid);
835 return f.aafInstance;
839 * Check if the request should be redirected to a different ingress node
841 public String getIngressNode(String feedid, String user, String ip) {
842 Feed f = feeds.get(feedid);
843 if (f.redirections.length == 0) {
846 byte[] addr = NodeUtils.getInetAddress(ip);
847 for (Redirection r : f.redirections) {
848 if (r.user != null && !user.equals(r.user)) {
851 if (r.snm != null && !r.snm.matches(addr)) {
854 for (String n : r.nodes) {
855 if (myname.equals(n)) {
859 if (r.nodes.length == 0) {
862 return (r.nodes[rrcntr++ % r.nodes.length]);
868 * Get a provisioned configuration parameter
870 public String getProvParam(String name) {
871 return (params.get(name));
875 * Get all the DestInfos
877 public DestInfo[] getAllDests() {
882 * Get the targets for a feed
884 * @param feedid The feed ID
885 * @return The targets this feed should be delivered to
887 public Target[] getTargets(String feedid) {
888 if (feedid == null) {
889 return (new Target[0]);
891 Feed f = feeds.get(feedid);
893 return (new Target[0]);
899 * Get the creation date for a feed
900 * @param feedid The feed ID
901 * @return the timestamp of creation date of feed id passed
903 public String getCreatedDate(String feedid) {
904 Feed f = feeds.get(feedid);
905 return(f.createdDate);
909 * Get the feed ID for a subscription
911 * @param subid The subscription ID
912 * @return The feed ID
914 public String getFeedId(String subid) {
915 DestInfo di = subinfo.get(subid);
919 return (di.getLogData());
923 * Get the spool directory for a subscription
925 * @param subid The subscription ID
926 * @return The spool directory
928 public String getSpoolDir(String subid) {
929 DestInfo di = subinfo.get(subid);
933 return (di.getSpool());
937 * Get the Authorization value this node uses
939 * @return The Authorization header value for this node
941 public String getMyAuth() {