MobileBlur

rocket.py at [eba4a35a21]
Login

rocket.py at [eba4a35a21]

File gluon/rocket.py artifact 069a2f3dcb part of check-in eba4a35a21


# -*- coding: utf-8 -*-

# This file is part of the Rocket Web Server
# Copyright (c) 2010 Timothy Farrell

# Import System Modules
import sys
import errno
import socket
import logging
import platform

# Define Constants

# Identification strings reported by the server.
VERSION = '1.2.2'
SERVER_SOFTWARE = 'Rocket %s' % VERSION
HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE,
                                         sys.version.split(' ')[0])
SERVER_NAME = socket.gethostname()

# Socket and thread tunables.
BUF_SIZE = 16384
SOCKET_TIMEOUT = 1 # in secs
# in secs, how often should threads check for a server stop message?
THREAD_STOP_CHECK_INTERVAL = 1

# Interpreter detection.
IS_JYTHON = platform.system() == 'Java' # Handle special cases for Jython
PY3K = sys.version_info[0] >= 3

# errno values that are treated as harmless when a connection is torn down.
IGNORE_ERRORS_ON_CLOSE = set((errno.ECONNABORTED, errno.ECONNRESET))

# Fallback sizing used when the caller does not supply explicit values.
DEFAULT_LISTEN_QUEUE_SIZE = 5
DEFAULT_MIN_THREADS = 10
DEFAULT_MAX_THREADS = 0
DEFAULTS = {
    'LISTEN_QUEUE_SIZE': DEFAULT_LISTEN_QUEUE_SIZE,
    'MIN_THREADS': DEFAULT_MIN_THREADS,
    'MAX_THREADS': DEFAULT_MAX_THREADS,
}

class NullHandler(logging.Handler):
    """Logging handler that discards every record.

    Attached to the library's loggers so that the logging module has a
    handler to deliver to even when the application configures none."""

    def emit(self, record):
        # Intentionally drop the record.
        return None

if PY3K:
    def b(val):
        """ Return text as bytes; any other value is handed back untouched.
        Lets the same call sites work on Python 2.x and 3.x. """
        if not isinstance(val, str):
            return val
        return val.encode()

    def u(val, encoding="us-ascii"):
        """ Return bytes decoded with *encoding*; any other value is handed
        back untouched.  Lets the same call sites work on Python 2.x and
        3.x. """
        if not isinstance(val, bytes):
            return val
        return val.decode(encoding)

else:
    def b(val):
        """ Return unicode text as a byte string; any other value is handed
        back untouched.  Lets the same call sites work on Python 2.x and
        3.x. """
        if not isinstance(val, unicode):
            return val
        return val.encode()

    def u(val, encoding="us-ascii"):
        """ Return a byte string decoded with *encoding*; any other value is
        handed back untouched.  Lets the same call sites work on Python 2.x
        and 3.x. """
        if not isinstance(val, str):
            return val
        return val.decode(encoding)

# Import Package Modules
# package imports removed in monolithic build

# Names exported by "from <module> import *".
__all__ = [
    'VERSION',
    'SERVER_SOFTWARE',
    'HTTP_SERVER_SOFTWARE',
    'BUF_SIZE',
    'IS_JYTHON',
    'IGNORE_ERRORS_ON_CLOSE',
    'DEFAULTS',
    'PY3K',
    'b',
    'u',
    'Rocket',
    'CherryPyWSGIServer',
    'SERVER_NAME',
    'NullHandler',
]

# Monolithic build...end of module: rocket\__init__.py
# Monolithic build...start of module: rocket\connection.py

# Import System Modules
import sys
import time
import socket
try:
    import ssl
    has_ssl = True
except ImportError:
    has_ssl = False
# package imports removed in monolithic build

class Connection(object):
    """Wrapper around one accepted client socket.

    sock_tuple is the (socket, (client_addr, client_port)) pair; port is the
    server port the connection arrived on; secure is stored as given.

    Frequently used socket methods (sendall, shutdown, fileno, makefile,
    setblocking) are re-exported as instance attributes so callers can use
    the Connection in place of the raw socket for those operations.
    """

    __slots__ = [
        'setblocking',
        'sendall',
        'shutdown',
        'makefile',
        'fileno',
        'client_addr',
        'client_port',
        'server_port',
        'socket',
        'start_time',
        'ssl',
        'secure'
    ]

    def __init__(self, sock_tuple, port, secure=False):
        self.client_addr, self.client_port = sock_tuple[1]
        self.server_port = port
        self.socket = sock_tuple[0]
        # Timestamp taken when the wrapper is created.
        self.start_time = time.time()
        # True only when the ssl module is available and the socket really
        # was wrapped.
        self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
        self.secure = secure

        if IS_JYTHON:
            # In Jython we must set TCP_NODELAY here since it does not
            # inherit from the listening socket.
            # See: http://bugs.jython.org/issue1309
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self.socket.settimeout(SOCKET_TIMEOUT)

        # Bind the socket's methods directly onto the instance.
        self.sendall = self.socket.sendall
        self.shutdown = self.socket.shutdown
        self.fileno = self.socket.fileno
        self.makefile = self.socket.makefile
        self.setblocking = self.socket.setblocking

    def close(self):
        """Close the underlying socket.

        When the socket object exposes a ``_sock`` attribute, that object is
        closed first.  An EBADF error from that step is ignored; any other
        socket.error is re-raised with its original traceback.
        """
        if hasattr(self.socket, '_sock'):
            try:
                self.socket._sock.close()
            except socket.error:
                # sys.exc_info() instead of "except ... as" keeps this
                # parseable on both Python 2.x and 3.x.
                if sys.exc_info()[1].errno != errno.EBADF:
                    # Bare raise keeps the traceback of the original failure.
                    raise
        self.socket.close()

# Monolithic build...end of module: rocket\connection.py
# Monolithic build...start of module: rocket\listener.py

# Import System Modules
import os
import socket
import logging
import traceback
from threading import Thread

try:
    import ssl
    from ssl import SSLError
    has_ssl = True
except ImportError:
    has_ssl = False
    class SSLError(socket.error):
        pass
# Import Package Modules
# package imports removed in monolithic build

