1
2
3
4
5
6
7 import sys
8 import errno
9 import socket
10 import logging
11 import platform
12
13
14 VERSION = '1.2.2'
15 SERVER_NAME = socket.gethostname()
16 SERVER_SOFTWARE = 'Rocket %s' % VERSION
17 HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0])
18 BUF_SIZE = 16384
19 SOCKET_TIMEOUT = 1
20 THREAD_STOP_CHECK_INTERVAL = 1
21 IS_JYTHON = platform.system() == 'Java'
22 IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET])
23 DEFAULT_LISTEN_QUEUE_SIZE = 5
24 DEFAULT_MIN_THREADS = 10
25 DEFAULT_MAX_THREADS = 0
26 DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE,
27 MIN_THREADS = DEFAULT_MIN_THREADS,
28 MAX_THREADS = DEFAULT_MAX_THREADS)
29
30 PY3K = sys.version_info[0] > 2
31
33 "A Logging handler to prevent library errors."
34 - def emit(self, record):
36
37 if PY3K:
39 """ Convert string/unicode/bytes literals into bytes. This allows for
40 the same code to run on Python 2.x and 3.x. """
41 if isinstance(val, str):
42 return val.encode()
43 else:
44 return val
45
def u(val, encoding="us-ascii"):
    """ Return ``val`` as text (Python 3 flavour of the helper).

    A ``bytes`` value is decoded with ``encoding``; every other value is
    handed back untouched. """
    # Guard clause: only bytes need any work.
    if not isinstance(val, bytes):
        return val
    return val.decode(encoding)
53
54 else:
56 """ Convert string/unicode/bytes literals into bytes. This allows for
57 the same code to run on Python 2.x and 3.x. """
58 if isinstance(val, unicode):
59 return val.encode()
60 else:
61 return val
62
def u(val, encoding="us-ascii"):
    """ Return ``val`` as unicode text (Python 2 flavour of the helper).

    A ``str`` value (a byte string on Python 2) is decoded with
    ``encoding``; every other value is handed back untouched. """
    # Guard clause: anything that is not a str passes straight through.
    if not isinstance(val, str):
        return val
    return val.decode(encoding)
70
71
72
73
74 __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE',
75 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u',
76 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler']
77
78
79
80
81
82 import sys
83 import time
84 import socket
85 try:
86 import ssl
87 has_ssl = True
88 except ImportError:
89 has_ssl = False
90
91
93 __slots__ = [
94 'setblocking',
95 'sendall',
96 'shutdown',
97 'makefile',
98 'fileno',
99 'client_addr',
100 'client_port',
101 'server_port',
102 'socket',
103 'start_time',
104 'ssl',
105 'secure'
106 ]
107
108 - def __init__(self, sock_tuple, port, secure=False):
129
131 if hasattr(self.socket, '_sock'):
132 try:
133 self.socket._sock.close()
134 except socket.error:
135 info = sys.exc_info()
136 if info[1].errno != socket.EBADF:
137 raise info[1]
138 else:
139 pass
140 self.socket.close()
141
142
143
144
145
146 import os
147 import socket
148 import logging
149 import traceback
150 from threading import Thread
151
152 try:
153 import ssl
154 from ssl import SSLError
155 has_ssl = True
156 except ImportError:
157 has_ssl = False
160
161
162
164 """The Listener class is a class responsible for accepting connections
165 and queuing them to be processed by a worker thread."""
166
def __init__(self, interface, queue_size, active_queue, *args, **kwargs):
    """Create and bind the listening socket for one interface.

    interface    -- ``(addr, port)`` for plain HTTP, or
                    ``(addr, port, keyfile, certfile)`` for HTTPS.
    queue_size   -- backlog handed to ``socket.listen()``.
    active_queue -- queue stored on the instance as ``active_queue``.

    Any further positional/keyword arguments are forwarded to
    ``Thread.__init__``.

    ``self.ready`` is set to True only once the socket is bound and
    listening. On any failure the reason is written to the per-port error
    logger, the socket (if one was created) is closed, and ``ready`` stays
    False.
    """
    Thread.__init__(self, *args, **kwargs)

    # Instance variables
    self.active_queue = active_queue
    self.interface = interface
    self.addr = interface[0]
    self.port = interface[1]
    # A four-item interface is a request for HTTPS. Whether the key and
    # certificate files actually exist is validated below so that a bad
    # path is reported instead of silently downgrading the port to HTTP.
    self.secure = len(interface) == 4
    self.ready = False

    # Error Log
    self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port)
    self.err_log.addHandler(NullHandler())

    # Validate the HTTPS prerequisites before any socket is created so
    # that the early returns below have nothing to clean up.
    if self.secure:
        if not has_ssl:
            self.err_log.error("ssl module required to serve HTTPS.")
            return
        if not os.path.exists(interface[2]):
            data = (interface[2], interface[0], interface[1])
            self.err_log.error("Cannot find key file "
                               "'%s'. Cannot bind to %s:%s" % data)
            return
        if not os.path.exists(interface[3]):
            data = (interface[3], interface[0], interface[1])
            self.err_log.error("Cannot find certificate file "
                               "'%s'. Cannot bind to %s:%s" % data)
            return

    # Build the socket
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Both socket options are best-effort: failure is logged and ignored.
    try:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except Exception:
        msg = "Cannot share socket. Using %s:%i exclusively."
        self.err_log.warning(msg % (self.addr, self.port))

    try:
        if not IS_JYTHON:
            listener.setsockopt(socket.IPPROTO_TCP,
                                socket.TCP_NODELAY,
                                1)
    except Exception:
        msg = "Cannot set TCP_NODELAY, things might run a little slower"
        self.err_log.warning(msg)

    try:
        listener.bind((self.addr, self.port))
    except Exception:
        msg = "Socket %s:%i in use by other process and it won't share."
        self.err_log.error(msg % (self.addr, self.port))
        # Do not leak the descriptor of a socket that will never be used.
        listener.close()
        return

    # accept() must time out periodically so the run loop can notice that
    # ``ready`` was cleared and exit.
    listener.settimeout(THREAD_STOP_CHECK_INTERVAL)

    # Listen for new connections, allowing queue_size pending connections.
    listener.listen(queue_size)

    self.listener = listener

    self.ready = True
