2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.tca;
23 import co.cask.cdap.api.app.AbstractApplication;
24 import co.cask.cdap.api.data.stream.Stream;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
28 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusPersister;
29 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
30 import org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils;
31 import org.openecomp.dcae.apod.analytics.cdap.tca.flow.TCAVESCollectorFlow;
32 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppConfig;
33 import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAAppConfigValidator;
34 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPMockSubscriberWorker;
35 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPPublisherWorker;
36 import org.openecomp.dcae.apod.analytics.cdap.tca.worker.TCADMaaPSubscriberWorker;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * @author Rajiv Singla . Creation Date: 10/21/2016.
43 public class TCAAnalyticsApplication extends AbstractApplication<TCAAppConfig> {
45 private static final Logger LOG = LoggerFactory.getLogger(TCAAnalyticsApplication.class);
48 public void configure() {
51 // ========= Application configuration Setup ============== //
52 final TCAAppConfig tcaAppConfig = getConfig();
54 LOG.info("Configuring TCA Application with startup application configuration: {}", tcaAppConfig);
56 // Validate application configuration
57 ValidationUtils.validateSettings(tcaAppConfig, new TCAAppConfigValidator());
60 setName(tcaAppConfig.getAppName());
61 setDescription(tcaAppConfig.getAppDescription());
63 // ========== Streams Setup ============== //
64 // Create DMaaP MR Subscriber CDAP output stream
65 final String tcaSubscriberOutputStreamName = tcaAppConfig.getTcaSubscriberOutputStreamName();
66 LOG.info("Creating TCA VES Output Stream: {}", tcaSubscriberOutputStreamName);
67 final Stream subscriberOutputStream = new Stream(tcaSubscriberOutputStreamName,
68 CDAPComponentsConstants.TCA_FIXED_SUBSCRIBER_OUTPUT_DESCRIPTION_STREAM);
69 addStream(subscriberOutputStream);
72 // ============ Datasets Setup ======== //
73 // Create TCA Message Status Table
74 final String tcaVESMessageStatusTableName = tcaAppConfig.getTcaVESMessageStatusTableName();
75 final Integer messageStatusTableTTLSeconds = tcaAppConfig.getTcaVESMessageStatusTableTTLSeconds();
76 LOG.info("Creating TCA Message Status Table: {} with TTL: {}",
77 tcaVESMessageStatusTableName, messageStatusTableTTLSeconds);
78 final DatasetProperties messageStatusTableProperties =
79 TCAMessageStatusPersister.getDatasetProperties(messageStatusTableTTLSeconds);
80 createDataset(tcaVESMessageStatusTableName, ObjectMappedTable.class, messageStatusTableProperties);
82 // Create TCA VES Alerts Table
83 final String tcaVESAlertsTableName = tcaAppConfig.getTcaVESAlertsTableName();
84 final Integer alertsTableTTLSeconds = tcaAppConfig.getTcaVESAlertsTableTTLSeconds();
85 LOG.info("Creating TCA Alerts Table: {} with TTL: {}",
86 tcaVESAlertsTableName, alertsTableTTLSeconds);
87 final DatasetProperties alertTableProperties =
88 TCAVESAlertsPersister.getDatasetProperties(alertsTableTTLSeconds);
89 createDataset(tcaVESAlertsTableName, ObjectMappedTable.class, alertTableProperties);
91 // =========== Flow Setup ============= //
92 addFlow(new TCAVESCollectorFlow(tcaAppConfig));
94 // ========== Workers Setup =========== //
95 LOG.info("Creating TCA DMaaP Subscriber Worker");
96 addWorker(new TCADMaaPSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));
97 LOG.info("Creating TCA DMaaP Publisher Worker");
98 addWorker(new TCADMaaPPublisherWorker(tcaAppConfig.getTcaVESAlertsTableName()));
99 // TODO: Remove the TCADMaaPMockSubscriberWorker registration below before going to production
100 addWorker(new TCADMaaPMockSubscriberWorker(tcaAppConfig.getTcaSubscriberOutputStreamName()));