class Listener(Thread):
    """The Listener class is a class responsible for accepting connections
    and queuing them to be processed by a worker thread.

    interface is (addr, port) for a plain socket, or
    (addr, port, keyfile, certfile) for a socket that should be wrapped with
    SSL.  queue_size is passed to listen().  Accepted connections are put on
    active_queue as ((sock, addr), port, secure).

    After construction, ``ready`` tells whether the listening socket was
    set up successfully; a Listener that is not ready refuses to run.
    """

    def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)

        # Instance variables
        self.active_queue = active_queue
        self.interface = interface
        self.addr = interface[0]
        self.port = interface[1]
        # A four-element interface is a request for HTTPS.  Whether the key
        # and certificate files actually exist is verified below so that a
        # missing file is reported instead of silently serving plain HTTP.
        self.secure = len(interface) == 4
        self.ready = False

        # Error Log
        self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
        self.err_log.addHandler(NullHandler())

        # Build the socket
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        if not listener:
            self.err_log.error("Failed to get socket.")
            return

        if self.secure:
            if not has_ssl:
                self.err_log.error("ssl module required to serve HTTPS.")
                listener.close()
                return
            elif not os.path.exists(interface[2]):
                data = (interface[2], interface[0], interface[1])
                self.err_log.error("Cannot find key file "
                          "'%s'.  Cannot bind to %s:%s" % data)
                listener.close()
                return
            elif not os.path.exists(interface[3]):
                data = (interface[3], interface[0], interface[1])
                self.err_log.error("Cannot find certificate file "
                          "'%s'.  Cannot bind to %s:%s" % data)
                listener.close()
                return

        # Set socket options (best effort: failures are only logged)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except:
            msg = "Cannot share socket.  Using %s:%i exclusively."
            self.err_log.warning(msg % (self.addr, self.port))

        try:
            if not IS_JYTHON:
                listener.setsockopt(socket.IPPROTO_TCP,
                                    socket.TCP_NODELAY,
                                    1)
        except:
            msg = "Cannot set TCP_NODELAY, things might run a little slower"
            self.err_log.warning(msg)

        try:
            listener.bind((self.addr, self.port))
        except:
            msg = "Socket %s:%i in use by other process and it won't share."
            self.err_log.error(msg % (self.addr, self.port))
            # The socket is unusable; release it instead of leaking it.
            listener.close()
        else:
            # We want socket operations to timeout periodically so we can
            # check if the server is shutting down
            listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
            # Listen for new connections allowing queue_size number of
            # connections to wait before rejecting a connection.
            listener.listen(queue_size)

            self.listener = listener

            self.ready = True

    def wrap_socket(self, sock):
        """Return sock wrapped for server-side SSL using the interface's key
        and certificate files.  If wrapping raises SSLError the original,
        unwrapped socket is returned."""
        try:
            sock = ssl.wrap_socket(sock,
                                   keyfile = self.interface[2],
                                   certfile = self.interface[3],
                                   server_side = True,
                                   ssl_version = ssl.PROTOCOL_SSLv23)
        except SSLError:
            # Generally this happens when an HTTP request is received on a
            # secure socket. We don't do anything because it will be detected
            # by Worker and dealt with appropriately.
            pass

        return sock


    def run(self):
        """Accept connections and queue them until ``ready`` is cleared.

        The shutdown flag is only examined when accept() times out, i.e.
        every THREAD_STOP_CHECK_INTERVAL seconds of inactivity.  Any other
        exception is logged and the loop continues.
        """
        if not self.ready:
            self.err_log.warning('Listener started when not ready.')
            return

        if __debug__:
            self.err_log.debug('Entering main loop.')
        while True:
            try:
                sock, addr = self.listener.accept()

                if self.secure:
                    sock = self.wrap_socket(sock)

                self.active_queue.put(((sock, addr),
                                       self.interface[1],
                                       self.secure))

            except socket.timeout:
                # socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL
                # seconds.  When that happens, we check if it's time to die.

                if not self.ready:
                    if __debug__:
                        self.err_log.debug('Listener exiting.')
                    return
                else:
                    continue
            except:
                self.err_log.error(str(traceback.format_exc()))

# Monolithic build...end of module: rocket\listener.py
# Monolithic build...start of module: rocket\main.py

# Import System Modules
import sys
import time
import socket
import logging
import traceback

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

# Import Package Modules
# package imports removed in monolithic build





# Setup Logging
# Logger for the server's lifecycle messages; the NullHandler gives the
# logging module a handler to deliver to when the application configures none.
# NOTE(review): in this monolithic build the module-level name ``log`` is
# assigned again further down (the threadpool section binds it to the
# 'Rocket.Errors.ThreadPool' logger).  Code that looks ``log`` up at call
# time therefore sees that later logger, not this one.
log = logging.getLogger('Rocket')
log.addHandler(NullHandler())

class Rocket(object):
    """The Rocket class is responsible for handling threads and accepting and
    dispatching connections.

    interfaces   -- one interface tuple or a list of them; each is handed to
                    Listener unchanged.
    method       -- name passed to get_method() to pick the worker class.
    app_info     -- dict handed to the workers; 'server_software' is added.
    min_threads  -- initial worker count (DEFAULTS['MIN_THREADS'] if None).
    max_threads  -- worker ceiling (DEFAULTS['MAX_THREADS'] if None).
    queue_size   -- listen() backlog; defaults to socket.SOMAXCONN when
                    available, and is capped at max_threads when that is
                    non-zero.
    timeout      -- passed to the Monitor as its stale-connection timeout.
    handle_signals -- install SIGTERM/SIGUSR1 handlers in start().
    """

    def __init__(self,
                 interfaces = ('127.0.0.1', 8000),
                 method = 'wsgi',
                 app_info = None,
                 min_threads = None,
                 max_threads = None,
                 queue_size = None,
                 timeout = 600,
                 handle_signals = True):

        self.handle_signals = handle_signals

        if not isinstance(interfaces, list):
            self.interfaces = [interfaces]
        else:
            self.interfaces = interfaces

        if min_threads is None:
            min_threads = DEFAULTS['MIN_THREADS']

        if max_threads is None:
            max_threads = DEFAULTS['MAX_THREADS']

        if not queue_size:
            if hasattr(socket, 'SOMAXCONN'):
                queue_size = socket.SOMAXCONN
            else:
                queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']

        if max_threads and queue_size > max_threads:
            queue_size = max_threads

        if isinstance(app_info, dict):
            app_info['server_software'] = SERVER_SOFTWARE

        monitor_queue = Queue()
        active_queue = Queue()

        self._monitor = Monitor(monitor_queue, active_queue, timeout)

        self._threadpool = ThreadPool(get_method(method),
                                      app_info = app_info,
                                      active_queue=active_queue,
                                      monitor_queue = monitor_queue,
                                      min_threads=min_threads,
                                      max_threads=max_threads)

        # Build our socket listeners and keep only those that managed to
        # set up their socket.  Every listener is checked, including the
        # first one, so a single failed interface is detected below.
        candidates = [Listener(i, queue_size, active_queue)
                      for i in self.interfaces]
        self.listeners = [l for l in candidates if l.ready]

        if not self.listeners:
            log.critical("No interfaces to listen on...closing.")
            sys.exit(1)

    def _sigterm(self, signum, frame):
        """Signal handler: stop the server."""
        log.info('Received SIGTERM')
        self.stop()

    def _sighup(self, signum, frame):
        """Signal handler (installed for SIGUSR1): restart the server."""
        log.info('Received SIGHUP')
        self.restart()

    def _join_if_alive(self, thread):
        """Join thread if it is still running."""
        # Thread.isAlive() was removed in Python 3.9; is_alive() is missing
        # on very old interpreters, so use whichever one exists.
        alive = getattr(thread, 'is_alive', None) or thread.isAlive
        if alive():
            thread.join()

    def start(self):
        """Start workers, monitor and listeners, then block resizing the
        thread pool every THREAD_STOP_CHECK_INTERVAL seconds until the pool
        is told to stop or a KeyboardInterrupt arrives.  Returns the result
        of stop()."""
        log.info('Starting %s' % SERVER_SOFTWARE)

        # Set up our shutdown signals
        if self.handle_signals:
            try:
                import signal
                signal.signal(signal.SIGTERM, self._sigterm)
                signal.signal(signal.SIGUSR1, self._sighup)
            except:
                log.debug('This platform does not support signals.')

        # Start our worker threads
        self._threadpool.start()

        # Start our monitor thread
        self._monitor.setDaemon(True)
        self._monitor.start()

        # I know that EXPR and A or B is bad but I'm keeping it for Py2.4
        # compatibility.
        str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')

        msg = 'Listening on sockets: '
        msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
        log.info(msg)

        for l in self.listeners:
            l.start()

        tp = self._threadpool
        dynamic_resize = tp.dynamic_resize

        while not tp.stop_server:
            try:
                dynamic_resize()
                time.sleep(THREAD_STOP_CHECK_INTERVAL)
            except KeyboardInterrupt:
                # Capture a keyboard interrupt when running from a console
                break
            except:
                if not tp.stop_server:
                    log.error(str(traceback.format_exc()))
                    continue

        return self.stop()

    def stop(self, stoplogging = True):
        """Stop listeners, workers and the monitor, in that order.  When
        stoplogging is true, logging.shutdown() is called afterwards."""
        log.info("Stopping Server")

        # Stop listeners
        for l in self.listeners:
            l.ready = False
            self._join_if_alive(l)

        # Stop Worker threads
        self._threadpool.stop()

        # Stop Monitor
        self._monitor.stop()
        self._join_if_alive(self._monitor)

        if stoplogging:
            logging.shutdown()

    def restart(self):
        """Stop the server without shutting down logging, then start it.

        NOTE(review): start() is invoked on the same Listener and Monitor
        thread objects that were already started once; confirm that this
        path works as intended.
        """
        self.stop(False)
        self.start()

