Source code for moulinette.utils.stream

import os
import time

from multiprocessing.process import Process
from multiprocessing.queues import SimpleQueue


# Read from a stream ---------------------------------------------------


class AsynchronousFileReader(Process):

    """
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.

    Based on:
    http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading

    """

    def __init__(self, fd, queue):
        assert hasattr(queue, "put")
        assert hasattr(queue, "empty")
        assert isinstance(fd, int) or callable(fd.readline)
        Process.__init__(self)
        self._fd = fd
        self._queue = queue

    def run(self):
        """The body of the tread: read lines and put them on the queue."""

        # If self._fd is a file opened with open()...
        # Typically that's for stdout/stderr pipes
        # We can read the stuff easily with 'readline'
        if not isinstance(self._fd, int):
            for line in iter(self._fd.readline, ""):
                self._queue.put(line)

        # Else, it got opened with os.open() and we have to read it
        # wit low level crap...
        else:
            data = ""
            while True:
                try:
                    # Try to read (non-blockingly) a few bytes, append them to
                    # the buffer
                    data += os.read(self._fd, 50)
                except Exception as e:
                    print(
                        "from moulinette.utils.stream: could not read file descriptor : %s"
                        % str(e)
                    )
                    continue

                # If nobody's writing in there anymore, get out
                if not data and os.fstat(self._fd).st_nlink == 0:
                    return

                # If we have data, extract a line (ending with \n) and feed
                # it to the consumer
                if data and "\n" in data:
                    lines = data.split("\n")
                    self._queue.put(lines[0])
                    data = "\n".join(lines[1:])
                else:
                    time.sleep(0.05)

    def eof(self):
        """Check whether there is no more content to expect."""
        return not self.is_alive() and self._queue.empty()

    def join(self, timeout=None, close=True):
        """Close the file and join the thread."""
        if close:
            self._queue.put(StopIteration)
            if isinstance(self._fd, int):
                os.close(self._fd)
            else:
                self._fd.close()
        Process.join(self, timeout)


class Consummer(object):
    def __init__(self, queue, callback):
        self.queue = queue
        self.callback = callback

    def empty(self):
        return self.queue.empty()

    def process_next_line(self):
        if not self.empty():
            line = self.queue.get()
            if line:
                if line == StopIteration:
                    return
                self.callback(line)

    def process_current_queue(self):
        while not self.empty():
            line = self.queue.get()
            if line:
                if line == StopIteration:
                    break
                self.callback(line)


[docs]def async_file_reading(fd, callback): """Helper which instantiate and run an AsynchronousFileReader.""" queue = SimpleQueue() reader = AsynchronousFileReader(fd, queue) reader.start() consummer = Consummer(queue, callback) return (reader, consummer)