# tornado_util.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Utilities used with the tornado event loop."""

import socket
import errno
from tornado import ioloop

class TCPHandler(object):
    """TCP socket handler backed by the tornado event loop.

    The handler registers the socket with the current IOLoop, passes the
    bytes it receives to ``on_message`` and queues outgoing messages until
    the socket accepts them.

    This class calls three hooks that it does not define itself; they have
    to be supplied by a subclass (or a mixin):

    - ``on_message(message)``: called with every non-empty chunk received.
    - ``on_close()``: called after the socket has been closed.
    - ``on_error(err)``: called with the ``socket.error`` raised by a failed
      ``send``/``recv`` (anything other than "would block").

    Parameters
    ----------
    sock : Socket
        The TCP socket, will set it to non-blocking mode.
    """
    def __init__(self, sock):
        self._sock = sock
        self._ioloop = ioloop.IOLoop.current()
        self._sock.setblocking(0)
        # Outgoing messages that are not fully sent yet, oldest first.
        self._pending_write = []
        # True when a close was requested but is deferred until the
        # pending queue has been flushed.
        self._signal_close = False

        def _event_handler(_, events):
            self._event_handler(events)
        self._ioloop.add_handler(
            self._sock.fileno(), _event_handler,
            self._ioloop.READ | self._ioloop.ERROR)

    def signal_close(self):
        """Signal the handler to close.

        The handler is closed immediately when nothing is queued; otherwise
        it is closed once the pending messages have been sent to the peer.
        """
        if not self._pending_write:
            self.close()
        else:
            self._signal_close = True

    def close(self):
        """Close the socket.

        Does nothing when the handler is already closed. Otherwise the
        socket is unregistered from the event loop and closed, and
        ``on_close`` is called afterwards.
        """
        if self._sock is None:
            return
        sock = self._sock
        try:
            self._ioloop.remove_handler(sock.fileno())
        except socket.error:
            pass
        # Closed in its own try block so that a failure while unregistering
        # cannot skip the close and leak the socket.
        try:
            sock.close()
        except socket.error:
            pass
        self._sock = None
        self.on_close()

    def write_message(self, message, binary=True):
        """Queue a message and try to send it right away.

        Parameters
        ----------
        message : bytes
            The data handed to ``socket.send``.

        binary : bool
            Must be true; only binary messages are supported.

        Raises
        ------
        IOError
            If the socket is already closed.
        """
        assert binary
        if self._sock is None:
            raise IOError("socket is already closed")
        self._pending_write.append(message)
        self._update_write()

    def _event_handler(self, events):
        """Central event handler, dispatches on the event mask."""
        if events & (self._ioloop.ERROR | self._ioloop.READ):
            # Only flush writes when the read delivered data.
            if self._update_read() and (events & self._ioloop.WRITE):
                self._update_write()
        elif events & self._ioloop.WRITE:
            self._update_write()

    def _update_write(self):
        """Update the state on write.

        Sends as much queued data as the socket accepts, then updates the
        event mask: WRITE interest is kept only while data is still queued.
        When the queue is empty and a close was signalled, the handler is
        closed instead.
        """
        if self._sock is None:
            # The handler may have been closed by a hook (e.g. on_message)
            # before we got here; there is nothing to flush or re-register.
            return
        while self._pending_write:
            msg = self._pending_write[0]
            try:
                nsend = self._sock.send(msg)
            except socket.error as err:
                if err.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Socket buffer is full, wait for the next WRITE event.
                    break
                self.on_error(err)
                # Stop here: retrying the same send would spin forever if
                # the hook left the socket open, and if it closed the
                # socket there is no handler left to update.
                return
            if nsend != len(msg):
                # Partial send, keep the unsent tail at the queue head.
                self._pending_write[0] = msg[nsend:]
            else:
                self._pending_write.pop(0)
        if self._pending_write:
            self._ioloop.update_handler(
                self._sock.fileno(), self._ioloop.READ | self._ioloop.ERROR | self._ioloop.WRITE)
        elif self._signal_close:
            self.close()
        else:
            self._ioloop.update_handler(
                self._sock.fileno(), self._ioloop.READ | self._ioloop.ERROR)

    def _update_read(self):
        """Update state when there is read event.

        Returns
        -------
        got_data : bool
            True when data was received and passed to ``on_message``,
            False otherwise (closed, would block, or error).
        """
        if self._sock is None:
            return False
        try:
            msg = bytes(self._sock.recv(4096))
            if msg:
                self.on_message(msg)
                return True
            # normal close, remote is closed
            self.close()
        except socket.error as err:
            if err.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
                self.on_error(err)
        return False