def CherryPyWSGIServer(bind_addr,
                       wsgi_app,
                       numthreads = 10,
                       server_name = None,
                       max = -1,
                       request_queue_size = 5,
                       timeout = 10,
                       shutdown_timeout = 5):
    """ Build a Rocket server from CherryPy wsgiserver-style arguments.

    numthreads becomes the minimum thread count, max the maximum (a negative
    value is translated to 0), request_queue_size the listen queue size.
    server_name and shutdown_timeout are accepted but not used. """
    if max < 0:
        thread_ceiling = 0
    else:
        thread_ceiling = max

    app_info = {'wsgi_app': wsgi_app}

    return Rocket(bind_addr,
                  'wsgi',
                  app_info,
                  min_threads = numthreads,
                  max_threads = thread_ceiling,
                  queue_size = request_queue_size,
                  timeout = timeout)

# Monolithic build...end of module: rocket\main.py
# Monolithic build...start of module: rocket\monitor.py

# Import System Modules
import time
import logging
import select
from threading import Thread

# Import Package Modules
# package imports removed in monolithic build

class Monitor(Thread):
    """Monitor worker class.

    Holds connections received on monitor_queue, waits for them to become
    readable with select(), and puts readable ones back on active_queue.
    Connections held for ``timeout`` seconds or longer (measured from their
    start_time) are closed; a falsy timeout disables that check.  A None
    item on monitor_queue tells the thread to exit.
    """

    def __init__(self,
                 monitor_queue,
                 active_queue,
                 timeout,
                 *args,
                 **kwargs):

        Thread.__init__(self, *args, **kwargs)

        # Instance Variables
        self.monitor_queue = monitor_queue
        self.active_queue = active_queue
        self.timeout = timeout

        # Created here rather than in run() so that stop() can log even if
        # it is called before the thread has begun executing.
        self.log = logging.getLogger('Rocket.Monitor')

        self.connections = set()
        self.active = False

    def run(self):
        """Thread main loop; returns when a None sentinel is received or
        when select() fails after stop() has cleared ``active``."""
        self.name = self.getName()
        self.log.addHandler(NullHandler())

        self.active = True
        conn_list = list()
        list_changed = False

        if __debug__:
            self.log.debug('Entering monitor loop.')

        # Enter thread main loop
        while self.active:
            # Move the queued connections to the selection pool.  With no
            # connections held, get() blocks until something is queued.
            while not self.monitor_queue.empty() or not len(self.connections):
                if __debug__:
                    self.log.debug('In "receive timed-out connections" loop.')

                c = self.monitor_queue.get()

                if c is None:
                    # A non-client is a signal to die
                    if __debug__:
                        self.log.debug('Received a death threat.')
                    return

                self.log.debug('Received a timed out connection.')

                if __debug__:
                    assert(c not in self.connections)

                if IS_JYTHON:
                    # Jython requires a socket to be in Non-blocking mode in
                    # order to select on it.
                    c.setblocking(False)

                if __debug__:
                    self.log.debug('Adding connection to monitor list.')

                self.connections.add(c)
                list_changed = True

            # Wait on those connections
            self.log.debug('Blocking on connections')
            if list_changed:
                # Rebuild the list handed to select() only when the set
                # actually changed.
                conn_list = list(self.connections)
                list_changed = False

            try:
                readable = select.select(conn_list,
                                         [],
                                         [],
                                         THREAD_STOP_CHECK_INTERVAL)[0]
            except:
                if self.active:
                    raise
                else:
                    break

            # If we have any readable connections, put them back
            for r in readable:
                if __debug__:
                    self.log.debug('Restoring readable connection')

                if IS_JYTHON:
                    # Jython requires a socket to be in Non-blocking mode in
                    # order to select on it, but the rest of the code requires
                    # that it be in blocking mode.
                    r.setblocking(True)

                r.start_time = time.time()
                self.active_queue.put(r)

                self.connections.remove(r)
                list_changed = True

            # If we have any stale connections, kill them off.
            if self.timeout:
                now = time.time()
                stale = set()
                for c in self.connections:
                    if (now - c.start_time) >= self.timeout:
                        stale.add(c)

                for c in stale:
                    if __debug__:
                        # "EXPR and A or B" kept for Py2.4 compatibility
                        data = (c.client_addr, c.server_port, c.ssl and '*' or '')
                        self.log.debug('Flushing stale connection: %s:%i%s' % data)

                    self.connections.remove(c)
                    list_changed = True

                    try:
                        c.close()
                    finally:
                        del c

    def stop(self):
        """Deactivate the monitor, close every held and queued connection,
        and queue the None sentinel that makes run() return."""
        self.active = False

        if __debug__:
            self.log.debug('Flushing waiting connections')

        # Iterate over a snapshot: run() may still be adding to or removing
        # from the set while this executes on another thread.
        for c in list(self.connections):
            try:
                c.close()
            finally:
                del c

        if __debug__:
            self.log.debug('Flushing queued connections')

        while not self.monitor_queue.empty():
            c = self.monitor_queue.get()

            if c is None:
                continue

            try:
                c.close()
            finally:
                del c

        # Place a None sentry value to cause the monitor to die.
        self.monitor_queue.put(None)

# Monolithic build...end of module: rocket\monitor.py
# Monolithic build...start of module: rocket\threadpool.py

# Import System Modules
import logging
# Import Package Modules
# package imports removed in monolithic build

# Setup Logging
# Logger used by the ThreadPool class below.
# NOTE(review): this assignment rebinds the module-level name ``log`` that
# the main section earlier in this monolithic file bound to the 'Rocket'
# logger, so every later lookup of ``log`` resolves to this logger.
log = logging.getLogger('Rocket.Errors.ThreadPool')
log.addHandler(NullHandler())

