77269bb0096da22ff6e3a9d55b8684ad1364fda0
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.distribution.reception.handling.file;
24
25 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
26
27 import java.io.File;
28 import java.io.IOException;
29 import java.nio.file.FileSystems;
30 import java.nio.file.Path;
31 import java.nio.file.Paths;
32 import java.nio.file.WatchEvent;
33 import java.nio.file.WatchKey;
34 import java.nio.file.WatchService;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.zip.ZipFile;
39
40 import org.onap.policy.common.parameters.ParameterService;
41 import org.onap.policy.distribution.model.Csar;
42 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
43 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
44 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Handles reception of inputs from File System which can be used to decode policies.
50  */
51 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
52
53     private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
54     private boolean running = false;
55
56     /**
57      * {@inheritDoc}.
58      */
59     @Override
60     protected void initializeReception(final String parameterGroupName) {
61         LOGGER.debug("FileSystemReceptionHandler init...");
62         try {
63             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
64                     ParameterService.get(parameterGroupName);
65             final FileClientHandler fileClientHandler = new FileClientHandler(this,
66                     handlerParameters.getWatchPath(),
67                     handlerParameters.getMaxThread());
68             final Thread fileWatcherThread = new Thread(fileClientHandler);
69             fileWatcherThread.start();
70         } catch (final Exception ex) {
71             LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
72         }
73     }
74
75     /**
76      * {@inheritDoc}.
77      */
78     @Override
79     public void destroy() {
80         running = false;
81     }
82
83     /**
84      * Method to check the running status of file watcher thread.
85      *
86      * @return the running status
87      */
88     public boolean isRunning() {
89         return running;
90     }
91
92     /**
93      * Initialize the file watcher thread.
94      *
95      * @param watchPath Path to watch
96      */
97     public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
98         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
99             final Path dir = Paths.get(watchPath);
100             dir.register(watcher, ENTRY_CREATE);
101             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
102             startWatchService(watcher, dir, maxThread);
103         } catch (final Exception ex) {
104             LOGGER.error("FileWatcher initialization failed", ex);
105             Thread.currentThread().interrupt();
106         }
107     }
108
109     /**
110      * Method to keep watching the given path for any new file created.
111      *
112      * @param watcher the watcher
113      * @param dir the watch directory
114      * @param maxThread the max thread number
115      * @throws InterruptedException if it occurs
116      */
117     protected void startWatchService(final WatchService watcher,
118             final Path dir, int maxThread) throws InterruptedException {
119         WatchKey key;
120         ExecutorService pool = Executors.newFixedThreadPool(maxThread);
121
122         try {
123             running = true;
124             while (running) {
125                 key = watcher.take();
126                 processFileEvents(dir, key, pool);
127                 final boolean valid = key.reset();
128                 if (!valid) {
129                     LOGGER.error("Watch key no longer valid!");
130                     break;
131                 }
132             }
133         } finally {
134             pool.shutdown();
135         }
136     }
137
138     private void processFileEvents(Path dir, WatchKey key, ExecutorService pool) {
139         for (final WatchEvent<?> event : key.pollEvents()) {
140             @SuppressWarnings("unchecked")
141             final WatchEvent<Path> ev = (WatchEvent<Path>) event;
142             final Path fileName = ev.context();
143             pool.execute(() -> {
144                 LOGGER.debug("new CSAR found: {}", fileName);
145                 DistributionStatisticsManager.updateTotalDistributionCount();
146                 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
147                 try {
148                     waitForFileToBeReady(fullFilePath);
149                     createPolicyInputAndCallHandler(fullFilePath);
150                     LOGGER.debug("CSAR complete: {}", fileName);
151                 } catch (InterruptedException e) {
152                     LOGGER.error("waitForFileToBeReady interrupted", e);
153                     Thread.currentThread().interrupt();
154                 }
155             });
156         }
157     }
158
159     /**
160      * Method to create policy input & call policy handlers.
161      *
162      * @param fileName the filename
163      */
164     protected void createPolicyInputAndCallHandler(final String fileName) {
165         try {
166             final Csar csarObject = new Csar(fileName);
167             DistributionStatisticsManager.updateTotalDownloadCount();
168             inputReceived(csarObject);
169             DistributionStatisticsManager.updateDownloadSuccessCount();
170             DistributionStatisticsManager.updateDistributionSuccessCount();
171         } catch (final PolicyDecodingException ex) {
172             DistributionStatisticsManager.updateDownloadFailureCount();
173             DistributionStatisticsManager.updateDistributionFailureCount();
174             LOGGER.error("Policy creation failed", ex);
175         }
176     }
177
178     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
179         boolean flag = true;
180         while (flag) {
181             TimeUnit.MILLISECONDS.sleep(100);
182             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
183                 flag = false;
184             } catch (final IOException exp) {
185                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
186             }
187         }
188     }
189 }