238
240 try:
241 sock = ssl.wrap_socket(sock,
242 keyfile = self.interface[2],
243 certfile = self.interface[3],
244 server_side = True,
245 ssl_version = ssl.PROTOCOL_SSLv23)
246 except SSLError:
247
248
249
250 pass
251
252 return sock
253
254
256 if not self.ready:
257 self.err_log.warning('Listener started when not ready.')
258 return
259
260 if __debug__:
261 self.err_log.debug('Entering main loop.')
262 while True:
263 try:
264 sock, addr = self.listener.accept()
265
266 if self.secure:
267 sock = self.wrap_socket(sock)
268
269 self.active_queue.put(((sock, addr),
270 self.interface[1],
271 self.secure))
272
273 except socket.timeout:
274
275
276
277 if not self.ready:
278 if __debug__:
279 self.err_log.debug('Listener exiting.')
280 return
281 else:
282 continue
283 except:
284 self.err_log.error(str(traceback.format_exc()))
285
286
287
288
289
290 import sys
291 import time
292 import socket
293 import logging
294 import traceback
295
296 try:
297 from queue import Queue
298 except ImportError:
299 from Queue import Queue
300
301
302
303
304
305
306
307
308
309 log = logging.getLogger('Rocket')
310 log.addHandler(NullHandler())
311
313 """The Rocket class is responsible for handling threads and accepting and
314 dispatching connections."""
315
def __init__(self,
             interfaces = ('127.0.0.1', 8000),
             method = 'wsgi',
             app_info = None,
             min_threads = None,
             max_threads = None,
             queue_size = None,
             timeout = 600,
             handle_signals = True):
    """Build the monitor, the worker pool and one listener per interface.

    interfaces     -- a single interface tuple or a list of them; anything
                      that is not a list is wrapped in a one-item list.
    method         -- name handed to ``get_method`` to pick the worker class.
    app_info       -- dict passed to the workers; when it is a dict its
                      'server_software' entry is set here.
    min_threads    -- falls back to DEFAULTS['MIN_THREADS'] when None.
    max_threads    -- falls back to DEFAULTS['MAX_THREADS'] when None.
    queue_size     -- listen backlog; when falsy, socket.SOMAXCONN is used
                      if available, else DEFAULTS['LISTEN_QUEUE_SIZE'].
                      It is capped at max_threads when that is non-zero.
    timeout        -- passed to the Monitor.
    handle_signals -- stored on the instance as ``handle_signals``.

    Calls ``sys.exit(1)`` when no listener could be made ready.
    """
    self.handle_signals = handle_signals

    if not isinstance(interfaces, list):
        self.interfaces = [interfaces]
    else:
        self.interfaces = interfaces

    if min_threads is None:
        min_threads = DEFAULTS['MIN_THREADS']

    if max_threads is None:
        max_threads = DEFAULTS['MAX_THREADS']

    if not queue_size:
        if hasattr(socket, 'SOMAXCONN'):
            queue_size = socket.SOMAXCONN
        else:
            queue_size = DEFAULTS['LISTEN_QUEUE_SIZE']

    if max_threads and queue_size > max_threads:
        queue_size = max_threads

    if isinstance(app_info, dict):
        app_info['server_software'] = SERVER_SOFTWARE

    monitor_queue = Queue()
    active_queue = Queue()

    self._monitor = Monitor(monitor_queue, active_queue, timeout)

    self._threadpool = ThreadPool(get_method(method),
                                  app_info = app_info,
                                  active_queue=active_queue,
                                  monitor_queue = monitor_queue,
                                  min_threads=min_threads,
                                  max_threads=max_threads)

    # Build one listener per interface and keep only those that managed to
    # bind. Every listener is checked, including the first one: the
    # previous reverse index loop stopped before index 0.
    candidates = [Listener(i, queue_size, active_queue)
                  for i in self.interfaces]
    self.listeners = [l for l in candidates if l.ready]

    if not self.listeners:
        log.critical("No interfaces to listen on...closing.")
        sys.exit(1)
