Check-in [0d51a24c5d]
Not logged in

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Made the logic simply. Added tests.
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA1:0d51a24c5de0696f90551d523d9e0d9210bee892
User & Date: yusuke 2010-10-23 23:28:20
Context
2010-10-24
01:19
Added a doctest. Added author and setup information. check-in: b2ea003738 user: yusuke tags: trunk
2010-10-23
23:28
Made the logic simply. Added tests. check-in: 0d51a24c5d user: yusuke tags: trunk
2010-10-22
06:35
updated a method name to 'bind_to'. updated to use key for refer. check-in: 78c1ff7dfe user: yusuke tags: trunk
Changes

Changes to gaedeferred/__init__.py.


1
2
3
4
5
6



7
8
9
10
11







12
13
14
15


16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

40
41
42
43


44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

62





63
64

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

from sys import exc_info
from pickle import loads
from google.appengine.ext import db, deferred

class Transaction(db.Model):
    """ An entity group folder """




class Callback(db.Model):
    data = db.BlobProperty(required=True)
    next_callback = db.SelfReferenceProperty(collection_name='previous_callback_set')
    next_errback = db.SelfReferenceProperty(collection_name='previous_errback_set')








    def __call__(self, result):
        try:
            func, args, kwargs = loads(self.data)


            # insert the result at appropriate place.
            if func == deferred.invoke_member:
                args = args[0:2] + (result,) + args[2:]
            else:
                args = (result,) + args
        except Exception, e:
            raise deferred.PermanentTaskFailure(e)
        try:
            result = func(*args, **kwargs)
            if isinstance(result, Deferred):
                """ connect the end of result to next callback"""
        except Exception:
            logging.debug("an exception occurs in execution.", exc_info=True)
            if self.next_errback:
                self.next_errback(Failure(exc_info()))
            else:
                finish(self.parent())
        else:
            if self.next_callback:
                self.next_callback(result)
            else:
                finish(self.parent())

class Failure(deferred.Error):

    def __init__(self, exc_value=None, exc_type=None, exc_tb=None):
        self.value = exc_value
        self.type = exc_type
        self.tb = exc_tb



def finish(transaction):
    db.delete([transaction] + Callback.all().ancestor(transaction))

import logging
def _dummy_func(result):
    logging.debug(str(result))
    return result

def dummy_callback(transaction):
    return Callback(transaction, data=deferred.serialize(_dummy_func))

def build_callback(transaction, func_pair):
    if func_pair[0] is not None:
        return Callback(transaction, data=deferred.serialize(func_pair[0], *func_pair[1], **func_pair[2]))

class Runner(object):
    def __init__(self, callback_key, errback_key, obj, *args, **kwargs):

        self.callback_key = callback_key





        self.errback_key = errback_key
        self.data = deferred.serialize(obj, *args, **kwargs)


    def __call__(self):
        try:
            result = deferred.run(self.data)
            callback_key = self.callback_key
        except deferred.PermanentTaskFailure:
            logging.exception("The callback chain is stop")
            raise
        except Exception:
            logging.debug("an exception occurred in first callable evaluation, but this task still continue to errback chain.", exc_info=True)
            result = Failure(exc_info())
            callback_key = self.errback_key
        callback = Callback.get(callback_key)
        callback(result)

# isn't the deferred class to be the transaction object?
class Deferred(object):
    def __init__(self):
        self._callback_list = []
    def addCallbacks(self, callback, errback=None, callbackArgs=None, callbackKeywords=None, errbackArgs=None, errbackKeywords=None):
        return self._callback_list.append(((callback, callbackArgs, callbackKeywords),
                                           (errback, errbackArgs, errbackKeywords)))
    def addCallback(self, callback, *args, **kwargs):
        return self._callback_list.append(((callback, args, kwargs),
                                           (None, None, None)))
    def addErrback(self, errback, *args, **kwargs):
        return self._callback_list.append(((None, None, None),
                                           (errback, args, kwargs)))
    def addBoth(self, callback, *args, **kwargs):
        return self._callback_list.append(((callback, args, kwargs),
                                           (callback, args, kwargs)))
    # def chainDeferred(self, d):
    #     return self.addCallbacks(d.callback, d.errback)
    def bind_to(self, obj, *args, **kwargs):
        # by the del statement. you can't add callback anymore.
        del self._callback_list
        def func():
            transaction = Transaction()
            transaction.put()
            callback = dummy_callback(transaction)
            errback = dummy_callback(transaction)
            deferred.defer(Runner(callback.put(), errback.put(), obj, *args, **kwargs), _transactional=True)
            for callback_pair, errback_pair in self._callback_list:
                next_callback = build_callback(transaction, callback_pair)
                next_errback = build_callback(transaction, errback_pair)
                if next_callback is not None:
                    next_callback.put()
                    callback.succeeding_callback = next_callback
                    errback.succeeding_callback = next_callback
                    callback.put()
                    errback.put()
                if next_errback is not None:
                    next_errback.put()
                    callback.succeeding_errback = next_errback
                    errback.succeeding_errback = next_errback
                    callback.put()
                    errback.put()
                if next_callback is not None:
                    callback = next_callback
                if next_errback is not None:
                    errback = next_errback

        db.run_in_transaction(func)

__all__ = ['Deferred']
>


|

<
|
>
>
>

<
<
<
|
>
>
>
>
>
>
>

<
|
|
>
>
|
|
|
|
|
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<

<
>
|
<
<
<
>
>

<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
>
|
>
>
>
>
>
|
<
>



|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<
1
2
3
4
5

6
7
8
9
10



11
12
13
14
15
16
17
18
19

20
21
22
23
24
25
26
27
28



29













30

31
32



