00bd9e3a94f13ee9118c33c25bda4114598d37f1
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7 * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.UUID;
25 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
26 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
27 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
33  * regardless if it is UEB or DMaaP.
34  *
35  */
36 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
37
38     /**
39      * Loggers.
40      */
41     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
42
43     /**
44      * The partition key to publish to.
45      */
46     protected String partitionId;
47
48     /**
49      * Message bus publisher.
50      */
51     protected BusPublisher publisher;
52
53     /**
54      * Constructor for abstract sink.
55      * @param busTopicParams contains below listed attributes
56      *     servers servers
57      *     topic topic
58      *     apiKey api secret
59      *     apiSecret api secret
60      *     partitionId partition id
61      *     useHttps does connection use HTTPS?
62      *     allowSelfSignedCerts are self-signed certificates allow     *
63      * @throws IllegalArgumentException in invalid parameters are passed in
64      */
65     public InlineBusTopicSink(BusTopicParams busTopicParams) {
66
67         super(busTopicParams);
68
69         if (busTopicParams.isPartitionIdInvalid()) {
70             this.partitionId = UUID.randomUUID().toString();
71         } else {
72             this.partitionId = busTopicParams.getPartitionId();
73         }
74     }
75
76     /**
77      * Initialize the Bus publisher.
78      */
79     public abstract void init();
80
81     @Override
82     public boolean start() {
83         logger.info("{}: starting", this);
84
85         synchronized (this) {
86             if (!this.alive) {
87                 if (locked) {
88                     throw new IllegalStateException(this + " is locked.");
89                 }
90
91                 this.init();
92                 this.alive = true;
93             }
94         }
95
96         return true;
97     }
98
99     @Override
100     public boolean stop() {
101
102         BusPublisher publisherCopy;
103         synchronized (this) {
104             this.alive = false;
105             publisherCopy = this.publisher;
106             this.publisher = null;
107         }
108
109         if (publisherCopy != null) {
110             try {
111                 publisherCopy.close();
112             } catch (Exception e) {
113                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
114             }
115         } else {
116             logger.warn("{}: there is no publisher", this);
117             return false;
118         }
119
120         return true;
121     }
122
123     @Override
124     public boolean send(String message) {
125
126         if (message == null || message.isEmpty()) {
127             throw new IllegalArgumentException("Message to send is empty");
128         }
129
130         if (!this.alive) {
131             throw new IllegalStateException(this + " is stopped");
132         }
133
134         try {
135             synchronized (this) {
136                 this.recentEvents.add(message);
137             }
138
139             NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
140
141             publisher.send(this.partitionId, message);
142             broadcast(message);
143         } catch (Exception e) {
144             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
145             return false;
146         }
147
148         return true;
149     }
150
151     @Override
152     public void setPartitionKey(String partitionKey) {
153         this.partitionId = partitionKey;
154     }
155
156     @Override
157     public String getPartitionKey() {
158         return this.partitionId;
159     }
160
161     @Override
162     public void shutdown() {
163         this.stop();
164     }
165
166     @Override
167     protected boolean anyNullOrEmpty(String... args) {
168         for (String arg : args) {
169             if (arg == null || arg.isEmpty()) {
170                 return true;
171             }
172         }
173
174         return false;
175     }
176
177     @Override
178     protected boolean allNullOrEmpty(String... args) {
179         for (String arg : args) {
180             if (!(arg == null || arg.isEmpty())) {
181                 return false;
182             }
183         }
184
185         return true;
186     }
187
188     @Override
189     public String toString() {
190         return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
191     }
192 }