@@ -1,19 +1,20 @@ # -*- coding: utf-8 -*- # This file is part of the Rocket Web Server -# Copyright (c) 2010 Timothy Farrell +# Copyright (c) 2011 Timothy Farrell # Import System Modules import sys import errno import socket import logging import platform +import traceback # Define Constants -VERSION = '1.2.2' +VERSION = '1.2.4' SERVER_NAME = socket.gethostname() SERVER_SOFTWARE = 'Rocket %s' % VERSION HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0]) BUF_SIZE = 16384 SOCKET_TIMEOUT = 1 # in secs @@ -85,11 +86,14 @@ try: import ssl has_ssl = True except ImportError: has_ssl = False +# Import Package Modules # package imports removed in monolithic build +# TODO - This part is still very experimental. +#from .filelike import FileLikeSocket class Connection(object): __slots__ = [ 'setblocking', 'sendall', @@ -100,11 +104,15 @@ 'client_port', 'server_port', 'socket', 'start_time', 'ssl', - 'secure' + 'secure', + 'recv', + 'send', + 'read', + 'write' ] def __init__(self, sock_tuple, port, secure=False): self.client_addr, self.client_port = sock_tuple[1] self.server_port = port @@ -122,26 +130,267 @@ self.socket.settimeout(SOCKET_TIMEOUT) self.sendall = self.socket.sendall self.shutdown = self.socket.shutdown self.fileno = self.socket.fileno - self.makefile = self.socket.makefile self.setblocking = self.socket.setblocking + self.recv = self.socket.recv + self.send = self.socket.send + self.makefile = self.socket.makefile + +# FIXME - this is not ready for prime-time yet. +# def makefile(self, buf_size=BUF_SIZE): +# return FileLikeSocket(self, buf_size) def close(self): if hasattr(self.socket, '_sock'): try: self.socket._sock.close() except socket.error: info = sys.exc_info() - if info[1].errno != socket.EBADF: + if info[1].args[0] != socket.EBADF: raise info[1] else: pass self.socket.close() + # Monolithic build...end of module: rocket\connection.py +# Monolithic build...start of module: rocket\filelike.py + +# Import System Modules +import socket +try: + from io import StringIO +except ImportError: + try: + from cStringIO import StringIO + except ImportError: + from StringIO import StringIO +# Import Package Modules +# package imports removed in monolithic build + +class FileLikeSocket(object): + def __init__(self, conn, buf_size=BUF_SIZE): + self.conn = conn + self.buf_size = buf_size + self.buffer = StringIO() + self.content_length = None + + if self.conn.socket.gettimeout() == 0.0: + self.read = self.non_blocking_read + else: + self.read = self.blocking_read + + def __iter__(self): + return self + + def recv(self, size): + while True: + try: + return self.conn.recv(size) + except socket.error: + exc = sys.exc_info() + e = exc[1] + # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr + if (e.args[0] not in set()): + raise + + def next(self): + data = self.readline() + if data == '': + raise StopIteration + return data + + def non_blocking_read(self, size=None): + # Shamelessly adapted from Cherrypy! + bufr = self.buffer + bufr.seek(0, 2) + if size is None: + while True: + data = self.recv(self.buf_size) + if not data: + break + bufr.write(data) + + self.buffer = StringIO() + + return bufr.getvalue() + else: + buf_len = self.buffer.tell() + if buf_len >= size: + bufr.seek(0) + data = bufr.read(size) + self.buffer = StringIO(bufr.read()) + return data + + self.buffer = StringIO() + while True: + remaining = size - buf_len + data = self.recv(remaining) + + if not data: + break + + n = len(data) + if n == size and not buf_len: + return data + + if n == remaining: + bufr.write(data) + del data + break + + bufr.write(data) + buf_len += n + del data + + return bufr.getvalue() + + def blocking_read(self, length=None): + if length is None: + if self.content_length is not None: + length = self.content_length + else: + length = 1 + + try: + data = self.conn.recv(length) + except: + data = b('') + + return data + + def readline(self): + data = b("") + char = self.read(1) + while char != b('\n') and char is not b(''): + line = repr(char) + data += char + char = self.read(1) + data += char + return data + + def readlines(self, hint="ignored"): + return list(self) + + def close(self): + self.conn = None + self.content_length = None + +# Monolithic build...end of module: rocket\filelike.py +# Monolithic build...start of module: rocket\futures.py + +# Import System Modules +import time +try: + from concurrent.futures import Future, ThreadPoolExecutor + from concurrent.futures.thread import _WorkItem + has_futures = True +except ImportError: + has_futures = False + + class Future: + pass + + class ThreadPoolExecutor: + pass + + class _WorkItem: + pass + + +class WSGIFuture(Future): + def __init__(self, f_dict, *args, **kwargs): + Future.__init__(self, *args, **kwargs) + + self.timeout = None + + self._mem_dict = f_dict + self._lifespan = 30 + self._name = None + self._start_time = time.time() + + def set_running_or_notify_cancel(self): + if time.time() - self._start_time >= self._lifespan: + self.cancel() + else: + return super(WSGIFuture, self).set_running_or_notify_cancel() + + + def remember(self, name, lifespan=None): + self._lifespan = lifespan or self._lifespan + + if name in self._mem_dict: + raise NameError('Cannot remember future by name "%s". ' % name + \ + 'A future already exists with that name.' ) + self._name = name + self._mem_dict[name] = self + + return self + + def forget(self): + if self._name in self._mem_dict and self._mem_dict[self._name] is self: + del self._mem_dict[self._name] + self._name = None + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException: + e = sys.exc_info()[1] + self.future.set_exception(e) + else: + self.future.set_result(result) + +class WSGIExecutor(ThreadPoolExecutor): + multithread = True + multiprocess = False + + def __init__(self, *args, **kwargs): + ThreadPoolExecutor.__init__(self, *args, **kwargs) + + self.futures = dict() + + def submit(self, fn, *args, **kwargs): + if self._shutdown_lock.acquire(): + if self._shutdown: + self._shutdown_lock.release() + raise RuntimeError('Cannot schedule new futures after shutdown') + + f = WSGIFuture(self.futures) + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + self._shutdown_lock.release() + return f + else: + return False + +class FuturesMiddleware(object): + "Futures middleware that adds a Futures Executor to the environment" + def __init__(self, app, threads=5): + self.app = app + self.executor = WSGIExecutor(threads) + + def __call__(self, environ, start_response): + environ["wsgiorg.executor"] = self.executor + environ["wsgiorg.futures"] = self.executor.futures + return self.app(environ, start_response) + +# Monolithic build...end of module: rocket\futures.py # Monolithic build...start of module: rocket\listener.py # Import System Modules import os import socket @@ -170,13 +419,14 @@ # Instance variables self.active_queue = active_queue self.interface = interface self.addr = interface[0] self.port = interface[1] - self.secure = len(interface) == 4 and \ - os.path.exists(interface[2]) and \ - os.path.exists(interface[3]) + self.secure = len(interface) >= 4 + self.clientcert_req = (len(interface) == 5 and interface[4]) + + self.thread = None self.ready = False # Error Log self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port) self.err_log.addHandler(NullHandler()) @@ -201,10 +451,16 @@ data = (interface[3], interface[0], interface[1]) self.err_log.error("Cannot find certificate file " "'%s'. Cannot bind to %s:%s" % data) return + if self.clientcert_req and not os.path.exists(interface[4]): + data = (interface[4], interface[0], interface[1]) + self.err_log.error("Cannot find root ca certificate file " + "'%s'. Cannot bind to %s:%s" % data) + return + # Set socket options try: listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except: msg = "Cannot share socket. Using %s:%i exclusively." @@ -236,29 +492,68 @@ self.ready = True def wrap_socket(self, sock): try: - sock = ssl.wrap_socket(sock, - keyfile = self.interface[2], - certfile = self.interface[3], - server_side = True, - ssl_version = ssl.PROTOCOL_SSLv23) + if self.clientcert_req: + ca_certs = self.interface[4] + cert_reqs = ssl.CERT_OPTIONAL + sock = ssl.wrap_socket(sock, + keyfile = self.interface[2], + certfile = self.interface[3], + server_side = True, + cert_reqs = cert_reqs, + ca_certs = ca_certs, + ssl_version = ssl.PROTOCOL_SSLv23) + else: + sock = ssl.wrap_socket(sock, + keyfile = self.interface[2], + certfile = self.interface[3], + server_side = True, + ssl_version = ssl.PROTOCOL_SSLv23) + except SSLError: # Generally this happens when an HTTP request is received on a # secure socket. We don't do anything because it will be detected # by Worker and dealt with appropriately. + self.err_log.error('SSL Error: %s' % traceback.format_exc()) pass return sock - - def run(self): + def start(self): if not self.ready: self.err_log.warning('Listener started when not ready.') return + if self.thread is not None and self.thread.isAlive(): + self.err_log.warning('Listener already running.') + return + + self.thread = Thread(target=self.listen, name="Port" + str(self.port)) + + self.thread.start() + + def isAlive(self): + if self.thread is None: + return False + + return self.thread.isAlive() + + def join(self): + if self.thread is None: + return + + self.ready = False + + self.thread.join() + + del self.thread + self.thread = None + self.ready = True + + def listen(self): if __debug__: self.err_log.debug('Entering main loop.') while True: try: sock, addr = self.listener.accept() @@ -279,11 +574,11 @@ self.err_log.debug('Listener exiting.') return else: continue except: - self.err_log.error(str(traceback.format_exc())) + self.err_log.error(traceback.format_exc()) # Monolithic build...end of module: rocket\listener.py # Monolithic build...start of module: rocket\main.py # Import System Modules @@ -290,11 +585,11 @@ import sys import time import socket import logging import traceback - +from threading import Lock try: from queue import Queue except ImportError: from Queue import Queue @@ -322,10 +617,12 @@ queue_size = None, timeout = 600, handle_signals = True): self.handle_signals = handle_signals + self.startstop_lock = Lock() + self.timeout = timeout if not isinstance(interfaces, list): self.interfaces = [interfaces] else: self.interfaces = interfaces @@ -346,24 +643,22 @@ queue_size = max_threads if isinstance(app_info, dict): app_info['server_software'] = SERVER_SOFTWARE - monitor_queue = Queue() - active_queue = Queue() - - self._monitor = Monitor(monitor_queue, active_queue, timeout) + self.monitor_queue = Queue() + self.active_queue = Queue() self._threadpool = ThreadPool(get_method(method), app_info = app_info, - active_queue=active_queue, - monitor_queue = monitor_queue, - min_threads=min_threads, - max_threads=max_threads) + active_queue = self.active_queue, + monitor_queue = self.monitor_queue, + min_threads = min_threads, + max_threads = max_threads) # Build our socket listeners - self.listeners = [Listener(i, queue_size, active_queue) for i in self.interfaces] + self.listeners = [Listener(i, queue_size, self.active_queue) for i in self.interfaces] for ndx in range(len(self.listeners)-1, 0, -1): if not self.listeners[ndx].ready: del self.listeners[ndx] if not self.listeners: @@ -376,79 +671,107 @@ def _sighup(self, signum, frame): log.info('Received SIGHUP') self.restart() - def start(self): + def start(self, background=False): log.info('Starting %s' % SERVER_SOFTWARE) - # Set up our shutdown signals - if self.handle_signals: - try: - import signal - signal.signal(signal.SIGTERM, self._sigterm) - signal.signal(signal.SIGUSR1, self._sighup) - except: - log.debug('This platform does not support signals.') - - # Start our worker threads - self._threadpool.start() - - # Start our monitor thread - self._monitor.setDaemon(True) - self._monitor.start() - - # I know that EXPR and A or B is bad but I'm keeping it for Py2.4 - # compatibility. - str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '') - - msg = 'Listening on sockets: ' - msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners]) - log.info(msg) - - for l in self.listeners: - l.start() - - tp = self._threadpool - dynamic_resize = tp.dynamic_resize - - while not tp.stop_server: - try: - dynamic_resize() + self.startstop_lock.acquire() + + try: + # Set up our shutdown signals + if self.handle_signals: + try: + import signal + signal.signal(signal.SIGTERM, self._sigterm) + signal.signal(signal.SIGUSR1, self._sighup) + except: + log.debug('This platform does not support signals.') + + # Start our worker threads + self._threadpool.start() + + # Start our monitor thread + self._monitor = Monitor(self.monitor_queue, + self.active_queue, + self.timeout, + self._threadpool) + self._monitor.setDaemon(True) + self._monitor.start() + + # I know that EXPR and A or B is bad but I'm keeping it for Py2.4 + # compatibility. + str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '') + + msg = 'Listening on sockets: ' + msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners]) + log.info(msg) + + for l in self.listeners: + l.start() + + finally: + self.startstop_lock.release() + + if background: + return + + while self._monitor.isAlive(): + try: time.sleep(THREAD_STOP_CHECK_INTERVAL) except KeyboardInterrupt: # Capture a keyboard interrupt when running from a console break except: - if not tp.stop_server: - log.error(str(traceback.format_exc())) + if self._monitor.isAlive(): + log.error(traceback.format_exc()) continue return self.stop() - def stop(self, stoplogging = True): - log.info("Stopping Server") - - # Stop listeners - for l in self.listeners: - l.ready = False - if l.isAlive(): - l.join() - - # Stop Worker threads - self._threadpool.stop() - - # Stop Monitor - self._monitor.stop() - if self._monitor.isAlive(): - self._monitor.join() - - if stoplogging: - logging.shutdown() + def stop(self, stoplogging = False): + log.info('Stopping %s' % SERVER_SOFTWARE) + + self.startstop_lock.acquire() + + try: + # Stop listeners + for l in self.listeners: + l.ready = False + + # Encourage a context switch + time.sleep(0.01) + + for l in self.listeners: + if l.isAlive(): + l.join() + + # Stop Monitor + self._monitor.stop() + if self._monitor.isAlive(): + self._monitor.join() + + # Stop Worker threads + self._threadpool.stop() + + if stoplogging: + logging.shutdown() + msg = "Calling logging.shutdown() is now the responsibility of \ + the application developer. Please update your \ + applications to no longer call rocket.stop(True)" + try: + import warnings + raise warnings.DeprecationWarning(msg) + except ImportError: + raise RuntimeError(msg) + + finally: + self.startstop_lock.release() def restart(self): - self.stop(False) + self.stop() self.start() def CherryPyWSGIServer(bind_addr, wsgi_app, numthreads = 10, @@ -484,49 +807,57 @@ def __init__(self, monitor_queue, active_queue, timeout, + threadpool, *args, **kwargs): Thread.__init__(self, *args, **kwargs) + + self._threadpool = threadpool # Instance Variables self.monitor_queue = monitor_queue self.active_queue = active_queue self.timeout = timeout + self.log = logging.getLogger('Rocket.Monitor') + self.log.addHandler(NullHandler()) + self.connections = set() self.active = False def run(self): - self.name = self.getName() - self.log = logging.getLogger('Rocket.Monitor') - self.log.addHandler(NullHandler()) - self.active = True conn_list = list() list_changed = False + + # We need to make sure the queue is empty before we start + while not self.monitor_queue.empty(): + self.monitor_queue.get() if __debug__: self.log.debug('Entering monitor loop.') # Enter thread main loop while self.active: + # Move the queued connections to the selection pool - while not self.monitor_queue.empty() or not len(self.connections): + while not self.monitor_queue.empty(): if __debug__: self.log.debug('In "receive timed-out connections" loop.') c = self.monitor_queue.get() if c is None: # A non-client is a signal to die if __debug__: self.log.debug('Received a death threat.') - return + self.stop() + break self.log.debug('Received a timed out connection.') if __debug__: assert(c not in self.connections) @@ -541,43 +872,50 @@ self.connections.add(c) list_changed = True # Wait on those connections - self.log.debug('Blocking on connections') if list_changed: conn_list = list(self.connections) list_changed = False try: - readable = select.select(conn_list, - [], - [], - THREAD_STOP_CHECK_INTERVAL)[0] + if len(conn_list): + readable = select.select(conn_list, + [], + [], + THREAD_STOP_CHECK_INTERVAL)[0] + else: + time.sleep(THREAD_STOP_CHECK_INTERVAL) + readable = [] + + if not self.active: + break + + # If we have any readable connections, put them back + for r in readable: + if __debug__: + self.log.debug('Restoring readable connection') + + if IS_JYTHON: + # Jython requires a socket to be in Non-blocking mode in + # order to select on it, but the rest of the code requires + # that it be in blocking mode. + r.setblocking(True) + + r.start_time = time.time() + self.active_queue.put(r) + + self.connections.remove(r) + list_changed = True + except: if self.active: raise else: break - # If we have any readable connections, put them back - for r in readable: - if __debug__: - self.log.debug('Restoring readable connection') - - if IS_JYTHON: - # Jython requires a socket to be in Non-blocking mode in - # order to select on it, but the rest of the code requires - # that it be in blocking mode. - r.setblocking(True) - - r.start_time = time.time() - self.active_queue.put(r) - - self.connections.remove(r) - list_changed = True - # If we have any stale connections, kill them off. if self.timeout: now = time.time() stale = set() for c in self.connections: @@ -595,18 +933,23 @@ try: c.close() finally: del c + + # Dynamically resize the threadpool to adapt to our changing needs. + self._threadpool.dynamic_resize() + def stop(self): self.active = False if __debug__: self.log.debug('Flushing waiting connections') - for c in self.connections: + while self.connections: + c = self.connections.pop() try: c.close() finally: del c @@ -632,10 +975,11 @@ # Import System Modules import logging # Import Package Modules # package imports removed in monolithic build + # Setup Logging log = logging.getLogger('Rocket.Errors.ThreadPool') log.addHandler(NullHandler()) @@ -661,55 +1005,67 @@ self.worker_class = method self.min_threads = min_threads self.max_threads = max_threads self.monitor_queue = monitor_queue self.stop_server = False + self.alive = False # TODO - Optimize this based on some real-world usage data self.grow_threshold = int(max_threads/10) + 2 if not isinstance(app_info, dict): app_info = dict() + if has_futures and app_info.get('futures'): + app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'], + 2])) + app_info.update(max_threads=max_threads, min_threads=min_threads) + self.min_threads = min_threads self.app_info = app_info self.threads = set() - for x in range(min_threads): - worker = self.worker_class(app_info, - self.active_queue, - self.monitor_queue) - self.threads.add(worker) def start(self): self.stop_server = False if __debug__: log.debug("Starting threads.") - for thread in self.threads: - thread.setDaemon(True) - thread.start() + self.grow(self.min_threads) + + self.alive = True def stop(self): + self.alive = False + if __debug__: log.debug("Stopping threads.") self.stop_server = True # Prompt the threads to die - for t in self.threads: - self.active_queue.put(None) + self.shrink(len(self.threads)) + + # Stop futures initially + if has_futures and self.app_info.get('futures'): + if __debug__: + log.debug("Future executor is present. Python will not " + "exit until all jobs have finished.") + self.app_info['executor'].shutdown(wait=False) # Give them the gun - for t in self.threads: - t.kill() + #active_threads = [t for t in self.threads if t.isAlive()] + #while active_threads: + # t = active_threads.pop() + # t.kill() # Wait until they pull the trigger for t in self.threads: - t.join() + if t.isAlive(): + t.join() # Clean up the mess self.bring_out_your_dead() def bring_out_your_dead(self): @@ -731,11 +1087,12 @@ return if not amount: amount = self.max_threads - amount = min([amount, self.max_threads - len(self.threads)]) + if self.alive: + amount = min([amount, self.max_threads - len(self.threads)]) if __debug__: log.debug("Growing by %i." % amount) for x in range(amount): @@ -782,11 +1139,11 @@ import re import sys import socket import logging import traceback -#from wsgiref.headers import Headers +from wsgiref.headers import Headers from threading import Thread from datetime import datetime try: from urllib import unquote @@ -835,130 +1192,10 @@ %s ''' if IS_JYTHON: HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT']) -### -# The Headers and FileWrapper classes are ripped straight from the Python -# Standard Library. I've removed some docstrings and integrated my BUF_SIZE. -# See the Python License here: http://docs.python.org/license.html -### - -# Regular expression that matches `special' characters in parameters, the -# existance of which force quoting of the parameter value. -import re -_tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]') - -def _formatparam(param, value=None, quote=1): - """Convenience function to format and return a key=value pair. - - This will quote the value if needed or if quote is true. - """ - if value is not None and len(value) > 0: - if quote or _tspecials.search(value): - value = value.replace('\\', '\\\\').replace('"', r'\"') - return '%s="%s"' % (param, value) - else: - return '%s=%s' % (param, value) - else: - return param - -class Headers: - def __init__(self,headers): - if type(headers) is not type([]): - raise TypeError("Headers must be a list of name/value tuples") - self._headers = headers - - def __len__(self): - return len(self._headers) - - def __setitem__(self, name, val): - del self[name] - self._headers.append((name, val)) - - def __delitem__(self,name): - name = name.lower() - self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name] - - def __getitem__(self,name): - return self.get(name) - - def has_key(self, name): - return self.get(name) is not None - - __contains__ = has_key - - def get_all(self, name): - name = name.lower() - return [kv[1] for kv in self._headers if kv[0].lower()==name] - - def get(self,name,default=None): - name = name.lower() - for k,v in self._headers: - if k.lower()==name: - return v - return default - - def keys(self): - return [k for k, v in self._headers] - - def values(self): - return [v for k, v in self._headers] - - def items(self): - return self._headers[:] - - def __repr__(self): - return "Headers(%r)" % self._headers - - def __str__(self): - return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['','']) - - def setdefault(self,name,value): - result = self.get(name) - if result is None: - self._headers.append((name,value)) - return value - else: - return result - - - def add_header(self, _name, _value, **_params): - parts = [] - if _value is not None: - parts.append(_value) - for k, v in _params.items(): - if v is None: - parts.append(k.replace('_', '-')) - else: - parts.append(_formatparam(k.replace('_', '-'), v)) - self._headers.append((_name, "; ".join(parts))) - -class FileWrapper: - """Wrapper to convert file-like objects to iterables""" - - def __init__(self, filelike, blksize=BUF_SIZE): - self.filelike = filelike - self.blksize = blksize - if hasattr(filelike,'close'): - self.close = filelike.close - - def __getitem__(self,key): - data = self.filelike.read(self.blksize) - if data: - return data - raise IndexError - - def __iter__(self): - return self - - def next(self): - data = self.filelike.read(self.blksize) - if data: - return data - raise StopIteration - class Worker(Thread): """The Worker class is a base class responsible for receiving connections and (a subclass) will run an application to process the the connection """ def __init__(self, @@ -976,10 +1213,11 @@ self.monitor_queue = monitor_queue self.size = 0 self.status = "200 OK" self.closeConnection = True + self.request_line = "" # Request Log self.req_log = logging.getLogger('Rocket.Requests') self.req_log.addHandler(NullHandler()) @@ -1107,18 +1345,18 @@ except socket.error: self.closeConnection = True self.err_log.error('Tried to send "%s" to client but received socket' ' error' % status) - def kill(self): - if self.isAlive() and hasattr(self, 'conn'): - try: - self.conn.shutdown(socket.SHUT_RDWR) - except socket.error: - info = sys.exc_info() - if info[1].args[0] != socket.EBADF: - self.err_log.debug('Error on shutdown: '+str(info)) + #def kill(self): + # if self.isAlive() and hasattr(self, 'conn'): + # try: + # self.conn.shutdown(socket.SHUT_RDWR) + # except socket.error: + # info = sys.exc_info() + # if info[1].args[0] != socket.EBADF: + # self.err_log.debug('Error on shutdown: '+str(info)) def read_request_line(self, sock_file): self.request_line = '' try: # Grab the request line @@ -1207,38 +1445,42 @@ host=host) return req def read_headers(self, sock_file): - headers = dict() - l = sock_file.readline() - - lname = None - lval = None - while True: - if PY3K: - try: - l = str(l, 'ISO-8859-1') - except UnicodeDecodeError: - self.err_log.warning('Client sent invalid header: ' + repr(l)) - - if l == '\r\n': - break - - if l[0] in ' \t' and lname: - # Some headers take more than one line - lval += ',' + l.strip() - else: - # HTTP header values are latin-1 encoded - l = l.split(':', 1) - # HTTP header names are us-ascii encoded - - lname = l[0].strip().upper().replace('-', '_') - lval = l[-1].strip() - headers[str(lname)] = str(lval) - - l = sock_file.readline() + try: + headers = dict() + l = sock_file.readline() + + lname = None + lval = None + while True: + if PY3K: + try: + l = str(l, 'ISO-8859-1') + except UnicodeDecodeError: + self.err_log.warning('Client sent invalid header: ' + repr(l)) + + if l == '\r\n': + break + + if l[0] in ' \t' and lname: + # Some headers take more than one line + lval += ',' + l.strip() + else: + # HTTP header values are latin-1 encoded + l = l.split(':', 1) + # HTTP header names are us-ascii encoded + + lname = l[0].strip().upper().replace('-', '_') + lval = l[-1].strip() + headers[str(lname)] = str(lval) + + l = sock_file.readline() + except socket.timeout: + raise SocketTimeout("Socket timed out before request.") + return headers class SocketTimeout(Exception): "Exception for when a socket times out between requests." pass @@ -1298,26 +1540,222 @@ def readlines(self): yield self.readline() def get_method(method): - methods = dict(wsgi=WSGIWorker) + + methods = dict(wsgi=WSGIWorker, + fs=FileSystemWorker) return methods[method.lower()] # Monolithic build...end of module: rocket\worker.py # Monolithic build...start of module: rocket\methods\__init__.py # Monolithic build...end of module: rocket\methods\__init__.py +# Monolithic build...start of module: rocket\methods\fs.py + +# Import System Modules +import os +import time +import mimetypes +from email.utils import formatdate +from wsgiref.headers import Headers +from wsgiref.util import FileWrapper +# Import Package Modules +# package imports removed in monolithic build + + +# Define Constants +CHUNK_SIZE = 2**16 # 64 Kilobyte chunks +HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' +INDEX_HEADER = '''\ + +Directory Index: %(path)s + + +

