[DMAAP-48] Initial code import 95/6695/3
authorsg481n <sg481n@att.com>
Thu, 3 Aug 2017 21:56:38 +0000 (17:56 -0400)
committerSai Gandham <sg481n@att.com>
Fri, 4 Aug 2017 15:41:45 +0000 (15:41 +0000)
Change-Id: I3e65371093487d7de167ec6c29f327f366f1e299
Signed-off-by: sg481n <sg481n@att.com>
200 files changed:
Contributing.txt [new file with mode: 0644]
Jenkinsfile [new file with mode: 0644]
LICENSE [new file with mode: 0644]
README.md [new file with mode: 0644]
Subscriber/src/SSASubscriber.java [new file with mode: 0644]
Subscriber/src/SubscriberServlet.java [new file with mode: 0644]
Subscriber/src/log4j.properties [new file with mode: 0644]
datarouter-node/pom.xml [new file with mode: 0644]
datarouter-node/self_signed/cacerts.jks [new file with mode: 0644]
datarouter-node/self_signed/keystore.jks [new file with mode: 0644]
datarouter-node/self_signed/mykey.cer [new file with mode: 0644]
datarouter-node/self_signed/nodekey.cer [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java [new file with mode: 0644]
datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java [new file with mode: 0644]
datarouter-node/src/main/resources/EelfMessages.properties [new file with mode: 0644]
datarouter-node/src/main/resources/docker/Dockerfile [new file with mode: 0644]
datarouter-node/src/main/resources/docker/startup.sh [new file with mode: 0644]
datarouter-node/src/main/resources/log4j.properties [new file with mode: 0644]
datarouter-node/src/main/resources/log4j.properties.tmpl [new file with mode: 0644]
datarouter-node/src/main/resources/logback.xml [new file with mode: 0644]
datarouter-node/src/main/resources/misc/descriptor.xml [new file with mode: 0644]
datarouter-node/src/main/resources/misc/doaction [new file with mode: 0644]
datarouter-node/src/main/resources/misc/drtrnode [new file with mode: 0644]
datarouter-node/src/main/resources/misc/havecert.tmpl [new file with mode: 0644]
datarouter-node/src/main/resources/misc/log4j.properties.tmpl [new file with mode: 0644]
datarouter-node/src/main/resources/misc/node.properties [new file with mode: 0644]
datarouter-node/src/main/resources/misc/notes [new file with mode: 0644]
datarouter-node/src/main/resources/node.properties [new file with mode: 0644]
datarouter-prov/data/addFeed3.txt [new file with mode: 0644]
datarouter-prov/data/addSubscriber.txt [new file with mode: 0644]
datarouter-prov/pom.xml [new file with mode: 0644]
datarouter-prov/self_signed/cacerts.jks [new file with mode: 0644]
datarouter-prov/self_signed/keystore.jks [new file with mode: 0644]
datarouter-prov/self_signed/mykey.cer [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponse.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponseSupplement.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/Authorizer.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespImpl.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespSupplementImpl.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthzResource.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/ProvAuthorizer.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/ProvDataProvider.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/authz/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/BaseServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/DRFeedsServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/FeedLogServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/FeedServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/GroupServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/InternalServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/LogServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/Main.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/Poker.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/ProxyServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/PublishServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/RouteServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/StatisticsServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubLogServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscribeServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscriptionServlet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SynchronizerTask.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/BaseLogRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Deleteable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/DeliveryExtraRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/DeliveryRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/EgressRoute.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/EventLogRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/ExpiryRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Feed.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedAuthorization.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedEndpointID.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedLinks.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Group.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/IngressRoute.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Insertable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/JSONable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/LOGJSONable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Loadable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/LogRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/NetworkRoute.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/NodeClass.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Parameters.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/PubFailRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/PublishRecord.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/SubDelivery.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/SubLinks.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Subscription.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Syncable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Updateable.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/eelf/EelfMsgs.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/eelf/JettyFilter.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/DB.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/DRRouteCLI.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/JSONUtilities.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/LogfileLoader.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/PurgeLogDirTask.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/RLEBitSet.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/ThrottleFilter.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/URLUtilities.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/DailyLatencyReport.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/FeedReport.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/LatencyReport.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/Report.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/ReportBase.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/SubscriberReport.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/VolumeReport.java [new file with mode: 0644]
datarouter-prov/src/main/java/com/att/research/datarouter/reports/package.html [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/CDL.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/Cookie.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/CookieList.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/HTTP.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/HTTPTokener.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONArray.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONException.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONML.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONObject.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONString.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONStringer.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONTokener.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/JSONWriter.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/LOGJSONObject.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/None.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/XML.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/XMLTokener.java [new file with mode: 0644]
datarouter-prov/src/main/java/org/json/package.html [new file with mode: 0644]
datarouter-prov/src/main/resources/EelfMessages.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/authz.jar [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/database/install_db.sql [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/docker-compose.yml [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/node_data/node.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/cacerts.jks [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/keystore.jks [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/mykey.cer [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/nodekey.cer [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/addFeed3.txt [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/addSubscriber.txt [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/provserver.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/cacerts.jks [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/keystore.jks [new file with mode: 0644]
datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/mykey.cer [new file with mode: 0644]
datarouter-prov/src/main/resources/docker/Dockerfile [new file with mode: 0644]
datarouter-prov/src/main/resources/docker/startup.sh [new file with mode: 0644]
datarouter-prov/src/main/resources/log4j.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/logback.xml [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/doaction [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/dr-route [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/drtrprov [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/havecert.tmpl [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/log4j.drroute.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/log4j.properties.tmpl [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/mysql_dr_schema.sql [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/notes [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/provcmd [new file with mode: 0644]
datarouter-prov/src/main/resources/misc/runreports [new file with mode: 0644]
datarouter-prov/src/main/resources/provserver.properties [new file with mode: 0644]
datarouter-prov/src/main/resources/startup.sh [new file with mode: 0644]
datarouter-prov/src/main/resources/subscriber.jar [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/AllTests.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/FillDB.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/package.html [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testBase.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testCleanup.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsDelete.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsGet.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsPost.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsPut.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testFeedDelete.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testFeedPut.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testInternalGet.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testInternalMisc.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testLogGet.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testPublish.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testRLEBitSet.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testRouteAPI.java [new file with mode: 0644]
datarouter-prov/src/test/java/datarouter/provisioning/testSubscribePost.java [new file with mode: 0644]

diff --git a/Contributing.txt b/Contributing.txt
new file mode 100644 (file)
index 0000000..d64568e
--- /dev/null
@@ -0,0 +1,35 @@
+This software is distributed under a permissive open source\r
+license to allow it to be used in any projects, whether open\r
+source or proprietary. Contributions to the project are welcome\r
+and it is important to maintain clear record of contributions \r
+and terms under which they are licensed.\r
+\r
+To indicate your acceptance of Developer's Certificate of Origin 1.1\r
+terms, please add the following line to the end of the commit message\r
+for each contribution you make to the project:\r
+\r
+Signed-off-by : Your Name <your@email.com>\r
+\r
+Developer's Certificate of Origin 1.1\r
+\r
+By making a contribution to this project, I certify that:\r
+\r
+(a) The contribution was created in whole or inpart by me and I\r
+have the right to submit it under the open source license indicated\r
+in the file: or\r
+\r
+(b) The contribution is based upon previous work that, to the best\r
+of my knowledge, is covered under an appropriate open source license\r
+and I have the right under that license to submit that work with \r
+modifications, whether created in whole or part by me, under the same\r
+open source license (unless I am permitted to submit under a different \r
+license), as indicated in the file; or\r
+\r
+(c) The contribution was provided directly to me by some other person\r
+who certified (a), (b) or (c) I have not modified it.\r
+\r
+(d) I understand and agree that this project and the contribution are\r
+public and that a record of the contribution (including all personal\r
+information I submit with it, including my sign-off)is maintained \r
+indefinitely and may be redistributed consistent with this project or\r
+the open source license(s) involved.
\ No newline at end of file
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644 (file)
index 0000000..a8161fc
--- /dev/null
@@ -0,0 +1,24 @@
+node {\r
+    // Get the maven tool.\r
+    // ** NOTE: This 'M3' maven tool must be configured\r
+    // **       in the Jenkins global configuration.\r
+    def mvnHome = tool 'M3'\r
+    sh "echo ${mvnHome}"\r
+    \r
+    \r
+    // Mark the code checkout 'stage'....\r
+    stage 'Checkout'\r
+    // Get some code from a GitHub repository\r
+    checkout scm    \r
+   \r
+    // Mark the code build 'stage'....\r
+    stage 'Build DMAAP-DR'\r
+    // Run the maven build\r
+    //sh for unix bat for windows\r
+       \r
+       sh "${mvnHome}/bin/mvn -f datarouter-prov/pom.xml clean deploy"\r
+    sh "${mvnHome}/bin/mvn -f datarouter-node/pom.xml clean deploy"\r
+\r
+       \r
+   \r
+}\r
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..2ce945c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * * 
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..39dbca9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,149 @@
+# DMAAP_DATAROUTER\r
+                              \r
+## OVERVIEW\r
+  \r
+The Data Routing System project is intended to provide a common framework by which data producers can make data available to data consumers and a way for potential consumers to find feeds with the data they require.  \r
+The delivery of data from these kinds of production systems is the domain of the Data Routing System. Its primary goal is to make it easier to move data from existing applications that may not have been designed from the ground up to share data.\r
+The Data Routing System is different from many existing platforms for distributing messages from producers to consumers which focus on real-time delivery of small messages (on the order of a few kilobytes or so) for more\r
+\r
+   Provisioning is implemented as a Java servlet running under Jetty in one JVM\r
+   \r
+   Provisioning data is stored in a MySQL database\r
+   \r
+   The backup provisioning server and each node is informed any time provisioning data changes\r
+   \r
+   The backup provisioning server and each node may request the complete set of provisioning data at any time\r
+   \r
+   A Node is implemented as a Java servlet running under Jetty in one JVM\r
+\r
+Assumptions\r
+    For 95% of all feeds (there will be some exceptions):\r
+       \r
+    Number of Publishing Endpoints per Feed: 1 – 10\r
+       \r
+    Number of Subscribers per Feed: 2 – 10\r
+       \r
+    File Size: 105 – 1010 bytes\r
+       \r
+    with a distribution towards the high end\r
+       \r
+    Frequency of Publishing: 1/day – 10/minute\r
+       \r
+    Lifetime of a Feed: months to years\r
+       \r
+    Lifetime of a Subscription: months to years\r
+       \r
\r
+Data Router and Sensitive Data Handling\r
\r
+    A publisher of a Data Router feed of sensitive (e.g., PCI, SPI, etc.) data needs to encrypt that data prior to delivering it to the Data Router\r
+       \r
+    The Data Router will distribute that data to all of the subscribers of that feed.\r
+       \r
+    Data Router does not examine the Feed content or enforce any restrictions or Validations on the Feed Content in any way\r
+       \r
+    It is the responsibility of the subscribers to work with the publisher to determine how to decrypt that data\r
+       \r
+\r
+\r
\r
+\r
+What the Data Router is NOT:\r
+\r
+    Does not support streaming data\r
+       \r
+    Does not tightly couple to any specific publish endpoint or subscriber\r
+       \r
+    Agnostic as to source and sink of data residing in an RDBMS, NoSQL DB, Other DBMS, Flat Files, etc.\r
+       \r
+    Does not transform any published data\r
+       \r
+    Does not “examine” any published data\r
+       \r
+    Does not verify the integrity of a published file\r
+       \r
+    Does not perform any data “cleansing”\r
+       \r
+    Does not store feeds (not a repository or archive)\r
+       \r
+    There is no long-term storage – assumes subscribers are responsive most of the time\r
+       \r
+    Does not encrypt data when queued on a node\r
+       \r
+    Does not provide guaranteed order of delivery\r
+       \r
+    Per-file metadata can be used for ordering\r
+       \r
+   External customers supported is via DITREX (MOTS 18274)\r
\r
\r
\r
+\r
+## BUILD  \r
\r
+Datarouter can be cloned and repository and builb using Maven \r
+In the repository \r
+\r
+Go to datarouter-prov in the root\r
+\r
+       mvn clean install\r
+       \r
+Go to datarouter-node in the root\r
+\r
+       mvn clean install\r
+        \r
+Project Build will be Successful\r
+\r
+\r
+\r
+\r
+## RUN \r
+\r
+Datarouter is a Unix based service \r
+\r
+Pre-requisites to run the service\r
+\r
+MySQL Version 5.6\r
+\r
+Java JDK 1.8\r
+\r
+Install MySQL and load needed table into the database\r
+\r
+Sample install_db.sql is provided in the datarouter-prov/data .\r
+\r
+Go to datarouter-prov module and run the service using main.java \r
\r
+Go to datarouter-node module and run the service using nodemain.java \r
+\r
+Curl Commands to test:\r
+\r
+create a feed:\r
+\r
+curl -v -X POST -H "Content-Type : application/vnd.att-dr.feed" -H "X-ATT-DR-ON-BEHALF-OF: rs873m" --data-ascii @/opt/app/datartr/addFeed3.txt --post301 --location-trusted  -k https://prov.datarouternew.com:8443\r
+\r
+Subscribe to feed:\r
+\r
+curl -v -X POST -H "Content-Type: application/vnd.att-dr.subscription" -H "X-ATT-DR-ON-BEHALF-OF: rs873m" --data-ascii @/opt/app/datartr/addSubscriber.txt --post301 --location-trusted -k https://prov.datarouternew.com:8443/subscribe/1\r
+\r
+Publish to feed:\r
+\r
+curl -v -X PUT --user rs873m:rs873m -H "Content-Type: application/octet-stream" --data-binary @/opt/app/datartr/addFeed3.txt  --post301 --location-trusted -k https://prov.datarouternew.com:8443/publish/1/test1\r
+\r
+\r
\r
+\r
+ ## CONFIGURATION \r
+\r
+Recommended \r
+\r
+Environment - Unix based\r
+\r
+Java - 1.8\r
+\r
+Maven - 3.2.5 \r
+\r
+MySQL - 5.6\r
+\r
+Self Signed SSL certificates\r
\r
\r
diff --git a/Subscriber/src/SSASubscriber.java b/Subscriber/src/SSASubscriber.java
new file mode 100644 (file)
index 0000000..5ec099b
--- /dev/null
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * * 
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.servlet.*;
+import org.eclipse.jetty.util.ssl.*;
+import org.eclipse.jetty.server.*;
+import org.apache.log4j.Logger;
+
+/**
+ *     Example stand alone subscriber
+ */
+public class SSASubscriber {
+       private static final int Port = 8447;
+       private static final String KeyStoreType = "jks";
+       private static final String KeyStoreFile = "/root/sub/subscriber.jks";
+       //private static final String KeyStoreFile = "c:/tmp/subscriber.jks";
+       private static final String KeyStorePassword = "changeit";
+       private static final String KeyPassword = "changeit";
+       private static final String ContextPath = "/";
+       private static final String URLPattern = "/*";
+
+       public static void main(String[] args) throws Exception {
+               //User story # US792630  -Jetty Upgrade to 9.3.11
+               //SSASubscriber register Jetty server.
+        Server server = new Server();
+        HttpConfiguration http_config = new HttpConfiguration();
+        http_config.setSecureScheme("https");
+        http_config.setSecurePort(Port);
+        http_config.setRequestHeaderSize(8192);
+               
+        // HTTP connector
+        ServerConnector http = new ServerConnector(server,
+                new HttpConnectionFactory(http_config));
+        http.setPort(7070);
+        http.setIdleTimeout(30000);
+        
+        // SSL Context Factory
+        SslContextFactory sslContextFactory = new SslContextFactory();
+        sslContextFactory.setKeyStoreType(KeyStoreType);
+        sslContextFactory.setKeyStorePath(KeyStoreFile);
+        sslContextFactory.setKeyStorePassword(KeyStorePassword);
+        sslContextFactory.setKeyManagerPassword(KeyPassword);
+        
+        // sslContextFactory.setTrustStorePath(ncm.getKSFile());
+        // sslContextFactory.setTrustStorePassword("changeit");
+        sslContextFactory.setExcludeCipherSuites("SSL_RSA_WITH_DES_CBC_SHA",
+                "SSL_DHE_RSA_WITH_DES_CBC_SHA", "SSL_DHE_DSS_WITH_DES_CBC_SHA",
+                "SSL_RSA_EXPORT_WITH_RC4_40_MD5",
+                "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
+                "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA");
+
+        // SSL HTTP Configuration
+        HttpConfiguration https_config = new HttpConfiguration(http_config);
+        https_config.addCustomizer(new SecureRequestCustomizer());
+
+        // SSL Connector
+        ServerConnector sslConnector = new ServerConnector(server,
+            new SslConnectionFactory(sslContextFactory,HttpVersion.HTTP_1_1.asString()),
+            new HttpConnectionFactory(https_config));
+        sslConnector.setPort(Port);
+        server.addConnector(sslConnector);
+        
+       /**Skip SSLv3 Fixes*/
+        sslContextFactory.addExcludeProtocols("SSLv3");
+        System.out.println("Excluded protocols SSASubscriber-"+sslContextFactory.getExcludeProtocols().toString());  
+               /**End of SSLv3 Fixes*/
+        
+        // HTTPS Configuration
+        ServerConnector https = new ServerConnector(server,
+            new SslConnectionFactory(sslContextFactory,HttpVersion.HTTP_1_1.asString()),
+                new HttpConnectionFactory(https_config));
+        https.setPort(Port);
+        https.setIdleTimeout(30000);
+        //server.setConnectors(new Connector[] { http, https });
+        server.setConnectors(new Connector[] {  http });
+               ServletContextHandler ctxt = new ServletContextHandler(0);
+               ctxt.setContextPath(ContextPath);
+               server.setHandler(ctxt);
+               
+               ctxt.addServlet(new ServletHolder(new SubscriberServlet()), "/*");
+               
+               try { 
+                   server.start();
+               } catch ( Exception e ) { 
+                       System.out.println("Jetty failed to start. Reporting will we unavailable-"+e);
+               };
+        server.join();
+        
+        System.out.println("Subscriber started-"+ server.getState());  
+
+       }
+}
\ No newline at end of file
diff --git a/Subscriber/src/SubscriberServlet.java b/Subscriber/src/SubscriberServlet.java
new file mode 100644 (file)
index 0000000..1af62a6
--- /dev/null
@@ -0,0 +1,149 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * * 
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URLEncoder;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+/**
+ *     Example stand alone subscriber servlet with Authorization header checking
+ */
+public class SubscriberServlet extends HttpServlet     {
+       private static Logger logger = Logger.getLogger("com.att.datarouter.pubsub.ssasubscribe.SubscriberServlet");
+       private String Login = "LOGIN";
+       private String Password = "PASSWORD";
+       private String OutputDirectory = "/root/sub/received";
+
+       private String auth;
+
+       private static String gp(ServletConfig config, String param, String deflt) {
+               param = config.getInitParameter(param);
+               if (param == null || param.length() == 0) {
+                       param = deflt;
+               }
+               return(param);
+       }
+       /**
+        *      Configure this subscriberservlet.  Configuration parameters from config.getInitParameter() are:
+        *      <ul>
+        *      <li>Login - The login expected in the Authorization header (default "LOGIN").
+        *      <li>Password - The password expected in the Authorization header (default "PASSWORD").
+        *      <li>OutputDirectory - The directory where files are placed (default "received").
+        *      </ul>
+        */
+       public void init(ServletConfig config) throws ServletException {
+               Login = gp(config, "Login", Login);
+               Password = gp(config, "Password", Password);
+               OutputDirectory = gp(config, "OutputDirectory", OutputDirectory);
+               (new File(OutputDirectory)).mkdirs();
+               auth = "Basic " + Base64.encodeBase64String((Login + ":" + Password).getBytes());
+       }
+       /**
+        *      Invoke common(req, resp, false).
+        */
+       protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+               common(req, resp, false);
+       }
+       /**
+        *      Invoke common(req, resp, true).
+        */
+       protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+               common(req, resp, true);
+       }
+       /**
+        *      Process a PUT or DELETE request.
+        *      <ol>
+        *      <li>Verify that the request contains an Authorization header
+        *      or else UNAUTHORIZED.
+        *      <li>Verify that the Authorization header matches the configured
+        *      Login and Password or else FORBIDDEN.
+        *      <li>If the request is PUT, store the message body as a file
+        *      in the configured OutputDirectory directory protecting against
+        *      evil characters in the received FileID.  The file is created
+        *      initially with its name prefixed with a ".", and once it is complete, it is
+        *      renamed to remove the leading "." character.
+        *      <li>If the request is DELETE, instead delete the file (if it exists) from the configured OutputDirectory directory.
+        *      <li>Respond with NO_CONTENT.
+        *      </ol>
+        */
+       protected void common(HttpServletRequest req, HttpServletResponse resp, boolean isdelete) throws ServletException, IOException {
+               String ah = req.getHeader("Authorization");
+               if (ah == null) {
+                       logger.info("Rejecting request with no Authorization header from " + req.getRemoteAddr() + ": " + req.getPathInfo());
+                       resp.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+                       return;
+               }
+               if (!auth.equals(ah)) {
+                       logger.info("Rejecting request with incorrect Authorization header from " + req.getRemoteAddr() + ": " + req.getPathInfo());
+                       resp.sendError(HttpServletResponse.SC_FORBIDDEN);
+                       return;
+               }
+               String fileid = req.getPathInfo();
+               fileid = fileid.substring(fileid.lastIndexOf('/') + 1);
+               String qs = req.getQueryString();
+               if (qs != null) {
+                       fileid = fileid + "?" + qs;
+               }
+               String publishid = req.getHeader("X-ATT-DR-PUBLISH-ID");
+               String filename = URLEncoder.encode(fileid, "UTF-8").replaceAll("^\\.", "%2E").replaceAll("\\*", "%2A");
+               String finalname = OutputDirectory + "/" + filename;
+               String tmpname = OutputDirectory + "/." + filename;
+               try {
+                       if (isdelete) {
+                               (new File(finalname)).delete();
+                               logger.info("Received delete for file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname);
+                       } else {
+                               InputStream is = req.getInputStream();
+                               OutputStream os = new FileOutputStream(tmpname);
+                               byte[] buf = new byte[65536];
+                               int i;
+                               while ((i = is.read(buf)) > 0) {
+                                       os.write(buf, 0, i);
+                               }
+                               is.close();
+                               os.close();
+                               (new File(tmpname)).renameTo(new File(finalname));
+                               logger.info("Received file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname);
+                               resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
+                               logger.info("Received file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname);
+                       }
+                       resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
+               } catch (IOException ioe) {
+                       (new File(tmpname)).delete();
+                       logger.info("Failure to save file " + finalname + " from " + req.getRemoteAddr() + ": " + req.getPathInfo(), ioe);
+                       throw ioe;
+               }
+       }
+}
diff --git a/Subscriber/src/log4j.properties b/Subscriber/src/log4j.properties
new file mode 100644 (file)
index 0000000..8c12d5c
--- /dev/null
@@ -0,0 +1,9 @@
+log4j.debug=FALSE\r
+log4j.rootLogger=INFO,Root\r
+\r
+log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender\r
+log4j.appender.Root.file=/opt/app/datartr/logs/subscriber.log\r
+log4j.appender.Root.datePattern='.'yyyyMMdd\r
+log4j.appender.Root.append=true\r
+log4j.appender.Root.layout=org.apache.log4j.PatternLayout\r
+log4j.appender.Root.layout.ConversionPattern=%d %p %t %m%n\r
diff --git a/datarouter-node/pom.xml b/datarouter-node/pom.xml
new file mode 100644 (file)
index 0000000..b2b798b
--- /dev/null
@@ -0,0 +1,472 @@
+<!--\r
+  ============LICENSE_START==================================================\r
+  * org.onap.dmaap\r
+  * ===========================================================================\r
+  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+  * ===========================================================================\r
+  * Licensed under the Apache License, Version 2.0 (the "License");\r
+  * you may not use this file except in compliance with the License.\r
+  * You may obtain a copy of the License at\r
+  * \r
+   *      http://www.apache.org/licenses/LICENSE-2.0\r
+  * \r
+   * Unless required by applicable law or agreed to in writing, software\r
+  * distributed under the License is distributed on an "AS IS" BASIS,\r
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+  * See the License for the specific language governing permissions and\r
+  * limitations under the License.\r
+  * ============LICENSE_END====================================================\r
+  *\r
+  * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+  *\r
+-->\r
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+       <modelVersion>4.0.0</modelVersion>\r
+\r
+       <groupId>com.att.datarouter-node</groupId>\r
+       <artifactId>datarouter-node</artifactId>\r
+       <version>0.0.1-SNAPSHOT</version>\r
+       <packaging>jar</packaging>\r
+\r
+       <name>datarouter-node</name>\r
+       <url>https://github.com/att/DMAAP_DATAROUTER</url>\r
+    <licenses>\r
+               <license>\r
+               <name>BSD License</name>\r
+               <url> </url>\r
+               </license>\r
+       </licenses>\r
+\r
+\r
+       <properties>\r
+               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>\r
+               <maven.compiler.source>1.8</maven.compiler.source>\r
+               <maven.compiler.target>1.8</maven.compiler.target>\r
+               <dockerLocation>${basedir}/target/</dockerLocation>\r
+               <docker.registry>hub.docker.com</docker.registry>\r
+       </properties>\r
+\r
+       <dependencies>\r
+               <dependency>\r
+                       <groupId>junit</groupId>\r
+                       <artifactId>junit</artifactId>\r
+                       <version>3.8.1</version>\r
+                       <scope>test</scope>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.json</groupId>\r
+                       <artifactId>json</artifactId>\r
+                       <version>20160810</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>javax.mail</groupId>\r
+                       <artifactId>javax.mail-api</artifactId>\r
+                       <version>1.5.1</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>com.att.eelf</groupId>\r
+                       <artifactId>eelf-core</artifactId>\r
+                       <version>0.0.1</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>javax.servlet</groupId>\r
+                       <artifactId>servlet-api</artifactId>\r
+                       <version>2.5</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-server</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-continuation</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-util</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-deploy</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-servlet</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-servlets</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-http</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-security</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-websocket</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.eclipse.jetty</groupId>\r
+                       <artifactId>jetty-io</artifactId>\r
+                       <version>7.6.14.v20131031</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.apache.commons</groupId>\r
+                       <artifactId>commons-io</artifactId>\r
+                       <version>1.3.2</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>commons-lang</groupId>\r
+                       <artifactId>commons-lang</artifactId>\r
+                       <version>2.4</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>commons-io</groupId>\r
+                       <artifactId>commons-io</artifactId>\r
+                       <version>2.1</version>\r
+                       <scope>compile</scope>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.apache.httpcomponents</groupId>\r
+                       <artifactId>httpcore</artifactId>\r
+                       <version>4.2.2</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>commons-codec</groupId>\r
+                       <artifactId>commons-codec</artifactId>\r
+                       <version>1.6</version>\r
+               </dependency>\r
+\r
+               <dependency>\r
+                       <groupId>org.mozilla</groupId>\r
+                       <artifactId>rhino</artifactId>\r
+                       <version>1.7R3</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.apache.james</groupId>\r
+                       <artifactId>apache-mime4j-core</artifactId>\r
+                       <version>0.7</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.apache.httpcomponents</groupId>\r
+                       <artifactId>httpclient</artifactId>\r
+                       <version>4.2.3</version>\r
+               </dependency>\r
+               <dependency>\r
+                       <groupId>org.sonatype.http-testing-harness</groupId>\r
+                       <artifactId>junit-runner</artifactId>\r
+                       <version>0.11</version>\r
+               </dependency>\r
+\r
+\r
+               <dependency>\r
+                       <groupId>log4j</groupId>\r
+                       <artifactId>log4j</artifactId>\r
+                       <version>1.2.17</version>\r
+                       <scope>compile</scope>\r
+               </dependency>\r
+       </dependencies>\r
+\r
+       <build>\r
+               <finalName>datarouter-node</finalName>\r
+               <resources>\r
+                       <resource>\r
+                               <directory>src/main/resources</directory>\r
+                               <filtering>true</filtering>\r
+                               <includes>\r
+                                       <include>**/*.properties</include>\r
+                               </includes>\r
+                       </resource>\r
+                       <resource>\r
+                               <directory>src/main/resources</directory>\r
+                               <filtering>true</filtering>\r
+                               <includes>\r
+                                       <include>**/EelfMessages.properties</include>\r
+                               </includes>\r
+                       </resource>\r
+                       <resource>\r
+                               <directory>src/main/resources</directory>\r
+                               <filtering>true</filtering>\r
+                               <includes>\r
+                                       <include>**/log4j.properties</include>\r
+                               </includes>\r
+                       </resource>\r
+\r
+               </resources>\r
+               <plugins>\r
+                       <plugin>\r
+                               <groupId>org.apache.maven.plugins</groupId>\r
+                               <artifactId>maven-compiler-plugin</artifactId>\r
+                               <configuration>\r
+                                       <archive>\r
+                                               <manifest>\r
+                                                       <mainClass>com.att.research.datarouter.node.NodeMain</mainClass>\r
+\r
+                                               </manifest>\r
+                                       </archive>\r
+\r
+                                       <source>1.8</source>\r
+                                       <target>1.8</target>\r
+                               </configuration>\r
+                               <version>3.6.0</version>\r
+                       </plugin>\r
+                       <plugin>\r
+                               <artifactId>maven-assembly-plugin</artifactId>\r
+                               <version>2.4</version>\r
+                               <configuration>\r
+                                       <descriptorRefs>\r
+                                               <descriptorRef>jar-with-dependencies</descriptorRef>\r
+                                       </descriptorRefs>\r
+                                       <outputDirectory>${basedir}/target/opt/app/datartr/lib</outputDirectory>\r
+                                       <archive>\r
+\r
+                                               <manifest>\r
+                                                       <addClasspath>true</addClasspath>\r
+                                                       <mainClass>com.att.research.datarouter.node.NodeMain</mainClass>\r
+                                               </manifest>\r
+                                       </archive>\r
+                               </configuration>\r
+\r
+                               <executions>\r
+                                       <execution>\r
+                                               <id>make-assembly</id> <!-- this is used for inheritance merges -->\r
+                                               <phase>package</phase> <!-- bind to the packaging phase -->\r
+                                               <goals>\r
+                                                       <goal>single</goal>\r
+                                               </goals>\r
+                                       </execution>\r
+                               </executions>\r
+                       </plugin>\r
+                       <plugin>\r
+                               <groupId>org.apache.maven.plugins</groupId>\r
+                               <artifactId>maven-resources-plugin</artifactId>\r
+                               <version>2.7</version>\r
+                               <executions>\r
+                                       <execution>\r
+                                               <id>copy-docker-file</id>\r
+                                               <phase>package</phase>\r
+                                               <goals>\r
+                                                       <goal>copy-resources</goal>\r
+                                               </goals>\r
+                                               <configuration>\r
+                                                       <outputDirectory>${dockerLocation}</outputDirectory>\r
+                                                       <overwrite>true</overwrite>\r
+                                                       <resources>\r
+                                                               <resource>\r
+                                                                       <directory>${basedir}/src/main/resources/docker</directory>\r
+                                                                       <filtering>true</filtering>\r
+                                                                       <includes>\r
+                                                                               <include>**/*</include>\r
+                                                                       </includes>\r
+                                                               </resource>\r
+                                                       </resources>\r
+                                               </configuration>\r
+                                       </execution>\r
+                                       <execution>\r
+                                               <id>copy-resources</id>\r
+                                               <phase>validate</phase>\r
+                                               <goals>\r
+                                                       <goal>copy-resources</goal>\r
+                                               </goals>\r
+                                               <configuration>\r
+                                                       <outputDirectory>${basedir}/target/opt/app/datartr/etc</outputDirectory>\r
+                                                       <resources>\r
+                                                               <resource>\r
+                                                                       <directory>${basedir}/src/main/resources</directory>\r
+                                                                       <includes>\r
+                                                                               <include>misc/**</include>\r
+                                                                               <include>**/**</include>\r
+                                                                       </includes>\r
+                                                               </resource>\r
+                                                       </resources>\r
+                                               </configuration>\r
+                                       </execution>\r
+                                       <execution>\r
+        <id>copy-resources-1</id>\r
+        <phase>validate</phase>\r
+        <goals>\r
+          <goal>copy-resources</goal>\r
+        </goals>\r
+        <configuration>\r
+          <outputDirectory>${basedir}/target/opt/app/datartr/self_signed</outputDirectory>\r
+          <resources>\r
+            <resource>\r
+                        <directory>${basedir}/self_signed</directory>\r
+                        <includes>\r
+                            <include>misc/**</include>\r
+                            <include>**/**</include>\r
+                        </includes>\r
+                    </resource>\r
+          </resources>\r
+        </configuration>\r
+      </execution>\r
+                               </executions>\r
+                       </plugin>\r
+                       <plugin>\r
+                               <groupId>com.spotify</groupId>\r
+                               <artifactId>docker-maven-plugin</artifactId>\r
+                               <version>0.4.11</version>\r
+                               <configuration>\r
+                                       <imageName>datarouter-node</imageName>\r
+                                       <dockerDirectory>${dockerLocation}</dockerDirectory>\r
+                                       <serverId>docker-hub</serverId>\r
+                                       <registryUrl>https://${docker.registry}</registryUrl>\r
+                                       <imageTags>\r
+                                               <imageTag>${project.version}</imageTag>\r
+                                               <imageTag>latest</imageTag>\r
+                                       </imageTags>\r
+                                       <forceTags>true</forceTags>\r
+                               </configuration>\r
+                       </plugin>\r
+\r
+                       <plugin>\r
+                               <groupId>org.apache.maven.plugins</groupId>\r
+                               <artifactId>maven-dependency-plugin</artifactId>\r
+                               <version>2.10</version>\r
+                               <executions>\r
+                                       <execution>\r
+                                               <id>copy-dependencies</id>\r
+                                               <phase>package</phase>\r
+                                               <goals>\r
+                                                       <goal>copy-dependencies</goal>\r
+                                               </goals>\r
+                                               <configuration>\r
+                                                       <outputDirectory>${project.build.directory}/opt/app/datartr/lib</outputDirectory>\r
+                                                       <overWriteReleases>false</overWriteReleases>\r
+                                                       <overWriteSnapshots>false</overWriteSnapshots>\r
+                                                       <overWriteIfNewer>true</overWriteIfNewer>\r
+                                               </configuration>\r
+                                       </execution>\r
+                               </executions>\r
+                       </plugin>\r
+                                                                               <plugin>\r
+                       <groupId>org.apache.maven.plugins</groupId>\r
+                       <artifactId>maven-javadoc-plugin</artifactId>\r
+                       <configuration>\r
+                       <failOnError>false</failOnError>\r
+                       </configuration>\r
+                       <executions>\r
+                               <execution>\r
+                                       <id>attach-javadocs</id>\r
+                                       <goals>\r
+                                               <goal>jar</goal>\r
+                                       </goals>\r
+                               </execution>\r
+                       </executions>\r
+               </plugin> \r
+          \r
+          \r
+              <plugin>\r
+                     <groupId>org.apache.maven.plugins</groupId>\r
+                     <artifactId>maven-source-plugin</artifactId>\r
+                     <version>2.2.1</version>\r
+                     <executions>\r
+                       <execution>\r
+                         <id>attach-sources</id>\r
+                         <goals>\r
+                           <goal>jar-no-fork</goal>\r
+                         </goals>\r
+                       </execution>\r
+                     </executions>\r
+                   </plugin>\r
+       \r
+\r
+       <plugin>\r
+           <groupId>org.apache.maven.plugins</groupId>\r
+           <artifactId>maven-gpg-plugin</artifactId>\r
+           <version>1.5</version>\r
+           <executions>\r
+               <execution>\r
+                   <id>sign-artifacts</id>\r
+                   <phase>verify</phase>\r
+                   <goals>\r
+                       <goal>sign</goal>\r
+                   </goals>\r
+               </execution>\r
+           </executions>\r
+         </plugin> \r
+                       \r
+               <plugin>\r
+                       <groupId>org.sonatype.plugins</groupId>\r
+                       <artifactId>nexus-staging-maven-plugin</artifactId>\r
+                       <version>1.6.7</version>\r
+                       <extensions>true</extensions>\r
+                       <configuration>\r
+                       <serverId>ossrhdme</serverId>\r
+                       <nexusUrl>https://oss.sonatype.org/</nexusUrl>\r
+                       <autoReleaseAfterClose>true</autoReleaseAfterClose>\r
+                       </configuration>\r
+               </plugin>\r
+                       \r
+               <plugin>\r
+                               <groupId>org.codehaus.mojo</groupId>\r
+                               <artifactId>cobertura-maven-plugin</artifactId>\r
+                               <version>2.7</version>\r
+                               <configuration>\r
+                                       <formats>\r
+                                       <format>html</format>\r
+                                       <format>xml</format>\r
+                                 </formats>\r
+                               </configuration>\r
+                       </plugin>\r
+                               \r
+        <plugin>\r
+               <groupId>com.blackducksoftware.integration</groupId>\r
+               <artifactId>hub-maven-plugin</artifactId>\r
+               <version>1.0.4</version>\r
+                  <inherited>false</inherited>\r
+               <configuration>\r
+                  <target>${project.basedir}</target>\r
+               </configuration>\r
+              <executions>\r
+              <execution>\r
+                 <id>create-bdio-file</id>\r
+                 <phase>package</phase>\r
+              <goals>\r
+               <goal>createHubOutput</goal>\r
+              </goals>\r
+             </execution>\r
+            </executions>\r
+        </plugin>\r
+               </plugins>\r
+       </build>\r
+       \r
+       <distributionManagement>\r
+               <snapshotRepository>\r
+                       <id>ossrhdme</id>\r
+                       <url>https://oss.sonatype.org/content/repositories/snapshots</url>\r
+               </snapshotRepository>\r
+               <repository>\r
+                       <id>ossrhdme</id>\r
+                       <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>\r
+               </repository>\r
+       </distributionManagement>\r
+       \r
+       <scm>\r
+               <connection>https://github.com/att/DMAAP_DATAROUTER.git</connection>\r
+               <developerConnection>${project.scm.connection}</developerConnection>\r
+               <url>https://github.com/att/DMAAP_DATAROUTER/tree/master</url>\r
+       </scm>\r
+       \r
+</project>\r
diff --git a/datarouter-node/self_signed/cacerts.jks b/datarouter-node/self_signed/cacerts.jks
new file mode 100644 (file)
index 0000000..dfd8143
Binary files /dev/null and b/datarouter-node/self_signed/cacerts.jks differ
diff --git a/datarouter-node/self_signed/keystore.jks b/datarouter-node/self_signed/keystore.jks
new file mode 100644 (file)
index 0000000..e5a4e78
Binary files /dev/null and b/datarouter-node/self_signed/keystore.jks differ
diff --git a/datarouter-node/self_signed/mykey.cer b/datarouter-node/self_signed/mykey.cer
new file mode 100644 (file)
index 0000000..2a5c9d7
Binary files /dev/null and b/datarouter-node/self_signed/mykey.cer differ
diff --git a/datarouter-node/self_signed/nodekey.cer b/datarouter-node/self_signed/nodekey.cer
new file mode 100644 (file)
index 0000000..4cdfdfe
Binary files /dev/null and b/datarouter-node/self_signed/nodekey.cer differ
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
new file mode 100644 (file)
index 0000000..d0e88ec
--- /dev/null
@@ -0,0 +1,253 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.io.*;\r
+import org.apache.log4j.Logger;\r
+\r
+/**\r
+ *     Main control point for delivering files to destinations.\r
+ *     <p>\r
+ *     The Delivery class manages assignment of delivery threads to delivery\r
+ *     queues and creation and destruction of delivery queues as\r
+ *     configuration changes.  DeliveryQueues are assigned threads based on a\r
+ *     modified round-robin approach giving priority to queues with more work\r
+ *     as measured by both bytes to deliver and files to deliver and lower\r
+ *     priority to queues that already have delivery threads working.\r
+ *     A delivery thread continues to work for a delivery queue as long as\r
+ *     that queue has more files to deliver.\r
+ */\r
+public class Delivery {\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery");\r
+       private static class DelItem implements Comparable<DelItem>     {\r
+               private String pubid;\r
+               private String spool;\r
+               public int compareTo(DelItem x) {\r
+                       int i = pubid.compareTo(x.pubid);\r
+                       if (i == 0) {\r
+                               i = spool.compareTo(x.spool);\r
+                       }\r
+                       return(i);\r
+               }\r
+               public String getPublishId() {\r
+                       return(pubid);\r
+               }\r
+               public String getSpool() {\r
+                       return(spool);\r
+               }\r
+               public DelItem(String pubid, String spool) {\r
+                       this.pubid = pubid;\r
+                       this.spool = spool;\r
+               }\r
+       }\r
+       private double  fdstart;\r
+       private double  fdstop;\r
+       private int     threads;\r
+       private int     curthreads;\r
+       private NodeConfigManager       config;\r
+       private Hashtable<String, DeliveryQueue>        dqs = new Hashtable<String, DeliveryQueue>();\r
+       private DeliveryQueue[] queues = new DeliveryQueue[0];\r
+       private int     qpos = 0;\r
+       private long    nextcheck;\r
+       private Runnable        cmon = new Runnable() {\r
+               public void run() {\r
+                       checkconfig();\r
+               }\r
+       };\r
+       /**\r
+        *      Constructs a new Delivery system using the specified configuration manager.\r
+        *      @param config   The configuration manager for this delivery system.\r
+        */\r
+       public Delivery(NodeConfigManager config) {\r
+               this.config = config;\r
+               config.registerConfigTask(cmon);\r
+               checkconfig();\r
+       }\r
+       private void cleardir(String dir) {\r
+               if (dqs.get(dir) != null) {\r
+                       return;\r
+               }\r
+               File fdir = new File(dir);\r
+               for (File junk: fdir.listFiles()) {\r
+                       if (junk.isFile()) {\r
+                               junk.delete();\r
+                       }\r
+               }\r
+               fdir.delete();\r
+       }\r
+       private void freeDiskCheck() {\r
+               File spoolfile = new File(config.getSpoolBase());\r
+               long tspace = spoolfile.getTotalSpace();\r
+               long start = (long)(tspace * fdstart);\r
+               long stop = (long)(tspace * fdstop);\r
+               long cur = spoolfile.getUsableSpace();\r
+               if (cur >= start) {\r
+                       return;\r
+               }\r
+               Vector<DelItem> cv = new Vector<DelItem>();\r
+               for (String sdir: dqs.keySet()) {\r
+                       for (String meta: (new File(sdir)).list()) {\r
+                               if (!meta.endsWith(".M") || meta.charAt(0) == '.') {\r
+                                       continue;\r
+                               }\r
+                               cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));\r
+                       }\r
+               }\r
+               DelItem[] items = cv.toArray(new DelItem[cv.size()]);\r
+               Arrays.sort(items);\r
+               logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);\r
+               for (DelItem item: items) {\r
+                       long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());\r
+                       logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");\r
+                       if (amount > 0) {\r
+                               cur += amount;\r
+                               if (cur >= stop) {\r
+                                       cur = spoolfile.getUsableSpace();\r
+                               }\r
+                               if (cur >= stop) {\r
+                                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+                                       return;\r
+                               }\r
+                       }\r
+               }\r
+               cur = spoolfile.getUsableSpace();\r
+               if (cur >= stop) {\r
+                       logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+                       return;\r
+               }\r
+               logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);\r
+       }\r
+       private void cleardirs() {\r
+               String basedir = config.getSpoolBase();\r
+               String nbase = basedir + "/n";\r
+               for (String nodedir: (new File(nbase)).list()) {\r
+                       if (!nodedir.startsWith(".")) {\r
+                               cleardir(nbase + "/" + nodedir);\r
+                       }\r
+               }\r
+               String sxbase = basedir + "/s";\r
+               for (String sxdir: (new File(sxbase)).list()) {\r
+                       if (sxdir.startsWith(".")) {\r
+                               continue;\r
+                       }\r
+                       File sxf = new File(sxbase + "/" + sxdir);\r
+                       for (String sdir: sxf.list()) {\r
+                               if (!sdir.startsWith(".")) {\r
+                                       cleardir(sxbase + "/" + sxdir + "/" + sdir);\r
+                               }\r
+                       }\r
+                       sxf.delete();  // won't if anything still in it\r
+               }\r
+       }\r
+       private synchronized void checkconfig() {\r
+               if (!config.isConfigured()) {\r
+                       return;\r
+               }\r
+               fdstart = config.getFreeDiskStart();\r
+               fdstop = config.getFreeDiskStop();\r
+               threads = config.getDeliveryThreads();\r
+               if (threads < 1) {\r
+                       threads = 1;\r
+               }\r
+               DestInfo[] alldis = config.getAllDests();\r
+               DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];\r
+               qpos = 0;\r
+               Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();\r
+               for (DestInfo di: alldis) {\r
+                       String spl = di.getSpool();\r
+                       DeliveryQueue dq = dqs.get(spl);\r
+                       if (dq == null) {\r
+                               dq = new DeliveryQueue(config, di);\r
+                       } else {\r
+                               dq.config(di);\r
+                       }\r
+                       ndqs.put(spl, dq);\r
+                       nqs[qpos++] = dq;\r
+               }\r
+               queues = nqs;\r
+               dqs = ndqs;\r
+               cleardirs();\r
+               while (curthreads < threads) {\r
+                       curthreads++;\r
+                       (new Thread() {\r
+                               {\r
+                                       setName("Delivery Thread");\r
+                               }\r
+                               public void run() {\r
+                                       dodelivery();\r
+                               }\r
+                       }).start();\r
+               }\r
+               nextcheck = 0;\r
+               notify();\r
+       }\r
+       private void dodelivery() {\r
+               DeliveryQueue dq;\r
+               while ((dq = getNextQueue()) != null) {\r
+                       dq.run();\r
+               }\r
+       }\r
+       private synchronized DeliveryQueue getNextQueue() {\r
+               while (true) {\r
+                       if (curthreads > threads) {\r
+                               curthreads--;\r
+                               return(null);\r
+                       }\r
+                       if (qpos < queues.length) {\r
+                               DeliveryQueue dq = queues[qpos++];\r
+                               if (dq.isSkipSet()) {\r
+                                       continue;\r
+                               }\r
+                               nextcheck = 0;\r
+                               notify();\r
+                               return(dq);\r
+                       }\r
+                       long now = System.currentTimeMillis();\r
+                       if (now < nextcheck) {\r
+                               try {\r
+                                       wait(nextcheck + 500 - now);\r
+                               } catch (Exception e) {\r
+                               }\r
+                               now = System.currentTimeMillis();\r
+                       }\r
+                       if (now >= nextcheck) {\r
+                               nextcheck = now + 5000;\r
+                               qpos = 0;\r
+                               freeDiskCheck();\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Reset the retry timer for a delivery queue\r
+        */\r
+       public synchronized void resetQueue(String spool) {\r
+               if (spool != null) {\r
+                       DeliveryQueue dq = dqs.get(spool);\r
+                       if (dq != null) {\r
+                               dq.resetQueue();\r
+                       }\r
+               }\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java
new file mode 100644 (file)
index 0000000..71c7797
--- /dev/null
@@ -0,0 +1,348 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.io.*;\r
+import java.util.*;\r
+\r
+/**\r
+ *     Mechanism for monitoring and controlling delivery of files to a destination.\r
+ *     <p>\r
+ *     The DeliveryQueue class maintains lists of DeliveryTasks for a single\r
+ *     destination (a subscription or another data router node) and assigns\r
+ *     delivery threads to try to deliver them.  It also maintains a delivery\r
+ *     status that causes it to back off on delivery attempts after a failure.\r
+ *     <p>\r
+ *     If the most recent delivery result was a failure, then no more attempts\r
+ *     will be made for a period of time.  Initially, and on the first failure\r
+ *     following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).\r
+ *     If, after this delay, additional failures occur, each failure will\r
+ *     multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a\r
+ *     maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().\r
+ *     Note that this behavior applies to the delivery queue as a whole and not\r
+ *     to individual files in the queue.  If multiple files are being\r
+ *     delivered and one fails, the delay will be started.  If a second\r
+ *     delivery fails while the delay was active, it will not change the delay\r
+ *     or change the duration of any subsequent delay.\r
+ *     If, however, it succeeds, it will cancel the delay.\r
+ *     <p>\r
+ *     The queue maintains 3 collections of files to deliver: A todo list of\r
+ *     files that will be attempted, a working set of files that are being\r
+ *     attempted, and a retry set of files that were attempted and failed.\r
+ *     Whenever the todo list is empty and needs to be refilled, a scan of the\r
+ *     spool directory is made and the file names sorted.  Any files in the working set are ignored.\r
+ *     If a DeliveryTask for the file is in the retry set, then that delivery\r
+ *     task is placed on the todo list.  Otherwise, a new DeliveryTask for the\r
+ *     file is created and placed on the todo list.\r
+ *     If, when a DeliveryTask is about to be removed from the todo list, its\r
+ *     age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead\r
+ *     marked as expired.\r
+ *     <p>\r
+ *     A delivery queue also maintains a skip flag.  This flag is true if the\r
+ *     failure timer is active or if no files are found in a directory scan.\r
+ */\r
+public class DeliveryQueue implements Runnable, DeliveryTaskHelper     {\r
+       private DeliveryQueueHelper     dqh;\r
+       private DestInfo        di;\r
+       private Hashtable<String, DeliveryTask> working = new Hashtable<String, DeliveryTask>();\r
+       private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();\r
+       private int     todoindex;\r
+       private boolean failed;\r
+       private long    failduration;\r
+       private long    resumetime;\r
+       File    dir;\r
+       private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();\r
+       /**\r
+        *      Try to cancel a delivery task.\r
+        *      @return The length of the task in bytes or 0 if the task cannot be cancelled.\r
+        */\r
+       public synchronized long cancelTask(String pubid) {\r
+               if (working.get(pubid) != null) {\r
+                       return(0);\r
+               }\r
+               DeliveryTask dt = retry.get(pubid);\r
+               if (dt == null) {\r
+                       for (int i = todoindex; i < todo.size(); i++) {\r
+                               DeliveryTask xdt = todo.get(i);\r
+                               if (xdt.getPublishId().equals(pubid)) {\r
+                                       dt = xdt;\r
+                                       break;\r
+                               }\r
+                       }\r
+               }\r
+               if (dt == null) {\r
+                       dt = new DeliveryTask(this, pubid);\r
+                       if (dt.getFileId() == null) {\r
+                               return(0);\r
+                       }\r
+               }\r
+               if (dt.isCleaned()) {\r
+                       return(0);\r
+               }\r
+               StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());\r
+               dt.clean();\r
+               return(dt.getLength());\r
+       }\r
+       /**\r
+        *      Mark that a delivery task has succeeded.\r
+        */\r
+       public synchronized void markSuccess(DeliveryTask task) {\r
+               working.remove(task.getPublishId());\r
+               task.clean();\r
+               failed = false;\r
+               failduration = 0;\r
+       }\r
+       /**\r
+        *      Mark that a delivery task has expired.\r
+        */\r
+       public synchronized void markExpired(DeliveryTask task) {\r
+               task.clean();\r
+       }\r
+       /**\r
+        *      Mark that a delivery task has failed permanently.\r
+        */\r
+       public synchronized void markFailNoRetry(DeliveryTask task) {\r
+               working.remove(task.getPublishId());\r
+               task.clean();\r
+               failed = false;\r
+               failduration = 0;\r
+       }\r
+       private void fdupdate() {\r
+               if (!failed) {\r
+                       failed = true;\r
+                       if (failduration == 0) {\r
+                               failduration = dqh.getInitFailureTimer();\r
+                       }\r
+                       resumetime = System.currentTimeMillis() + failduration;\r
+                       long maxdur = dqh.getMaxFailureTimer();\r
+                       failduration = (long)(failduration * dqh.getFailureBackoff());\r
+                       if (failduration > maxdur) {\r
+                               failduration = maxdur;\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Mark that a delivery task has been redirected.\r
+        */\r
+       public synchronized void markRedirect(DeliveryTask task) {\r
+               working.remove(task.getPublishId());\r
+               retry.put(task.getPublishId(), task);\r
+       }\r
+       /**\r
+        *      Mark that a delivery task has temporarily failed.\r
+        */\r
+       public synchronized void markFailWithRetry(DeliveryTask task) {\r
+               working.remove(task.getPublishId());\r
+               retry.put(task.getPublishId(), task);\r
+               fdupdate();\r
+       }\r
+       /**\r
+        *      Get the next task.\r
+        */\r
+       public synchronized DeliveryTask getNext() {\r
+               DeliveryTask ret = peekNext();\r
+               if (ret != null) {\r
+                       todoindex++;\r
+                       working.put(ret.getPublishId(), ret);\r
+               }\r
+               return(ret);\r
+       }\r
+       /**\r
+        *      Peek at the next task.\r
+        */\r
+       public synchronized DeliveryTask peekNext() {\r
+               long now = System.currentTimeMillis();\r
+               long mindate = now - dqh.getExpirationTimer();\r
+               if (failed) {\r
+                       if (now > resumetime) {\r
+                               failed = false;\r
+                       } else {\r
+                               return(null);\r
+                       }\r
+               }\r
+               while (true) {\r
+                       if (todoindex >= todo.size()) {\r
+                               todoindex = 0;\r
+                               todo = new Vector<DeliveryTask>();\r
+                               String[] files = dir.list();\r
+                               Arrays.sort(files);\r
+                               for (String fname: files) {\r
+                                       if (!fname.endsWith(".M")) {\r
+                                               continue;\r
+                                       }\r
+                                       String fname2 = fname.substring(0, fname.length() - 2);\r
+                                       long pidtime = 0;\r
+                                       int dot = fname2.indexOf('.');\r
+                                       if (dot < 1) {\r
+                                               continue;\r
+                                       }\r
+                                       try {\r
+                                               pidtime = Long.parseLong(fname2.substring(0, dot));\r
+                                       } catch (Exception e) {\r
+                                       }\r
+                                       if (pidtime < 1000000000000L) {\r
+                                               continue;\r
+                                       }\r
+                                       if (working.get(fname2) != null) {\r
+                                               continue;\r
+                                       }\r
+                                       DeliveryTask dt = retry.get(fname2);\r
+                                       if (dt == null) {\r
+                                               dt = new DeliveryTask(this, fname2);\r
+                                       }\r
+                                       todo.add(dt);\r
+                               }\r
+                               retry = new Hashtable<String, DeliveryTask>();\r
+                       }\r
+                       if (todoindex < todo.size()) {\r
+                               DeliveryTask dt = todo.get(todoindex);\r
+                               if (dt.isCleaned()) {\r
+                                       todoindex++;\r
+                                       continue;\r
+                               }\r
+                               if (dt.getDate() >= mindate) {\r
+                                       return(dt);\r
+                               }\r
+                               todoindex++;\r
+                               reportExpiry(dt);\r
+                               continue;\r
+                       }\r
+                       return(null);\r
+               }\r
+       }\r
+       /**\r
+        *      Create a delivery queue for a given destination info\r
+        */\r
+       public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {\r
+               this.dqh = dqh;\r
+               this.di = di;\r
+               dir = new File(di.getSpool());\r
+               dir.mkdirs();\r
+       }\r
+       /**\r
+        *      Update the destination info for this delivery queue\r
+        */\r
+       public void config(DestInfo di) {\r
+               this.di = di;\r
+       }\r
+       /**\r
+        *      Get the dest info\r
+        */\r
+       public DestInfo getDestInfo() {\r
+               return(di);\r
+       }\r
+       /**\r
+        *      Get the config manager\r
+        */\r
+       public DeliveryQueueHelper getConfig() {\r
+               return(dqh);\r
+       }\r
+       /**\r
+        *      Exceptional condition occurred during delivery\r
+        */\r
+       public void reportDeliveryExtra(DeliveryTask task, long sent) {\r
+               StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);\r
+       }\r
+       /**\r
+        *      Message too old to deliver\r
+        */\r
+       public void reportExpiry(DeliveryTask task) {\r
+               StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());\r
+               markExpired(task);\r
+       }\r
+       /**\r
+        *      Completed a delivery attempt\r
+        */\r
+       public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {\r
+               if (status < 300) {\r
+                       StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);\r
+                       markSuccess(task);\r
+               } else if (status < 400 && dqh.isFollowRedirects()) {\r
+                       StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
+                       if (dqh.handleRedirection(di, location, task.getFileId())) {\r
+                               markRedirect(task);\r
+                       } else {\r
+                               StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());\r
+                               markFailNoRetry(task);\r
+                       }\r
+               } else if (status < 500) {\r
+                       StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
+                       StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());\r
+                       markFailNoRetry(task);\r
+               } else {\r
+                       StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);\r
+                       markFailWithRetry(task);\r
+               }\r
+       }\r
+       /**\r
+        *      Delivery failed by reason of an exception\r
+        */\r
+       public void reportException(DeliveryTask task, Exception exception) {\r
+               StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());\r
+               dqh.handleUnreachable(di);\r
+               markFailWithRetry(task);\r
+       }\r
+       /**\r
+        *      Get the feed ID for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The feed ID\r
+        */\r
+       public String getFeedId(String subid) {\r
+               return(dqh.getFeedId(subid));\r
+       }\r
+       /**\r
+        *      Get the URL to deliver a message to given the file ID\r
+        */\r
+       public String getDestURL(String fileid) {\r
+               return(dqh.getDestURL(di, fileid));\r
+       }\r
+       /**\r
+        *      Deliver files until there's a failure or there are no more\r
+        *      files to deliver\r
+        */\r
+       public void run() {\r
+               DeliveryTask t;\r
+               long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit();\r
+               int filestogo = dqh.getFairFileLimit();\r
+               while ((t = getNext()) != null) {\r
+                       t.run();\r
+                       if (--filestogo <= 0 || System.currentTimeMillis() > endtime) {\r
+                               break;\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Is there no work to do for this queue right now?\r
+        */\r
+       public synchronized boolean isSkipSet() {\r
+               return(peekNext() == null);\r
+       }\r
+       /**\r
+        *      Reset the retry timer\r
+        */\r
+       public void resetQueue() {\r
+               resumetime = System.currentTimeMillis();\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java
new file mode 100644 (file)
index 0000000..770db1d
--- /dev/null
@@ -0,0 +1,89 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+/**\r
+ *     Interface to allow independent testing of the DeliveryQueue code\r
+ *     <p>\r
+ *     This interface represents all of the configuration information and\r
+ *     feedback mechanisms that a delivery queue needs.\r
+ */\r
+public interface       DeliveryQueueHelper     {\r
+       /**\r
+        *      Get the timeout (milliseconds) before retrying after an initial delivery failure\r
+        */\r
+       public long getInitFailureTimer();\r
+       /**\r
+        *      Get the ratio between timeouts on consecutive delivery attempts\r
+        */\r
+       public double   getFailureBackoff();\r
+       /**\r
+        *      Get the maximum timeout (milliseconds) between delivery attempts\r
+        */\r
+       public long     getMaxFailureTimer();\r
+       /**\r
+        *      Get the expiration timer (milliseconds) for deliveries\r
+        */\r
+       public long     getExpirationTimer();\r
+       /**\r
+        *      Get the maximum number of file delivery attempts before checking\r
+        *      if another queue has work to be performed.\r
+        */\r
+       public int getFairFileLimit();\r
+       /**\r
+        *      Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.\r
+        */\r
+       public long getFairTimeLimit();\r
+       /**\r
+        *      Get the URL for delivering a file\r
+        *      @param dest     The destination information for the file to be delivered.\r
+        *      @param fileid   The file id for the file to be delivered.\r
+        *      @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid).\r
+        */\r
+       public String   getDestURL(DestInfo dest, String fileid);\r
+       /**\r
+        *      Forget redirections associated with a subscriber\r
+        *      @param  dest    Destination information to forget\r
+        */\r
+       public void     handleUnreachable(DestInfo dest);\r
+       /**\r
+        *      Post redirection for a subscriber\r
+        *      @param  dest    Destination information to update\r
+        *      @param  location        Location given by subscriber\r
+        *      @param  fileid  File ID of request\r
+        *      @return true if this 3xx response is retryable, otherwise, false.\r
+        */\r
+       public boolean  handleRedirection(DestInfo dest, String location, String fileid);\r
+       /**\r
+        *      Should I handle 3xx responses differently than 4xx responses?\r
+        */\r
+       public boolean  isFollowRedirects();\r
+       /**\r
+        *      Get the feed ID for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The feed ID\r
+        */\r
+       public String getFeedId(String subid);\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java
new file mode 100644 (file)
index 0000000..3d72a41
--- /dev/null
@@ -0,0 +1,308 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.io.*;\r
+import java.net.*;\r
+import java.util.*;\r
+import org.apache.log4j.Logger;\r
+\r
+/**\r
+ *     A file to be delivered to a destination.\r
+ *     <p>\r
+ *     A Delivery task represents a work item for the data router - a file that\r
+ *     needs to be delivered and provides mechanisms to get information about\r
+ *     the file and its delivery data as well as to attempt delivery.\r
+ */\r
+public class DeliveryTask implements Runnable, Comparable<DeliveryTask>        {\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask");\r
+       private DeliveryTaskHelper      dth;\r
+       private String  pubid;\r
+       private DestInfo        di;\r
+       private String  spool;\r
+       private File    datafile;\r
+       private File    metafile;\r
+       private long    length;\r
+       private long    date;\r
+       private String  method;\r
+       private String  fileid;\r
+       private String  ctype;\r
+       private String  url;\r
+       private String  feedid;\r
+       private String  subid;\r
+       private int     attempts;\r
+       private String[][]      hdrs;\r
+       /**\r
+        *      Is the object a DeliveryTask with the same publication ID?\r
+        */\r
+       public boolean equals(Object o) {\r
+               if (!(o instanceof DeliveryTask)) {\r
+                       return(false);\r
+               }\r
+               return(pubid.equals(((DeliveryTask)o).pubid));\r
+       }\r
+       /**\r
+        *      Compare the publication IDs.\r
+        */\r
+       public int compareTo(DeliveryTask o) {\r
+               return(pubid.compareTo(o.pubid));\r
+       }\r
+       /**\r
+        *      Get the hash code of the publication ID.\r
+        */\r
+       public int hashCode() {\r
+               return(pubid.hashCode());\r
+       }\r
+       /**\r
+        *      Return the publication ID.\r
+        */\r
+       public String toString() {\r
+               return(pubid);\r
+       }\r
+       /**\r
+        *      Create a delivery task for a given delivery queue and pub ID\r
+        *      @param  dth     The delivery task helper for the queue this task is in.\r
+        *      @param  pubid   The publish ID for this file.  This is used as\r
+        *      the base for the file name in the spool directory and is of\r
+        *      the form <milliseconds since 1970>.<fqdn of initial data router node>\r
+        */\r
+       public DeliveryTask(DeliveryTaskHelper dth, String pubid) {\r
+               this.dth = dth;\r
+               this.pubid = pubid;\r
+               di = dth.getDestInfo();\r
+               subid = di.getSubId();\r
+               feedid = di.getLogData();\r
+               spool = di.getSpool();\r
+               String dfn = spool + "/" + pubid;\r
+               String mfn = dfn + ".M";\r
+               datafile = new File(spool + "/" + pubid);\r
+               metafile = new File(mfn);\r
+               boolean monly = di.isMetaDataOnly();\r
+               date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));\r
+               Vector<String[]> hdrv = new Vector<String[]>();\r
+               try {\r
+                       BufferedReader br = new BufferedReader(new FileReader(metafile));\r
+                       String s = br.readLine();\r
+                       int i = s.indexOf('\t');\r
+                       method = s.substring(0, i);\r
+                       if (!"DELETE".equals(method) && !monly) {\r
+                               length = datafile.length();\r
+                       }\r
+                       fileid = s.substring(i + 1);\r
+                       while ((s = br.readLine()) != null) {\r
+                               i = s.indexOf('\t');\r
+                               String h = s.substring(0, i);\r
+                               String v = s.substring(i + 1);\r
+                               if ("x-att-dr-routing".equalsIgnoreCase(h)) {\r
+                                       subid = v.replaceAll("[^ ]*/", "");\r
+                                       feedid = dth.getFeedId(subid.replaceAll(" .*", ""));\r
+                               }\r
+                               if (length == 0 && h.toLowerCase().startsWith("content-")) {\r
+                                       continue;\r
+                               }\r
+                               if (h.equalsIgnoreCase("content-type")) {\r
+                                       ctype = v;\r
+                               }\r
+                               hdrv.add(new String[] {h, v});\r
+                       }\r
+                       br.close();\r
+               } catch (Exception e) {\r
+               }\r
+               hdrs = hdrv.toArray(new String[hdrv.size()][]);\r
+               url = dth.getDestURL(fileid);\r
+       }\r
+       /**\r
+        *      Get the publish ID\r
+        */\r
+       public String getPublishId() {\r
+               return(pubid);\r
+       }\r
+       /**\r
+        *      Attempt delivery\r
+        */\r
+       public void run() {\r
+               attempts++;\r
+               try {\r
+                       di = dth.getDestInfo();\r
+                       boolean expect100 = di.isUsing100();\r
+                       boolean monly = di.isMetaDataOnly();\r
+                       length = 0;\r
+                       if (!"DELETE".equals(method) && !monly) {\r
+                               length = datafile.length();\r
+                       }\r
+                       url = dth.getDestURL(fileid);\r
+                       URL u = new URL(url);\r
+                       HttpURLConnection uc = (HttpURLConnection)u.openConnection();\r
+                       uc.setConnectTimeout(60000);\r
+                       uc.setReadTimeout(60000);\r
+                       uc.setInstanceFollowRedirects(false);\r
+                       uc.setRequestMethod(method);\r
+                       uc.setRequestProperty("Content-Length", Long.toString(length));\r
+                       uc.setRequestProperty("Authorization", di.getAuth());\r
+                       uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);\r
+                       for (String[] nv: hdrs) {\r
+                               uc.addRequestProperty(nv[0], nv[1]);\r
+                       }\r
+                       if (length > 0) {\r
+                               if (expect100) {\r
+                                       uc.setRequestProperty("Expect", "100-continue");\r
+                               }\r
+                               uc.setFixedLengthStreamingMode(length);\r
+                               uc.setDoOutput(true);\r
+                               OutputStream os = null;\r
+                               try {\r
+                                       os = uc.getOutputStream();\r
+                               } catch (ProtocolException pe) {\r
+                                       dth.reportDeliveryExtra(this, -1L);\r
+                                       // Rcvd error instead of 100-continue\r
+                               }\r
+                               if (os != null) {\r
+                                       long sofar = 0;\r
+                                       try {\r
+                                               byte[] buf = new byte[1024 * 1024];\r
+                                               InputStream is = new FileInputStream(datafile);\r
+                                               while (sofar < length) {\r
+                                                       int i = buf.length;\r
+                                                       if (sofar + i > length) {\r
+                                                               i = (int)(length - sofar);\r
+                                                       }\r
+                                                       i = is.read(buf, 0, i);\r
+                                                       if (i <= 0) {\r
+                                                               throw new IOException("Unexpected problem reading data file " + datafile);\r
+                                                       }\r
+                                                       sofar += i;\r
+                                                       os.write(buf, 0, i);\r
+                                               }\r
+                                               is.close();\r
+                                               os.close();\r
+                                       } catch (IOException ioe) {\r
+                                               dth.reportDeliveryExtra(this, sofar);\r
+                                               throw ioe;\r
+                                       }\r
+                               }\r
+                       }\r
+                       int rc = uc.getResponseCode();\r
+                       String rmsg = uc.getResponseMessage();\r
+                       if (rmsg == null) {\r
+                               String h0 = uc.getHeaderField(0);\r
+                               if (h0 != null) {\r
+                                       int i = h0.indexOf(' ');\r
+                                       int j = h0.indexOf(' ', i + 1);\r
+                                       if (i != -1 && j != -1) {\r
+                                               rmsg = h0.substring(j + 1);\r
+                                       }\r
+                               }\r
+                       }\r
+                       String xpubid = null;\r
+                       InputStream is;\r
+                       if (rc >= 200 && rc <= 299) {\r
+                               is = uc.getInputStream();\r
+                               xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");\r
+                       } else {\r
+                               if (rc >= 300 && rc <= 399) {\r
+                                       rmsg = uc.getHeaderField("Location");\r
+                               }\r
+                               is = uc.getErrorStream();\r
+                       }\r
+                       byte[] buf = new byte[4096];\r
+                       if (is != null) {\r
+                               while (is.read(buf) > 0) {\r
+                               }\r
+                               is.close();\r
+                       }\r
+                       dth.reportStatus(this, rc, xpubid, rmsg);\r
+               } catch (Exception e) {\r
+                       dth.reportException(this, e);\r
+               }\r
+       }\r
+       /**\r
+        *      Remove meta and data files\r
+        */\r
+       public void clean() {\r
+               datafile.delete();\r
+               metafile.delete();\r
+               hdrs = null;\r
+       }\r
+       /**\r
+        *      Has this delivery task been cleaned?\r
+        */\r
+       public boolean isCleaned() {\r
+               return(hdrs == null);\r
+       }\r
+       /**\r
+        *      Get length of body\r
+        */\r
+       public long     getLength() {\r
+               return(length);\r
+       }\r
+       /**\r
+        *      Get creation date as encoded in the publish ID.\r
+        */\r
+       public long     getDate() {\r
+               return(date);\r
+       }\r
+       /**\r
+        *      Get the most recent delivery attempt URL\r
+        */\r
+       public String getURL() {\r
+               return(url);\r
+       }\r
+       /**\r
+        *      Get the content type\r
+        */\r
+       public String   getCType() {\r
+               return(ctype);\r
+       }\r
+       /**\r
+        *      Get the method\r
+        */\r
+       public String   getMethod() {\r
+               return(method);\r
+       }\r
+       /**\r
+        *      Get the file ID\r
+        */\r
+       public String   getFileId() {\r
+               return(fileid);\r
+       }\r
+       /**\r
+        *      Get the number of delivery attempts\r
+        */\r
+       public int      getAttempts() {\r
+               return(attempts);\r
+       }\r
+       /**\r
+        *      Get the (space delimited list of) subscription ID for this delivery task\r
+        */\r
+       public String   getSubId() {\r
+               return(subid);\r
+       }\r
+       /**\r
+        *      Get the feed ID for this delivery task\r
+        */\r
+       public String   getFeedId() {\r
+               return(feedid);\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java
new file mode 100644 (file)
index 0000000..702bb29
--- /dev/null
@@ -0,0 +1,72 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+/**\r
+ *     Interface to allow independent testing of the DeliveryTask code.\r
+ *     <p>\r
+ *     This interface represents all the configuraiton information and\r
+ *     feedback mechanisms that a delivery task needs.\r
+ */\r
+\r
+public interface DeliveryTaskHelper    {\r
+       /**\r
+        *      Report that a delivery attempt failed due to an exception (like can't connect to remote host)\r
+        *      @param task     The task that failed\r
+        *      @param exception        The exception that occurred\r
+        */\r
+       public void reportException(DeliveryTask task, Exception exception);\r
+       /**\r
+        *      Report that a delivery attempt completed (successfully or unsuccessfully)\r
+        *      @param task     The task that failed\r
+        *      @param status   The HTTP status\r
+        *      @param xpubid   The publish ID from the far end (if any)\r
+        *      @param location The redirection location for a 3XX response\r
+        */\r
+       public void reportStatus(DeliveryTask task, int status, String xpubid, String location);\r
+       /**\r
+        *      Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue.\r
+        *      @param task     The task that failed\r
+        *      @param sent     The number of bytes sent or -1 if an error was returned instead of 100 Continue.\r
+        */\r
+       public void reportDeliveryExtra(DeliveryTask task, long sent);\r
+       /**\r
+        *      Get the destination information for the delivery queue\r
+        *      @return The destination information\r
+        */\r
+       public DestInfo getDestInfo();\r
+       /**\r
+        *      Given a file ID, get the URL to deliver to\r
+        *      @param fileid   The file id\r
+        *      @return The URL to deliver to\r
+        */\r
+       public String   getDestURL(String fileid);\r
+       /**\r
+        *      Get the feed ID for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The feed iD\r
+        */\r
+       public String   getFeedId(String subid);\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java
new file mode 100644 (file)
index 0000000..e57fef8
--- /dev/null
@@ -0,0 +1,132 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+/**\r
+ *     Information for a delivery destination that doesn't change from message to message\r
+ */\r
+public class DestInfo  {\r
+       private String  name;\r
+       private String  spool;\r
+       private String  subid;\r
+       private String  logdata;\r
+       private String  url;\r
+       private String  authuser;\r
+       private String  authentication;\r
+       private boolean metaonly;\r
+       private boolean use100;\r
+       /**\r
+        *      Create a destination information object.\r
+        *      @param  name    n:fqdn or s:subid\r
+        *      @param  spool   The directory where files are spooled.\r
+        *      @param  subid   The subscription ID (if applicable).\r
+        *      @param  logdata Text to be included in log messages\r
+        *      @param  url     The URL to deliver to.\r
+        *      @param  authuser        The auth user for logging.\r
+        *      @param  authentication  The credentials.\r
+        *      @param  metaonly        Is this a metadata only delivery?\r
+        *      @param  use100  Should I use expect 100-continue?\r
+        */\r
+       public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {\r
+               this.name = name;\r
+               this.spool = spool;\r
+               this.subid = subid;\r
+               this.logdata = logdata;\r
+               this.url = url;\r
+               this.authuser = authuser;\r
+               this.authentication = authentication;\r
+               this.metaonly = metaonly;\r
+               this.use100 = use100;\r
+       }\r
+       public boolean equals(Object o) {\r
+               return((o instanceof DestInfo) && ((DestInfo)o).spool.equals(spool));\r
+       }\r
+       public int hashCode() {\r
+               return(spool.hashCode());\r
+       }\r
+       /**\r
+        *      Get the name of this destination\r
+        */\r
+       public String getName() {\r
+               return(name);\r
+       }\r
+       /**\r
+        *      Get the spool directory for this destination.\r
+        *      @return The spool directory\r
+        */\r
+       public String getSpool() {\r
+               return(spool);\r
+       }\r
+       /**\r
+        *      Get the subscription ID.\r
+        *      @return Subscription ID or null if this is a node to node delivery.\r
+        */\r
+       public String getSubId() {\r
+               return(subid);\r
+       }\r
+       /**\r
+        *      Get the log data.\r
+        *      @return Text to be included in a log message about delivery attempts.\r
+        */\r
+       public String getLogData() {\r
+               return(logdata);\r
+       }\r
+       /**\r
+        *      Get the delivery URL.\r
+        *      @return The URL to deliver to (the primary URL).\r
+        */\r
+       public String getURL() {\r
+               return(url);\r
+\r
+       }\r
+       /**\r
+        *      Get the user for authentication\r
+        *      @return The name of the user for logging\r
+        */\r
+       public String   getAuthUser() {\r
+               return(authuser);\r
+       }\r
+       /**\r
+        *      Get the authentication header\r
+        *      @return The string to use to authenticate to the recipient.\r
+        */\r
+       public String getAuth() {\r
+               return(authentication);\r
+       }\r
+       /**\r
+        *      Is this a metadata only delivery?\r
+        *      @return True if this is a metadata only delivery\r
+        */\r
+       public boolean  isMetaDataOnly() {\r
+               return(metaonly);\r
+       }\r
+       /**\r
+        *      Should I send expect 100-continue header?\r
+        *      @return True if I should.\r
+        */\r
+       public boolean isUsing100() {\r
+               return(use100);\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java
new file mode 100644 (file)
index 0000000..bb3e413
--- /dev/null
@@ -0,0 +1,82 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.net.*;\r
+\r
+/**\r
+ *     Determine if an IP address is from a machine\r
+ */\r
+public class IsFrom    {\r
+       private long    nextcheck;\r
+       private String[] ips;\r
+       private String  fqdn;\r
+       /**\r
+        *      Configure the JVM DNS cache to have a 10 second TTL.  This needs to be called very very early or it won't have any effect.\r
+        */\r
+       public static void setDNSCache() {\r
+               java.security.Security.setProperty("networkaddress.cache.ttl", "10");\r
+       }\r
+       /**\r
+        *      Create an IsFrom for the specified fully qualified domain name.\r
+        */\r
+       public IsFrom(String fqdn) {\r
+               this.fqdn = fqdn;\r
+       }\r
+       /**\r
+        *      Check if an IP address matches.  If it has been more than\r
+        *      10 seconds since DNS was last checked for changes to the\r
+        *      IP address(es) of this FQDN, check again.  Then check\r
+        *      if the specified IP address belongs to the FQDN.\r
+        */\r
+       public synchronized boolean isFrom(String ip) {\r
+               long now = System.currentTimeMillis();\r
+               if (now > nextcheck) {\r
+                       nextcheck = now + 10000;\r
+                       Vector<String> v = new Vector<String>();\r
+                       try {\r
+                               InetAddress[] addrs = InetAddress.getAllByName(fqdn);\r
+                               for (InetAddress a: addrs) {\r
+                                       v.add(a.getHostAddress());\r
+                               }\r
+                       } catch (Exception e) {\r
+                       }\r
+                       ips = v.toArray(new String[v.size()]);\r
+               }\r
+               for (String s: ips) {\r
+                       if (s.equals(ip)) {\r
+                               return(true);\r
+                       }\r
+               }\r
+               return(false);\r
+       }\r
+       /**\r
+        *      Return the fully qualified domain name\r
+        */\r
+       public String toString() {\r
+               return(fqdn);\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java
new file mode 100644 (file)
index 0000000..078deaa
--- /dev/null
@@ -0,0 +1,159 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+package com.att.research.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.util.regex.*;\r
+import java.io.*;\r
+import java.nio.file.*;\r
+import java.text.*;\r
+\r
+/**\r
+ *     Cleanup of old log files.\r
+ *     <p>\r
+ *     Periodically scan the log directory for log files that are older than\r
+ *     the log file retention interval, and delete them.  In a future release,\r
+ *     This class will also be responsible for uploading events logs to the\r
+ *     log server to support the log query APIs.\r
+ */\r
+\r
+public class LogManager        extends TimerTask       {\r
+       private NodeConfigManager       config;\r
+       private Matcher isnodelog;\r
+       private Matcher iseventlog;\r
+       private Uploader        worker;\r
+       private String  uploaddir;\r
+       private String  logdir;\r
+       private class Uploader extends Thread implements DeliveryQueueHelper {\r
+               public long getInitFailureTimer() { return(10000L); }\r
+               public double getFailureBackoff() { return(2.0); }\r
+               public long getMaxFailureTimer() { return(150000L); }\r
+               public long getExpirationTimer() { return(604800000L); }\r
+               public int getFairFileLimit() { return(10000); }\r
+               public long getFairTimeLimit() { return(86400000); }\r
+               public String getDestURL(DestInfo dest, String fileid) {\r
+                       return(config.getEventLogUrl());\r
+               }\r
+               public void handleUnreachable(DestInfo dest) {}\r
+               public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); }\r
+               public boolean isFollowRedirects() { return(false); }\r
+               public String getFeedId(String subid) { return(null); }\r
+               private DeliveryQueue dq;\r
+               public Uploader() {\r
+                       dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false));\r
+                       setDaemon(true);\r
+                       setName("Log Uploader");\r
+                       start();\r
+               }\r
+               private synchronized void snooze() {\r
+                       try {\r
+                               wait(10000);\r
+                       } catch (Exception e) {\r
+                       }\r
+               }\r
+               private synchronized void poke() {\r
+                       notify();\r
+               }\r
+               public void run() {\r
+                       while (true) {\r
+                               scan();\r
+                               dq.run();\r
+                               snooze();\r
+                       }\r
+               }\r
+               private void scan() {\r
+                       long threshold = System.currentTimeMillis() - config.getLogRetention();\r
+                       File dir = new File(logdir);\r
+                       String[] fns = dir.list();\r
+                       Arrays.sort(fns);\r
+                       String lastqueued = "events-000000000000.log";\r
+                       String curlog = StatusLog.getCurLogFile();\r
+                       curlog = curlog.substring(curlog.lastIndexOf('/') + 1);\r
+                       try {\r
+                               Writer w = new FileWriter(uploaddir + "/.meta");\r
+                               w.write("POST\tlogdata\nContent-Type\ttext/plain\n");\r
+                               w.close();\r
+                               BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));\r
+                               lastqueued = br.readLine();\r
+                               br.close();\r
+                       } catch (Exception e) {\r
+                       }\r
+                       for (String fn: fns) {\r
+                               if (!isnodelog.reset(fn).matches()) {\r
+                                       if (!iseventlog.reset(fn).matches()) {\r
+                                               continue;\r
+                                       }\r
+                                       if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {\r
+                                               lastqueued = fn;\r
+                                               try {\r
+                                                       String pid = config.getPublishId();\r
+                                                       Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));\r
+                                                       Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));\r
+                                               } catch (Exception e) {\r
+                                               }\r
+                                       }\r
+                               }\r
+                               File f = new File(dir, fn);\r
+                               if (f.lastModified() < threshold) {\r
+                                       f.delete();\r
+                               }\r
+                       }\r
+                       try {\r
+                               (new File(uploaddir + "/.meta")).delete();\r
+                               Writer w = new FileWriter(uploaddir + "/.lastqueued");\r
+                               w.write(lastqueued + "\n");\r
+                               w.close();\r
+                       } catch (Exception e) {\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Construct a log manager\r
+        *      <p>\r
+        *      The log manager will check for expired log files every 5 minutes\r
+        *      at 20 seconds after the 5 minute boundary.  (Actually, the\r
+        *      interval is the event log rollover interval, which\r
+        *      defaults to 5 minutes).\r
+        */\r
+       public LogManager(NodeConfigManager config) {\r
+               this.config = config;\r
+               try {\r
+                       isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");\r
+                       iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");\r
+               } catch (Exception e) {}\r
+               logdir = config.getLogDir();\r
+               uploaddir = logdir + "/.spool";\r
+               (new File(uploaddir)).mkdirs();\r
+               long now = System.currentTimeMillis();\r
+               long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000);\r
+               long when = now - now % intvl + intvl + 20000L;\r
+               config.getTimer().scheduleAtFixedRate(this, when - now, intvl);\r
+               worker = new Uploader();\r
+       }\r
+       /**\r
+        *      Trigger check for expired log files and log files to upload\r
+        */\r
+       public void run() {\r
+               worker.poke();\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java
new file mode 100644 (file)
index 0000000..689f765
--- /dev/null
@@ -0,0 +1,722 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.util.*;\r
+import java.io.*;\r
+\r
+/**\r
+ *     Processed configuration for this node.\r
+ *     <p>\r
+ *     The NodeConfig represents a processed configuration from the Data Router\r
+ *     provisioning server.  Each time configuration data is received from the\r
+ *     provisioning server, a new NodeConfig is created and the previous one\r
+ *     discarded.\r
+ */\r
+public class NodeConfig        {\r
+       /**\r
+        *      Raw configuration entry for a data router node\r
+        */\r
+       public static class ProvNode {\r
+               private String cname;\r
+               /**\r
+                *      Construct a node configuration entry.\r
+                *      @param cname    The cname of the node.\r
+                */\r
+               public ProvNode(String cname) {\r
+                       this.cname = cname;\r
+               }\r
+               /**\r
+                *      Get the cname of the node\r
+                */\r
+               public String getCName() {\r
+                       return(cname);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for a provisioning parameter\r
+        */\r
+       public static class ProvParam {\r
+               private String name;\r
+               private String value;\r
+               /**\r
+                *      Construct a provisioning parameter configuration entry.\r
+                *      @param  name The name of the parameter.\r
+                *      @param  value The value of the parameter.\r
+                */\r
+               public ProvParam(String name, String value) {\r
+                       this.name = name;\r
+                       this.value = value;\r
+               }\r
+               /**\r
+                *      Get the name of the parameter.\r
+                */\r
+               public String getName() {\r
+                       return(name);\r
+               }\r
+               /**\r
+                *      Get the value of the parameter.\r
+                */\r
+               public String getValue() {\r
+                       return(value);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for a data feed.\r
+        */\r
+       public static class ProvFeed {\r
+               private String id;\r
+               private String logdata;\r
+               private String status;\r
+               /**\r
+                *      Construct a feed configuration entry.\r
+                *      @param id       The feed ID of the entry.\r
+                *      @param logdata  String for log entries about the entry.\r
+                *      @param status   The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or null if it is valid.\r
+                */\r
+               public ProvFeed(String id, String logdata, String status) {\r
+                       this.id = id;\r
+                       this.logdata = logdata;\r
+                       this.status = status;\r
+               }\r
+               /**\r
+                *      Get the feed id of the data feed.\r
+                */\r
+               public String getId() {\r
+                       return(id);\r
+               }\r
+               /**\r
+                *      Get the log data of the data feed.\r
+                */\r
+               public String getLogData() {\r
+                       return(logdata);\r
+               }\r
+               /**\r
+                *      Get the status of the data feed.\r
+                */\r
+               public String getStatus() {\r
+                       return(status);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for a feed user.\r
+        */\r
+       public static class ProvFeedUser        {\r
+               private String feedid;\r
+               private String user;\r
+               private String credentials;\r
+               /**\r
+                *      Construct a feed user configuration entry\r
+                *      @param feedid   The feed id.\r
+                *      @param user     The user that will publish to the feed.\r
+                *      @param credentials      The Authorization header the user will use to publish.\r
+                */\r
+               public ProvFeedUser(String feedid, String user, String credentials) {\r
+                       this.feedid = feedid;\r
+                       this.user = user;\r
+                       this.credentials = credentials;\r
+               }\r
+               /**\r
+                *      Get the feed id of the feed user.\r
+                */\r
+               public String getFeedId() {\r
+                       return(feedid);\r
+               }\r
+               /**\r
+                *      Get the user for the feed user.\r
+                */\r
+               public String getUser() {\r
+                       return(user);\r
+               }\r
+               /**\r
+                *      Get the credentials for the feed user.\r
+                */\r
+               public String getCredentials() {\r
+                       return(credentials);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for a feed subnet\r
+        */\r
+       public static class ProvFeedSubnet      {\r
+               private String feedid;\r
+               private String cidr;\r
+               /**\r
+                *      Construct a feed subnet configuration entry\r
+                *      @param feedid   The feed ID\r
+                *      @param cidr     The CIDR allowed to publish to the feed.\r
+                */\r
+               public ProvFeedSubnet(String feedid, String cidr) {\r
+                       this.feedid = feedid;\r
+                       this.cidr = cidr;\r
+               }\r
+               /**\r
+                *      Get the feed id of the feed subnet.\r
+                */\r
+               public String getFeedId() {\r
+                       return(feedid);\r
+               }\r
+               /**\r
+                *      Get the CIDR of the feed subnet.\r
+                */\r
+               public String getCidr() {\r
+                       return(cidr);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for a subscription\r
+        */\r
+       public static class ProvSubscription    {\r
+               private String  subid;\r
+               private String  feedid;\r
+               private String  url;\r
+               private String  authuser;\r
+               private String  credentials;\r
+               private boolean metaonly;\r
+               private boolean use100;\r
+               /**\r
+                *      Construct a subscription configuration entry\r
+                *      @param subid    The subscription ID\r
+                *      @param feedid   The feed ID\r
+                *      @param url      The base delivery URL (not including the fileid)\r
+                *      @param authuser The user in the credentials used to deliver\r
+                *      @param credentials      The credentials used to authenticate to the delivery URL exactly as they go in the Authorization header.\r
+                *      @param metaonly Is this a meta data only subscription?\r
+                *      @param use100   Should we send Expect: 100-continue?\r
+                */\r
+               public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100) {\r
+                       this.subid = subid;\r
+                       this.feedid = feedid;\r
+                       this.url = url;\r
+                       this.authuser = authuser;\r
+                       this.credentials = credentials;\r
+                       this.metaonly = metaonly;\r
+                       this.use100 = use100;\r
+               }\r
+               /**\r
+                *      Get the subscription ID\r
+                */\r
+               public String getSubId() {\r
+                       return(subid);\r
+               }\r
+               /**\r
+                *      Get the feed ID\r
+                */\r
+               public String getFeedId() {\r
+                       return(feedid);\r
+               }\r
+               /**\r
+                *      Get the delivery URL\r
+                */\r
+               public String getURL() {\r
+                       return(url);\r
+               }\r
+               /**\r
+                *      Get the user\r
+                */\r
+               public String getAuthUser() {\r
+                       return(authuser);\r
+               }\r
+               /**\r
+                *      Get the delivery credentials\r
+                */\r
+               public String getCredentials() {\r
+                       return(credentials);\r
+               }\r
+               /**\r
+                *      Is this a meta data only subscription?\r
+                */\r
+               public boolean isMetaDataOnly() {\r
+                       return(metaonly);\r
+               }\r
+               /**\r
+                *      Should we send Expect: 100-continue?\r
+                */\r
+               public boolean isUsing100() {\r
+                       return(use100);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for controlled ingress to the data router node\r
+        */\r
+       public static class ProvForceIngress    {\r
+               private String feedid;\r
+               private String subnet;\r
+               private String user;\r
+               private String[] nodes;\r
+               /**\r
+                *      Construct a forced ingress configuration entry\r
+                *      @param feedid   The feed ID that this entry applies to\r
+                *      @param subnet   The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all publisher IP addresses\r
+                *      @param user     The publishing user this entry applies to or "" if it applies to all publishing users.\r
+                *      @param nodes    The array of FQDNs of the data router nodes to redirect publication attempts to.\r
+                */\r
+               public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {\r
+                       this.feedid = feedid;\r
+                       this.subnet = subnet;\r
+                       this.user = user;\r
+                       this.nodes = nodes;\r
+               }\r
+               /**\r
+                *      Get the feed ID\r
+                */\r
+               public String getFeedId() {\r
+                       return(feedid);\r
+               }\r
+               /**\r
+                *      Get the subnet\r
+                */\r
+               public String getSubnet() {\r
+                       return(subnet);\r
+               }\r
+               /**\r
+                *      Get the user\r
+                */\r
+               public String getUser() {\r
+                       return(user);\r
+               }\r
+               /**\r
+                *      Get the node\r
+                */\r
+               public String[] getNodes() {\r
+                       return(nodes);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for controlled egress from the data router\r
+        */\r
+       public static class ProvForceEgress     {\r
+               private String subid;\r
+               private String node;\r
+               /**\r
+                *      Construct a forced egress configuration entry\r
+                *      @param subid    The subscription ID the subscription with forced egress\r
+                *      @param node     The node handling deliveries for this subscription\r
+                */\r
+               public ProvForceEgress(String subid, String node) {\r
+                       this.subid = subid;\r
+                       this.node = node;\r
+               }\r
+               /**\r
+                *      Get the subscription ID\r
+                */\r
+               public String getSubId() {\r
+                       return(subid);\r
+               }\r
+               /**\r
+                *      Get the node\r
+                */\r
+               public String getNode() {\r
+                       return(node);\r
+               }\r
+       }\r
+       /**\r
+        *      Raw configuration entry for routing within the data router network\r
+        */\r
+       public static class ProvHop     {\r
+               private String  from;\r
+               private String  to;\r
+               private String  via;\r
+               /**\r
+                *      A human readable description of this entry\r
+                */\r
+               public String toString() {\r
+                       return("Hop " + from + "->" + to + " via " + via);\r
+               }\r
+               /**\r
+                *      Construct a hop entry\r
+                *      @param from     The FQDN of the node with the data to be delivered\r
+                *      @param to       The FQDN of the node that will deliver to the subscriber\r
+                *      @param via      The FQDN of the node where the from node should send the data\r
+                */\r
+               public ProvHop(String from, String to, String via) {\r
+                       this.from = from;\r
+                       this.to = to;\r
+                       this.via = via;\r
+               }\r
+               /**\r
+                *      Get the from node\r
+                */\r
+               public String getFrom() {\r
+                       return(from);\r
+               }\r
+               /**\r
+                *      Get the to node\r
+                */\r
+               public String getTo() {\r
+                       return(to);\r
+               }\r
+               /**\r
+                *      Get the next intermediate node\r
+                */\r
+               public String getVia() {\r
+                       return(via);\r
+               }\r
+       }\r
+       private static class Redirection        {\r
+               public SubnetMatcher snm;\r
+               public String user;\r
+               public String[] nodes;\r
+       }\r
+       private static class Feed       {\r
+               public String   loginfo;\r
+               public String   status;\r
+               public SubnetMatcher[] subnets;\r
+               public Hashtable<String, String> authusers = new Hashtable<String, String>();\r
+               public Redirection[]    redirections;\r
+               public Target[] targets;\r
+       }\r
+       private Hashtable<String, String> params = new Hashtable<String, String>();\r
+       private Hashtable<String, Feed> feeds = new Hashtable<String, Feed>();\r
+       private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();\r
+       private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();\r
+       private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();\r
+       private String  myname;\r
+       private String  myauth;\r
+       private DestInfo[]      alldests;\r
+       private int     rrcntr;\r
+       /**\r
+        *      Process the raw provisioning data to configure this node\r
+        *      @param pd       The parsed provisioning data\r
+        *      @param myname   My name as seen by external systems\r
+        *      @param spooldir The directory where temporary files live\r
+        *      @param port     The port number for URLs\r
+        *      @param nodeauthkey      The keying string used to generate node authentication credentials\r
+        */\r
+       public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {\r
+               this.myname = myname;\r
+               for (ProvParam p: pd.getParams()) {\r
+                       params.put(p.getName(), p.getValue());\r
+               }\r
+               Vector<DestInfo>        div = new Vector<DestInfo>();\r
+               myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);\r
+               for (ProvNode pn: pd.getNodes()) {\r
+                       String cn = pn.getCName();\r
+                       if (nodeinfo.get(cn) != null) {\r
+                               continue;\r
+                       }\r
+                       String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);\r
+                       DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);\r
+                       (new File(di.getSpool())).mkdirs();\r
+                       div.add(di);\r
+                       nodeinfo.put(cn, di);\r
+                       nodes.put(auth, new IsFrom(cn));\r
+               }\r
+               PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops());\r
+               Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<String, Vector<Redirection>>();\r
+               for (ProvForceIngress pfi: pd.getForceIngress()) {\r
+                       Vector<Redirection> v = rdtab.get(pfi.getFeedId());\r
+                       if (v == null) {\r
+                               v = new Vector<Redirection>();\r
+                               rdtab.put(pfi.getFeedId(), v);\r
+                       }\r
+                       Redirection r = new Redirection();\r
+                       if (pfi.getSubnet() != null) {\r
+                               r.snm = new SubnetMatcher(pfi.getSubnet());\r
+                       }\r
+                       r.user = pfi.getUser();\r
+                       r.nodes = pfi.getNodes();\r
+                       v.add(r);\r
+               }\r
+               Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<String, Hashtable<String, String>>();\r
+               for (ProvFeedUser pfu: pd.getFeedUsers()) {\r
+                       Hashtable<String, String> t = pfutab.get(pfu.getFeedId());\r
+                       if (t == null) {\r
+                               t = new Hashtable<String, String>();\r
+                               pfutab.put(pfu.getFeedId(), t);\r
+                       }\r
+                       t.put(pfu.getCredentials(), pfu.getUser());\r
+               }\r
+               Hashtable<String, String> egrtab = new Hashtable<String, String>();\r
+               for (ProvForceEgress pfe: pd.getForceEgress()) {\r
+                       if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {\r
+                               continue;\r
+                       }\r
+                       egrtab.put(pfe.getSubId(), pfe.getNode());\r
+               }\r
+               Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();\r
+               for (ProvFeedSubnet pfs: pd.getFeedSubnets()) {\r
+                       Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());\r
+                       if (v == null) {\r
+                               v = new Vector<SubnetMatcher>();\r
+                               pfstab.put(pfs.getFeedId(), v);\r
+                       }\r
+                       v.add(new SubnetMatcher(pfs.getCidr()));\r
+               }\r
+               Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();\r
+               HashSet<String> allfeeds = new HashSet<String>();\r
+               for (ProvFeed pfx: pd.getFeeds()) {\r
+                       if (pfx.getStatus() == null) {\r
+                               allfeeds.add(pfx.getId());\r
+                       }\r
+               }\r
+               for (ProvSubscription ps: pd.getSubscriptions()) {\r
+                       String sid = ps.getSubId();\r
+                       String fid = ps.getFeedId();\r
+                       if (!allfeeds.contains(fid)) {\r
+                               continue;\r
+                       }\r
+                       if (subinfo.get(sid) != null) {\r
+                               continue;\r
+                       }\r
+                       int sididx = 999;\r
+                       try {\r
+                               sididx = Integer.parseInt(sid);\r
+                               sididx -= sididx % 100;\r
+                       } catch (Exception e) {\r
+                       }\r
+                       String siddir = sididx + "/" + sid;\r
+                       DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());\r
+                       (new File(di.getSpool())).mkdirs();\r
+                       div.add(di);\r
+                       subinfo.put(sid, di);\r
+                       String egr = egrtab.get(sid);\r
+                       if (egr != null) {\r
+                               sid = pf.getPath(egr) + sid;\r
+                       }\r
+                       StringBuffer sb = ttab.get(fid);\r
+                       if (sb == null) {\r
+                               sb = new StringBuffer();\r
+                               ttab.put(fid, sb);\r
+                       }\r
+                       sb.append(' ').append(sid);\r
+               }\r
+               alldests = div.toArray(new DestInfo[div.size()]);\r
+               for (ProvFeed pfx: pd.getFeeds()) {\r
+                       String fid = pfx.getId();\r
+                       Feed f = feeds.get(fid);\r
+                       if (f != null) {\r
+                               continue;\r
+                       }\r
+                       f = new Feed();\r
+                       feeds.put(fid, f);\r
+                       f.loginfo = pfx.getLogData();\r
+                       f.status = pfx.getStatus();\r
+                       Vector<SubnetMatcher> v1 = pfstab.get(fid);\r
+                       if (v1 == null) {\r
+                               f.subnets = new SubnetMatcher[0];\r
+                       } else {\r
+                               f.subnets = v1.toArray(new SubnetMatcher[v1.size()]);\r
+                       }\r
+                       Hashtable<String, String> h1 = pfutab.get(fid);\r
+                       if (h1 == null) {\r
+                               h1 = new Hashtable<String, String>();\r
+                       }\r
+                       f.authusers = h1;\r
+                       Vector<Redirection> v2 = rdtab.get(fid);\r
+                       if (v2 == null) {\r
+                               f.redirections = new Redirection[0];\r
+                       } else {\r
+                               f.redirections = v2.toArray(new Redirection[v2.size()]);\r
+                       }\r
+                       StringBuffer sb = ttab.get(fid);\r
+                       if (sb == null) {\r
+                               f.targets = new Target[0];\r
+                       } else {\r
+                               f.targets = parseRouting(sb.toString());\r
+                       }\r
+               }\r
+       }\r
+       /**\r
+        *      Parse a target string into an array of targets\r
+        *      @param routing Target string\r
+        *      @return Array of targets.\r
+        */\r
+       public Target[] parseRouting(String routing) {\r
+               routing = routing.trim();\r
+               if ("".equals(routing)) {\r
+                       return(new Target[0]);\r
+               }\r
+               String[] xx = routing.split("\\s+");\r
+               Hashtable<String, Target> tmap = new Hashtable<String, Target>();\r
+               HashSet<String> subset = new HashSet<String>();\r
+               Vector<Target> tv = new Vector<Target>();\r
+               Target[] ret = new Target[xx.length];\r
+               for (int i = 0; i < xx.length; i++) {\r
+                       String t = xx[i];\r
+                       int j = t.indexOf('/');\r
+                       if (j == -1) {\r
+                               DestInfo di = subinfo.get(t);\r
+                               if (di == null) {\r
+                                       tv.add(new Target(null, t));\r
+                               } else {\r
+                                       if (!subset.contains(t)) {\r
+                                               subset.add(t);\r
+                                               tv.add(new Target(di, null));\r
+                                       }\r
+                               }\r
+                       } else {\r
+                               String node = t.substring(0, j);\r
+                               String rtg = t.substring(j + 1);\r
+                               DestInfo di = nodeinfo.get(node);\r
+                               if (di == null) {\r
+                                       tv.add(new Target(null, t));\r
+                               } else {\r
+                                       Target tt = tmap.get(node);\r
+                                       if (tt == null) {\r
+                                               tt = new Target(di, rtg);\r
+                                               tmap.put(node, tt);\r
+                                               tv.add(tt);\r
+                                       } else {\r
+                                               tt.addRouting(rtg);\r
+                                       }\r
+                               }\r
+                       }\r
+               }\r
+               return(tv.toArray(new Target[tv.size()]));\r
+       }\r
+       /**\r
+        *      Check whether this is a valid node-to-node transfer\r
+        *      @param credentials      Credentials offered by the supposed node\r
+        *      @param ip       IP address the request came from\r
+        */\r
+       public boolean isAnotherNode(String credentials, String ip) {\r
+               IsFrom n = nodes.get(credentials);\r
+               return (n != null && n.isFrom(ip));\r
+       }\r
+       /**\r
+        *      Check whether publication is allowed.\r
+        *      @param feedid   The ID of the feed being requested.\r
+        *      @param credentials      The offered credentials\r
+        *      @param ip       The requesting IP address\r
+        */\r
+       public String isPublishPermitted(String feedid, String credentials, String ip) {\r
+               Feed f = feeds.get(feedid);\r
+               String nf = "Feed does not exist";\r
+               if (f != null) {\r
+                       nf = f.status;\r
+               }\r
+               if (nf != null) {\r
+                       return(nf);\r
+               }\r
+               String user = f.authusers.get(credentials);\r
+               if (user == null) {\r
+                       return("Publisher not permitted for this feed");\r
+               }\r
+               if (f.subnets.length == 0) {\r
+                       return(null);\r
+               }\r
+               byte[] addr = NodeUtils.getInetAddress(ip);\r
+               for (SubnetMatcher snm: f.subnets) {\r
+                       if (snm.matches(addr)) {\r
+                               return(null);\r
+                       }\r
+               }\r
+               return("Publisher not permitted for this feed");\r
+       }\r
+       /**\r
+        *      Get authenticated user\r
+        */\r
+       public String getAuthUser(String feedid, String credentials) {\r
+               return(feeds.get(feedid).authusers.get(credentials));\r
+       }\r
+       /**\r
+        *      Check if the request should be redirected to a different ingress node\r
+        */\r
+       public String getIngressNode(String feedid, String user, String ip) {\r
+               Feed f = feeds.get(feedid);\r
+               if (f.redirections.length == 0) {\r
+                       return(null);\r
+               }\r
+               byte[] addr = NodeUtils.getInetAddress(ip);\r
+               for (Redirection r: f.redirections) {\r
+                       if (r.user != null && !user.equals(r.user)) {\r
+                               continue;\r
+                       }\r
+                       if (r.snm != null && !r.snm.matches(addr)) {\r
+                               continue;\r
+                       }\r
+                       for (String n: r.nodes) {\r
+                               if (myname.equals(n)) {\r
+                                       return(null);\r
+                               }\r
+                       }\r
+                       if (r.nodes.length == 0) {\r
+                               return(null);\r
+                       }\r
+                       return(r.nodes[rrcntr++ % r.nodes.length]);\r
+               }\r
+               return(null);\r
+       }\r
+       /**\r
+        *      Get a provisioned configuration parameter\r
+        */\r
+       public String getProvParam(String name) {\r
+               return(params.get(name));\r
+       }\r
+       /**\r
+        *      Get all the DestInfos\r
+        */\r
+       public DestInfo[]       getAllDests() {\r
+               return(alldests);\r
+       }\r
+       /**\r
+        *      Get the targets for a feed\r
+        *      @param feedid   The feed ID\r
+        *      @return The targets this feed should be delivered to\r
+        */\r
+       public Target[] getTargets(String feedid) {\r
+               if (feedid == null) {\r
+                       return(new Target[0]);\r
+               }\r
+               Feed f = feeds.get(feedid);\r
+               if (f == null) {\r
+                       return(new Target[0]);\r
+               }\r
+               return(f.targets);\r
+       }\r
+       /**\r
+        *      Get the feed ID for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The feed ID\r
+        */\r
+       public String getFeedId(String subid) {\r
+               DestInfo di = subinfo.get(subid);\r
+               if (di == null) {\r
+                       return(null);\r
+               }\r
+               return(di.getLogData());\r
+       }\r
+       /**\r
+        *      Get the spool directory for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The spool directory\r
+        */\r
+       public String getSpoolDir(String subid) {\r
+               DestInfo di = subinfo.get(subid);\r
+               if (di == null) {\r
+                       return(null);\r
+               }\r
+               return(di.getSpool());\r
+       }\r
+       /**\r
+        *      Get the Authorization value this node uses\r
+        *      @return The Authorization header value for this node\r
+        */\r
+       public String getMyAuth() {\r
+               return(myauth);\r
+       }\r
+\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java
new file mode 100644 (file)
index 0000000..01ca442
--- /dev/null
@@ -0,0 +1,599 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import java.net.*;\r
+import java.util.*;\r
+import java.io.*;\r
+import org.apache.log4j.Logger;\r
+\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
+import com.att.research.datarouter.node.eelf.EelfMsgs;\r
+\r
+\r
+/**\r
+ *     Maintain the configuration of a Data Router node\r
+ *     <p>\r
+ *     The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information.  (Log4J has its own configuration mechanism).\r
+ *     <p>\r
+ *     There are two basic sets of configuration data.  The\r
+ *     static local configuration data, stored in a local configuration file (created\r
+ *     as part of installation by SWM), and the dynamic global\r
+ *     configuration data fetched from the data router provisioning server.\r
+ */\r
+public class NodeConfigManager implements DeliveryQueueHelper  {\r
+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeConfigManager");\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeConfigManager");\r
+       private static NodeConfigManager        base = new NodeConfigManager();\r
+\r
+       private Timer timer = new Timer("Node Configuration Timer", true);\r
+       private long    maxfailuretimer;\r
+       private long    initfailuretimer;\r
+       private long    expirationtimer;\r
+       private double  failurebackoff;\r
+       private long    fairtimelimit;\r
+       private int     fairfilelimit;\r
+       private double  fdpstart;\r
+       private double  fdpstop;\r
+       private int     deliverythreads;\r
+       private String  provurl;\r
+       private String  provhost;\r
+       private IsFrom  provcheck;\r
+       private int     gfport;\r
+       private int     svcport;\r
+       private int     port;\r
+       private String  spooldir;\r
+       private String  logdir;\r
+       private long    logretention;\r
+       private String  redirfile;\r
+       private String  kstype;\r
+       private String  ksfile;\r
+       private String  kspass;\r
+       private String  kpass;\r
+       private String  tstype;\r
+       private String  tsfile;\r
+       private String  tspass;\r
+       private String  myname;\r
+       private RedirManager    rdmgr;\r
+       private RateLimitedOperation    pfetcher;\r
+       private NodeConfig      config;\r
+       private File    quiesce;\r
+       private PublishId       pid;\r
+       private String  nak;\r
+       private TaskList        configtasks = new TaskList();\r
+       private String  eventlogurl;\r
+       private String  eventlogprefix;\r
+       private String  eventlogsuffix;\r
+       private String  eventloginterval;\r
+       private boolean followredirects;\r
+\r
+       \r
+       /**\r
+        *      Get the default node configuration manager\r
+        */\r
+       public static NodeConfigManager getInstance() {\r
+               return(base);\r
+       }\r
+       /**\r
+        *      Initialize the configuration of a Data Router node\r
+        */\r
+       private NodeConfigManager() {\r
+               Properties p = new Properties();\r
+               try {\r
+                       p.load(new FileInputStream(System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties")));\r
+               } catch (Exception e) {\r
+                       \r
+                       NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");\r
+                       eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR);\r
+                       logger.error("NODE0301 Unable to load local configuration file " + System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"), e);\r
+               }\r
+               provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov");\r
+               try {\r
+                       provhost = (new URL(provurl)).getHost();\r
+               } catch (Exception e) {\r
+                       NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");\r
+                       eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);\r
+                       logger.error("NODE0302 Bad provisioning server URL " + provurl);\r
+                       System.exit(1);\r
+               }\r
+               logger.info("NODE0303 Provisioning server is " + provhost);\r
+               eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");\r
+               provcheck = new IsFrom(provhost);\r
+               gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080"));\r
+               svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443"));\r
+               port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443"));\r
+               long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000"));\r
+               long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000"));\r
+               spooldir = p.getProperty("SpoolDir", "spool");\r
+               File fdir = new File(spooldir + "/f");\r
+               fdir.mkdirs();\r
+               for (File junk: fdir.listFiles()) {\r
+                       if (junk.isFile()) {\r
+                               junk.delete();\r
+                       }\r
+               }\r
+               logdir = p.getProperty("LogDir", "logs");\r
+               (new File(logdir)).mkdirs();\r
+               logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L;\r
+               eventlogprefix = logdir + "/events";\r
+               eventlogsuffix = ".log";\r
+               String redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat");\r
+               kstype = p.getProperty("KeyStoreType", "jks");\r
+               ksfile = p.getProperty("KeyStoreFile", "etc/keystore");\r
+               kspass = p.getProperty("KeyStorePassword", "changeme");\r
+               kpass = p.getProperty("KeyPassword", "changeme");\r
+               tstype = p.getProperty("TrustStoreType", "jks");\r
+               tsfile = p.getProperty("TrustStoreFile");\r
+               tspass = p.getProperty("TrustStorePassword", "changeme");\r
+               if (tsfile != null && tsfile.length() > 0) {\r
+                       System.setProperty("javax.net.ssl.trustStoreType", tstype);\r
+                       System.setProperty("javax.net.ssl.trustStore", tsfile);\r
+                       System.setProperty("javax.net.ssl.trustStorePassword", tspass);\r
+               }\r
+               nak = p.getProperty("NodeAuthKey", "Node123!");\r
+               quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN"));\r
+               myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);\r
+               if (myname == null) {\r
+                       NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");\r
+                       eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);\r
+                       logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);\r
+                       System.exit(1);\r
+               }\r
+               logger.info("NODE0304 My certificate says my name is " + myname);\r
+               pid = new PublishId(myname);\r
+               rdmgr = new RedirManager(redirfile, minrsinterval, timer);\r
+               pfetcher = new RateLimitedOperation(minpfinterval, timer) {\r
+                       public void run() {\r
+                               fetchconfig();\r
+                       }\r
+               };\r
+               logger.info("NODE0305 Attempting to fetch configuration at " + provurl);\r
+               pfetcher.request();\r
+       }\r
+       private void localconfig() {\r
+               followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));\r
+               eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m");\r
+               initfailuretimer = 10000;\r
+               maxfailuretimer = 3600000;\r
+               expirationtimer = 86400000;\r
+               failurebackoff = 2.0;\r
+               deliverythreads = 40;\r
+               fairfilelimit = 100;\r
+               fairtimelimit = 60000;\r
+               fdpstart = 0.05;\r
+               fdpstop = 0.2;\r
+               try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}\r
+               try { maxfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}\r
+               try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {}\r
+               try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {}\r
+               try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {}\r
+               try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {}\r
+               try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {}\r
+               try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {}\r
+               try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {}\r
+               if (fdpstart < 0.01) {\r
+                       fdpstart = 0.01;\r
+               }\r
+               if (fdpstart > 0.5) {\r
+                       fdpstart = 0.5;\r
+               }\r
+               if (fdpstop < fdpstart) {\r
+                       fdpstop = fdpstart;\r
+               }\r
+               if (fdpstop > 0.5) {\r
+                       fdpstop = 0.5;\r
+               }\r
+       }\r
+       private void fetchconfig() {\r
+               try {\r
+                       System.out.println("provurl:: "+provurl);\r
+                       Reader r = new InputStreamReader((new URL(provurl)).openStream());\r
+                       config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);\r
+                       localconfig();\r
+                       configtasks.startRun();\r
+                       Runnable rr;\r
+                       while ((rr = configtasks.next()) != null) {\r
+                               try {\r
+                                       rr.run();\r
+                               } catch (Exception e) {\r
+                               }\r
+                       }\r
+               } catch (Exception e) {\r
+                       e.printStackTrace();\r
+                       NodeUtils.setIpAndFqdnForEelf("fetchconfigs");\r
+                       eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());\r
+                       logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);\r
+                       pfetcher.request();\r
+               }\r
+       }\r
+       /**\r
+        *      Process a gofetch request from a particular IP address.  If the\r
+        *      IP address is not an IP address we would go to to fetch the\r
+        *      provisioning data, ignore the request.  If the data has been\r
+        *      fetched very recently (default 10 seconds), wait a while before fetching again.\r
+        */\r
+       public synchronized void gofetch(String remoteaddr) {\r
+               if (provcheck.isFrom(remoteaddr)) {\r
+                       logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr);\r
+                       pfetcher.request();\r
+               } else {\r
+                       logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr);\r
+               }\r
+       }\r
+       /**\r
+        *      Am I configured?\r
+        */\r
+       public boolean isConfigured() {\r
+               return(config != null);\r
+       }\r
+       /**\r
+        *      Am I shut down?\r
+        */\r
+       public boolean isShutdown() {\r
+               return(quiesce.exists());\r
+       }\r
+       /**\r
+        *      Given a routing string, get the targets.\r
+        *      @param routing  Target string\r
+        *      @return array of targets\r
+        */\r
+       public Target[] parseRouting(String routing) {\r
+               return(config.parseRouting(routing));\r
+       }\r
+       /**\r
+        *      Given a set of credentials and an IP address, is this request from another node?\r
+        *      @param credentials      Credentials offered by the supposed node\r
+        *      @param ip       IP address the request came from\r
+        *      @return If the credentials and IP address are recognized, true, otherwise false.\r
+        */\r
+       public boolean isAnotherNode(String credentials, String ip) {\r
+               return(config.isAnotherNode(credentials, ip));\r
+       }\r
+       /**\r
+        *      Check whether publication is allowed.\r
+        *      @param feedid   The ID of the feed being requested\r
+        *      @param credentials      The offered credentials\r
+        *      @param ip       The requesting IP address\r
+        *      @return True if the IP and credentials are valid for the specified feed.\r
+        */\r
+       public String isPublishPermitted(String feedid, String credentials, String ip) {\r
+               return(config.isPublishPermitted(feedid, credentials, ip));\r
+       }\r
+       /**\r
+        *      Check who the user is given the feed ID and the offered credentials.\r
+        *      @param feedid   The ID of the feed specified\r
+        *      @param credentials      The offered credentials\r
+        *      @return Null if the credentials are invalid or the user if they are valid.\r
+        */\r
+       public String getAuthUser(String feedid, String credentials) {\r
+               return(config.getAuthUser(feedid, credentials));\r
+       }\r
+       /**\r
+        *      Check if the publish request should be sent to another node based on the feedid, user, and source IP address.\r
+        *      @param feedid   The ID of the feed specified\r
+        *      @param user     The publishing user\r
+        *      @param ip       The IP address of the publish endpoint\r
+        *      @return Null if the request should be accepted or the correct hostname if it should be sent to another node.\r
+        */\r
+       public String getIngressNode(String feedid, String user, String ip) {\r
+               return(config.getIngressNode(feedid, user, ip));\r
+       }\r
+       /**\r
+        *      Get a provisioned configuration parameter (from the provisioning server configuration)\r
+        *      @param name     The name of the parameter\r
+        *      @return The value of the parameter or null if it is not defined.\r
+        */\r
+       public String getProvParam(String name) {\r
+               return(config.getProvParam(name));\r
+       }\r
+       /**\r
+        *      Get a provisioned configuration parameter (from the provisioning server configuration)\r
+        *      @param name     The name of the parameter\r
+        *      @param deflt    The value to use if the parameter is not defined\r
+        *      @return The value of the parameter or deflt if it is not defined.\r
+        */\r
+       public String getProvParam(String name, String deflt) {\r
+               name = config.getProvParam(name);\r
+               if (name == null) {\r
+                       name = deflt;\r
+               }\r
+               return(name);\r
+       }\r
+       /**\r
+        *      Generate a publish ID\r
+        */\r
+       public String getPublishId() {\r
+               return(pid.next());\r
+       }\r
+       /**\r
+        *      Get all the outbound spooling destinations.\r
+        *      This will include both subscriptions and nodes.\r
+        */\r
+       public DestInfo[] getAllDests() {\r
+               return(config.getAllDests());\r
+       }\r
+       /**\r
+        *      Register a task to run whenever the configuration changes\r
+        */\r
+       public void registerConfigTask(Runnable task) {\r
+               configtasks.addTask(task);\r
+       }\r
+       /**\r
+        *      Deregister a task to run whenever the configuration changes\r
+        */\r
+       public void deregisterConfigTask(Runnable task) {\r
+               configtasks.removeTask(task);\r
+       }\r
+       /**\r
+        *      Get the URL to deliver a message to.\r
+        *      @param destinfo The destination information\r
+        *      @param fileid   The file ID\r
+        *      @return The URL to deliver to\r
+        */\r
+       public String getDestURL(DestInfo destinfo, String fileid) {\r
+               String subid = destinfo.getSubId();\r
+               String purl = destinfo.getURL();\r
+               if (followredirects && subid != null) {\r
+                       purl = rdmgr.lookup(subid, purl);\r
+               }\r
+               return(purl + "/" + fileid);\r
+       }\r
+       /**\r
+        *      Is a destination redirected?\r
+        */\r
+       public boolean isDestRedirected(DestInfo destinfo) {\r
+               return(followredirects && rdmgr.isRedirected(destinfo.getSubId()));\r
+       }\r
+       /**\r
+        *      Set up redirection on receipt of a 3XX from a target URL\r
+        */\r
+       public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {\r
+               fileid = "/" + fileid;\r
+               String subid = destinfo.getSubId();\r
+               String purl = destinfo.getURL();\r
+               if (followredirects && subid != null && redirto.endsWith(fileid)) {\r
+                       redirto = redirto.substring(0, redirto.length() - fileid.length());\r
+                       if (!redirto.equals(purl)) {\r
+                               rdmgr.redirect(subid, purl, redirto);\r
+                               return(true);\r
+                       }\r
+               }\r
+               return(false);\r
+       }\r
+       /**\r
+        *      Handle unreachable target URL\r
+        */\r
+       public void handleUnreachable(DestInfo destinfo) {\r
+               String subid = destinfo.getSubId();\r
+               if (followredirects && subid != null) {\r
+                       rdmgr.forget(subid);\r
+               }\r
+       }\r
+       /**\r
+        *      Get the timeout before retrying after an initial delivery failure\r
+        */\r
+       public long getInitFailureTimer() {\r
+               return(initfailuretimer);\r
+       }\r
+       /**\r
+        *      Get the maximum timeout between delivery attempts\r
+        */\r
+       public long getMaxFailureTimer() {\r
+               return(maxfailuretimer);\r
+       }\r
+       /**\r
+        *      Get the ratio between consecutive delivery attempts\r
+        */\r
+       public double getFailureBackoff() {\r
+               return(failurebackoff);\r
+       }\r
+       /**\r
+        *      Get the expiration timer for deliveries\r
+        */\r
+       public long getExpirationTimer() {\r
+               return(expirationtimer);\r
+       }\r
+       /**\r
+        *      Get the maximum number of file delivery attempts before checking\r
+        *      if another queue has work to be performed.\r
+        */\r
+       public int getFairFileLimit() {\r
+               return(fairfilelimit);\r
+       }\r
+       /**\r
+        *      Get the maximum amount of time spent delivering files before\r
+        *      checking if another queue has work to be performed.\r
+        */\r
+       public long getFairTimeLimit() {\r
+               return(fairtimelimit);\r
+       }\r
+       /**\r
+        *      Get the targets for a feed\r
+        *      @param feedid   The feed ID\r
+        *      @return The targets this feed should be delivered to\r
+        */\r
+       public Target[] getTargets(String feedid) {\r
+               return(config.getTargets(feedid));\r
+       }\r
+       /**\r
+        *      Get the spool directory for temporary files\r
+        */\r
+       public String getSpoolDir() {\r
+               return(spooldir + "/f");\r
+       }\r
+       /**\r
+        *      Get the base directory for spool directories\r
+        */\r
+       public String getSpoolBase() {\r
+               return(spooldir);\r
+       }\r
+       /**\r
+        *      Get the key store type\r
+        */\r
+       public String getKSType() {\r
+               return(kstype);\r
+       }\r
+       /**\r
+        *      Get the key store file\r
+        */\r
+       public String getKSFile() {\r
+               return(ksfile);\r
+       }\r
+       /**\r
+        *      Get the key store password\r
+        */\r
+       public String getKSPass() {\r
+               return(kspass);\r
+       }\r
+       /**\r
+        *      Get the key password\r
+        */\r
+       public String getKPass() {\r
+               return(kpass);\r
+       }\r
+       /**\r
+        *      Get the http port\r
+        */\r
+       public int getHttpPort() {\r
+               return(gfport);\r
+       }\r
+       /**\r
+        *      Get the https port\r
+        */\r
+       public int getHttpsPort() {\r
+               return(svcport);\r
+       }\r
+       /**\r
+        *      Get the externally visible https port\r
+        */\r
+       public int getExtHttpsPort() {\r
+               return(port);\r
+       }\r
+       /**\r
+        *      Get the external name of this machine\r
+        */\r
+       public String getMyName() {\r
+               return(myname);\r
+       }\r
+       /**\r
+        *      Get the number of threads to use for delivery\r
+        */\r
+       public int      getDeliveryThreads() {\r
+               return(deliverythreads);\r
+       }\r
+       /**\r
+        *      Get the URL for uploading the event log data\r
+        */\r
+       public String   getEventLogUrl() {\r
+               return(eventlogurl);\r
+       }\r
+       /**\r
+        *      Get the prefix for the names of event log files\r
+        */\r
+       public String   getEventLogPrefix() {\r
+               return(eventlogprefix);\r
+       }\r
+       /**\r
+        *      Get the suffix for the names of the event log files\r
+        */\r
+       public String   getEventLogSuffix() {\r
+               return(eventlogsuffix);\r
+       }\r
+       /**\r
+        *      Get the interval between event log file rollovers\r
+        */\r
+       public String getEventLogInterval() {\r
+               return(eventloginterval);\r
+       }\r
+       /**\r
+        *      Should I follow redirects from subscribers?\r
+        */\r
+       public boolean isFollowRedirects() {\r
+               return(followredirects);\r
+       }\r
+       /**\r
+        *      Get the directory where the event and node log files live\r
+        */\r
+       public String getLogDir() {\r
+               return(logdir);\r
+       }\r
+       /**\r
+        *      How long do I keep log files (in milliseconds)\r
+        */\r
+       public long getLogRetention() {\r
+               return(logretention);\r
+       }\r
+       /**\r
+        *      Get the timer\r
+        */\r
+       public Timer getTimer() {\r
+               return(timer);\r
+       }\r
+       /**\r
+        *      Get the feed ID for a subscription\r
+        *      @param subid    The subscription ID\r
+        *      @return The feed ID\r
+        */\r
+       public String getFeedId(String subid) {\r
+               return(config.getFeedId(subid));\r
+       }\r
+       /**\r
+        *      Get the authorization string this node uses\r
+        *      @return The Authorization string for this node\r
+        */\r
+       public String getMyAuth() {\r
+               return(config.getMyAuth());\r
+       }\r
+       /**\r
+        *      Get the fraction of free spool disk space where we start throwing away undelivered files.  This is FREE_DISK_RED_PERCENT / 100.0.  Default is 0.05.  Limited by 0.01 <= FreeDiskStart <= 0.5.\r
+        */\r
+       public double getFreeDiskStart() {\r
+               return(fdpstart);\r
+       }\r
+       /**\r
+        *      Get the fraction of free spool disk space where we stop throwing away undelivered files.  This is FREE_DISK_YELLOW_PERCENT / 100.0.  Default is 0.2.  Limited by FreeDiskStart <= FreeDiskStop <= 0.5.\r
+        */\r
+       public double getFreeDiskStop() {\r
+               return(fdpstop);\r
+       }\r
+       /**\r
+        *      Get the spool directory for a subscription\r
+        */\r
+       public String getSpoolDir(String subid, String remoteaddr) {\r
+               if (provcheck.isFrom(remoteaddr)) {\r
+                       String sdir = config.getSpoolDir(subid);\r
+                       if (sdir != null) {\r
+                               logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr);\r
+                       } else {\r
+                               logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr);\r
+                       }\r
+                       return(sdir);\r
+               } else {\r
+                       logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);\r
+                       return(null);\r
+               }\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java
new file mode 100644 (file)
index 0000000..c939041
--- /dev/null
@@ -0,0 +1,113 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import org.eclipse.jetty.servlet.*;\r
+import org.eclipse.jetty.util.ssl.*;\r
+import org.eclipse.jetty.server.*;\r
+import org.eclipse.jetty.server.nio.*;\r
+import org.eclipse.jetty.server.ssl.*;\r
+import org.apache.log4j.Logger;\r
+\r
+/**\r
+ *     The main starting point for the Data Router node\r
+ */\r
+public class NodeMain  {\r
+       private NodeMain() {}\r
+       private static Logger   logger = Logger.getLogger("com.att.research.datarouter.node.NodeMain");\r
+       private static class wfconfig implements Runnable       {\r
+               private NodeConfigManager ncm;\r
+               public wfconfig(NodeConfigManager ncm) {\r
+                       this.ncm = ncm;\r
+               }\r
+               public synchronized void run() {\r
+                       notify();\r
+               }\r
+               public synchronized void waitforconfig() {\r
+                       ncm.registerConfigTask(this);\r
+                       while (!ncm.isConfigured()) {\r
+                               logger.info("NODE0003 Waiting for Node Configuration");\r
+                               try {\r
+                                       wait();\r
+                               } catch (Exception e) {\r
+                               }\r
+                       }\r
+                       ncm.deregisterConfigTask(this);\r
+                       logger.info("NODE0004 Node Configuration Data Received");\r
+               }\r
+       }\r
+       private static Delivery d;\r
+       private static NodeConfigManager ncm;\r
+       /**\r
+        *      Reset the retry timer for a subscription\r
+        */\r
+       public static void resetQueue(String subid, String ip) {\r
+               d.resetQueue(ncm.getSpoolDir(subid, ip));\r
+       }\r
+       /**\r
+        *      Start the data router.\r
+        *      <p>\r
+        *      The location of the node configuration file can be set using the\r
+        *      com.att.research.datarouter.node.ConfigFile system property.  By\r
+        *      default, it is "etc/node.properties".\r
+        */\r
+       public static void main(String[] args) throws Exception {\r
+               logger.info("NODE0001 Data Router Node Starting");\r
+               IsFrom.setDNSCache();\r
+               ncm = NodeConfigManager.getInstance();\r
+               logger.info("NODE0002 I am " + ncm.getMyName());\r
+               (new wfconfig(ncm)).waitforconfig();\r
+               d = new Delivery(ncm);\r
+               LogManager lm = new LogManager(ncm);\r
+               Server server = new Server();\r
+               SelectChannelConnector http = new SelectChannelConnector();\r
+               http.setPort(ncm.getHttpPort());\r
+               http.setMaxIdleTime(2000);\r
+               http.setRequestHeaderSize(2048);\r
+               SslSelectChannelConnector https = new SslSelectChannelConnector();\r
+               https.setPort(ncm.getHttpsPort());\r
+               https.setMaxIdleTime(30000);\r
+               https.setRequestHeaderSize(8192);\r
+               SslContextFactory cf = https.getSslContextFactory();\r
+               \r
+               /**Skip SSLv3 Fixes*/\r
+               cf.addExcludeProtocols("SSLv3");\r
+               logger.info("Excluded protocols node-"+cf.getExcludeProtocols());\r
+               /**End of SSLv3 Fixes*/\r
+\r
+               cf.setKeyStoreType(ncm.getKSType());\r
+               cf.setKeyStorePath(ncm.getKSFile());\r
+               cf.setKeyStorePassword(ncm.getKSPass());\r
+               cf.setKeyManagerPassword(ncm.getKPass());\r
+               server.setConnectors(new Connector[] { http, https });\r
+               ServletContextHandler ctxt = new ServletContextHandler(0);\r
+               ctxt.setContextPath("/");\r
+               server.setHandler(ctxt);\r
+               ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*");\r
+               logger.info("NODE0005 Data Router Node Activating Service");\r
+               server.start();\r
+               server.join();\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java
new file mode 100644 (file)
index 0000000..e0ec1f5
--- /dev/null
@@ -0,0 +1,380 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import javax.servlet.*;\r
+import javax.servlet.http.*;\r
+import java.util.*;\r
+import java.util.regex.*;\r
+import java.io.*;\r
+import java.nio.file.*;\r
+import org.apache.log4j.Logger;\r
+\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
+import com.att.research.datarouter.node.eelf.EelfMsgs;\r
+\r
+import java.net.*;\r
+\r
+/**\r
+ *     Servlet for handling all http and https requests to the data router node\r
+ *     <p>\r
+ *     Handled requests are:\r
+ *     <br>\r
+ *     GET http://<i>node</i>/internal/fetchProv - fetch the provisioning data\r
+ *     <br>\r
+ *     PUT/DELETE https://<i>node</i>/internal/publish/<i>fileid</i> - n2n transfer\r
+ *     <br>\r
+ *     PUT/DELETE https://<i>node</i>/publish/<i>feedid</i>/<i>fileid</i> - publsh request\r
+ */\r
+public class NodeServlet extends HttpServlet   {\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet");\r
+       private static NodeConfigManager        config;\r
+       private static Pattern  MetaDataPattern;\r
+       private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25");\r
+       //Adding EELF Logger Rally:US664892  \r
+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet");\r
+\r
+       static {\r
+               try {\r
+                       String ws = "\\s*";\r
+                       // assume that \\ and \" have been replaced by X\r
+                       String string = "\"[^\"]*\"";\r
+                       //String string = "\"(?:[^\"\\\\]|\\\\.)*\"";\r
+                       String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?";\r
+                       String value = "(?:" + string + "|" + number + "|null|true|false)";\r
+                       String item = string + ws + ":" + ws + value + ws;\r
+                       String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws;\r
+                       MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);\r
+               } catch (Exception e) {\r
+               }\r
+       }\r
+       /**\r
+        *      Get the NodeConfigurationManager\r
+        */\r
+       public void init() {\r
+               config = NodeConfigManager.getInstance();\r
+               logger.info("NODE0101 Node Servlet Configured");\r
+       }\r
+       private boolean down(HttpServletResponse resp) throws IOException {\r
+               if (config.isShutdown() || !config.isConfigured()) {\r
+                       resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);\r
+                       logger.info("NODE0102 Rejecting request: Service is being quiesced");\r
+                       return(true);\r
+               }\r
+               return(false);\r
+       }\r
+       /**\r
+        *      Handle a GET for /internal/fetchProv\r
+        */\r
+       protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doGet");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               if (down(resp)) {\r
+                       return;\r
+               }\r
+               String path = req.getPathInfo();\r
+               String qs = req.getQueryString();\r
+               String ip = req.getRemoteAddr();\r
+               if (qs != null) {\r
+                       path = path + "?" + qs;\r
+               }\r
+               if ("/internal/fetchProv".equals(path)) {\r
+                       config.gofetch(ip);\r
+                       resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                       return;\r
+               } else if (path.startsWith("/internal/resetSubscription/")) {\r
+                       String subid = path.substring(28);\r
+                       if (subid.length() != 0 && subid.indexOf('/') == -1) {\r
+                               NodeMain.resetQueue(subid, ip);\r
+                               resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                               return;\r
+                       }\r
+               }\r
+               if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) {\r
+                       if (path.startsWith("/internal/logs/")) {\r
+                               String f = path.substring(15);\r
+                               File fn = new File(config.getLogDir() + "/" + f);\r
+                               if (f.indexOf('/') != -1 || !fn.isFile()) {\r
+                                       logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+                                       resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+                                       return;\r
+                               }\r
+                               byte[] buf = new byte[65536];\r
+                               resp.setContentType("text/plain");\r
+                               resp.setContentLength((int)fn.length());\r
+                               resp.setStatus(200);\r
+                               InputStream is = new FileInputStream(fn);\r
+                               OutputStream os = resp.getOutputStream();\r
+                               int i;\r
+                               while ((i = is.read(buf)) > 0) {\r
+                                       os.write(buf, 0, i);\r
+                               }\r
+                               is.close();\r
+                               return;\r
+                       }\r
+                       if (path.startsWith("/internal/rtt/")) {\r
+                               String xip = path.substring(14);\r
+                               long st = System.currentTimeMillis();\r
+                               String status = " unknown";\r
+                               try {\r
+                                       Socket s = new Socket(xip, 443);\r
+                                       s.close();\r
+                                       status = " connected";\r
+                               } catch (Exception e) {\r
+                                       status = " error " + e.toString();\r
+                               }\r
+                               long dur = System.currentTimeMillis() - st;\r
+                               resp.setContentType("text/plain");\r
+                               resp.setStatus(200);\r
+                               byte[] buf = (dur + status + "\n").getBytes();\r
+                               resp.setContentLength(buf.length);\r
+                               resp.getOutputStream().write(buf);\r
+                               return;\r
+                       }\r
+               }\r
+               logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);\r
+               resp.sendError(HttpServletResponse.SC_NOT_FOUND);\r
+               return;\r
+       }\r
+       /**\r
+        *      Handle all PUT requests\r
+        */\r
+       protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doPut");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               common(req, resp, true);\r
+       }\r
+       /**\r
+        *      Handle all DELETE requests\r
+        */\r
+       protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {\r
+               NodeUtils.setIpAndFqdnForEelf("doDelete");\r
+               eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");\r
+               common(req, resp, false);\r
+       }\r
+       private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException {\r
+               if (down(resp)) {\r
+                       return;\r
+               }\r
+               if (!req.isSecure()) {\r
+                       logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");\r
+                       return;\r
+               }\r
+               String fileid = req.getPathInfo();\r
+               if (fileid == null) {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               String feedid = null;\r
+               String user = null;\r
+               String credentials = req.getHeader("Authorization");\r
+               if (credentials == null) {\r
+                       logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");\r
+                       return;\r
+               }\r
+               String ip = req.getRemoteAddr();\r
+               String lip = req.getLocalAddr();\r
+               String pubid = null;\r
+               String xpubid = null;\r
+               String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip;\r
+               Target[]        targets = null;\r
+               if (fileid.startsWith("/publish/")) {\r
+                       fileid = fileid.substring(9);\r
+                       int i = fileid.indexOf('/');\r
+                       if (i == -1 || i == fileid.length() - 1) {\r
+                               logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                               resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  Possible missing fileid.");\r
+                               return;\r
+                       }\r
+                       feedid = fileid.substring(0, i);\r
+                       fileid = fileid.substring(i + 1);\r
+                       pubid = config.getPublishId();\r
+                       xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+                       targets = config.getTargets(feedid);\r
+               } else if (fileid.startsWith("/internal/publish/")) {\r
+                       if (!config.isAnotherNode(credentials, ip)) {\r
+                               logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);\r
+                               resp.sendError(HttpServletResponse.SC_FORBIDDEN);\r
+                               return;\r
+                       }\r
+                       fileid = fileid.substring(18);\r
+                       pubid = req.getHeader("X-ATT-DR-PUBLISH-ID");\r
+                       targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING"));\r
+               } else {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               if (fileid.indexOf('/') != -1) {\r
+                       logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());\r
+                       resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");\r
+                       return;\r
+               }\r
+               String qs = req.getQueryString();\r
+               if (qs != null) {\r
+                       fileid = fileid + "?" + qs;\r
+               }\r
+               String hp = config.getMyName();\r
+               int xp = config.getExtHttpsPort();\r
+               if (xp != 443) {\r
+                       hp = hp + ":" + xp;\r
+               }\r
+               String logurl = "https://" + hp + "/internal/publish/" + fileid;\r
+               if (feedid != null) {\r
+                       logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;\r
+                       String reason = config.isPublishPermitted(feedid, credentials, ip);\r
+                       if (reason != null) {\r
+                               logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason);\r
+                               resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason);\r
+                               return;\r
+                       }\r
+                       user = config.getAuthUser(feedid, credentials);\r
+                       String newnode = config.getIngressNode(feedid, user, ip);\r
+                       if (newnode != null) {\r
+                               String port = "";\r
+                               int iport = config.getExtHttpsPort();\r
+                               if (iport != 443) {\r
+                                       port = ":" + iport;\r
+                               }\r
+                               String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;\r
+                               logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto);\r
+                               resp.sendRedirect(redirto);\r
+                               return;\r
+                       }\r
+                       resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid);\r
+               }\r
+               String fbase = config.getSpoolDir() + "/" + pubid;\r
+               File data = new File(fbase);\r
+               File meta = new File(fbase + ".M");\r
+               OutputStream dos = null;\r
+               Writer mw = null;\r
+               InputStream is = null;\r
+               try {\r
+                       StringBuffer mx = new StringBuffer();\r
+                       mx.append(req.getMethod()).append('\t').append(fileid).append('\n');\r
+                       Enumeration hnames = req.getHeaderNames();\r
+                       String ctype = null;\r
+                       while (hnames.hasMoreElements()) {\r
+                               String hn = (String)hnames.nextElement();\r
+                               String hnlc = hn.toLowerCase();\r
+                               if ((isput && ("content-type".equals(hnlc) ||\r
+                                   "content-language".equals(hnlc) ||\r
+                                   "content-md5".equals(hnlc) ||\r
+                                   "content-range".equals(hnlc))) ||\r
+                                   "x-att-dr-meta".equals(hnlc) ||\r
+                                   (feedid == null && "x-att-dr-received".equals(hnlc)) ||\r
+                                   (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) {\r
+                                       Enumeration hvals = req.getHeaders(hn);\r
+                                       while (hvals.hasMoreElements()) {\r
+                                               String hv = (String)hvals.nextElement();\r
+                                               if ("content-type".equals(hnlc)) {\r
+                                                       ctype = hv;\r
+                                               }\r
+                                               if ("x-att-dr-meta".equals(hnlc)) {\r
+                                                       if (hv.length() > 4096) {\r
+                                                               logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip);\r
+                                                               resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");\r
+                                                               return;\r
+                                                       }\r
+                                                       if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) {\r
+                                                               logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip);\r
+                                                               resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");\r
+                                                               return;\r
+                                                       }\r
+                                               }\r
+                                               mx.append(hn).append('\t').append(hv).append('\n');\r
+                                       }\r
+                               }\r
+                       }\r
+                       mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n');\r
+                       String metadata = mx.toString();\r
+                       byte[] buf = new byte[1024 * 1024];\r
+                       int i;\r
+                       try {\r
+                               is = req.getInputStream();\r
+                               dos = new FileOutputStream(data);\r
+                               while ((i = is.read(buf)) > 0) {\r
+                                       dos.write(buf, 0, i);\r
+                               }\r
+                               is.close();\r
+                               is = null;\r
+                               dos.close();\r
+                               dos = null;\r
+                       } catch (IOException ioe) {\r
+                               long exlen = -1;\r
+                               try {\r
+                                       exlen = Long.parseLong(req.getHeader("Content-Length"));\r
+                               } catch (Exception e) {\r
+                               }\r
+                               StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());\r
+                               throw ioe;\r
+                       }\r
+                       Path dpath = Paths.get(fbase);\r
+                       for (Target t: targets) {\r
+                               DestInfo di = t.getDestInfo();\r
+                               if (di == null) {\r
+                                       // TODO: unknown destination\r
+                                       continue;\r
+                               }\r
+                               String dbase = di.getSpool() + "/" + pubid;\r
+                               Files.createLink(Paths.get(dbase), dpath);\r
+                               mw = new FileWriter(meta);\r
+                               mw.write(metadata);\r
+                               if (di.getSubId() == null) {\r
+                                       mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n");\r
+                               }\r
+                               mw.close();\r
+                               meta.renameTo(new File(dbase + ".M"));\r
+                       }\r
+                       resp.setStatus(HttpServletResponse.SC_NO_CONTENT);\r
+                       resp.getOutputStream().close();\r
+                       StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);\r
+               } catch (IOException ioe) {\r
+                       logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);\r
+                       throw ioe;\r
+               } finally {\r
+                       if (is != null) { try { is.close(); } catch (Exception e) {}}\r
+                       if (dos != null) { try { dos.close(); } catch (Exception e) {}}\r
+                       if (mw != null) { try { mw.close(); } catch (Exception e) {}}\r
+                       try { data.delete(); } catch (Exception e) {}\r
+                       try { meta.delete(); } catch (Exception e) {}\r
+               }\r
+       }\r
+       \r
+       private int getIdFromPath(HttpServletRequest req) {\r
+               String path = req.getPathInfo();\r
+               if (path == null || path.length() < 2)\r
+                       return -1;\r
+               try {\r
+                       return Integer.parseInt(path.substring(1));\r
+               } catch (NumberFormatException e) {\r
+                       return -1;\r
+               }\r
+       }\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java
new file mode 100644 (file)
index 0000000..5471c0d
--- /dev/null
@@ -0,0 +1,226 @@
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package com.att.research.datarouter.node;\r
+\r
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;\r
+import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;\r
+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;\r
+\r
+import java.security.*;\r
+import java.io.*;\r
+import java.util.*;\r
+import java.security.cert.*;\r
+import java.net.*;\r
+import java.text.*;\r
+import org.apache.commons.codec.binary.Base64;\r
+import org.apache.log4j.Logger;\r
+import org.slf4j.MDC;\r
+\r
+import com.att.eelf.configuration.EELFLogger;\r
+import com.att.eelf.configuration.EELFManager;\r
+import com.att.research.datarouter.node.eelf.EelfMsgs;\r
+\r
+/**\r
+ *     Utility functions for the data router node\r
+ */\r
+public class NodeUtils {\r
+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeUtils");\r
+       private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeUtils");\r
+       private static SimpleDateFormat logdate;\r
+       static {\r
+               logdate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");\r
+               logdate.setTimeZone(TimeZone.getTimeZone("GMT"));\r
+       }\r
+       private NodeUtils() {}\r
+       /**\r
+        *      Base64 encode a byte array\r
+        *      @param raw      The bytes to be encoded\r
+        *      @return The encoded string\r
+        */\r
+       public static String base64Encode(byte[] raw) {\r
+               return(Base64.encodeBase64String(raw));\r
+       }\r
+       /**\r
+        *      Given a user and password, generate the credentials\r
+        *      @param user     User name\r
+        *      @param password User password\r
+        *      @return Authorization header value\r
+        */\r
+       public static String getAuthHdr(String user, String password) {\r
+               if (user == null || password == null) {\r
+                       return(null);\r
+               }\r
+               return("Basic " + base64Encode((user + ":" + password).getBytes()));\r
+       }\r
+       /**\r
+        *      Given a node name, generate the credentials\r
+        *      @param node     Node name\r
+        */\r
+       public static String    getNodeAuthHdr(String node, String key) {\r
+               try {\r
+                       MessageDigest md = MessageDigest.getInstance("SHA");\r
+                       md.update(key.getBytes());\r
+                       md.update(node.getBytes());\r
+                       md.update(key.getBytes());\r
+                       return(getAuthHdr(node, base64Encode(md.digest())));\r
+               } catch (Exception e) {\r
+                       return(null);\r
+               }\r
+       }\r
+       /**\r
+        *      Given a keystore file and its password, return the value of the CN of the first private key entry with a certificate.\r
+        *      @param kstype   The type of keystore\r
+        *      @param ksfile   The file name of the keystore\r
+        *      @param kspass   The password of the keystore\r
+        *      @return CN of the certificate subject or null\r
+        */\r
+       public static String getCanonicalName(String kstype, String ksfile, String kspass) {\r
+               try {\r
+                       KeyStore ks = KeyStore.getInstance(kstype);\r
+                       ks.load(new FileInputStream(ksfile), kspass.toCharArray());\r
+                       return(getCanonicalName(ks));\r
+               } catch (Exception e) {\r
+                       setIpAndFqdnForEelf("getCanonicalName");\r
+                       eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString());\r
+                       logger.error("NODE0401 Error loading my keystore file + " + ksfile + " " + e.toString(), e);\r
+                       return(null);\r
+               }\r
+       }\r
+       /**\r
+        *      Given a keystore, return the value of the CN of the first private key entry with a certificate.\r
+        *      @param ks       The KeyStore\r
+        *      @return CN of the certificate subject or null\r
+        */\r
+       public static String getCanonicalName(KeyStore ks) {\r
+               try {\r
+                       Enumeration<String> aliases = ks.aliases();\r
+                       while (aliases.hasMoreElements()) {\r
+                               String s = aliases.nextElement();\r
+                               if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {\r
+                                       X509Certificate c = (X509Certificate)ks.getCertificate(s);\r
+                                       if (c != null) {\r
+                                               String subject = c.getSubjectX500Principal().getName();\r
+                                               String[] parts = subject.split(",");\r
+                                               if (parts.length < 1) {\r
+                                                       return(null);\r
+                                               }\r
+                                               subject = parts[0].trim();\r
+                                               if (!subject.startsWith("CN=")) {\r
+                                                       return(null);\r
+\r
+                                               }\r
+                                               return(subject.substring(3));\r
+                                       }\r
+                               }\r
+                       }\r
+               } catch (Exception e) {\r
+                       logger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e);\r
+               }\r
+               return(null);\r
+       }\r
+       /**\r
+        *      Given a string representation of an IP address, get the corresponding byte array\r
+        *      @param ip       The IP address as a string\r
+        *      @return The IP address as a byte array or null if the address is invalid\r
+        */\r
+       public static byte[] getInetAddress(String ip) {\r
+               try {\r
+                       return(InetAddress.getByName(ip).getAddress());\r
+               } catch (Exception e) {\r
+               }\r
+               return(null);\r
+       }\r
+       /**\r
+        *      Given a uri with parameters, split out the feed ID and file ID\r
+        */\r
+       public static String[] getFeedAndFileID(String uriandparams) {\r
+               int end = uriandparams.length();\r
+               int i = uriandparams.indexOf('#');\r
+               if (i != -1 && i < end) {\r
+                       end = i;\r
+               }\r
+               i = uriandparams.indexOf('?');\r
+               if (i != -1 && i < end) {\r
+                       end = i;\r
+               }\r
+               end = uriandparams.lastIndexOf('/', end);\r
+               if (end < 2) {\r
+                       return(null);\r
+               }\r
+               i = uriandparams.lastIndexOf('/', end - 1);\r
+               if (i == -1) {\r
+                       return(null);\r
+               }\r
+               return(new String[] { uriandparams.substring(i + 1, end - 1), uriandparams.substring(end + 1) });\r
+       }\r
+       /**\r
+        *      Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p, backslash e and backslash n.\r
+        */\r
+       public static String loge(String s) {\r
+               if (s == null) {\r
+                       return(s);\r
+               }\r
+               return(s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n"));\r
+       }\r
+       /**\r
+        *      Undo what loge does.\r
+        */\r
+       public static String unloge(String s) {\r
+               if (s == null) {\r
+                       return(s);\r
+               }\r
+               return(s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\"));\r
+       }\r
+       /**\r
+        *      Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ\r
+        */\r
+       public static String logts(long when) {\r
+               return(logts(new Date(when)));\r
+       }\r
+       /**\r
+        *      Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ\r
+        */\r
+       public static synchronized String logts(Date when) {\r
+               return(logdate.format(when));\r
+       }\r
+       \r
+       /* Method prints method name, server FQDN and IP Address of the machine in EELF logs\r
+        * @Method - setIpAndFqdnForEelf - Rally:US664892  \r
+        * @Params - method, prints method name in EELF log.\r
+        */     \r
+       public static void setIpAndFqdnForEelf(String method) {\r
+               MDC.clear();\r
+        MDC.put(MDC_SERVICE_NAME, method);\r
+        try {\r
+            MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getHostName());\r
+            MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress());\r
+        } catch (Exception e) {\r
+            e.printStackTrace();\r
+        }\r
+\r
+       }\r
+       \r
+\r
+}\r
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java
new file mode 100644 (file)
index 0000000..7ff9183
--- /dev/null
@@ -0,0