class ThreadPool:
    """The ThreadPool class is a container class for all the worker threads. It
    manages the number of actively running threads.

    method        -- worker class; instantiated as
                     method(app_info, active_queue, monitor_queue).
    app_info      -- dict handed to each worker (replaced by an empty dict
                     when not a dict); min_threads/max_threads are added.
    active_queue  -- queue the workers consume; None items are used to
                     prompt workers to exit.
    monitor_queue -- queue handed to each worker.
    min_threads   -- number of workers created up front.
    max_threads   -- ceiling on the number of workers; 0 means no ceiling.
    """

    def __init__(self,
                 method,
                 app_info,
                 active_queue,
                 monitor_queue,
                 min_threads=DEFAULTS['MIN_THREADS'],
                 max_threads=DEFAULTS['MAX_THREADS'],
                 ):

        if __debug__:
            log.debug("Initializing ThreadPool.")

        # Number of exit requests issued by shrink() that have not yet been
        # matched by a removed dead thread.
        self.check_for_dead_threads = 0
        self.active_queue = active_queue

        self.worker_class = method
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.monitor_queue = monitor_queue
        self.stop_server = False

        # TODO - Optimize this based on some real-world usage data
        self.grow_threshold = int(max_threads/10) + 2

        if not isinstance(app_info, dict):
            app_info = dict()

        app_info.update(max_threads=max_threads,
                        min_threads=min_threads)

        self.app_info = app_info

        self.threads = set()
        for x in range(min_threads):
            worker = self.worker_class(app_info,
                                       self.active_queue,
                                       self.monitor_queue)
            self.threads.add(worker)

    def _is_alive(self, thread):
        """Return True if thread is still running."""
        # Thread.isAlive() was removed in Python 3.9; is_alive() is missing
        # on very old interpreters, so use whichever one exists.
        alive = getattr(thread, 'is_alive', None) or thread.isAlive
        return alive()

    def start(self):
        """Start every worker currently in the pool as a daemon thread."""
        self.stop_server = False
        if __debug__:
            log.debug("Starting threads.")

        for thread in self.threads:
            thread.setDaemon(True)
            thread.start()

    def stop(self):
        """Flag the server as stopping and shut down all workers."""
        if __debug__:
            log.debug("Stopping threads.")

        self.stop_server = True

        # Prompt the threads to die
        for t in self.threads:
            self.active_queue.put(None)

        # Give them the gun
        for t in self.threads:
            t.kill()

        # Wait until they pull the trigger
        for t in self.threads:
            t.join()

        # Clean up the mess
        self.bring_out_your_dead()

    def bring_out_your_dead(self):
        """Remove dead threads from the pool."""

        dead_threads = [t for t in self.threads if not self._is_alive(t)]
        for t in dead_threads:
            if __debug__:
                log.debug("Removing dead thread: %s." % t.getName())
            try:
                # Py2.4 complains here so we put it in a try block
                self.threads.remove(t)
            except:
                pass
        self.check_for_dead_threads -= len(dead_threads)

    def grow(self, amount=None):
        """Start up to ``amount`` additional workers.

        A missing or zero amount defaults to max_threads.  When max_threads
        is non-zero the pool never exceeds it; when max_threads is 0 there
        is no ceiling and the pool grows by the requested amount.  Does
        nothing once the server is stopping.
        """
        if self.stop_server:
            return

        if not amount:
            amount = self.max_threads

        if self.max_threads:
            # Bounded pool: clamp to the remaining headroom.
            amount = min([amount, self.max_threads - len(self.threads)])

        # Never report or attempt a negative growth.
        if amount < 0:
            amount = 0

        if __debug__:
            log.debug("Growing by %i." % amount)

        for x in range(amount):
            worker = self.worker_class(self.app_info,
                                       self.active_queue,
                                       self.monitor_queue)

            worker.setDaemon(True)
            self.threads.add(worker)
            worker.start()

    def shrink(self, amount=1):
        """Ask ``amount`` workers to exit by queueing that many None items."""
        if __debug__:
            log.debug("Shrinking by %i." % amount)

        self.check_for_dead_threads += amount

        for x in range(amount):
            self.active_queue.put(None)

    def dynamic_resize(self):
        """Grow or shrink the pool based on the active queue's length.

        Only acts when the pool is allowed to vary in size (max_threads is
        0 or greater than min_threads): shrinks by one when the queue is
        empty and the pool is above min_threads, grows by the queue length
        when the queue exceeds grow_threshold.
        """
        if (self.max_threads > self.min_threads or self.max_threads == 0):
            if self.check_for_dead_threads > 0:
                self.bring_out_your_dead()

            queueSize = self.active_queue.qsize()
            threadCount = len(self.threads)

            if __debug__:
                log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
                          % (threadCount, queueSize))

            if queueSize == 0 and threadCount > self.min_threads:
                self.shrink()

            elif queueSize > self.grow_threshold:

                self.grow(queueSize)

# Monolithic build...end of module: rocket\threadpool.py
# Monolithic build...start of module: rocket\worker.py

# Import System Modules
import re
import sys
import socket
import logging
import traceback
#from wsgiref.headers import Headers
from threading import Thread
from datetime import datetime

try:
    from urllib import unquote
except ImportError:
    from urllib.parse import unquote

try:
    from io import StringIO
except ImportError:
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

try:
    from ssl import SSLError
except ImportError:
    class SSLError(socket.error):
        pass
# Import Package Modules
# package imports removed in monolithic build


# Define Constants

# Matches the percent-encoded slash "%2F" in either letter case.
re_SLASH = re.compile('%2F', re.IGNORECASE)

# Parses an HTTP request line into the named groups method, scheme, host,
# path, query_string and protocol.  The scheme://host prefix and the query
# string are optional.  Written in verbose mode (re.X), so unescaped
# whitespace and "#" comments inside the pattern are ignored.
# Note: the query_string group requires at least one character, so a request
# target ending in a bare "?" does not match.
re_REQUEST_LINE = re.compile(r"""^
(?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT)   # Request Method
\                                                            # (single space)
(
    (?P<scheme>[^:/]+)                                       # Scheme
    (://)  #
    (?P<host>[^/]+)                                          # Host
)? #
(?P<path>(\*|/[^ \?]*))                                      # Path
(\? (?P<query_string>[^ ]+))?                                # Query String
\                                                            # (single space)
(?P<protocol>HTTPS?/1\.[01])                                 # Protocol
$
""", re.X)

# Format string for one request-log entry; filled from a mapping with the
# keys client_ip, request_line, status and size.
LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'

# Template for a complete minimal HTTP response.  Positional fields, in
# order: status line text, content length (integer), content type, body.
RESPONSE = '''\
HTTP/1.1 %s
Content-Length: %i
Content-Type: %s

%s
'''

# Set of known request methods; only defined when running on Jython.
if IS_JYTHON:
    HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])

###
# The Headers and FileWrapper classes are ripped straight from the Python
# Standard Library. I've removed some docstrings and integrated my BUF_SIZE.
# See the Python License here: http://docs.python.org/license.html
###

# Regular expression that matches `special' characters in parameters, the
# existence of which forces quoting of the parameter value.
import re
_tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')

