Sonar major issues
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / memory / MemoryQueue.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.backends.memory;
23
24 import java.util.ArrayList;
25 import java.util.HashMap;
26
27 import com.att.dmf.mr.backends.Consumer;
28 import com.att.dmf.mr.backends.Publisher.message;
29
30 /**
31  * When broker type is memory, then this class is doing all the topic related
32  * operations
33  * 
34  * @author anowarul.islam
35  *
36  */
37 public class MemoryQueue {
38         // map from topic to list of msgs
39         private HashMap<String, LogBuffer> fQueue;
40         private HashMap<String, HashMap<String, Integer>> fOffsets;
41
42         /**
43          * constructor storing hashMap objects in Queue and Offsets object
44          */
45         public MemoryQueue() {
46                 fQueue = new HashMap<>();
47                 fOffsets = new HashMap<>();
48         }
49
50         /**
51          * method used to create topic
52          * 
53          * @param topic
54          */
55         public synchronized void createTopic(String topic) {
56                 LogBuffer q = fQueue.get(topic);
57                 if (q == null) {
58                         q = new LogBuffer(1024 * 1024);
59                         fQueue.put(topic, q);
60                 }
61         }
62
63         /**
64          * method used to remove topic
65          * 
66          * @param topic
67          */
68         public synchronized void removeTopic(String topic) {
69                 LogBuffer q = fQueue.get(topic);
70                 if (q != null) {
71                         fQueue.remove(topic);
72                 }
73         }
74
75         /**
76          * method to write message on topic
77          * 
78          * @param topic
79          * @param m
80          */
81         public synchronized void put(String topic, message m) {
82                 LogBuffer q = fQueue.get(topic);
83                 if (q == null) {
84                         createTopic(topic);
85                         q = fQueue.get(topic);
86                 }
87                 q.push(m.getMessage());
88         }
89
90         /**
91          * method to read consumer messages
92          * 
93          * @param topic
94          * @param consumerName
95          * @return
96          */
97         public synchronized Consumer.Message get(String topic, String consumerName) {
98                 final LogBuffer q = fQueue.get(topic);
99                 if (q == null) {
100                         return null;
101                 }
102
103                 HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
104                 if (offsetMap == null) {
105                         offsetMap = new HashMap<>();
106                         fOffsets.put(consumerName, offsetMap);
107                 }
108                 Integer offset = offsetMap.get(topic);
109                 if (offset == null) {
110                         offset = 0;
111                 }
112
113                 final msgInfo result = q.read(offset);
114                 if (result != null && result.msg != null) {
115                         offsetMap.put(topic, result.offset + 1);
116                 }
117                 return result;
118         }
119
120         /**
121          * static inner class used to details about consumed messages
122          * 
123          * @author anowarul.islam
124          *
125          */
126         private static class msgInfo implements Consumer.Message {
127                 /**
128                  * published message which is consumed
129                  */
130                 public String msg;
131                 /**
132                  * offset associated with message
133                  */
134                 public int offset;
135
136                 /**
137                  * get offset of messages
138                  */
139                 @Override
140                 public long getOffset() {
141                         return offset;
142                 }
143
144                 /**
145                  * get consumed message
146                  */
147                 @Override
148                 public String getMessage() {
149                         return msg;
150                 }
151         }
152
153  /**
154  * 
155  * @author sneha.d.desai
156  *
157  * private LogBuffer class has synchronized push and read method
158  */
159         private class LogBuffer {
160                 private int fBaseOffset;
161                 private final int fMaxSize;
162                 private final ArrayList<String> fList;
163
164                 /**
165                  * constructor initializing the offset, maxsize and list
166                  * 
167                  * @param maxSize
168                  */
169                 public LogBuffer(int maxSize) {
170                         fBaseOffset = 0;
171                         fMaxSize = maxSize;
172                         fList = new ArrayList<>();
173                 }
174
175                 /**
176                  * pushing message
177                  * 
178                  * @param msg
179                  */
180                 public synchronized void push(String msg) {
181                         fList.add(msg);
182                         while (fList.size() > fMaxSize) {
183                                 fList.remove(0);
184                                 fBaseOffset++;
185                         }
186                 }
187
188                 /**
189                  * reading messages
190                  * 
191                  * @param offset
192                  * @return
193                  */
194                 public synchronized msgInfo read(int offset) {
195                         final int actual = Math.max(0, offset - fBaseOffset);
196
197                         final msgInfo mi = new msgInfo();
198                         mi.msg = (actual >= fList.size()) ? null : fList.get(actual);
199                         if (mi.msg == null)
200                                 return null;
201
202                         mi.offset = actual + fBaseOffset;
203                         return mi;
204                 }
205
206         }
207 }