# -*- coding: utf-8 -*-
# This file is part of the Rocket Web Server
# Copyright (c) 2010 Timothy Farrell
# Import System Modules
import sys
import errno
import socket
import logging
import platform
# Define Constants
VERSION = '1.2.2'
SERVER_NAME = socket.gethostname()
SERVER_SOFTWARE = 'Rocket %s' % VERSION
HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0])
BUF_SIZE = 16384
SOCKET_TIMEOUT = 1 # in secs
THREAD_STOP_CHECK_INTERVAL = 1 # in secs; how often threads check for a server stop message
IS_JYTHON = platform.system() == 'Java' # Handle special cases for Jython
IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
DEFAULT_LISTEN_QUEUE_SIZE = 5
DEFAULT_MIN_THREADS = 10
DEFAULT_MAX_THREADS = 0
DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE,
MIN_THREADS = DEFAULT_MIN_THREADS,
MAX_THREADS = DEFAULT_MAX_THREADS)
PY3K = sys.version_info[0] > 2
class NullHandler(logging.Handler):
"A Logging handler to prevent library errors."
def emit(self, record):
pass
if PY3K:
def b(val):
""" Convert string/unicode/bytes literals into bytes. This allows for
the same code to run on Python 2.x and 3.x. """
if isinstance(val, str):
return val.encode()
else:
return val
def u(val, encoding="us-ascii"):
""" Convert bytes into string/unicode. This allows for the
same code to run on Python 2.x and 3.x. """
if isinstance(val, bytes):
return val.decode(encoding)
else:
return val
else:
def b(val):
""" Convert string/unicode/bytes literals into bytes. This allows for
the same code to run on Python 2.x and 3.x. """
if isinstance(val, unicode):
return val.encode()
else:
return val
def u(val, encoding="us-ascii"):
""" Convert bytes into string/unicode. This allows for the
same code to run on Python 2.x and 3.x. """
if isinstance(val, str):
return val.decode(encoding)
else:
return val
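# Usage sketch (illustrative comments only, not executed): b() and u() let the
# same byte/text handling code run on Python 2 and 3. For example:
#
#     raw = b('HTTP/1.1 200 OK\r\n')   # bytes on Py3, str on Py2
#     text = u(raw)                    # back to str/unicode (us-ascii by default)
#     assert u(b('abc')) == 'abc'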
# Import Package Modules
# package imports removed in monolithic build
__all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
# Monolithic build...end of module: rocket\__init__.py
# Monolithic build...start of module: rocket\connection.py
# Import System Modules
import sys
import time
import socket
try:
import ssl
has_ssl = True
except ImportError:
has_ssl = False
# package imports removed in monolithic build
class Connection(object):
__slots__ = [
'setblocking',
'sendall',
'shutdown',
'makefile',
'fileno',
'client_addr',
'client_port',
'server_port',
'socket',
'start_time',
'ssl',
'secure'
]
def __init__(self, sock_tuple, port, secure=False):
self.client_addr, self.client_port = sock_tuple[1]
self.server_port = port
self.socket = sock_tuple[0]
self.start_time = time.time()
self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket)
self.secure = secure
if IS_JYTHON:
# In Jython we must set TCP_NODELAY here since it does not
# inherit from the listening socket.
# See: http://bugs.jython.org/issue1309
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.socket.settimeout(SOCKET_TIMEOUT)
self.sendall = self.socket.sendall
self.shutdown = self.socket.shutdown
self.fileno = self.socket.fileno
self.makefile = self.socket.makefile
self.setblocking = self.socket.setblocking
def close(self):
if hasattr(self.socket, '_sock'):
try:
self.socket._sock.close()
except socket.error:
info = sys.exc_info()
                # EBADF lives in the errno module (the socket module has no EBADF)
                if info[1].errno != errno.EBADF:
raise info[1]
else:
pass
self.socket.close()
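# Construction sketch (illustrative comments only, not executed): a Connection
# wraps the (socket, address) pair returned by accept() plus the listening port;
# the Listener queues exactly this tuple and the Worker unpacks it:
#
#     sock, addr = listener_socket.accept()    # listener_socket is hypothetical
#     conn = Connection((sock, addr), 8000, secure=False)
#     conn.sendall(b('HTTP/1.1 200 OK\r\n\r\n'))
#     conn.close()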
# Monolithic build...end of module: rocket\connection.py
# Monolithic build...start of module: rocket\listener.py
# Import System Modules
import os
import socket
import logging
import traceback
from threading import Thread
try:
import ssl
from ssl import SSLError
has_ssl = True
except ImportError:
has_ssl = False
class SSLError(socket.error):
pass
# Import Package Modules
# package imports removed in monolithic build
class Listener(Thread):
"""The Listener class is a class responsible for accepting connections
and queuing them to be processed by a worker thread."""
def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
Thread.__init__(self, *args, **kwargs)
# Instance variables
self.active_queue = active_queue
self.interface = interface
self.addr = interface[0]
self.port = interface[1]
self.secure = len(interface) == 4 and \
os.path.exists(interface[2]) and \
os.path.exists(interface[3])
self.ready = False
# Error Log
self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
self.err_log.addHandler(NullHandler())
# Build the socket
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not listener:
self.err_log.error("Failed to get socket.")
return
if self.secure:
if not has_ssl:
self.err_log.error("ssl module required to serve HTTPS.")
return
elif not os.path.exists(interface[2]):
data = (interface[2], interface[0], interface[1])
self.err_log.error("Cannot find key file "
"'%s'. Cannot bind to %s:%s" % data)
return
elif not os.path.exists(interface[3]):
data = (interface[3], interface[0], interface[1])
self.err_log.error("Cannot find certificate file "
"'%s'. Cannot bind to %s:%s" % data)
return
# Set socket options
try:
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except:
msg = "Cannot share socket. Using %s:%i exclusively."
self.err_log.warning(msg % (self.addr, self.port))
try:
if not IS_JYTHON:
listener.setsockopt(socket.IPPROTO_TCP,
socket.TCP_NODELAY,
1)
except:
msg = "Cannot set TCP_NODELAY, things might run a little slower"
self.err_log.warning(msg)
try:
listener.bind((self.addr, self.port))
except:
msg = "Socket %s:%i in use by other process and it won't share."
self.err_log.error(msg % (self.addr, self.port))
else:
# We want socket operations to timeout periodically so we can
# check if the server is shutting down
listener.settimeout(THREAD_STOP_CHECK_INTERVAL)
# Listen for new connections allowing queue_size number of
# connections to wait before rejecting a connection.
listener.listen(queue_size)
self.listener = listener
self.ready = True
def wrap_socket(self, sock):
try:
sock = ssl.wrap_socket(sock,
keyfile = self.interface[2],
certfile = self.interface[3],
server_side = True,
ssl_version = ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def run(self):
if not self.ready:
self.err_log.warning('Listener started when not ready.')
return
if __debug__:
self.err_log.debug('Entering main loop.')
while True:
try:
sock, addr = self.listener.accept()
if self.secure:
sock = self.wrap_socket(sock)
self.active_queue.put(((sock, addr),
self.interface[1],
self.secure))
except socket.timeout:
# socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL
# seconds. When that happens, we check if it's time to die.
if not self.ready:
if __debug__:
self.err_log.debug('Listener exiting.')
return
else:
continue
except:
self.err_log.error(str(traceback.format_exc()))
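# Usage sketch (illustrative comments only, not executed): a Listener is built
# from an interface tuple and feeds accepted sockets into a shared queue
# (Queue comes from the queue/Queue module, imported further below). A plain
# HTTP interface is (addr, port); an HTTPS interface appends key and
# certificate paths (the paths below are hypothetical):
#
#     active_queue = Queue()
#     plain = Listener(('127.0.0.1', 8000), 5, active_queue)
#     # tls = Listener(('0.0.0.0', 8443, '/etc/ssl/server.key', '/etc/ssl/server.crt'),
#     #                5, active_queue)
#     plain.start()      # accepted sockets now appear on active_queue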
# Monolithic build...end of module: rocket\listener.py
# Monolithic build...start of module: rocket\main.py
# Import System Modules
import sys
import time
import socket
import logging
import traceback
try:
from queue import Queue
except ImportError:
from Queue import Queue
# Import Package Modules
# package imports removed in monolithic build
# Setup Logging
log = logging.getLogger('Rocket')
log.addHandler(NullHandler())
class Rocket(object):
"""The Rocket class is responsible for handling threads and accepting and
dispatching connections."""
def __init__(self,
interfaces = ('127.0.0.1', 8000),
method = 'wsgi',
app_info = None,
min_threads = None,
max_threads = None,
queue_size = None,
timeout = 600,
handle_signals = True):
self.handle_signals = handle_signals
if not isinstance(interfaces, list):
self.interfaces = [interfaces]
else:
self.interfaces = interfaces
if min_threads is None:
min_threads = DEFAULTS['MIN_THREADS']
if max_threads is None:
max_threads = DEFAULTS['MAX_THREADS']
if not queue_size:
if hasattr(socket, 'SOMAXCONN'):
queue_size = socket.SOMAXCONN
else:
queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']
if max_threads and queue_size > max_threads:
queue_size = max_threads
if isinstance(app_info, dict):
app_info['server_software'] = SERVER_SOFTWARE
monitor_queue = Queue()
active_queue = Queue()
self._monitor = Monitor(monitor_queue, active_queue, timeout)
self._threadpool = ThreadPool(get_method(method),
app_info = app_info,
active_queue=active_queue,
monitor_queue = monitor_queue,
min_threads=min_threads,
max_threads=max_threads)
# Build our socket listeners
self.listeners = [Listener(i, queue_size, active_queue) for i in self.interfaces]
        # Walk the list in reverse (including index 0) so deletions are safe
        for ndx in range(len(self.listeners) - 1, -1, -1):
if not self.listeners[ndx].ready:
del self.listeners[ndx]
if not self.listeners:
log.critical("No interfaces to listen on...closing.")
sys.exit(1)
def _sigterm(self, signum, frame):
log.info('Received SIGTERM')
self.stop()
def _sighup(self, signum, frame):
log.info('Received SIGHUP')
self.restart()
def start(self):
log.info('Starting %s' % SERVER_SOFTWARE)
# Set up our shutdown signals
if self.handle_signals:
try:
import signal
signal.signal(signal.SIGTERM, self._sigterm)
signal.signal(signal.SIGUSR1, self._sighup)
except:
log.debug('This platform does not support signals.')
# Start our worker threads
self._threadpool.start()
# Start our monitor thread
self._monitor.setDaemon(True)
self._monitor.start()
# I know that EXPR and A or B is bad but I'm keeping it for Py2.4
# compatibility.
str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
msg = 'Listening on sockets: '
msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
log.info(msg)
for l in self.listeners:
l.start()
tp = self._threadpool
dynamic_resize = tp.dynamic_resize
while not tp.stop_server:
try:
dynamic_resize()
time.sleep(THREAD_STOP_CHECK_INTERVAL)
except KeyboardInterrupt:
# Capture a keyboard interrupt when running from a console
break
except:
if not tp.stop_server:
log.error(str(traceback.format_exc()))
continue
return self.stop()
def stop(self, stoplogging = True):
log.info("Stopping Server")
# Stop listeners
for l in self.listeners:
l.ready = False
if l.isAlive():
l.join()
# Stop Worker threads
self._threadpool.stop()
# Stop Monitor
self._monitor.stop()
if self._monitor.isAlive():
self._monitor.join()
if stoplogging:
logging.shutdown()
def restart(self):
self.stop(False)
self.start()
def CherryPyWSGIServer(bind_addr,
wsgi_app,
numthreads = 10,
server_name = None,
max = -1,
request_queue_size = 5,
timeout = 10,
shutdown_timeout = 5):
""" A Cherrypy wsgiserver-compatible wrapper. """
max_threads = max
if max_threads < 0:
max_threads = 0
return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app},
min_threads = numthreads,
max_threads = max_threads,
queue_size = request_queue_size,
timeout = timeout)
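# Usage sketch (illustrative comments only, not executed): serving a minimal
# WSGI application either with Rocket directly or through the CherryPy-style
# wrapper above (hello_app is a hypothetical example application):
#
#     def hello_app(environ, start_response):
#         start_response('200 OK', [('Content-Type', 'text/plain')])
#         return [b('Hello from Rocket')]
#
#     Rocket(('127.0.0.1', 8000), 'wsgi', {'wsgi_app': hello_app}).start()
#     # or:
#     # CherryPyWSGIServer(('127.0.0.1', 8000), hello_app, numthreads=10).start()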
# Monolithic build...end of module: rocket\main.py
# Monolithic build...start of module: rocket\monitor.py
# Import System Modules
import time
import logging
import select
from threading import Thread
# Import Package Modules
# package imports removed in monolithic build
class Monitor(Thread):
    # The Monitor thread watches idle (keep-alive) connections handed off by the
    # workers, requeues them when they become readable and closes them when stale.
def __init__(self,
monitor_queue,
active_queue,
timeout,
*args,
**kwargs):
Thread.__init__(self, *args, **kwargs)
# Instance Variables
self.monitor_queue = monitor_queue
self.active_queue = active_queue
self.timeout = timeout
self.connections = set()
self.active = False
def run(self):
self.name = self.getName()
self.log = logging.getLogger('Rocket.Monitor')
self.log.addHandler(NullHandler())
self.active = True
conn_list = list()
list_changed = False
if __debug__:
self.log.debug('Entering monitor loop.')
# Enter thread main loop
while self.active:
# Move the queued connections to the selection pool
while not self.monitor_queue.empty() or not len(self.connections):
if __debug__:
self.log.debug('In "receive timed-out connections" loop.')
c = self.monitor_queue.get()
if c is None:
# A non-client is a signal to die
if __debug__:
self.log.debug('Received a death threat.')
return
self.log.debug('Received a timed out connection.')
if __debug__:
assert(c not in self.connections)
if IS_JYTHON:
# Jython requires a socket to be in Non-blocking mode in
# order to select on it.
c.setblocking(False)
if __debug__:
self.log.debug('Adding connection to monitor list.')
self.connections.add(c)
list_changed = True
# Wait on those connections
self.log.debug('Blocking on connections')
if list_changed:
conn_list = list(self.connections)
list_changed = False
try:
readable = select.select(conn_list,
[],
[],
THREAD_STOP_CHECK_INTERVAL)[0]
except:
if self.active:
raise
else:
break
# If we have any readable connections, put them back
for r in readable:
if __debug__:
self.log.debug('Restoring readable connection')
if IS_JYTHON:
# Jython requires a socket to be in Non-blocking mode in
# order to select on it, but the rest of the code requires
# that it be in blocking mode.
r.setblocking(True)
r.start_time = time.time()
self.active_queue.put(r)
self.connections.remove(r)
list_changed = True
# If we have any stale connections, kill them off.
if self.timeout:
now = time.time()
stale = set()
for c in self.connections:
if (now - c.start_time) >= self.timeout:
stale.add(c)
for c in stale:
if __debug__:
# "EXPR and A or B" kept for Py2.4 compatibility
data = (c.client_addr, c.server_port, c.ssl and '*' or '')
self.log.debug('Flushing stale connection: %s:%i%s' % data)
self.connections.remove(c)
list_changed = True
try:
c.close()
finally:
del c
def stop(self):
self.active = False
if __debug__:
self.log.debug('Flushing waiting connections')
for c in self.connections:
try:
c.close()
finally:
del c
if __debug__:
self.log.debug('Flushing queued connections')
while not self.monitor_queue.empty():
c = self.monitor_queue.get()
if c is None:
continue
try:
c.close()
finally:
del c
# Place a None sentry value to cause the monitor to die.
self.monitor_queue.put(None)
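# Hand-off sketch (illustrative comments only, not executed): the Monitor and the
# Worker threads cooperate through two queues. A Worker parks an idle keep-alive
# connection on monitor_queue; the Monitor select()s on it and hands it back on
# active_queue once it is readable again, or closes it when it goes stale:
#
#     monitor_queue.put(conn)    # Worker: connection idle between requests
#     active_queue.put(conn)     # Monitor: readable again -> back to the workers
#     monitor_queue.put(None)    # Rocket.stop(): sentinel that ends the Monitor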
# Monolithic build...end of module: rocket\monitor.py
# Monolithic build...start of module: rocket\threadpool.py
# Import System Modules
import logging
# Import Package Modules
# package imports removed in monolithic build
# Setup Logging
log = logging.getLogger('Rocket.Errors.ThreadPool')
log.addHandler(NullHandler())
class ThreadPool:
"""The ThreadPool class is a container class for all the worker threads. It
manages the number of actively running threads."""
def __init__(self,
method,
app_info,
active_queue,
monitor_queue,
min_threads=DEFAULTS['MIN_THREADS'],
max_threads=DEFAULTS['MAX_THREADS'],
):
if __debug__:
log.debug("Initializing ThreadPool.")
self.check_for_dead_threads = 0
self.active_queue = active_queue
self.worker_class = method
self.min_threads = min_threads
self.max_threads = max_threads
self.monitor_queue = monitor_queue
self.stop_server = False
# TODO - Optimize this based on some real-world usage data
self.grow_threshold = int(max_threads/10) + 2
if not isinstance(app_info, dict):
app_info = dict()
app_info.update(max_threads=max_threads,
min_threads=min_threads)
self.app_info = app_info
self.threads = set()
for x in range(min_threads):
worker = self.worker_class(app_info,
self.active_queue,
self.monitor_queue)
self.threads.add(worker)
def start(self):
self.stop_server = False
if __debug__:
log.debug("Starting threads.")
for thread in self.threads:
thread.setDaemon(True)
thread.start()
def stop(self):
if __debug__:
log.debug("Stopping threads.")
self.stop_server = True
# Prompt the threads to die
for t in self.threads:
self.active_queue.put(None)
# Give them the gun
for t in self.threads:
t.kill()
# Wait until they pull the trigger
for t in self.threads:
t.join()
# Clean up the mess
self.bring_out_your_dead()
def bring_out_your_dead(self):
# Remove dead threads from the pool
dead_threads = [t for t in self.threads if not t.isAlive()]
for t in dead_threads:
if __debug__:
log.debug("Removing dead thread: %s." % t.getName())
try:
# Py2.4 complains here so we put it in a try block
self.threads.remove(t)
except:
pass
self.check_for_dead_threads -= len(dead_threads)
def grow(self, amount=None):
if self.stop_server:
return
        if not amount:
            amount = self.max_threads
        if self.max_threads:
            # A max_threads of 0 means unlimited, so only clamp when a limit is set
            amount = min([amount, self.max_threads - len(self.threads)])
if __debug__:
log.debug("Growing by %i." % amount)
for x in range(amount):
worker = self.worker_class(self.app_info,
self.active_queue,
self.monitor_queue)
worker.setDaemon(True)
self.threads.add(worker)
worker.start()
def shrink(self, amount=1):
if __debug__:
log.debug("Shrinking by %i." % amount)
self.check_for_dead_threads += amount
for x in range(amount):
self.active_queue.put(None)
def dynamic_resize(self):
if (self.max_threads > self.min_threads or self.max_threads == 0):
if self.check_for_dead_threads > 0:
self.bring_out_your_dead()
queueSize = self.active_queue.qsize()
threadCount = len(self.threads)
if __debug__:
log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
% (threadCount, queueSize))
if queueSize == 0 and threadCount > self.min_threads:
self.shrink()
elif queueSize > self.grow_threshold:
self.grow(queueSize)
# Monolithic build...end of module: rocket\threadpool.py
# Monolithic build...start of module: rocket\worker.py
# Import System Modules
import re
import sys
import socket
import logging
import traceback
#from wsgiref.headers import Headers
from threading import Thread
from datetime import datetime
try:
from urllib import unquote
except ImportError:
from urllib.parse import unquote
try:
from io import StringIO
except ImportError:
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
try:
from ssl import SSLError
except ImportError:
class SSLError(socket.error):
pass
# Import Package Modules
# package imports removed in monolithic build
# Define Constants
re_SLASH = re.compile('%2F', re.IGNORECASE)
re_REQUEST_LINE = re.compile(r"""^
(?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method
\ # (single space)
(
(?P<scheme>[^:/]+) # Scheme
(://) #
(?P<host>[^/]+) # Host
)? #
(?P<path>(\*|/[^ \?]*)) # Path
(\? (?P<query_string>[^ ]+))? # Query String
\ # (single space)
(?P<protocol>HTTPS?/1\.[01]) # Protocol
$
""", re.X)
LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
RESPONSE = '''\
HTTP/1.1 %s
Content-Length: %i
Content-Type: %s

%s
'''
if IS_JYTHON:
HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])
###
# The Headers and FileWrapper classes are ripped straight from the Python
# Standard Library. I've removed some docstrings and integrated my BUF_SIZE.
# See the Python License here: http://docs.python.org/license.html
###
# Regular expression that matches `special' characters in parameters, the
# existence of which forces quoting of the parameter value.
import re
_tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
def _formatparam(param, value=None, quote=1):
"""Convenience function to format and return a key=value pair.
This will quote the value if needed or if quote is true.
"""
if value is not None and len(value) > 0:
if quote or _tspecials.search(value):
value = value.replace('\\', '\\\\').replace('"', r'\"')
return '%s="%s"' % (param, value)
else:
return '%s=%s' % (param, value)
else:
return param
class Headers:
def __init__(self,headers):
if type(headers) is not type([]):
raise TypeError("Headers must be a list of name/value tuples")
self._headers = headers
def __len__(self):
return len(self._headers)
def __setitem__(self, name, val):
del self[name]
self._headers.append((name, val))
def __delitem__(self,name):
name = name.lower()
self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]
def __getitem__(self,name):
return self.get(name)
def has_key(self, name):
return self.get(name) is not None
__contains__ = has_key
def get_all(self, name):
name = name.lower()
return [kv[1] for kv in self._headers if kv[0].lower()==name]
def get(self,name,default=None):
name = name.lower()
for k,v in self._headers:
if k.lower()==name:
return v
return default
def keys(self):
return [k for k, v in self._headers]
def values(self):
return [v for k, v in self._headers]
def items(self):
return self._headers[:]
def __repr__(self):
return "Headers(%r)" % self._headers
def __str__(self):
return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
def setdefault(self,name,value):
result = self.get(name)
if result is None:
self._headers.append((name,value))
return value
else:
return result
def add_header(self, _name, _value, **_params):
parts = []
if _value is not None:
parts.append(_value)
for k, v in _params.items():
if v is None:
parts.append(k.replace('_', '-'))
else:
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((_name, "; ".join(parts)))
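# Usage sketch (illustrative comments only, not executed): Headers keeps an
# ordered list of (name, value) pairs with case-insensitive lookup and renders
# itself as a CRLF-terminated header block:
#
#     h = Headers([('Content-Type', 'text/html')])
#     h['Content-Length'] = '42'
#     h.get('content-type')      # 'text/html'
#     str(h)                     # 'Content-Type: text/html\r\nContent-Length: 42\r\n\r\n'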
class FileWrapper:
"""Wrapper to convert file-like objects to iterables"""
def __init__(self, filelike, blksize=BUF_SIZE):
self.filelike = filelike
self.blksize = blksize
if hasattr(filelike,'close'):
self.close = filelike.close
def __getitem__(self,key):
data = self.filelike.read(self.blksize)
if data:
return data
raise IndexError
    def __iter__(self):
        return self
    def next(self):
        data = self.filelike.read(self.blksize)
        if data:
            return data
        raise StopIteration
    # Python 3 looks for __next__ when iterating
    __next__ = next
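# Usage sketch (illustrative comments only, not executed): FileWrapper is exposed
# to applications as environ['wsgi.file_wrapper'] so a file can be streamed back
# in BUF_SIZE blocks (the app and file name below are hypothetical):
#
#     def send_file_app(environ, start_response):
#         start_response('200 OK', [('Content-Type', 'application/octet-stream')])
#         return environ['wsgi.file_wrapper'](open('payload.bin', 'rb'))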
class Worker(Thread):
"""The Worker class is a base class responsible for receiving connections
and (a subclass) will run an application to process the the connection """
def __init__(self,
app_info,
active_queue,
monitor_queue,
*args,
**kwargs):
Thread.__init__(self, *args, **kwargs)
# Instance Variables
self.app_info = app_info
self.active_queue = active_queue
self.monitor_queue = monitor_queue
self.size = 0
self.status = "200 OK"
self.closeConnection = True
# Request Log
self.req_log = logging.getLogger('Rocket.Requests')
self.req_log.addHandler(NullHandler())
# Error Log
self.err_log = logging.getLogger('Rocket.Errors.'+self.getName())
self.err_log.addHandler(NullHandler())
def _handleError(self, typ, val, tb):
if typ == SSLError:
if 'timed out' in val.args[0]:
typ = SocketTimeout
if typ == SocketTimeout:
if __debug__:
self.err_log.debug('Socket timed out')
self.monitor_queue.put(self.conn)
return True
if typ == SocketClosed:
self.closeConnection = True
if __debug__:
self.err_log.debug('Client closed socket')
return False
if typ == BadRequest:
self.closeConnection = True
if __debug__:
self.err_log.debug('Client sent a bad request')
return True
if typ == socket.error:
self.closeConnection = True
if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
if __debug__:
self.err_log.debug('Ignorable socket Error received...'
'closing connection.')
return False
else:
self.status = "999 Utter Server Failure"
tb_fmt = traceback.format_exception(typ, val, tb)
self.err_log.error('Unhandled Error when serving '
'connection:\n' + '\n'.join(tb_fmt))
return False
self.closeConnection = True
tb_fmt = traceback.format_exception(typ, val, tb)
self.err_log.error('\n'.join(tb_fmt))
self.send_response('500 Server Error')
return False
def run(self):
if __debug__:
self.err_log.debug('Entering main loop.')
# Enter thread main loop
while True:
conn = self.active_queue.get()
if not conn:
# A non-client is a signal to die
if __debug__:
self.err_log.debug('Received a death threat.')
return conn
if isinstance(conn, tuple):
conn = Connection(*conn)
self.conn = conn
if conn.ssl != conn.secure:
self.err_log.info('Received HTTP connection on HTTPS port.')
self.send_response('400 Bad Request')
self.closeConnection = True
conn.close()
continue
else:
if __debug__:
self.err_log.debug('Received a connection.')
self.closeConnection = False
# Enter connection serve loop
while True:
if __debug__:
self.err_log.debug('Serving a request')
try:
self.run_app(conn)
log_info = dict(client_ip = conn.client_addr,
time = datetime.now().strftime('%c'),
status = self.status.split(' ')[0],
size = self.size,
request_line = self.request_line)
self.req_log.info(LOG_LINE % log_info)
except:
exc = sys.exc_info()
handled = self._handleError(*exc)
if handled:
break
else:
if self.request_line:
log_info = dict(client_ip = conn.client_addr,
time = datetime.now().strftime('%c'),
status = self.status.split(' ')[0],
size = self.size,
request_line = self.request_line + ' - not stopping')
self.req_log.info(LOG_LINE % log_info)
if self.closeConnection:
try:
conn.close()
except:
self.err_log.error(str(traceback.format_exc()))
break
def run_app(self, conn):
        # Must be overridden with a method that reads the request from the
        # socket and sends a response.
self.closeConnection = True
raise NotImplementedError('Overload this method!')
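    # Subclassing sketch (illustrative comments only, not executed): WSGIWorker
    # below is the real implementation; a minimal hypothetical worker only needs
    # to read the request line and send something back:
    #
    #     class OneShotWorker(Worker):
    #         def run_app(self, conn):
    #             self.closeConnection = True      # one response per connection
    #             sock_file = conn.makefile('rb', BUF_SIZE)
    #             try:
    #                 self.request_line = u(sock_file.readline()).strip()
    #                 self.send_response('200 OK')
    #             finally:
    #                 sock_file.close()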
def send_response(self, status):
stat_msg = status.split(' ', 1)[1]
msg = RESPONSE % (status,
len(stat_msg),
'text/plain',
stat_msg)
try:
self.conn.sendall(b(msg))
except socket.error:
self.closeConnection = True
self.err_log.error('Tried to send "%s" to client but received socket'
' error' % status)
def kill(self):
if self.isAlive() and hasattr(self, 'conn'):
try:
self.conn.shutdown(socket.SHUT_RDWR)
except socket.error:
info = sys.exc_info()
                if info[1].args[0] != errno.EBADF:
self.err_log.debug('Error on shutdown: '+str(info))
def read_request_line(self, sock_file):
self.request_line = ''
try:
# Grab the request line
d = sock_file.readline()
if PY3K:
d = d.decode('ISO-8859-1')
if d == '\r\n':
# Allow an extra NEWLINE at the beginning per HTTP 1.1 spec
if __debug__:
self.err_log.debug('Client sent newline')
d = sock_file.readline()
if PY3K:
d = d.decode('ISO-8859-1')
except socket.timeout:
raise SocketTimeout("Socket timed out before request.")
d = d.strip()
if not d:
if __debug__:
self.err_log.debug('Client did not send a recognizable request.')
raise SocketClosed('Client closed socket.')
self.request_line = d
# NOTE: I've replaced the traditional method of procedurally breaking
# apart the request line with a (rather unsightly) regular expression.
# However, Java's regexp support sucks so bad that it actually takes
# longer in Jython to process the regexp than procedurally. So I've
# left the old code here for Jython's sake...for now.
if IS_JYTHON:
return self._read_request_line_jython(d)
match = re_REQUEST_LINE.match(d)
if not match:
self.send_response('400 Bad Request')
raise BadRequest
req = match.groupdict()
for k,v in req.items():
if not v:
req[k] = ""
if k == 'path':
req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)])
return req
def _read_request_line_jython(self, d):
d = d.strip()
try:
method, uri, proto = d.split(' ')
if not proto.startswith('HTTP') or \
proto[-3:] not in ('1.0', '1.1') or \
method not in HTTP_METHODS:
self.send_response('400 Bad Request')
raise BadRequest
except ValueError:
self.send_response('400 Bad Request')
raise BadRequest
req = dict(method=method, protocol = proto)
scheme = ''
host = ''
if uri == '*' or uri.startswith('/'):
path = uri
elif '://' in uri:
scheme, rest = uri.split('://')
host, path = rest.split('/', 1)
path = '/' + path
else:
self.send_response('400 Bad Request')
raise BadRequest
query_string = ''
if '?' in path:
path, query_string = path.split('?', 1)
path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])
req.update(path=path,
query_string=query_string,
scheme=scheme.lower(),
host=host)
return req
def read_headers(self, sock_file):
headers = dict()
l = sock_file.readline()
lname = None
lval = None
while True:
if PY3K:
try:
l = str(l, 'ISO-8859-1')
except UnicodeDecodeError:
self.err_log.warning('Client sent invalid header: ' + repr(l))
if l == '\r\n':
break
if l[0] in ' \t' and lname:
# Some headers take more than one line
lval += ',' + l.strip()
else:
# HTTP header values are latin-1 encoded
l = l.split(':', 1)
# HTTP header names are us-ascii encoded
lname = l[0].strip().upper().replace('-', '_')
lval = l[-1].strip()
headers[str(lname)] = str(lval)
l = sock_file.readline()
return headers
class SocketTimeout(Exception):
"Exception for when a socket times out between requests."
pass
class BadRequest(Exception):
"Exception for when a client sends an incomprehensible request."
pass
class SocketClosed(Exception):
"Exception for when a socket is closed by the client."
pass
class ChunkedReader(object):
def __init__(self, sock_file):
self.stream = sock_file
self.chunk_size = 0
def _read_header(self):
chunk_len = ""
try:
while "" == chunk_len:
chunk_len = self.stream.readline().strip()
return int(chunk_len, 16)
except ValueError:
return 0
def read(self, size):
data = b('')
chunk_size = self.chunk_size
while size:
if not chunk_size:
chunk_size = self._read_header()
if size < chunk_size:
data += self.stream.read(size)
chunk_size -= size
break
else:
if not chunk_size:
break
data += self.stream.read(chunk_size)
size -= chunk_size
chunk_size = 0
self.chunk_size = chunk_size
return data
def readline(self):
data = b('')
c = self.read(1)
while c and c != b('\n'):
data += c
c = self.read(1)
data += c
return data
def readlines(self):
yield self.readline()
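# Decoding sketch (illustrative comments only, not executed): ChunkedReader turns
# a Transfer-Encoding: chunked request body into plain read() calls (BytesIO here
# would come from the io module; it is not imported by this file):
#
#     stream = BytesIO(b('5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n'))
#     reader = ChunkedReader(stream)
#     reader.read(11)        # b('hello world')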
def get_method(method):
methods = dict(wsgi=WSGIWorker)
return methods[method.lower()]
# Monolithic build...end of module: rocket\worker.py
# Monolithic build...start of module: rocket\methods\__init__.py
# Monolithic build...end of module: rocket\methods\__init__.py
# Monolithic build...start of module: rocket\methods\wsgi.py
# Import System Modules
import sys
import socket
#from wsgiref.headers import Headers
#from wsgiref.util import FileWrapper
# Import Package Modules
# package imports removed in monolithic build
if PY3K:
from email.utils import formatdate
else:
# Caps Utils for Py2.4 compatibility
from email.Utils import formatdate
# Define Constants
NEWLINE = b('\r\n')
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
BASE_ENV = {'SERVER_NAME': SERVER_NAME,
'SCRIPT_NAME': '', # Direct call WSGI does not need a name
'wsgi.errors': sys.stderr,
'wsgi.version': (1, 0),
'wsgi.multiprocess': False,
'wsgi.run_once': False,
'wsgi.file_wrapper': FileWrapper
}
class WSGIWorker(Worker):
def __init__(self, *args, **kwargs):
"""Builds some instance variables that will last the life of the
thread."""
Worker.__init__(self, *args, **kwargs)
if isinstance(self.app_info, dict):
multithreaded = self.app_info.get('max_threads') != 1
else:
multithreaded = False
self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'],
'wsgi.multithread': multithreaded,
})
self.base_environ.update(BASE_ENV)
# Grab our application
self.app = self.app_info.get('wsgi_app')
if not hasattr(self.app, "__call__"):
raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))
def build_environ(self, sock_file, conn):
""" Build the execution environment. """
# Grab the request line
request = self.read_request_line(sock_file)
# Copy the Base Environment
environ = self.base_environ.copy()
# Grab the headers
for k, v in self.read_headers(sock_file).items():
environ[str('HTTP_'+k)] = v
# Add CGI Variables
environ['REQUEST_METHOD'] = request['method']
environ['PATH_INFO'] = request['path']
environ['SERVER_PROTOCOL'] = request['protocol']
environ['SERVER_PORT'] = str(conn.server_port)
environ['REMOTE_PORT'] = str(conn.client_port)
environ['REMOTE_ADDR'] = str(conn.client_addr)
environ['QUERY_STRING'] = request['query_string']
if 'HTTP_CONTENT_LENGTH' in environ:
environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
if 'HTTP_CONTENT_TYPE' in environ:
environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']
# Save the request method for later
self.request_method = environ['REQUEST_METHOD']
# Add Dynamic WSGI Variables
if conn.ssl:
environ['wsgi.url_scheme'] = 'https'
environ['HTTPS'] = 'on'
else:
environ['wsgi.url_scheme'] = 'http'
if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
environ['wsgi.input'] = ChunkedReader(sock_file)
else:
environ['wsgi.input'] = sock_file
return environ
def send_headers(self, data, sections):
h_set = self.header_set
# Does the app want us to send output chunked?
self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked'
# Add a Date header if it's not there already
        if 'date' not in h_set:
h_set['Date'] = formatdate(usegmt=True)
# Add a Server header if it's not there already
        if 'server' not in h_set:
h_set['Server'] = HTTP_SERVER_SOFTWARE
if 'content-length' in h_set:
self.size = int(h_set['content-length'])
else:
            s = int(self.status.split(' ')[0])
            # Only statuses that may carry a body (not 1xx, 204, 205 or 304)
            # get a Content-Length or Transfer-Encoding header added here.
            if s >= 200 and s not in (204, 205, 304):
if not self.chunked:
if sections == 1:
# Add a Content-Length header if it's not there already
h_set['Content-Length'] = str(len(data))
self.size = len(data)
else:
# If they sent us more than one section, we blow chunks
h_set['Transfer-Encoding'] = 'Chunked'
self.chunked = True
if __debug__:
self.err_log.debug('Adding header...'
'Transfer-Encoding: Chunked')
if 'connection' not in h_set:
# If the application did not provide a connection header, fill it in
client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
                # HTTP/1.1 defaults to keep-alive connections
if client_conn:
h_set['Connection'] = client_conn
else:
h_set['Connection'] = 'keep-alive'
else:
# HTTP < 1.1 supports keep-alive but it's quirky so we don't support it
h_set['Connection'] = 'close'
# Close our connection if we need to.
self.closeConnection = h_set.get('connection', '').lower() == 'close'
# Build our output headers
header_data = HEADER_RESPONSE % (self.status, str(h_set))
# Send the headers
if __debug__:
self.err_log.debug('Sending Headers: %s' % repr(header_data))
self.conn.sendall(b(header_data))
self.headers_sent = True
def write_warning(self, data, sections=None):
self.err_log.warning('WSGI app called write method directly. This is '
'deprecated behavior. Please update your app.')
return self.write(data, sections)
def write(self, data, sections=None):
""" Write the data to the output socket. """
if self.error[0]:
self.status = self.error[0]
data = b(self.error[1])
if not self.headers_sent:
self.send_headers(data, sections)
if self.request_method != 'HEAD':
try:
                if self.chunked:
                    # Frame the chunk without %-formatting the body, so the
                    # bytes are passed through untouched on Python 3.
                    self.conn.sendall(b('%x\r\n' % len(data)) + data + b('\r\n'))
else:
self.conn.sendall(data)
except socket.error:
                # Some clients close the connection while we are still sending,
                # which surfaces here as a socket error.
self.closeConnection = True
def start_response(self, status, response_headers, exc_info=None):
""" Store the HTTP status and headers to be sent when self.write is
called. """
if exc_info:
try:
if self.headers_sent:
# Re-raise original exception if headers sent
# because this violates WSGI specification.
raise
finally:
exc_info = None
elif self.header_set:
raise AssertionError("Headers already set!")
if PY3K and not isinstance(status, str):
self.status = str(status, 'ISO-8859-1')
else:
self.status = status
        # Store the response headers; values must be representable in Latin-1
try:
self.header_set = Headers(response_headers)
except UnicodeDecodeError:
self.error = ('500 Internal Server Error',
'HTTP Headers should be bytes')
            self.err_log.error('Received HTTP headers from the application that'
                               ' contain invalid characters for Latin-1 encoding.')
return self.write_warning
def run_app(self, conn):
self.size = 0
self.header_set = Headers([])
self.headers_sent = False
self.error = (None, None)
self.chunked = False
sections = None
output = None
if __debug__:
self.err_log.debug('Getting sock_file')
# Build our file-like object
sock_file = conn.makefile('rb',BUF_SIZE)
try:
# Read the headers and build our WSGI environment
self.environ = environ = self.build_environ(sock_file, conn)
# Handle 100 Continue
if environ.get('HTTP_EXPECT', '') == '100-continue':
res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
conn.sendall(b(res))
# Send it to our WSGI application
output = self.app(environ, self.start_response)
if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
self.error = ('500 Internal Server Error',
'WSGI applications must return a list or '
'generator type.')
if hasattr(output, '__len__'):
sections = len(output)
for data in output:
# Don't send headers until body appears
if data:
self.write(data, sections)
if self.chunked:
# If chunked, send our final chunk length
self.conn.sendall(b('0\r\n\r\n'))
elif not self.headers_sent:
# Send headers if the body was empty
self.send_headers('', sections)
# Don't capture exceptions here. The Worker class handles
# them appropriately.
finally:
if __debug__:
self.err_log.debug('Finally closing output and sock_file')
if hasattr(output,'close'):
output.close()
sock_file.close()
# Monolithic build...end of module: rocket\methods\wsgi.py
#
# the following code is not part of Rocket but was added in web2py for testing purposes
#
def demo_app(environ, start_response):
global static_folder
import os
    types = {'htm': 'text/html', 'html': 'text/html', 'gif': 'image/gif',
             'jpg': 'image/jpeg', 'png': 'image/png', 'pdf': 'application/pdf'}
if static_folder:
if not static_folder.startswith('/'):
static_folder = os.path.join(os.getcwd(),static_folder)
path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html')
        type = types.get(path.split('.')[-1], 'text/plain')
if os.path.exists(path):
try:
pathfile = open(path,'rb')
try:
data = pathfile.read()
finally:
pathfile.close()
start_response('200 OK', [('Content-Type', type)])
except IOError:
start_response('404 NOT FOUND', [])
data = '404 NOT FOUND'
else:
start_response('500 INTERNAL SERVER ERROR', [])
data = '500 INTERNAL SERVER ERROR'
else:
start_response('200 OK', [('Content-Type', 'text/html')])
data = '<html><body><h1>Hello from Rocket Web Server</h1></body></html>'
    # b() keeps the body as bytes on Python 3 (WSGI bodies must be bytes)
    return [b(data)]
def demo():
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
help="ip address of the network interface")
parser.add_option("-p", "--port", dest="port",default="8000",
help="post where to run web server")
parser.add_option("-s", "--static", dest="static",default=None,
help="folder containing static files")
(options, args) = parser.parse_args()
global static_folder
static_folder = options.static
    print('Rocket running on %s:%s' % (options.ip, options.port))
r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
r.start()
if __name__=='__main__':
demo()