# tornado_util.py
"""Utilities used in tornado."""

import socket
import errno
from tornado import ioloop

class TCPHandler(object):
    """TCP socket handler backed by the tornado event loop.

    The handler registers the socket with the current ``IOLoop`` and
    dispatches read/write/error events.  This class calls
    ``self.on_message(bytes)``, ``self.on_error(err)`` and
    ``self.on_close()`` but does not define them; subclasses are
    expected to provide these hooks.

    Parameters
    ----------
    sock : Socket
        The TCP socket, will set it to non-blocking mode.
    """
    def __init__(self, sock):
        self._sock = sock
        self._ioloop = ioloop.IOLoop.current()
        self._sock.setblocking(0)
        # Outgoing messages that are not (fully) sent yet, oldest first.
        self._pending_write = []
        # Set by signal_close() when close must wait for pending writes.
        self._signal_close = False

        def _event_handler(_, events):
            self._event_handler(events)
        self._ioloop.add_handler(
            self._sock.fileno(), _event_handler,
            self._ioloop.READ | self._ioloop.ERROR)

    def signal_close(self):
        """Signal the handler to close.

        The handler will be closed after the existing
        pending messages are sent to the peer.
        """
        if not self._pending_write:
            self.close()
        else:
            self._signal_close = True

    def close(self):
        """Close the socket and invoke ``on_close``.

        Does nothing if the handler is already closed, so ``on_close``
        runs at most once.
        """
        if self._sock is None:
            return
        # Deregistering and closing are attempted independently so that a
        # failure to deregister does not leave the socket open.
        try:
            self._ioloop.remove_handler(self._sock.fileno())
        except socket.error:
            pass
        try:
            self._sock.close()
        except socket.error:
            pass
        self._sock = None
        self.on_close()

    def write_message(self, message, binary=True):
        """Queue a message and try to send it immediately.

        Parameters
        ----------
        message : bytes
            The data to send.

        binary : bool, optional
            Must be true; only binary messages are supported.

        Raises
        ------
        IOError
            If the socket is already closed.
        """
        assert binary
        if self._sock is None:
            raise IOError("socket is already closed")
        self._pending_write.append(message)
        self._update_write()

    def _event_handler(self, events):
        """Central event handler, dispatches IOLoop events."""
        if self._sock is None:
            # Already closed, nothing left to service.
            return
        if (events & self._ioloop.ERROR) or (events & self._ioloop.READ):
            # Only flush writes when the read made progress; the read path
            # may also have closed the handler, which _update_write checks.
            if self._update_read() and (events & self._ioloop.WRITE):
                self._update_write()
        elif events & self._ioloop.WRITE:
            self._update_write()

    def _update_write(self):
        """Send as much pending data as possible and update event interest."""
        if self._sock is None:
            return
        while self._pending_write:
            try:
                msg = self._pending_write[0]
                nsend = self._sock.send(msg)
                if nsend != len(msg):
                    # Partial send: keep the unsent tail at the queue head.
                    self._pending_write[0] = msg[nsend:]
                else:
                    self._pending_write.pop(0)
            except socket.error as err:
                if err.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    self.on_error(err)
                # Stop draining in both cases: either the socket buffer is
                # full, or a fatal error was reported.  Retrying the send
                # here would spin on the same error or hit a closed socket.
                break
        if self._sock is None:
            # on_error closed the handler; nothing left to register.
            return
        if self._pending_write:
            self._ioloop.update_handler(
                self._sock.fileno(), self._ioloop.READ | self._ioloop.ERROR | self._ioloop.WRITE)
        else:
            if self._signal_close:
                self.close()
            else:
                self._ioloop.update_handler(
                    self._sock.fileno(), self._ioloop.READ | self._ioloop.ERROR)

    def _update_read(self):
        """Update state when there is a read event.

        Returns
        -------
        bool
            True if a non-empty message was received and passed to
            ``on_message``, False otherwise.
        """
        if self._sock is None:
            return False
        try:
            msg = bytes(self._sock.recv(4096))
            if msg:
                self.on_message(msg)
                return True
            else:
                # normal close, remote is closed
                self.close()
        except socket.error as err:
            if err.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                pass
            else:
                self.on_error(err)
        return False