98a78478b9e0d374dcff0f8cb49901d6b68198bb
[aai/aai-common.git] / aai-core / src / main / java / org / openecomp / aai / util / AAITxnLog.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.openecomp.aai
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11      http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.aai.util;
22
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.client.*;
27 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
28 import org.apache.hadoop.hbase.filter.Filter;
29 import org.apache.hadoop.hbase.filter.FilterList;
30 import org.apache.hadoop.hbase.filter.RegexStringComparator;
31 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.openecomp.aai.domain.notificationEvent.NotificationEvent;
34 import org.openecomp.aai.domain.translog.TransactionLogEntries;
35 import org.openecomp.aai.domain.translog.TransactionLogEntry;
36 import org.openecomp.aai.exceptions.AAIException;
37
38 import java.io.IOException;
39 import java.text.DateFormat;
40 import java.text.SimpleDateFormat;
41 import java.util.ArrayList;
42 import java.util.Date;
43 import java.util.List;
44
45 public class AAITxnLog {
46
47         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AAITxnLog.class);
48
49         private final org.apache.hadoop.conf.Configuration config;
50         private HTable table = null;
51         private String tm = null;
52
53         /**
54          * Instantiates a new AAI txn log.
55          *
56          * @param transId the trans id
57          * @param fromAppId the from app id
58          */
59         public AAITxnLog(String transId, String fromAppId) {
60                 /* When you create a HBaseConfiguration, it reads in whatever you've set
61                 into your hbase-site.xml and in hbase-default.xml, as long as these can
62                 be found on the CLASSPATH */
63
64                 config = HBaseConfiguration.create();
65                 
66                 try {
67                         config.set(AAIConstants.ZOOKEEPER_ZNODE_PARENT, AAIConfig.get(AAIConstants.HBASE_ZOOKEEPER_ZNODE_PARENT));
68                         config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM));
69                         config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT));
70                         
71                         Date date = new Date();
72                         DateFormat formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
73                         tm = formatter.format(date);
74                 } catch (AAIException e) {
75                         LOGGER.warn("Missing configuration in AAIConfig: " + e.getMessage());
76                 }
77         }
78
79         /**
80          * Put.
81          *
82          * @param status the status
83          * @param srcId the src id
84          * @param rsrcId the rsrc id
85          * @param rsrcType the rsrc type
86          * @param rqstBuf the rqst buf
87          * @param respBuf the resp buf
88          * @return the string
89          */
90         public String put(
91                         String status,
92                         String srcId,
93                         String rsrcId,
94                         String rsrcType,
95                         String rqstBuf,
96                         String respBuf
97                         ) {
98                 return put ("",status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
99
100         }
101
102         /**
103          * Put.
104          *
105          * @param tid the tid
106          * @param status the status
107          * @param srcId the src id
108          * @param rsrcId the rsrc id
109          * @param rsrcType the rsrc type
110          * @param rqstBuf the rqst buf
111          * @param respBuf the resp buf
112          * @return the string
113          */
114         public String put(
115                         String tid,
116                         String status,
117                         String srcId,
118                         String rsrcId,
119                         String rsrcType,
120                         String rqstBuf,
121                         String respBuf
122                         ) {
123                 return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
124         }
125
126         /**
127          * Put.
128          *
129          * @param tid the tid
130          * @param status the status
131          * @param rqstTm the rqst tm
132          * @param respTm the resp tm
133          * @param srcId the src id
134          * @param rsrcId the rsrc id
135          * @param rsrcType the rsrc type
136          * @param rqstBuf the rqst buf
137          * @param respBuf the resp buf
138          * @return the string
139          */
140         public String put(
141                         String tid,
142                         String status,
143                         String rqstTm,
144                         String respTm,
145                         String srcId,
146                         String rsrcId,
147                         String rsrcType,
148                         String rqstBuf,
149                         String respBuf
150                         ) { 
151                         return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
152         }
153         
154         /**
155          * Put.
156          *
157          * @param tid the tid
158          * @param status the status
159          * @param rqstTm the rqst tm
160          * @param respTm the resp tm
161          * @param srcId the src id
162          * @param rsrcId the rsrc id
163          * @param rsrcType the rsrc type
164          * @param rqstBuf the rqst buf
165          * @param respBuf the resp buf
166          * @param hasNotificationEvent the has notification event
167          * @param ne the ne
168          * @return the string
169          */
170         public String put(
171                         String tid,
172                         String status,
173                         String rqstTm,
174                         String respTm,
175                         String srcId,
176                         String rsrcId,
177                         String rsrcType,
178                         String rqstBuf,
179                         String respBuf, 
180                         boolean hasNotificationEvent,
181                         NotificationEvent ne
182                         ) {
183
184                 if (tid == null || "".equals(tid)) {
185                         Date date = new Date();
186                         DateFormat formatter = null;
187                         try {
188                                 formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
189                         } catch (Exception e) {
190                                 formatter = new SimpleDateFormat("YYYYMMdd-HH:mm:ss:SSS");
191                         }
192                         tm = formatter.format(date);
193                         tid = tm + "-";
194                 } 
195                 String htid = tid;
196                 
197                 //need to add a prefix for better hbase logging server balancing
198                 htid = HbaseSaltPrefixer.getInstance().prependSalt(htid);
199                 
200                 if (rqstTm == null || "".equals(rqstTm)) {
201                         rqstTm = tm;
202                 }
203
204                 if (respTm == null || "".equals(respTm)) {
205                         respTm = tm;
206                 }
207
208                 try {
209                         table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
210
211                         Put p = new Put(Bytes.toBytes(htid));
212
213                         p.add(Bytes.toBytes("transaction"),Bytes.toBytes("tid"),Bytes.toBytes(tid));
214                         p.add(Bytes.toBytes("transaction"),Bytes.toBytes("status"),Bytes.toBytes(status));
215                         p.add(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"),Bytes.toBytes(rqstTm));
216                         p.add(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"),Bytes.toBytes(respTm));
217                         p.add(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"),Bytes.toBytes(srcId));
218
219                         p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"),Bytes.toBytes(rsrcId));
220                         p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"),Bytes.toBytes(rsrcType));
221
222                         p.add(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"),Bytes.toBytes(rqstBuf));
223                         p.add(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"),Bytes.toBytes(respBuf));
224
225                         if (hasNotificationEvent == true) { 
226                                 String eventType = ne.getEventHeader().getEventType();
227                                 String eventStatus = ne.getEventHeader().getStatus();
228                                 
229                                 if (eventStatus == null) { 
230                                         eventStatus = AAIConfig.get("aai.notificationEvent.default.status", "UNPROCESSED");
231                                 }
232                                 if (eventType == null) { 
233                                         eventType = AAIConfig.get("aai.notificationEvent.default.eventType", "AAI-EVENT");
234                                 }
235                                                         
236                                 if (ne.getEntity() != null) { 
237                                         PojoUtils pu = new PojoUtils();
238                                         p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"),Bytes.toBytes(pu.getJsonFromObject(ne)));
239                                 }
240                                 if (ne.getEventHeader().getId() != null) { 
241                                         p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"),Bytes.toBytes(ne.getEventHeader().getId()));
242                                 }       
243                                 
244                                 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"),Bytes.toBytes(eventStatus));
245                                 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"),Bytes.toBytes(eventType));
246                         
247                                 if (ne.getEventHeader().getEntityLink() != null) { 
248                                         p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"),Bytes.toBytes(ne.getEventHeader().getEntityLink()));
249                                 }
250                                 if (ne.getEventHeader().getAction() != null) {
251                                         p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"),Bytes.toBytes(ne.getEventHeader().getAction())  );
252                                 }
253                         }
254                         /* Once you've adorned your Put instance with all the updates you want to
255                 make, to commit it do the following */
256                         table.put(p);
257                         table.flushCommits();
258                         table.close();
259                         return htid;
260                 } catch (Exception e) {
261                         LOGGER.warn("AAITxnLog: put: Exception", e);
262                         return htid;
263                 }
264         }
265
266         /**
267          * Gets the.
268          *
269          * @param htid the htid
270          * @return the transaction log entry
271          * @throws AAIException the AAI exception
272          */
273         public TransactionLogEntry get(String htid) throws AAIException {
274
275                 LOGGER.debug("In get: searching hbase config file...");
276                 String tidStr = "";
277                 TransactionLogEntry txObj = new TransactionLogEntry();
278
279                 try {
280                         table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
281
282                         Get g = new Get(Bytes.toBytes(htid));
283
284                         Result r = table.get(g);
285                         byte [] tid = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
286                         byte [] status = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
287                         byte [] rqstDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
288                         byte [] respDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
289                         byte [] sourceId = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
290
291                         byte [] resourceId = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
292                         byte [] resourceType = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
293
294                         byte [] rqstBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
295                         byte [] respBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
296
297                         byte [] notificationPayload = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
298                         byte [] notificationStatus = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
299                         byte [] notificationId = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
300                         byte [] notificationTopic = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
301                         byte [] notificationEntityLink = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
302                         byte [] notificationAction = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
303
304                         table.close();
305                         
306                         tidStr = Bytes.toString(tid);
307                         txObj.setTransactionLogEntryId(tidStr);
308                         txObj.setStatus(Bytes.toString(status));
309                         txObj.setRqstDate(Bytes.toString(rqstDate));
310                         txObj.setRespDate(Bytes.toString(respDate));
311                         txObj.setSourceId(Bytes.toString(sourceId));
312                         txObj.setResourceId(Bytes.toString(resourceId));
313                         txObj.setResourceType(Bytes.toString(resourceType));
314                         txObj.setRqstBuf(Bytes.toString(rqstBuf));
315                         txObj.setrespBuf(Bytes.toString(respBuf));
316                         txObj.setNotificationPayload(Bytes.toString(notificationPayload));
317                         txObj.setNotificationStatus(Bytes.toString(notificationStatus));
318                         txObj.setNotificationId(Bytes.toString(notificationId));
319                         txObj.setNotificationTopic(Bytes.toString(notificationTopic));
320                         txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
321                         txObj.setNotificationAction(Bytes.toString(notificationAction));
322                 } catch (IOException e) {
323                         LOGGER.error("IOException on hbase call", e);
324                         throw new AAIException("AAI_4000");
325                 }
326                 
327                 return txObj;
328         }
329
330         
331         /**
332          * Scan filtered.
333          *
334          * @param startMillis the start millis
335          * @param endMillis the end millis
336          * @param methodList the method list
337          * @param putFilter the put filter
338          * @param getFilter the get filter
339          * @param resourceFilter the resource filter
340          * @param fromAppIdFilter the from app id filter
341          * @return the transaction log entries
342          */
343         public TransactionLogEntries scanFiltered(long startMillis, long endMillis, List<String> methodList,
344                                               String putFilter, String getFilter, String resourceFilter, String fromAppIdFilter) {
345
346                 LOGGER.debug("Starting scanFiltered()");
347                 
348                 //              we should have the config ready from the constructor
349                 
350                 TransactionLogEntries txs = new TransactionLogEntries();
351                         
352                 if (config == null) {
353                         LOGGER.debug("in scan: can't create HBase configuration");
354                         return txs;
355                 }
356                 
357                 try {
358                         table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
359                         Scan s = new Scan();
360                         FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
361                         FilterList methodflMaster = new FilterList(FilterList.Operator.MUST_PASS_ONE);
362                         if (methodList != null) { 
363                                 for (String method : methodList) { 
364                                         Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
365                                                         Bytes.toBytes("resourceType"), CompareOp.EQUAL, Bytes.toBytes(method));
366                                         methodflMaster.addFilter(filt);
367                                 }
368                                 flMaster.addFilter(methodflMaster);
369                         }
370                         
371                         if (getFilter != null) { 
372                                 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
373                                                 Bytes.toBytes("respBuf"), CompareOp.EQUAL, new RegexStringComparator(getFilter));
374                                 flMaster.addFilter(filt);
375                         }
376                         if (putFilter != null) { 
377                                 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
378                                                 Bytes.toBytes("rqstBuf"), CompareOp.EQUAL, new RegexStringComparator(putFilter));
379                                 flMaster.addFilter(filt);
380                         }
381                         if (resourceFilter != null) { 
382                                 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
383                                                 Bytes.toBytes("resourceId"), CompareOp.EQUAL, new RegexStringComparator(resourceFilter));
384                                 flMaster.addFilter(filt);
385                         }
386                         if (fromAppIdFilter != null) { 
387                                 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("transaction"),
388                                                 Bytes.toBytes("sourceId"), CompareOp.EQUAL, new RegexStringComparator("^" + fromAppIdFilter));
389                                 flMaster.addFilter(filt);
390                         }
391                         
392                         if (flMaster.hasFilterRow()) { 
393                                 s.setFilter(flMaster);
394                         }
395                         
396                         s.setTimeRange(startMillis, endMillis);
397                         ResultScanner scanner = table.getScanner(s);
398
399                         try {           
400                                 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
401                                         
402                                         byte [] tid = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
403                                         byte [] status = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
404                                         byte [] rqstDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
405                                         byte [] respDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
406                                         byte [] sourceId = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
407
408                                         byte [] resourceId = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
409                                         byte [] resourceType = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
410
411                                         byte [] rqstBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
412                                         byte [] respBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
413                                         
414                                         byte [] notificationPayload = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
415                                         byte [] notificationStatus = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
416                                         byte [] notificationId = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
417                                         byte [] notificationTopic = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
418                                         byte [] notificationEntityLink = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
419                                         byte [] notificationAction = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
420                                         TransactionLogEntry txObj = new TransactionLogEntry();
421                                         String tidStr = Bytes.toString(tid);
422                                         txObj.setTransactionLogEntryId(tidStr);
423                                         txObj.setStatus(Bytes.toString(status));
424                                         txObj.setRqstDate(Bytes.toString(rqstDate));
425                                         txObj.setRespDate(Bytes.toString(respDate));
426                                         txObj.setSourceId(Bytes.toString(sourceId));
427                                         txObj.setResourceId(Bytes.toString(resourceId));
428                                         txObj.setResourceType(Bytes.toString(resourceType));
429                                         txObj.setRqstBuf(Bytes.toString(rqstBuf));
430                                         txObj.setrespBuf(Bytes.toString(respBuf));
431                                         txObj.setNotificationPayload(Bytes.toString(notificationPayload));
432                                         txObj.setNotificationStatus(Bytes.toString(notificationStatus));
433                                         txObj.setNotificationId(Bytes.toString(notificationId));
434                                         txObj.setNotificationTopic(Bytes.toString(notificationTopic));
435                                         txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
436                                         txObj.setNotificationAction(Bytes.toString(notificationAction));
437                                         txs.getTransactionLogEntries().add(txObj);
438                                 }
439                         } finally {
440                                 // Make sure you close your scanners when you are done!
441                                 scanner.close();
442                         }
443                         table.close();
444                 } catch (Exception e) {
445                         LOGGER.warn("AAITxnLog: scan: Exception=" + e.toString());
446                 }
447                 
448                 return txs;
449         }
450
451         /**
452          * Scan.
453          *
454          * @param htid the htid
455          * @return the list
456          */
457         public List<String> scan(String htid) {
458
459                 List<String> list = new ArrayList<String>();
460                 LOGGER.debug("In scan: searching hbase config file...");
461                 //              we should have the config ready from the constructor
462                 if (config == null) {
463                         LOGGER.debug("in scan: can't create HBase configuration");
464                         return list;
465                 }
466
467                 try {
468                         table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
469                         Scan s = new Scan(Bytes.toBytes(htid));
470                         ResultScanner scanner = table.getScanner(s);
471
472                         try {           
473                                 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
474                                         list.add(rr.toString());
475                                         LOGGER.debug("in scan: Found row : " + rr);
476
477                                 }
478                         } finally {
479                                 // Make sure you close your scanners when you are done!
480                                 scanner.close();
481                         }
482                         table.close();
483                 } catch (Exception e) {
484
485                         LOGGER.debug("AAITxnLog: scan: Exception=" + e.toString());
486                 }
487                 return list;
488         }
489
490 }
491
/*
Need to implement HBase connection pooling in the future.
This is to reduce the 1 second delay during the first open of HConnection and HTable instantiation.
HBase provides the HConnection class and the HConnectionManager class.
Both provide functionality similar to JDBC connection pooling
to share pre-existing opened connections.
Here we should be able to use the getTable() method to get a
reference to an HTable instance.

 */