6e8c4011f923d1cb93328f28a82a4e45c5e584dd
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.UUID;
25
26 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
32  * regardless if it is UEB or DMaaP.
33  *
34  */
35 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
36
37     /**
38      * Loggers.
39      */
40     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
41     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
42
43     /**
44      * The partition key to publish to.
45      */
46     protected String partitionId;
47
48     /**
49      * Message bus publisher.
50      */
51     protected BusPublisher publisher;
52
53     /**
54      * Constructor for abstract sink.
55      * @param busTopicParams contains below listed attributes
56      *     servers servers
57      *     topic topic
58      *     apiKey api secret
59      *     apiSecret api secret
60      *     partitionId partition id
61      *     useHttps does connection use HTTPS?
62      *     allowSelfSignedCerts are self-signed certificates allow     *
63      * @throws IllegalArgumentException in invalid parameters are passed in
64      */
65     public InlineBusTopicSink(BusTopicParams busTopicParams) {
66
67         super(busTopicParams);
68
69         if (busTopicParams.isPartitionIdInvalid()) {
70             this.partitionId = UUID.randomUUID().toString();
71         } else {
72             this.partitionId = busTopicParams.getPartitionId();
73         }
74     }
75
76     /**
77      * Initialize the Bus publisher.
78      */
79     public abstract void init();
80
81     /**
82      * {@inheritDoc}
83      */
84     @Override
85     public boolean start() {
86         logger.info("{}: starting", this);
87
88         synchronized (this) {
89
90             if (this.alive) {
91                 return true;
92             }
93
94             if (locked) {
95                 throw new IllegalStateException(this + " is locked.");
96             }
97
98             this.alive = true;
99         }
100
101         this.init();
102         return true;
103     }
104
105     /**
106      * {@inheritDoc}
107      */
108     @Override
109     public boolean stop() {
110
111         BusPublisher publisherCopy;
112         synchronized (this) {
113             this.alive = false;
114             publisherCopy = this.publisher;
115             this.publisher = null;
116         }
117
118         if (publisherCopy != null) {
119             try {
120                 publisherCopy.close();
121             } catch (Exception e) {
122                 logger.warn("{}: cannot stop publisher because of {}", this, e.getMessage(), e);
123             }
124         } else {
125             logger.warn("{}: there is no publisher", this);
126             return false;
127         }
128
129         return true;
130     }
131
132     /**
133      * {@inheritDoc}
134      */
135     @Override
136     public boolean send(String message) {
137
138         if (message == null || message.isEmpty()) {
139             throw new IllegalArgumentException("Message to send is empty");
140         }
141
142         if (!this.alive) {
143             throw new IllegalStateException(this + " is stopped");
144         }
145
146         try {
147             synchronized (this) {
148                 this.recentEvents.add(message);
149             }
150
151             netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(),
152                     message);
153
154             publisher.send(this.partitionId, message);
155             broadcast(message);
156         } catch (Exception e) {
157             logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
158             return false;
159         }
160
161         return true;
162     }
163
164
165     /**
166      * {@inheritDoc}
167      */
168     @Override
169     public void setPartitionKey(String partitionKey) {
170         this.partitionId = partitionKey;
171     }
172
173     /**
174      * {@inheritDoc}
175      */
176     @Override
177     public String getPartitionKey() {
178         return this.partitionId;
179     }
180
181     /**
182      * {@inheritDoc}
183      */
184     @Override
185     public void shutdown() {
186         this.stop();
187     }
188
189     @Override
190     protected boolean anyNullOrEmpty(String... args) {
191         for (String arg : args) {
192             if (arg == null || arg.isEmpty()) {
193                 return true;
194             }
195         }
196
197         return false;
198     }
199
200     @Override
201     protected boolean allNullOrEmpty(String... args) {
202         for (String arg : args) {
203             if (!(arg == null || arg.isEmpty())) {
204                 return false;
205             }
206         }
207
208         return true;
209     }
210
211
212     @Override
213     public String toString() {
214         return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
215     }
216 }