2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.tca;
\r
23 import co.cask.cdap.api.app.AbstractApplication;
\r
24 import co.cask.cdap.api.data.stream.Stream;
\r
25 import co.cask.cdap.api.dataset.DatasetProperties;
\r
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
\r
27 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
\r
28 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
\r
29 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
\r
30 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
\r
31 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
\r
32 import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow;
\r
33 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
\r
34 import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator;
\r
35 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker;
\r
36 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker;
\r
37 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker;
\r
38 import org.slf4j.Logger;
\r
39 import org.slf4j.LoggerFactory;
\r
42 * @author Rajiv Singla . Creation Date: 10/21/2016.
\r
44 public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
\r
46 private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class);
\r
49 @SuppressWarnings("unchecked")
\r
50 public void configure() {
\r
53 // ========= Application configuration Setup ============== //
\r
54 final TCAAppConfig tcaAppConfig = getConfig();
\r
56 LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig);
\r
58 // Validate application configuration
\r
59 ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator());
\r
62 setName(tcaAppConfig.getAppName());
\r
63 setDescription(tcaAppConfig.getAppDescription());
\r
65 // ========== Streams Setup ============== //
\r
66 // Create DMaaP MR Subscriber CDAP output stream
\r
67 final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName();
\r
68 LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName);
\r
69 final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName,
\r
70 CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM);
\r
71 addStream(subscriberOutputStream);
\r
74 // ============ Datasets Setup ======== //
\r
75 // Create TCA Message Status Table
\r
76 final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName();
\r
77 final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds();
\r
78 LOG.info("Creating TCA Message Status Table: {} with TTL: {}",
\r
79 tcaVESMessageStatusTableName, messageStatusTableTTLSeconds);
\r
80 final DatasetProperties messageStatusTableProperties =
\r
81 TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
\r
82 createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
\r
85 // Create TCA Alerts Abatement Table
\r
86 final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName();
\r
87 final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds();
\r
88 LOG.info("Creating Alerts Abatement Table: {} with TTL: {}",
\r
89 tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds);
\r
90 final DatasetProperties alertsAbatementTableProperties =
\r
91 TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds);
\r
92 createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties);
\r
94 // Create TCA VES Alerts Table
\r
95 final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
\r
96 final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
\r
97 LOG.info("Creating TCA Alerts Table: {} with TTL: {}",
\r
98 tcaVESAlertsTableName, alertsTableTTLSeconds);
\r
99 final DatasetProperties alertTableProperties =
\r
100 TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds);
\r
101 createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties);
\r
103 // =========== Flow Setup ============= //
\r
104 addFlow(new TCAVESCollectorFlow(tcaAppConfig));
\r
106 // ========== Workers Setup =========== //
\r
107 LOG.info("Creating TCA DMaaP Subscriber Worker");
\r
108 addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
\r
109 LOG.info("Creating TCA DMaaP Publisher Worker");
\r
110 addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName()));
\r
111 // TODO: Remove this before going to production
\r
112 addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
\r