237cac8c78c79107bba66b4006ffafaa3c3e38fe
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / memory / MemoryConsumerFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.backends.memory;
23
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashMap;
27
28 import com.att.dmf.mr.CambriaApiException;
29 import com.att.dmf.mr.backends.Consumer;
30 import com.att.dmf.mr.backends.ConsumerFactory;
31 /**
32  * 
33  * @author anowarul.islam
34  *
35  */
36 public class MemoryConsumerFactory implements ConsumerFactory
37 {
38
39         private final MemoryQueue fQueue;
40         
41         /**
42          * 
43          * Initializing constructor
44          * @param q
45          */
46         public MemoryConsumerFactory ( MemoryQueue q )
47         {
48                 fQueue = q;
49         }
50
51         /**
52          * 
53          * @param topic
54          * @param consumerGroupId
55          * @param clientId
56          * @param timeoutMs
57          * @return Consumer
58          */
59         @Override
60         public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost )
61         {
62                 return new MemoryConsumer ( topic, consumerGroupId );
63         }
64
65         /**
66          * 
67          * Define nested inner class
68          *
69          */
70         private class MemoryConsumer implements Consumer
71         {
72
73                 private final String fTopic;
74                 private final String fConsumer;
75                 private final long fCreateMs;
76                 private long fLastAccessMs;
77                 
78                 /**
79                  * 
80                  * Initializing MemoryConsumer constructor 
81                  * @param topic
82                  * @param consumer
83                  * 
84                  */
85                 public MemoryConsumer ( String topic, String consumer )
86                 {
87                         fTopic = topic;
88                         fConsumer = consumer;
89                         fCreateMs = System.currentTimeMillis ();
90                         fLastAccessMs = fCreateMs;
91                 }
92
93                 @Override
94                 /**
95                  * 
96                  * return consumer details  
97                  */
98                 public Message nextMessage ()
99                 {
100                         return fQueue.get ( fTopic, fConsumer );
101                 }
102
103                 @Override
104                 public boolean close() {
105                         //Nothing to close/clean up.
106                         return true;
107                 }
108                 /**
109                  * 
110                  */
111                 public void commitOffsets()
112                 {
113                         // ignoring this aspect
114                 }
115                 /**
116                  * get offset
117                  */
118                 public long getOffset()
119                 {
120                         return 0;
121                 }
122
123                 @Override
124                 /**
125                  * get consumer topic name
126                  */
127                 public String getName ()
128                 {
129                         return fTopic + "/" + fConsumer;
130                 }
131
132                 @Override
133                 public long getCreateTimeMs ()
134                 {
135                         return fCreateMs;
136                 }
137
138                 @Override
139                 public long getLastAccessMs ()
140                 {
141                         return fLastAccessMs;
142                 }
143
144                 
145
146                 @Override
147                 public void setOffset(long offset) {
148                         // TODO Auto-generated method stub
149                         
150                 }
151
152                 
153         }
154
155         @Override
156         public void destroyConsumer(String topic, String consumerGroupId,
157                         String clientId) {
158                 //No cache for memory consumers, so NOOP
159         }
160
161         @Override
162         public void dropCache ()
163         {
164                 // nothing to do - there's no cache here
165         }
166
167         @Override
168         /**
169          * @return ArrayList<MemoryConsumer>
170          */
171         public Collection<? extends Consumer> getConsumers ()
172         {
173                 return new ArrayList<> ();
174         }
175
176         @Override
177         public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
178                         String remotehost) throws UnavailableException, CambriaApiException {
179                 // TODO Auto-generated method stub
180                 return null;
181         }
182
183         
184 }