372
374 log.info('Received SIGTERM')
375 self.stop()
376
378 log.info('Received SIGHUP')
379 self.restart()
380
382 log.info('Starting %s' % SERVER_SOFTWARE)
383
384
385 if self.handle_signals:
386 try:
387 import signal
388 signal.signal(signal.SIGTERM, self._sigterm)
389 signal.signal(signal.SIGUSR1, self._sighup)
390 except:
391 log.debug('This platform does not support signals.')
392
393
394 self._threadpool.start()
395
396
397 self._monitor.setDaemon(True)
398 self._monitor.start()
399
400
401
402 str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '')
403
404 msg = 'Listening on sockets: '
405 msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners])
406 log.info(msg)
407
408 for l in self.listeners:
409 l.start()
410
411 tp = self._threadpool
412 dynamic_resize = tp.dynamic_resize
413
414 while not tp.stop_server:
415 try:
416 dynamic_resize()
417 time.sleep(THREAD_STOP_CHECK_INTERVAL)
418 except KeyboardInterrupt:
419
420 break
421 except:
422 if not tp.stop_server:
423 log.error(str(traceback.format_exc()))
424 continue
425
426 return self.stop()
427
def stop(self, stoplogging = True):
    """Shut the server down: listeners first, then workers, then monitor.

    stoplogging -- when true, ``logging.shutdown()`` is called at the end.
    """
    log.info("Stopping Server")

    # Clear each listener's ready flag and wait for it to finish if it is
    # still running.
    for listener in self.listeners:
        listener.ready = False
        if not listener.isAlive():
            continue
        listener.join()

    # Ask the worker pool to stop.
    self._threadpool.stop()

    # Ask the monitor to stop and wait for it if it is still running.
    monitor = self._monitor
    monitor.stop()
    if monitor.isAlive():
        monitor.join()

    if not stoplogging:
        return
    logging.shutdown()
447
451
def CherryPyWSGIServer(bind_addr,
                       wsgi_app,
                       numthreads = 10,
                       server_name = None,
                       max = -1,
                       request_queue_size = 5,
                       timeout = 10,
                       shutdown_timeout = 5):
    """ Build a Rocket instance from CherryPy wsgiserver-style arguments.

    ``server_name`` and ``shutdown_timeout`` are accepted but not used.
    A negative ``max`` is passed on to Rocket as 0. """
    app_info = {'wsgi_app': wsgi_app}
    # NOTE: the parameter ``max`` shadows the builtin inside this function.
    thread_cap = 0 if max < 0 else max
    return Rocket(bind_addr, 'wsgi', app_info,
                  min_threads = numthreads,
                  max_threads = thread_cap,
                  queue_size = request_queue_size,
                  timeout = timeout)
469
470
471
472
473
474 import time
475 import logging
476 import select
477 from threading import Thread
478
479
480
481
483
484
def __init__(self,
             monitor_queue,
             active_queue,
             timeout,
             *args,
             **kwargs):
    """Set up the monitor thread's state.

    monitor_queue -- stored as ``self.monitor_queue``.
    active_queue  -- stored as ``self.active_queue``.
    timeout       -- stored as ``self.timeout``.

    Remaining positional/keyword arguments go to ``Thread.__init__``.
    """
    Thread.__init__(self, *args, **kwargs)

    # The monitor starts out inactive with nothing to watch.
    self.active = False
    self.connections = set()

    # Values supplied by the caller.
    self.timeout = timeout
    self.active_queue = active_queue
    self.monitor_queue = monitor_queue
501
503 self.name = self.getName()
504 self.log = logging.getLogger('Rocket.Monitor')
505 self.log.addHandler(NullHandler())
506
507 self.active = True
508 conn_list = list()
509 list_changed = False
510
511 if __debug__:
512 self.log.debug('Entering monitor loop.')
513
514
515 while self.active:
516
517 while not self.monitor_queue.empty() or not len(self.connections):
518 if __debug__:
519 self.log.debug('In "receive timed-out connections" loop.')
520
521 c = self.monitor_queue.get()
522
523 if c is None:
524
525 if __debug__:
526 self.log.debug('Received a death threat.')
527 return
528
529 self.log.debug('Received a timed out connection.')
530
531 if __debug__:
532 assert(c not in self.connections)
533
534 if IS_JYTHON:
535
536
537 c.setblocking(False)
538
539 if __debug__:
540 self.log.debug('Adding connection to monitor list.')
541
542 self.connections.add(c)
543 list_changed = True
544
545
546 self.log.debug('Blocking on connections')
547 if list_changed:
548 conn_list = list(self.connections)
549 list_changed = False
550
551 try:
552 readable = select.select(conn_list,
553 [],
554 [],
555 THREAD_STOP_CHECK_INTERVAL)[0]
556 except:
557 if self.active:
558 raise
559 else:
560 break
561
562
563 for r in readable:
564 if __debug__:
565 self.log.debug('Restoring readable connection')
566
567 if IS_JYTHON:
568
569
570
571 r.setblocking(True)
572
573 r.start_time = time.time()
574 self.active_queue.put(r)
575
576 self.connections.remove(r)
577 list_changed = True
578
579
580 if self.timeout:
581 now = time.time()
582 stale = set()
583 for c in self.connections:
584 if (now - c.start_time) >= self.timeout:
585 stale.add(c)
586
587 for c in stale:
588 if __debug__:
589
590 data = (c.client_addr, c.server_port, c.ssl and '*' or '')
591 self.log.debug('Flushing stale connection: %s:%i%s' % data)
592
593 self.connections.remove(c)
594 list_changed = True
595
596 try:
597 c.close()
598 finally:
599 del c
600
602 self.active = False
603
604 if __debug__:
605 self.log.debug('Flushing waiting connections')
606
607 for c in self.connections:
608 try:
609 c.close()
610 finally:
611 del c
612
613 if __debug__:
614 self.log.debug('Flushing queued connections')
615
616 while not self.monitor_queue.empty():
617 c = self.monitor_queue.get()
618
619 if c is None:
620 continue
621
622 try:
623 c.close()
624 finally:
625 del c
626
627
628 self.monitor_queue.put(None)
629
630
631
632
633
634 import logging
635
636
637
638
639 log = logging.getLogger('Rocket.Errors.ThreadPool')
640 log.addHandler(NullHandler())
641
643 """The ThreadPool class is a container class for all the worker threads. It
644 manages the number of actively running threads."""
645
def __init__(self,
             method,
             app_info,
             active_queue,
             monitor_queue,
             min_threads=DEFAULTS['MIN_THREADS'],
             max_threads=DEFAULTS['MAX_THREADS'],
             ):
    """Record the pool settings and construct the initial workers.

    method        -- worker class, instantiated once per thread.
    app_info      -- dict given to every worker; replaced by a fresh dict
                     when it is not a dict. 'max_threads' and 'min_threads'
                     entries are written into it.
    active_queue  -- passed to every worker and kept on the pool.
    monitor_queue -- passed to every worker and kept on the pool.
    min_threads   -- number of workers constructed here (not started).
    max_threads   -- stored on the pool; also used for ``grow_threshold``.
    """
    if __debug__:
        log.debug("Initializing ThreadPool.")

    if not isinstance(app_info, dict):
        app_info = dict()
    app_info.update({'max_threads': max_threads,
                     'min_threads': min_threads})

    self.stop_server = False
    self.check_for_dead_threads = 0

    self.worker_class = method
    self.app_info = app_info
    self.active_queue = active_queue
    self.monitor_queue = monitor_queue
    self.min_threads = min_threads
    self.max_threads = max_threads

    # Queue length above which the pool is asked to grow.
    self.grow_threshold = int(max_threads/10) + 2

    # Workers are only constructed here, not started.
    self.threads = set()
    for _ in range(min_threads):
        self.threads.add(self.worker_class(app_info,
                                           self.active_queue,
                                           self.monitor_queue))