def _formatparam(param, value=None, quote=1):
    """Build the text for a single header parameter.

    Returns just ``param`` when value is None or empty.  Otherwise returns
    ``param=value``; the value is wrapped in double quotes (with backslashes
    and double quotes escaped) when quote is true or when the value contains
    one of the special characters matched by _tspecials.
    """
    # Nothing to attach: the bare parameter name is the whole result.
    if value is None or len(value) == 0:
        return param

    # Unquoted form is only allowed when not forced and no specials present.
    if not quote and not _tspecials.search(value):
        return '%s=%s' % (param, value)

    escaped = value.replace('\\', '\\\\')
    escaped = escaped.replace('"', r'\"')
    return '%s="%s"' % (param, escaped)

class Headers:
    """Case-insensitive, order-preserving view over a list of
    (name, value) header tuples.

    The list passed in is used directly (not copied), so changes made
    through this object are visible in the caller's list.
    """

    def __init__(self,headers):
        if type(headers) is not list:
            raise TypeError("Headers must be a list of name/value tuples")
        self._headers = headers

    def __len__(self):
        """Number of header entries, counting repeated names separately."""
        return len(self._headers)

    def __setitem__(self, name, val):
        """Replace every entry called ``name`` with a single new entry."""
        self.__delitem__(name)
        self._headers.append((name, val))

    def __delitem__(self,name):
        """Drop every entry called ``name``; a missing name is not an error."""
        wanted = name.lower()
        survivors = []
        for pair in self._headers:
            if pair[0].lower() != wanted:
                survivors.append(pair)
        # Slice assignment keeps the caller's list object in sync.
        self._headers[:] = survivors

    def __getitem__(self,name):
        """First value stored under ``name``, or None when absent."""
        return self.get(name)

    def has_key(self, name):
        """True when ``name`` has a value other than None."""
        return not (self.get(name) is None)

    __contains__ = has_key

    def get_all(self, name):
        """All values stored under ``name``, in insertion order."""
        wanted = name.lower()
        found = []
        for pair in self._headers:
            if pair[0].lower()==wanted:
                found.append(pair[1])
        return found

    def get(self,name,default=None):
        """First value stored under ``name``, else ``default``."""
        wanted = name.lower()
        for key, val in self._headers:
            if key.lower()==wanted:
                return val
        return default

    def keys(self):
        """Header names in insertion order (duplicates included)."""
        return [key for key, val in self._headers]

    def values(self):
        """Header values in insertion order."""
        return [val for key, val in self._headers]

    def items(self):
        """Shallow copy of the underlying (name, value) list."""
        return list(self._headers)

    def __repr__(self):
        return "Headers(%r)" % self._headers

    def __str__(self):
        """Headers as CRLF-separated ``name: value`` lines followed by the
        blank line that terminates a header section."""
        lines = ["%s: %s" % pair for pair in self._headers]
        lines.extend(['', ''])
        return '\r\n'.join(lines)

    def setdefault(self,name,value):
        """Return the current value of ``name``; when there is none, append
        (name, value) and return ``value``."""
        current = self.get(name)
        if current is not None:
            return current
        self._headers.append((name,value))
        return value


    def add_header(self, _name, _value, **_params):
        """Append a header without removing existing entries of that name.

        Keyword arguments become parameters joined with "; ".  Underscores
        in keyword names are turned into dashes; a parameter whose value is
        None is emitted as the bare name.
        """
        pieces = []
        if _value is not None:
            pieces.append(_value)
        for key, val in _params.items():
            label = key.replace('_', '-')
            if v_is_none(val):
                pieces.append(label)
            else:
                pieces.append(_formatparam(label, val))
        self._headers.append((_name, "; ".join(pieces)))


def v_is_none(val):
    """Tell whether a header parameter value is None."""
    return val is None

class FileWrapper:
    """Wrapper to convert file-like objects to iterables.

    Each iteration step reads up to ``blksize`` bytes/characters from
    ``filelike`` and yields the chunk; iteration ends at the first empty
    read.  If ``filelike`` has a close() method it is exposed as this
    object's close attribute.
    """

    def __init__(self, filelike, blksize=BUF_SIZE):
        self.filelike = filelike
        self.blksize = blksize
        if hasattr(filelike,'close'):
            self.close = filelike.close

    def __getitem__(self,key):
        # Legacy sequence-style iteration: the key is ignored and the next
        # chunk is returned; IndexError signals exhaustion.
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise IndexError

    def __iter__(self):
        return self

    def next(self):
        """Return the next chunk, raising StopIteration at end of data."""
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise StopIteration

    # Python 3 looks for __next__; without it the object returned by
    # __iter__ is not accepted as an iterator.
    __next__ = next

