Modify groupId for ems driver
[vfc/nfvo/driver/ems.git] / ems / boco / src / main / java / org / onap / vfc / nfvo / emsdriver / collector / TaskThreadService.java
1 /**
2  * Copyright 2017 BOCO Corporation.  CMCC Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.vfc.nfvo.emsdriver.collector;
17
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
29
30
31
32 public class TaskThreadService extends Thread {
33         
34         public  Log log = LogFactory.getLog(TaskThreadService.class);
35         private final ExecutorService pool;
36         
37         private BlockingQueue<CollectMsg> queue = new  LinkedBlockingQueue<CollectMsg>();
38         private boolean startFlag = true;
39         
40         public static TaskThreadService getInstance(int poolSize) {
41                         return new TaskThreadService(poolSize);
42         }
43         private TaskThreadService(int poolSize) {
44                 pool = Executors.newFixedThreadPool(poolSize);
45         }
46         
47         private long timeStamp = System.currentTimeMillis();
48         public void run() { // run the service
49                 try {
50                         while(startFlag) {
51                                 
52                                 try {
53                                         if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
54                                                 timeStamp = System.currentTimeMillis();
55                                                 log.debug("task queue size " + queue.size());
56                                         }
57
58                                         CollectMsg data = receive();
59                                         if(data == null){
60                                                 continue;
61                                         }
62                                         
63                                         pool.execute(new TaskThread(data));
64                                         
65                                         
66                                 } catch (Exception e) {
67                                         log.error("collect task process fail!"+StringUtil.getStackTrace(e));
68                                 }
69                                 
70                         }
71                         
72                 } catch (Exception ex) {
73                         log.error("task ThreadService error "+StringUtil.getStackTrace(ex));
74                         pool.shutdown();
75                 }
76                 log.error("Task ThreadService exit");
77         }
78         
79
80         
81         public CollectMsg receive() {
82                 try {
83                         return queue.poll(100, TimeUnit.MILLISECONDS);
84                 } catch (InterruptedException e) {
85                         log.error("queue.poll is error"+StringUtil.getStackTrace(e));
86                 }
87                 return null;
88         }
89         
90         
91
92         public void add(CollectMsg data){
93                 try {
94                         queue.put(data);
95                 } catch (InterruptedException e) {
96                         log.error("queue.put is error"+StringUtil.getStackTrace(e));
97                 }
98         }
99         
100
101         public int size(){
102                 return queue.size();
103         }
104         
105         public void stopTask(){
106                 startFlag = false;
107         }
108 }