684
686 self.stop_server = False
687 if __debug__:
688 log.debug("Starting threads.")
689
690 for thread in self.threads:
691 thread.setDaemon(True)
692 thread.start()
693
695 if __debug__:
696 log.debug("Stopping threads.")
697
698 self.stop_server = True
699
700
701 for t in self.threads:
702 self.active_queue.put(None)
703
704
705 for t in self.threads:
706 t.kill()
707
708
709 for t in self.threads:
710 t.join()
711
712
713 self.bring_out_your_dead()
714
716
717
718 dead_threads = [t for t in self.threads if not t.isAlive()]
719 for t in dead_threads:
720 if __debug__:
721 log.debug("Removing dead thread: %s." % t.getName())
722 try:
723
724 self.threads.remove(t)
725 except:
726 pass
727 self.check_for_dead_threads -= len(dead_threads)
728
def grow(self, amount=None):
    """Start additional worker threads.

    amount -- number of workers requested. When it is omitted (or zero)
              and the pool has an upper bound, the pool grows up to that
              bound.

    ``max_threads == 0`` is treated as "no upper bound", matching the
    resize check and the CherryPy wrapper in this file. In that case the
    request is honoured as given. Previously the cap was computed as
    ``0 - len(self.threads)``, a negative number, so an unbounded pool
    could never grow.

    Does nothing once ``stop_server`` is set.
    """
    if self.stop_server:
        return

    if self.max_threads:
        # Bounded pool: never exceed the room that is left.
        headroom = self.max_threads - len(self.threads)
        if not amount or amount > headroom:
            amount = headroom
    elif not amount:
        # Unbounded pool and no explicit request: there is no natural
        # target size, so this stays a no-op as before.
        amount = 0

    # A pool that is already over its bound yields a negative number;
    # clamp it so the log line and the loop below stay sensible.
    if amount < 0:
        amount = 0

    if __debug__:
        log.debug("Growing by %i." % amount)

    for x in range(amount):
        worker = self.worker_class(self.app_info,
                                   self.active_queue,
                                   self.monitor_queue)

        worker.setDaemon(True)
        self.threads.add(worker)
        worker.start()
