# MobileBlur
#
# scheduler.py at [b2d6f6e2e9]
# Login
#
# scheduler.py at [b2d6f6e2e9]
#
# File gluon/scheduler.py artifact b19c6df200 part of check-in b2d6f6e2e9


#### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET

# Help text printed by main() when no tasks file or no database URI is given.
# It is emitted verbatim at runtime, so treat its content as program output.
USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
## run worker nodes with:

   cd web2py
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0

## Comments
"""

import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse

try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon.utils import web2py_uuid

# Status values written to the ``status`` columns of the scheduler_task and
# scheduler_run tables (see TASK_STATUS and RUN_STATUS).
QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
# Status values for rows of the scheduler_worker table (see WORKER_STATUS).
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
# Time unit used to express durations in this module.
SECONDS = 1
# Default value of the ``heartbeat`` argument of Scheduler.
HEARTBEAT = 3*SECONDS

class Task(object):
    """
    description of one job to execute

    ``args`` and ``vars`` are stored exactly as received (JSON strings);
    every extra keyword argument becomes an attribute of the instance.
    """
    def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
        logging.debug(' new task allocated: %s.%s' % (app,function))
        # the fixed attributes first ...
        self.__dict__.update(
            app=app,
            function=function,
            timeout=timeout,
            args=args, # json
            vars=vars) # json
        # ... then whatever else the caller attached (task_id, run_id, ...)
        self.__dict__.update(kwargs)
    def __str__(self):
        name = self.function
        return '<Task: %s>' % name

class TaskReport(object):
    """
    outcome of one task execution: a status plus, optionally, the result,
    the captured output and the traceback text
    """
    def __init__(self,status,result=None,output=None,tb=None):
        logging.debug('    new task report: %s' % status)
        # log the traceback when there is one, the result otherwise
        if not tb:
            logging.debug('   result: %s' % result)
        else:
            logging.debug('   traceback: %s' % tb)
        self.status, self.result = status, result
        self.output, self.tb = output, tb
    def __str__(self):
        state = self.status
        return '<TaskReport: %s>' % state

def demo_function(*argv,**kwargs):
    """
    test function: prints 'click <i>' once per second and returns 'done'

    argv[0] is the number of clicks; when no positional argument is given
    nothing is printed (previously this raised IndexError).
    Keyword arguments are accepted and ignored.
    """
    clicks = argv[0] if argv else 0
    for i in range(clicks):
        # single pre-formatted argument: same output under Python 2 and 3
        print('click %s' % i)
        time.sleep(1)
    return 'done'
    
# The two functions below convert the unicode strings produced by simplejson
# back into utf-8 encoded byte strings. This matters most for dicts: their keys
# are later used as keyword arguments, and unicode keyword names won't work!
# Borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist

def _decode_dict(dct):
    """
    returns a copy of dct where unicode keys and unicode values are utf-8
    encoded and list values are converted with _decode_list
    (used as the object_hook when decoding json)
    """
    def convert_key(key):
        if isinstance(key, unicode):
            return key.encode('utf-8')
        return key
    def convert_value(value):
        if isinstance(value, unicode):
            return value.encode('utf-8')
        if isinstance(value, list):
            return _decode_list(value)
        return value
    return dict((convert_key(key), convert_value(value))
                for key, value in dct.iteritems())
    
def executor(queue,task):
    """
    the background process

    Runs ``task`` with sys.stdout redirected to a buffer and puts exactly one
    TaskReport on ``queue``:
    - COMPLETED with the result and the captured output on success
    - FAILED with the formatted traceback if anything is raised

    When task.app is set, the application environment is loaded and the
    function is looked up in current._scheduler.tasks; its return value is
    json-encoded. Otherwise (testing only) task.function is evaluated as an
    expression in this module and its return value is reported as is.
    """
    logging.debug('    task started')
    real_stdout, captured = sys.stdout, cStringIO.StringIO()
    sys.stdout = captured
    try:
        try:
            if task.app:
                os.chdir(os.environ['WEB2PY_PATH'])
                from gluon.shell import env
                from gluon import current
                # silence the root logger while the models are imported and
                # make sure its level is restored even if env() fails
                root_logger = logging.getLogger()
                level = root_logger.getEffectiveLevel()
                root_logger.setLevel(logging.WARN)
                try:
                    _env = env(task.app,import_models=True)
                finally:
                    root_logger.setLevel(level)
                _function = current._scheduler.tasks[task.function]
                globals().update(_env)
                args = loads(task.args)
                kwargs = loads(task.vars, object_hook=_decode_dict)
                result = dumps(_function(*args,**kwargs))
            else:
                ### for testing purpose only
                # NOTE(security): eval() runs whatever expression is stored in
                # task.function; never feed this branch untrusted task records
                _function = eval(task.function)
                args = loads(task.args)
                kwargs = loads(task.vars, object_hook=_decode_dict)
                result = _function(*args,**kwargs)
        finally:
            # always give the real stdout back, on success and on failure
            sys.stdout = real_stdout
        report = TaskReport(COMPLETED, result, captured.getvalue())
    except BaseException:
        # deliberately broad: whatever stops the task (SystemExit and
        # KeyboardInterrupt included) is reported as a failure
        report = TaskReport(FAILED,tb=traceback.format_exc())
    queue.put(report)

class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None     # the backround process
        self.have_heartbeat = True   # set to False to kill
    def async(self,task):
        """
        starts the background process and returns:
        ('ok',result,output)
        ('error',exception,None)
        ('timeout',None,None)
        ('terminated',None,None)
        """
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor,args=(queue,task))        
        self.process = p
        logging.debug('   task starting')
        p.start()
        try:
            p.join(task.timeout)
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logging.debug('    task stopped')
            return TaskReport(STOPPED)
        if p.is_alive():
            p.terminate()
            p.join()            
            logging.debug('    task timeout')
            return TaskReport(TIMEOUT)
        elif queue.empty():
            self.have_heartbeat = False
            logging.debug('    task stopped')
            return TaskReport(STOPPED)
        else:
            logging.debug('  task completed or failed')
            return queue.get()

    def die(self):
        logging.info('die!')
        self.have_heartbeat = False
        self.terminate_process()
        
    def terminate_process(self):
        try:
            self.process.terminate()
        except:
            pass # no process to terminate

    def run(self):
        """ the thread that sends heartbeat """
        counter = 0
        while self.have_heartbeat:            
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()
        
    def send_heartbeat(self,counter):
        print 'thum'
        time.sleep(1)
            
    def pop_task(self):
        return Task(
            app = None,
            function = 'demo_function',
            timeout = 7,
            args = '[2]',
            vars = '{}')

    def report_task(self,task,task_report):
        print 'reporting task'
        pass
    
    def sleep(self):
        pass

    def loop(self):
        try:
            self.start_heartbeats()
            while True and self.have_heartbeat:
                logging.debug('looping...')
                task = self.pop_task()
                if task:
                    self.report_task(task,self.async(task))
                else:
                    logging.debug('sleeping...')
                    self.sleep()
        except KeyboardInterrupt:
            self.die()


# Values accepted by the IS_IN_SET validator of scheduler_task.status.
# NOTE(review): Scheduler.pop_task also writes ASSIGNED to that column,
# which is not listed here - confirm whether that is intended.
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
# Values accepted by the IS_IN_SET validator of scheduler_run.status.
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
# Values accepted by the IS_IN_SET validator of scheduler_worker.status.
WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED)

class TYPE(object):
    """
    validator that checks whether a field is valid json and validates its type

    myclass: the type (or tuple of types) the decoded value must be an
             instance of
    parse:   if True the validated value is the decoded object, otherwise
             the original json string is returned unchanged
    """

    def __init__(self,myclass=list,parse=False):
        self.myclass = myclass
        self.parse=parse

    def __call__(self,value):
        """ returns (value, None) when valid, (value, error message) otherwise """
        from gluon import current
        try:
            obj = loads(value)
        except Exception:
            # any decoding problem means invalid input, but do not trap
            # KeyboardInterrupt / SystemExit as the bare except did
            return (value,current.T('invalid json'))
        if not isinstance(obj,self.myclass):
            return (value,current.T('Not of type: %s') % self.myclass)
        return (obj if self.parse else value, None)

class Scheduler(MetaScheduler):
    """
    database backed worker

    Tasks are rows of the scheduler_task table, every execution is recorded
    in scheduler_run and every worker records its heartbeats in
    scheduler_worker.

    db:          the DAL connection used by the main loop
    tasks:       dict mapping function names to callables (default: empty)
    migrate:     passed to define_table for the three scheduler tables
    worker_name: name of this worker (default: hostname#uuid)
    group_names: list of group names (default: ['main'])
    heartbeat:   seconds between heartbeats
    """
    def __init__(self,db,tasks=None,migrate=True,
                 worker_name=None,group_names=None,heartbeat=HEARTBEAT):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        # a fresh dict per instance instead of a shared mutable default
        self.tasks = {} if tasks is None else tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())

        # the executor process finds the task functions through current
        from gluon import current
        current._scheduler = self

        self.define_tables(db,migrate=migrate)

    def define_tables(self,db,migrate):
        """ defines scheduler_task, scheduler_run and scheduler_worker on db """
        from gluon import current
        logging.debug('defining tables (migrate=%s)' % migrate)
        now = datetime.datetime.now()
        db.define_table(
            'scheduler_task',
            Field('application_name',requires=IS_NOT_EMPTY(),
                  default=None,writable=False),
            Field('task_name',requires=IS_NOT_EMPTY()),
            Field('group_name',default='main',writable=False),
            Field('status',requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED,writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))),
            Field('args','text',default='[]',requires=TYPE(list)),
            Field('vars','text',default='{}',requires=TYPE(dict)),
            Field('enabled','boolean',default=True),
            Field('start_time','datetime',default=now),
            Field('next_run_time','datetime',default=now),
            Field('stop_time','datetime',default=now+datetime.timedelta(days=1)),
            Field('repeats','integer',default=1,comment="0=unlimted"),
            Field('period','integer',default=60,comment='seconds'),
            Field('timeout','integer',default=60,comment='seconds'),
            Field('times_run','integer',default=0,writable=False),
            Field('last_run_time','datetime',writable=False,readable=False),
            Field('assigned_worker_name',default='',writable=False),
            migrate=migrate,format='%(task_name)s')
        if hasattr(current,'request'):
            db.scheduler_task.application_name.default=current.request.application

        db.define_table(
            'scheduler_run',
            Field('scheduler_task','reference scheduler_task'),
            Field('status',requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time','datetime'),
            Field('stop_time','datetime'),
            Field('output','text'),
            Field('result','text'),
            Field('traceback','text'),
            Field('worker_name',default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name'),
            Field('first_heartbeat','datetime'),
            Field('last_heartbeat','datetime'),
            Field('status',requires=IS_IN_SET(WORKER_STATUS)),
            migrate=migrate)
        db.commit()

    def loop(self,worker_name=None):
        """ runs the worker loop; the worker_name argument is not used """
        MetaScheduler.loop(self)

    def pop_task(self):
        """
        grabs one runnable task for this worker and returns it as a Task,
        or returns None when there is nothing to run

        All runnable tasks are first marked ASSIGNED to this worker, the one
        with the earliest next_run_time is set to RUNNING and the others are
        released (QUEUED). A scheduler_run record is created for the task.
        """
        now = datetime.datetime.now()
        db, ts = self.db, self.db.scheduler_task
        # stays 0 if the grab below fails, so that we simply return None
        # instead of hitting an unbound variable
        number_grabbed = 0
        # NOTE(review): self.group_names is not used to filter the tasks
        # here - confirm whether workers should only pick their own groups
        try:
            logging.debug(' grabbing all queued tasks')
            all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
                ((ts.times_run<ts.repeats)|(ts.repeats==0))\
                (ts.start_time<=now)\
                (ts.stop_time>now)\
                (ts.next_run_time<=now)\
                (ts.enabled==True)\
                (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None?
            number_grabbed = all_available.update(
                assigned_worker_name=self.worker_name,status=ASSIGNED)
            db.commit()
        except Exception:
            db.rollback()
            number_grabbed = 0
            logging.exception(' could not grab tasks')
        logging.debug('  grabbed %s tasks' % number_grabbed)
        if not number_grabbed:
            return None
        grabbed = db(ts.assigned_worker_name==self.worker_name)\
            (ts.status==ASSIGNED)
        task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()

        logging.debug('   releasing all but one (running)')
        if not task:
            # nothing is assigned to us any more
            return None
        task.update_record(status=RUNNING,last_run_time=now)
        # the chosen task is RUNNING now, so this only releases the others
        grabbed.update(assigned_worker_name='',status=QUEUED)
        db.commit()
        next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
        times_run = task.times_run + 1
        run_again = times_run < task.repeats or task.repeats==0
        logging.debug('    new scheduler_run record')
        # retried until the record can be inserted; Exception (not a bare
        # except) so that Ctrl-C can still interrupt the retries
        while True:
            try:
                run_id = db.scheduler_run.insert(
                    scheduler_task = task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except Exception:
                db.rollback()
        logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app = task.application_name,
            function = task.function_name,
            timeout = task.timeout,
            args = task.args, #in json
            vars = task.vars, #in json
            task_id = task.id,
            run_id = run_id,
            run_again = run_again,
            next_run_time=next_run_time,
            times_run = times_run)

    def report_task(self,task,task_report):
        """
        stores task_report in the scheduler_run record of the task and
        updates the scheduler_task record:
        - COMPLETED: QUEUED again if it has to run again, else COMPLETED
        - FAILED / TIMEOUT: same status
        - STOPPED: QUEUED again
        """
        logging.debug(' recording task report in db (%s)' % task_report.status)
        db = self.db
        db(db.scheduler_run.id==task.run_id).update(
            status = task_report.status,
            stop_time = datetime.datetime.now(),
            result = task_report.result,
            output = task_report.output,
            traceback = task_report.tb)
        if task_report.status == COMPLETED:
            d = dict(status = QUEUED if task.run_again else COMPLETED,
                     next_run_time = task.next_run_time,
                     times_run = task.times_run,
                     assigned_worker_name = '')
        else:
            d = dict(
                assigned_worker_name = '',
                status = {FAILED:FAILED,
                          TIMEOUT:TIMEOUT,
                          STOPPED:QUEUED}[task_report.status])
        # only touch the task if it is still marked as running
        db(db.scheduler_task.id==task.task_id)\
            (db.scheduler_task.status==RUNNING).update(**d)
        db.commit()
        logging.info('task completed (%s)' % task_report.status)

    def send_heartbeat(self,counter):
        """
        records the heartbeat of this worker and, every 10th call, requeues
        the tasks assigned to workers without a recent heartbeat and deletes
        those workers; then waits self.heartbeat seconds

        Runs in the heartbeat thread, which uses its own DAL connection.
        """
        if not self.db_thread:
            logging.debug('thread building own DAL object')
            self.db_thread = DAL(self.db._uri,folder = self.db._adapter.folder)
            self.define_tables(self.db_thread,migrate=False)
        db = self.db_thread
        try:
            sw, st = db.scheduler_worker, db.scheduler_task
            now = datetime.datetime.now()
            expiration = now-datetime.timedelta(seconds=self.heartbeat*3)
            # record heartbeat
            logging.debug('........recording heartbeat')
            if not db(sw.worker_name==self.worker_name)\
                    .update(last_heartbeat = now, status = ACTIVE):
                sw.insert(status = ACTIVE,worker_name = self.worker_name,
                          first_heartbeat = now,last_heartbeat = now)
            if counter % 10 == 0:
                # deallocate jobs assigned to inactive workers and requeue them
                logging.debug('    freeing workers that have not sent heartbeat')
                inactive_workers = db(sw.last_heartbeat<expiration)
                db(st.assigned_worker_name.belongs(
                        inactive_workers._select(sw.worker_name)))\
                        (st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
                        .update(assigned_worker_name='',status=QUEUED)
                inactive_workers.delete()
            db.commit()
        except Exception:
            # keep the heartbeat thread alive, but leave a trace of the error
            db.rollback()
            logging.exception('........error recording heartbeat')
        time.sleep(self.heartbeat)

    def sleep(self):
        """ waits one heartbeat period when there is no task to run """
        time.sleep(self.heartbeat) # should only sleep until next available task

def main():
    """
    allows to run worker without python web2py.py .... by simply python this.py

    Parses the command line, imports the ``tasks`` dict from the file given
    with -t (if any), connects to the database and runs the worker loop.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    # type="int": without it a value given on the command line is a string,
    # which breaks the heartbeat arithmetic and time.sleep
    parser.add_option(
        "-b", "--heartbeat",dest="heartbeat", default = 10, type="int",
        help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level",dest="logger_level",
        default = 'INFO',
        help="level of logging (DEBUG, INFO, WARNING, ERROR)")
    parser.add_option(
        "-g", "--group_names",dest="group_names",
        default = 'main',
        help="comma separated list of groups to be picked by the worker")
    # NOTE(review): the default folder is a developer specific absolute
    # path - confirm what the default should be
    parser.add_option(
        "-f", "--db_folder",dest="db_folder",
        default = '/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri",dest="db_uri",
        default = 'sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks",dest="tasks",default=None,
        help="file containing task files, must define " + \
            "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print(USAGE)
    if options.tasks:
        path,filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print('importing tasks...')
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print('tasks found: '+', '.join(tasks.keys()))
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    # honour -L instead of always forcing DEBUG
    level = getattr(logging, options.logger_level.upper(), None)
    if not isinstance(level, int):
        parser.error('invalid logger level: %s' % options.logger_level)
    logging.getLogger().setLevel(level)

    print('groups for this worker: '+', '.join(group_names))
    # parentheses needed: + binds tighter than or
    print('connecting to database in folder: ' + (options.db_folder or './'))
    print('using URI: '+options.db_uri)
    db = DAL(options.db_uri,folder=options.db_folder)
    print('instantiating scheduler...')
    scheduler=Scheduler(db = db,
                        worker_name = options.worker_name,
                        tasks = tasks,
                        migrate = True,
                        group_names = group_names,
                        heartbeat = options.heartbeat)
    print('starting main worker loop...')
    scheduler.loop()

if __name__=='__main__':
    main()