vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / utils / threading.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py b/azure/aria/aria-extension-cloudify/src/aria/aria/utils/threading.py
new file mode 100644 (file)
index 0000000..f5ca302
--- /dev/null
@@ -0,0 +1,286 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Threading utilities.
+"""
+
+from __future__ import absolute_import  # so we can import standard 'threading'
+
+import sys
+import itertools
+import multiprocessing
+from threading import (Thread, Lock)
+from Queue import (Queue, Full, Empty)
+
+from .exceptions import print_exception
+
+class ExecutorException(Exception):
+    pass
+
+
+class DaemonThread(Thread):
+    def __init__(self, *args, **kwargs):
+        super(DaemonThread, self).__init__(*args, **kwargs)
+        self.daemon = True
+
+    def run(self):
+        """
+        We're overriding ``Thread.run`` in order to avoid annoying (but harmless) error messages
+        during shutdown. The problem is that CPython nullifies the global state _before_ shutting
+        down daemon threads, so that exceptions might happen, and then ``Thread.__bootstrap_inner``
+        prints them out.
+
+        Our solution is to swallow these exceptions here.
+
+        The side effect is that uncaught exceptions in our own thread code will _not_ be printed out
+        as usual, so it's our responsibility to catch them in our code.
+        """
+
+        try:
+            super(DaemonThread, self).run()
+        except SystemExit as e:
+            # This exception should be bubbled up
+            raise e
+        except BaseException:
+            # Exceptions might occur in daemon threads during interpreter shutdown
+            pass
+
+
+# https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
+class FixedThreadPoolExecutor(object):
+    """
+    Executes tasks in a fixed thread pool.
+
+    Makes sure to gather all returned results and thrown exceptions in one place, in order of task
+    submission.
+
+    Example::
+
+        def sum(arg1, arg2):
+            return arg1 + arg2
+
+        executor = FixedThreadPoolExecutor(10)
+        try:
+            for value in range(100):
+                executor.submit(sum, value, value)
+            executor.drain()
+        except:
+            executor.close()
+        executor.raise_first()
+        print executor.returns
+
+    You can also use it with the Python ``with`` keyword, in which case you don't need to call
+    ``close`` explicitly::
+
+        with FixedThreadPoolExecutor(10) as executor:
+            for value in range(100):
+                executor.submit(sum, value, value)
+            executor.drain()
+            executor.raise_first()
+            print executor.returns
+    """
+
+    _CYANIDE = object()  # Special task marker used to kill worker threads.
+
+    def __init__(self,
+                 size=None,
+                 timeout=None,
+                 print_exceptions=False):
+        """
+        :param size: number of threads in the pool; if ``None`` will use an optimal number for the
+         platform
+        :param timeout: timeout in seconds for all blocking operations (``None`` means no timeout)
+        :param print_exceptions: set to ``True`` in order to print exceptions from tasks
+        """
+        if not size:
+            try:
+                size = multiprocessing.cpu_count() * 2 + 1
+            except NotImplementedError:
+                size = 3
+
+        self.size = size
+        self.timeout = timeout
+        self.print_exceptions = print_exceptions
+
+        self._tasks = Queue()
+        self._returns = {}
+        self._exceptions = {}
+        self._id_creator = itertools.count()
+        self._lock = Lock() # for console output
+
+        self._workers = []
+        for index in range(size):
+            worker = DaemonThread(
+                name='%s%d' % (self.__class__.__name__, index),
+                target=self._thread_worker)
+            worker.start()
+            self._workers.append(worker)
+
+    def submit(self, func, *args, **kwargs):
+        """
+        Submit a task for execution.
+
+        The task will be called ASAP on the next available worker thread in the pool.
+
+        :raises ExecutorException: if cannot be submitted
+        """
+
+        try:
+            self._tasks.put((self._id_creator.next(), func, args, kwargs), timeout=self.timeout)
+        except Full:
+            raise ExecutorException('cannot submit task: queue is full')
+
+    def close(self):
+        """
+        Blocks until all current tasks finish execution and all worker threads are dead.
+
+        You cannot submit tasks anymore after calling this.
+
+        This is called automatically upon exit if you are using the ``with`` keyword.
+        """
+
+        self.drain()
+        while self.is_alive:
+            try:
+                self._tasks.put(self._CYANIDE, timeout=self.timeout)
+            except Full:
+                raise ExecutorException('cannot close executor: a thread seems to be hanging')
+        self._workers = None
+
+    def drain(self):
+        """
+        Blocks until all current tasks finish execution, but leaves the worker threads alive.
+        """
+
+        self._tasks.join()  # oddly, the API does not support a timeout parameter
+
+    @property
+    def is_alive(self):
+        """
+        True if any of the worker threads are alive.
+        """
+
+        for worker in self._workers:
+            if worker.is_alive():
+                return True
+        return False
+
+    @property
+    def returns(self):
+        """
+        The returned values from all tasks, in order of submission.
+        """
+
+        return [self._returns[k] for k in sorted(self._returns)]
+
+    @property
+    def exceptions(self):
+        """
+        The raised exceptions from all tasks, in order of submission.
+        """
+
+        return [self._exceptions[k] for k in sorted(self._exceptions)]
+
+    def raise_first(self):
+        """
+        If exceptions were thrown by any task, then the first one will be raised.
+
+        This is rather arbitrary: proper handling would involve iterating all the exceptions.
+        However, if you want to use the "raise" mechanism, you are limited to raising only one of
+        them.
+        """
+
+        exceptions = self.exceptions
+        if exceptions:
+            raise exceptions[0]
+
+    def _thread_worker(self):
+        while True:
+            if not self._execute_next_task():
+                break
+
+    def _execute_next_task(self):
+        try:
+            task = self._tasks.get(timeout=self.timeout)
+        except Empty:
+            # Happens if timeout is reached
+            return True
+        if task == self._CYANIDE:
+            # Time to die :(
+            return False
+        self._execute_task(*task)
+        return True
+
+    def _execute_task(self, task_id, func, args, kwargs):
+        try:
+            result = func(*args, **kwargs)
+            self._returns[task_id] = result
+        except Exception as e:
+            self._exceptions[task_id] = e
+            if self.print_exceptions:
+                with self._lock:
+                    print_exception(e)
+        self._tasks.task_done()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, the_type, value, traceback):
+        self.close()
+        return False
+
+
+class LockedList(list):
+    """
+    A list that supports the ``with`` keyword with a built-in lock.
+
+    Though Python lists are thread-safe in that they will not raise exceptions during concurrent
+    access, they do not guarantee atomicity. This class will let you gain atomicity when needed.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(LockedList, self).__init__(*args, **kwargs)
+        self.lock = Lock()
+
+    def __enter__(self):
+        return self.lock.__enter__()
+
+    def __exit__(self, the_type, value, traceback):
+        return self.lock.__exit__(the_type, value, traceback)
+
+
+class ExceptionThread(Thread):
+    """
+    A thread from which top level exceptions can be retrieved or re-raised.
+    """
+    def __init__(self, *args, **kwargs):
+        Thread.__init__(self, *args, **kwargs)
+        self.exception = None
+        self.daemon = True
+
+    def run(self):
+        try:
+            super(ExceptionThread, self).run()
+        except BaseException:
+            self.exception = sys.exc_info()
+
+    def is_error(self):
+        return self.exception is not None
+
+    def raise_error_if_exists(self):
+        if self.is_error():
+            type_, value, trace = self.exception
+            raise type_, value, trace