2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2018 Intel Corp. All rights reserved.
4 * Copyright (C) 2019 Nordix Foundation.
5 * Modifications Copyright (C) 2019 AT&T Intellectual Property.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.distribution.reception.handling.file;
25 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
28 import java.io.IOException;
29 import java.nio.file.FileSystems;
30 import java.nio.file.Path;
31 import java.nio.file.Paths;
32 import java.nio.file.WatchEvent;
33 import java.nio.file.WatchKey;
34 import java.nio.file.WatchService;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.zip.ZipFile;
40 import org.onap.policy.common.parameters.ParameterService;
41 import org.onap.policy.distribution.model.Csar;
42 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
43 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
44 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Handles reception of inputs from File System which can be used to decode policies.
51 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
53 private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
54 private boolean running = false;
60 protected void initializeReception(final String parameterGroupName) {
61 LOGGER.debug("FileSystemReceptionHandler init...");
63 final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
64 ParameterService.get(parameterGroupName);
65 final FileClientHandler fileClientHandler =
66 new FileClientHandler(this, handlerParameters.getWatchPath(), handlerParameters.getMaxThread());
67 final Thread fileWatcherThread = new Thread(fileClientHandler);
68 fileWatcherThread.start();
69 } catch (final Exception ex) {
70 LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
78 public void destroy() {
83 * Method to check the running status of file watcher thread.
85 * @return the running status
87 public boolean isRunning() {
92 * Initialize the file watcher thread.
94 * @param watchPath Path to watch
96 public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
97 try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
98 final Path dir = Paths.get(watchPath);
99 dir.register(watcher, ENTRY_CREATE);
100 LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
101 startWatchService(watcher, dir, maxThread);
102 } catch (final Exception ex) {
103 LOGGER.error("FileWatcher initialization failed", ex);
104 Thread.currentThread().interrupt();
109 * Method to keep watching the given path for any new file created.
111 * @param watcher the watcher
112 * @param dir the watch directory
113 * @param maxThread the max thread number
114 * @throws InterruptedException if it occurs
116 protected void startWatchService(final WatchService watcher, final Path dir, final int maxThread)
117 throws InterruptedException {
119 final ExecutorService pool = Executors.newFixedThreadPool(maxThread);
124 key = watcher.take();
125 processFileEvents(dir, key, pool);
126 final boolean valid = key.reset();
128 LOGGER.error("Watch key no longer valid!");
137 private void processFileEvents(final Path dir, final WatchKey key, final ExecutorService pool) {
138 for (final WatchEvent<?> event : key.pollEvents()) {
139 @SuppressWarnings("unchecked")
140 final WatchEvent<Path> ev = (WatchEvent<Path>) event;
141 final Path fileName = ev.context();
143 LOGGER.debug("new CSAR found: {}", fileName);
144 DistributionStatisticsManager.updateTotalDistributionCount();
145 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
147 waitForFileToBeReady(fullFilePath);
148 createPolicyInputAndCallHandler(fullFilePath);
149 LOGGER.debug("CSAR complete: {}", fileName);
150 } catch (final InterruptedException e) {
151 LOGGER.error("waitForFileToBeReady interrupted", e);
152 Thread.currentThread().interrupt();
159 * Method to create policy input & call policy handlers.
161 * @param fileName the filename
163 protected void createPolicyInputAndCallHandler(final String fileName) {
165 final Csar csarObject = new Csar(fileName);
166 DistributionStatisticsManager.updateTotalDownloadCount();
167 inputReceived(csarObject);
168 DistributionStatisticsManager.updateDownloadSuccessCount();
169 DistributionStatisticsManager.updateDistributionSuccessCount();
170 } catch (final PolicyDecodingException ex) {
171 DistributionStatisticsManager.updateDownloadFailureCount();
172 DistributionStatisticsManager.updateDistributionFailureCount();
173 LOGGER.error("Policy creation failed", ex);
177 private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
180 TimeUnit.MILLISECONDS.sleep(100);
181 try (ZipFile zipFile = new ZipFile(fullFilePath)) {
183 } catch (final IOException exp) {
184 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);