eed502268474372dbdb7c0b88e22f9663be08caa
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.dbcapi.service;
22
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28
29 import javax.ws.rs.core.Response.Status;
30
31 import org.onap.dmaap.dbcapi.aaf.AafService;
32 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
33 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
34 import org.onap.dmaap.dbcapi.database.DatabaseClass;
35 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
36 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
37 import org.onap.dmaap.dbcapi.model.ApiError;
38 import org.onap.dmaap.dbcapi.model.Dmaap;
39 import org.onap.dmaap.dbcapi.model.MR_Client;
40 import org.onap.dmaap.dbcapi.model.MR_Cluster;
41 import org.onap.dmaap.dbcapi.model.MirrorMaker;
42 import org.onap.dmaap.dbcapi.model.ReplicationType;
43 import org.onap.dmaap.dbcapi.model.Topic;
44 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
45 import org.onap.dmaap.dbcapi.util.DmaapConfig;
46 import org.onap.dmaap.dbcapi.util.Fqdn;
47 import org.onap.dmaap.dbcapi.util.Graph;
48
49 public class TopicService extends BaseLoggingClass {
50
51         
52
53         // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
54         private static String defaultGlobalMrHost;
55         
56         private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
57         private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
58         
59         private static DmaapService dmaapSvc = new DmaapService();
60         private static Dmaap dmaap = new DmaapService().getDmaap();
61         private MR_ClientService clientService = new MR_ClientService();
62         private MirrorMakerService      bridge = new MirrorMakerService();
63
64         public TopicService(){
65                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
66                 defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
67         }
68         
69         public Map<String, Topic> getTopics() {                 
70                 return mr_topics;
71         }
72                 
73         public List<Topic> getAllTopics() {
74                 ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
75                 for( Topic topic: topics ) {
76                         topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
77                 }
78                 return topics;
79         }
80         
81                 
82         public Topic getTopic( String key, ApiError apiError ) {        
83                 logger.info( "getTopic: key=" + key);
84                 Topic t = mr_topics.get( key );
85                 if ( t == null ) {
86                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
87                         apiError.setFields( "fqtn");
88                         apiError.setMessage("topic with fqtn " + key + " not found");
89                         return null;
90                 }
91                 t.setClients( clientService.getAllMrClients( key ));
92                 apiError.setCode(Status.OK.getStatusCode());
93                 return t;
94         }
95
96         public Topic addTopic( Topic topic, ApiError err ) {
97                 logger.info( "Entry: addTopic");
98                 logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
99                 String nFqtn =  topic.genFqtn();
100                 logger.info( "FQTN=" + nFqtn );
101                 if ( getTopic( nFqtn, err ) != null ) {
102                         String t = "topic already exists: " + nFqtn;
103                         logger.info( t );
104                         err.setMessage( t );
105                         err.setFields( "fqtn");
106                         err.setCode(Status.CONFLICT.getStatusCode());
107                         return null;
108                 }
109                 err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
110
111                 topic.setFqtn( nFqtn );
112
113                 AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
114                 String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
115                 String instance = ":topic." + topic.getFqtn();
116
117                 String[] actions = { "pub", "sub", "view" };
118                 for ( String action : actions ){
119                         DmaapPerm perm = new DmaapPerm( t, instance, action );
120                         int rc = aaf.addPerm( perm );
121                         if ( rc != 201 && rc != 409 ) {
122                                 err.setCode(500);
123                                 err.setMessage("Unexpected response from AAF:" + rc );
124                                 err.setFields("t="+t + " instance="+ instance + " action="+ action);
125                                 return null;
126                         }
127                 }
128                 if ( topic.getReplicationCase().involvesGlobal() ) {
129                         if ( topic.getGlobalMrURL() == null ) {
130                                 topic.setGlobalMrURL(defaultGlobalMrHost);
131                         }
132                         if ( ! Fqdn.isValid( topic.getGlobalMrURL())) {
133                                 logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
134                                 topic.setStatus( DmaapObject_Status.INVALID);
135                                 err.setCode(500);
136                                 err.setMessage("Value is not a valid FQDN:" +  topic.getGlobalMrURL() );
137                                 err.setFields("globalMrURL");
138         
139                                 return null;
140                         }
141                 }
142
143
144                 if ( topic.getNumClients() > 0 ) {
145                         ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients());
146                 
147         
148                         ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>();
149                         for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
150                                 MR_Client c = it.next();
151
152                                 logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
153                                 MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
154                                 nc.setFqtn(topic.getFqtn());
155                                 logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
156                                 clients2.add( clientService.addMr_Client(nc, topic, err));
157                                 if ( ! err.is2xx()) {
158                                         return null;
159                                 }
160                         }
161
162                         topic.setClients(clients2);
163                 }
164
165                 Topic ntopic = checkForBridge( topic, err );
166                 if ( ntopic == null ) {
167                         topic.setStatus( DmaapObject_Status.INVALID);
168                         return null;
169                 }
170
171                 mr_topics.put( nFqtn, ntopic );
172
173                 err.setCode(Status.OK.getStatusCode());
174                 return ntopic;
175         }
176         
177                 
178         public Topic updateTopic( Topic topic, ApiError err ) {
179                 logger.info( "Entry: updateTopic");
180                 if ( topic.getFqtn().isEmpty()) {
181                         return null;
182                 }
183                 Topic ntopic = checkForBridge( topic, err );
184                 if ( ntopic == null ) {
185                         topic.setStatus( DmaapObject_Status.INVALID);
186                         return null;
187                 }
188                 mr_topics.put( ntopic.getFqtn(), ntopic );
189                 err.setCode(Status.OK.getStatusCode());
190                 return ntopic;
191         }
192                 
193         public Topic removeTopic( String pubId, ApiError apiError ) {
194                 Topic topic = mr_topics.get(pubId);
195                 if ( topic == null ) {
196                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
197                         apiError.setMessage("Topic " + pubId + " does not exist");
198                         apiError.setFields("fqtn");
199                         return null;
200                 }
201                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients( pubId ));
202                 for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
203                         MR_Client c = it.next();
204                         
205         
206                         clientService.removeMr_Client(c.getMrClientId(), false, apiError);
207                         if ( ! apiError.is2xx()) {
208                                 return null;
209                         }
210                 }
211                 apiError.setCode(Status.OK.getStatusCode());
212                 return mr_topics.remove(pubId);
213         }       
214         public static ApiError setBridgeClientPerms( MR_Cluster node ) {
215                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
216                 String mmProvRole = p.getProperty("MM.ProvRole");
217                 String mmAgentRole = p.getProperty("MM.AgentRole");
218                 String[] Roles = { mmProvRole, mmAgentRole };
219                 String[] actions = { "view", "pub", "sub" };
220                 Topic bridgeAdminTopic = new Topic();
221                 bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
222                 bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
223                 bridgeAdminTopic.setOwner( "DBC" );
224                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
225                 for( String role: Roles ) {
226                         MR_Client client = new MR_Client();
227                         client.setAction(actions);
228                         client.setClientRole(role);
229                         client.setDcaeLocationName( node.getDcaeLocationName());
230                         clients.add( client );
231                 }
232                 bridgeAdminTopic.setClients(clients);
233                 
234                 TopicService ts = new TopicService();
235                 ApiError err = new ApiError();
236                 ts.addTopic(bridgeAdminTopic, err);
237                 
238                 if ( err.is2xx() || err.getCode() == 409 ){
239                         err.setCode(200);
240                         return err;
241                 }
242                 
243                 errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR,  bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
244                 return err;
245         }       
246         
247         
248         public Topic checkForBridge( Topic topic, ApiError err ) {
249                 
250                 if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
251                         topic.setStatus( DmaapObject_Status.VALID);
252                         return topic;   
253                 }
254                 
255                 boolean anythingWrong = false;                          
256                 String centralFqdn = new String();
257                 Graph graph = new Graph( topic.getClients(), true );
258                 
259                 if ( graph.isHasCentral() ) {
260                         DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
261                         centralFqdn = p.getProperty("MR.CentralCname");
262                         logger.info( "CentralCname=" + centralFqdn );
263                 } else {
264                         logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
265                 }
266                 Collection<String> locations = graph.getKeys();
267                 for( String loc : locations ) {
268                         logger.info( "loc=" + loc );
269                         MR_Cluster cluster = clusters.get(loc);
270                         logger.info( "cluster=" + cluster );
271
272                         
273                                 
274                         String source = null;
275                         String target = null;
276                         /*
277                          * all replication rules have 1 bridge...
278                          */
279                         switch( topic.getReplicationCase() ) {
280                         case REPLICATION_EDGE_TO_CENTRAL:
281                         case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
282                                 if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
283                                         break;
284                                 }
285                                 source = cluster.getFqdn();
286                                 target = centralFqdn;
287                                 break;
288                         case REPLICATION_CENTRAL_TO_EDGE:
289                                 if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
290                                         continue;
291                                 }
292                                 source = centralFqdn;
293                                 target = cluster.getFqdn();
294                                 break;
295                         case REPLICATION_CENTRAL_TO_GLOBAL:
296                                 if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
297                                         continue;
298                                 }
299                                 source = centralFqdn;
300                                 target = topic.getGlobalMrURL();
301                                 break;
302                         case REPLICATION_GLOBAL_TO_CENTRAL:
303                         case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
304                                 if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
305                                         continue;
306                                 }
307                                 source = topic.getGlobalMrURL();
308                                 target = centralFqdn;
309                                 break;
310                         default:
311                                 logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
312                                 anythingWrong = true;
313                                 continue;
314                         }
315                         if ( source != null && target != null ) {
316                                 try { 
317                                         logger.info( "Create a MM from " + source + " to " + target );
318                                         MirrorMaker mm = bridge.getMirrorMaker( source, target);
319                                         if ( mm == null ) {
320                                                 mm = new MirrorMaker(source, target);
321                                         }
322                                         mm.addTopic(topic.getFqtn());
323                                         bridge.updateMirrorMaker(mm);
324                                 } catch ( Exception ex ) {
325                                         err.setCode(500);
326                                         err.setFields( "mirror_maker.topic");
327                                         err.setMessage("Unexpected condition: " + ex );
328                                         anythingWrong = true;
329                                         break;
330                                 }
331                         }
332                         
333                         
334                         /*
335                          * some replication rules have a 2nd bridge!
336                          */
337                         source = target = null;
338                         switch( topic.getReplicationCase() ) {
339                         case REPLICATION_EDGE_TO_CENTRAL:
340                         case REPLICATION_CENTRAL_TO_EDGE:
341                         case REPLICATION_CENTRAL_TO_GLOBAL:
342                         case REPLICATION_GLOBAL_TO_CENTRAL:
343                                 continue;
344                         case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for C2G portion only
345                                 if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
346                                         continue;
347                                 }
348                                 source = centralFqdn;
349                                 target = topic.getGlobalMrURL();
350                                 break;
351         
352                         case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
353                                 if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
354                                         continue;
355                                 }
356                                 source = centralFqdn;
357                                 target = cluster.getFqdn();
358                                 break;
359                         default:
360                                 logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
361                                 anythingWrong = true;
362                                 break;
363                         }
364                         if ( source != null && target != null ) {
365                                 try { 
366                                         logger.info( "Create a MM from " + source + " to " + target );
367                                         MirrorMaker mm = bridge.getMirrorMaker( source, target);
368                                         if ( mm == null ) {
369                                                 mm = new MirrorMaker(source, target);
370                                         }
371                                         mm.addTopic(topic.getFqtn());
372                                         bridge.updateMirrorMaker(mm);
373                                 } catch ( Exception ex ) {
374                                         err.setCode(500);
375                                         err.setFields( "mirror_maker.topic");
376                                         err.setMessage("Unexpected condition: " + ex );
377                                         anythingWrong = true;
378                                         break;
379                                 }       
380                         }
381                         
382                 }
383                 if ( anythingWrong ) {
384                         topic.setStatus( DmaapObject_Status.INVALID);
385                         return null;
386                 }
387         
388                 topic.setStatus( DmaapObject_Status.VALID);
389                 return topic;
390         }
391         
392         /*
393          * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
394          * This was determined automatically based on presence of edge publishers and central subscribers.
395          * The following method is a modification of that original logic, to preserve some backwards compatibility, 
396          * i.e. to be used when no ReplicationType is specified.
397          */
398         public ReplicationType reviewTopic( Topic topic ) {
399         
400                 
401                 if ( topic.getNumClients() > 1 ) {
402                         Graph graph = new Graph( topic.getClients(), false );
403                         
404                         String centralFqdn = new String();
405                         if ( graph.isHasCentral() ) {
406                                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
407                                 centralFqdn = p.getProperty("MR.CentralCname");
408                         }
409
410                         Collection<String> locations = graph.getKeys();
411                         for( String loc : locations ) {
412                                 logger.info( "loc=" + loc );
413                                 MR_Cluster cluster = clusters.get(loc);
414                                 if ( cluster == null ) {
415                                         logger.info( "No MR cluster for location " + loc );
416                                         continue;
417                                 }
418                                 if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
419                                         logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
420                                         return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
421                                         
422                                 }
423                                 
424                         }
425                 }
426         
427                 return ReplicationType.REPLICATION_NONE;
428         }
429
430 }