1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.backends.memory;
24 import java.util.ArrayList;
25 import java.util.HashMap;
27 import com.att.dmf.mr.backends.Consumer;
28 import com.att.dmf.mr.backends.Publisher.message;
31 * When broker type is memory, then this class is doing all the topic related
34 * @author anowarul.islam
37 public class MemoryQueue {
38 // map from topic to list of msgs
39 private HashMap<String, LogBuffer> fQueue;
40 private HashMap<String, HashMap<String, Integer>> fOffsets;
43 * constructor storing hashMap objects in Queue and Offsets object
45 public MemoryQueue() {
46 fQueue = new HashMap<>();
47 fOffsets = new HashMap<>();
51 * method used to create topic
55 public synchronized void createTopic(String topic) {
56 LogBuffer q = fQueue.get(topic);
58 q = new LogBuffer(1024 * 1024);
64 * method used to remove topic
68 public synchronized void removeTopic(String topic) {
69 LogBuffer q = fQueue.get(topic);
76 * method to write message on topic
81 public synchronized void put(String topic, message m) {
82 LogBuffer q = fQueue.get(topic);
85 q = fQueue.get(topic);
87 q.push(m.getMessage());
91 * method to read consumer messages
97 public synchronized Consumer.Message get(String topic, String consumerName) {
98 final LogBuffer q = fQueue.get(topic);
103 HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
104 if (offsetMap == null) {
105 offsetMap = new HashMap<>();
106 fOffsets.put(consumerName, offsetMap);
108 Integer offset = offsetMap.get(topic);
109 if (offset == null) {
113 final msgInfo result = q.read(offset);
114 if (result != null && result.msg != null) {
115 offsetMap.put(topic, result.offset + 1);
121 * static inner class used to details about consumed messages
123 * @author anowarul.islam
126 private static class msgInfo implements Consumer.Message {
128 * published message which is consumed
132 * offset associated with message
137 * get offset of messages
140 public long getOffset() {
145 * get consumed message
148 public String getMessage() {
155 * @author sneha.d.desai
157 * private LogBuffer class has synchronized push and read method
159 private class LogBuffer {
160 private int fBaseOffset;
161 private final int fMaxSize;
162 private final ArrayList<String> fList;
165 * constructor initializing the offset, maxsize and list
169 public LogBuffer(int maxSize) {
172 fList = new ArrayList<>();
180 public synchronized void push(String msg) {
182 while (fList.size() > fMaxSize) {
194 public synchronized msgInfo read(int offset) {
195 final int actual = Math.max(0, offset - fBaseOffset);
197 final msgInfo mi = new msgInfo();
198 mi.msg = (actual >= fList.size()) ? null : fList.get(actual);
202 mi.offset = actual + fBaseOffset;