Limit number of topics per mmagent whitelist
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.dbcapi.service;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26
27
28
29
30
31
32
33
34
35 //import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
36 import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
37 import org.onap.dmaap.dbcapi.client.MrTopicConnection;
38 import org.onap.dmaap.dbcapi.database.DatabaseClass;
39 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
40 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
41 import org.onap.dmaap.dbcapi.model.ApiError;
42 import org.onap.dmaap.dbcapi.model.MR_Cluster;
43 import org.onap.dmaap.dbcapi.model.MirrorMaker;
44 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
45 import org.onap.dmaap.dbcapi.util.DmaapConfig;
46 import org.onap.dmaap.dbcapi.util.RandomInteger;
47
48 public class MirrorMakerService extends BaseLoggingClass {
49         
50         private Map<String, MirrorMaker> mirrors = DatabaseClass.getMirrorMakers();
51         private static MrTopicConnection prov;
52         private static AafDecrypt decryptor;
53         
54         private static String provUser;
55         private static String provUserPwd;
56         private static String defaultProducerPort;
57         private static String defaultConsumerPort;
58         private static String centralFqdn;
59         private int maxTopicsPerMM;
60         
61         public MirrorMakerService() {
62                 super();
63                 decryptor = new AafDecrypt();
64                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
65                 provUser = p.getProperty("MM.ProvUserMechId");
66                 provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
67                 defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
68                 defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
69                 centralFqdn = p.getProperty("MR.CentralCname", "notSet");
70                 maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
71         }
72
73         // will create a MM on MMagent if needed
74         // will update the MMagent whitelist with all topics for this MM
75         public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
76                 logger.info( "updateMirrorMaker");
77         
78                 prov = new MrTopicConnection( provUser, provUserPwd );
79         
80                 DmaapService dmaap = new DmaapService();
81                 MR_ClusterService clusters = new MR_ClusterService();
82         
83                 // in 1610, MM should only exist for edge-to-central
84                 //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
85                 // but only send 1 message so MM Agents can read it relying on kafka delivery
86                 for( MR_Cluster central: clusters.getCentralClusters() ) {
87                         prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
88                         ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
89                         if ( ! resp.is2xx() ) {
90         
91                                 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
92                                 mm.setStatus(DmaapObject_Status.INVALID);
93                         } else {
94                                 prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
95                                 resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
96                                 if ( ! resp.is2xx()) {
97                                         errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
98                                         mm.setStatus(DmaapObject_Status.INVALID);
99                                 } else {
100                                         mm.setStatus(DmaapObject_Status.VALID);
101                                 }
102                         }
103                         
104                         // we only want to send one message even if there are multiple central clusters
105                         break;
106                 
107                 } 
108                 
109
110
111                 mm.setLastMod();
112                 return mirrors.put( mm.getMmName(), mm);
113         }
114         public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
115                 String targetPart;
116
117                 // original mm names did not have any index, so leave off index 0 for
118                 // backwards compatibility
119                 if ( index == 0 ) {
120                         targetPart = part2;
121                 } else {
122                         targetPart = part2 + "_" + index;
123                 }
124                 logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
125                 return mirrors.get(MirrorMaker.genKey(part1, targetPart));
126         }
127         public MirrorMaker getMirrorMaker( String part1, String part2 ) {
128                 logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
129                 return mirrors.get(MirrorMaker.genKey(part1, part2));
130         }       
131         public MirrorMaker getMirrorMaker( String key ) {
132                 logger.info( "getMirrorMaker using " + key);
133                 return mirrors.get(key);
134         }
135         
136         
137         public void delMirrorMaker( MirrorMaker mm ) {
138                 logger.info( "delMirrorMaker");
139                 mirrors.remove(mm.getMmName());
140         }
141         
142         // TODO: this should probably return sequential values or get replaced by the MM client API
143         // but it should be sufficient for initial 1610 development
144         public static String genTransactionId() {
145                 RandomInteger ri = new RandomInteger(100000);
146             int randomInt = ri.next();
147             return Integer.toString(randomInt);
148         }
149         public List<String> getAllMirrorMakers() {
150                 List<String> ret = new ArrayList<String>();
151                 for( String key: mirrors.keySet()) {
152                         ret.add( key );
153                 }
154                 
155                 return ret;
156         }
157         
158         public MirrorMaker getNextMM( String source, String target ) {
159                 int i = 0;
160                 MirrorMaker mm = null;
161                 while( mm == null ) {
162                         
163                         mm = this.getMirrorMaker( source, target, i);
164                         if ( mm == null ) {
165                                 mm = new MirrorMaker(source, target, i);
166                         }
167                         if ( mm.getTopicCount() >= maxTopicsPerMM ) {
168                                 logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics.  Moving to next MM");
169                                 i++;
170                                 mm = null;
171                         }
172                 }
173          
174                 
175                 return mm;
176         }
177
178         public MirrorMaker splitMM( MirrorMaker orig ) {
179                 
180                 int index = 1;
181                 String source = orig.getSourceCluster();
182                 String target = orig.getTargetCluster();
183                 
184                 
185                 ArrayList<String> whitelist = orig.getTopics();
186                 while( whitelist.size() > maxTopicsPerMM ) {
187                         MirrorMaker mm = this.getNextMM( source, target );
188                         int last = whitelist.size() - 1;
189                         String topic = whitelist.get(last);
190                         whitelist.remove(last);
191                         mm.addTopic(topic);     
192                         this.updateMirrorMaker(mm);
193                 }
194                 
195                 orig.setTopics(whitelist);
196
197                 return orig;
198                 
199         }
200
201 }