2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.aai.util;
23 import java.io.IOException;
24 import java.text.DateFormat;
25 import java.text.SimpleDateFormat;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.List;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.ResultScanner;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
38 import org.apache.hadoop.hbase.filter.Filter;
39 import org.apache.hadoop.hbase.filter.FilterList;
40 import org.apache.hadoop.hbase.filter.RegexStringComparator;
41 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
42 import org.apache.hadoop.hbase.util.Bytes;
44 import org.openecomp.aai.domain.notificationEvent.NotificationEvent;
45 import org.openecomp.aai.domain.translog.TransactionLogEntries;
46 import org.openecomp.aai.domain.translog.TransactionLogEntry;
47 import org.openecomp.aai.exceptions.AAIException;
48 import com.att.eelf.configuration.EELFLogger;
49 import com.att.eelf.configuration.EELFManager;
51 public class AAITxnLog {
53 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AAITxnLog.class);
55 private final org.apache.hadoop.conf.Configuration config;
56 private HTable table = null;
57 private String tm = null;
60 * Instantiates a new AAI txn log.
62 * @param transId the trans id
63 * @param fromAppId the from app id
65 public AAITxnLog(String transId, String fromAppId) {
66 /* When you create a HBaseConfiguration, it reads in whatever you've set
67 into your hbase-site.xml and in hbase-default.xml, as long as these can
68 be found on the CLASSPATH */
70 config = HBaseConfiguration.create();
73 config.set(AAIConstants.ZOOKEEPER_ZNODE_PARENT, AAIConfig.get(AAIConstants.HBASE_ZOOKEEPER_ZNODE_PARENT));
74 config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM));
75 config.set(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT, AAIConfig.get(AAIConstants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENTPORT));
77 Date date = new Date();
78 DateFormat formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
79 tm = formatter.format(date);
80 } catch (AAIException e) {
81 LOGGER.warn("Missing configuration in AAIConfig: " + e.getMessage());
88 * @param status the status
89 * @param srcId the src id
90 * @param rsrcId the rsrc id
91 * @param rsrcType the rsrc type
92 * @param rqstBuf the rqst buf
93 * @param respBuf the resp buf
104 return put ("",status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
112 * @param status the status
113 * @param srcId the src id
114 * @param rsrcId the rsrc id
115 * @param rsrcType the rsrc type
116 * @param rqstBuf the rqst buf
117 * @param respBuf the resp buf
129 return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
136 * @param status the status
137 * @param rqstTm the rqst tm
138 * @param respTm the resp tm
139 * @param srcId the src id
140 * @param rsrcId the rsrc id
141 * @param rsrcType the rsrc type
142 * @param rqstBuf the rqst buf
143 * @param respBuf the resp buf
157 return put (tid,status,"","",srcId,rsrcId,rsrcType,rqstBuf,respBuf,false,new NotificationEvent());
164 * @param status the status
165 * @param rqstTm the rqst tm
166 * @param respTm the resp tm
167 * @param srcId the src id
168 * @param rsrcId the rsrc id
169 * @param rsrcType the rsrc type
170 * @param rqstBuf the rqst buf
171 * @param respBuf the resp buf
172 * @param hasNotificationEvent the has notification event
186 boolean hasNotificationEvent,
190 if (tid == null || "".equals(tid)) {
191 Date date = new Date();
192 DateFormat formatter = null;
194 formatter = new SimpleDateFormat(AAIConfig.get(AAIConstants.HBASE_TABLE_TIMESTAMP_FORMAT));
195 } catch (Exception e) {
196 formatter = new SimpleDateFormat("YYYYMMdd-HH:mm:ss:SSS");
198 tm = formatter.format(date);
203 //need to add a prefix for better hbase logging server balancing
204 htid = HbaseSaltPrefixer.getInstance().prependSalt(htid);
206 if (rqstTm == null || "".equals(rqstTm)) {
210 if (respTm == null || "".equals(respTm)) {
215 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
217 Put p = new Put(Bytes.toBytes(htid));
219 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("tid"),Bytes.toBytes(tid));
220 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("status"),Bytes.toBytes(status));
221 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"),Bytes.toBytes(rqstTm));
222 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"),Bytes.toBytes(respTm));
223 p.add(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"),Bytes.toBytes(srcId));
225 p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"),Bytes.toBytes(rsrcId));
226 p.add(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"),Bytes.toBytes(rsrcType));
228 p.add(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"),Bytes.toBytes(rqstBuf));
229 p.add(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"),Bytes.toBytes(respBuf));
231 if (hasNotificationEvent == true) {
232 String eventType = ne.getEventHeader().getEventType();
233 String eventStatus = ne.getEventHeader().getStatus();
235 if (eventStatus == null) {
236 eventStatus = AAIConfig.get("aai.notificationEvent.default.status", "UNPROCESSED");
238 if (eventType == null) {
239 eventType = AAIConfig.get("aai.notificationEvent.default.eventType", "AAI-EVENT");
242 if (ne.getEntity() != null) {
243 PojoUtils pu = new PojoUtils();
244 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"),Bytes.toBytes(pu.getJsonFromObject(ne)));
246 if (ne.getEventHeader().getId() != null) {
247 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"),Bytes.toBytes(ne.getEventHeader().getId()));
250 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"),Bytes.toBytes(eventStatus));
251 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"),Bytes.toBytes(eventType));
253 if (ne.getEventHeader().getEntityLink() != null) {
254 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"),Bytes.toBytes(ne.getEventHeader().getEntityLink()));
256 if (ne.getEventHeader().getAction() != null) {
257 p.add(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"),Bytes.toBytes(ne.getEventHeader().getAction()) );
260 /* Once you've adorned your Put instance with all the updates you want to
261 make, to commit it do the following */
263 table.flushCommits();
266 } catch (Exception e) {
267 LOGGER.warn("AAITxnLog: put: Exception", e);
275 * @param htid the htid
276 * @return the transaction log entry
277 * @throws AAIException the AAI exception
279 public TransactionLogEntry get(String htid) throws AAIException {
281 LOGGER.debug("In get: searching hbase config file...");
283 TransactionLogEntry txObj = new TransactionLogEntry();
286 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
288 Get g = new Get(Bytes.toBytes(htid));
290 Result r = table.get(g);
291 byte [] tid = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
292 byte [] status = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
293 byte [] rqstDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
294 byte [] respDate = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
295 byte [] sourceId = r.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
297 byte [] resourceId = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
298 byte [] resourceType = r.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
300 byte [] rqstBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
301 byte [] respBuf = r.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
303 byte [] notificationPayload = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
304 byte [] notificationStatus = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
305 byte [] notificationId = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
306 byte [] notificationTopic = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
307 byte [] notificationEntityLink = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
308 byte [] notificationAction = r.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
312 tidStr = Bytes.toString(tid);
313 txObj.setTransactionLogEntryId(tidStr);
314 txObj.setStatus(Bytes.toString(status));
315 txObj.setRqstDate(Bytes.toString(rqstDate));
316 txObj.setRespDate(Bytes.toString(respDate));
317 txObj.setSourceId(Bytes.toString(sourceId));
318 txObj.setResourceId(Bytes.toString(resourceId));
319 txObj.setResourceType(Bytes.toString(resourceType));
320 txObj.setRqstBuf(Bytes.toString(rqstBuf));
321 txObj.setrespBuf(Bytes.toString(respBuf));
322 txObj.setNotificationPayload(Bytes.toString(notificationPayload));
323 txObj.setNotificationStatus(Bytes.toString(notificationStatus));
324 txObj.setNotificationId(Bytes.toString(notificationId));
325 txObj.setNotificationTopic(Bytes.toString(notificationTopic));
326 txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
327 txObj.setNotificationAction(Bytes.toString(notificationAction));
328 } catch (IOException e) {
329 LOGGER.error("IOException on hbase call", e);
330 throw new AAIException("AAI_4000");
340 * @param startMillis the start millis
341 * @param endMillis the end millis
342 * @param methodList the method list
343 * @param putFilter the put filter
344 * @param getFilter the get filter
345 * @param resourceFilter the resource filter
346 * @param fromAppIdFilter the from app id filter
347 * @return the transaction log entries
349 public TransactionLogEntries scanFiltered(long startMillis, long endMillis, List<String> methodList,
350 String putFilter, String getFilter, String resourceFilter, String fromAppIdFilter) {
352 LOGGER.debug("Starting scanFiltered()");
354 // we should have the config ready from the constructor
356 TransactionLogEntries txs = new TransactionLogEntries();
358 if (config == null) {
359 LOGGER.debug("in scan: can't create HBase configuration");
364 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
366 FilterList flMaster = new FilterList(FilterList.Operator.MUST_PASS_ALL);
367 FilterList methodflMaster = new FilterList(FilterList.Operator.MUST_PASS_ONE);
368 if (methodList != null) {
369 for (String method : methodList) {
370 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
371 Bytes.toBytes("resourceType"), CompareOp.EQUAL, Bytes.toBytes(method));
372 methodflMaster.addFilter(filt);
374 flMaster.addFilter(methodflMaster);
377 if (getFilter != null) {
378 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
379 Bytes.toBytes("respBuf"), CompareOp.EQUAL, new RegexStringComparator(getFilter));
380 flMaster.addFilter(filt);
382 if (putFilter != null) {
383 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("payload"),
384 Bytes.toBytes("rqstBuf"), CompareOp.EQUAL, new RegexStringComparator(putFilter));
385 flMaster.addFilter(filt);
387 if (resourceFilter != null) {
388 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("resource"),
389 Bytes.toBytes("resourceId"), CompareOp.EQUAL, new RegexStringComparator(resourceFilter));
390 flMaster.addFilter(filt);
392 if (fromAppIdFilter != null) {
393 Filter filt = new SingleColumnValueFilter(Bytes.toBytes("transaction"),
394 Bytes.toBytes("sourceId"), CompareOp.EQUAL, new RegexStringComparator("^" + fromAppIdFilter));
395 flMaster.addFilter(filt);
398 if (flMaster.hasFilterRow()) {
399 s.setFilter(flMaster);
402 s.setTimeRange(startMillis, endMillis);
403 ResultScanner scanner = table.getScanner(s);
406 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
408 byte [] tid = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("tid"));
409 byte [] status = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("status"));
410 byte [] rqstDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("rqstDate"));
411 byte [] respDate = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("respDate"));
412 byte [] sourceId = rr.getValue(Bytes.toBytes("transaction"),Bytes.toBytes("sourceId"));
414 byte [] resourceId = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceId"));
415 byte [] resourceType = rr.getValue(Bytes.toBytes("resource"),Bytes.toBytes("resourceType"));
417 byte [] rqstBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("rqstBuf"));
418 byte [] respBuf = rr.getValue(Bytes.toBytes("payload"),Bytes.toBytes("respBuf"));
420 byte [] notificationPayload = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationPayload"));
421 byte [] notificationStatus = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationStatus"));
422 byte [] notificationId = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationId"));
423 byte [] notificationTopic = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationTopic"));
424 byte [] notificationEntityLink = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationEntityLink"));
425 byte [] notificationAction = rr.getValue(Bytes.toBytes("notification"),Bytes.toBytes("notificationAction"));
426 TransactionLogEntry txObj = new TransactionLogEntry();
427 String tidStr = Bytes.toString(tid);
428 txObj.setTransactionLogEntryId(tidStr);
429 txObj.setStatus(Bytes.toString(status));
430 txObj.setRqstDate(Bytes.toString(rqstDate));
431 txObj.setRespDate(Bytes.toString(respDate));
432 txObj.setSourceId(Bytes.toString(sourceId));
433 txObj.setResourceId(Bytes.toString(resourceId));
434 txObj.setResourceType(Bytes.toString(resourceType));
435 txObj.setRqstBuf(Bytes.toString(rqstBuf));
436 txObj.setrespBuf(Bytes.toString(respBuf));
437 txObj.setNotificationPayload(Bytes.toString(notificationPayload));
438 txObj.setNotificationStatus(Bytes.toString(notificationStatus));
439 txObj.setNotificationId(Bytes.toString(notificationId));
440 txObj.setNotificationTopic(Bytes.toString(notificationTopic));
441 txObj.setNotificationEntityLink(Bytes.toString(notificationEntityLink));
442 txObj.setNotificationAction(Bytes.toString(notificationAction));
443 txs.getTransactionLogEntries().add(txObj);
446 // Make sure you close your scanners when you are done!
450 } catch (Exception e) {
451 LOGGER.warn("AAITxnLog: scan: Exception=" + e.toString());
460 * @param htid the htid
463 public List<String> scan(String htid) {
465 List<String> list = new ArrayList<String>();
466 LOGGER.debug("In scan: searching hbase config file...");
467 // we should have the config ready from the constructor
468 if (config == null) {
469 LOGGER.debug("in scan: can't create HBase configuration");
474 table = new HTable(config, AAIConfig.get(AAIConstants.HBASE_TABLE_NAME));
475 Scan s = new Scan(Bytes.toBytes(htid));
476 ResultScanner scanner = table.getScanner(s);
479 for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
480 list.add(rr.toString());
481 LOGGER.debug("in scan: Found row : " + rr);
485 // Make sure you close your scanners when you are done!
489 } catch (Exception e) {
491 LOGGER.debug("AAITxnLog: scan: Exception=" + e.toString());
499 Need to implement HBase Connection Pooling in the future.
500 This is to reduce the 1 second delay during the first open of HConnection, and HTable instantiation.
501 Hbase provides the Hconnection class and the HConnectionManager class.
502 Both provifde the functionaltity similar to jdbc connection pooling
503 to share pre-existing opened connections.
504 Here we should be able to use the getTable() method to get a
505 reference to an HTable instance.