2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.dbcapi.database;
26 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
27 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
28 import org.onap.dmaap.dbcapi.model.*;
29 import org.onap.dmaap.dbcapi.util.DmaapConfig;
30 import org.onap.dmaap.dbcapi.util.Singleton;
35 public class DatabaseClass extends BaseLoggingClass {
37 private static Singleton<Dmaap> dmaap;
38 private static Map<String, DcaeLocation> dcaeLocations;
39 private static Map<String, DR_Node> dr_nodes;
40 private static Map<String, DR_Pub> dr_pubs;
41 private static Map<String, DR_Sub> dr_subs;
42 private static Map<String, MR_Client> mr_clients;
43 private static Map<String, MR_Cluster> mr_clusters;
44 private static Map<String, Feed> feeds;
45 private static Map<String, Topic> topics;
46 private static Map<String, MirrorMaker> mirrors;
48 private static long lastTime = 0L;
50 private static class MirrorVectorHandler implements DBFieldHandler.SqlOp {
51 public Object get(ResultSet rs, int index) throws Exception {
52 String val = rs.getString(index);
56 Set<ReplicationVector> rv = new HashSet<ReplicationVector>();
57 for (String s: val.split(",")) {
58 String[] f = s.split(";");
62 rv.add(new ReplicationVector(DBFieldHandler.funesc(f[0]), DBFieldHandler.funesc(f[1]), DBFieldHandler.funesc(f[2])));
66 public void set(PreparedStatement ps, int index, Object val) throws Exception {
68 ps.setString(index, null);
72 StringBuffer sb = new StringBuffer();
75 ReplicationVector rv = (ReplicationVector)o;
76 sb.append(sep).append(DBFieldHandler.fesc(rv.getFqtn())).append(';').append(DBFieldHandler.fesc(rv.getSourceCluster())).append(';').append(DBFieldHandler.fesc(rv.getTargetCluster()));
79 ps.setString(index, sb.toString());
83 // modified version of MirrorVectorHandler for Topics
84 private static class MirrorTopicsHandler implements DBFieldHandler.SqlOp {
85 public Object get(ResultSet rs, int index) throws Exception {
86 String val = rs.getString(index);
90 List<String> rv = new ArrayList<String>();
91 for (String s: val.split(",")) {
92 //String[] f = s.split(";");
96 rv.add(new String(s));
100 public void set(PreparedStatement ps, int index, Object val) throws Exception {
102 ps.setString(index, null);
105 @SuppressWarnings("unchecked")
106 List<String> xv = (List<String>)val;
107 StringBuffer sb = new StringBuffer();
110 String rv = (String)o;
111 sb.append(sep).append(DBFieldHandler.fesc(rv));
114 ps.setString(index, sb.toString());
117 private static class TopicReplicationTypeHandler implements DBFieldHandler.SqlOp {
118 public Object get(ResultSet rs, int index) throws Exception {
119 int val = rs.getInt(index);
121 return (ReplicationType.valueOf(val));
123 public void set(PreparedStatement ps, int index, Object val) throws Exception {
128 @SuppressWarnings("unchecked")
129 ReplicationType rep = (ReplicationType) val;
130 ps.setInt(index, rep.getValue());
133 public static Singleton<Dmaap> getDmaap() {
139 public static Map<String, DcaeLocation> getDcaeLocations() {
140 return dcaeLocations;
143 public static Map<String, DR_Node> getDr_nodes() {
147 public static Map<String, DR_Sub> getDr_subs() {
150 public static Map<String, DR_Pub> getDr_pubs() {
154 public static Map<String, MR_Client> getMr_clients() {
159 public static Map<String, MR_Cluster> getMr_clusters() {
163 public static Map<String, Feed> getFeeds() {
166 public static Map<String, Topic> getTopics() {
169 public static Map<String, MirrorMaker> getMirrorMakers() {
175 appLogger.info( "begin static initialization");
176 appLogger.info( "initializing dmaap" );
177 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
178 if ("true".equalsIgnoreCase(p.getProperty("UsePGSQL", "false"))) {
179 appLogger.info("Data from database");
181 LoadSchema.upgrade();
182 } catch (Exception e) {
183 appLogger.warn("Problem updating DB schema", e);
186 dmaap = new DBSingleton<Dmaap>(Dmaap.class, "dmaap");
187 dcaeLocations = new DBMap<DcaeLocation>(DcaeLocation.class, "dcae_location", "dcae_location_name");
188 dr_nodes = new DBMap<DR_Node>(DR_Node.class, "dr_node", "fqdn");
189 dr_pubs = new DBMap<DR_Pub>(DR_Pub.class, "dr_pub", "pub_id");
190 dr_subs = new DBMap<DR_Sub>(DR_Sub.class, "dr_sub", "sub_id");
191 mr_clients = new DBMap<MR_Client>(MR_Client.class, "mr_client", "mr_client_id");
192 mr_clusters = new DBMap<MR_Cluster>(MR_Cluster.class, "mr_cluster", "dcae_location_name");
193 feeds = new DBMap<Feed>(Feed.class, "feed", "feed_id");
194 TableHandler.setSpecialCase("topic", "replication_case", new TopicReplicationTypeHandler());
195 topics = new DBMap<Topic>(Topic.class, "topic", "fqtn");
196 //TableHandler.setSpecialCase("mirror_maker", "vectors", new MirrorVectorHandler());
197 TableHandler.setSpecialCase("mirror_maker", "topics", new MirrorTopicsHandler());
198 mirrors = new DBMap<MirrorMaker>(MirrorMaker.class, "mirror_maker", "mm_name");
199 } catch (Exception e) {
200 errorLogger.error("Error initializing database access " + e, e);
204 appLogger.info("Data from memory");
205 dmaap = new Singleton<Dmaap>() {
207 public void remove() {
210 public void init(Dmaap val) {
218 public void update(Dmaap nd) {
219 dmaap.setVersion(nd.getVersion());
220 dmaap.setTopicNsRoot(nd.getTopicNsRoot());
221 dmaap.setDmaapName(nd.getDmaapName());
222 dmaap.setDrProvUrl(nd.getDrProvUrl());
223 dmaap.setBridgeAdminTopic(nd.getBridgeAdminTopic());
224 dmaap.setLoggingUrl(nd.getLoggingUrl());
225 dmaap.setNodeKey(nd.getNodeKey());
226 dmaap.setAccessKeyOwner(nd.getAccessKeyOwner());
229 dcaeLocations = new HashMap<String, DcaeLocation>();
230 dr_nodes = new HashMap<String, DR_Node>();
231 dr_pubs = new HashMap<String, DR_Pub>();
232 dr_subs = new HashMap<String, DR_Sub>();
233 mr_clients = new HashMap<String, MR_Client>();
234 mr_clusters = new HashMap<String, MR_Cluster>();
235 feeds = new HashMap<String, Feed>();
236 topics = new HashMap<String, Topic>();
237 mirrors = new HashMap<String, MirrorMaker>();
239 dmaap.init(new Dmaap("0", "", "", "", "", "", "", ""));
240 // check for, and set up initial data, if it isn't already there
241 Dmaap dmx = dmaap.get();
242 if ("0".equals(dmx.getVersion())) {
244 dmx = new Dmaap("0", "", "", "", "", "", "", "");
245 dmx.setDmaapName(p.getProperty("DmaapName"));
246 dmx.setDrProvUrl("https://" + p.getProperty("DR.provhost", "notSet"));
247 dmx.setTopicNsRoot(p.getProperty("topicNsRoot"));
248 dmx.setBridgeAdminTopic("DCAE_MM_AGENT");
252 } catch (Exception e) {
253 errorLogger.error(DmaapbcLogMessageEnum.DB_UPDATE_ERROR, e.getMessage());
257 public synchronized static String getNextClientId() {
259 long id = System.currentTimeMillis();
260 if ( id <= lastTime ) {
264 return Long.toString(id);