From aaf2df8b908fcb48043d2cd51803d8fd99f18b43 Mon Sep 17 00:00:00 2001 From: sg481n Date: Thu, 3 Aug 2017 17:56:38 -0400 Subject: [PATCH] =?utf8?q?=C2=A0[DMAAP-48]=20Initial=20code=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: I3e65371093487d7de167ec6c29f327f366f1e299 Signed-off-by: sg481n --- Contributing.txt | 35 + Jenkinsfile | 24 + LICENSE | 22 + README.md | 149 ++ Subscriber/src/SSASubscriber.java | 115 ++ Subscriber/src/SubscriberServlet.java | 149 ++ Subscriber/src/log4j.properties | 9 + datarouter-node/pom.xml | 472 ++++++ datarouter-node/self_signed/cacerts.jks | Bin 0 -> 1936 bytes datarouter-node/self_signed/keystore.jks | Bin 0 -> 2273 bytes datarouter-node/self_signed/mykey.cer | Bin 0 -> 921 bytes datarouter-node/self_signed/nodekey.cer | Bin 0 -> 921 bytes .../com/att/research/datarouter/node/Delivery.java | 253 +++ .../research/datarouter/node/DeliveryQueue.java | 348 +++++ .../datarouter/node/DeliveryQueueHelper.java | 89 ++ .../att/research/datarouter/node/DeliveryTask.java | 308 ++++ .../datarouter/node/DeliveryTaskHelper.java | 72 + .../com/att/research/datarouter/node/DestInfo.java | 132 ++ .../com/att/research/datarouter/node/IsFrom.java | 82 + .../att/research/datarouter/node/LogManager.java | 159 ++ .../att/research/datarouter/node/NodeConfig.java | 722 +++++++++ .../datarouter/node/NodeConfigManager.java | 599 +++++++ .../com/att/research/datarouter/node/NodeMain.java | 113 ++ .../att/research/datarouter/node/NodeServlet.java | 380 +++++ .../att/research/datarouter/node/NodeUtils.java | 226 +++ .../att/research/datarouter/node/PathFinder.java | 132 ++ .../com/att/research/datarouter/node/ProvData.java | 302 ++++ .../att/research/datarouter/node/PublishId.java | 52 + .../datarouter/node/RateLimitedOperation.java | 102 ++ .../att/research/datarouter/node/RedirManager.java | 118 ++ .../att/research/datarouter/node/StatusLog.java | 229 +++ 
.../research/datarouter/node/SubnetMatcher.java | 71 + .../com/att/research/datarouter/node/Target.java | 60 + .../com/att/research/datarouter/node/TaskList.java | 113 ++ .../research/datarouter/node/eelf/EELFFilter.java | 43 + .../research/datarouter/node/eelf/EelfMsgs.java | 96 ++ .../src/main/resources/EelfMessages.properties | 70 + .../src/main/resources/docker/Dockerfile | 7 + .../src/main/resources/docker/startup.sh | 18 + .../src/main/resources/log4j.properties | 32 + .../src/main/resources/log4j.properties.tmpl | 11 + datarouter-node/src/main/resources/logback.xml | 405 +++++ .../src/main/resources/misc/descriptor.xml | 53 + datarouter-node/src/main/resources/misc/doaction | 42 + datarouter-node/src/main/resources/misc/drtrnode | 114 ++ .../src/main/resources/misc/havecert.tmpl | 11 + .../src/main/resources/misc/log4j.properties.tmpl | 11 + .../src/main/resources/misc/node.properties | 112 ++ datarouter-node/src/main/resources/misc/notes | 54 + datarouter-node/src/main/resources/node.properties | 112 ++ datarouter-prov/data/addFeed3.txt | 23 + datarouter-prov/data/addSubscriber.txt | 15 + datarouter-prov/pom.xml | 558 +++++++ datarouter-prov/self_signed/cacerts.jks | Bin 0 -> 983 bytes datarouter-prov/self_signed/keystore.jks | Bin 0 -> 2272 bytes datarouter-prov/self_signed/mykey.cer | Bin 0 -> 921 bytes .../datarouter/authz/AuthorizationResponse.java | 58 + .../authz/AuthorizationResponseSupplement.java | 52 + .../att/research/datarouter/authz/Authorizer.java | 62 + .../datarouter/authz/impl/AuthRespImpl.java | 97 ++ .../authz/impl/AuthRespSupplementImpl.java | 71 + .../datarouter/authz/impl/AuthzResource.java | 100 ++ .../datarouter/authz/impl/ProvAuthorizer.java | 179 +++ .../datarouter/authz/impl/ProvDataProvider.java | 66 + .../research/datarouter/authz/impl/package.html | 68 + .../com/att/research/datarouter/authz/package.html | 38 + .../datarouter/provisioning/BaseServlet.java | 869 ++++++++++ .../datarouter/provisioning/DRFeedsServlet.java | 300 
++++ .../datarouter/provisioning/FeedLogServlet.java | 38 + .../datarouter/provisioning/FeedServlet.java | 362 +++++ .../datarouter/provisioning/GroupServlet.java | 386 +++++ .../datarouter/provisioning/InternalServlet.java | 506 ++++++ .../datarouter/provisioning/LogServlet.java | 433 +++++ .../att/research/datarouter/provisioning/Main.java | 245 +++ .../research/datarouter/provisioning/Poker.java | 318 ++++ .../datarouter/provisioning/ProxyServlet.java | 304 ++++ .../datarouter/provisioning/PublishServlet.java | 192 +++ .../datarouter/provisioning/RouteServlet.java | 429 +++++ .../datarouter/provisioning/StatisticsServlet.java | 588 +++++++ .../datarouter/provisioning/SubLogServlet.java | 39 + .../datarouter/provisioning/SubscribeServlet.java | 288 ++++ .../provisioning/SubscriptionServlet.java | 476 ++++++ .../datarouter/provisioning/SynchronizerTask.java | 614 ++++++++ .../provisioning/beans/BaseLogRecord.java | 184 +++ .../datarouter/provisioning/beans/Deleteable.java | 41 + .../provisioning/beans/DeliveryExtraRecord.java | 68 + .../provisioning/beans/DeliveryRecord.java | 137 ++ .../datarouter/provisioning/beans/EgressRoute.java | 227 +++ .../provisioning/beans/EventLogRecord.java | 84 + .../provisioning/beans/ExpiryRecord.java | 141 ++ .../datarouter/provisioning/beans/Feed.java | 760 +++++++++ .../provisioning/beans/FeedAuthorization.java | 96 ++ .../provisioning/beans/FeedEndpointID.java | 87 ++ .../datarouter/provisioning/beans/FeedLinks.java | 103 ++ .../datarouter/provisioning/beans/Group.java | 417 +++++ .../provisioning/beans/IngressRoute.java | 542 +++++++ .../datarouter/provisioning/beans/Insertable.java | 41 + .../datarouter/provisioning/beans/JSONable.java | 40 + .../datarouter/provisioning/beans/LOGJSONable.java | 40 + .../datarouter/provisioning/beans/Loadable.java | 65 + .../datarouter/provisioning/beans/LogRecord.java | 235 +++ .../provisioning/beans/NetworkRoute.java | 230 +++ .../datarouter/provisioning/beans/NodeClass.java | 179 +++ 
.../datarouter/provisioning/beans/Parameters.java | 257 +++ .../provisioning/beans/PubFailRecord.java | 85 + .../provisioning/beans/PublishRecord.java | 153 ++ .../datarouter/provisioning/beans/SubDelivery.java | 109 ++ .../datarouter/provisioning/beans/SubLinks.java | 95 ++ .../provisioning/beans/Subscription.java | 511 ++++++ .../datarouter/provisioning/beans/Syncable.java | 57 + .../datarouter/provisioning/beans/Updateable.java | 40 + .../datarouter/provisioning/beans/package.html | 31 + .../datarouter/provisioning/eelf/EelfMsgs.java | 56 + .../datarouter/provisioning/eelf/JettyFilter.java | 38 + .../research/datarouter/provisioning/package.html | 123 ++ .../research/datarouter/provisioning/utils/DB.java | 711 +++++++++ .../datarouter/provisioning/utils/DRRouteCLI.java | 456 ++++++ .../provisioning/utils/JSONUtilities.java | 76 + .../provisioning/utils/LogfileLoader.java | 549 +++++++ .../provisioning/utils/PurgeLogDirTask.java | 70 + .../datarouter/provisioning/utils/RLEBitSet.java | 418 +++++ .../provisioning/utils/ThrottleFilter.java | 316 ++++ .../provisioning/utils/URLUtilities.java | 130 ++ .../datarouter/provisioning/utils/package.html | 30 + .../datarouter/reports/DailyLatencyReport.java | 194 +++ .../research/datarouter/reports/FeedReport.java | 395 +++++ .../research/datarouter/reports/LatencyReport.java | 179 +++ .../att/research/datarouter/reports/Report.java | 155 ++ .../research/datarouter/reports/ReportBase.java | 63 + .../datarouter/reports/SubscriberReport.java | 157 ++ .../research/datarouter/reports/VolumeReport.java | 140 ++ .../att/research/datarouter/reports/package.html | 43 + datarouter-prov/src/main/java/org/json/CDL.java | 301 ++++ datarouter-prov/src/main/java/org/json/Cookie.java | 191 +++ .../src/main/java/org/json/CookieList.java | 112 ++ datarouter-prov/src/main/java/org/json/HTTP.java | 185 +++ .../src/main/java/org/json/HTTPTokener.java | 99 ++ .../src/main/java/org/json/JSONArray.java | 970 ++++++++++++ 
.../src/main/java/org/json/JSONException.java | 63 + datarouter-prov/src/main/java/org/json/JSONML.java | 489 ++++++ .../src/main/java/org/json/JSONObject.java | 1653 ++++++++++++++++++++ .../src/main/java/org/json/JSONString.java | 40 + .../src/main/java/org/json/JSONStringer.java | 100 ++ .../src/main/java/org/json/JSONTokener.java | 468 ++++++ .../src/main/java/org/json/JSONWriter.java | 349 +++++ .../src/main/java/org/json/LOGJSONObject.java | 1653 ++++++++++++++++++++ datarouter-prov/src/main/java/org/json/None.java | 31 + datarouter-prov/src/main/java/org/json/XML.java | 530 +++++++ .../src/main/java/org/json/XMLTokener.java | 390 +++++ .../src/main/java/org/json/package.html | 40 + .../src/main/resources/EelfMessages.properties | 58 + datarouter-prov/src/main/resources/authz.jar | Bin 0 -> 8387 bytes .../docker-compose/database/install_db.sql | 143 ++ .../resources/docker-compose/docker-compose.yml | 69 + .../docker-compose/node_data/node.properties | 112 ++ .../node_data/self_signed/cacerts.jks | Bin 0 -> 1936 bytes .../node_data/self_signed/keystore.jks | Bin 0 -> 2273 bytes .../docker-compose/node_data/self_signed/mykey.cer | Bin 0 -> 921 bytes .../node_data/self_signed/nodekey.cer | Bin 0 -> 921 bytes .../docker-compose/prov_data/addFeed3.txt | 44 + .../docker-compose/prov_data/addSubscriber.txt | 36 + .../docker-compose/prov_data/provserver.properties | 59 + .../prov_data/self_signed/cacerts.jks | Bin 0 -> 983 bytes .../prov_data/self_signed/keystore.jks | Bin 0 -> 2272 bytes .../docker-compose/prov_data/self_signed/mykey.cer | Bin 0 -> 921 bytes .../src/main/resources/docker/Dockerfile | 9 + .../src/main/resources/docker/startup.sh | 16 + .../src/main/resources/log4j.properties | 68 + datarouter-prov/src/main/resources/logback.xml | 405 +++++ datarouter-prov/src/main/resources/misc/doaction | 53 + datarouter-prov/src/main/resources/misc/dr-route | 26 + datarouter-prov/src/main/resources/misc/drtrprov | 131 ++ .../src/main/resources/misc/havecert.tmpl | 
11 + .../main/resources/misc/log4j.drroute.properties | 41 + .../src/main/resources/misc/log4j.properties.tmpl | 68 + .../src/main/resources/misc/mysql_dr_schema.sql | 139 ++ datarouter-prov/src/main/resources/misc/notes | 78 + datarouter-prov/src/main/resources/misc/provcmd | 163 ++ datarouter-prov/src/main/resources/misc/runreports | 54 + .../src/main/resources/provserver.properties | 48 + datarouter-prov/src/main/resources/startup.sh | 0 datarouter-prov/src/main/resources/subscriber.jar | Bin 0 -> 2150417 bytes .../java/datarouter/provisioning/AllTests.java | 48 + .../test/java/datarouter/provisioning/FillDB.java | 125 ++ .../test/java/datarouter/provisioning/package.html | 29 + .../java/datarouter/provisioning/testBase.java | 158 ++ .../java/datarouter/provisioning/testCleanup.java | 85 + .../datarouter/provisioning/testDRFeedsDelete.java | 59 + .../datarouter/provisioning/testDRFeedsGet.java | 188 +++ .../datarouter/provisioning/testDRFeedsPost.java | 282 ++++ .../datarouter/provisioning/testDRFeedsPut.java | 59 + .../datarouter/provisioning/testFeedDelete.java | 98 ++ .../java/datarouter/provisioning/testFeedPut.java | 202 +++ .../datarouter/provisioning/testInternalGet.java | 105 ++ .../datarouter/provisioning/testInternalMisc.java | 151 ++ .../java/datarouter/provisioning/testLogGet.java | 181 +++ .../java/datarouter/provisioning/testPublish.java | 119 ++ .../datarouter/provisioning/testRLEBitSet.java | 231 +++ .../java/datarouter/provisioning/testRouteAPI.java | 30 + .../datarouter/provisioning/testSubscribePost.java | 180 +++ 200 files changed, 35781 insertions(+) create mode 100644 Contributing.txt create mode 100644 Jenkinsfile create mode 100644 LICENSE create mode 100644 README.md create mode 100644 Subscriber/src/SSASubscriber.java create mode 100644 Subscriber/src/SubscriberServlet.java create mode 100644 Subscriber/src/log4j.properties create mode 100644 datarouter-node/pom.xml create mode 100644 datarouter-node/self_signed/cacerts.jks create mode 
100644 datarouter-node/self_signed/keystore.jks create mode 100644 datarouter-node/self_signed/mykey.cer create mode 100644 datarouter-node/self_signed/nodekey.cer create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java create mode 100644 
datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java create mode 100644 datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java create mode 100644 datarouter-node/src/main/resources/EelfMessages.properties create mode 100644 datarouter-node/src/main/resources/docker/Dockerfile create mode 100644 datarouter-node/src/main/resources/docker/startup.sh create mode 100644 datarouter-node/src/main/resources/log4j.properties create mode 100644 datarouter-node/src/main/resources/log4j.properties.tmpl create mode 100644 datarouter-node/src/main/resources/logback.xml create mode 100644 datarouter-node/src/main/resources/misc/descriptor.xml create mode 100644 datarouter-node/src/main/resources/misc/doaction create mode 100644 datarouter-node/src/main/resources/misc/drtrnode create mode 100644 datarouter-node/src/main/resources/misc/havecert.tmpl create mode 100644 datarouter-node/src/main/resources/misc/log4j.properties.tmpl create mode 100644 datarouter-node/src/main/resources/misc/node.properties create mode 100644 datarouter-node/src/main/resources/misc/notes create mode 100644 datarouter-node/src/main/resources/node.properties create mode 100644 datarouter-prov/data/addFeed3.txt create mode 100644 datarouter-prov/data/addSubscriber.txt create mode 100644 datarouter-prov/pom.xml create mode 100644 datarouter-prov/self_signed/cacerts.jks create mode 100644 datarouter-prov/self_signed/keystore.jks create mode 100644 datarouter-prov/self_signed/mykey.cer create mode 100644 
datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponse.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponseSupplement.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/Authorizer.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespImpl.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespSupplementImpl.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthzResource.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/ProvAuthorizer.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/ProvDataProvider.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/package.html create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/authz/package.html create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/BaseServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/DRFeedsServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/FeedLogServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/FeedServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/GroupServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/InternalServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/LogServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/Main.java create mode 100644 
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/Poker.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/ProxyServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/PublishServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/RouteServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/StatisticsServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubLogServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscribeServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SubscriptionServlet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/SynchronizerTask.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/BaseLogRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Deleteable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/DeliveryExtraRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/DeliveryRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/EgressRoute.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/EventLogRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/ExpiryRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Feed.java create mode 100644 
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedAuthorization.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedEndpointID.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/FeedLinks.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Group.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/IngressRoute.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Insertable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/JSONable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/LOGJSONable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Loadable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/LogRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/NetworkRoute.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/NodeClass.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Parameters.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/PubFailRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/PublishRecord.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/SubDelivery.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/SubLinks.java create mode 100644 
datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Subscription.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Syncable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/Updateable.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/beans/package.html create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/eelf/EelfMsgs.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/eelf/JettyFilter.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/package.html create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/DB.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/DRRouteCLI.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/JSONUtilities.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/LogfileLoader.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/PurgeLogDirTask.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/RLEBitSet.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/ThrottleFilter.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/URLUtilities.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/provisioning/utils/package.html create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/DailyLatencyReport.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/FeedReport.java create mode 100644 
datarouter-prov/src/main/java/com/att/research/datarouter/reports/LatencyReport.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/Report.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/ReportBase.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/SubscriberReport.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/VolumeReport.java create mode 100644 datarouter-prov/src/main/java/com/att/research/datarouter/reports/package.html create mode 100644 datarouter-prov/src/main/java/org/json/CDL.java create mode 100644 datarouter-prov/src/main/java/org/json/Cookie.java create mode 100644 datarouter-prov/src/main/java/org/json/CookieList.java create mode 100644 datarouter-prov/src/main/java/org/json/HTTP.java create mode 100644 datarouter-prov/src/main/java/org/json/HTTPTokener.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONArray.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONException.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONML.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONObject.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONString.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONStringer.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONTokener.java create mode 100644 datarouter-prov/src/main/java/org/json/JSONWriter.java create mode 100644 datarouter-prov/src/main/java/org/json/LOGJSONObject.java create mode 100644 datarouter-prov/src/main/java/org/json/None.java create mode 100644 datarouter-prov/src/main/java/org/json/XML.java create mode 100644 datarouter-prov/src/main/java/org/json/XMLTokener.java create mode 100644 datarouter-prov/src/main/java/org/json/package.html create mode 100644 datarouter-prov/src/main/resources/EelfMessages.properties create mode 100644 
datarouter-prov/src/main/resources/authz.jar create mode 100644 datarouter-prov/src/main/resources/docker-compose/database/install_db.sql create mode 100644 datarouter-prov/src/main/resources/docker-compose/docker-compose.yml create mode 100644 datarouter-prov/src/main/resources/docker-compose/node_data/node.properties create mode 100644 datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/cacerts.jks create mode 100644 datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/keystore.jks create mode 100644 datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/mykey.cer create mode 100644 datarouter-prov/src/main/resources/docker-compose/node_data/self_signed/nodekey.cer create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/addFeed3.txt create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/addSubscriber.txt create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/provserver.properties create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/cacerts.jks create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/keystore.jks create mode 100644 datarouter-prov/src/main/resources/docker-compose/prov_data/self_signed/mykey.cer create mode 100644 datarouter-prov/src/main/resources/docker/Dockerfile create mode 100644 datarouter-prov/src/main/resources/docker/startup.sh create mode 100644 datarouter-prov/src/main/resources/log4j.properties create mode 100644 datarouter-prov/src/main/resources/logback.xml create mode 100644 datarouter-prov/src/main/resources/misc/doaction create mode 100644 datarouter-prov/src/main/resources/misc/dr-route create mode 100644 datarouter-prov/src/main/resources/misc/drtrprov create mode 100644 datarouter-prov/src/main/resources/misc/havecert.tmpl create mode 100644 datarouter-prov/src/main/resources/misc/log4j.drroute.properties create mode 100644 
datarouter-prov/src/main/resources/misc/log4j.properties.tmpl create mode 100644 datarouter-prov/src/main/resources/misc/mysql_dr_schema.sql create mode 100644 datarouter-prov/src/main/resources/misc/notes create mode 100644 datarouter-prov/src/main/resources/misc/provcmd create mode 100644 datarouter-prov/src/main/resources/misc/runreports create mode 100644 datarouter-prov/src/main/resources/provserver.properties create mode 100644 datarouter-prov/src/main/resources/startup.sh create mode 100644 datarouter-prov/src/main/resources/subscriber.jar create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/AllTests.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/FillDB.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/package.html create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testBase.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testCleanup.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsDelete.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsGet.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsPost.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testDRFeedsPut.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testFeedDelete.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testFeedPut.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testInternalGet.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testInternalMisc.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testLogGet.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testPublish.java create mode 100644 
datarouter-prov/src/test/java/datarouter/provisioning/testRLEBitSet.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testRouteAPI.java create mode 100644 datarouter-prov/src/test/java/datarouter/provisioning/testSubscribePost.java diff --git a/Contributing.txt b/Contributing.txt new file mode 100644 index 00000000..d64568e0 --- /dev/null +++ b/Contributing.txt @@ -0,0 +1,35 @@ +This software is distributed under a permissive open source +license to allow it to be used in any projects, whether open +source or proprietary. Contributions to the project are welcome +and it is important to maintain clear record of contributions +and terms under which they are licensed. + +To indicate your acceptance of Developer's Certificate of Origin 1.1 +terms, please add the following line to the end of the commit message +for each contribution you make to the project: + +Signed-off-by : Your Name + +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or inpart by me and I +have the right to submit it under the open source license indicated +in the file: or + +(b) The contribution is based upon previous work that, to the best +of my knowledge, is covered under an appropriate open source license +and I have the right under that license to submit that work with +modifications, whether created in whole or part by me, under the same +open source license (unless I am permitted to submit under a different +license), as indicated in the file; or + +(c) The contribution was provided directly to me by some other person +who certified (a), (b) or (c) I have not modified it. 
+ +(d) I understand and agree that this project and the contribution are +public and that a record of the contribution (including all personal +information I submit with it, including my sign-off)is maintained +indefinitely and may be redistributed consistent with this project or +the open source license(s) involved. \ No newline at end of file diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 00000000..a8161fc7 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,24 @@ +node { + // Get the maven tool. + // ** NOTE: This 'M3' maven tool must be configured + // ** in the Jenkins global configuration. + def mvnHome = tool 'M3' + sh "echo ${mvnHome}" + + + // Mark the code checkout 'stage'.... + stage 'Checkout' + // Get some code from a GitHub repository + checkout scm + + // Mark the code build 'stage'.... + stage 'Build DMAAP-DR' + // Run the maven build + //sh for unix bat for windows + + sh "${mvnHome}/bin/mvn -f datarouter-prov/pom.xml clean deploy" + sh "${mvnHome}/bin/mvn -f datarouter-node/pom.xml clean deploy" + + + +} diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..2ce945c1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 00000000..39dbca9e --- /dev/null +++ b/README.md @@ -0,0 +1,149 @@ +# DMAAP_DATAROUTER + +## OVERVIEW + +The Data Routing System project is intended to provide a common framework by which data producers can make data available to data consumers and a way for potential consumers to find feeds with the data they require. +The delivery of data from these kinds of production systems is the domain of the Data Routing System. Its primary goal is to make it easier to move data from existing applications that may not have been designed from the ground up to share data. 
+The Data Routing System is different from many existing platforms for distributing messages from producers to consumers which focus on real-time delivery of small messages (on the order of a few kilobytes or so) for more + + Provisioning is implemented as a Java servlet running under Jetty in one JVM + + Provisioning data is stored in a MySQL database + + The backup provisioning server and each node is informed any time provisioning data changes + + The backup provisioning server and each node may request the complete set of provisioning data at any time + + A Node is implemented as a Java servlet running under Jetty in one JVM + +Assumptions + For 95% of all feeds (there will be some exceptions): + + Number of Publishing Endpoints per Feed: 1 – 10 + + Number of Subscribers per Feed: 2 – 10 + + File Size: 10^5 – 10^10 bytes + + with a distribution towards the high end + + Frequency of Publishing: 1/day – 10/minute + + Lifetime of a Feed: months to years + + Lifetime of a Subscription: months to years + + +Data Router and Sensitive Data Handling + + A publisher of a Data Router feed of sensitive (e.g., PCI, SPI, etc.) data needs to encrypt that data prior to delivering it to the Data Router + + The Data Router will distribute that data to all of the subscribers of that feed. + + Data Router does not examine the Feed content or enforce any restrictions or validations on the Feed Content in any way + + It is the responsibility of the subscribers to work with the publisher to determine how to decrypt that data + + + + + +What the Data Router is NOT: + + Does not support streaming data + + Does not tightly couple to any specific publish endpoint or subscriber + + Agnostic as to source and sink of data residing in an RDBMS, NoSQL DB, Other DBMS, Flat Files, etc. 
+ + Does not transform any published data + + Does not “examine” any published data + + Does not verify the integrity of a published file + + Does not perform any data “cleansing” + + Does not store feeds (not a repository or archive) + + There is no long-term storage – assumes subscribers are responsive most of the time + + Does not encrypt data when queued on a node + + Does not provide guaranteed order of delivery + + Per-file metadata can be used for ordering + + Support for external customers is via DITREX (MOTS 18274) + + + + +## BUILD + +Datarouter can be cloned from the repository and built using Maven +In the repository + +Go to datarouter-prov in the root + + mvn clean install + +Go to datarouter-node in the root + + mvn clean install + +Project Build will be Successful + + + + +## RUN + +Datarouter is a Unix-based service + +Pre-requisites to run the service + +MySQL Version 5.6 + +Java JDK 1.8 + +Install MySQL and load the needed tables into the database + +Sample install_db.sql is provided in the datarouter-prov/data directory. 
+ +Go to datarouter-prov module and run the service using main.java + +Go to datarouter-node module and run the service using nodemain.java + +Curl Commands to test: + +create a feed: + +curl -v -X POST -H "Content-Type : application/vnd.att-dr.feed" -H "X-ATT-DR-ON-BEHALF-OF: rs873m" --data-ascii @/opt/app/datartr/addFeed3.txt --post301 --location-trusted -k https://prov.datarouternew.com:8443 + +Subscribe to feed: + +curl -v -X POST -H "Content-Type: application/vnd.att-dr.subscription" -H "X-ATT-DR-ON-BEHALF-OF: rs873m" --data-ascii @/opt/app/datartr/addSubscriber.txt --post301 --location-trusted -k https://prov.datarouternew.com:8443/subscribe/1 + +Publish to feed: + +curl -v -X PUT --user rs873m:rs873m -H "Content-Type: application/octet-stream" --data-binary @/opt/app/datartr/addFeed3.txt --post301 --location-trusted -k https://prov.datarouternew.com:8443/publish/1/test1 + + + + + ## CONFIGURATION + +Recommended + +Environment - Unix based + +Java - 1.8 + +Maven - 3.2.5 + +MySQL - 5.6 + +Self Signed SSL certificates + + diff --git a/Subscriber/src/SSASubscriber.java b/Subscriber/src/SSASubscriber.java new file mode 100644 index 00000000..5ec099bd --- /dev/null +++ b/Subscriber/src/SSASubscriber.java @@ -0,0 +1,115 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.servlet.*; +import org.eclipse.jetty.util.ssl.*; +import org.eclipse.jetty.server.*; +import org.apache.log4j.Logger; + +/** + * Example stand alone subscriber + */ +public class SSASubscriber { + private static final int Port = 8447; + private static final String KeyStoreType = "jks"; + private static final String KeyStoreFile = "/root/sub/subscriber.jks"; + //private static final String KeyStoreFile = "c:/tmp/subscriber.jks"; + private static final String KeyStorePassword = "changeit"; + private static final String KeyPassword = "changeit"; + private static final String ContextPath = "/"; + private static final String URLPattern = "/*"; + + public static void main(String[] args) throws Exception { + //User story # US792630 -Jetty Upgrade to 9.3.11 + //SSASubscriber register Jetty server. 
+ Server server = new Server(); + HttpConfiguration http_config = new HttpConfiguration(); + http_config.setSecureScheme("https"); + http_config.setSecurePort(Port); + http_config.setRequestHeaderSize(8192); + + // HTTP connector + ServerConnector http = new ServerConnector(server, + new HttpConnectionFactory(http_config)); + http.setPort(7070); + http.setIdleTimeout(30000); + + // SSL Context Factory + SslContextFactory sslContextFactory = new SslContextFactory(); + sslContextFactory.setKeyStoreType(KeyStoreType); + sslContextFactory.setKeyStorePath(KeyStoreFile); + sslContextFactory.setKeyStorePassword(KeyStorePassword); + sslContextFactory.setKeyManagerPassword(KeyPassword); + + // sslContextFactory.setTrustStorePath(ncm.getKSFile()); + // sslContextFactory.setTrustStorePassword("changeit"); + sslContextFactory.setExcludeCipherSuites("SSL_RSA_WITH_DES_CBC_SHA", + "SSL_DHE_RSA_WITH_DES_CBC_SHA", "SSL_DHE_DSS_WITH_DES_CBC_SHA", + "SSL_RSA_EXPORT_WITH_RC4_40_MD5", + "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA"); + + // SSL HTTP Configuration + HttpConfiguration https_config = new HttpConfiguration(http_config); + https_config.addCustomizer(new SecureRequestCustomizer()); + + // SSL Connector + ServerConnector sslConnector = new ServerConnector(server, + new SslConnectionFactory(sslContextFactory,HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(https_config)); + sslConnector.setPort(Port); + server.addConnector(sslConnector); + + /**Skip SSLv3 Fixes*/ + sslContextFactory.addExcludeProtocols("SSLv3"); + System.out.println("Excluded protocols SSASubscriber-"+sslContextFactory.getExcludeProtocols().toString()); + /**End of SSLv3 Fixes*/ + + // HTTPS Configuration + ServerConnector https = new ServerConnector(server, + new SslConnectionFactory(sslContextFactory,HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(https_config)); + https.setPort(Port); + 
https.setIdleTimeout(30000); + //server.setConnectors(new Connector[] { http, https }); + server.setConnectors(new Connector[] { http }); + ServletContextHandler ctxt = new ServletContextHandler(0); + ctxt.setContextPath(ContextPath); + server.setHandler(ctxt); + + ctxt.addServlet(new ServletHolder(new SubscriberServlet()), "/*"); + + try { + server.start(); + } catch ( Exception e ) { + System.out.println("Jetty failed to start. Reporting will we unavailable-"+e); + }; + server.join(); + + System.out.println("Subscriber started-"+ server.getState()); + + } +} \ No newline at end of file diff --git a/Subscriber/src/SubscriberServlet.java b/Subscriber/src/SubscriberServlet.java new file mode 100644 index 00000000..1af62a63 --- /dev/null +++ b/Subscriber/src/SubscriberServlet.java @@ -0,0 +1,149 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
+ * * + ******************************************************************************/ + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URLEncoder; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.codec.binary.Base64; +import org.apache.log4j.Logger; + +/** + * Example stand alone subscriber servlet with Authorization header checking + */ +public class SubscriberServlet extends HttpServlet { + private static Logger logger = Logger.getLogger("com.att.datarouter.pubsub.ssasubscribe.SubscriberServlet"); + private String Login = "LOGIN"; + private String Password = "PASSWORD"; + private String OutputDirectory = "/root/sub/received"; + + private String auth; + + private static String gp(ServletConfig config, String param, String deflt) { + param = config.getInitParameter(param); + if (param == null || param.length() == 0) { + param = deflt; + } + return(param); + } + /** + * Configure this subscriberservlet. Configuration parameters from config.getInitParameter() are: + *
    + *
  • Login - The login expected in the Authorization header (default "LOGIN"). + *
  • Password - The password expected in the Authorization header (default "PASSWORD"). + *
  • OutputDirectory - The directory where files are placed (default "received"). + *
+ */ + public void init(ServletConfig config) throws ServletException { + Login = gp(config, "Login", Login); + Password = gp(config, "Password", Password); + OutputDirectory = gp(config, "OutputDirectory", OutputDirectory); + (new File(OutputDirectory)).mkdirs(); + auth = "Basic " + Base64.encodeBase64String((Login + ":" + Password).getBytes()); + } + /** + * Invoke common(req, resp, false). + */ + protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + common(req, resp, false); + } + /** + * Invoke common(req, resp, true). + */ + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + common(req, resp, true); + } + /** + * Process a PUT or DELETE request. + *
    + *
  1. Verify that the request contains an Authorization header + * or else UNAUTHORIZED. + *
  2. Verify that the Authorization header matches the configured + * Login and Password or else FORBIDDEN. + *
  3. If the request is PUT, store the message body as a file + * in the configured OutputDirectory directory protecting against + * evil characters in the received FileID. The file is created + * initially with its name prefixed with a ".", and once it is complete, it is + * renamed to remove the leading "." character. + *
  4. If the request is DELETE, instead delete the file (if it exists) from the configured OutputDirectory directory. + *
  5. Respond with NO_CONTENT. + *
+ */ + protected void common(HttpServletRequest req, HttpServletResponse resp, boolean isdelete) throws ServletException, IOException { + String ah = req.getHeader("Authorization"); + if (ah == null) { + logger.info("Rejecting request with no Authorization header from " + req.getRemoteAddr() + ": " + req.getPathInfo()); + resp.sendError(HttpServletResponse.SC_UNAUTHORIZED); + return; + } + if (!auth.equals(ah)) { + logger.info("Rejecting request with incorrect Authorization header from " + req.getRemoteAddr() + ": " + req.getPathInfo()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN); + return; + } + String fileid = req.getPathInfo(); + fileid = fileid.substring(fileid.lastIndexOf('/') + 1); + String qs = req.getQueryString(); + if (qs != null) { + fileid = fileid + "?" + qs; + } + String publishid = req.getHeader("X-ATT-DR-PUBLISH-ID"); + String filename = URLEncoder.encode(fileid, "UTF-8").replaceAll("^\\.", "%2E").replaceAll("\\*", "%2A"); + String finalname = OutputDirectory + "/" + filename; + String tmpname = OutputDirectory + "/." 
+ filename; + try { + if (isdelete) { + (new File(finalname)).delete(); + logger.info("Received delete for file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname); + } else { + InputStream is = req.getInputStream(); + OutputStream os = new FileOutputStream(tmpname); + byte[] buf = new byte[65536]; + int i; + while ((i = is.read(buf)) > 0) { + os.write(buf, 0, i); + } + is.close(); + os.close(); + (new File(tmpname)).renameTo(new File(finalname)); + logger.info("Received file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname); + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + logger.info("Received file id " + fileid + " from " + req.getRemoteAddr() + " publish id " + publishid + " as " + finalname); + } + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + } catch (IOException ioe) { + (new File(tmpname)).delete(); + logger.info("Failure to save file " + finalname + " from " + req.getRemoteAddr() + ": " + req.getPathInfo(), ioe); + throw ioe; + } + } +} diff --git a/Subscriber/src/log4j.properties b/Subscriber/src/log4j.properties new file mode 100644 index 00000000..8c12d5ca --- /dev/null +++ b/Subscriber/src/log4j.properties @@ -0,0 +1,9 @@ +log4j.debug=FALSE +log4j.rootLogger=INFO,Root + +log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender +log4j.appender.Root.file=/opt/app/datartr/logs/subscriber.log +log4j.appender.Root.datePattern='.'yyyyMMdd +log4j.appender.Root.append=true +log4j.appender.Root.layout=org.apache.log4j.PatternLayout +log4j.appender.Root.layout.ConversionPattern=%d %p %t %m%n diff --git a/datarouter-node/pom.xml b/datarouter-node/pom.xml new file mode 100644 index 00000000..b2b798b0 --- /dev/null +++ b/datarouter-node/pom.xml @@ -0,0 +1,472 @@ + + + 4.0.0 + + com.att.datarouter-node + datarouter-node + 0.0.1-SNAPSHOT + jar + + datarouter-node + https://github.com/att/DMAAP_DATAROUTER + + + BSD License + + + + + + + UTF-8 + 1.8 + 1.8 + 
${basedir}/target/ + hub.docker.com + + + + + junit + junit + 3.8.1 + test + + + org.json + json + 20160810 + + + + javax.mail + javax.mail-api + 1.5.1 + + + com.att.eelf + eelf-core + 0.0.1 + + + javax.servlet + servlet-api + 2.5 + + + + org.eclipse.jetty + jetty-server + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-continuation + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-util + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-deploy + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-servlet + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-servlets + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-http + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-security + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-websocket + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-io + 7.6.14.v20131031 + + + + org.apache.commons + commons-io + 1.3.2 + + + commons-lang + commons-lang + 2.4 + + + commons-io + commons-io + 2.1 + compile + + + org.apache.httpcomponents + httpcore + 4.2.2 + + + + commons-codec + commons-codec + 1.6 + + + + org.mozilla + rhino + 1.7R3 + + + org.apache.james + apache-mime4j-core + 0.7 + + + org.apache.httpcomponents + httpclient + 4.2.3 + + + org.sonatype.http-testing-harness + junit-runner + 0.11 + + + + + log4j + log4j + 1.2.17 + compile + + + + + datarouter-node + + + src/main/resources + true + + **/*.properties + + + + src/main/resources + true + + **/EelfMessages.properties + + + + src/main/resources + true + + **/log4j.properties + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + com.att.research.datarouter.node.NodeMain + + + + + 1.8 + 1.8 + + 3.6.0 + + + maven-assembly-plugin + 2.4 + + + jar-with-dependencies + + ${basedir}/target/opt/app/datartr/lib + + + + true + com.att.research.datarouter.node.NodeMain + + + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.7 + + + copy-docker-file + package + + copy-resources + + + ${dockerLocation} + 
true + + + ${basedir}/src/main/resources/docker + true + + **/* + + + + + + + copy-resources + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr/etc + + + ${basedir}/src/main/resources + + misc/** + **/** + + + + + + + copy-resources-1 + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr/self_signed + + + ${basedir}/self_signed + + misc/** + **/** + + + + + + + + + com.spotify + docker-maven-plugin + 0.4.11 + + datarouter-node + ${dockerLocation} + docker-hub + https://${docker.registry} + + ${project.version} + latest + + true + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.10 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/opt/app/datartr/lib + false + false + true + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + false + + + + attach-javadocs + + jar + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrhdme + https://oss.sonatype.org/ + true + + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.7 + + + html + xml + + + + + + com.blackducksoftware.integration + hub-maven-plugin + 1.0.4 + false + + ${project.basedir} + + + + create-bdio-file + package + + createHubOutput + + + + + + + + + + ossrhdme + https://oss.sonatype.org/content/repositories/snapshots + + + ossrhdme + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + https://github.com/att/DMAAP_DATAROUTER.git + ${project.scm.connection} + https://github.com/att/DMAAP_DATAROUTER/tree/master + + + diff --git a/datarouter-node/self_signed/cacerts.jks b/datarouter-node/self_signed/cacerts.jks new file mode 100644 index 0000000000000000000000000000000000000000..dfd81433eeb9e44fb36f56c68c4627daf0e65356 GIT binary patch literal 1936 
zcmezO_TO6u1_mZLX3xt{NzG2JWME*7{CMf?V+Pg;JyQcq1_tJt22IRU4Vsv07cet1 zGBL4)eZ632z{|#|)#lOmotKf3o0Y+!+EBoNkBvE$g-w_}G%q_ZzdR2n!hs>ejv>N@ zA;OIikvEV9nanIK26CcaN@7W3QGRJjYEfQlxn6R9u7R95uc5htp|PO>2pC6!xuym% z?!Y>(iBSnTuozhxn41{+84Q{jxtN+585t(X>fN4{)^Q{1%C;9-J1!Oa?_hoA)wB3y z);$mALeqOv)3~QBDA>{8=B1I>5jSfd-;b@#6)}5!UU(inaPHi1w}+=AY%jQcoupmZ zw0?V1c=r2%(u}EhR+Y+&^zkg;YKONQyEs<(7DplnA~3}P1Cf!zwlp&BbN0c>fdZO0`&C!HzPc=i z``Ee;t+N5kt)EOwHj=pG^Wgr8U!V0$d{Vc6QS{TUmRWf-^vhg1F}@s?Em=z^m5ZNI zXSbPNP+?={o2%kCN1@xClO^uni=#_oMK3q0i%p(>CeJV=7;^~monF_f8p+-UD5tdT5A_q)yll7YeXtbazbCU27F**&gJ+l zEW;<t5IVXJ&RWUwCsOHr(&lxtrGOQr3Hm8%%RC zI200pyg1`m>g#|ycT!i?p95e*tG+*4#>1w!Pds z7k9Qb?8=Wv3q(5QjDn*4GCbMVz{uFt!pH=YZB2|}+<|o*p=`VI^m2wP%MY#2|NnC4 zfsNsz+c-Yv9d+}+wv#11{jB@duP&eEUPXO=8+GhN`trZ$rrfum@x;!(@M)Sw_B8z$ z^I0_$XD|DGe3Ea4@Cyv{ku`X;lsF$drxwwM)ySG;+Y>wPocepOrKcdh>!1_peF8!R+Ze`m}7QS@Y*^JiAY`rmeo>`hOax%=JT z%^~f7YWnweUsqaYTBP@$+ODhn^vEB(PX|uk4rf&GdXctf-7S;Pe?Gs_Y(MF{A1%vbs!*L*yBA7mtKsXZ~Zw ztrmB-K3eCtX6D+3B3|-@jwwY|%1|iEZ(MU7)ArHmGP*F^SXeirQTC77&nGp)vJ*X6s?Xk;P zvV_#QA~BIIF=fxzwdAUMo^zjb`UCFgoX-#M*AL(GIq&c1ygxtYf6M~_0MK3oe>IHi zM{)Od582a>%YAx>0RSifLx*f*5Ip>79v}p$4iW|ep#TUS!u_ivdzm>bW~Jh&9+H0- zOH-ql?^>c%nC(2|`Zjbwmo^lc)L+30MeXFy8rSNybzL7HVPOcB z^qyYUvBZu9zs?Qjxt%(S(w`PQ<;MzcH)kDj)_BGJz(B;j{K7T>AL-W?a6AV z9F4xs-pa;(oPp{eSCHF1=Kn6#_Ush$8lqgviP7C0D`!DhRl+s8``ZWD#p{2oo#G>6 z(r)m=RR?aOeO$DAhV(W@?&O%UYg`-0>Hf-sOIVh?U7qON(KqErk*!S|BCtGy6NCLJ z;c!n@zqGtBH+En?FV;?eo-83L`t51k3;J0-u-mziSXav2G-=u`%4Skn!b_u|kBg1OqYm*w5m?%bcV76A@}vg_oJ* za05FCMD)vdCHZ#L{^wu_=xao{m;RH36<+-?R1RC2A;T14k4=BMYEVO77-eayis!}(g zv|Rk=VG^EM$Xos4Rms+r896$!h$$Tw6D8^7@>FOI-Ev(OFZDusI@Fpc!B#T1U^w`m zuv4#O83MMk%PJX;Gh^ytn{Jx#HGzWm*eQq%Q@kaA?` zxHW30t4)Jad<+T+%ssfkQh5zC^P?9^M5Q2inwq6YXDqQLrl$&Uh|&XUe8OF{yj#)5 zTNO-vwg0DTmZgR!+xr7SN8hO;$i>wtJG0K5xusJ)ENvLFx$Q&mAFFHv%iq0ylYd5Z zJAUw)q93ZyJoJyh66_a}%sym?BsgU*JYaOZjhYb$9#tcf*d}c95s60A8l6ir_d4Cy zlo~-E6%mH2V(XgsT!KkSTy0ZY0@4e}@WIZ?^8x`oMKEJn8q{Ni*P zYn!ruJ1U~FigH){8^z`E)StY4)yhv6Zd09g;-yH 
z96LN;hI;%)m()0)7<3M5Xcln?5U#0^#qfBJ5V0(+lx+ae(hpj_-AYOwxRfa3ivKrZ z&dnVkuAwA^LVSMh>pfrRd;C$@wsIaT!bHj{vP`H}AFn!o4@7`AuOrVS_bF~kbKpu>fKgvlEWc_9 z#8O&lD1G&ZC$wq&G;6&}38OJ{EHtO>_DLf5+ys<3R)IKgj8bdK}{8m>&tM z`|k3+?}NmB$ypPzI)S5ZF3yaiWAO!oOM;X2D7NQ}=`=@?+Fn&?Le1_nYk#h19Cpv4 zAM{G@I~=F~qtmMyrD9UjlTa*WseGOInNdHotP-xcOE$rC!y^>Zqt#QhNZ5QNueppy zqGMtK`FwC$z8aUvw9qZ)#+!9M%k|VU^)WSNP-`YD&n~4SrLez(5$d3c#pnS5;0z2M zoPwc)Xb-_4AP59GyVi@tz0h=Bq{N#1S3geTc6o{pogv#t z%_7+sOb%+lUw@FADwO|?^c8j~M7J9jVGIHT0l>!!nE(E=ECM195lhzb(`bt5TV^AQ zOy<&}3oNaBqu%rMPA*bry!t3=F_kJaIW zIFE8)K5^RLPgfOBN_#i3_3~-Q6CXBP#aMq}Vsj=^PIYH3z@t>){`R8^oi>{X*T4~F z+!Ge)VCDi+y`Dd`keX5~7IM|co*QMm1Oof=u87JYMB@(8!AUiX3s1|8laN5w`pNpt zLBe~R49^&F?_8JDIpT*NjzKA@5qwPB{BO^B}gF&^SfB_#Hb0`a&FnefT zc3ysY9!!J-LxdeegbPE28zCZZAPX{?Sy-%~D8EcEC9x#2D8IBMwJ0yOTrW94*Fa93 z*U;R+(AdDp*wn(vBnr$mfpZ7eaZQX$$brSk%D~*j$j@NV#K^_e#K_37^7L|sE6We9 z&j0^%=7Ejjq1!k<<{fqOzqXSlJpHWu)vqp}$RE2;2TtA& zXH@Wdk+x>tEtAiGKEKgyKk2*Uu0pqpmEg8)R)f8Z=5B2NC-?Qm@c@p7U)LH=FsCQQ zY!ry~a9Ay}x=f2hf!H&-03&$ComW?`|s{FGF^l*E$6qWsd5)S|r9a=qmITmw0A zUPE&OLt{e&5HOAcb4?9k+<|pm6QdGxU@@{XFgG#sGZ-{6axpbAGBQk%)w?|@t>Z@2 zm2EGwc3dj--@*FKt7q}ata~2Jg{JqUrg2YMP_U!F%}XP%BW~6_z8_neD`NKeyzo4D z;M}?2ZVyjK*j{k?I!U{*Y5n%3@a*>ir5RK2tX^I5y@=-!=Wp%8!W{PLTOLPW=-iif zFr4S*mxK#91(ykbW?H;H-mWSBV*k5EN_QTw@-t*G<$anYn=iLh-6fJC_Mz?D>zuWx z68}zCjNg2uO<;cJEWc>E#KNL?K|N;sS^odn(Zv+9ZW{OBd(~Si_j4c0N|~^AZA7k@ znd_3vizc5Bco#Ks+p?4Yyv_czt=0*S32l8XeKx^C+4$PG`O~HeEc+Doiaov3;uL$G zJrgq{1LI;v14t~(^0A1qh)glh*WFil>El_x)edhrc5$rmEsjJEL|}>o1|lPaZE0lM z=j?-%0|hj1_N%UXeRWw3_px;yT4w{6TR)kWY$S2V=fV9Gzdq}i_@r+CqUfhxEwl1w z=$E;2VthF&Te6l;Di=Sa&Tcckpu)z?H&?}PjzYINCrjMD7e|-Gie7F~7n?l&%GKXT zmmgb_bM~y7z17u+zn<62ssH_2n6{34(w~K!%n$p|FJ-P<|H9ovyQ2M_wAL=Js+D(Q=R*S)U!&&=#%zVPNm zY`EX8b2qKmrL6ZBH<;#Pa401HcyY$B)Yk!z + * The Delivery class manages assignment of delivery threads to delivery + * queues and creation and destruction of delivery queues as + * configuration changes. 
DeliveryQueues are assigned threads based on a + * modified round-robin approach giving priority to queues with more work + * as measured by both bytes to deliver and files to deliver and lower + * priority to queues that already have delivery threads working. + * A delivery thread continues to work for a delivery queue as long as + * that queue has more files to deliver. + */ +public class Delivery { + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery"); + private static class DelItem implements Comparable { + private String pubid; + private String spool; + public int compareTo(DelItem x) { + int i = pubid.compareTo(x.pubid); + if (i == 0) { + i = spool.compareTo(x.spool); + } + return(i); + } + public String getPublishId() { + return(pubid); + } + public String getSpool() { + return(spool); + } + public DelItem(String pubid, String spool) { + this.pubid = pubid; + this.spool = spool; + } + } + private double fdstart; + private double fdstop; + private int threads; + private int curthreads; + private NodeConfigManager config; + private Hashtable dqs = new Hashtable(); + private DeliveryQueue[] queues = new DeliveryQueue[0]; + private int qpos = 0; + private long nextcheck; + private Runnable cmon = new Runnable() { + public void run() { + checkconfig(); + } + }; + /** + * Constructs a new Delivery system using the specified configuration manager. + * @param config The configuration manager for this delivery system. 
+ */ + public Delivery(NodeConfigManager config) { + this.config = config; + config.registerConfigTask(cmon); + checkconfig(); + } + private void cleardir(String dir) { + if (dqs.get(dir) != null) { + return; + } + File fdir = new File(dir); + for (File junk: fdir.listFiles()) { + if (junk.isFile()) { + junk.delete(); + } + } + fdir.delete(); + } + private void freeDiskCheck() { + File spoolfile = new File(config.getSpoolBase()); + long tspace = spoolfile.getTotalSpace(); + long start = (long)(tspace * fdstart); + long stop = (long)(tspace * fdstop); + long cur = spoolfile.getUsableSpace(); + if (cur >= start) { + return; + } + Vector cv = new Vector(); + for (String sdir: dqs.keySet()) { + for (String meta: (new File(sdir)).list()) { + if (!meta.endsWith(".M") || meta.charAt(0) == '.') { + continue; + } + cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir)); + } + } + DelItem[] items = cv.toArray(new DelItem[cv.size()]); + Arrays.sort(items); + logger.info("NODE0501 Free disk space below red threshold. current=" + cur + " red=" + start + " total=" + tspace); + for (DelItem item: items) { + long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId()); + logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk"); + if (amount > 0) { + cur += amount; + if (cur >= stop) { + cur = spoolfile.getUsableSpace(); + } + if (cur >= stop) { + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + return; + } + } + } + cur = spoolfile.getUsableSpace(); + if (cur >= stop) { + logger.info("NODE0503 Free disk space at or above yellow threshold. current=" + cur + " yellow=" + stop + " total=" + tspace); + return; + } + logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status. 
current=" + cur + " yellow=" + stop + " total=" + tspace); + } + private void cleardirs() { + String basedir = config.getSpoolBase(); + String nbase = basedir + "/n"; + for (String nodedir: (new File(nbase)).list()) { + if (!nodedir.startsWith(".")) { + cleardir(nbase + "/" + nodedir); + } + } + String sxbase = basedir + "/s"; + for (String sxdir: (new File(sxbase)).list()) { + if (sxdir.startsWith(".")) { + continue; + } + File sxf = new File(sxbase + "/" + sxdir); + for (String sdir: sxf.list()) { + if (!sdir.startsWith(".")) { + cleardir(sxbase + "/" + sxdir + "/" + sdir); + } + } + sxf.delete(); // won't if anything still in it + } + } + private synchronized void checkconfig() { + if (!config.isConfigured()) { + return; + } + fdstart = config.getFreeDiskStart(); + fdstop = config.getFreeDiskStop(); + threads = config.getDeliveryThreads(); + if (threads < 1) { + threads = 1; + } + DestInfo[] alldis = config.getAllDests(); + DeliveryQueue[] nqs = new DeliveryQueue[alldis.length]; + qpos = 0; + Hashtable ndqs = new Hashtable(); + for (DestInfo di: alldis) { + String spl = di.getSpool(); + DeliveryQueue dq = dqs.get(spl); + if (dq == null) { + dq = new DeliveryQueue(config, di); + } else { + dq.config(di); + } + ndqs.put(spl, dq); + nqs[qpos++] = dq; + } + queues = nqs; + dqs = ndqs; + cleardirs(); + while (curthreads < threads) { + curthreads++; + (new Thread() { + { + setName("Delivery Thread"); + } + public void run() { + dodelivery(); + } + }).start(); + } + nextcheck = 0; + notify(); + } + private void dodelivery() { + DeliveryQueue dq; + while ((dq = getNextQueue()) != null) { + dq.run(); + } + } + private synchronized DeliveryQueue getNextQueue() { + while (true) { + if (curthreads > threads) { + curthreads--; + return(null); + } + if (qpos < queues.length) { + DeliveryQueue dq = queues[qpos++]; + if (dq.isSkipSet()) { + continue; + } + nextcheck = 0; + notify(); + return(dq); + } + long now = System.currentTimeMillis(); + if (now < nextcheck) { + try { + 
wait(nextcheck + 500 - now); + } catch (Exception e) { + } + now = System.currentTimeMillis(); + } + if (now >= nextcheck) { + nextcheck = now + 5000; + qpos = 0; + freeDiskCheck(); + } + } + } + /** + * Reset the retry timer for a delivery queue + */ + public synchronized void resetQueue(String spool) { + if (spool != null) { + DeliveryQueue dq = dqs.get(spool); + if (dq != null) { + dq.resetQueue(); + } + } + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java new file mode 100644 index 00000000..71c77978 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java @@ -0,0 +1,348 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
+ * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.io.*; +import java.util.*; + +/** + * Mechanism for monitoring and controlling delivery of files to a destination. + *

+ * The DeliveryQueue class maintains lists of DeliveryTasks for a single + * destination (a subscription or another data router node) and assigns + * delivery threads to try to deliver them. It also maintains a delivery + * status that causes it to back off on delivery attempts after a failure. + *

+ * If the most recent delivery result was a failure, then no more attempts + * will be made for a period of time. Initially, and on the first failure + * following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds). + * If, after this delay, additional failures occur, each failure will + * multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a + * maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer(). + * Note that this behavior applies to the delivery queue as a whole and not + * to individual files in the queue. If multiple files are being + * delivered and one fails, the delay will be started. If a second + * delivery fails while the delay was active, it will not change the delay + * or change the duration of any subsequent delay. + * If, however, it succeeds, it will cancel the delay. + *

+ * The queue maintains 3 collections of files to deliver: A todo list of + * files that will be attempted, a working set of files that are being + * attempted, and a retry set of files that were attempted and failed. + * Whenever the todo list is empty and needs to be refilled, a scan of the + * spool directory is made and the file names sorted. Any files in the working set are ignored. + * If a DeliveryTask for the file is in the retry set, then that delivery + * task is placed on the todo list. Otherwise, a new DeliveryTask for the + * file is created and placed on the todo list. + * If, when a DeliveryTask is about to be removed from the todo list, its + * age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead + * marked as expired. + *

+ * A delivery queue also maintains a skip flag. This flag is true if the + * failure timer is active or if no files are found in a directory scan. + */ +public class DeliveryQueue implements Runnable, DeliveryTaskHelper { + private DeliveryQueueHelper dqh; + private DestInfo di; + private Hashtable working = new Hashtable(); + private Hashtable retry = new Hashtable(); + private int todoindex; + private boolean failed; + private long failduration; + private long resumetime; + File dir; + private Vector todo = new Vector(); + /** + * Try to cancel a delivery task. + * @return The length of the task in bytes or 0 if the task cannot be cancelled. + */ + public synchronized long cancelTask(String pubid) { + if (working.get(pubid) != null) { + return(0); + } + DeliveryTask dt = retry.get(pubid); + if (dt == null) { + for (int i = todoindex; i < todo.size(); i++) { + DeliveryTask xdt = todo.get(i); + if (xdt.getPublishId().equals(pubid)) { + dt = xdt; + break; + } + } + } + if (dt == null) { + dt = new DeliveryTask(this, pubid); + if (dt.getFileId() == null) { + return(0); + } + } + if (dt.isCleaned()) { + return(0); + } + StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts()); + dt.clean(); + return(dt.getLength()); + } + /** + * Mark that a delivery task has succeeded. + */ + public synchronized void markSuccess(DeliveryTask task) { + working.remove(task.getPublishId()); + task.clean(); + failed = false; + failduration = 0; + } + /** + * Mark that a delivery task has expired. + */ + public synchronized void markExpired(DeliveryTask task) { + task.clean(); + } + /** + * Mark that a delivery task has failed permanently. 
+ */ + public synchronized void markFailNoRetry(DeliveryTask task) { + working.remove(task.getPublishId()); + task.clean(); + failed = false; + failduration = 0; + } + private void fdupdate() { + if (!failed) { + failed = true; + if (failduration == 0) { + failduration = dqh.getInitFailureTimer(); + } + resumetime = System.currentTimeMillis() + failduration; + long maxdur = dqh.getMaxFailureTimer(); + failduration = (long)(failduration * dqh.getFailureBackoff()); + if (failduration > maxdur) { + failduration = maxdur; + } + } + } + /** + * Mark that a delivery task has been redirected. + */ + public synchronized void markRedirect(DeliveryTask task) { + working.remove(task.getPublishId()); + retry.put(task.getPublishId(), task); + } + /** + * Mark that a delivery task has temporarily failed. + */ + public synchronized void markFailWithRetry(DeliveryTask task) { + working.remove(task.getPublishId()); + retry.put(task.getPublishId(), task); + fdupdate(); + } + /** + * Get the next task. + */ + public synchronized DeliveryTask getNext() { + DeliveryTask ret = peekNext(); + if (ret != null) { + todoindex++; + working.put(ret.getPublishId(), ret); + } + return(ret); + } + /** + * Peek at the next task. 
+ */ + public synchronized DeliveryTask peekNext() { + long now = System.currentTimeMillis(); + long mindate = now - dqh.getExpirationTimer(); + if (failed) { + if (now > resumetime) { + failed = false; + } else { + return(null); + } + } + while (true) { + if (todoindex >= todo.size()) { + todoindex = 0; + todo = new Vector(); + String[] files = dir.list(); + Arrays.sort(files); + for (String fname: files) { + if (!fname.endsWith(".M")) { + continue; + } + String fname2 = fname.substring(0, fname.length() - 2); + long pidtime = 0; + int dot = fname2.indexOf('.'); + if (dot < 1) { + continue; + } + try { + pidtime = Long.parseLong(fname2.substring(0, dot)); + } catch (Exception e) { + } + if (pidtime < 1000000000000L) { + continue; + } + if (working.get(fname2) != null) { + continue; + } + DeliveryTask dt = retry.get(fname2); + if (dt == null) { + dt = new DeliveryTask(this, fname2); + } + todo.add(dt); + } + retry = new Hashtable(); + } + if (todoindex < todo.size()) { + DeliveryTask dt = todo.get(todoindex); + if (dt.isCleaned()) { + todoindex++; + continue; + } + if (dt.getDate() >= mindate) { + return(dt); + } + todoindex++; + reportExpiry(dt); + continue; + } + return(null); + } + } + /** + * Create a delivery queue for a given destination info + */ + public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) { + this.dqh = dqh; + this.di = di; + dir = new File(di.getSpool()); + dir.mkdirs(); + } + /** + * Update the destination info for this delivery queue + */ + public void config(DestInfo di) { + this.di = di; + } + /** + * Get the dest info + */ + public DestInfo getDestInfo() { + return(di); + } + /** + * Get the config manager + */ + public DeliveryQueueHelper getConfig() { + return(dqh); + } + /** + * Exceptional condition occurred during delivery + */ + public void reportDeliveryExtra(DeliveryTask task, long sent) { + StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent); + } + /** + * Message too old to 
deliver + */ + public void reportExpiry(DeliveryTask task) { + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); + markExpired(task); + } + /** + * Completed a delivery attempt + */ + public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { + if (status < 300) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid); + markSuccess(task); + } else if (status < 400 && dqh.isFollowRedirects()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + if (dqh.handleRedirection(di, location, task.getFileId())) { + markRedirect(task); + } else { + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + markFailNoRetry(task); + } + } else if (status < 500) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts()); + markFailNoRetry(task); + } else { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location); + markFailWithRetry(task); + } + } + /** + * Delivery failed by reason of an exception + */ + public void reportException(DeliveryTask task, Exception exception) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), 
task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString()); + dqh.handleUnreachable(di); + markFailWithRetry(task); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + return(dqh.getFeedId(subid)); + } + /** + * Get the URL to deliver a message to given the file ID + */ + public String getDestURL(String fileid) { + return(dqh.getDestURL(di, fileid)); + } + /** + * Deliver files until there's a failure or there are no more + * files to deliver + */ + public void run() { + DeliveryTask t; + long endtime = System.currentTimeMillis() + dqh.getFairTimeLimit(); + int filestogo = dqh.getFairFileLimit(); + while ((t = getNext()) != null) { + t.run(); + if (--filestogo <= 0 || System.currentTimeMillis() > endtime) { + break; + } + } + } + /** + * Is there no work to do for this queue right now? + */ + public synchronized boolean isSkipSet() { + return(peekNext() == null); + } + /** + * Reset the retry timer + */ + public void resetQueue() { + resumetime = System.currentTimeMillis(); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java new file mode 100644 index 00000000..770db1dc --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java @@ -0,0 +1,89 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. 
+ * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +/** + * Interface to allow independent testing of the DeliveryQueue code + *

+ * This interface represents all of the configuration information and + * feedback mechanisms that a delivery queue needs. + */ +public interface DeliveryQueueHelper { + /** + * Get the timeout (milliseconds) before retrying after an initial delivery failure + */ + public long getInitFailureTimer(); + /** + * Get the ratio between timeouts on consecutive delivery attempts + */ + public double getFailureBackoff(); + /** + * Get the maximum timeout (milliseconds) between delivery attempts + */ + public long getMaxFailureTimer(); + /** + * Get the expiration timer (milliseconds) for deliveries + */ + public long getExpirationTimer(); + /** + * Get the maximum number of file delivery attempts before checking + * if another queue has work to be performed. + */ + public int getFairFileLimit(); + /** + * Get the maximum amount of time spent delivering files before checking if another queue has work to be performed. + */ + public long getFairTimeLimit(); + /** + * Get the URL for delivering a file + * @param dest The destination information for the file to be delivered. + * @param fileid The file id for the file to be delivered. + * @return The URL for delivering the file (typically, dest.getURL() + "/" + fileid). + */ + public String getDestURL(DestInfo dest, String fileid); + /** + * Forget redirections associated with a subscriber + * @param dest Destination information to forget + */ + public void handleUnreachable(DestInfo dest); + /** + * Post redirection for a subscriber + * @param dest Destination information to update + * @param location Location given by subscriber + * @param fileid File ID of request + * @return true if this 3xx response is retryable, otherwise, false. + */ + public boolean handleRedirection(DestInfo dest, String location, String fileid); + /** + * Should I handle 3xx responses differently than 4xx responses? 
+ */ + public boolean isFollowRedirects(); + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid); +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java new file mode 100644 index 00000000..3d72a417 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java @@ -0,0 +1,308 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.io.*; +import java.net.*; +import java.util.*; +import org.apache.log4j.Logger; + +/** + * A file to be delivered to a destination. + *

+ * A Delivery task represents a work item for the data router - a file that + * needs to be delivered and provides mechanisms to get information about + * the file and its delivery data as well as to attempt delivery. + */ +public class DeliveryTask implements Runnable, Comparable { + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask"); + private DeliveryTaskHelper dth; + private String pubid; + private DestInfo di; + private String spool; + private File datafile; + private File metafile; + private long length; + private long date; + private String method; + private String fileid; + private String ctype; + private String url; + private String feedid; + private String subid; + private int attempts; + private String[][] hdrs; + /** + * Is the object a DeliveryTask with the same publication ID? + */ + public boolean equals(Object o) { + if (!(o instanceof DeliveryTask)) { + return(false); + } + return(pubid.equals(((DeliveryTask)o).pubid)); + } + /** + * Compare the publication IDs. + */ + public int compareTo(DeliveryTask o) { + return(pubid.compareTo(o.pubid)); + } + /** + * Get the hash code of the publication ID. + */ + public int hashCode() { + return(pubid.hashCode()); + } + /** + * Return the publication ID. + */ + public String toString() { + return(pubid); + } + /** + * Create a delivery task for a given delivery queue and pub ID + * @param dth The delivery task helper for the queue this task is in. + * @param pubid The publish ID for this file. This is used as + * the base for the file name in the spool directory and is of + * the form . 
+ */ + public DeliveryTask(DeliveryTaskHelper dth, String pubid) { + this.dth = dth; + this.pubid = pubid; + di = dth.getDestInfo(); + subid = di.getSubId(); + feedid = di.getLogData(); + spool = di.getSpool(); + String dfn = spool + "/" + pubid; + String mfn = dfn + ".M"; + datafile = new File(spool + "/" + pubid); + metafile = new File(mfn); + boolean monly = di.isMetaDataOnly(); + date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); + Vector hdrv = new Vector(); + try { + BufferedReader br = new BufferedReader(new FileReader(metafile)); + String s = br.readLine(); + int i = s.indexOf('\t'); + method = s.substring(0, i); + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + fileid = s.substring(i + 1); + while ((s = br.readLine()) != null) { + i = s.indexOf('\t'); + String h = s.substring(0, i); + String v = s.substring(i + 1); + if ("x-att-dr-routing".equalsIgnoreCase(h)) { + subid = v.replaceAll("[^ ]*/", ""); + feedid = dth.getFeedId(subid.replaceAll(" .*", "")); + } + if (length == 0 && h.toLowerCase().startsWith("content-")) { + continue; + } + if (h.equalsIgnoreCase("content-type")) { + ctype = v; + } + hdrv.add(new String[] {h, v}); + } + br.close(); + } catch (Exception e) { + } + hdrs = hdrv.toArray(new String[hdrv.size()][]); + url = dth.getDestURL(fileid); + } + /** + * Get the publish ID + */ + public String getPublishId() { + return(pubid); + } + /** + * Attempt delivery + */ + public void run() { + attempts++; + try { + di = dth.getDestInfo(); + boolean expect100 = di.isUsing100(); + boolean monly = di.isMetaDataOnly(); + length = 0; + if (!"DELETE".equals(method) && !monly) { + length = datafile.length(); + } + url = dth.getDestURL(fileid); + URL u = new URL(url); + HttpURLConnection uc = (HttpURLConnection)u.openConnection(); + uc.setConnectTimeout(60000); + uc.setReadTimeout(60000); + uc.setInstanceFollowRedirects(false); + uc.setRequestMethod(method); + uc.setRequestProperty("Content-Length", 
Long.toString(length)); + uc.setRequestProperty("Authorization", di.getAuth()); + uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid); + for (String[] nv: hdrs) { + uc.addRequestProperty(nv[0], nv[1]); + } + if (length > 0) { + if (expect100) { + uc.setRequestProperty("Expect", "100-continue"); + } + uc.setFixedLengthStreamingMode(length); + uc.setDoOutput(true); + OutputStream os = null; + try { + os = uc.getOutputStream(); + } catch (ProtocolException pe) { + dth.reportDeliveryExtra(this, -1L); + // Rcvd error instead of 100-continue + } + if (os != null) { + long sofar = 0; + try { + byte[] buf = new byte[1024 * 1024]; + InputStream is = new FileInputStream(datafile); + while (sofar < length) { + int i = buf.length; + if (sofar + i > length) { + i = (int)(length - sofar); + } + i = is.read(buf, 0, i); + if (i <= 0) { + throw new IOException("Unexpected problem reading data file " + datafile); + } + sofar += i; + os.write(buf, 0, i); + } + is.close(); + os.close(); + } catch (IOException ioe) { + dth.reportDeliveryExtra(this, sofar); + throw ioe; + } + } + } + int rc = uc.getResponseCode(); + String rmsg = uc.getResponseMessage(); + if (rmsg == null) { + String h0 = uc.getHeaderField(0); + if (h0 != null) { + int i = h0.indexOf(' '); + int j = h0.indexOf(' ', i + 1); + if (i != -1 && j != -1) { + rmsg = h0.substring(j + 1); + } + } + } + String xpubid = null; + InputStream is; + if (rc >= 200 && rc <= 299) { + is = uc.getInputStream(); + xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID"); + } else { + if (rc >= 300 && rc <= 399) { + rmsg = uc.getHeaderField("Location"); + } + is = uc.getErrorStream(); + } + byte[] buf = new byte[4096]; + if (is != null) { + while (is.read(buf) > 0) { + } + is.close(); + } + dth.reportStatus(this, rc, xpubid, rmsg); + } catch (Exception e) { + dth.reportException(this, e); + } + } + /** + * Remove meta and data files + */ + public void clean() { + datafile.delete(); + metafile.delete(); + hdrs = null; + } + /** + * Has this 
delivery task been cleaned? + */ + public boolean isCleaned() { + return(hdrs == null); + } + /** + * Get length of body + */ + public long getLength() { + return(length); + } + /** + * Get creation date as encoded in the publish ID. + */ + public long getDate() { + return(date); + } + /** + * Get the most recent delivery attempt URL + */ + public String getURL() { + return(url); + } + /** + * Get the content type + */ + public String getCType() { + return(ctype); + } + /** + * Get the method + */ + public String getMethod() { + return(method); + } + /** + * Get the file ID + */ + public String getFileId() { + return(fileid); + } + /** + * Get the number of delivery attempts + */ + public int getAttempts() { + return(attempts); + } + /** + * Get the (space delimited list of) subscription ID for this delivery task + */ + public String getSubId() { + return(subid); + } + /** + * Get the feed ID for this delivery task + */ + public String getFeedId() { + return(feedid); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java new file mode 100644 index 00000000..702bb29e --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java @@ -0,0 +1,72 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +/** + * Interface to allow independent testing of the DeliveryTask code. + *

+ * This interface represents all the configuraiton information and + * feedback mechanisms that a delivery task needs. + */ + +public interface DeliveryTaskHelper { + /** + * Report that a delivery attempt failed due to an exception (like can't connect to remote host) + * @param task The task that failed + * @param exception The exception that occurred + */ + public void reportException(DeliveryTask task, Exception exception); + /** + * Report that a delivery attempt completed (successfully or unsuccessfully) + * @param task The task that failed + * @param status The HTTP status + * @param xpubid The publish ID from the far end (if any) + * @param location The redirection location for a 3XX response + */ + public void reportStatus(DeliveryTask task, int status, String xpubid, String location); + /** + * Report that a delivery attempt either failed while sending data or that an error was returned instead of a 100 Continue. + * @param task The task that failed + * @param sent The number of bytes sent or -1 if an error was returned instead of 100 Continue. 
+ */ + public void reportDeliveryExtra(DeliveryTask task, long sent); + /** + * Get the destination information for the delivery queue + * @return The destination information + */ + public DestInfo getDestInfo(); + /** + * Given a file ID, get the URL to deliver to + * @param fileid The file id + * @return The URL to deliver to + */ + public String getDestURL(String fileid); + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed iD + */ + public String getFeedId(String subid); +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java new file mode 100644 index 00000000..e57fef8b --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +/** + * Information for a delivery destination that doesn't change from message to message + */ +public class DestInfo { + private String name; + private String spool; + private String subid; + private String logdata; + private String url; + private String authuser; + private String authentication; + private boolean metaonly; + private boolean use100; + /** + * Create a destination information object. + * @param name n:fqdn or s:subid + * @param spool The directory where files are spooled. + * @param subid The subscription ID (if applicable). + * @param logdata Text to be included in log messages + * @param url The URL to deliver to. + * @param authuser The auth user for logging. + * @param authentication The credentials. + * @param metaonly Is this a metadata only delivery? + * @param use100 Should I use expect 100-continue? + */ + public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) { + this.name = name; + this.spool = spool; + this.subid = subid; + this.logdata = logdata; + this.url = url; + this.authuser = authuser; + this.authentication = authentication; + this.metaonly = metaonly; + this.use100 = use100; + } + public boolean equals(Object o) { + return((o instanceof DestInfo) && ((DestInfo)o).spool.equals(spool)); + } + public int hashCode() { + return(spool.hashCode()); + } + /** + * Get the name of this destination + */ + public String getName() { + return(name); + } + /** + * Get the spool directory for this destination. + * @return The spool directory + */ + public String getSpool() { + return(spool); + } + /** + * Get the subscription ID. 
+ * @return Subscription ID or null if this is a node to node delivery. + */ + public String getSubId() { + return(subid); + } + /** + * Get the log data. + * @return Text to be included in a log message about delivery attempts. + */ + public String getLogData() { + return(logdata); + } + /** + * Get the delivery URL. + * @return The URL to deliver to (the primary URL). + */ + public String getURL() { + return(url); + + } + /** + * Get the user for authentication + * @return The name of the user for logging + */ + public String getAuthUser() { + return(authuser); + } + /** + * Get the authentication header + * @return The string to use to authenticate to the recipient. + */ + public String getAuth() { + return(authentication); + } + /** + * Is this a metadata only delivery? + * @return True if this is a metadata only delivery + */ + public boolean isMetaDataOnly() { + return(metaonly); + } + /** + * Should I send expect 100-continue header? + * @return True if I should. + */ + public boolean isUsing100() { + return(use100); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java new file mode 100644 index 00000000..bb3e4137 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java @@ -0,0 +1,82 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.util.*; +import java.net.*; + +/** + * Determine if an IP address is from a machine + */ +public class IsFrom { + private long nextcheck; + private String[] ips; + private String fqdn; + /** + * Configure the JVM DNS cache to have a 10 second TTL. This needs to be called very very early or it won't have any effect. + */ + public static void setDNSCache() { + java.security.Security.setProperty("networkaddress.cache.ttl", "10"); + } + /** + * Create an IsFrom for the specified fully qualified domain name. + */ + public IsFrom(String fqdn) { + this.fqdn = fqdn; + } + /** + * Check if an IP address matches. If it has been more than + * 10 seconds since DNS was last checked for changes to the + * IP address(es) of this FQDN, check again. Then check + * if the specified IP address belongs to the FQDN. 
+ */ + public synchronized boolean isFrom(String ip) { + long now = System.currentTimeMillis(); + if (now > nextcheck) { + nextcheck = now + 10000; + Vector v = new Vector(); + try { + InetAddress[] addrs = InetAddress.getAllByName(fqdn); + for (InetAddress a: addrs) { + v.add(a.getHostAddress()); + } + } catch (Exception e) { + } + ips = v.toArray(new String[v.size()]); + } + for (String s: ips) { + if (s.equals(ip)) { + return(true); + } + } + return(false); + } + /** + * Return the fully qualified domain name + */ + public String toString() { + return(fqdn); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java new file mode 100644 index 00000000..078deaa1 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java @@ -0,0 +1,159 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ +package com.att.research.datarouter.node; + +import java.util.*; +import java.util.regex.*; +import java.io.*; +import java.nio.file.*; +import java.text.*; + +/** + * Cleanup of old log files. + *

+ * Periodically scan the log directory for log files that are older than + * the log file retention interval, and delete them. In a future release, + * This class will also be responsible for uploading events logs to the + * log server to support the log query APIs. + */ + +public class LogManager extends TimerTask { + private NodeConfigManager config; + private Matcher isnodelog; + private Matcher iseventlog; + private Uploader worker; + private String uploaddir; + private String logdir; + private class Uploader extends Thread implements DeliveryQueueHelper { + public long getInitFailureTimer() { return(10000L); } + public double getFailureBackoff() { return(2.0); } + public long getMaxFailureTimer() { return(150000L); } + public long getExpirationTimer() { return(604800000L); } + public int getFairFileLimit() { return(10000); } + public long getFairTimeLimit() { return(86400000); } + public String getDestURL(DestInfo dest, String fileid) { + return(config.getEventLogUrl()); + } + public void handleUnreachable(DestInfo dest) {} + public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); } + public boolean isFollowRedirects() { return(false); } + public String getFeedId(String subid) { return(null); } + private DeliveryQueue dq; + public Uploader() { + dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false)); + setDaemon(true); + setName("Log Uploader"); + start(); + } + private synchronized void snooze() { + try { + wait(10000); + } catch (Exception e) { + } + } + private synchronized void poke() { + notify(); + } + public void run() { + while (true) { + scan(); + dq.run(); + snooze(); + } + } + private void scan() { + long threshold = System.currentTimeMillis() - config.getLogRetention(); + File dir = new File(logdir); + String[] fns = dir.list(); + Arrays.sort(fns); + String lastqueued = "events-000000000000.log"; + String curlog = 
StatusLog.getCurLogFile(); + curlog = curlog.substring(curlog.lastIndexOf('/') + 1); + try { + Writer w = new FileWriter(uploaddir + "/.meta"); + w.write("POST\tlogdata\nContent-Type\ttext/plain\n"); + w.close(); + BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued")); + lastqueued = br.readLine(); + br.close(); + } catch (Exception e) { + } + for (String fn: fns) { + if (!isnodelog.reset(fn).matches()) { + if (!iseventlog.reset(fn).matches()) { + continue; + } + if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) { + lastqueued = fn; + try { + String pid = config.getPublishId(); + Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn)); + Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta")); + } catch (Exception e) { + } + } + } + File f = new File(dir, fn); + if (f.lastModified() < threshold) { + f.delete(); + } + } + try { + (new File(uploaddir + "/.meta")).delete(); + Writer w = new FileWriter(uploaddir + "/.lastqueued"); + w.write(lastqueued + "\n"); + w.close(); + } catch (Exception e) { + } + } + } + /** + * Construct a log manager + *

+ * The log manager will check for expired log files every 5 minutes + * at 20 seconds after the 5 minute boundary. (Actually, the + * interval is the event log rollover interval, which + * defaults to 5 minutes). + */ + public LogManager(NodeConfigManager config) { + this.config = config; + try { + isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher(""); + iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher(""); + } catch (Exception e) {} + logdir = config.getLogDir(); + uploaddir = logdir + "/.spool"; + (new File(uploaddir)).mkdirs(); + long now = System.currentTimeMillis(); + long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000); + long when = now - now % intvl + intvl + 20000L; + config.getTimer().scheduleAtFixedRate(this, when - now, intvl); + worker = new Uploader(); + } + /** + * Trigger check for expired log files and log files to upload + */ + public void run() { + worker.poke(); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java new file mode 100644 index 00000000..689f7653 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java @@ -0,0 +1,722 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.util.*; +import java.io.*; + +/** + * Processed configuration for this node. + *

+ * The NodeConfig represents a processed configuration from the Data Router + * provisioning server. Each time configuration data is received from the + * provisioning server, a new NodeConfig is created and the previous one + * discarded. + */ +public class NodeConfig { + /** + * Raw configuration entry for a data router node + */ + public static class ProvNode { + private String cname; + /** + * Construct a node configuration entry. + * @param cname The cname of the node. + */ + public ProvNode(String cname) { + this.cname = cname; + } + /** + * Get the cname of the node + */ + public String getCName() { + return(cname); + } + } + /** + * Raw configuration entry for a provisioning parameter + */ + public static class ProvParam { + private String name; + private String value; + /** + * Construct a provisioning parameter configuration entry. + * @param name The name of the parameter. + * @param value The value of the parameter. + */ + public ProvParam(String name, String value) { + this.name = name; + this.value = value; + } + /** + * Get the name of the parameter. + */ + public String getName() { + return(name); + } + /** + * Get the value of the parameter. + */ + public String getValue() { + return(value); + } + } + /** + * Raw configuration entry for a data feed. + */ + public static class ProvFeed { + private String id; + private String logdata; + private String status; + /** + * Construct a feed configuration entry. + * @param id The feed ID of the entry. + * @param logdata String for log entries about the entry. + * @param status The reason why this feed cannot be used (Feed has been deleted, Feed has been suspended) or null if it is valid. + */ + public ProvFeed(String id, String logdata, String status) { + this.id = id; + this.logdata = logdata; + this.status = status; + } + /** + * Get the feed id of the data feed. + */ + public String getId() { + return(id); + } + /** + * Get the log data of the data feed. 
+ */ + public String getLogData() { + return(logdata); + } + /** + * Get the status of the data feed. + */ + public String getStatus() { + return(status); + } + } + /** + * Raw configuration entry for a feed user. + */ + public static class ProvFeedUser { + private String feedid; + private String user; + private String credentials; + /** + * Construct a feed user configuration entry + * @param feedid The feed id. + * @param user The user that will publish to the feed. + * @param credentials The Authorization header the user will use to publish. + */ + public ProvFeedUser(String feedid, String user, String credentials) { + this.feedid = feedid; + this.user = user; + this.credentials = credentials; + } + /** + * Get the feed id of the feed user. + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the user for the feed user. + */ + public String getUser() { + return(user); + } + /** + * Get the credentials for the feed user. + */ + public String getCredentials() { + return(credentials); + } + } + /** + * Raw configuration entry for a feed subnet + */ + public static class ProvFeedSubnet { + private String feedid; + private String cidr; + /** + * Construct a feed subnet configuration entry + * @param feedid The feed ID + * @param cidr The CIDR allowed to publish to the feed. + */ + public ProvFeedSubnet(String feedid, String cidr) { + this.feedid = feedid; + this.cidr = cidr; + } + /** + * Get the feed id of the feed subnet. + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the CIDR of the feed subnet. 
+ */ + public String getCidr() { + return(cidr); + } + } + /** + * Raw configuration entry for a subscription + */ + public static class ProvSubscription { + private String subid; + private String feedid; + private String url; + private String authuser; + private String credentials; + private boolean metaonly; + private boolean use100; + /** + * Construct a subscription configuration entry + * @param subid The subscription ID + * @param feedid The feed ID + * @param url The base delivery URL (not including the fileid) + * @param authuser The user in the credentials used to deliver + * @param credentials The credentials used to authenticate to the delivery URL exactly as they go in the Authorization header. + * @param metaonly Is this a meta data only subscription? + * @param use100 Should we send Expect: 100-continue? + */ + public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100) { + this.subid = subid; + this.feedid = feedid; + this.url = url; + this.authuser = authuser; + this.credentials = credentials; + this.metaonly = metaonly; + this.use100 = use100; + } + /** + * Get the subscription ID + */ + public String getSubId() { + return(subid); + } + /** + * Get the feed ID + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the delivery URL + */ + public String getURL() { + return(url); + } + /** + * Get the user + */ + public String getAuthUser() { + return(authuser); + } + /** + * Get the delivery credentials + */ + public String getCredentials() { + return(credentials); + } + /** + * Is this a meta data only subscription? + */ + public boolean isMetaDataOnly() { + return(metaonly); + } + /** + * Should we send Expect: 100-continue? 
+ */ + public boolean isUsing100() { + return(use100); + } + } + /** + * Raw configuration entry for controlled ingress to the data router node + */ + public static class ProvForceIngress { + private String feedid; + private String subnet; + private String user; + private String[] nodes; + /** + * Construct a forced ingress configuration entry + * @param feedid The feed ID that this entry applies to + * @param subnet The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all publisher IP addresses + * @param user The publishing user this entry applies to or "" if it applies to all publishing users. + * @param nodes The array of FQDNs of the data router nodes to redirect publication attempts to. + */ + public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) { + this.feedid = feedid; + this.subnet = subnet; + this.user = user; + this.nodes = nodes; + } + /** + * Get the feed ID + */ + public String getFeedId() { + return(feedid); + } + /** + * Get the subnet + */ + public String getSubnet() { + return(subnet); + } + /** + * Get the user + */ + public String getUser() { + return(user); + } + /** + * Get the node + */ + public String[] getNodes() { + return(nodes); + } + } + /** + * Raw configuration entry for controlled egress from the data router + */ + public static class ProvForceEgress { + private String subid; + private String node; + /** + * Construct a forced egress configuration entry + * @param subid The subscription ID the subscription with forced egress + * @param node The node handling deliveries for this subscription + */ + public ProvForceEgress(String subid, String node) { + this.subid = subid; + this.node = node; + } + /** + * Get the subscription ID + */ + public String getSubId() { + return(subid); + } + /** + * Get the node + */ + public String getNode() { + return(node); + } + } + /** + * Raw configuration entry for routing within the data router network + */ + public static class ProvHop { 
+ private String from; + private String to; + private String via; + /** + * A human readable description of this entry + */ + public String toString() { + return("Hop " + from + "->" + to + " via " + via); + } + /** + * Construct a hop entry + * @param from The FQDN of the node with the data to be delivered + * @param to The FQDN of the node that will deliver to the subscriber + * @param via The FQDN of the node where the from node should send the data + */ + public ProvHop(String from, String to, String via) { + this.from = from; + this.to = to; + this.via = via; + } + /** + * Get the from node + */ + public String getFrom() { + return(from); + } + /** + * Get the to node + */ + public String getTo() { + return(to); + } + /** + * Get the next intermediate node + */ + public String getVia() { + return(via); + } + } + private static class Redirection { + public SubnetMatcher snm; + public String user; + public String[] nodes; + } + private static class Feed { + public String loginfo; + public String status; + public SubnetMatcher[] subnets; + public Hashtable authusers = new Hashtable(); + public Redirection[] redirections; + public Target[] targets; + } + private Hashtable params = new Hashtable(); + private Hashtable feeds = new Hashtable(); + private Hashtable nodeinfo = new Hashtable(); + private Hashtable subinfo = new Hashtable(); + private Hashtable nodes = new Hashtable(); + private String myname; + private String myauth; + private DestInfo[] alldests; + private int rrcntr; + /** + * Process the raw provisioning data to configure this node + * @param pd The parsed provisioning data + * @param myname My name as seen by external systems + * @param spooldir The directory where temporary files live + * @param port The port number for URLs + * @param nodeauthkey The keying string used to generate node authentication credentials + */ + public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) { + this.myname = myname; + for 
(ProvParam p: pd.getParams()) { + params.put(p.getName(), p.getValue()); + } + Vector div = new Vector(); + myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey); + for (ProvNode pn: pd.getNodes()) { + String cn = pn.getCName(); + if (nodeinfo.get(cn) != null) { + continue; + } + String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey); + DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true); + (new File(di.getSpool())).mkdirs(); + div.add(di); + nodeinfo.put(cn, di); + nodes.put(auth, new IsFrom(cn)); + } + PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops()); + Hashtable> rdtab = new Hashtable>(); + for (ProvForceIngress pfi: pd.getForceIngress()) { + Vector v = rdtab.get(pfi.getFeedId()); + if (v == null) { + v = new Vector(); + rdtab.put(pfi.getFeedId(), v); + } + Redirection r = new Redirection(); + if (pfi.getSubnet() != null) { + r.snm = new SubnetMatcher(pfi.getSubnet()); + } + r.user = pfi.getUser(); + r.nodes = pfi.getNodes(); + v.add(r); + } + Hashtable> pfutab = new Hashtable>(); + for (ProvFeedUser pfu: pd.getFeedUsers()) { + Hashtable t = pfutab.get(pfu.getFeedId()); + if (t == null) { + t = new Hashtable(); + pfutab.put(pfu.getFeedId(), t); + } + t.put(pfu.getCredentials(), pfu.getUser()); + } + Hashtable egrtab = new Hashtable(); + for (ProvForceEgress pfe: pd.getForceEgress()) { + if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) { + continue; + } + egrtab.put(pfe.getSubId(), pfe.getNode()); + } + Hashtable> pfstab = new Hashtable>(); + for (ProvFeedSubnet pfs: pd.getFeedSubnets()) { + Vector v = pfstab.get(pfs.getFeedId()); + if (v == null) { + v = new Vector(); + pfstab.put(pfs.getFeedId(), v); + } + v.add(new SubnetMatcher(pfs.getCidr())); + } + Hashtable ttab = new Hashtable(); + HashSet allfeeds = new HashSet(); + for (ProvFeed pfx: pd.getFeeds()) { + if 
(pfx.getStatus() == null) { + allfeeds.add(pfx.getId()); + } + } + for (ProvSubscription ps: pd.getSubscriptions()) { + String sid = ps.getSubId(); + String fid = ps.getFeedId(); + if (!allfeeds.contains(fid)) { + continue; + } + if (subinfo.get(sid) != null) { + continue; + } + int sididx = 999; + try { + sididx = Integer.parseInt(sid); + sididx -= sididx % 100; + } catch (Exception e) { + } + String siddir = sididx + "/" + sid; + DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100()); + (new File(di.getSpool())).mkdirs(); + div.add(di); + subinfo.put(sid, di); + String egr = egrtab.get(sid); + if (egr != null) { + sid = pf.getPath(egr) + sid; + } + StringBuffer sb = ttab.get(fid); + if (sb == null) { + sb = new StringBuffer(); + ttab.put(fid, sb); + } + sb.append(' ').append(sid); + } + alldests = div.toArray(new DestInfo[div.size()]); + for (ProvFeed pfx: pd.getFeeds()) { + String fid = pfx.getId(); + Feed f = feeds.get(fid); + if (f != null) { + continue; + } + f = new Feed(); + feeds.put(fid, f); + f.loginfo = pfx.getLogData(); + f.status = pfx.getStatus(); + Vector v1 = pfstab.get(fid); + if (v1 == null) { + f.subnets = new SubnetMatcher[0]; + } else { + f.subnets = v1.toArray(new SubnetMatcher[v1.size()]); + } + Hashtable h1 = pfutab.get(fid); + if (h1 == null) { + h1 = new Hashtable(); + } + f.authusers = h1; + Vector v2 = rdtab.get(fid); + if (v2 == null) { + f.redirections = new Redirection[0]; + } else { + f.redirections = v2.toArray(new Redirection[v2.size()]); + } + StringBuffer sb = ttab.get(fid); + if (sb == null) { + f.targets = new Target[0]; + } else { + f.targets = parseRouting(sb.toString()); + } + } + } + /** + * Parse a target string into an array of targets + * @param routing Target string + * @return Array of targets. 
+ */ + public Target[] parseRouting(String routing) { + routing = routing.trim(); + if ("".equals(routing)) { + return(new Target[0]); + } + String[] xx = routing.split("\\s+"); + Hashtable tmap = new Hashtable(); + HashSet subset = new HashSet(); + Vector tv = new Vector(); + Target[] ret = new Target[xx.length]; + for (int i = 0; i < xx.length; i++) { + String t = xx[i]; + int j = t.indexOf('/'); + if (j == -1) { + DestInfo di = subinfo.get(t); + if (di == null) { + tv.add(new Target(null, t)); + } else { + if (!subset.contains(t)) { + subset.add(t); + tv.add(new Target(di, null)); + } + } + } else { + String node = t.substring(0, j); + String rtg = t.substring(j + 1); + DestInfo di = nodeinfo.get(node); + if (di == null) { + tv.add(new Target(null, t)); + } else { + Target tt = tmap.get(node); + if (tt == null) { + tt = new Target(di, rtg); + tmap.put(node, tt); + tv.add(tt); + } else { + tt.addRouting(rtg); + } + } + } + } + return(tv.toArray(new Target[tv.size()])); + } + /** + * Check whether this is a valid node-to-node transfer + * @param credentials Credentials offered by the supposed node + * @param ip IP address the request came from + */ + public boolean isAnotherNode(String credentials, String ip) { + IsFrom n = nodes.get(credentials); + return (n != null && n.isFrom(ip)); + } + /** + * Check whether publication is allowed. + * @param feedid The ID of the feed being requested. 
+ * @param credentials The offered credentials + * @param ip The requesting IP address + */ + public String isPublishPermitted(String feedid, String credentials, String ip) { + Feed f = feeds.get(feedid); + String nf = "Feed does not exist"; + if (f != null) { + nf = f.status; + } + if (nf != null) { + return(nf); + } + String user = f.authusers.get(credentials); + if (user == null) { + return("Publisher not permitted for this feed"); + } + if (f.subnets.length == 0) { + return(null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (SubnetMatcher snm: f.subnets) { + if (snm.matches(addr)) { + return(null); + } + } + return("Publisher not permitted for this feed"); + } + /** + * Get authenticated user + */ + public String getAuthUser(String feedid, String credentials) { + return(feeds.get(feedid).authusers.get(credentials)); + } + /** + * Check if the request should be redirected to a different ingress node + */ + public String getIngressNode(String feedid, String user, String ip) { + Feed f = feeds.get(feedid); + if (f.redirections.length == 0) { + return(null); + } + byte[] addr = NodeUtils.getInetAddress(ip); + for (Redirection r: f.redirections) { + if (r.user != null && !user.equals(r.user)) { + continue; + } + if (r.snm != null && !r.snm.matches(addr)) { + continue; + } + for (String n: r.nodes) { + if (myname.equals(n)) { + return(null); + } + } + if (r.nodes.length == 0) { + return(null); + } + return(r.nodes[rrcntr++ % r.nodes.length]); + } + return(null); + } + /** + * Get a provisioned configuration parameter + */ + public String getProvParam(String name) { + return(params.get(name)); + } + /** + * Get all the DestInfos + */ + public DestInfo[] getAllDests() { + return(alldests); + } + /** + * Get the targets for a feed + * @param feedid The feed ID + * @return The targets this feed should be delivered to + */ + public Target[] getTargets(String feedid) { + if (feedid == null) { + return(new Target[0]); + } + Feed f = feeds.get(feedid); + if (f == 
null) { + return(new Target[0]); + } + return(f.targets); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return(null); + } + return(di.getLogData()); + } + /** + * Get the spool directory for a subscription + * @param subid The subscription ID + * @return The spool directory + */ + public String getSpoolDir(String subid) { + DestInfo di = subinfo.get(subid); + if (di == null) { + return(null); + } + return(di.getSpool()); + } + /** + * Get the Authorization value this node uses + * @return The Authorization header value for this node + */ + public String getMyAuth() { + return(myauth); + } + +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java new file mode 100644 index 00000000..01ca4426 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java @@ -0,0 +1,599 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.net.*; +import java.util.*; +import java.io.*; +import org.apache.log4j.Logger; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.research.datarouter.node.eelf.EelfMsgs; + + +/** + * Maintain the configuration of a Data Router node + *

+ * The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information. (Log4J has its own configuration mechanism). + *

+ * There are two basic sets of configuration data. The + * static local configuration data, stored in a local configuration file (created + * as part of installation by SWM), and the dynamic global + * configuration data fetched from the data router provisioning server. + */ +public class NodeConfigManager implements DeliveryQueueHelper { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeConfigManager"); + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeConfigManager"); + private static NodeConfigManager base = new NodeConfigManager(); + + private Timer timer = new Timer("Node Configuration Timer", true); + private long maxfailuretimer; + private long initfailuretimer; + private long expirationtimer; + private double failurebackoff; + private long fairtimelimit; + private int fairfilelimit; + private double fdpstart; + private double fdpstop; + private int deliverythreads; + private String provurl; + private String provhost; + private IsFrom provcheck; + private int gfport; + private int svcport; + private int port; + private String spooldir; + private String logdir; + private long logretention; + private String redirfile; + private String kstype; + private String ksfile; + private String kspass; + private String kpass; + private String tstype; + private String tsfile; + private String tspass; + private String myname; + private RedirManager rdmgr; + private RateLimitedOperation pfetcher; + private NodeConfig config; + private File quiesce; + private PublishId pid; + private String nak; + private TaskList configtasks = new TaskList(); + private String eventlogurl; + private String eventlogprefix; + private String eventlogsuffix; + private String eventloginterval; + private boolean followredirects; + + + /** + * Get the default node configuration manager + */ + public static NodeConfigManager getInstance() { + return(base); + } + /** + * Initialize the 
configuration of a Data Router node + */ + private NodeConfigManager() { + Properties p = new Properties(); + try { + p.load(new FileInputStream(System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"))); + } catch (Exception e) { + + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR); + logger.error("NODE0301 Unable to load local configuration file " + System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties"), e); + } + provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov"); + try { + provhost = (new URL(provurl)).getHost(); + } catch (Exception e) { + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl); + logger.error("NODE0302 Bad provisioning server URL " + provurl); + System.exit(1); + } + logger.info("NODE0303 Provisioning server is " + provhost); + eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs"); + provcheck = new IsFrom(provhost); + gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080")); + svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443")); + port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443")); + long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000")); + long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000")); + spooldir = p.getProperty("SpoolDir", "spool"); + File fdir = new File(spooldir + "/f"); + fdir.mkdirs(); + for (File junk: fdir.listFiles()) { + if (junk.isFile()) { + junk.delete(); + } + } + logdir = p.getProperty("LogDir", "logs"); + (new File(logdir)).mkdirs(); + logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L; + eventlogprefix = logdir + "/events"; + eventlogsuffix = ".log"; + String redirfile = p.getProperty("RedirectionFile", 
"etc/redirections.dat"); + kstype = p.getProperty("KeyStoreType", "jks"); + ksfile = p.getProperty("KeyStoreFile", "etc/keystore"); + kspass = p.getProperty("KeyStorePassword", "changeme"); + kpass = p.getProperty("KeyPassword", "changeme"); + tstype = p.getProperty("TrustStoreType", "jks"); + tsfile = p.getProperty("TrustStoreFile"); + tspass = p.getProperty("TrustStorePassword", "changeme"); + if (tsfile != null && tsfile.length() > 0) { + System.setProperty("javax.net.ssl.trustStoreType", tstype); + System.setProperty("javax.net.ssl.trustStore", tsfile); + System.setProperty("javax.net.ssl.trustStorePassword", tspass); + } + nak = p.getProperty("NodeAuthKey", "Node123!"); + quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN")); + myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass); + if (myname == null) { + NodeUtils.setIpAndFqdnForEelf("NodeConfigManager"); + eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile); + logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile); + System.exit(1); + } + logger.info("NODE0304 My certificate says my name is " + myname); + pid = new PublishId(myname); + rdmgr = new RedirManager(redirfile, minrsinterval, timer); + pfetcher = new RateLimitedOperation(minpfinterval, timer) { + public void run() { + fetchconfig(); + } + }; + logger.info("NODE0305 Attempting to fetch configuration at " + provurl); + pfetcher.request(); + } + private void localconfig() { + followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false")); + eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m"); + initfailuretimer = 10000; + maxfailuretimer = 3600000; + expirationtimer = 86400000; + failurebackoff = 2.0; + deliverythreads = 40; + fairfilelimit = 100; + fairtimelimit = 60000; + fdpstart = 0.05; + fdpstop = 0.2; + try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} + try { maxfailuretimer = 
(long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {} + try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {} + try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {} + try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {} + try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {} + try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {} + try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {} + try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {} + if (fdpstart < 0.01) { + fdpstart = 0.01; + } + if (fdpstart > 0.5) { + fdpstart = 0.5; + } + if (fdpstop < fdpstart) { + fdpstop = fdpstart; + } + if (fdpstop > 0.5) { + fdpstop = 0.5; + } + } + private void fetchconfig() { + try { + System.out.println("provurl:: "+provurl); + Reader r = new InputStreamReader((new URL(provurl)).openStream()); + config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak); + localconfig(); + configtasks.startRun(); + Runnable rr; + while ((rr = configtasks.next()) != null) { + try { + rr.run(); + } catch (Exception e) { + } + } + } catch (Exception e) { + e.printStackTrace(); + NodeUtils.setIpAndFqdnForEelf("fetchconfigs"); + eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString()); + logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e); + pfetcher.request(); + } + } + /** + * Process a gofetch request from a particular IP address. If the + * IP address is not an IP address we would go to to fetch the + * provisioning data, ignore the request. 
If the data has been + * fetched very recently (default 10 seconds), wait a while before fetching again. + */ + public synchronized void gofetch(String remoteaddr) { + if (provcheck.isFrom(remoteaddr)) { + logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr); + pfetcher.request(); + } else { + logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr); + } + } + /** + * Am I configured? + */ + public boolean isConfigured() { + return(config != null); + } + /** + * Am I shut down? + */ + public boolean isShutdown() { + return(quiesce.exists()); + } + /** + * Given a routing string, get the targets. + * @param routing Target string + * @return array of targets + */ + public Target[] parseRouting(String routing) { + return(config.parseRouting(routing)); + } + /** + * Given a set of credentials and an IP address, is this request from another node? + * @param credentials Credentials offered by the supposed node + * @param ip IP address the request came from + * @return If the credentials and IP address are recognized, true, otherwise false. + */ + public boolean isAnotherNode(String credentials, String ip) { + return(config.isAnotherNode(credentials, ip)); + } + /** + * Check whether publication is allowed. + * @param feedid The ID of the feed being requested + * @param credentials The offered credentials + * @param ip The requesting IP address + * @return True if the IP and credentials are valid for the specified feed. + */ + public String isPublishPermitted(String feedid, String credentials, String ip) { + return(config.isPublishPermitted(feedid, credentials, ip)); + } + /** + * Check who the user is given the feed ID and the offered credentials. + * @param feedid The ID of the feed specified + * @param credentials The offered credentials + * @return Null if the credentials are invalid or the user if they are valid. 
+ */ + public String getAuthUser(String feedid, String credentials) { + return(config.getAuthUser(feedid, credentials)); + } + /** + * Check if the publish request should be sent to another node based on the feedid, user, and source IP address. + * @param feedid The ID of the feed specified + * @param user The publishing user + * @param ip The IP address of the publish endpoint + * @return Null if the request should be accepted or the correct hostname if it should be sent to another node. + */ + public String getIngressNode(String feedid, String user, String ip) { + return(config.getIngressNode(feedid, user, ip)); + } + /** + * Get a provisioned configuration parameter (from the provisioning server configuration) + * @param name The name of the parameter + * @return The value of the parameter or null if it is not defined. + */ + public String getProvParam(String name) { + return(config.getProvParam(name)); + } + /** + * Get a provisioned configuration parameter (from the provisioning server configuration) + * @param name The name of the parameter + * @param deflt The value to use if the parameter is not defined + * @return The value of the parameter or deflt if it is not defined. + */ + public String getProvParam(String name, String deflt) { + name = config.getProvParam(name); + if (name == null) { + name = deflt; + } + return(name); + } + /** + * Generate a publish ID + */ + public String getPublishId() { + return(pid.next()); + } + /** + * Get all the outbound spooling destinations. + * This will include both subscriptions and nodes. 
+ */ + public DestInfo[] getAllDests() { + return(config.getAllDests()); + } + /** + * Register a task to run whenever the configuration changes + */ + public void registerConfigTask(Runnable task) { + configtasks.addTask(task); + } + /** + * Deregister a task to run whenever the configuration changes + */ + public void deregisterConfigTask(Runnable task) { + configtasks.removeTask(task); + } + /** + * Get the URL to deliver a message to. + * @param destinfo The destination information + * @param fileid The file ID + * @return The URL to deliver to + */ + public String getDestURL(DestInfo destinfo, String fileid) { + String subid = destinfo.getSubId(); + String purl = destinfo.getURL(); + if (followredirects && subid != null) { + purl = rdmgr.lookup(subid, purl); + } + return(purl + "/" + fileid); + } + /** + * Is a destination redirected? + */ + public boolean isDestRedirected(DestInfo destinfo) { + return(followredirects && rdmgr.isRedirected(destinfo.getSubId())); + } + /** + * Set up redirection on receipt of a 3XX from a target URL + */ + public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) { + fileid = "/" + fileid; + String subid = destinfo.getSubId(); + String purl = destinfo.getURL(); + if (followredirects && subid != null && redirto.endsWith(fileid)) { + redirto = redirto.substring(0, redirto.length() - fileid.length()); + if (!redirto.equals(purl)) { + rdmgr.redirect(subid, purl, redirto); + return(true); + } + } + return(false); + } + /** + * Handle unreachable target URL + */ + public void handleUnreachable(DestInfo destinfo) { + String subid = destinfo.getSubId(); + if (followredirects && subid != null) { + rdmgr.forget(subid); + } + } + /** + * Get the timeout before retrying after an initial delivery failure + */ + public long getInitFailureTimer() { + return(initfailuretimer); + } + /** + * Get the maximum timeout between delivery attempts + */ + public long getMaxFailureTimer() { + return(maxfailuretimer); + } + /** + 
* Get the ratio between consecutive delivery attempts + */ + public double getFailureBackoff() { + return(failurebackoff); + } + /** + * Get the expiration timer for deliveries + */ + public long getExpirationTimer() { + return(expirationtimer); + } + /** + * Get the maximum number of file delivery attempts before checking + * if another queue has work to be performed. + */ + public int getFairFileLimit() { + return(fairfilelimit); + } + /** + * Get the maximum amount of time spent delivering files before + * checking if another queue has work to be performed. + */ + public long getFairTimeLimit() { + return(fairtimelimit); + } + /** + * Get the targets for a feed + * @param feedid The feed ID + * @return The targets this feed should be delivered to + */ + public Target[] getTargets(String feedid) { + return(config.getTargets(feedid)); + } + /** + * Get the spool directory for temporary files + */ + public String getSpoolDir() { + return(spooldir + "/f"); + } + /** + * Get the base directory for spool directories + */ + public String getSpoolBase() { + return(spooldir); + } + /** + * Get the key store type + */ + public String getKSType() { + return(kstype); + } + /** + * Get the key store file + */ + public String getKSFile() { + return(ksfile); + } + /** + * Get the key store password + */ + public String getKSPass() { + return(kspass); + } + /** + * Get the key password + */ + public String getKPass() { + return(kpass); + } + /** + * Get the http port + */ + public int getHttpPort() { + return(gfport); + } + /** + * Get the https port + */ + public int getHttpsPort() { + return(svcport); + } + /** + * Get the externally visible https port + */ + public int getExtHttpsPort() { + return(port); + } + /** + * Get the external name of this machine + */ + public String getMyName() { + return(myname); + } + /** + * Get the number of threads to use for delivery + */ + public int getDeliveryThreads() { + return(deliverythreads); + } + /** + * Get the URL for uploading 
the event log data + */ + public String getEventLogUrl() { + return(eventlogurl); + } + /** + * Get the prefix for the names of event log files + */ + public String getEventLogPrefix() { + return(eventlogprefix); + } + /** + * Get the suffix for the names of the event log files + */ + public String getEventLogSuffix() { + return(eventlogsuffix); + } + /** + * Get the interval between event log file rollovers + */ + public String getEventLogInterval() { + return(eventloginterval); + } + /** + * Should I follow redirects from subscribers? + */ + public boolean isFollowRedirects() { + return(followredirects); + } + /** + * Get the directory where the event and node log files live + */ + public String getLogDir() { + return(logdir); + } + /** + * How long do I keep log files (in milliseconds) + */ + public long getLogRetention() { + return(logretention); + } + /** + * Get the timer + */ + public Timer getTimer() { + return(timer); + } + /** + * Get the feed ID for a subscription + * @param subid The subscription ID + * @return The feed ID + */ + public String getFeedId(String subid) { + return(config.getFeedId(subid)); + } + /** + * Get the authorization string this node uses + * @return The Authorization string for this node + */ + public String getMyAuth() { + return(config.getMyAuth()); + } + /** + * Get the fraction of free spool disk space where we start throwing away undelivered files. This is FREE_DISK_RED_PERCENT / 100.0. Default is 0.05. Limited by 0.01 <= FreeDiskStart <= 0.5. + */ + public double getFreeDiskStart() { + return(fdpstart); + } + /** + * Get the fraction of free spool disk space where we stop throwing away undelivered files. This is FREE_DISK_YELLOW_PERCENT / 100.0. Default is 0.2. Limited by FreeDiskStart <= FreeDiskStop <= 0.5. 
+ */ + public double getFreeDiskStop() { + return(fdpstop); + } + /** + * Get the spool directory for a subscription + */ + public String getSpoolDir(String subid, String remoteaddr) { + if (provcheck.isFrom(remoteaddr)) { + String sdir = config.getSpoolDir(subid); + if (sdir != null) { + logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr); + } else { + logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr); + } + return(sdir); + } else { + logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr); + return(null); + } + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java new file mode 100644 index 00000000..c9390419 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import org.eclipse.jetty.servlet.*; +import org.eclipse.jetty.util.ssl.*; +import org.eclipse.jetty.server.*; +import org.eclipse.jetty.server.nio.*; +import org.eclipse.jetty.server.ssl.*; +import org.apache.log4j.Logger; + +/** + * The main starting point for the Data Router node + */ +public class NodeMain { + private NodeMain() {} + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeMain"); + private static class wfconfig implements Runnable { + private NodeConfigManager ncm; + public wfconfig(NodeConfigManager ncm) { + this.ncm = ncm; + } + public synchronized void run() { + notify(); + } + public synchronized void waitforconfig() { + ncm.registerConfigTask(this); + while (!ncm.isConfigured()) { + logger.info("NODE0003 Waiting for Node Configuration"); + try { + wait(); + } catch (Exception e) { + } + } + ncm.deregisterConfigTask(this); + logger.info("NODE0004 Node Configuration Data Received"); + } + } + private static Delivery d; + private static NodeConfigManager ncm; + /** + * Reset the retry timer for a subscription + */ + public static void resetQueue(String subid, String ip) { + d.resetQueue(ncm.getSpoolDir(subid, ip)); + } + /** + * Start the data router. + *

+ * The location of the node configuration file can be set using the + * com.att.research.datarouter.node.ConfigFile system property. By + * default, it is "etc/node.properties". + */ + public static void main(String[] args) throws Exception { + logger.info("NODE0001 Data Router Node Starting"); + IsFrom.setDNSCache(); + ncm = NodeConfigManager.getInstance(); + logger.info("NODE0002 I am " + ncm.getMyName()); + (new wfconfig(ncm)).waitforconfig(); + d = new Delivery(ncm); + LogManager lm = new LogManager(ncm); + Server server = new Server(); + SelectChannelConnector http = new SelectChannelConnector(); + http.setPort(ncm.getHttpPort()); + http.setMaxIdleTime(2000); + http.setRequestHeaderSize(2048); + SslSelectChannelConnector https = new SslSelectChannelConnector(); + https.setPort(ncm.getHttpsPort()); + https.setMaxIdleTime(30000); + https.setRequestHeaderSize(8192); + SslContextFactory cf = https.getSslContextFactory(); + + /**Skip SSLv3 Fixes*/ + cf.addExcludeProtocols("SSLv3"); + logger.info("Excluded protocols node-"+cf.getExcludeProtocols()); + /**End of SSLv3 Fixes*/ + + cf.setKeyStoreType(ncm.getKSType()); + cf.setKeyStorePath(ncm.getKSFile()); + cf.setKeyStorePassword(ncm.getKSPass()); + cf.setKeyManagerPassword(ncm.getKPass()); + server.setConnectors(new Connector[] { http, https }); + ServletContextHandler ctxt = new ServletContextHandler(0); + ctxt.setContextPath("/"); + server.setHandler(ctxt); + ctxt.addServlet(new ServletHolder(new NodeServlet()), "/*"); + logger.info("NODE0005 Data Router Node Activating Service"); + server.start(); + server.join(); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java new file mode 100644 index 00000000..e0ec1f5b --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java @@ -0,0 +1,380 @@ 
+/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import javax.servlet.*; +import javax.servlet.http.*; +import java.util.*; +import java.util.regex.*; +import java.io.*; +import java.nio.file.*; +import org.apache.log4j.Logger; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.research.datarouter.node.eelf.EelfMsgs; + +import java.net.*; + +/** + * Servlet for handling all http and https requests to the data router node + *

+ * Handled requests are: + *
+ * GET http://node/internal/fetchProv - fetch the provisioning data + *
+ * PUT/DELETE https://node/internal/publish/fileid - n2n transfer + *
+ * PUT/DELETE https://node/publish/feedid/fileid - publsh request + */ +public class NodeServlet extends HttpServlet { + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet"); + private static NodeConfigManager config; + private static Pattern MetaDataPattern; + private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25"); + //Adding EELF Logger Rally:US664892 + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet"); + + static { + try { + String ws = "\\s*"; + // assume that \\ and \" have been replaced by X + String string = "\"[^\"]*\""; + //String string = "\"(?:[^\"\\\\]|\\\\.)*\""; + String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?"; + String value = "(?:" + string + "|" + number + "|null|true|false)"; + String item = string + ws + ":" + ws + value + ws; + String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws; + MetaDataPattern = Pattern.compile(object, Pattern.DOTALL); + } catch (Exception e) { + } + } + /** + * Get the NodeConfigurationManager + */ + public void init() { + config = NodeConfigManager.getInstance(); + logger.info("NODE0101 Node Servlet Configured"); + } + private boolean down(HttpServletResponse resp) throws IOException { + if (config.isShutdown() || !config.isConfigured()) { + resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); + logger.info("NODE0102 Rejecting request: Service is being quiesced"); + return(true); + } + return(false); + } + /** + * Handle a GET for /internal/fetchProv + */ + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doGet"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + if (down(resp)) { + return; + } + String path = req.getPathInfo(); + 
String qs = req.getQueryString(); + String ip = req.getRemoteAddr(); + if (qs != null) { + path = path + "?" + qs; + } + if ("/internal/fetchProv".equals(path)) { + config.gofetch(ip); + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + return; + } else if (path.startsWith("/internal/resetSubscription/")) { + String subid = path.substring(28); + if (subid.length() != 0 && subid.indexOf('/') == -1) { + NodeMain.resetQueue(subid, ip); + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + return; + } + } + if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) { + if (path.startsWith("/internal/logs/")) { + String f = path.substring(15); + File fn = new File(config.getLogDir() + "/" + f); + if (f.indexOf('/') != -1 || !fn.isFile()) { + logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + byte[] buf = new byte[65536]; + resp.setContentType("text/plain"); + resp.setContentLength((int)fn.length()); + resp.setStatus(200); + InputStream is = new FileInputStream(fn); + OutputStream os = resp.getOutputStream(); + int i; + while ((i = is.read(buf)) > 0) { + os.write(buf, 0, i); + } + is.close(); + return; + } + if (path.startsWith("/internal/rtt/")) { + String xip = path.substring(14); + long st = System.currentTimeMillis(); + String status = " unknown"; + try { + Socket s = new Socket(xip, 443); + s.close(); + status = " connected"; + } catch (Exception e) { + status = " error " + e.toString(); + } + long dur = System.currentTimeMillis() - st; + resp.setContentType("text/plain"); + resp.setStatus(200); + byte[] buf = (dur + status + "\n").getBytes(); + resp.setContentLength(buf.length); + resp.getOutputStream().write(buf); + return; + } + } + logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip); + resp.sendError(HttpServletResponse.SC_NOT_FOUND); + return; + } + /** + * Handle all PUT requests + */ + protected void doPut(HttpServletRequest req, 
HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doPut"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + common(req, resp, true); + } + /** + * Handle all DELETE requests + */ + protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + NodeUtils.setIpAndFqdnForEelf("doDelete"); + eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+""); + common(req, resp, false); + } + private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException { + if (down(resp)) { + return; + } + if (!req.isSecure()) { + logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests"); + return; + } + String fileid = req.getPathInfo(); + if (fileid == null) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. 
Expecting /."); + return; + } + String feedid = null; + String user = null; + String credentials = req.getHeader("Authorization"); + if (credentials == null) { + logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required"); + return; + } + String ip = req.getRemoteAddr(); + String lip = req.getLocalAddr(); + String pubid = null; + String xpubid = null; + String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; + Target[] targets = null; + if (fileid.startsWith("/publish/")) { + fileid = fileid.substring(9); + int i = fileid.indexOf('/'); + if (i == -1 || i == fileid.length() - 1) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /. Possible missing fileid."); + return; + } + feedid = fileid.substring(0, i); + fileid = fileid.substring(i + 1); + pubid = config.getPublishId(); + xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); + targets = config.getTargets(feedid); + } else if (fileid.startsWith("/internal/publish/")) { + if (!config.isAnotherNode(credentials, ip)) { + logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip); + resp.sendError(HttpServletResponse.SC_FORBIDDEN); + return; + } + fileid = fileid.substring(18); + pubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); + targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING")); + } else { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. 
Expecting /."); + return; + } + if (fileid.indexOf('/') != -1) { + logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr()); + resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI. Expecting /."); + return; + } + String qs = req.getQueryString(); + if (qs != null) { + fileid = fileid + "?" + qs; + } + String hp = config.getMyName(); + int xp = config.getExtHttpsPort(); + if (xp != 443) { + hp = hp + ":" + xp; + } + String logurl = "https://" + hp + "/internal/publish/" + fileid; + if (feedid != null) { + logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid; + String reason = config.isPublishPermitted(feedid, credentials, ip); + if (reason != null) { + logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason); + resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason); + return; + } + user = config.getAuthUser(feedid, credentials); + String newnode = config.getIngressNode(feedid, user, ip); + if (newnode != null) { + String port = ""; + int iport = config.getExtHttpsPort(); + if (iport != 443) { + port = ":" + iport; + } + String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid; + logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto); + resp.sendRedirect(redirto); + return; + } + resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid); + } + String fbase = config.getSpoolDir() + "/" + pubid; + File data = new File(fbase); + File meta = new File(fbase + ".M"); + OutputStream dos = null; + Writer mw = null; + InputStream is = null; + try { + StringBuffer mx = new StringBuffer(); + mx.append(req.getMethod()).append('\t').append(fileid).append('\n'); + Enumeration hnames = req.getHeaderNames(); + String ctype = null; + while (hnames.hasMoreElements()) { + String hn = (String)hnames.nextElement(); + String hnlc = 
hn.toLowerCase(); + if ((isput && ("content-type".equals(hnlc) || + "content-language".equals(hnlc) || + "content-md5".equals(hnlc) || + "content-range".equals(hnlc))) || + "x-att-dr-meta".equals(hnlc) || + (feedid == null && "x-att-dr-received".equals(hnlc)) || + (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) { + Enumeration hvals = req.getHeaders(hn); + while (hvals.hasMoreElements()) { + String hv = (String)hvals.nextElement(); + if ("content-type".equals(hnlc)) { + ctype = hv; + } + if ("x-att-dr-meta".equals(hnlc)) { + if (hv.length() > 4096) { + logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long"); + return; + } + if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { + logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata"); + return; + } + } + mx.append(hn).append('\t').append(hv).append('\n'); + } + } + } + mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n'); + String metadata = mx.toString(); + byte[] buf = new byte[1024 * 1024]; + int i; + try { + is = req.getInputStream(); + dos = new FileOutputStream(data); + while ((i = is.read(buf)) > 0) { + dos.write(buf, 0, i); + } + is.close(); + is = null; + dos.close(); + dos = null; + } catch (IOException ioe) { + long exlen = -1; + try { + exlen = Long.parseLong(req.getHeader("Content-Length")); + } catch (Exception e) { + } + StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage()); + throw ioe; + } + Path dpath = Paths.get(fbase); + for (Target t: targets) { + DestInfo di = t.getDestInfo(); + if (di == null) { + // TODO: unknown destination + continue; + } + String dbase = di.getSpool() + "/" + pubid; + 
Files.createLink(Paths.get(dbase), dpath); + mw = new FileWriter(meta); + mw.write(metadata); + if (di.getSubId() == null) { + mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n"); + } + mw.close(); + meta.renameTo(new File(dbase + ".M")); + } + resp.setStatus(HttpServletResponse.SC_NO_CONTENT); + resp.getOutputStream().close(); + StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT); + } catch (IOException ioe) { + logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe); + throw ioe; + } finally { + if (is != null) { try { is.close(); } catch (Exception e) {}} + if (dos != null) { try { dos.close(); } catch (Exception e) {}} + if (mw != null) { try { mw.close(); } catch (Exception e) {}} + try { data.delete(); } catch (Exception e) {} + try { meta.delete(); } catch (Exception e) {} + } + } + + private int getIdFromPath(HttpServletRequest req) { + String path = req.getPathInfo(); + if (path == null || path.length() < 2) + return -1; + try { + return Integer.parseInt(path.substring(1)); + } catch (NumberFormatException e) { + return -1; + } + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java new file mode 100644 index 00000000..5471c0d2 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java @@ -0,0 +1,226 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. 
+ * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; +import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; +import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; + +import java.security.*; +import java.io.*; +import java.util.*; +import java.security.cert.*; +import java.net.*; +import java.text.*; +import org.apache.commons.codec.binary.Base64; +import org.apache.log4j.Logger; +import org.slf4j.MDC; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.research.datarouter.node.eelf.EelfMsgs; + +/** + * Utility functions for the data router node + */ +public class NodeUtils { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeUtils"); + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeUtils"); + private static SimpleDateFormat logdate; + static { + logdate = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + logdate.setTimeZone(TimeZone.getTimeZone("GMT")); + } + private NodeUtils() {} + /** + * Base64 encode a byte array + * @param raw The bytes to be encoded + * @return The encoded string + */ + public static String base64Encode(byte[] raw) { + return(Base64.encodeBase64String(raw)); + } + /** + * Given a user and password, generate the credentials + * @param user User name + * @param password User password + * @return Authorization header value + */ + public static String getAuthHdr(String user, String password) { + if (user == null || password == null) { + return(null); + } + return("Basic " + base64Encode((user + ":" + password).getBytes())); + } + /** + * Given a node name, generate the credentials + * @param node Node name + */ + public static String getNodeAuthHdr(String node, String key) { + try { + MessageDigest md = MessageDigest.getInstance("SHA"); + md.update(key.getBytes()); + md.update(node.getBytes()); + md.update(key.getBytes()); + return(getAuthHdr(node, base64Encode(md.digest()))); + } catch (Exception e) { + return(null); + } + } + /** + * Given a keystore file and its password, return the value of the CN of the first private key entry with a certificate. 
+ * @param kstype The type of keystore + * @param ksfile The file name of the keystore + * @param kspass The password of the keystore + * @return CN of the certificate subject or null + */ + public static String getCanonicalName(String kstype, String ksfile, String kspass) { + try { + KeyStore ks = KeyStore.getInstance(kstype); + ks.load(new FileInputStream(ksfile), kspass.toCharArray()); + return(getCanonicalName(ks)); + } catch (Exception e) { + setIpAndFqdnForEelf("getCanonicalName"); + eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString()); + logger.error("NODE0401 Error loading my keystore file + " + ksfile + " " + e.toString(), e); + return(null); + } + } + /** + * Given a keystore, return the value of the CN of the first private key entry with a certificate. + * @param ks The KeyStore + * @return CN of the certificate subject or null + */ + public static String getCanonicalName(KeyStore ks) { + try { + Enumeration aliases = ks.aliases(); + while (aliases.hasMoreElements()) { + String s = aliases.nextElement(); + if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) { + X509Certificate c = (X509Certificate)ks.getCertificate(s); + if (c != null) { + String subject = c.getSubjectX500Principal().getName(); + String[] parts = subject.split(","); + if (parts.length < 1) { + return(null); + } + subject = parts[0].trim(); + if (!subject.startsWith("CN=")) { + return(null); + + } + return(subject.substring(3)); + } + } + } + } catch (Exception e) { + logger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e); + } + return(null); + } + /** + * Given a string representation of an IP address, get the corresponding byte array + * @param ip The IP address as a string + * @return The IP address as a byte array or null if the address is invalid + */ + public static byte[] getInetAddress(String ip) { + try { + return(InetAddress.getByName(ip).getAddress()); + } catch (Exception e) { + } + return(null); + } + /** + * 
Given a uri with parameters, split out the feed ID and file ID + */ + public static String[] getFeedAndFileID(String uriandparams) { + int end = uriandparams.length(); + int i = uriandparams.indexOf('#'); + if (i != -1 && i < end) { + end = i; + } + i = uriandparams.indexOf('?'); + if (i != -1 && i < end) { + end = i; + } + end = uriandparams.lastIndexOf('/', end); + if (end < 2) { + return(null); + } + i = uriandparams.lastIndexOf('/', end - 1); + if (i == -1) { + return(null); + } + return(new String[] { uriandparams.substring(i + 1, end - 1), uriandparams.substring(end + 1) }); + } + /** + * Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p, backslash e and backslash n. + */ + public static String loge(String s) { + if (s == null) { + return(s); + } + return(s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n")); + } + /** + * Undo what loge does. + */ + public static String unloge(String s) { + if (s == null) { + return(s); + } + return(s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\")); + } + /** + * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ + */ + public static String logts(long when) { + return(logts(new Date(when))); + } + /** + * Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ + */ + public static synchronized String logts(Date when) { + return(logdate.format(when)); + } + + /* Method prints method name, server FQDN and IP Address of the machine in EELF logs + * @Method - setIpAndFqdnForEelf - Rally:US664892 + * @Params - method, prints method name in EELF log. 
+ */ + public static void setIpAndFqdnForEelf(String method) { + MDC.clear(); + MDC.put(MDC_SERVICE_NAME, method); + try { + MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getHostName()); + MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress()); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java new file mode 100644 index 00000000..7ff91839 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
+ * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.util.*; + +/** + * Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to get from this node to any other node + */ + +public class PathFinder { + private static class Hop { + public boolean mark; + public boolean bad; + public NodeConfig.ProvHop basis; + } + private Vector errors = new Vector(); + private Hashtable routes = new Hashtable(); + /** + * Get list of errors encountered while finding paths + * @return array of error descriptions + */ + public String[] getErrors() { + return(errors.toArray(new String[errors.size()])); + } + /** + * Get the route from this node to the specified node + * @param destination node + * @return list of node names separated by and ending with "/" + */ + public String getPath(String destination) { + String ret = routes.get(destination); + if (ret == null) { + return(""); + } + return(ret); + } + private String plot(String from, String to, Hashtable info) { + Hop nh = info.get(from); + if (nh == null || nh.bad) { + return(to); + } + if (nh.mark) { + // loop detected; + while (!nh.bad) { + nh.bad = true; + errors.add(nh.basis + " is part of a cycle"); + nh = info.get(nh.basis.getVia()); + } + return(to); + } + nh.mark = true; + String x = plot(nh.basis.getVia(), to, info); + nh.mark = false; + if (nh.bad) { + return(to); + } + return(nh.basis.getVia() + "/" + x); + } + /** + * Find routes from a specified origin to all of the nodes given a set of specified next hops. 
+ * @param origin where we start + * @param nodes where we can go + * @param hops detours along the way + */ + public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) { + HashSet known = new HashSet(); + Hashtable> ht = new Hashtable>(); + for (String n: nodes) { + known.add(n); + ht.put(n, new Hashtable()); + } + for (NodeConfig.ProvHop ph: hops) { + if (!known.contains(ph.getFrom())) { + errors.add(ph + " references unknown from node"); + continue; + } + if (!known.contains(ph.getTo())) { + errors.add(ph + " references unknown destination node"); + continue; + } + Hashtable ht2 = ht.get(ph.getTo()); + Hop h = ht2.get(ph.getFrom()); + if (h != null) { + h.bad = true; + errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia()); + continue; + } + h = new Hop(); + h.basis = ph; + ht2.put(ph.getFrom(), h); + if (!known.contains(ph.getVia())) { + errors.add(ph + " references unknown via node"); + h.bad = true; + continue; + } + if (ph.getVia().equals(ph.getTo())) { + errors.add(ph + " gives destination as via"); + h.bad = true; + continue; + } + } + for (String n: known) { + if (n.equals(origin)) { + routes.put(n, ""); + } + routes.put(n, plot(origin, n, ht.get(n)) + "/"); + } + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java new file mode 100644 index 00000000..19cb8993 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java @@ -0,0 +1,302 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. 
+ * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.io.*; +import java.util.*; +import org.json.*; +import org.apache.log4j.Logger; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.research.datarouter.node.eelf.EelfMsgs; + +/** + * Parser for provisioning data from the provisioning server. + *

+ * The ProvData class uses a Reader for the text configuration from the + * provisioning server to construct arrays of raw configuration entries. + */ +public class ProvData { + private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.ProvData"); + private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.ProvData"); + private NodeConfig.ProvNode[] pn; + private NodeConfig.ProvParam[] pp; + private NodeConfig.ProvFeed[] pf; + private NodeConfig.ProvFeedUser[] pfu; + private NodeConfig.ProvFeedSubnet[] pfsn; + private NodeConfig.ProvSubscription[] ps; + private NodeConfig.ProvForceIngress[] pfi; + private NodeConfig.ProvForceEgress[] pfe; + private NodeConfig.ProvHop[] ph; + private static String[] gvasa(JSONArray a, int index) { + return(gvasa(a.get(index))); + } + private static String[] gvasa(JSONObject o, String key) { + return(gvasa(o.opt(key))); + } + private static String[] gvasa(Object o) { + if (o instanceof JSONArray) { + JSONArray a = (JSONArray)o; + Vector v = new Vector(); + for (int i = 0; i < a.length(); i++) { + String s = gvas(a, i); + if (s != null) { + v.add(s); + } + } + return(v.toArray(new String[v.size()])); + } else { + String s = gvas(o); + if (s == null) { + return(new String[0]); + } else { + return(new String[] { s }); + } + } + } + private static String gvas(JSONArray a, int index) { + return(gvas(a.get(index))); + } + private static String gvas(JSONObject o, String key) { + return(gvas(o.opt(key))); + } + private static String gvas(Object o) { + if (o instanceof Boolean || o instanceof Number || o instanceof String) { + return(o.toString()); + } + return(null); + } + /** + * Construct raw provisioing data entries from the text (JSON) + * provisioning document received from the provisioning server + * @param r The reader for the JSON text. 
+ */ + public ProvData(Reader r) throws IOException { + Vector pnv = new Vector(); + Vector ppv = new Vector(); + Vector pfv = new Vector(); + Vector pfuv = new Vector(); + Vector pfsnv = new Vector(); + Vector psv = new Vector(); + Vector pfiv = new Vector(); + Vector pfev = new Vector(); + Vector phv = new Vector(); + try { + JSONTokener jtx = new JSONTokener(r); + JSONObject jcfg = new JSONObject(jtx); + char c = jtx.nextClean(); + if (c != '\0') { + throw new JSONException("Spurious characters following configuration"); + } + r.close(); + JSONArray jfeeds = jcfg.optJSONArray("feeds"); + if (jfeeds != null) { + for (int fx = 0; fx < jfeeds.length(); fx++) { + JSONObject jfeed = jfeeds.getJSONObject(fx); + String stat = null; + if (jfeed.optBoolean("suspend", false)) { + stat = "Feed is suspended"; + } + if (jfeed.optBoolean("deleted", false)) { + stat = "Feed is deleted"; + } + String fid = gvas(jfeed, "feedid"); + String fname = gvas(jfeed, "name"); + String fver = gvas(jfeed, "version"); + pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat)); + JSONObject jauth = jfeed.optJSONObject("authorization"); + if (jauth == null) { + continue; + } + JSONArray jeids = jauth.optJSONArray("endpoint_ids"); + if (jeids != null) { + for (int ux = 0; ux < jeids.length(); ux++) { + JSONObject ju = jeids.getJSONObject(ux); + String login = gvas(ju, "id"); + String password = gvas(ju, "password"); + pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password))); + } + } + JSONArray jeips = jauth.optJSONArray("endpoint_addrs"); + if (jeips != null) { + for (int ix = 0; ix < jeips.length(); ix++) { + String sn = gvas(jeips, ix); + pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn)); + } + } + } + } + JSONArray jsubs = jcfg.optJSONArray("subscriptions"); + if (jsubs != null) { + for (int sx = 0; sx < jsubs.length(); sx++) { + JSONObject jsub = jsubs.getJSONObject(sx); + if (jsub.optBoolean("suspend", false)) { + continue; + } + String sid = 
gvas(jsub, "subid"); + String fid = gvas(jsub, "feedid"); + JSONObject jdel = jsub.getJSONObject("delivery"); + String delurl = gvas(jdel, "url"); + String id = gvas(jdel, "user"); + String password = gvas(jdel, "password"); + boolean monly = jsub.getBoolean("metadataOnly"); + boolean use100 = jdel.getBoolean("use100"); + psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100)); + } + } + JSONObject jparams = jcfg.optJSONObject("parameters"); + if (jparams != null) { + for (String pname: JSONObject.getNames(jparams)) { + String pvalue = gvas(jparams, pname); + if (pvalue != null) { + ppv.add(new NodeConfig.ProvParam(pname, pvalue)); + } + } + String sfx = gvas(jparams, "PROV_DOMAIN"); + JSONArray jnodes = jparams.optJSONArray("NODES"); + if (jnodes != null) { + for (int nx = 0; nx < jnodes.length(); nx++) { + String nn = gvas(jnodes, nx); + if (nn.indexOf('.') == -1) { + nn = nn + "." + sfx; + } + pnv.add(new NodeConfig.ProvNode(nn)); + } + } + } + JSONArray jingresses = jcfg.optJSONArray("ingress"); + if (jingresses != null) { + for (int fx = 0; fx < jingresses.length(); fx++) { + JSONObject jingress = jingresses.getJSONObject(fx); + String fid = gvas(jingress, "feedid"); + String subnet = gvas(jingress, "subnet"); + String user = gvas(jingress, "user"); + String[] nodes = gvasa(jingress, "node"); + if (fid == null || "".equals(fid)) { + continue; + } + if ("".equals(subnet)) { + subnet = null; + } + if ("".equals(user)) { + user = null; + } + pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes)); + } + } + JSONObject jegresses = jcfg.optJSONObject("egress"); + if (jegresses != null && JSONObject.getNames(jegresses) != null) { + for (String esid: JSONObject.getNames(jegresses)) { + String enode = gvas(jegresses, esid); + if (esid != null && enode != null && !"".equals(esid) && !"".equals(enode)) { + pfev.add(new NodeConfig.ProvForceEgress(esid, enode)); + } + } + } + JSONArray jhops = 
jcfg.optJSONArray("routing"); + if (jhops != null) { + for (int fx = 0; fx < jhops.length(); fx++) { + JSONObject jhop = jhops.getJSONObject(fx); + String from = gvas(jhop, "from"); + String to = gvas(jhop, "to"); + String via = gvas(jhop, "via"); + if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) { + continue; + } + phv.add(new NodeConfig.ProvHop(from, to, via)); + } + } + } catch (JSONException jse) { + NodeUtils.setIpAndFqdnForEelf("ProvData"); + eelflogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString()); + logger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse); + throw new IOException(jse.toString(), jse); + } + pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]); + pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]); + pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]); + pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]); + pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]); + ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]); + pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]); + pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]); + ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]); + } + /** + * Get the raw node configuration entries + */ + public NodeConfig.ProvNode[] getNodes() { + return(pn); + } + /** + * Get the raw parameter configuration entries + */ + public NodeConfig.ProvParam[] getParams() { + return(pp); + } + /** + * Ge the raw feed configuration entries + */ + public NodeConfig.ProvFeed[] getFeeds() { + return(pf); + } + /** + * Get the raw feed user configuration entries + */ + public NodeConfig.ProvFeedUser[] getFeedUsers() { + return(pfu); + } + /** + * Get the raw feed subnet configuration entries + */ + public NodeConfig.ProvFeedSubnet[] getFeedSubnets() { + return(pfsn); + } + /** + * Get the raw subscription entries + */ + public 
NodeConfig.ProvSubscription[] getSubscriptions() { + return(ps); + } + /** + * Get the raw forced ingress entries + */ + public NodeConfig.ProvForceIngress[] getForceIngress() { + return(pfi); + } + /** + * Get the raw forced egress entries + */ + public NodeConfig.ProvForceEgress[] getForceEgress() { + return(pfe); + } + /** + * Get the raw next hop entries + */ + public NodeConfig.ProvHop[] getHops() { + return(ph); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java new file mode 100644 index 00000000..436adbad --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
/**
 * Generator for Data Router publish IDs.
 */
public class PublishId {
    private final String myname;  // suffix appended to every generated ID
    private long nextuid;         // smallest timestamp value we may still hand out

    /**
     * Generate publish IDs for the specified name.
     * @param myname Unique identifier for this publish ID generator (usually fqdn of server)
     */
    public PublishId(String myname) {
        this.myname = myname;
    }

    /**
     * Generate a Data Router Publish ID that uniquely identifies the particular
     * invocation of the Publish API for log correlation purposes.
     * IDs are strictly increasing even if the wall clock stalls or moves backwards.
     */
    public synchronized String next() {
        long stamp = Math.max(System.currentTimeMillis(), nextuid);
        nextuid = stamp + 1;
        return stamp + "." + myname;
    }
}
import java.util.Timer;
import java.util.TimerTask;

/**
 * Execute an operation no more frequently than a specified interval.
 * <p>
 * State machine: "executing" means run() is in progress in some thread,
 * "marked" means a timer task is pending, and "remark" records that a request
 * arrived while the operation was executing (so it must run once more).
 */
public abstract class RateLimitedOperation implements Runnable {
    private boolean marked;     // a timer task exists
    private boolean executing;  // the operation is currently in progress
    private boolean remark;     // a request was made while the operation was in progress
    private Timer timer;
    private long last;          // when the last operation started
    private long mininterval;

    /**
     * Create a rate limited operation.
     * @param mininterval The minimum number of milliseconds after the last execution starts before a new execution can begin
     * @param timer The timer used to perform deferred executions
     */
    public RateLimitedOperation(long mininterval, Timer timer) {
        this.timer = timer;
        this.mininterval = mininterval;
    }

    /** Timer callback fired when a deferred request becomes eligible to run. */
    // renamed from "deferred" to follow Java class naming conventions
    private class Deferred extends TimerTask {
        public void run() {
            execute();
        }
    }

    private synchronized void unmark() {
        marked = false;
    }

    private void execute() {
        unmark();
        request();
    }

    /**
     * Request that the operation be performed by this thread or at a later
     * time by the timer.
     */
    public void request() {
        if (premark()) {
            return;
        }
        do {
            run();
        } while (demark());
    }

    /**
     * Decide whether the caller may execute now.
     * @return true if the request was absorbed (already running, already
     * scheduled, or deferred to the timer); false if the caller should run now
     */
    private synchronized boolean premark() {
        if (executing) {
            // currently executing - remember to run again when it finishes
            remark = true;
            return (true);
        }
        if (marked) {
            // timer already scheduled - will run when it expires
            return (true);
        }
        long now = System.currentTimeMillis();
        if (last + mininterval > now) {
            // too soon - defer via the timer
            marked = true;
            timer.schedule(new Deferred(), last + mininterval - now);
            return (true);
        }
        last = now;
        executing = true;
        // caller proceeds with execution in its own thread
        return (false);
    }

    /**
     * Finish an execution; if a request arrived meanwhile, decide whether the
     * caller should loop and run again immediately.
     */
    private synchronized boolean demark() {
        executing = false;
        if (remark) {
            remark = false;
            return (!premark());
        }
        return (false);
    }
}
+ * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.util.*; +import java.io.*; + +/** + * Track redirections of subscriptions + */ +public class RedirManager { + private Hashtable sid2primary = new Hashtable(); + private Hashtable sid2secondary = new Hashtable(); + private String redirfile; + RateLimitedOperation op; + /** + * Create a mechanism for maintaining subscription redirections. + * @param redirfile The file to store the redirection information. + * @param mininterval The minimum number of milliseconds between writes to the redirection information file. + * @param timer The timer thread used to run delayed file writes. + */ + public RedirManager(String redirfile, long mininterval, Timer timer) { + this.redirfile = redirfile; + op = new RateLimitedOperation(mininterval, timer) { + public void run() { + try { + StringBuffer sb = new StringBuffer(); + for (String s: sid2primary.keySet()) { + sb.append(s).append(' ').append(sid2primary.get(s)).append(' ').append(sid2secondary.get(s)).append('\n'); + } + OutputStream os = new FileOutputStream(RedirManager.this.redirfile); + os.write(sb.toString().getBytes()); + os.close(); + } catch (Exception e) { + } + } + }; + try { + String s; + BufferedReader br = new BufferedReader(new FileReader(redirfile)); + while ((s = br.readLine()) != null) { + s = s.trim(); + String[] sx = s.split(" "); + if (s.startsWith("#") || sx.length != 3) { + continue; + } + sid2primary.put(sx[0], sx[1]); + sid2secondary.put(sx[0], sx[2]); + } + br.close(); + } catch (Exception e) { + // missing file is normal + } + } + /** + * Set up redirection. 
If a request is to be sent to subscription ID sid, and that is configured to go to URL primary, instead, go to secondary. + * @param sid The subscription ID to be redirected + * @param primary The URL associated with that subscription ID + * @param secondary The replacement URL to use instead + */ + public synchronized void redirect(String sid, String primary, String secondary) { + sid2primary.put(sid, primary); + sid2secondary.put(sid, secondary); + op.request(); + } + /** + * Cancel redirection. If a request is to be sent to subscription ID sid, send it to its primary URL. + * @param sid The subscription ID to remove from the table. + */ + public synchronized void forget(String sid) { + sid2primary.remove(sid); + sid2secondary.remove(sid); + op.request(); + } + /** + * Look up where to send a subscription. If the primary has changed or there is no redirection, use the primary. Otherwise, redirect to the secondary URL. + * @param sid The subscription ID to look up. + * @param primary The configured primary URL. + * @return The destination URL to really use. + */ + public synchronized String lookup(String sid, String primary) { + String oprim = sid2primary.get(sid); + if (primary.equals(oprim)) { + return(sid2secondary.get(sid)); + } else if (oprim != null) { + forget(sid); + } + return(primary); + } + /** + * Is a subscription redirected? 
+ */ + public synchronized boolean isRedirected(String sid) { + return(sid != null && sid2secondary.get(sid) != null); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java new file mode 100644 index 00000000..66aa4add --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java @@ -0,0 +1,229 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
/*
 * org.onap.dmaap
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */

package com.att.research.datarouter.node;

import java.util.regex.*;
import java.util.*;
import java.io.*;
import java.nio.file.*;
import java.text.*;

/**
 * Logging for data router delivery events (PUB/DEL/EXP).
 * <p>
 * Records are appended to a rolling event log whose name is built from a
 * configurable prefix/suffix plus a roll timestamp; a hard link with the
 * plain (undated) name always points at the current file.  All public
 * logging methods are best-effort: I/O failures are deliberately swallowed
 * so logging can never disrupt delivery processing.
 */
public class StatusLog {
    private static StatusLog instance = new StatusLog();

    /** Interval spec of the form xxHyyMzzS (units case-insensitive, all parts optional). */
    private static final Pattern INTERVAL_PATTERN =
        Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?");

    // Formats the roll timestamp appended to the log file prefix.
    // The pattern is a constant, so construction cannot fail; no try/catch needed.
    private SimpleDateFormat filedate = new SimpleDateFormat("-yyyyMMddHHmm");
    private String prefix = "logs/events";  // replaced from config on first roll
    private String suffix = ".log";         // replaced from config on first roll
    private String plainfile;   // undated name, hard-linked to the current file
    private String curfile;     // current (dated) event log file
    private long nexttime;      // epoch ms at which the next roll is due
    private OutputStream os;    // open stream to curfile, or null when closed
    private long intvl;         // roll interval in ms
    private NodeConfigManager config = NodeConfigManager.getInstance();

    private StatusLog() {
    }

    /**
     * Parse an interval of the form xxHyyMzzS and round it to the nearest
     * whole fraction of 24 hours — specifically the nearest product
     * 2^i * 3^j * 5^k seconds (i&lt;8, j&lt;4, k&lt;3, so candidates run from
     * 1s up to exactly 86400s).  If no units are specified, seconds are
     * assumed; the minimum accepted duration is 60 seconds.
     *
     * @param interval the interval specification to parse
     * @param def value (in milliseconds) returned if parsing fails
     * @return the rounded interval in milliseconds, or def on a parse failure
     */
    public static long parseInterval(String interval, int def) {
        try {
            Matcher m = INTERVAL_PATTERN.matcher(interval);
            if (m.matches()) {
                int dur = 0;
                String x = m.group(1);
                if (x != null) {
                    dur += 3600 * Integer.parseInt(x);
                }
                x = m.group(2);
                if (x != null) {
                    dur += 60 * Integer.parseInt(x);
                }
                x = m.group(3);
                if (x != null) {
                    dur += Integer.parseInt(x);
                }
                if (dur < 60) {
                    dur = 60;
                }
                // Scan all 2^i * 3^j * 5^k candidates for the one closest to dur.
                int best = 86400;
                int dist = Math.abs(best - dur);
                int base = 1;
                for (int i = 0; i < 8; i++) {
                    int base2 = base;
                    base *= 2;
                    for (int j = 0; j < 4; j++) {
                        int base3 = base2;
                        base2 *= 3;
                        for (int k = 0; k < 3; k++) {
                            int cur = base3;
                            base3 *= 5;
                            int ndist = Math.abs(cur - dur);
                            if (ndist < dist) {
                                best = cur;
                                dist = ndist;
                            }
                        }
                    }
                }
                def = best * 1000;  // max candidate is 86400, so this cannot overflow int
            }
        } catch (Exception e) {
            // Unparsable interval (e.g. null input): fall through and return def.
        }
        return (def);
    }

    /**
     * Roll the event log if the roll time has passed: close the old stream,
     * re-read interval/prefix/suffix from configuration, and recompute the
     * current file name and next roll time.  Wakes any thread waiting on
     * this instance after a roll.
     * NOTE(review): no wait() is visible in this class — presumably a log
     * shipper waits for rolls; confirm against LogManager.
     */
    private synchronized void checkRoll(long now) throws IOException {
        if (now >= nexttime) {
            if (os != null) {
                os.close();
                os = null;
            }
            intvl = parseInterval(config.getEventLogInterval(), 300000);
            prefix = config.getEventLogPrefix();
            suffix = config.getEventLogSuffix();
            nexttime = now - now % intvl + intvl;
            curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;
            plainfile = prefix + suffix;
            notify();
        }
    }

    /**
     * Get the name of the current log file.
     * @return The full path name of the current event log file
     */
    public static synchronized String getCurLogFile() {
        try {
            instance.checkRoll(System.currentTimeMillis());
        } catch (Exception e) {
            // Best effort: return the last known file name even if the roll failed.
        }
        return (instance.curfile);
    }

    /**
     * Append one timestamped record to the current event log, opening the
     * file (and refreshing the plain-name hard link) if necessary.
     * Failures are swallowed: event logging must never break delivery.
     */
    private synchronized void log(String s) {
        try {
            long now = System.currentTimeMillis();
            checkRoll(now);
            if (os == null) {
                os = new FileOutputStream(curfile, true);
                // Re-point the undated alias at the newly opened file.
                (new File(plainfile)).delete();
                Files.createLink(Paths.get(plainfile), Paths.get(curfile));
            }
            os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());
            os.flush();
        } catch (IOException ioe) {
            // Deliberately ignored: logging is best effort.
        }
    }

    /**
     * Log a received publication attempt.
     * @param pubid The publish ID assigned by the node
     * @param feedid The feed id given by the publisher
     * @param requrl The URL of the received request
     * @param method The method (DELETE or PUT) in the received request
     * @param ctype The content type (if method is PUT and clen &gt; 0)
     * @param clen The content length (if method is PUT)
     * @param srcip The IP address of the publisher
     * @param user The identity of the publisher
     * @param status The status returned to the publisher
     */
    public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) {
        instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status);
    }

    /**
     * Log a data transfer error receiving a publication attempt.
     * @param pubid The publish ID assigned by the node
     * @param feedid The feed id given by the publisher
     * @param requrl The URL of the received request
     * @param method The method (DELETE or PUT) in the received request
     * @param ctype The content type (if method is PUT and clen &gt; 0)
     * @param clen The expected content length (if method is PUT)
     * @param rcvd The content length received
     * @param srcip The IP address of the publisher
     * @param user The identity of the publisher
     * @param error The error message from the IO exception
     */
    public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) {
        instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error);
    }

    /**
     * Log a delivery attempt.  Does nothing if feedid is null.
     * @param pubid The publish ID assigned by the node
     * @param feedid The feed ID
     * @param subid The (space delimited list of) subscription ID
     * @param requrl The URL used in the attempt
     * @param method The method (DELETE or PUT) in the attempt
     * @param ctype The content type (if method is PUT, not metaonly, and clen &gt; 0)
     * @param clen The content length (if PUT and not metaonly)
     * @param user The identity given to the subscriber
     * @param status The status returned by the subscriber or -1 if an exception occurred trying to connect
     * @param xpubid The publish ID returned by the subscriber
     */
    public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) {
        if (feedid == null) {
            return;
        }
        instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid);
    }

    /**
     * Log delivery attempts expired.  Does nothing if feedid is null.
     * @param pubid The publish ID assigned by the node
     * @param feedid The feed ID
     * @param subid The (space delimited list of) subscription ID
     * @param requrl The URL that would be delivered to
     * @param method The method (DELETE or PUT) in the request
     * @param ctype The content type (if method is PUT, not metaonly, and clen &gt; 0)
     * @param clen The content length (if PUT and not metaonly)
     * @param reason The reason the attempts were discontinued
     * @param attempts The number of attempts made
     */
    public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) {
        if (feedid == null) {
            return;
        }
        instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts);
    }

    /**
     * Log extra statistics about unsuccessful delivery attempts.  Does nothing if feedid is null.
     * @param pubid The publish ID assigned by the node
     * @param feedid The feed ID
     * @param subid The (space delimited list of) subscription ID
     * @param clen The content length
     * @param sent The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred.
     */
    public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) {
        if (feedid == null) {
            return;
        }
        instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent);
    }
}
+ * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +import java.net.*; + +/** + * Compare IP addresses as byte arrays to a subnet specified as a CIDR + */ +public class SubnetMatcher { + private byte[] sn; + private int len; + private int mask; + /** + * Construct a subnet matcher given a CIDR + * @param subnet The CIDR to match + */ + public SubnetMatcher(String subnet) { + int i = subnet.lastIndexOf('/'); + if (i == -1) { + sn = NodeUtils.getInetAddress(subnet); + len = sn.length; + } else { + len = Integer.parseInt(subnet.substring(i + 1)); + sn = NodeUtils.getInetAddress(subnet.substring(0, i)); + mask = ((0xff00) >> (len % 8)) & 0xff; + len /= 8; + } + } + /** + * Is the IP address in the CIDR? + * @param addr the IP address as bytes in network byte order + * @return true if the IP address matches. + */ + public boolean matches(byte[] addr) { + if (addr.length != sn.length) { + return(false); + } + for (int i = 0; i < len; i++) { + if (addr[i] != sn[i]) { + return(false); + } + } + if (mask != 0 && ((addr[len] ^ sn[len]) & mask) != 0) { + return(false); + } + return(true); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java new file mode 100644 index 00000000..fe595d50 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. 
All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.node; + +/** + * A destination to deliver a message + */ +public class Target { + private DestInfo destinfo; + private String routing; + /** + * A destination to deliver a message + * @param destinfo Either info for a subscription ID or info for a node-to-node transfer + * @param routing For a node-to-node transfer, what to do when it gets there. 
+ */ + public Target(DestInfo destinfo, String routing) { + this.destinfo = destinfo; + this.routing = routing; + } + /** + * Add additional routing + */ + public void addRouting(String routing) { + this.routing = this.routing + " " + routing; + } + /** + * Get the destination information for this target + */ + public DestInfo getDestInfo() { + return(destinfo); + } + /** + * Get the next hop information for this target + */ + public String getRouting() { + return(routing); + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java new file mode 100644 index 00000000..401c72a6 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
/*
 * org.onap.dmaap
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */

package com.att.research.datarouter.node;

import java.util.*;

/**
 * Manage a list of tasks to be executed when an event occurs.
 * This makes the following guarantees:
 * <ul>
 * <li>Tasks can be safely added and removed in the middle of a run.</li>
 * <li>No task will be returned more than once during a run.</li>
 * <li>No task will be returned when it is not, at that moment, in the list of tasks.</li>
 * <li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>
 * <li>Initially, and once next() returns null during a run, next() will continue to return null until startRun() is called.</li>
 * </ul>
 */
public class TaskList {
    private Iterator<Runnable> runlist;          // iterator over the current batch; null when no run is active
    private HashSet<Runnable> tasks = new HashSet<>();  // the authoritative task set
    private HashSet<Runnable> togo;              // batch currently being iterated
    private HashSet<Runnable> sofar;             // tasks already returned this run
    private HashSet<Runnable> added;             // tasks added mid-run, picked up as the next batch
    private HashSet<Runnable> removed;           // tasks removed mid-run, skipped if encountered

    /**
     * Construct a new TaskList
     */
    public TaskList() {
    }

    /**
     * Start executing the sequence of tasks.  Snapshots the current task set
     * and resets the run bookkeeping.
     */
    public synchronized void startRun() {
        sofar = new HashSet<>();
        added = new HashSet<>();
        removed = new HashSet<>();
        togo = new HashSet<>(tasks);
        runlist = togo.iterator();
    }

    /**
     * Get the next task to execute, or null when the run is complete (or no
     * run is active).  Tasks removed mid-run are skipped; tasks added mid-run
     * are picked up in a follow-on batch once the current batch is exhausted.
     */
    public synchronized Runnable next() {
        while (runlist != null) {
            if (runlist.hasNext()) {
                Runnable task = runlist.next();
                if (removed.contains(task)) {
                    continue;   // removed after the snapshot: do not return it
                }
                if (sofar.contains(task)) {
                    continue;   // already returned this run
                }
                sofar.add(task);
                return (task);
            }
            if (!added.isEmpty()) {
                // Tasks were added mid-run: iterate them as a new batch.
                togo = added;
                added = new HashSet<>();
                removed.clear();
                runlist = togo.iterator();
                continue;
            }
            // Run complete: drop all bookkeeping so next() keeps returning null.
            togo = null;
            added = null;
            removed = null;
            sofar = null;
            runlist = null;
        }
        return (null);
    }

    /**
     * Add a task to the list of tasks to run whenever the event occurs.
     */
    public synchronized void addTask(Runnable task) {
        if (runlist != null) {
            added.add(task);
            removed.remove(task);
        }
        tasks.add(task);
    }

    /**
     * Remove a task from the list of tasks to run whenever the event occurs.
     */
    public synchronized void removeTask(Runnable task) {
        if (runlist != null) {
            removed.add(task);
            added.remove(task);
        }
        tasks.remove(task);
    }
}
+ * * + ******************************************************************************/ +package com.att.research.datarouter.node.eelf; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.spi.FilterReply; + +/* + * When EELF functionality added it default started logging Jetty logs as well which in turn stopped existing functionality of logging jetty statements in node.log + * added code in logback.xml to add jetty statements in node.log. + * This class removes extran EELF statements from node.log since they are being logged in apicalls.log + */ +public class EELFFilter extends Filter{ + @Override + public FilterReply decide(ILoggingEvent event) { + if (event.getMessage().contains("EELF")) { + return FilterReply.DENY; + } else { + return FilterReply.ACCEPT; + } + } +} diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java new file mode 100644 index 00000000..9963f413 --- /dev/null +++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java @@ -0,0 +1,96 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ +package com.att.research.datarouter.node.eelf; + +import com.att.eelf.i18n.EELFResolvableErrorEnum; +import com.att.eelf.i18n.EELFResourceManager; + +public enum EelfMsgs implements EELFResolvableErrorEnum { + + /** + * Application message prints user (accepts one argument) + */ + MESSAGE_WITH_BEHALF, + + /** + * Application message prints user and FeedID (accepts two arguments) + */ + + MESSAGE_WITH_BEHALF_AND_FEEDID, + + /** + * Application message prints keystore file error in EELF errors log + */ + + MESSAGE_KEYSTORE_LOAD_ERROR, + + /** + * Application message prints Error extracting my name from my keystore file + */ + + MESSAGE_KEYSORE_NAME_ERROR, + + /** + * Application message prints Error parsing configuration data from provisioning server. + */ + + + MESSAGE_PARSING_ERROR, + + /** + * Application message printsConfiguration failed + */ + + + MESSAGE_CONF_FAILED, + + /** + * Application message prints Bad provisioning server URL + */ + + + MESSAGE_BAD_PROV_URL, + + /** + * Application message prints Unable to fetch canonical name from keystore file + */ + + + MESSAGE_KEYSTORE_FETCH_ERROR, + + /** + * Application message prints Unable to load local configuration file. 
+ */ + + + MESSAGE_PROPERTIES_LOAD_ERROR; + + + /** + * Static initializer to ensure the resource bundles for this class are loaded... + * Here this application loads messages from three bundles + */ + static { + EELFResourceManager.loadMessageBundle("EelfMessages"); + } +} diff --git a/datarouter-node/src/main/resources/EelfMessages.properties b/datarouter-node/src/main/resources/EelfMessages.properties new file mode 100644 index 00000000..8c17417d --- /dev/null +++ b/datarouter-node/src/main/resources/EelfMessages.properties @@ -0,0 +1,70 @@ +#------------------------------------------------------------------------------- +# ============LICENSE_START================================================== +# * org.onap.dmaap +# * =========================================================================== +# * Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# * =========================================================================== +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * ============LICENSE_END==================================================== +# * +# * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
+# * +#------------------------------------------------------------------------------- +######################################################################## +#Resource key=Error Code|Message text|Resolution text |Description text +####### +#Newlines can be utilized to add some clarity ensuring continuing line +#has atleast one leading space +#ResourceKey=\ +# ERR0000E\ +# Sample error msg txt\ +# Sample resolution msg\ +# Sample description txt +# +###### +#Error code classification category +#100 Permission errors +#200 Availability errors/Timeouts +#300 Data errors +#400 Schema Interface type/validation errors +#500 Business process errors +#900 Unknown errors +# +######################################################################## + +# Messages for Data Router EELF framework + +#Prints FeedID in the EELF apicalls log +MESSAGE_WITH__FEEDID=EELF0001I| FeedID = {0} + +#Prints User in the EELF apicalls log +MESSAGE_WITH_BEHALF=EELF0002I| User = {0} + +#Prints User and FeedID in the EELF apicalls log +MESSAGE_WITH_BEHALF_AND_FEEDID=EELF0003I| User = {0} FeedID = {1} + +#Prints keystore file error in EELF errors log +MESSAGE_KEYSTORE_LOAD_ERROR=EELF0001E| Error loading my keystore file {0} {1} + +MESSAGE_KEYSORE_NAME_ERROR=EELF0002E| Error extracting my name from my keystore file. {0} + +MESSAGE_PARSING_ERROR=EELF0003E| Error parsing configuration data from provisioning server. {0} + +MESSAGE_CONF_FAILED=EELF0004E| Configuration failed. {0} - try again later. 
+ +MESSAGE_BAD_PROV_URL=EELF0005E| Bad provisioning server URL {0} + +MESSAGE_KEYSTORE_FETCH_ERROR=EELF0006E| Unable to fetch canonical name from keystore file {0} + +MESSAGE_PROPERTIES_LOAD_ERROR=EELF0007E| Unable to load local configuration file - etc/node.properties + diff --git a/datarouter-node/src/main/resources/docker/Dockerfile b/datarouter-node/src/main/resources/docker/Dockerfile new file mode 100644 index 00000000..fbf54566 --- /dev/null +++ b/datarouter-node/src/main/resources/docker/Dockerfile @@ -0,0 +1,7 @@ +FROM java:8 +ADD opt /opt/ +ADD startup.sh /startup.sh +RUN chmod 700 /startup.sh +ENTRYPOINT ./startup.sh start +EXPOSE 8443 +EXPOSE 8080 \ No newline at end of file diff --git a/datarouter-node/src/main/resources/docker/startup.sh b/datarouter-node/src/main/resources/docker/startup.sh new file mode 100644 index 00000000..8cb71dd6 --- /dev/null +++ b/datarouter-node/src/main/resources/docker/startup.sh @@ -0,0 +1,18 @@ +LIB=/opt/app/datartr/lib +ETC=/opt/app/datartr/etc +echo "this is LIB" $LIB +echo "this is ETC" $ETC +mkdir -p /opt/app/datartr/logs +mkdir -p /opt/app/datartr/spool +mkdir -p /opt/app/datartr/spool/f +mkdir -p /opt/app/datartr/spool/n +mkdir -p /opt/app/datartr/spool/s +CLASSPATH=$ETC +for FILE in `find $LIB -name *.jar`; do + CLASSPATH=$CLASSPATH:$FILE +done +java -classpath $CLASSPATH com.att.research.datarouter.node.NodeMain + +runner_file="$LIB/datarouter-node-jar-with-dependencies.jar" +echo "Starting using" $runner_file +java -Dcom.att.eelf.logging.file==/opt/app/datartr/etc/logback.xml -Dcom.att.eelf.logging.path=/ -Dcom.att.research.datarouter.node.ConfigFile==/opt/app/datartr/etc/node.properties -jar $runner_file \ No newline at end of file diff --git a/datarouter-node/src/main/resources/log4j.properties b/datarouter-node/src/main/resources/log4j.properties new file mode 100644 index 00000000..5b2f019f --- /dev/null +++ b/datarouter-node/src/main/resources/log4j.properties @@ -0,0 +1,32 @@ 
+#------------------------------------------------------------------------------- +# ============LICENSE_START================================================== +# * org.onap.dmaap +# * =========================================================================== +# * Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# * =========================================================================== +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * ============LICENSE_END==================================================== +# * +# * ECOMP is a trademark and service mark of AT&T Intellectual Property. +# * +#------------------------------------------------------------------------------- +log4j.debug=FALSE +log4j.rootLogger=INFO,Root + +log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender +log4j.appender.Root.file=/root/node.log +log4j.appender.Root.datePattern='.'yyyyMMdd +log4j.appender.Root.append=true +log4j.appender.Root.layout=org.apache.log4j.PatternLayout +log4j.appender.Root.layout.ConversionPattern=%d %p %m%n +! 
diff --git a/datarouter-node/src/main/resources/log4j.properties.tmpl b/datarouter-node/src/main/resources/log4j.properties.tmpl new file mode 100644 index 00000000..299edbfe --- /dev/null +++ b/datarouter-node/src/main/resources/log4j.properties.tmpl @@ -0,0 +1,11 @@ +cat < + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ${defaultPattern} + + + + + + + + + + + + ${logDirectory}/${generalLogName}.log + + INFO + ACCEPT + DENY + + + ${logDirectory}/${generalLogName}.%i.log.zip + + 1 + 9 + + + 5MB + + + ${defaultPattern} + + + + + 256 + + + + + + + + + + + + + + + + + + + + + + + + + + + ${logDirectory}/${errorLogName}.log + + ERROR + ACCEPT + DENY + + + ${logDirectory}/${errorLogName}.%i.log.zip + + 1 + 9 + + + 5MB + + + ${defaultPattern} + + + + + 256 + + + + + + ${logDirectory}/${jettyAndNodeLogName}.log + + + ${logDirectory}/${jettyAndNodeLogName}.%i.log.zip + + 1 + 9 + + + 5MB + + + ${jettyAndNodeLoggerPattern} + + + + + 256 + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/datarouter-node/src/main/resources/misc/descriptor.xml b/datarouter-node/src/main/resources/misc/descriptor.xml new file mode 100644 index 00000000..88fccc19 --- /dev/null +++ b/datarouter-node/src/main/resources/misc/descriptor.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/datarouter-node/src/main/resources/misc/doaction b/datarouter-node/src/main/resources/misc/doaction new file mode 100644 index 00000000..617b01d9 --- /dev/null +++ b/datarouter-node/src/main/resources/misc/doaction @@ -0,0 +1,42 @@ +#!/bin/bash + +cd /opt/app/datartr/etc +for action in "$@" +do +case "$action" in +'backup') + cp log4j.properties log4j.properties.save 2>/dev/null + cp node.properties node.properties.save 2>/dev/null + cp havecert havecert.save 2>/dev/null + ;; +'stop') + /opt/app/platform/init.d/drtrnode stop + ;; +'start') + /opt/app/platform/init.d/drtrnode start || exit 1 + ;; +'config') 
+ /bin/bash log4j.properties.tmpl >log4j.properties + /bin/bash node.properties.tmpl >node.properties + /bin/bash havecert.tmpl >havecert + echo "$AFTSWM_ACTION_NEW_VERSION" >VERSION.node + chmod +x havecert + rm -f /opt/app/platform/rc.d/K90drtrnode /opt/app/platform/rc.d/S10drtrnode + ln -s ../init.d/drtrnode /opt/app/platform/rc.d/K90drtrnode + ln -s ../init.d/drtrnode /opt/app/platform/rc.d/S10drtrnode + ;; +'restore') + cp log4j.properties.save log4j.properties 2>/dev/null + cp node.properties.save node.properties 2>/dev/null + cp havecert.save havecert 2>/dev/null + ;; +'clean') + rm -f log4j.properties node.properties havecert log4j.properties.save node.properties.save havecert.save SHUTDOWN redirections.dat VERSION.node + rm -f /opt/app/platform/rc.d/K90drtrnode /opt/app/platform/rc.d/S10drtrnode + ;; +*) + exit 1 + ;; +esac +done +exit 0 diff --git a/datarouter-node/src/main/resources/misc/drtrnode b/datarouter-node/src/main/resources/misc/drtrnode new file mode 100644 index 00000000..ba784f36 --- /dev/null +++ b/datarouter-node/src/main/resources/misc/drtrnode @@ -0,0 +1,114 @@ +#!/bin/bash + +umask 0022 +TZ=GMT0 +export TZ +PATH=/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/opt/java/jdk/jdk180/bin +export PATH +CLASSPATH=`echo /opt/app/datartr/etc /opt/app/datartr/lib/*.jar | tr ' ' ':'` +export CLASSPATH + +pids() { + ps -ef | grep java | grep node.NodeMain | sed -e 's/[^ ]* *//' -e 's/ .*//' +} + +start() { + ID=`id -n -u` + GRP=`id -n -g` + if [ "$ID" != "root" ] + then + echo drtrnode must be started as user datartr not $ID + exit 1 + fi + if [ "$GRP" != "datartr" ] + then + echo drtrnode must be started as group datartr not $GRP + exit 1 + fi + cd /opt/app/datartr + if etc/havecert + then + echo >/dev/null + else + echo No certificate file available. 
Cannot start + exit 0 + fi + PIDS=`pids` + if [ "$PIDS" != "" ] + then + echo drtrnode already running + exit 0 + fi + + mkdir -p /opt/app/datartr/spool/s + chmod 755 /opt/app/datartr/spool/s + + rm -f /opt/app/datartr/etc/SHUTDOWN + nohup java com.att.research.datarouter.node.NodeMain /dev/null 2>&1 & + sleep 5 + PIDS=`pids` + if [ "$PIDS" = "" ] + then + echo drtrnode startup failed + else + echo drtrnode started + fi +} + +stop() { + ID=`id -n -u` + GRP=`id -n -g` + if [ "$ID" != "datartr" ] + then + echo drtrnode must be stopped as user datartr not $ID + exit 1 + fi + if [ "$GRP" != "datartr" ] + then + echo drtrnode must be stopped as group datartr not $GRP + exit 1 + fi + touch /opt/app/datartr/etc/SHUTDOWN + PIDS=`pids` + if [ "$PIDS" != "" ] + then + sleep 5 + kill -9 $PIDS + sleep 5 + echo drtrnode stopped + else + echo drtrnode not running + fi +} + +status() { + PIDS=`pids` + if [ "$PIDS" != "" ] + then + echo drtrnode running + else + echo drtrnode not running + fi +} + +case "$1" in +'start') + start + ;; +'stop') + stop + ;; +'restart') + stop + sleep 20 + start + ;; +'status') + status + ;; +*) + echo "Usage: $0 { start | stop | restart }" + exit 1 + ;; +esac +exit 0 diff --git a/datarouter-node/src/main/resources/misc/havecert.tmpl b/datarouter-node/src/main/resources/misc/havecert.tmpl new file mode 100644 index 00000000..2e813ba3 --- /dev/null +++ b/datarouter-node/src/main/resources/misc/havecert.tmpl @@ -0,0 +1,11 @@ +#!/bin/bash +cat <>${DRTR_NODE_LOGS:-logs}/node.log +exit 1 +!EOF diff --git a/datarouter-node/src/main/resources/misc/log4j.properties.tmpl b/datarouter-node/src/main/resources/misc/log4j.properties.tmpl new file mode 100644 index 00000000..24bd3df0 --- /dev/null +++ b/datarouter-node/src/main/resources/misc/log4j.properties.tmpl @@ -0,0 +1,11 @@ +cat < + + 4.0.0 + + com.att.datarouter-prov + datarouter-prov + 0.0.1-SNAPSHOT + jar + + datarouter-prov + https://github.com/att/DMAAP_DATAROUTER + + + BSD License + + + + + UTF-8 + 1.8 
+ 1.8 + ${basedir}/target/ + hub.docker.com + + + + + org.json + json + 20160810 + + + javax.mail + javax.mail-api + 1.5.1 + + + com.att.eelf + eelf-core + 0.0.1 + + + javax.servlet + servlet-api + 2.5 + + + + org.eclipse.jetty + jetty-server + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-continuation + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-util + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-deploy + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-servlet + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-servlets + 7.6.14.v20131031 + + + org.eclipse.jetty + jetty-http + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-security + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-websocket + 7.6.14.v20131031 + + + + org.eclipse.jetty + jetty-io + 7.6.14.v20131031 + + + + org.apache.commons + commons-io + 1.3.2 + + + commons-lang + commons-lang + 2.4 + + + commons-io + commons-io + 2.1 + compile + + + org.apache.httpcomponents + httpcore + 4.2.2 + + + + org.mozilla + rhino + 1.7R3 + + + org.apache.james + apache-mime4j-core + 0.7 + + + org.apache.httpcomponents + httpclient + 4.2.3 + + + org.sonatype.http-testing-harness + junit-runner + 0.11 + + + junit + junit + 4.10 + test + + + + org.mockito + mockito-core + 1.10.19 + test + + + org.powermock + powermock-module-junit4 + 1.6.4 + test + + + org.powermock + powermock-api-mockito + 1.6.4 + test + + + + mysql + mysql-connector-java + 5.1.21 + + + org.eclipse.jetty.cdi + cdi-websocket + 9.3.11.v20160721 + + + + log4j + log4j + 1.2.17 + compile + + + + + datarouter-prov + + + src/main/resources + true + + **/*.properties + + + + src/main/resources + true + + **/proserver.properties + + + + src/main/resources + true + + **/EelfMessages.properties + + + + src/main/resources + true + + **/log4j.properties + + + + + + + + maven-assembly-plugin + 2.4 + + + jar-with-dependencies + + ${basedir}/target/opt/app/datartr/lib + + + + true + com.att.research.datarouter.provisioning.Main + + + + + + + + 
make-assembly + package + + single + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + true + com.att.research.datarouter.provisioning.Main + ${basedir}/target/opt/app/datartr/lib + + + + 1.8 + 1.8 + + 3.6.0 + + + org.apache.maven.plugins + maven-resources-plugin + 2.7 + + + copy-docker-file + package + + copy-resources + + + ${dockerLocation} + true + + + ${basedir}/src/main/resources/docker + true + + **/* + + + + + + + + + com.spotify + docker-maven-plugin + 0.4.11 + + datarouter-prov + ${dockerLocation} + docker-hub + https://${docker.registry} + + ${project.version} + latest + + true + + + + + + com.blackducksoftware.integration + hub-maven-plugin + 1.0.4 + false + + ${project.basedir} + + + + create-bdio-file + package + + createHubOutput + + + + + + + maven-resources-plugin + 2.7 + + + copy-resources-1 + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr/lib + + + ${project.basedir}/src/main/resources + + **/*.jar + + + + + + + copy-resources-2 + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr/etc + + + ${basedir}/src/main/resources + + misc/** + **/** + + + + + + + copy-resources-3 + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr + + + ${basedir}/data + + misc/** + **/** + + + + + + + copy-resources-4 + validate + + copy-resources + + + ${basedir}/target/opt/app/datartr/self_signed + + + ${basedir}/self_signed + + misc/** + **/** + + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.10 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/opt/app/datartr/lib + false + false + true + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + false + + + + attach-javadocs + + jar + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + + 
org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrhdme + https://oss.sonatype.org/ + true + + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.7 + + + html + xml + + + + + + + + + + ossrhdme + https://oss.sonatype.org/content/repositories/snapshots + + + ossrhdme + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + https://github.com/att/DMAAP_DATAROUTER.git + ${project.scm.connection} + https://github.com/att/DMAAP_DATAROUTER/tree/master + + + diff --git a/datarouter-prov/self_signed/cacerts.jks b/datarouter-prov/self_signed/cacerts.jks new file mode 100644 index 0000000000000000000000000000000000000000..76a480ad6857e7dc62c603f1354d7b15faebb203 GIT binary patch literal 983 zcmezO_TO6u1_mY|W(3o$xs}$oOHCFHAM=j7 z`Cr?~5}tn6{pwej&vLJ#KEI7Rb|QWG-*Z#$+s}An=U(_U%_4i6{)_pnnu)WQ{XRa) zH{~=i$S>aUi+988H7~nfnjPnz(z5S_jb&c8dFD2$KN4a)C;eV{ z&;8QFCw6E0%{b)ue3o|J;Ou8O<2hgL9@Sq_(Wk{q#3Lpm}FvS4_k&$8QmfRhFTOMzmAIW#h!{qMg zMFOGqty5B1Rrj!(bbRZH^}ZQzzp5?ryVm~<0|P$84Hg=yzq4ikD0(u@`7^6x{cpQP z_NFJz-2HCv=8*P3HU0a#uPZGxEz)~WZP(R(dgPDYrvoQ%hcha8y+~WL?v}~tKcC-d zwx9IfaaW;R#Y%8nHmkwjMRPZ{|C9Ur;&=c@!>?-%Cz#WdVm1oIdN`~WSzV^ZA@Yp5 zi$_ARGyk#TR*O4ZAFXp+Gjr`i5wH1+uYS3~H-)o&&8>`FyO^im-pfE_tA=x6?)1V145h7W` zL>VoXY=urU$8tK;Veoj)xt`}a*VP~JytrR{KQHdLc^02kz4BK+E&GeaXNrYZUy z`e<~%zd9)bpO`Rp+88NHkUJH+GXG=z69EZr5TLppGL>#3zC5_l`2teFI1u%6fm!_d zbwX6{q5Mxm2LxLWI~Q)a&zixnm3L&S<+5d)VKE*MIu+QKsIX4bFCx9 zSErF)Y|j!q&Nf7!v~9X$_+17Xw>pfs(dvkv+foUlPI}tjJYaCQ?M!joFUL+mJ(;R@ zfjYqAjuep5b#9c##>>VMaw-B7RHcVnm(M5PxPPl|#bbl{3nGK`?Fg|kX;7_(XWR?> zB_U&Byh*hxJ|_j8RiCi0TV|A2rTokr1 zj>r93nW{+)rxuQ9KEL*YIsWIF1Lt=e#+ba`m8Qs=SMb1?|TUKK7#Rb-?7*fMrq{mEn7uDo2T@S{h z#bj+Ghge#Z`?Yu2To0EZ2ej<)h;$reueo<=TkG60hdT~4M>LdU=mzMlp5o2?j=<|K zqOqyHMxJF9W=t%sO=c1b+WYBWh$?OBd#yY#mdFrwR>~R@XjA;N7jB!WT|9!lkb_Ad zW8IhIdGgI%l^GtK77BQ(P3IDw1ClKdeI=JTwd$3gl@@>X<^-zm@zF&vn=B@wvxnP* zdF*B9T=G27Hl{c)zO6K4R+vTrX+JzOnB8`2BTrdngsd)diDa-4%BxB^(azvO#U ztE2dZn!NhG-qEK5$eqx)YdXh!Tg{I$4@&(Hux@^ 
zK3&Frezoxuu`=7wMNnL!@>M6nxMp(U#~}lSd;B>X`{bCuCehS*eUlk&q}Ji7 zLQ0qNsend!M7UC(py^DP{r=YSdshxZ4F(&2(0Ly@p8yHUlfJl&E?)VWRTrnCZ?@SJ z4-6fvJ!G+GD&ABdkSbT)>M_*h8(lmQk&*|U-nr06Ix zL^DDUx&%XB9Qou@7V^x$DdXzvXJj`eU6ezi=0m%OxNITd!%HgJ1MghlGW@ckgyD!5 zkbE04fDp6%a!cbvtq5P5$qtF&5~F@?M(hOPba0sk8w_*jY@|`C4)% zu$62{cp4(ldyvicc{T|AR+_X^QbtK(Cd+pa2vK*G(n*fr3ar%^S8iID!DM2LS=2bwzd)6Xps>iu1UJ5<-cQq5mPg zKM|gv2>(xH&;N)LsPOO3Fmb6c5-~z2(EqYOiFoC55Ggb$QU^y20TrbMbqzpWJ@AMg z3Ufr?>o2MQU-|zXjsooar&+tN1V#abAs_|dhf)9lLLIDvj8%2j5O+q4>0IS(4uSrRA(ii-@PI*E*#chEqa9nvBoGy$pjCPB~6MY=#dCfGbH&rd0Qu_Oh zhm0=8z0mPj*C@XV)t_1~6LF1A$__-RwITFx{&DM! z{Rcm@>KmRM&7E@~D5P75Q4iWAC~#FyK<{x2u0Gy0Ei*$6plz@dt-Nwhuktvw?^MI{ znBeqRPqno_cXLhGItM1)0cE=-d@b#x{yKO4n?~}0WAl`9n(9$eIspN;mlf3~Z!3OZ z>O09Bw=o{q4+{?PsTVn8YhEK!6QRi~@dcJDAS0SWoL6bWOg7EYtZVN+DwVJ+E@%G! zUMQO{vi4)w1mFz_idU}wme}|p?0S}u`oiVZIoxfrcO5QflEO>$_4}knzbh6Wt%!Bi n#_OXe+auYGQq)1I+Y2Mwb12e=-`1Ad6b%*6aMH6}XW9J=fbibl literal 0 HcmV?d00001 diff --git a/datarouter-prov/self_signed/mykey.cer b/datarouter-prov/self_signed/mykey.cer new file mode 100644 index 0000000000000000000000000000000000000000..2a5c9d701adebe1c1bad90d8157287ecb3cce6d4 GIT binary patch literal 921 zcmXqLVxDTy#8kV0nTe5!iN&xhKi+_sjZ>@5qwPB{BO^B}gF&^SfB_#Hb0`a&FnefT zc3ysY9!!J-LxdeegbPE28zCZZAPX{?Sy-%~D8EcEC9x#2D8IBMwJ0yOTrW94*Fa93 z*U;R+(AdDp*wn(vBnr$mfpZ7eaZQX$$brSk%D~*j$j@NV#K^_e#K_37^7L|sE6We9 z&j0^%=7Ejjq1!k<<{fqOzqXSlJpHWu)vqp}$RE2;2TtA& zXH@Wdk+x>tEtAiGKEKgyKk2*Uu0pqpmEg8)R)f8Z=5B2NC-?Qm@c@p7U)LH=FsCQQ zY!ry~a9Ay}x=f2hf!H&-03&$CoAuthorizationResponse interface gives the caller access to information about an authorization + * decision. This information includes the permit/deny decision itself, along with supplementary information in the form of + * advice and obligations. (The advice and obligations will not be used in Data Router R1.) + * + * @author J. F. Lucas + * + */ +public interface AuthorizationResponse { + /** + * Indicates whether the request is authorized or not. 
+ * + * @return a boolean flag that is true if the request is permitted, and false otherwise. + */ + public boolean isAuthorized(); + + /** + * Returns any advice elements that were included in the authorization response. + * + * @return A list of objects implementing the AuthorizationResponseSupplement interface, with each object representing an + * advice element from the authorization response. + */ + public List getAdvice(); + + /** + * Returns any obligation elements that were included in the authorization response. + * + * @return A list of objects implementing the AuthorizationResponseSupplement interface, with each object representing an + * obligation element from the authorization response. + */ + public List getObligations(); +} diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponseSupplement.java b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponseSupplement.java new file mode 100644 index 00000000..2829c507 --- /dev/null +++ b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/AuthorizationResponseSupplement.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.authz; + +import java.util.Map; + +/** An object that meets the AuthorizationResponseSupplement interface carries supplementary + * information for an authorization response. In a XACML-based system, a response to an authorization request + * carries not just the permit/deny decision but, optionally, supplemental information in the form of advice and + * obligation elements. The structure of a XACML advice element and a XACML obligation element are similar: each has an identifier and + * a set of attributes (name-value) pairs. (The difference between a XACML advice element and a XACML obligation element is in + * how the recipient of the response--the Policy Enforcement Point, in XACML terminology--handles the element.) + * + * @author J. F. Lucas + * + */ +public interface AuthorizationResponseSupplement { + /** Return the identifier for the supplementary information element. + * + * @return a String containing the identifier. + */ + public String getId(); + + /** Return the attributes for the supplementary information element, as a Map in which + * keys represent attribute identifiers and values represent attribute values. + * + * @return attributes for the supplementary information element. 
+ */ + public Map getAttributes(); +} diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/authz/Authorizer.java b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/Authorizer.java new file mode 100644 index 00000000..bfed5c37 --- /dev/null +++ b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/Authorizer.java @@ -0,0 +1,62 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.authz; + +import java.util.Map; +import javax.servlet.http.HttpServletRequest; + +/** + * A Data Router API that requires authorization of incoming requests creates an instance of a class that implements + * the Authorizer interface. 
The class implements all of the logic necessary to determine if an API + * request is permitted. In Data Router R1, the classes that implement the Authorizer interface will have + * local logic that makes the authorization decision. After R1, these classes will instead have logic that creates XACML + * authorization requests, sends these requests to a Policy Decision Point (PDP), and parses the XACML responses. + * + * @author J. F. Lucas + * + */ +public interface Authorizer { + /** + * Determine if the API request carried in the request parameter is permitted. + * + * @param request the HTTP request for which an authorization decision is needed + * @return an object implementing the AuthorizationResponse interface. This object includes the + * permit/deny decision for the request and (after R1) supplemental information related to the response in the form + * of advice and obligations. + */ + public AuthorizationResponse decide(HttpServletRequest request); + + /** + * Determine if the API request carried in the request parameter, with additional attributes provided in + * the additionalAttrs parameter, is permitted. + * + * @param request the HTTP request for which an authorization decision is needed + * @param additionalAttrs additional attributes that the Authorizer can in making an authorization decision + * @return an object implementing the AuthorizationResponse interface. This object includes the + * permit/deny decision for the request and (after R1) supplemental information related to the response in the form + * of advice and obligations. 
+ */ + public AuthorizationResponse decide(HttpServletRequest request, Map additionalAttrs); +} diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespImpl.java b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespImpl.java new file mode 100644 index 00000000..db318d39 --- /dev/null +++ b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespImpl.java @@ -0,0 +1,97 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. 
+ * * + ******************************************************************************/ + + +package com.att.research.datarouter.authz.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.att.research.datarouter.authz.AuthorizationResponse; +import com.att.research.datarouter.authz.AuthorizationResponseSupplement; + + +/** A representation of an authorization response returned by a XACML Policy Decision Point. + * In Data Router R1, advice and obligations are not used. + * @author J. F. Lucas + * + */ +public class AuthRespImpl implements AuthorizationResponse { + private boolean authorized; + private List advice; + private List obligations; + + /** Constructor. This version will not be used in Data Router R1 since we will not have advice and obligations. + * + * @param authorized flag indicating whether the response carried a permit response (true) + * or something else (false). + * @param advice list of advice elements returned in the response. + * @param obligations list of obligation elements returned in the response. + */ + public AuthRespImpl(boolean authorized, List advice, List obligations) { + this.authorized = authorized; + this.advice = (advice == null ? null : new ArrayList (advice)); + this.obligations = (obligations == null ? null : new ArrayList (obligations)); + } + + /** Constructor. Simple version for authorization responses that have no advice and no obligations. + * + * @param authorized flag indicating whether the response carried a permit (true) or something else (false). + */ + public AuthRespImpl(boolean authorized) { + this(authorized, null, null); + } + + /** + * Indicates whether the request is authorized or not. + * + * @return a boolean flag that is true if the request is permitted, and false otherwise. + */ + @Override + public boolean isAuthorized() { + return authorized; + } + + /** + * Returns any advice elements that were included in the authorization response. 
+ * + * @return A list of objects implementing the AuthorizationResponseSupplement interface, with each object representing an + * advice element from the authorization response. + */ + @Override + public List getAdvice() { + return advice; + } + + /** + * Returns any obligation elements that were included in the authorization response. + * + * @return A list of objects implementing the AuthorizationResponseSupplement interface, with each object representing an + * obligation element from the authorization response. + */ + @Override + public List getObligations() { + return obligations; + } + +} diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespSupplementImpl.java b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespSupplementImpl.java new file mode 100644 index 00000000..5d2b61c8 --- /dev/null +++ b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthRespSupplementImpl.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.authz.impl; + +import java.util.HashMap; +import java.util.Map; + +import com.att.research.datarouter.authz.AuthorizationResponseSupplement; + +/** Carries supplementary information--an advice or an obligation--from the authorization response returned + * by a XACML Policy Decision Point. Not used in Data Router R1. + * @author J. F. Lucas + * + */ +public class AuthRespSupplementImpl implements AuthorizationResponseSupplement { + + private String id = null; + private Map attributes = null; + + /** Constructor, available within the package. + * + * @param id The identifier for the advice or obligation element + * @param attributes The attributes (name-value pairs) for the advice or obligation element. + */ + AuthRespSupplementImpl (String id, Map attributes) { + this.id = id; + this.attributes = new HashMap(attributes); + } + + /** Return the identifier for the supplementary information element. + * + * @return a String containing the identifier. + */ + @Override + public String getId() { + return id; + } + + /** Return the attributes for the supplementary information element, as a Map in which + * keys represent attribute identifiers and values represent attribute values. + * + * @return attributes for the supplementary information element. 
+ */ + @Override + public Map getAttributes() { + return attributes; + } + +} diff --git a/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthzResource.java b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthzResource.java new file mode 100644 index 00000000..1a201b7e --- /dev/null +++ b/datarouter-prov/src/main/java/com/att/research/datarouter/authz/impl/AuthzResource.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package com.att.research.datarouter.authz.impl; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Internal representation of an authorization resource (the entity to which access is being requested). Consists + * of a type and an identifier. 
The constructor takes the request URI from an HTTP request and checks it against + * patterns for the the different resource types. In DR R1, there are four resource types: + *