2 * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.vfc.nfvo.emsdriver.collector;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.TimeUnit;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.onap.vfc.nfvo.emsdriver.commons.constant.Constant;
27 import org.onap.vfc.nfvo.emsdriver.commons.model.CollectMsg;
28 import org.onap.vfc.nfvo.emsdriver.commons.utils.StringUtil;
32 public class TaskThreadService extends Thread {
34 public Log log = LogFactory.getLog(TaskThreadService.class);
35 private final ExecutorService pool;
37 private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<CollectMsg>();
38 private boolean startFlag = true;
40 public static TaskThreadService getInstance(int poolSize) {
41 return new TaskThreadService(poolSize);
43 private TaskThreadService(int poolSize) {
44 pool = Executors.newFixedThreadPool(poolSize);
47 private long timeStamp = System.currentTimeMillis();
48 public void run() { // run the service
53 if(System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE){
54 timeStamp = System.currentTimeMillis();
55 log.debug("task queue size " + queue.size());
58 CollectMsg data = receive();
63 pool.execute(new TaskThread(data));
66 } catch (Exception e) {
67 log.error("collect task process fail!"+StringUtil.getStackTrace(e));
72 } catch (Exception ex) {
73 log.error("task ThreadService error "+StringUtil.getStackTrace(ex));
76 log.error("Task ThreadService exit");
81 public CollectMsg receive() {
83 return queue.poll(100, TimeUnit.MILLISECONDS);
84 } catch (InterruptedException e) {
85 log.error("queue.poll is error"+StringUtil.getStackTrace(e));
92 public void add(CollectMsg data){
95 } catch (InterruptedException e) {
96 log.error("queue.put is error"+StringUtil.getStackTrace(e));
105 public void stopTask(){