749
751 if __debug__:
752 log.debug("Shrinking by %i." % amount)
753
754 self.check_for_dead_threads += amount
755
756 for x in range(amount):
757 self.active_queue.put(None)
758
760 if (self.max_threads > self.min_threads or self.max_threads == 0):
761 if self.check_for_dead_threads > 0:
762 self.bring_out_your_dead()
763
764 queueSize = self.active_queue.qsize()
765 threadCount = len(self.threads)
766
767 if __debug__:
768 log.debug("Examining ThreadPool. %i threads and %i Q'd conxions"
769 % (threadCount, queueSize))
770
771 if queueSize == 0 and threadCount > self.min_threads:
772 self.shrink()
773
774 elif queueSize > self.grow_threshold:
775
776 self.grow(queueSize)
777
778
779
780
781
782 import re
783 import sys
784 import socket
785 import logging
786 import traceback
787
788 from threading import Thread
789 from datetime import datetime
790
791 try:
792 from urllib import unquote
793 except ImportError:
794 from urllib.parse import unquote
795
796 try:
797 from io import StringIO
798 except ImportError:
799 try:
800 from cStringIO import StringIO
801 except ImportError:
802 from StringIO import StringIO
803
804 try:
805 from ssl import SSLError
806 except ImportError:
809
810
811
812
813
814 re_SLASH = re.compile('%2F', re.IGNORECASE)
815 re_REQUEST_LINE = re.compile(r"""^
816 (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method
817 \ # (single space)
818 (
819 (?P<scheme>[^:/]+) # Scheme
820 (://) #
821 (?P<host>[^/]+) # Host
822 )? #
823 (?P<path>(\*|/[^ \?]*)) # Path
824 (\? (?P<query_string>[^ ]+))? # Query String
825 \ # (single space)
826 (?P<protocol>HTTPS?/1\.[01]) # Protocol
827 $
828 """, re.X)
829 LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s'
830 RESPONSE = '''\
831 HTTP/1.1 %s
832 Content-Length: %i
833 Content-Type: %s
834
835 %s
836 '''
837 if IS_JYTHON:
838 HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])
839
840
841
842
843
844
845
846
847
848 import re
849 _tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
850
864
867 if type(headers) is not type([]):
868 raise TypeError("Headers must be a list of name/value tuples")
869 self._headers = headers
870
872 return len(self._headers)
873
877
881
884
886 return self.get(name) is not None
887
888 __contains__ = has_key
889
893
900
902 return [k for k, v in self._headers]
903
905 return [v for k, v in self._headers]
906
908 return self._headers[:]
909
911 return "Headers(%r)" % self._headers
912
914 return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
915
917 result = self.get(name)
918 if result is None:
919 self._headers.append((name,value))
920 return value
921 else:
922 return result
923
924
926 parts = []
927 if _value is not None:
928 parts.append(_value)
929 for k, v in _params.items():
930 if v is None:
931 parts.append(k.replace('_', '-'))
932 else:
933 parts.append(_formatparam(k.replace('_', '-'), v))
934 self._headers.append((_name, "; ".join(parts)))
935
937 """Wrapper to convert file-like objects to iterables"""
938
940 self.filelike = filelike
941 self.blksize = blksize
942 if hasattr(filelike,'close'):
943 self.close = filelike.close
944
946 data = self.filelike.read(self.blksize)
947 if data:
948 return data
949 raise IndexError
950
953
955 data = self.filelike.read(self.blksize)
956 if data:
957 return data
958 raise StopIteration
959
961 """The Worker class is a base class responsible for receiving connections
962 and (a subclass) will run an application to process the the connection """
963
def __init__(self,
             app_info,
             active_queue,
             monitor_queue,
             *args,
             **kwargs):
    """Initialise per-thread state and the two loggers a worker uses.

    app_info      -- stored as ``self.app_info``.
    active_queue  -- stored as ``self.active_queue``.
    monitor_queue -- stored as ``self.monitor_queue``.

    Remaining positional/keyword arguments go to ``Thread.__init__``.
    """
    Thread.__init__(self, *args, **kwargs)

    # Per-request bookkeeping, starting from neutral defaults.
    self.closeConnection = True
    self.status = "200 OK"
    self.size = 0

    # Values supplied by the caller.
    self.monitor_queue = monitor_queue
    self.active_queue = active_queue
    self.app_info = app_info

    # Request log, shared logger name across workers.
    request_logger = logging.getLogger('Rocket.Requests')
    request_logger.addHandler(NullHandler())
    self.req_log = request_logger

    # Error log, named after this thread.
    error_logger = logging.getLogger('Rocket.Errors.'+self.getName())
    error_logger.addHandler(NullHandler())
    self.err_log = error_logger
