2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
 
   7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
 
   8 * ================================================================================
 
   9  * Licensed under the Apache License, Version 2.0 (the "License");
 
  10  * you may not use this file except in compliance with the License.
 
  11  * You may obtain a copy of the License at
 
  13  *      http://www.apache.org/licenses/LICENSE-2.0
 
  15  * Unless required by applicable law or agreed to in writing, software
 
  16  * distributed under the License is distributed on an "AS IS" BASIS,
 
  17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  18  * See the License for the specific language governing permissions and
 
  19  * limitations under the License.
 
  20  * ============LICENSE_END=========================================================
 
  23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
  25 import java.util.UUID;
 
  28 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
 
  29 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 
  30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  31 import org.slf4j.Logger;
 
  32 import org.slf4j.LoggerFactory;
 
  35  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
 
  36  * regardless if it is UEB or DMaaP.
 
  39 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
 
  44     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
 
  47      * The partition key to publish to.
 
  51     protected String partitionKey;
 
  54      * Message bus publisher.
 
  56     protected BusPublisher publisher;
 
  59      * Constructor for abstract sink.
 
  60      * @param busTopicParams contains below listed attributes
 
  64      *     apiSecret api secret
 
  65      *     partitionId partition id
 
  66      *     useHttps does connection use HTTPS?
 
  67      *     allowSelfSignedCerts are self-signed certificates allow     *
 
  68      * @throws IllegalArgumentException if invalid parameters are passed in
 
  70     protected InlineBusTopicSink(BusTopicParams busTopicParams) {
 
  72         super(busTopicParams);
 
  74         if (busTopicParams.isPartitionIdInvalid()) {
 
  75             this.partitionKey = UUID.randomUUID().toString();
 
  77             this.partitionKey = busTopicParams.getPartitionId();
 
  82      * Initialize the Bus publisher.
 
  84     public abstract void init();
 
  87     public boolean start() {
 
  88         logger.info("{}: starting", this);
 
  93                     throw new IllegalStateException(this + " is locked.");
 
 105     public boolean stop() {
 
 107         BusPublisher publisherCopy;
 
 108         synchronized (this) {
 
 110             publisherCopy = this.publisher;
 
 111             this.publisher = null;
 
 114         if (publisherCopy != null) {
 
 116                 publisherCopy.close();
 
 117             } catch (Exception e) {
 
 118                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
 
 121             logger.warn("{}: there is no publisher", this);
 
 129     public boolean send(String message) {
 
 131         if (message == null || message.isEmpty()) {
 
 132             throw new IllegalArgumentException("Message to send is empty");
 
 136             throw new IllegalStateException(this + " is stopped");
 
 140             synchronized (this) {
 
 141                 this.recentEvents.add(message);
 
 144             NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
 
 146             publisher.send(this.partitionKey, message);
 
 148         } catch (Exception e) {
 
 149             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
 
 157     public void shutdown() {
 
 162     protected boolean anyNullOrEmpty(String... args) {
 
 163         for (String arg : args) {
 
 164             if (arg == null || arg.isEmpty()) {
 
 173     protected boolean allNullOrEmpty(String... args) {
 
 174         for (String arg : args) {
 
 175             if (!(arg == null || arg.isEmpty())) {
 
 184     public String toString() {
 
 185         return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher