Changed to unmaintained
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / onap / appc / listener / LCM / impl / ListenerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.listener.LCM.impl;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import com.att.eelf.i18n.EELFResourceManager;
29 import com.fasterxml.jackson.databind.JsonNode;
30 import java.text.DateFormat;
31 import java.text.SimpleDateFormat;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.TimeZone;
35 import java.util.concurrent.RejectedExecutionException;
36 import org.apache.commons.lang3.StringUtils;
37 import org.onap.appc.listener.AbstractListener;
38 import org.onap.appc.listener.LCM.conv.Converter;
39 import org.onap.appc.listener.LCM.model.DmaapIncomingMessage;
40 import org.onap.appc.listener.LCM.operation.ProviderOperations;
41 import org.onap.appc.listener.ListenerProperties;
42
43 public class ListenerImpl extends AbstractListener {
44
45     private final EELFLogger LOG = EELFManager.getInstance().getLogger(ListenerImpl.class);
46
47     private long startTime = 0;
48
49     private final ProviderOperations providerOperations;
50
51     public ListenerImpl(ListenerProperties props) {
52         super(props);
53
54         String url = props.getProperty("provider.url");
55         String user = props.getProperty("provider.user");
56         String pass = props.getProperty("provider.pass");
57         providerOperations = new ProviderOperations(url, user, pass);
58         LOG.info("DMaaP Provider Endpoint: " + url);
59     }
60
61     @Override
62     public void run() {
63         // Some vars for benchmarking
64         startTime = System.currentTimeMillis();
65
66         LOG.info("Running DMaaP Listener");
67
68         while (run.get()) {
69             // Only update if the queue is low. otherwise we read in more
70             // messages than we need
71             try {
72                 if (executor.getQueue().size() <= QUEUED_MIN) {
73                     LOG.debug("DMaaP queue running low. Querying for more jobs");
74
75                     List<DmaapIncomingMessage> messages = dmaap
76                         .getIncomingEvents(DmaapIncomingMessage.class, QUEUED_MAX);
77                     LOG.debug(String.format("Read %d messages from dmaap", messages.size()));
78                     for (DmaapIncomingMessage incoming : messages) {
79                         // Acknowledge that we read the event
80                         if (isValid(incoming)) {
81                             String requestIdWithSubId = getRequestIdWithSubId(incoming.getBody());
82                             LOG.info("Acknowledging Message: " + requestIdWithSubId);
83                         }
84                     }
85                     for (DmaapIncomingMessage incoming : messages) {
86                         String requestIdWithSubId = getRequestIdWithSubId(incoming.getBody());
87                         // Add to pool if still running
88                         if (run.get()) {
89                             if (isValid(incoming)) {
90                                 LOG.info(String.format("Adding DMaaP message to pool queue [%s]", requestIdWithSubId));
91                                 try {
92                                     executor.execute(new WorkerImpl(incoming, dmaap, providerOperations));
93                                 } catch (RejectedExecutionException rejectEx) {
94                                     LOG.error("Task Rejected: ", rejectEx);
95                                 }
96                             } else {
97                                 // Badly formed message
98                                 LOG.error("Message was not valid. Rejecting message: " + incoming);
99                             }
100                         } else {
101                             if (isValid(incoming)) {
102                                 LOG.info("Run stopped. Orphaning Message: " + requestIdWithSubId);
103                             } else {
104                                 // Badly formed message
105                                 LOG.error("Message was not valid. Rejecting message: " + incoming);
106                             }
107                         }
108                     }
109                 }
110             } catch (Exception e) {
111                 LOG.error("Exception " + e.getClass().getSimpleName() + " caught in DMaaP listener");
112                 LOG.error(EELFResourceManager.format(e));
113                 LOG.error("DMaaP Listener logging and ignoring the exception, continue...");
114             }
115         }
116
117         LOG.info("Stopping DMaaP Listener thread");
118
119         // We've told the listener to stop
120         // TODO - Should we:
121         // 1) Put a message back on the queue indicating that APP-C never got to
122         // the message
123         // or
124         // 2) Let downstream figure it out after timeout between PENDING and
125         // ACTIVE messages
126     }
127
128     private boolean isValid(DmaapIncomingMessage incoming) {
129         return ((incoming != null) &&
130             incoming.getBody() != null
131             && !StringUtils.isEmpty(incoming.getRpcName()));
132     }
133
134     @Override
135     public String getBenchmark() {
136         long time = System.currentTimeMillis();
137         DateFormat df = new SimpleDateFormat("HH:mm:ss");
138         df.setTimeZone(TimeZone.getTimeZone("UTC"));
139         String runningTime = df.format(new Date(time - startTime));
140
141         String out = String.format("Running for %s and completed %d jobs using %d threads.", runningTime,
142             executor.getCompletedTaskCount(), executor.getPoolSize());
143         LOG.info("***BENCHMARK*** " + out);
144         return out;
145     }
146
147     private String getRequestIdWithSubId(JsonNode event) {
148         String requestId = "";
149         try {
150             requestId = Converter.extractRequestIdWithSubId(event);
151         } catch (Exception e) {
152             LOG.error("failed to parse request-id and sub-request-id. Json not in expected format", e);
153         }
154         return requestId;
155     }
156 }