989
991 if typ == SSLError:
992 if 'timed out' in val.args[0]:
993 typ = SocketTimeout
994 if typ == SocketTimeout:
995 if __debug__:
996 self.err_log.debug('Socket timed out')
997 self.monitor_queue.put(self.conn)
998 return True
999 if typ == SocketClosed:
1000 self.closeConnection = True
1001 if __debug__:
1002 self.err_log.debug('Client closed socket')
1003 return False
1004 if typ == BadRequest:
1005 self.closeConnection = True
1006 if __debug__:
1007 self.err_log.debug('Client sent a bad request')
1008 return True
1009 if typ == socket.error:
1010 self.closeConnection = True
1011 if val.args[0] in IGNORE_ERRORS_ON_CLOSE:
1012 if __debug__:
1013 self.err_log.debug('Ignorable socket Error received...'
1014 'closing connection.')
1015 return False
1016 else:
1017 self.status = "999 Utter Server Failure"
1018 tb_fmt = traceback.format_exception(typ, val, tb)
1019 self.err_log.error('Unhandled Error when serving '
1020 'connection:\n' + '\n'.join(tb_fmt))
1021 return False
1022
1023 self.closeConnection = True
1024 tb_fmt = traceback.format_exception(typ, val, tb)
1025 self.err_log.error('\n'.join(tb_fmt))
1026 self.send_response('500 Server Error')
1027 return False
1028
1030 if __debug__:
1031 self.err_log.debug('Entering main loop.')
1032
1033
1034 while True:
1035 conn = self.active_queue.get()
1036
1037 if not conn:
1038
1039 if __debug__:
1040 self.err_log.debug('Received a death threat.')
1041 return conn
1042
1043 if isinstance(conn, tuple):
1044 conn = Connection(*conn)
1045
1046 self.conn = conn
1047
1048 if conn.ssl != conn.secure:
1049 self.err_log.info('Received HTTP connection on HTTPS port.')
1050 self.send_response('400 Bad Request')
1051 self.closeConnection = True
1052 conn.close()
1053 continue
1054 else:
1055 if __debug__:
1056 self.err_log.debug('Received a connection.')
1057 self.closeConnection = False
1058
1059
1060 while True:
1061 if __debug__:
1062 self.err_log.debug('Serving a request')
1063 try:
1064 self.run_app(conn)
1065 log_info = dict(client_ip = conn.client_addr,
1066 time = datetime.now().strftime('%c'),
1067 status = self.status.split(' ')[0],
1068 size = self.size,
1069 request_line = self.request_line)
1070 self.req_log.info(LOG_LINE % log_info)
1071 except:
1072 exc = sys.exc_info()
1073 handled = self._handleError(*exc)
1074 if handled:
1075 break
1076 else:
1077 if self.request_line:
1078 log_info = dict(client_ip = conn.client_addr,
1079 time = datetime.now().strftime('%c'),
1080 status = self.status.split(' ')[0],
1081 size = self.size,
1082 request_line = self.request_line + ' - not stopping')
1083 self.req_log.info(LOG_LINE % log_info)
1084
1085 if self.closeConnection:
1086 try:
1087 conn.close()
1088 except:
1089 self.err_log.error(str(traceback.format_exc()))
1090
1091 break
1092
1094
1095
1096 self.closeConnection = True
1097 raise NotImplementedError('Overload this method!')
1098
1100 stat_msg = status.split(' ', 1)[1]
1101 msg = RESPONSE % (status,
1102 len(stat_msg),
1103 'text/plain',
1104 stat_msg)
1105 try:
1106 self.conn.sendall(b(msg))
1107 except socket.error:
1108 self.closeConnection = True
1109 self.err_log.error('Tried to send "%s" to client but received socket'
1110 ' error' % status)
1111
1113 if self.isAlive() and hasattr(self, 'conn'):
1114 try:
1115 self.conn.shutdown(socket.SHUT_RDWR)
1116 except socket.error:
1117 info = sys.exc_info()
1118 if info[1].args[0] != socket.EBADF:
1119 self.err_log.debug('Error on shutdown: '+str(info))
1120
1122 self.request_line = ''
1123 try:
1124
1125 d = sock_file.readline()
1126 if PY3K:
1127 d = d.decode('ISO-8859-1')
1128
1129 if d == '\r\n':
1130
1131 if __debug__:
1132 self.err_log.debug('Client sent newline')
1133
1134 d = sock_file.readline()
1135 if PY3K:
1136 d = d.decode('ISO-8859-1')
1137 except socket.timeout:
1138 raise SocketTimeout("Socket timed out before request.")
1139
1140 d = d.strip()
1141
1142 if not d:
1143 if __debug__:
1144 self.err_log.debug('Client did not send a recognizable request.')
1145 raise SocketClosed('Client closed socket.')
1146
1147 self.request_line = d
1148
1149
1150
1151
1152
1153
1154 if IS_JYTHON:
1155 return self._read_request_line_jython(d)
1156
1157 match = re_REQUEST_LINE.match(d)
1158
1159 if not match:
1160 self.send_response('400 Bad Request')
1161 raise BadRequest
1162
1163 req = match.groupdict()
1164 for k,v in req.items():
1165 if not v:
1166 req[k] = ""
1167 if k == 'path':
1168 req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)])
1169
1170 return req
1171
1173 d = d.strip()
1174 try:
1175 method, uri, proto = d.split(' ')
1176 if not proto.startswith('HTTP') or \
1177 proto[-3:] not in ('1.0', '1.1') or \
1178 method not in HTTP_METHODS:
1179 self.send_response('400 Bad Request')
1180 raise BadRequest
1181 except ValueError:
1182 self.send_response('400 Bad Request')
1183 raise BadRequest
1184
1185 req = dict(method=method, protocol = proto)
1186 scheme = ''
1187 host = ''
1188 if uri == '*' or uri.startswith('/'):
1189 path = uri
1190 elif '://' in uri:
1191 scheme, rest = uri.split('://')
1192 host, path = rest.split('/', 1)
1193 path = '/' + path
1194 else:
1195 self.send_response('400 Bad Request')
1196 raise BadRequest
1197
1198 query_string = ''
1199 if '?' in path:
1200 path, query_string = path.split('?', 1)
1201
1202 path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)])
1203
1204 req.update(path=path,
1205 query_string=query_string,
1206 scheme=scheme.lower(),
1207 host=host)
1208 return req
1209
1210
1212 headers = dict()
1213 l = sock_file.readline()
1214
1215 lname = None
1216 lval = None
1217 while True:
1218 if PY3K:
1219 try:
1220 l = str(l, 'ISO-8859-1')
1221 except UnicodeDecodeError:
1222 self.err_log.warning('Client sent invalid header: ' + repr(l))
1223
1224 if l == '\r\n':
1225 break
1226
1227 if l[0] in ' \t' and lname:
1228
1229 lval += ',' + l.strip()
1230 else:
1231
1232 l = l.split(':', 1)
1233
1234
1235 lname = l[0].strip().upper().replace('-', '_')
1236 lval = l[-1].strip()
1237 headers[str(lname)] = str(lval)
1238
1239 l = sock_file.readline()
1240 return headers
1241
1243 "Exception for when a socket times out between requests."
1244 pass
1245
1247 "Exception for when a client sends an incomprehensible request."
1248 pass
1249
1251 "Exception for when a socket is closed by the client."
1252 pass
1253
1256 self.stream = sock_file
1257 self.chunk_size = 0
1258
1260 chunk_len = ""
1261 try:
1262 while "" == chunk_len:
1263 chunk_len = self.stream.readline().strip()
1264 return int(chunk_len, 16)
1265 except ValueError:
1266 return 0
1267
def read(self, size):
    """Read up to ``size`` bytes of payload from the chunked stream.

    Chunk headers are consumed through ``self._read_header()`` whenever the
    current chunk is exhausted. Reading stops early when a header reports
    a zero-length chunk. The unread remainder of the current chunk is
    remembered in ``self.chunk_size`` for the next call.
    """
    pieces = []
    remaining = self.chunk_size

    while size:
        # Current chunk used up: fetch the length of the next one.
        if not remaining:
            remaining = self._read_header()

        # The request fits inside the current chunk: take it and finish.
        if size < remaining:
            pieces.append(self.stream.read(size))
            remaining -= size
            break

        # A zero-length chunk means there is nothing more to read.
        if not remaining:
            break

        # Drain the rest of this chunk and keep going.
        pieces.append(self.stream.read(remaining))
        size -= remaining
        remaining = 0

    self.chunk_size = remaining
    return b('').join(pieces)
