1 /*******************************************************************************
2 * ============LICENSE_START==================================================
4 * * ===========================================================================
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * * ===========================================================================
7 * * Licensed under the Apache License, Version 2.0 (the "License");
8 * * you may not use this file except in compliance with the License.
9 * * You may obtain a copy of the License at
11 * * http://www.apache.org/licenses/LICENSE-2.0
13 * * Unless required by applicable law or agreed to in writing, software
14 * * distributed under the License is distributed on an "AS IS" BASIS,
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * * See the License for the specific language governing permissions and
17 * * limitations under the License.
18 * * ============LICENSE_END====================================================
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 ******************************************************************************/
25 package org.onap.dmaap.datarouter.node;
28 import java.util.HashSet;
29 import java.util.Hashtable;
30 import java.util.Vector;
33 * Processed configuration for this node.
35 * The NodeConfig represents a processed configuration from the Data Router provisioning server. Each time
36 * configuration data is received from the provisioning server, a new NodeConfig is created and the previous one
39 public class NodeConfig {
42 * Raw configuration entry for a data router node
// Raw provisioning record for one data router node; it is built from, and exposes, a cname.
// NOTE(review): this listing omits some original lines (the embedded numbers jump, e.g. 44 -> 49),
// so the field declaration, the method bodies and the closing braces are not visible here.
44 public static class ProvNode {
49 * Construct a node configuration entry.
51 * @param cname The cname of the node.
53 public ProvNode(String cname) {
58 * Get the cname of the node
// The NodeConfig constructor uses this value as the key of its node table.
60 public String getCName() {
66 * Raw configuration entry for a provisioning parameter
// Raw provisioning record for one name/value parameter. The NodeConfig constructor copies
// every entry into its parameter table using getName() as key and getValue() as value.
// NOTE(review): field declarations and method bodies are not visible in this extract.
68 public static class ProvParam {
74 * Construct a provisioning parameter configuration entry.
76 * @param name The name of the parameter.
77 * @param value The value of the parameter.
79 public ProvParam(String name, String value) {
85 * Get the name of the parameter.
87 public String getName() {
92 * Get the value of the parameter.
94 public String getValue() {
100 * Raw configuration entry for a data feed.
// Raw provisioning record for one data feed: an id, a log string and a status.
// NOTE(review): the id field, its assignment and the accessor bodies are not visible in this extract.
102 public static class ProvFeed {
// Text to include in log entries about this feed.
105 private String logdata;
// Null when the feed is usable; otherwise the reason it cannot be used. The NodeConfig
// constructor only accepts subscriptions for feeds whose status is null.
106 private String status;
109 * Construct a feed configuration entry.
111 * @param id The feed ID of the entry.
112 * @param logdata String for log entries about the entry.
113 * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or
114 * null if it is valid.
116 public ProvFeed(String id, String logdata, String status) {
118 this.logdata = logdata;
119 this.status = status;
123 * Get the feed id of the data feed.
125 public String getId() {
130 * Get the log data of the data feed.
132 public String getLogData() {
137 * Get the status of the data feed.
139 public String getStatus() {
145 * Raw configuration entry for a feed user.
// Raw provisioning record tying a publishing user and its credentials to a feed. The NodeConfig
// constructor groups these records by feed id into a credentials -> user table.
// NOTE(review): the user field, its assignment and two accessor bodies are not visible in this extract.
147 public static class ProvFeedUser {
149 private String feedid;
// Per the constructor documentation below, this is the Authorization header value the user sends.
151 private String credentials;
154 * Construct a feed user configuration entry
156 * @param feedid The feed id.
157 * @param user The user that will publish to the feed.
158 * @param credentials The Authorization header the user will use to publish.
160 public ProvFeedUser(String feedid, String user, String credentials) {
161 this.feedid = feedid;
163 this.credentials = credentials;
167 * Get the feed id of the feed user.
169 public String getFeedId() {
174 * Get the user for the feed user.
176 public String getUser() {
181 * Get the credentials for the feed user.
183 public String getCredentials() {
184 return (credentials);
189 * Raw configuration entry for a feed subnet
// Raw provisioning record naming one CIDR that may publish to a feed. The NodeConfig constructor
// wraps each CIDR in a SubnetMatcher and groups the matchers by feed id.
// NOTE(review): the cidr field, its assignment and the accessor bodies are not visible in this extract.
191 public static class ProvFeedSubnet {
193 private String feedid;
197 * Construct a feed subnet configuration entry
199 * @param feedid The feed ID
200 * @param cidr The CIDR allowed to publish to the feed.
202 public ProvFeedSubnet(String feedid, String cidr) {
203 this.feedid = feedid;
208 * Get the feed id of the feed subnet.
210 public String getFeedId() {
215 * Get the CIDR of the feed subnet.
217 public String getCidr() {
223 * Raw configuration entry for a subscription
// Raw provisioning record for one subscription: which feed it belongs to, where and how to
// deliver, and four boolean delivery options. The NodeConfig constructor keeps each accepted
// record in a table keyed by subscription id and also hands it to the DestInfo it creates.
// NOTE(review): the url field, the subid/url assignments and most accessor bodies are not
// visible in this extract.
225 public static class ProvSubscription {
227 private String subid;
228 private String feedid;
230 private String authuser;
231 private String credentials;
// Delivery options; their meaning is described by the constructor parameters below.
232 private boolean metaonly;
233 private boolean use100;
234 private boolean privilegedSubscriber;
235 private boolean decompress;
238 * Construct a subscription configuration entry
240 * @param subid The subscription ID
241 * @param feedid The feed ID
242 * @param url The base delivery URL (not including the fileid)
243 * @param authuser The user in the credentials used to deliver
244 * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the
245 * Authorization header.
246 * @param metaonly Is this a meta data only subscription?
247 * @param use100 Should we send Expect: 100-continue?
248 * @param privilegedSubscriber Can we wait to receive a delete file call before deleting file
249 * @param decompress To see if they want their information compressed or decompressed
251 public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials,
252 boolean metaonly, boolean use100, boolean privilegedSubscriber, boolean decompress) {
254 this.feedid = feedid;
256 this.authuser = authuser;
257 this.credentials = credentials;
258 this.metaonly = metaonly;
259 this.use100 = use100;
260 this.privilegedSubscriber = privilegedSubscriber;
261 this.decompress = decompress;
265 * Get the subscription ID
267 public String getSubId() {
274 public String getFeedId() {
279 * Get the delivery URL
281 public String getURL() {
288 public String getAuthUser() {
293 * Get the delivery credentials
295 public String getCredentials() {
296 return (credentials);
300 * Is this a meta data only subscription?
302 public boolean isMetaDataOnly() {
307 * Should we send Expect: 100-continue?
309 public boolean isUsing100() {
314 * Can we wait to receive a delete file call before deleting file
// NodeConfig.isDeletePermitted() returns this flag for the subscription it looks up.
316 public boolean isPrivilegedSubscriber() {
317 return (privilegedSubscriber);
321 * Should i decompress the file before sending it on
323 public boolean isDecompress() {
329 * Raw configuration entry for controlled ingress to the data router node
// Raw provisioning record that redirects publishers of a feed to specific nodes, optionally
// restricted by publisher subnet and publishing user. The NodeConfig constructor converts
// each record into an internal Redirection grouped by feed id.
// NOTE(review): the user field, the user/nodes assignments and the accessor bodies are not
// visible in this extract.
331 public static class ProvForceIngress {
333 private String feedid;
334 private String subnet;
// Nodes that publication attempts are redirected to (FQDNs per the constructor documentation).
336 private String[] nodes;
339 * Construct a forced ingress configuration entry
341 * @param feedid The feed ID that this entry applies to
342 * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all
343 * publisher IP addresses
344 * @param user The publishing user this entry applies to or "" if it applies to all publishing users.
345 * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to.
347 public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {
348 this.feedid = feedid;
349 this.subnet = subnet;
357 public String getFeedId() {
364 public String getSubnet() {
371 public String getUser() {
378 public String[] getNodes() {
384 * Raw configuration entry for controlled egress from the data router
// Raw provisioning record that pins delivery for one subscription to a particular node. The
// NodeConfig constructor stores the pairing in a subscription id -> node table.
// NOTE(review): the node field, the constructor body and the accessor bodies are not visible
// in this extract.
386 public static class ProvForceEgress {
388 private String subid;
392 * Construct a forced egress configuration entry
394 * @param subid The subscription ID the subscription with forced egress
395 * @param node The node handling deliveries for this subscription
397 public ProvForceEgress(String subid, String node) {
403 * Get the subscription ID
405 public String getSubId() {
412 public String getNode() {
418 * Raw configuration entry for routing within the data router network
// Raw provisioning record for one routing hop: data at node "from" that is destined for node
// "to" is sent to node "via". The NodeConfig constructor passes all hops to the PathFinder.
// NOTE(review): the from/to/via fields, the constructor body and the accessor bodies are not
// visible in this extract.
420 public static class ProvHop {
427 * A human readable description of this entry
429 public String toString() {
// Renders as "Hop <from>-><to> via <via>".
430 return ("Hop " + from + "->" + to + " via " + via);
434 * Construct a hop entry
436 * @param from The FQDN of the node with the data to be delivered
437 * @param to The FQDN of the node that will deliver to the subscriber
438 * @param via The FQDN of the node where the from node should send the data
440 public ProvHop(String from, String to, String via) {
449 public String getFrom() {
456 public String getTo() {
461 * Get the next intermediate node
463 public String getVia() {
// Internal holder for one forced-ingress rule. The NodeConfig constructor assigns its snm
// (a SubnetMatcher, left unset when the provisioned subnet is null), user and nodes members,
// and getIngressNode() reads them.
// NOTE(review): the member declarations themselves are not visible in this extract.
468 private static class Redirection {
// Internal processed view of one feed, populated by the NodeConfig constructor.
// NOTE(review): the constructor also assigns loginfo, status and targets on this class; those
// declarations are not visible in this extract.
475 private static class Feed {
// Subnets allowed to publish; isPublishPermitted() treats an empty array as a special case.
479 SubnetMatcher[] subnets;
// Maps offered credentials to the publishing user name.
480 Hashtable<String, String> authusers = new Hashtable<String, String>();
// Forced-ingress rules for this feed; empty when none are provisioned.
481 Redirection[] redirections;
// Provisioned parameters, keyed by parameter name.
485 private Hashtable<String, String> params = new Hashtable<>();
// Processed feeds, keyed by feed id.
486 private Hashtable<String, Feed> feeds = new Hashtable<>();
// Node-to-node destinations, keyed by node cname.
487 private Hashtable<String, DestInfo> nodeinfo = new Hashtable<>();
// Subscription destinations, keyed by subscription id.
488 private Hashtable<String, DestInfo> subinfo = new Hashtable<>();
// Peer nodes, keyed by the auth header value computed for each node's cname.
489 private Hashtable<String, IsFrom> nodes = new Hashtable<>();
// Accepted raw subscription records, keyed by subscription id.
490 private Hashtable<String, ProvSubscription> provSubscriptions = new Hashtable<>();
// This node's name as passed to the constructor.
491 private String myname;
// Auth header value computed from this node's name and the node auth key.
492 private String myauth;
// Snapshot of the DestInfos collected by the constructor.
493 private DestInfo[] alldests;
497 * Process the raw provisioning data to configure this node
499 * @param pd The parsed provisioning data
500 * @param myname My name as seen by external systems
501 * @param spooldir The directory where temporary files live
502 * @param port The port number for URLs
503 * @param nodeauthkey The keying string used to generate node authentication credentials
// Builds every lookup table of this NodeConfig from the raw provisioning data in one pass.
// As a side effect it creates spool directories on disk via File.mkdirs().
// NOTE(review): this listing omits a number of original lines (the embedded numbers jump, e.g.
// 514 -> 517, 528 -> 530, 579 -> 584). Several guard bodies, null checks and closing braces are
// therefore not visible; the comments below describe only what the visible lines establish.
505 public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {
506 this.myname = myname;
// Copy the provisioned parameters into the name -> value table.
507 for (ProvParam p : pd.getParams()) {
508 params.put(p.getName(), p.getValue());
510 Vector<DestInfo> destInfos = new Vector<>();
// Auth header value for this node; it is handed to every node-to-node DestInfo below.
511 myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);
// One node-to-node destination per provisioned node, spooled under <spooldir>/n/<cname>.
512 for (ProvNode pn : pd.getNodes()) {
513 String cn = pn.getCName();
// Guard for a cname that is already registered; its body is not visible (presumably skips the duplicate).
514 if (nodeinfo.get(cn) != null) {
517 String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);
518 DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn,
519 "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true, false, false);
520 (new File(di.getSpool())).mkdirs();
522 nodeinfo.put(cn, di);
// Index the peer by its auth header so isAnotherNode() can look it up from offered credentials.
523 nodes.put(auth, new IsFrom(cn));
// Path finder built from this node's name, all registered node cnames and the provisioned hops.
525 PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops());
// Group forced-ingress rules by feed id.
526 Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<String, Vector<Redirection>>();
527 for (ProvForceIngress pfi : pd.getForceIngress()) {
528 Vector<Redirection> v = rdtab.get(pfi.getFeedId());
// Creates the per-feed list; the condition guarding this is not visible (presumably v == null).
530 v = new Vector<Redirection>();
531 rdtab.put(pfi.getFeedId(), v);
533 Redirection r = new Redirection();
// A null subnet leaves r.snm unset.
534 if (pfi.getSubnet() != null) {
535 r.snm = new SubnetMatcher(pfi.getSubnet());
537 r.user = pfi.getUser();
538 r.nodes = pfi.getNodes();
// Group feed users by feed id into credentials -> user tables.
541 Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<String, Hashtable<String, String>>();
542 for (ProvFeedUser pfu : pd.getFeedUsers()) {
543 Hashtable<String, String> t = pfutab.get(pfu.getFeedId());
545 t = new Hashtable<String, String>();
546 pfutab.put(pfu.getFeedId(), t);
548 t.put(pfu.getCredentials(), pfu.getUser());
// Forced egress: subscription id -> node handling its deliveries.
550 Hashtable<String, String> egrtab = new Hashtable<String, String>();
551 for (ProvForceEgress pfe : pd.getForceEgress()) {
// Guard for entries naming this node itself or an unregistered node; body not visible
// (presumably such entries are skipped).
552 if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {
555 egrtab.put(pfe.getSubId(), pfe.getNode());
// Group permitted publisher subnets by feed id.
557 Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<>();
558 for (ProvFeedSubnet pfs : pd.getFeedSubnets()) {
559 Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());
561 v = new Vector<SubnetMatcher>();
562 pfstab.put(pfs.getFeedId(), v);
564 v.add(new SubnetMatcher(pfs.getCidr()));
// feedTargets accumulates, per feed id, a space-separated routing string of its targets.
566 Hashtable<String, StringBuffer> feedTargets = new Hashtable<>();
// Ids of the feeds whose status is null, i.e. that have no reason to be unusable.
567 HashSet<String> allfeeds = new HashSet<>();
568 for (ProvFeed pfx : pd.getFeeds()) {
569 if (pfx.getStatus() == null) {
570 allfeeds.add(pfx.getId());
573 for (ProvSubscription provSubscription : pd.getSubscriptions()) {
574 String subId = provSubscription.getSubId();
575 String feedId = provSubscription.getFeedId();
// Guard for subscriptions whose feed is not in allfeeds; body not visible.
576 if (!allfeeds.contains(feedId)) {
// Guard for a subscription id that is already registered; body not visible.
579 if (subinfo.get(subId) != null) {
// For a numeric subscription id, sididx is the id rounded down to a multiple of 100.
// The declaration of sididx, the try opener and the catch body are not visible here.
584 sididx = Integer.parseInt(subId);
585 sididx -= sididx % 100;
586 } catch (Exception e) {
// Subscription spool path is <spooldir>/s/<sididx>/<subId>.
588 String subscriptionDirectory = sididx + "/" + subId;
589 DestInfo destinationInfo = new DestInfo("s:" + subId,
590 spooldir + "/s/" + subscriptionDirectory, provSubscription);
591 (new File(destinationInfo.getSpool())).mkdirs();
592 destInfos.add(destinationInfo);
593 provSubscriptions.put(subId, provSubscription);
594 subinfo.put(subId, destinationInfo);
595 String egr = egrtab.get(subId);
// Prefixes the target with the path to the egress node; the condition guarding this is not
// visible (presumably egr != null).
597 subId = pf.getPath(egr) + subId;
599 StringBuffer sb = feedTargets.get(feedId);
601 sb = new StringBuffer();
602 feedTargets.put(feedId, sb);
604 sb.append(' ').append(subId);
606 alldests = destInfos.toArray(new DestInfo[destInfos.size()]);
// Build the processed Feed entry for each provisioned feed from the tables gathered above.
607 for (ProvFeed pfx : pd.getFeeds()) {
608 String fid = pfx.getId();
609 Feed f = feeds.get(fid);
// NOTE(review): the lines that create and register f (listing lines 610-614) are not visible.
615 f.loginfo = pfx.getLogData();
616 f.status = pfx.getStatus();
617 Vector<SubnetMatcher> v1 = pfstab.get(fid);
// Empty array when no subnets were provisioned for the feed, otherwise the collected matchers.
619 f.subnets = new SubnetMatcher[0];
621 f.subnets = v1.toArray(new SubnetMatcher[v1.size()]);
623 Hashtable<String, String> h1 = pfutab.get(fid);
625 h1 = new Hashtable<String, String>();
628 Vector<Redirection> v2 = rdtab.get(fid);
630 f.redirections = new Redirection[0];
632 f.redirections = v2.toArray(new Redirection[v2.size()]);
634 StringBuffer sb = feedTargets.get(fid);
// No targets recorded for the feed -> empty array; otherwise parse the accumulated routing string.
636 f.targets = new Target[0];
638 f.targets = parseRouting(sb.toString());
644 * Parse a target string into an array of targets
646 * @param routing Target string
647 * @return Array of targets.
// Converts a whitespace-separated routing string into Target objects. A token containing '/'
// is split into "<node>/<remaining routing>" and resolved against the node table; a token
// without '/' is looked up in the subscription table.
// NOTE(review): this listing omits several original lines (e.g. 660, 662, 664, 666, 668,
// 670-672, 676, 678, 680, 682-689), so the branch conditions and some statements are not visible.
649 public Target[] parseRouting(String routing) {
650 routing = routing.trim();
// An empty (or all-whitespace) routing string yields no targets.
651 if ("".equals(routing)) {
652 return (new Target[0]);
654 String[] xx = routing.split("\\s+");
// tmap is looked up by node name below; subset is checked for subscription tokens already seen.
655 Hashtable<String, Target> tmap = new Hashtable<String, Target>();
656 HashSet<String> subset = new HashSet<String>();
657 Vector<Target> tv = new Vector<Target>();
// NOTE(review): ret is not referenced again in the visible lines; the result is built from tv.
658 Target[] ret = new Target[xx.length];
659 for (int i = 0; i < xx.length; i++) {
// t is the current token; its declaration is not visible (presumably xx[i]).
661 int j = t.indexOf('/');
// Token handled as a subscription id.
663 DestInfo di = subinfo.get(t);
// Target with no DestInfo, carrying the raw token as its routing.
665 tv.add(new Target(null, t));
667 if (!subset.contains(t)) {
669 tv.add(new Target(di, null));
// Token of the form <node>/<routing>.
673 String node = t.substring(0, j);
674 String rtg = t.substring(j + 1);
675 DestInfo di = nodeinfo.get(node);
677 tv.add(new Target(null, t));
679 Target tt = tmap.get(node);
681 tt = new Target(di, rtg);
690 return (tv.toArray(new Target[tv.size()]));
694 * Check whether this is a valid node-to-node transfer
696 * @param credentials Credentials offered by the supposed node
697 * @param ip IP address the request came from
// Returns true only when the offered credentials match a registered peer node AND that peer's
// IsFrom check accepts the given IP; unknown credentials yield false.
699 public boolean isAnotherNode(String credentials, String ip) {
// nodes is keyed by the auth header value computed for each peer in the constructor.
700 IsFrom n = nodes.get(credentials);
701 return (n != null && n.isFrom(ip));
705 * Check whether publication is allowed.
707 * @param feedid The ID of the feed being requested.
708 * @param credentials The offered credentials
709 * @param ip The requesting IP address
// Decides whether a publish request is allowed for a feed. The visible return statements
// yield a human-readable rejection reason; the value returned on success is not visible in
// this extract (presumably null - confirm against the full source).
// NOTE(review): listing lines 714-719, 721, 723, 725-726 and 730-732 are missing here, so the
// handling of a missing feed, of unknown credentials and of a subnet match cannot be seen.
711 public String isPublishPermitted(String feedid, String credentials, String ip) {
712 Feed f = feeds.get(feedid);
713 String nf = "Feed does not exist";
// Resolve the publishing user from the offered credentials.
720 String user = f.authusers.get(credentials);
722 return ("Publisher not permitted for this feed");
// A feed with no subnets is special-cased; the branch body is not visible.
724 if (f.subnets.length == 0) {
727 byte[] addr = NodeUtils.getInetAddress(ip);
// Test the requester's address against each subnet provisioned for the feed.
728 for (SubnetMatcher snm : f.subnets) {
729 if (snm.matches(addr)) {
// Reached when the loop above did not return.
733 return ("Publisher not permitted for this feed");
737 * Check whether delete file is allowed.
739 * @param subId The ID of the subscription being requested.
// Returns the privilegedSubscriber flag of the subscription registered under subId.
// NOTE(review): there is no null check - a subId that is not in provSubscriptions makes
// get() return null and the next line throws NullPointerException. Confirm whether callers
// rely on that exception before adding a guard.
741 public boolean isDeletePermitted(String subId) {
742 ProvSubscription provSubscription = provSubscriptions.get(subId);
743 return provSubscription.isPrivilegedSubscriber();
747 * Get authenticated user
// Returns the user name mapped to the given credentials for the feed, or null when the
// credentials are not registered for it.
// NOTE(review): an unknown feedid makes feeds.get() return null and this line throws
// NullPointerException; callers presumably validate the feed first - confirm.
749 public String getAuthUser(String feedid, String credentials) {
750 return (feeds.get(feedid).authusers.get(credentials));
754 * Check if the request should be redirected to a different ingress node
// Walks the feed's forced-ingress rules and, for a rule that applies to this user and IP,
// picks one of the rule's nodes to redirect the request to.
// NOTE(review): listing lines 759-760, 764-765, 767-768, 771-773, 775-776 and 778+ are missing
// here, so the branch bodies (and the value returned when no redirect applies) are not visible.
756 public String getIngressNode(String feedid, String user, String ip) {
// NOTE(review): f is dereferenced without a null check; an unknown feedid would throw here.
757 Feed f = feeds.get(feedid);
758 if (f.redirections.length == 0) {
761 byte[] addr = NodeUtils.getInetAddress(ip);
762 for (Redirection r : f.redirections) {
// Rule names a user and it is a different one.
763 if (r.user != null && !user.equals(r.user)) {
// Rule names a subnet and the requester's address is outside it.
766 if (r.snm != null && !r.snm.matches(addr)) {
// Checks whether this node itself is one of the rule's nodes; the branch body is not visible.
769 for (String n : r.nodes) {
770 if (myname.equals(n)) {
774 if (r.nodes.length == 0) {
// Rotates through the rule's nodes using rrcntr, which is declared outside the visible lines.
// NOTE(review): if rrcntr is an int it eventually wraps negative, and a negative value
// modulo the length gives a negative array index - confirm its declaration.
777 return (r.nodes[rrcntr++ % r.nodes.length]);
783 * Get a provisioned configuration parameter
// Looks up a provisioned parameter by name; returns null when no such parameter was provisioned.
785 public String getProvParam(String name) {
786 return (params.get(name));
790 * Get all the DestInfos
// Accessor for the DestInfos of this configuration.
// NOTE(review): the body is not visible in this extract; presumably it returns the alldests
// array that the constructor fills from the collected DestInfos - confirm.
792 public DestInfo[] getAllDests() {
797 * Get the targets for a feed
799 * @param feedid The feed ID
800 * @return The targets this feed should be delivered to
// Returns the delivery targets for a feed, using an empty array rather than null for the
// "nothing to deliver to" cases that are visible here.
// NOTE(review): listing lines 807 and 809+ are missing, so the condition on f and the final
// return are not visible (presumably f == null and f.targets respectively).
802 public Target[] getTargets(String feedid) {
// A null feed id has no targets.
803 if (feedid == null) {
804 return (new Target[0]);
806 Feed f = feeds.get(feedid);
808 return (new Target[0]);
814 * Get the feed ID for a subscription
816 * @param subid The subscription ID
817 * @return The feed ID
// Resolves the feed id for a subscription from the subscription's DestInfo.
// NOTE(review): listing lines 821-823 are missing; the handling of an unknown subid is not
// visible (presumably a null check on di).
819 public String getFeedId(String subid) {
820 DestInfo di = subinfo.get(subid);
// The value is read from the DestInfo's log data; presumably that field holds the feed id
// for subscription destinations - confirm against DestInfo.
824 return (di.getLogData());
828 * Get the spool directory for a subscription
830 * @param subid The subscription ID
831 * @return The spool directory
// Returns the spool directory recorded in the DestInfo of the given subscription.
// NOTE(review): listing lines 835-837 are missing; the handling of an unknown subid is not
// visible (presumably a null check on di).
833 public String getSpoolDir(String subid) {
834 DestInfo di = subinfo.get(subid);
838 return (di.getSpool());
842 * Get the Authorization value this node uses
844 * @return The Authorization header value for this node
846 public String getMyAuth() {