class Worker(Thread):
    """The Worker class is a base class responsible for receiving connections
    and (a subclass) will run an application to process the connection """

    def __init__(self,
                 app_info,
                 active_queue,
                 monitor_queue,
                 *args,
                 **kwargs):
        """Set up the worker thread.

        app_info -- stored as ``self.app_info`` for use by subclasses
        active_queue -- queue that ``run`` takes connections from
        monitor_queue -- queue that timed-out connections are put on
        Remaining arguments are passed on to ``Thread.__init__``.
        """
        Thread.__init__(self, *args, **kwargs)

        # Per-response state
        self.closeConnection = True
        self.status = "200 OK"
        self.size = 0

        # Collaborators handed in by the caller
        self.app_info = app_info
        self.active_queue = active_queue
        self.monitor_queue = monitor_queue

        # Request log (shared logger name for all workers)
        request_logger = logging.getLogger('Rocket.Requests')
        request_logger.addHandler(NullHandler())
        self.req_log = request_logger

        # Error log (one logger per thread name)
        error_logger = logging.getLogger('Rocket.Errors.'+self.getName())
        error_logger.addHandler(NullHandler())
        self.err_log = error_logger

    def _handleError(self, typ, val, tb):
        """Decide how to react to an exception raised while serving a request.

        typ, val, tb -- the triple returned by ``sys.exc_info()``

        Returns True when ``run`` should leave the serve loop without
        writing a request-log entry (socket timeout, bad request) and False
        otherwise.  Every branch except the timeout one sets
        ``self.closeConnection`` so the connection gets closed.
        """
        if typ == SSLError:
            # args[0] may be an integer error code instead of a message, so
            # convert it before searching; args may also be empty.
            if val.args and 'timed out' in str(val.args[0]):
                typ = SocketTimeout
        if typ == SocketTimeout:
            if __debug__:
                self.err_log.debug('Socket timed out')
            # Hand the idle connection over instead of closing it.
            self.monitor_queue.put(self.conn)
            return True
        if typ == SocketClosed:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client closed socket')
            return False
        if typ == BadRequest:
            self.closeConnection = True
            if __debug__:
                self.err_log.debug('Client sent a bad request')
            return True
        if typ == socket.error:
            self.closeConnection = True
            if val.args and val.args[0] in IGNORE_ERRORS_ON_CLOSE:
                if __debug__:
                    self.err_log.debug('Ignorable socket Error received...'
                                       'closing connection.')
                return False
            else:
                self.status = "999 Utter Server Failure"
                tb_fmt = traceback.format_exception(typ, val, tb)
                self.err_log.error('Unhandled Error when serving '
                                   'connection:\n' + '\n'.join(tb_fmt))
                return False

        # Anything else: log the traceback and tell the client we failed.
        self.closeConnection = True
        tb_fmt = traceback.format_exception(typ, val, tb)
        self.err_log.error('\n'.join(tb_fmt))
        self.send_response('500 Server Error')
        return False

    def run(self):
        if __debug__:
            self.err_log.debug('Entering main loop.')

        # Enter thread main loop
        while True:
            conn = self.active_queue.get()

            if not conn:
                # A non-client is a signal to die
                if __debug__:
                    self.err_log.debug('Received a death threat.')
                return conn

            if isinstance(conn, tuple):
                conn = Connection(*conn)

            self.conn = conn

            if conn.ssl != conn.secure:
                self.err_log.info('Received HTTP connection on HTTPS port.')
                self.send_response('400 Bad Request')
                self.closeConnection = True
                conn.close()
                continue
            else:
                if __debug__:
                    self.err_log.debug('Received a connection.')
                self.closeConnection = False

            # Enter connection serve loop
            while True:
                if __debug__:
                    self.err_log.debug('Serving a request')
                try:
                    self.run_app(conn)
                    log_info = dict(client_ip = conn.client_addr,
                                    time = datetime.now().strftime('%c'),
                                    status = self.status.split(' ')[0],
                                    size = self.size,
                                    request_line = self.request_line)
                    self.req_log.info(LOG_LINE % log_info)
                except:
                    exc = sys.exc_info()
                    handled = self._handleError(*exc)
                    if handled:
                        break
                    else:
                        if self.request_line:
                            log_info = dict(client_ip = conn.client_addr,
                                            time = datetime.now().strftime('%c'),
                                            status = self.status.split(' ')[0],
                                            size = self.size,
                                            request_line = self.request_line + ' - not stopping')
                            self.req_log.info(LOG_LINE % log_info)

                if self.closeConnection:
                    try:
                        conn.close()
                    except:
                        self.err_log.error(str(traceback.format_exc()))

                    break

    def run_app(self, conn):
        # Must be overridden with a method reads the request from the socket
        # and sends a response.
        self.closeConnection = True
        raise NotImplementedError('Overload this method!')

    def send_response(self, status):
        """Send a minimal plain-text response whose body is the reason phrase.

        status -- status line such as '400 Bad Request'; the text after the
                  first space is used as the body.

        A socket error while sending is logged and marks the connection for
        closing instead of propagating.
        """
        reason = status.split(' ', 1)[1]
        payload = b(RESPONSE % (status,
                                len(reason),
                                'text/plain',
                                reason))
        try:
            self.conn.sendall(payload)
        except socket.error:
            self.closeConnection = True
            self.err_log.error('Tried to send "%s" to client but received socket'
                           ' error' % status)

    def kill(self):
        if self.isAlive() and hasattr(self, 'conn'):
            try:
                self.conn.shutdown(socket.SHUT_RDWR)
            except socket.error:
                info = sys.exc_info()
                if info[1].args[0] != socket.EBADF:
                    self.err_log.debug('Error on shutdown: '+str(info))

    def read_request_line(self, sock_file):
        """Read and parse the HTTP request line from *sock_file*.

        Returns a dict of request-line parts (the named groups of
        ``re_REQUEST_LINE``, or the result of the Jython fallback parser).
        The stripped line is kept in ``self.request_line``.

        Raises SocketTimeout if the socket times out while waiting,
        SocketClosed if no request line arrives, and BadRequest (after
        sending a 400 response) if the line does not parse.
        """
        self.request_line = ''
        try:
            # Grab the request line
            line = sock_file.readline()
            if PY3K:
                line = line.decode('ISO-8859-1')

            if line == '\r\n':
                # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
                if __debug__:
                    self.err_log.debug('Client sent newline')

                line = sock_file.readline()
                if PY3K:
                    line = line.decode('ISO-8859-1')
        except socket.timeout:
            raise SocketTimeout("Socket timed out before request.")

        line = line.strip()

        if not line:
            if __debug__:
                self.err_log.debug('Client did not send a recognizable request.')
            raise SocketClosed('Client closed socket.')

        self.request_line = line

        # NOTE: The request line is normally taken apart with a regular
        # expression.  On Jython the regexp is slower than splitting the
        # line procedurally, so the procedural parser is kept for it.
        if IS_JYTHON:
            return self._read_request_line_jython(line)

        parsed = re_REQUEST_LINE.match(line)

        if not parsed:
            self.send_response('400 Bad Request')
            raise BadRequest

        req = parsed.groupdict()
        for field, value in list(req.items()):
            if field == 'path':
                # Unquote every segment but keep encoded slashes encoded.
                req['path'] = r'%2F'.join([unquote(seg) for seg in re_SLASH.split(value)])
            elif not value:
                # Groups that did not take part in the match become ''.
                req[field] = ""

        return req

    def _read_request_line_jython(self, d):
        """Parse request line *d* without regular expressions.

        Returns a dict with the keys method, protocol, path, query_string,
        scheme and host.  Sends a 400 response and raises BadRequest when
        the line is malformed.
        """
        d = d.strip()
        try:
            method, uri, proto = d.split(' ')
        except ValueError:
            self.send_response('400 Bad Request')
            raise BadRequest

        if not proto.startswith('HTTP') or \
           proto[-3:] not in ('1.0', '1.1') or \
           method not in HTTP_METHODS:
            self.send_response('400 Bad Request')
            raise BadRequest

        req = dict(method=method, protocol = proto)
        scheme = ''
        host = ''
        if uri == '*' or uri.startswith('/'):
            path = uri
        elif '://' in uri:
            # Split on the first '://' only: the query string may contain
            # another one (e.g. a URL passed as a parameter).
            scheme, rest = uri.split('://', 1)
            # An absolute URI may stop right after the host; treat the
            # missing path as '/'.
            pieces = rest.split('/', 1)
            host = pieces[0]
            if len(pieces) > 1:
                path = '/' + pieces[1]
            else:
                path = '/'
        else:
            self.send_response('400 Bad Request')
            raise BadRequest

        query_string = ''
        if '?' in path:
            path, query_string = path.split('?', 1)

        # Unquote every segment but keep encoded slashes encoded.
        path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])

        req.update(path=path,
                   query_string=query_string,
                   scheme=scheme.lower(),
                   host=host)
        return req


    def read_headers(self, sock_file):
        headers = dict()
        l = sock_file.readline()

        lname = None
        lval = None
        while True:
            if PY3K:
                try:
                    l = str(l, 'ISO-8859-1')
                except UnicodeDecodeError:
                    self.err_log.warning('Client sent invalid header: ' + repr(l))

            if l == '\r\n':
                break

            if l[0] in ' \t' and lname:
                # Some headers take more than one line
                lval += ',' + l.strip()
            else:
                # HTTP header values are latin-1 encoded
                l = l.split(':', 1)
                # HTTP header names are us-ascii encoded

                lname = l[0].strip().upper().replace('-', '_')
                lval = l[-1].strip()
            headers[str(lname)] = str(lval)

            l = sock_file.readline()
        return headers

class SocketTimeout(Exception):
    """Raised when a socket times out between requests."""

class BadRequest(Exception):
    """Raised when a client sends an incomprehensible request."""

class SocketClosed(Exception):
    """Raised when a socket is closed by the client."""