1288
1290 data = b('')
1291 c = self.read(1)
1292 while c and c != b('\n'):
1293 data += c
1294 c = self.read(1)
1295 data += c
1296 return data
1297
1300
1302
1303 methods = dict(wsgi=WSGIWorker)
1304 return methods[method.lower()]
1305
1306
1307
1308
1309
1310
1311
1312
1313 import sys
1314 import socket
1315
1316
1317
1318
1319
1320
1321 if PY3K:
1322 from email.utils import formatdate
1323 else:
1324
1325 from email.Utils import formatdate
1326
1327
1328 NEWLINE = b('\r\n')
1329 HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
1330 BASE_ENV = {'SERVER_NAME': SERVER_NAME,
1331 'SCRIPT_NAME': '',
1332 'wsgi.errors': sys.stderr,
1333 'wsgi.version': (1, 0),
1334 'wsgi.multiprocess': False,
1335 'wsgi.run_once': False,
1336 'wsgi.file_wrapper': FileWrapper
1337 }
1338
1341 """Builds some instance variables that will last the life of the
1342 thread."""
1343 Worker.__init__(self, *args, **kwargs)
1344
1345 if isinstance(self.app_info, dict):
1346 multithreaded = self.app_info.get('max_threads') != 1
1347 else:
1348 multithreaded = False
1349 self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'],
1350 'wsgi.multithread': multithreaded,
1351 })
1352 self.base_environ.update(BASE_ENV)
1353
1354 self.app = self.app_info.get('wsgi_app')
1355
1356 if not hasattr(self.app, "__call__"):
1357 raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app))
1358
1359
1361 """ Build the execution environment. """
1362
1363 request = self.read_request_line(sock_file)
1364
1365
1366 environ = self.base_environ.copy()
1367
1368
1369 for k, v in self.read_headers(sock_file).items():
1370 environ[str('HTTP_'+k)] = v
1371
1372
1373 environ['REQUEST_METHOD'] = request['method']
1374 environ['PATH_INFO'] = request['path']
1375 environ['SERVER_PROTOCOL'] = request['protocol']
1376 environ['SERVER_PORT'] = str(conn.server_port)
1377 environ['REMOTE_PORT'] = str(conn.client_port)
1378 environ['REMOTE_ADDR'] = str(conn.client_addr)
1379 environ['QUERY_STRING'] = request['query_string']
1380 if 'HTTP_CONTENT_LENGTH' in environ:
1381 environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH']
1382 if 'HTTP_CONTENT_TYPE' in environ:
1383 environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE']
1384
1385
1386 self.request_method = environ['REQUEST_METHOD']
1387
1388
1389 if conn.ssl:
1390 environ['wsgi.url_scheme'] = 'https'
1391 environ['HTTPS'] = 'on'
1392 else:
1393 environ['wsgi.url_scheme'] = 'http'
1394
1395 if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked':
1396 environ['wsgi.input'] = ChunkedReader(sock_file)
1397 else:
1398 environ['wsgi.input'] = sock_file
1399
1400 return environ
1401
1403 h_set = self.header_set
1404
1405
1406 self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked'
1407
1408
1409 if not 'date' in h_set:
1410 h_set['Date'] = formatdate(usegmt=True)
1411
1412
1413 if not 'server' in h_set:
1414 h_set['Server'] = HTTP_SERVER_SOFTWARE
1415
1416 if 'content-length' in h_set:
1417 self.size = int(h_set['content-length'])
1418 else:
1419 s = int(self.status.split(' ')[0])
1420 if s < 200 or s not in (204, 205, 304):
1421 if not self.chunked:
1422 if sections == 1:
1423
1424 h_set['Content-Length'] = str(len(data))
1425 self.size = len(data)
1426 else:
1427
1428 h_set['Transfer-Encoding'] = 'Chunked'
1429 self.chunked = True
1430 if __debug__:
1431 self.err_log.debug('Adding header...'
1432 'Transfer-Encoding: Chunked')
1433
1434 if 'connection' not in h_set:
1435
1436 client_conn = self.environ.get('HTTP_CONNECTION', '').lower()
1437 if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1':
1438
1439 if client_conn:
1440 h_set['Connection'] = client_conn
1441 else:
1442 h_set['Connection'] = 'keep-alive'
1443 else:
1444
1445 h_set['Connection'] = 'close'
1446
1447
1448 self.closeConnection = h_set.get('connection', '').lower() == 'close'
1449
1450
1451 header_data = HEADER_RESPONSE % (self.status, str(h_set))
1452
1453
1454 if __debug__:
1455 self.err_log.debug('Sending Headers: %s' % repr(header_data))
1456 self.conn.sendall(b(header_data))
1457 self.headers_sent = True
1458
1460 self.err_log.warning('WSGI app called write method directly. This is '
1461 'deprecated behavior. Please update your app.')
1462 return self.write(data, sections)
1463
def write(self, data, sections=None):
    """ Write the data to the output socket.

    data     -- body fragment to send.
    sections -- forwarded to ``send_headers`` when the headers have not
                been sent yet.

    When ``self.error`` holds a status, that status and its message
    replace the caller's status and data. Nothing is written to the body
    for HEAD requests. A ``socket.error`` while sending is not raised; it
    only marks the connection to be closed. """

    if self.error[0]:
        self.status = self.error[0]
        data = b(self.error[1])

    if not self.headers_sent:
        self.send_headers(data, sections)

    if self.request_method == 'HEAD':
        return

    # In chunked encoding a zero-length chunk terminates the body, so an
    # empty write must not be framed and sent.
    if self.chunked and not data:
        return

    try:
        if self.chunked:
            # Frame the chunk from bytes pieces. Formatting the payload
            # with '%s' would embed the b'...' repr on Python 3.
            payload = b(data)
            self.conn.sendall(b('%x\r\n' % len(payload))
                              + payload
                              + b('\r\n'))
        else:
            self.conn.sendall(data)
    except socket.error:
        # The client went away; there is nobody left to report to, so
        # just make sure the connection gets closed.
        self.closeConnection = True
