2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.tca;
23 import co.cask.cdap.api.app.AbstractApplication;
24 import co.cask.cdap.api.data.stream.Stream;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
28 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
29 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
30 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
31 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
32 import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow;
33 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
34 import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator;
35 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker;
36 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker;
37 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * @author Rajiv Singla. Creation Date: 10/21/2016.
44 public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
46 private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class);
49 public void configure() {
52 // ========= Application configuration Setup ============== //
53 final TCAAppConfig tcaAppConfig = getConfig();
55 LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig);
57 // Validate application configuration
58 ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator());
61 setName(tcaAppConfig.getAppName());
62 setDescription(tcaAppConfig.getAppDescription());
64 // ========== Streams Setup ============== //
65 // Create DMaaP MR Subscriber CDAP output stream
66 final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName();
67 LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName);
68 final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName,
69 CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM);
70 addStream(subscriberOutputStream);
73 // ============ Datasets Setup ======== //
74 // Create TCA Message Status Table
75 final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName();
76 final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds();
77 LOG.info("Creating TCA Message Status Table: {} with TTL: {}",
78 tcaVESMessageStatusTableName, messageStatusTableTTLSeconds);
79 final DatasetProperties messageStatusTableProperties =
80 TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
81 createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
84 // Create TCA Alerts Abatement Table
85 final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName();
86 final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds();
87 LOG.info("Creating Alerts Abatement Table: {} with TTL: {}",
88 tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds);
89 final DatasetProperties alertsAbatementTableProperties =
90 TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds);
91 createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties);
93 // Create TCA VES Alerts Table
94 final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
95 final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
96 LOG.info("Creating TCA Alerts Table: {} with TTL: {}",
97 tcaVESAlertsTableName, alertsTableTTLSeconds);
98 final DatasetProperties alertTableProperties =
99 TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds);
100 createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties);
102 // =========== Flow Setup ============= //
103 addFlow(new TCAVESCollectorFlow(tcaAppConfig));
105 // ========== Workers Setup =========== //
106 LOG.info("Creating TCA DMaaP Subscriber Worker");
107 addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
108 LOG.info("Creating TCA DMaaP Publisher Worker");
109 addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName()));
110 // TODO: Remove the mock DMaaP subscriber worker registered below before going to production
111 addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));