class ChunkedReader(object):
    def __init__(self, sock_file):
        self.stream = sock_file
        self.chunk_size = 0

    def _read_header(self):
        chunk_len = ""
        try:
            while "" == chunk_len:
                chunk_len = self.stream.readline().strip()
            return int(chunk_len, 16)
        except ValueError:
            return 0

    def read(self, size):
        data = b('')
        chunk_size = self.chunk_size
        while size:
            if not chunk_size:
                chunk_size = self._read_header()

            if size < chunk_size:
                data += self.stream.read(size)
                chunk_size -= size
                break
            else:
                if not chunk_size:
                    break
                data += self.stream.read(chunk_size)
                size -= chunk_size
                chunk_size = 0

        self.chunk_size = chunk_size
        return data

    def readline(self):
        data = b('')
        c = self.read(1)
        while c and c != b('\n'):
            data += c
            c = self.read(1)
        data += c
        return data

    def readlines(self):
        yield self.readline()

def get_method(method):
    """Return the worker class registered under *method* (case-insensitive).

    Only 'wsgi' is registered; any other name raises KeyError.
    """
    registry = {'wsgi': WSGIWorker}
    key = method.lower()
    return registry[key]

# Monolithic build...end of module: rocket\worker.py
# Monolithic build...start of module: rocket\methods\__init__.py

# Monolithic build...end of module: rocket\methods\__init__.py
# Monolithic build...start of module: rocket\methods\wsgi.py

# Import System Modules
import sys
import socket
#from wsgiref.headers import Headers
#from wsgiref.util import FileWrapper
# Import Package Modules
# package imports removed in monolithic build


if PY3K:
    from email.utils import formatdate
else:
    # Caps Utils for Py2.4 compatibility
    from email.Utils import formatdate

# Define Constants
# CRLF line terminator, converted by b().
NEWLINE = b('\r\n')
# Template for the response head; send_headers fills it with the status
# line text and the formatted header block.
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
# WSGI environ entries that do not depend on the request; every WSGIWorker
# merges them into its base_environ.
BASE_ENV = {'SERVER_NAME': SERVER_NAME,
            'SCRIPT_NAME': '',  # Direct call WSGI does not need a name
            'wsgi.errors': sys.stderr,
            'wsgi.version': (1, 0),
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'wsgi.file_wrapper': FileWrapper
            }

class WSGIWorker(Worker):
    def __init__(self, *args, **kwargs):
        """Build the instance variables that last for the life of the thread.

        Arguments are passed on to ``Worker.__init__``.  Afterwards the
        per-thread base environ is prepared and the WSGI application is
        taken from ``app_info['wsgi_app']``.

        Raises TypeError if the configured application is not callable.
        """
        Worker.__init__(self, *args, **kwargs)

        info = self.app_info
        # Anything other than exactly one thread counts as multithreaded;
        # a non-dict app_info is treated as single-threaded.
        multithreaded = isinstance(info, dict) and info.get('max_threads') != 1

        environ = {'SERVER_SOFTWARE': info['server_software'],
                   'wsgi.multithread': multithreaded}
        environ.update(BASE_ENV)
        self.base_environ = environ

        # Grab our application
        self.app = info.get('wsgi_app')

        if not hasattr(self.app, "__call__"):
            raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))



    def build_environ(self, sock_file, conn):
        """Build the WSGI environ for the request waiting on *sock_file*.

        sock_file -- file object of the connection, positioned at the
                     request line
        conn -- the connection; supplies the port, address and ssl values

        Reads the request line and headers, stores the request method in
        ``self.request_method`` and returns the environ dict.  Exceptions
        raised by ``read_request_line`` propagate to the caller.
        """
        # Grab the request line
        request = self.read_request_line(sock_file)

        # Copy the Base Environment
        environ = self.base_environ.copy()

        # Grab the headers
        for k, v in self.read_headers(sock_file).items():
            environ[str('HTTP_'+k)] = v

        # Add CGI Variables
        environ['REQUEST_METHOD'] = request['method']
        environ['PATH_INFO'] = request['path']
        environ['SERVER_PROTOCOL'] = request['protocol']
        environ['SERVER_PORT'] = str(conn.server_port)
        environ['REMOTE_PORT'] = str(conn.client_port)
        environ['REMOTE_ADDR'] = str(conn.client_addr)
        environ['QUERY_STRING'] = request['query_string']
        if 'HTTP_CONTENT_LENGTH' in environ:
            environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
        if 'HTTP_CONTENT_TYPE' in environ:
            environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']

        # Save the request method for later
        self.request_method = environ['REQUEST_METHOD']

        # Add Dynamic WSGI Variables
        if conn.ssl:
            environ['wsgi.url_scheme'] = 'https'
            environ['HTTPS'] = 'on'
        else:
            environ['wsgi.url_scheme'] = 'http'

        # Transfer-coding names are case-insensitive, so "Chunked" must be
        # decoded just like "chunked".
        transfer_encoding = environ.get('HTTP_TRANSFER_ENCODING', '')
        if transfer_encoding.strip().lower() == 'chunked':
            environ['wsgi.input'] = ChunkedReader(sock_file)
        else:
            environ['wsgi.input'] = sock_file

        return environ

    def send_headers(self, data, sections):
        """Complete the response headers and send them with the status line.

        data -- first body section; only its length is used
        sections -- ``len()`` of the application's output when it has one,
                    otherwise None

        Fills in Date, Server, Content-Length or Transfer-Encoding, and
        Connection when the application did not set them.  Updates
        ``self.chunked``, ``self.size`` and ``self.closeConnection`` and
        sets ``self.headers_sent``.
        """
        h_set = self.header_set

        # Does the app want us to send output chunked?
        self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked'

        # Add a Date header if it's not there already
        if not 'date' in h_set:
            h_set['Date'] = formatdate(usegmt=True)

        # Add a Server header if it's not there already
        if not 'server' in h_set:
            h_set['Server'] = HTTP_SERVER_SOFTWARE

        if 'content-length' in h_set:
            self.size = int(h_set['content-length'])
        else:
            s = int(self.status.split(' ')[0])
            # 1xx, 204, 205 and 304 responses carry no body, so they get
            # neither Content-Length nor Transfer-Encoding.  (The previous
            # test "s < 200 or ..." let 1xx through.)
            if s >= 200 and s not in (204, 205, 304):
                if not self.chunked:
                    if sections == 1:
                        # Add a Content-Length header if it's not there already
                        h_set['Content-Length'] = str(len(data))
                        self.size = len(data)
                    else:
                        # If they sent us more than one section, we blow chunks
                        h_set['Transfer-Encoding'] = 'Chunked'
                        self.chunked = True
                        if __debug__:
                            self.err_log.debug('Adding header...'
                                               'Transfer-Encoding: Chunked')

        if 'connection' not in h_set:
            # If the application did not provide a connection header, fill it in
            client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
            if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
                # HTTP = 1.1 defaults to keep-alive connections
                if client_conn:
                    h_set['Connection'] = client_conn
                else:
                    h_set['Connection'] = 'keep-alive'
            else:
                # HTTP < 1.1 supports keep-alive but it's quirky so we don't support it
                h_set['Connection'] = 'close'

        # Close our connection if we need to.
        self.closeConnection = h_set.get('connection', '').lower() == 'close'

        # Build our output headers
        header_data = HEADER_RESPONSE % (self.status, str(h_set))

        # Send the headers
        if __debug__:
            self.err_log.debug('Sending Headers: %s' % repr(header_data))
        self.conn.sendall(b(header_data))
        self.headers_sent = True

    def write_warning(self, data, sections=None):
        """Log a deprecation warning, then behave exactly like ``write``.

        This is the callable that ``start_response`` returns to the
        application.
        """
        message = ('WSGI app called write method directly.  This is '
                   'deprecated behavior.  Please update your app.')
        self.err_log.warning(message)
        return self.write(data, sections)

    def write(self, data, sections=None):
        """ Write the data to the output socket.

        data -- one section of the response body
        sections -- passed on to ``send_headers`` when the headers have not
                    been sent yet

        If an error was recorded in ``self.error`` its status and message
        replace the application's.  Nothing is sent for HEAD requests
        beyond the headers.  A socket error while sending marks the
        connection for closing instead of propagating.
        """

        if self.error[0]:
            self.status = self.error[0]
            data = b(self.error[1])

        if not self.headers_sent:
            self.send_headers(data, sections)

        if self.request_method != 'HEAD':
            try:
                if self.chunked:
                    # Build the chunk by concatenation: formatting the body
                    # with '%s' would embed the b'...' repr on Python 3.
                    payload = b(data)
                    self.conn.sendall(b('%x\r\n' % len(payload))
                                      + payload
                                      + b('\r\n'))
                else:
                    self.conn.sendall(data)
            except socket.error:
                # But some clients will close the connection before that
                # resulting in a socket error.
                self.closeConnection = True

    def start_response(self, status, response_headers, exc_info=None):
        """ Store the HTTP status and headers to be sent when self.write is
        called.

        status -- status line text such as '200 OK'
        response_headers -- list of (name, value) pairs
        exc_info -- truthy when the application is reporting an error

        Returns ``self.write_warning``.  Raises AssertionError when headers
        were already set and no exc_info is given; re-raises the active
        exception when exc_info is given after the headers went out.
        """
        if not exc_info:
            if self.header_set:
                raise AssertionError("Headers already set!")
        else:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    # because this violates WSGI specification.
                    raise
            finally:
                # Drop the reference to the traceback.
                exc_info = None

        if PY3K and not isinstance(status, str):
            status = str(status, 'ISO-8859-1')
        self.status = status

        # Make sure headers are bytes objects
        try:
            self.header_set = Headers(response_headers)
        except UnicodeDecodeError:
            self.error = ('500 Internal Server Error',
                          'HTTP Headers should be bytes')
            self.err_log.error('Received HTTP Headers from client that contain'
                               ' invalid characters for Latin-1 encoding.')

        return self.write_warning

    def run_app(self, conn):
        """Serve one request on *conn* with the WSGI application.

        Resets the per-request state, builds the environ, calls the
        application and writes its output.  Exceptions are not handled
        here; they propagate to ``Worker.run``.  The application's output
        and the socket file are always closed.
        """
        self.size = 0
        self.header_set = Headers([])
        self.headers_sent = False
        self.error = (None, None)
        self.chunked = False
        sections = None
        output = None

        if __debug__:
            self.err_log.debug('Getting sock_file')

        # Build our file-like object
        sock_file = conn.makefile('rb',BUF_SIZE)

        try:
            # Read the headers and build our WSGI environment
            self.environ = environ = self.build_environ(sock_file, conn)

            # Handle 100 Continue (the expectation is case-insensitive)
            if environ.get('HTTP_EXPECT', '').lower() == '100-continue':
                res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
                conn.sendall(b(res))

            # Send it to our WSGI application
            output = self.app(environ, self.start_response)

            if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
                self.error = ('500 Internal Server Error',
                              'WSGI applications must return a list or '
                              'generator type.')

            if hasattr(output, '__len__'):
                sections = len(output)

            for data in output:
                # Don't send headers until body appears
                if data:
                    self.write(data, sections)

            if not self.headers_sent:
                # Send headers if the body was empty.  This has to happen
                # before the check below because send_headers may switch
                # the response to chunked encoding.
                self.send_headers('', sections)

            if self.chunked and self.request_method != 'HEAD':
                # If chunked, send our final chunk length.  HEAD responses
                # have no body, so they get no terminating chunk either.
                self.conn.sendall(b('0\r\n\r\n'))

        # Don't capture exceptions here.  The Worker class handles
        # them appropriately.
        finally:
            if __debug__:
                self.err_log.debug('Finally closing output and sock_file')

            try:
                if hasattr(output,'close'):
                    output.close()
            finally:
                # Close the socket file even if output.close() fails.
                sock_file.close()