1484
1486 """ Store the HTTP status and headers to be sent when self.write is
1487 called. """
1488 if exc_info:
1489 try:
1490 if self.headers_sent:
1491
1492
1493 raise
1494 finally:
1495 exc_info = None
1496 elif self.header_set:
1497 raise AssertionError("Headers already set!")
1498
1499 if PY3K and not isinstance(status, str):
1500 self.status = str(status, 'ISO-8859-1')
1501 else:
1502 self.status = status
1503
1504 try:
1505 self.header_set = Headers(response_headers)
1506 except UnicodeDecodeError:
1507 self.error = ('500 Internal Server Error',
1508 'HTTP Headers should be bytes')
1509 self.err_log.error('Received HTTP Headers from client that contain'
1510 ' invalid characters for Latin-1 encoding.')
1511
1512 return self.write_warning
1513
1515 self.size = 0
1516 self.header_set = Headers([])
1517 self.headers_sent = False
1518 self.error = (None, None)
1519 self.chunked = False
1520 sections = None
1521 output = None
1522
1523 if __debug__:
1524 self.err_log.debug('Getting sock_file')
1525
1526
1527 sock_file = conn.makefile('rb',BUF_SIZE)
1528
1529 try:
1530
1531 self.environ = environ = self.build_environ(sock_file, conn)
1532
1533
1534 if environ.get('HTTP_EXPECT', '') == '100-continue':
1535 res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n'
1536 conn.sendall(b(res))
1537
1538
1539 output = self.app(environ, self.start_response)
1540
1541 if not hasattr(output, '__len__') and not hasattr(output, '__iter__'):
1542 self.error = ('500 Internal Server Error',
1543 'WSGI applications must return a list or '
1544 'generator type.')
1545
1546 if hasattr(output, '__len__'):
1547 sections = len(output)
1548
1549 for data in output:
1550
1551 if data:
1552 self.write(data, sections)
1553
1554 if self.chunked:
1555
1556 self.conn.sendall(b('0\r\n\r\n'))
1557 elif not self.headers_sent:
1558
1559 self.send_headers('', sections)
1560
1561
1562
1563 finally:
1564 if __debug__:
1565 self.err_log.debug('Finally closing output and sock_file')
1566
1567 if hasattr(output,'close'):
1568 output.close()
1569
1570 sock_file.close()
1571
1572
1573
1574
1575
1576
1577
1579 global static_folder
1580 import os
1581 types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif',
1582 'jpg': 'image/jpeg','png': 'image/png','pdf': 'applications/pdf'}
1583 if static_folder:
1584 if not static_folder.startswith('/'):
1585 static_folder = os.path.join(os.getcwd(),static_folder)
1586 path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html')
1587 type = types.get(path.split('.')[-1],'text')
1588 if os.path.exists(path):
1589 try:
1590 pathfile = open(path,'rb')
1591 try:
1592 data = pathfile.read()
1593 finally:
1594 pathfile.close()
1595 start_response('200 OK', [('Content-Type', type)])
1596 except IOError:
1597 start_response('404 NOT FOUND', [])
1598 data = '404 NOT FOUND'
1599 else:
1600 start_response('500 INTERNAL SERVER ERROR', [])
1601 data = '500 INTERNAL SERVER ERROR'
1602 else:
1603 start_response('200 OK', [('Content-Type', 'text/html')])
1604 data = '<html><body><h1>Hello from Rocket Web Server</h1></body></html>'
1605 return [data]
1606
1608 from optparse import OptionParser
1609 parser = OptionParser()
1610 parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1",
1611 help="ip address of the network interface")
1612 parser.add_option("-p", "--port", dest="port",default="8000",
1613 help="post where to run web server")
1614 parser.add_option("-s", "--static", dest="static",default=None,
1615 help="folder containing static files")
1616 (options, args) = parser.parse_args()
1617 global static_folder
1618 static_folder = options.static
1619 print 'Rocket running on %s:%s' % (options.ip, options.port)
1620 r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app})
1621 r.start()
1622
1623 if __name__=='__main__':
1624 demo()
1625