2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.aai.util;
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.client.*;
27 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
28 import org.apache.hadoop.hbase.filter.Filter;
29 import org.apache.hadoop.hbase.filter.FilterList;
30 import org.apache.hadoop.hbase.filter.RegexStringComparator;
31 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.openecomp.aai.domain.notificationEvent.NotificationEvent;
34 import org.openecomp.aai.domain.translog.TransactionLogEntries;
35 import org.openecomp.aai.domain.translog.TransactionLogEntry;
36 import org.openecomp.aai.exceptions.AAIException;
38 import java.io.IOException;
39 import java.text.DateFormat;
40 import java.text.SimpleDateFormat;
41 import java.util.ArrayList;
42 import java.util.Date;
43 import java.util.List;
45 public class AAITxnLog {
47 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AAITxnLog.class);
49 private final org.apache.hadoop.conf.Configuration config;
50 private HTable table = null;
51 private String tm = null;
54 * Instantiates a new AAI txn log.
56 * @param transId the trans id
57 * @param fromAppId the from app id
59 public AAITxnLog(String transId, String fromAppId) {
60 /* When you create a HBaseConfiguration, it reads in whatever you've set
61 into your hbase-site.xml and in hbase-default.xml, as long as these can
62 be found on the CLASSPATH */
64 config = HBaseConfiguration.create();
67 config.set(AAIConstants.ZOOKEEPER_ZNODE_PARENT, AAIConfig.get(AAIConstants.HBASE_ZOOKEEPER_ZNODE_PARENT));
68 config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM));
69 config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT));
71 Date date = new Date();
72 DateFormat formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
73 tm = formatter.format(date);
74 } catch (AAIException e) {
75 LOGGER.warn("Missing configuration in AAIConfig: " + e.getMessage());
82 * @param status the status
83 * @param srcId the src id
84 * @param rsrcId the rsrc id
85 * @param rsrcType the rsrc type
86 * @param rqstBuf the rqst buf
87 * @param respBuf the resp buf
98 return put ("",status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
106 * @param status the status
107 * @param srcId the src id
108 * @param rsrcId the rsrc id
109 * @param rsrcType the rsrc type
110 * @param rqstBuf the rqst buf
111 * @param respBuf the resp buf
123 return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
130 * @param status the status
131 * @param rqstTm the rqst tm
132 * @param respTm the resp tm
133 * @param srcId the src id
134 * @param rsrcId the rsrc id
135 * @param rsrcType the rsrc type
136 * @param rqstBuf the rqst buf
137 * @param respBuf the resp buf
151 return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
158 * @param status the status
159 * @param rqstTm the rqst tm
160 * @param respTm the resp tm
161 * @param srcId the src id
162 * @param rsrcId the rsrc id
163 * @param rsrcType the rsrc type
164 * @param rqstBuf the rqst buf
165 * @param respBuf the resp buf
166 * @param hasNotificationEvent the has notification event
180 boolean hasNotificationEvent,
184 if (tid == null || "".equals(tid)) {
185 Date date = new Date();
186 DateFormat formatter = null;
188 formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
189 } catch (Exception e) {
190 formatter = new SimpleDateFormat("YYYYMMdd-HH:mm:ss:SSS");
192 tm = formatter.format(date);
197 //need to add a prefix for better hbase logging server balancing
198 htid = HbaseSaltPrefixer.getInstance().prependSalt(htid);
200 if (rqstTm == null || "".equals(rqstTm)) {
204 if (respTm == null || "".equals(respTm)) {
209 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
211 Put p = new Put(Bytes.toBytes(htid));
213 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("tid"),Bytes.toBytes(tid));
214 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("status"),Bytes.toBytes(status));
215 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"),Bytes.toBytes(rqstTm));
216 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"),Bytes.toBytes(respTm));
217 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"),Bytes.toBytes(srcId));
219 p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"),Bytes.toBytes(rsrcId));
220 p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"),Bytes.toBytes(rsrcType));
222 p.add(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"),Bytes.toBytes(rqstBuf));
223 p.add(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"),Bytes.toBytes(respBuf));
225 if (hasNotificationEvent == true) {
226 String eventType = ne.getEventHeader().getEventType();
227 String eventStatus = ne.getEventHeader().getStatus();
229 if (eventStatus == null) {
230 eventStatus = AAIConfig.get("aai.notificationEvent.default.status", "UNPROCESSED");
232 if (eventType == null) {
233 eventType = AAIConfig.get("aai.notificationEvent.default.eventType", "AAI-EVENT");
236 if (ne.getEntity() != null) {
237 PojoUtils pu = new PojoUtils();
238 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"),Bytes.toBytes(pu.getJsonFromObject(ne)));
240 if (ne.getEventHeader().getId() != null) {
241 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"),Bytes.toBytes(ne.getEventHeader().getId()));
244 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"),Bytes.toBytes(eventStatus));
245 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"),Bytes.toBytes(eventType));
247 if (ne.getEventHeader().getEntityLink() != null) {
248 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"),Bytes.toBytes(ne.getEventHeader().getEntityLink()));
250 if (ne.getEventHeader().getAction() != null) {
251 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"),Bytes.toBytes(ne.getEventHeader().getAction()) );
254 /* Once you've adorned your Put instance with all the updates you want to
255 make, to commit it do the following */
257 table.flushCommits();
260 } catch (Exception e) {
261 LOGGER.warn("AAITxnLog: put: Exception", e);
269 * @param htid the htid
270 * @return the transaction log entry
271 * @throws AAIException the AAI exception
273 public TransactionLogEntry get(String htid) throws AAIException {
275 LOGGER.debug("In get: searching hbase config file...");
277 TransactionLogEntry txObj = new TransactionLogEntry();
280 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
282 Get g = new Get(Bytes.toBytes(htid));
284 Result r = table.get(g);
285 byte [] tid = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
286 byte [] status = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
287 byte [] rqstDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
288 byte [] respDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
289 byte [] sourceId = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
291 byte [] resourceId = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
292 byte [] resourceType = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
294 byte [] rqstBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
295 byte [] respBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
297 byte [] notificationPayload = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
298 byte [] notificationStatus = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
299 byte [] notificationId = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
300 byte [] notificationTopic = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
301 byte [] notificationEntityLink = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
302 byte [] notificationAction = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
306 tidStr = Bytes.toString(tid);
307 txObj.setTransactionLogEntryId(tidStr);
308 txObj.setStatus(Bytes.toString(status));
309 txObj.setRqstDate(Bytes.toString(rqstDate));
310 txObj.setRespDate(Bytes.toString(respDate));
311 txObj.setSourceId(Bytes.toString(sourceId));
312 txObj.setResourceId(Bytes.toString(resourceId));
313 txObj.setResourceType(Bytes.toString(resourceType));
314 txObj.setRqstBuf(Bytes.toString(rqstBuf));
315 txObj.setrespBuf(Bytes.toString(respBuf));
316 txObj.setNotificationPayload(Bytes.toString(notificationPayload));
317 txObj.setNotificationStatus(Bytes.toString(notificationStatus));
318 txObj.setNotificationId(Bytes.toString(notificationId));
319 txObj.setNotificationTopic(Bytes.toString(notificationTopic));
320 txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
321 txObj.setNotificationAction(Bytes.toString(notificationAction));
322 } catch (IOException e) {
323 LOGGER.error("IOException on hbase call", e);
324 throw new AAIException("AAI_4000");
334 * @param startMillis the start millis
335 * @param endMillis the end millis
336 * @param methodList the method list
337 * @param putFilter the put filter
338 * @param getFilter the get filter
339 * @param resourceFilter the resource filter
340 * @param fromAppIdFilter the from app id filter
341 * @return the transaction log entries
343 public TransactionLogEntries scanFiltered(long startMillis, long endMillis, List<String> methodList,
344 String putFilter, String getFilter, String resourceFilter, String fromAppIdFilter) {
346 LOGGER.debug("Starting scanFiltered()");
348 // we should have the config ready from the constructor
350 TransactionLogEntries txs = new TransactionLogEntries();
352 if (config == null) {
353 LOGGER.debug("in scan: can't create HBase configuration");
358 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
360 FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
361 FilterList methodflMaster = new FilterList(FilterList.Operator.MUST_PASS_ONE);
362 if (methodList != null) {
363 for (String method : methodList) {
364 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
365 Bytes.toBytes("resourceType"), CompareOp.EQUAL, Bytes.toBytes(method));
366 methodflMaster.addFilter(filt);
368 flMaster.addFilter(methodflMaster);
371 if (getFilter != null) {
372 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
373 Bytes.toBytes("respBuf"), CompareOp.EQUAL, new RegexStringComparator(getFilter));
374 flMaster.addFilter(filt);
376 if (putFilter != null) {
377 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
378 Bytes.toBytes("rqstBuf"), CompareOp.EQUAL, new RegexStringComparator(putFilter));
379 flMaster.addFilter(filt);
381 if (resourceFilter != null) {
382 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
383 Bytes.toBytes("resourceId"), CompareOp.EQUAL, new RegexStringComparator(resourceFilter));
384 flMaster.addFilter(filt);
386 if (fromAppIdFilter != null) {
387 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("transaction"),
388 Bytes.toBytes("sourceId"), CompareOp.EQUAL, new RegexStringComparator("^" + fromAppIdFilter));
389 flMaster.addFilter(filt);
392 if (flMaster.hasFilterRow()) {
393 s.setFilter(flMaster);
396 s.setTimeRange(startMillis, endMillis);
397 ResultScanner scanner = table.getScanner(s);
400 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
402 byte [] tid = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
403 byte [] status = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
404 byte [] rqstDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
405 byte [] respDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
406 byte [] sourceId = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
408 byte [] resourceId = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
409 byte [] resourceType = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
411 byte [] rqstBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
412 byte [] respBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
414 byte [] notificationPayload = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
415 byte [] notificationStatus = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
416 byte [] notificationId = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
417 byte [] notificationTopic = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
418 byte [] notificationEntityLink = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
419 byte [] notificationAction = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
420 TransactionLogEntry txObj = new TransactionLogEntry();
421 String tidStr = Bytes.toString(tid);
422 txObj.setTransactionLogEntryId(tidStr);
423 txObj.setStatus(Bytes.toString(status));
424 txObj.setRqstDate(Bytes.toString(rqstDate));
425 txObj.setRespDate(Bytes.toString(respDate));
426 txObj.setSourceId(Bytes.toString(sourceId));
427 txObj.setResourceId(Bytes.toString(resourceId));
428 txObj.setResourceType(Bytes.toString(resourceType));
429 txObj.setRqstBuf(Bytes.toString(rqstBuf));
430 txObj.setrespBuf(Bytes.toString(respBuf));
431 txObj.setNotificationPayload(Bytes.toString(notificationPayload));
432 txObj.setNotificationStatus(Bytes.toString(notificationStatus));
433 txObj.setNotificationId(Bytes.toString(notificationId));
434 txObj.setNotificationTopic(Bytes.toString(notificationTopic));
435 txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
436 txObj.setNotificationAction(Bytes.toString(notificationAction));
437 txs.getTransactionLogEntries().add(txObj);
440 // Make sure you close your scanners when you are done!
444 } catch (Exception e) {
445 LOGGER.warn("AAITxnLog: scan: Exception=" + e.toString());
454 * @param htid the htid
457 public List<String> scan(String htid) {
459 List<String> list = new ArrayList<String>();
460 LOGGER.debug("In scan: searching hbase config file...");
461 // we should have the config ready from the constructor
462 if (config == null) {
463 LOGGER.debug("in scan: can't create HBase configuration");
468 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
469 Scan s = new Scan(Bytes.toBytes(htid));
470 ResultScanner scanner = table.getScanner(s);
473 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
474 list.add(rr.toString());
475 LOGGER.debug("in scan: Found row : " + rr);
479 // Make sure you close your scanners when you are done!
483 } catch (Exception e) {
485 LOGGER.debug("AAITxnLog: scan: Exception=" + e.toString());
493 Need to implement HBase Connection Pooling in the future.
494 This is to reduce the 1 second delay during the first open of HConnection, and HTable instantiation.
495 Hbase provides the Hconnection class and the HConnectionManager class.
496 Both provifde the functionaltity similar to jdbc connection pooling
497 to share pre-existing opened connections.
498 Here we should be able to use the getTable() method to get a
499 reference to an HTable instance.