# Monolithic build...end of module: rocket\methods\wsgi.py

#
# the following code is not part of Rocket but was added in web2py for testing purposes
#

def demo_app(environ, start_response):
    """Small WSGI application used to try out the server.

    Without a configured ``static_folder`` it answers every request with a
    fixed HTML greeting.  With one, it serves the file named by PATH_INFO
    from that folder ('index.html' for the root):

    * 404 if the file does not exist or the path points outside the folder
    * 500 if the file exists but cannot be read

    Bodies are passed through ``b()`` so that they are byte strings on
    Python 3 as well.
    """
    global static_folder
    import os
    types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif',
             'jpg': 'image/jpeg','png': 'image/png','pdf': 'application/pdf'}

    # static_folder only exists once demo() has run.
    folder = globals().get('static_folder')
    if not folder:
        start_response('200 OK', [('Content-Type', 'text/html')])
        return [b('<html><body><h1>Hello from Rocket Web Server</h1></body></html>')]

    if not os.path.isabs(folder):
        folder = static_folder = os.path.join(os.getcwd(), folder)

    root = os.path.abspath(folder)
    requested = environ['PATH_INFO'][1:] or 'index.html'
    path = os.path.abspath(os.path.join(root, requested))

    # PATH_INFO comes from the client: refuse anything that resolves
    # outside the static folder ('..' segments, absolute paths).
    inside_root = path.startswith(os.path.join(root, ''))
    if not (inside_root and os.path.isfile(path)):
        start_response('404 NOT FOUND', [])
        return [b('404 NOT FOUND')]

    try:
        pathfile = open(path,'rb')
        try:
            data = pathfile.read()
        finally:
            pathfile.close()
    except IOError:
        start_response('500 INTERNAL SERVER ERROR', [])
        return [b('500 INTERNAL SERVER ERROR')]

    extension = path.split('.')[-1].lower()
    content_type = types.get(extension, 'text/plain')
    start_response('200 OK', [('Content-Type', content_type)])
    return [data]

def demo():
    """Parse the command line and start a Rocket server running demo_app.

    Options: -i/--ip (default 127.0.0.1), -p/--port (default 8000) and
    -s/--static (folder with static files, stored in the module-level
    ``static_folder`` read by demo_app).
    """
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
                      help="ip address of the network interface")
    parser.add_option("-p", "--port", dest="port",default="8000",
                      help="port where to run web server")
    parser.add_option("-s", "--static", dest="static",default=None,
                      help="folder containing static files")
    (options, args) = parser.parse_args()
    global static_folder
    static_folder = options.static
    # Call form of print: valid on Python 2 and 3.  The statement form is a
    # SyntaxError on Python 3 and would keep the module from being imported.
    print('Rocket running on %s:%s' % (options.ip, options.port))
    r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
    r.start()

# Start the demo server when this file is executed as a script.
if __name__=='__main__':
    demo()