33
34
35

















36
37
38
39
40
41
42
43

44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

import logging
from sys import exc_info
from pickle import loads
from google.appengine.ext import deferred


class Failure(deferred.Error):
    def __init__(self, exc_value=None, exc_type=None):
        self.value = exc_value
        self.type = exc_type




def run_with_result(data, result):
    """Unpickles and executes a task with the result that is returned from previous function.
    Args:
        data: A pickled tuple of (function, args, kwargs) to execute.
        result: The returned value of previous function.
    Returns:
        The return value of the function invocation.
    """


    try:
        func, args, kwargs = loads(data)
    except Exception, e:
        raise deferred.PermanentTaskFailure(e)
    # insert the result at appropriate place.
    if func == deferred.invoke_member:
        args = args[0:2] + (result,) + args[2:]
    else:
        args = (result,) + args



    return func(*args, **kwargs)















class Deferred(object):
    def __init__(self, obj, *args, **kwargs):



        self.startup = deferred.serialize(obj, *args, **kwargs)
        self.callback_pairs = []


















    def _callback_chain(self):
        while self.callback_pairs:
            callback, errback = self.callback_pairs.pop(0)
            if self._in_callback:
                if callback:
                    yield callback
            else:
                if errback:

                    yield errback

    def __call__(self):
        try:
            result = deferred.run(self.startup)
            logging.debug("The result of the startup execution is %s", result)
            self._in_callback = True
        except deferred.PermanentTaskFailure:
            raise
        except Exception:
            result = Failure(*exc_info()[:2])
            logging.debug("An exception occurs in the startup execution.", exc_info=True)
            self._in_callback = False

        for callback in self._callback_chain():
            try:
                result = run_with_result(callback, result)
                logging.debug("The result of a callback is %s", result)
                self._in_callback = True
                if isinstance(result, Deferred):
                    result.callback_pairs.extend(self.callback_pairs)
                    # assign the result deferred to new task.
                    deferred.defer(result)
            except deferred.PermanentTaskFailure:
                raise
            except Exception:
                result = Failure(*exc_info()[:2])
                logging.debug("An exception occurs in callback.", exc_info=True)
                self._in_callback = False

    def addCallback(self, callback, *args, **kwargs):
        return self.addCallbacks(callback=callback,
                                 callbackArgs=args,
                                 callbackKeywords=kwargs)
    next = addCallback # alias

    def addErrback(self, errback, *args, **kwargs):
        return self.addCallbacks(errback=errback,
                                 errbackArgs=args,
                                 errbackKeywords=kwargs)
    error = addErrback # alias

    def addBoth(self, callback, *args, **kwargs):
        return self.addCallbacks(callback=callback,
                                 callbackArgs=args,
                                 callbackKeywords=kwargs,
                                 errback=callback,
                                 errbackArgs=args,
                                 errbackKeywords=kwargs)

    def addCallbacks(self, callback=None, callbackArgs=(), callbackKeywords={},
                     errback=None, errbackArgs=(), errbackKeywords={}):
        callback_data = None
        if callback:
            callback_data = deferred.serialize(callback,
                                               *callbackArgs,
                                               **callbackKeywords)
        errback_data = None
        if errback:
            errback_data = deferred.serialize(errback,
                                              *errbackArgs,
                                              **errbackKeywords)
        self.callback_pairs.append((callback_data, errback_data))

__all__ = ['Deferred']

Added tests/conftest.py.





































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from os import path, sys
sys.path.insert(0, path.join(*path.split(path.abspath(path.dirname(__file__)))[:-1]))

import py

def mimic_defer(obj, *args, **kwargs):
    obj(*args, **kwargs)

def pytest_addoption(parser):
    parser.addoption("--appengine", help="The path to google appengine runtime")

def pytest_configure(config):
    runtime_path = config.getvalue("appengine") 
    if runtime_path:
        sys.path.insert(0, path.join(runtime_path, "lib/yaml/lib"))
        sys.path.insert(0, path.join(runtime_path, "lib/webob"))
        sys.path.insert(0, runtime_path)

def pytest_runtest_setup(item):
    try:
        from google.appengine.ext import deferred 
        deferred.defer = mimic_defer
    except ImportError:
        py.skip("Ensure the path of google appengine for testing")

def pytest_funcarg__context(request):
    return Context(request)

class Context(object):
    def __init__(self, request):
        self.config = request.config

    def defer(self, d, *args, **kwargs):
        mimic_defer(d, *args, **kwargs)

Added tests/test_deferred.py.





























































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from gaedeferred import Deferred, Failure

def value(value=0):
    return value

def increase(result, value=1):
    return result + value

def raise_(result):
    raise Exception("This exception occured from raise_.")

def must_be_failure(result):
    assert isinstance(result, Failure)
    assert result.value == "This exception occured from raise_."
    assert result.type == Exception

def must_be(result, value):
    assert result == value

class Object(object):
    def multiple(self, result, value=2):
        return result * value

def test_deferred(context):
    d = Deferred(value, 0)
    d.addCallback(increase)
    d.addBoth(increase, 2)
    d.addCallbacks(callback=increase, callbackArgs=(3,))
    d.next(must_be, 5) # test for the alias
    d.next(Object().multiple)
    d.next(must_be, 10)
    d.next(raise_)
    d.addErrback(must_be_failure)
    d.next(raise_)
    d.error(must_be_failure) # test for the alias
    context.defer(d)

def test_chain(context):
    d1 = Deferred(value, 0)
    d2 = Deferred(value, 10)
    d1.next(increase, 5)
    d1.next(value, d2)
    d2.next(increase, 3)
    d2.next(must_be, 13)
    d1.next(must_be, 13)
    context.defer(d1)