Directory Index: %(path)s

+ + +''' +INDEX_ROW = '''''' +INDEX_FOOTER = '''
Directories
\r\n''' + +class LimitingFileWrapper(FileWrapper): + def __init__(self, limit=None, *args, **kwargs): + self.limit = limit + FileWrapper.__init__(self, *args, **kwargs) + + def read(self, amt): + if amt > self.limit: + amt = self.limit + self.limit -= amt + return FileWrapper.read(self, amt) + +class FileSystemWorker(Worker): + def __init__(self, *args, **kwargs): + """Builds some instance variables that will last the life of the + thread.""" + + Worker.__init__(self, *args, **kwargs) + + self.root = os.path.abspath(self.app_info['document_root']) + self.display_index = self.app_info['display_index'] + + def serve_file(self, filepath, headers): + filestat = os.stat(filepath) + self.size = filestat.st_size + modtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT", + time.gmtime(filestat.st_mtime)) + self.headers.add_header('Last-Modified', modtime) + if headers.get('if_modified_since') == modtime: + # The browser cache is up-to-date, send a 304. + self.status = "304 Not Modified" + self.data = [] + return + + ct = mimetypes.guess_type(filepath)[0] + self.content_type = ct if ct else 'text/plain' + try: + f = open(filepath, 'rb') + self.headers['Pragma'] = 'cache' + self.headers['Cache-Control'] = 'private' + self.headers['Content-Length'] = str(self.size) + if self.etag: + self.headers.add_header('Etag', self.etag) + if self.expires: + self.headers.add_header('Expires', self.expires) + + try: + # Implement 206 partial file support. + start, end = headers['range'].split('-') + start = 0 if not start.isdigit() else int(start) + end = self.size if not end.isdigit() else int(end) + if self.size < end or start < 0: + self.status = "214 Unsatisfiable Range Requested" + self.data = FileWrapper(f, CHUNK_SIZE) + else: + f.seek(start) + self.data = LimitingFileWrapper(f, CHUNK_SIZE, limit=end) + self.status = "206 Partial Content" + except: + self.data = FileWrapper(f, CHUNK_SIZE) + except IOError: + self.status = "403 Forbidden" + + def serve_dir(self, pth, rpth): + def rel_path(path): + return os.path.normpath(path[len(self.root):] if path.startswith(self.root) else path) + + if not self.display_index: + self.status = '404 File Not Found' + return b('') + else: + self.content_type = 'text/html' + + dir_contents = [os.path.join(pth, x) for x in os.listdir(os.path.normpath(pth))] + dir_contents.sort() + + dirs = [rel_path(x)+'/' for x in dir_contents if os.path.isdir(x)] + files = [rel_path(x) for x in dir_contents if os.path.isfile(x)] + + self.data = [INDEX_HEADER % dict(path='/'+rpth)] + if rpth: + self.data += [INDEX_ROW % dict(name='(parent directory)', cls='dir parent', link='/'.join(rpth[:-1].split('/')[:-1]))] + self.data += [INDEX_ROW % dict(name=os.path.basename(x[:-1]), link=os.path.join(rpth, os.path.basename(x[:-1])).replace('\\', '/'), cls='dir') for x in dirs] + self.data += ['Files'] + self.data += [INDEX_ROW % dict(name=os.path.basename(x), link=os.path.join(rpth, os.path.basename(x)).replace('\\', '/'), cls='file') for x in files] + self.data += [INDEX_FOOTER] + self.headers['Content-Length'] = self.size = str(sum([len(x) for x in self.data])) + self.status = '200 OK' + + def run_app(self, conn): + self.status = "200 OK" + self.size = 0 + self.expires = None + self.etag = None + self.content_type = 'text/plain' + self.content_length = None + + if __debug__: + self.err_log.debug('Getting sock_file') + + # Build our file-like object + sock_file = conn.makefile('rb',BUF_SIZE) + request = self.read_request_line(sock_file) + if request['method'].upper() not in ('GET', ): + self.status = "501 Not Implemented" + + try: + # Get our file path + headers = dict([(str(k.lower()), v) for k, v in self.read_headers(sock_file).items()]) + rpath = request.get('path', '').lstrip('/') + filepath = os.path.join(self.root, rpath) + filepath = os.path.abspath(filepath) + if __debug__: + self.err_log.debug('Request for path: %s' % filepath) + + self.closeConnection = headers.get('connection', 'close').lower() == 'close' + self.headers = Headers([('Date', formatdate(usegmt=True)), + ('Server', HTTP_SERVER_SOFTWARE), + ('Connection', headers.get('connection', 'close')), + ]) + + if not filepath.lower().startswith(self.root.lower()): + # File must be within our root directory + self.status = "400 Bad Request" + self.closeConnection = True + elif not os.path.exists(filepath): + self.status = "404 File Not Found" + self.closeConnection = True + elif os.path.isdir(filepath): + self.serve_dir(filepath, rpath) + elif os.path.isfile(filepath): + self.serve_file(filepath, headers) + else: + # It exists but it's not a file or a directory???? + # What is it then? + self.status = "501 Not Implemented" + self.closeConnection = True + + h = self.headers + statcode, statstr = self.status.split(' ', 1) + statcode = int(statcode) + if statcode >= 400: + h.add_header('Content-Type', self.content_type) + self.data = [statstr] + + # Build our output headers + header_data = HEADER_RESPONSE % (self.status, str(h)) + + # Send the headers + if __debug__: + self.err_log.debug('Sending Headers: %s' % repr(header_data)) + self.conn.sendall(b(header_data)) + + for data in self.data: + self.conn.sendall(b(data)) + + if hasattr(self.data, 'close'): + self.data.close() + + finally: + if __debug__: + self.err_log.debug('Finally closing sock_file') + sock_file.close() + +# Monolithic build...end of module: rocket\methods\fs.py # Monolithic build...start of module: rocket\methods\wsgi.py # Import System Modules import sys import socket -#from wsgiref.headers import Headers -#from wsgiref.util import FileWrapper +from wsgiref.headers import Headers +from wsgiref.util import FileWrapper + # Import Package Modules # package imports removed in monolithic build + if PY3K: from email.utils import formatdate else: @@ -1348,16 +1786,22 @@ multithreaded = False self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'], 'wsgi.multithread': multithreaded, }) self.base_environ.update(BASE_ENV) + # Grab our application self.app = self.app_info.get('wsgi_app') if not hasattr(self.app, "__call__"): raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app)) + # Enable futures + if has_futures and self.app_info.get('futures'): + executor = self.app_info['executor'] + self.base_environ.update({"wsgiorg.executor": executor, + "wsgiorg.futures": executor.futures}) def build_environ(self, sock_file, conn): """ Build the execution environment. """ # Grab the request line request = self.read_request_line(sock_file) @@ -1390,10 +1834,18 @@ environ['wsgi.url_scheme'] = 'https' environ['HTTPS'] = 'on' else: environ['wsgi.url_scheme'] = 'http' + if conn.ssl: + try: + peercert = conn.socket.getpeercert(binary_form=True) + environ['SSL_CLIENT_RAW_CERT'] = \ + peercert and ssl.DER_cert_to_PEM_cert(peercert) + except Exception,e: + print e + if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked': environ['wsgi.input'] = ChunkedReader(sock_file) else: environ['wsgi.input'] = sock_file @@ -1522,11 +1974,14 @@ if __debug__: self.err_log.debug('Getting sock_file') # Build our file-like object - sock_file = conn.makefile('rb',BUF_SIZE) + if PY3K: + sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE) + else: + sock_file = conn.makefile(BUF_SIZE) try: # Read the headers and build our WSGI environment self.environ = environ = self.build_environ(sock_file, conn) @@ -1585,15 +2040,11 @@ static_folder = os.path.join(os.getcwd(),static_folder) path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html') type = types.get(path.split('.')[-1],'text') if os.path.exists(path): try: - pathfile = open(path,'rb') - try: - data = pathfile.read() - finally: - pathfile.close() + data = open(path,'rb').read() start_response('200 OK', [('Content-Type', type)]) except IOError: start_response('404 NOT FOUND', []) data = '404 NOT FOUND' else: