2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.dcae.apod.analytics.cdap.tca;
23 import co.cask.cdap.api.app.AbstractApplication;
24 import co.cask.cdap.api.data.stream.Stream;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
28 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
29 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
30 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
31 import org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
32 import org.onap.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow;
33 import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
34 import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator;
35 import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker;
36 import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker;
37 import org.onap.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * @author Rajiv Singla . Creation Date: 10/21/2016.
44 public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
46 private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class);
49 @SuppressWarnings("unchecked")
50 public void configure() {
53 // ========= Application configuration Setup ============== //
54 final TCAAppConfig tcaAppConfig = getConfig();
56 LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig);
58 // Validate application configuration
59 ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator());
62 setName(tcaAppConfig.getAppName());
63 setDescription(tcaAppConfig.getAppDescription());
65 // ========== Streams Setup ============== //
66 // Create DMaaP MR Subscriber CDAP output stream
67 final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName();
68 LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName);
69 final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName,
70 CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM);
71 addStream(subscriberOutputStream);
74 // ============ Datasets Setup ======== //
75 // Create TCA Message Status Table
76 final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName();
77 final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds();
78 LOG.info("Creating TCA Message Status Table: {} with TTL: {}",
79 tcaVESMessageStatusTableName, messageStatusTableTTLSeconds);
80 final DatasetProperties messageStatusTableProperties =
81 TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
82 createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
85 // Create TCA Alerts Abatement Table
86 final String tcaAlertsAbatementTableName = tcaAppConfig.getTcaAlertsAbatementTableName();
87 final Integer tcaAlertsAbatementTableTTLSeconds = tcaAppConfig.getTcaAlertsAbatementTableTTLSeconds();
88 LOG.info("Creating Alerts Abatement Table: {} with TTL: {}",
89 tcaAlertsAbatementTableName, tcaAlertsAbatementTableTTLSeconds);
90 final DatasetProperties alertsAbatementTableProperties =
91 TCAAlertsAbatementPersister.getDatasetProperties(tcaAlertsAbatementTableTTLSeconds);
92 createDataset(tcaAlertsAbatementTableName, ObjectMappedTable.class, alertsAbatementTableProperties);
94 // Create TCA VES Alerts Table
95 final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
96 final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
97 LOG.info("Creating TCA Alerts Table: {} with TTL: {}",
98 tcaVESAlertsTableName, alertsTableTTLSeconds);
99 final DatasetProperties alertTableProperties =
100 TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds);
101 createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties);
103 // =========== Flow Setup ============= //
104 addFlow(new TCAVESCollectorFlow(tcaAppConfig));
106 // ========== Workers Setup =========== //
107 LOG.info("Creating TCA DMaaP Subscriber Worker");
108 addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
109 LOG.info("Creating TCA DMaaP Publisher Worker");
110 addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName()));
111 // TODO: Remove this before going to production
112 addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));