MobileBlur

test_dal.py at [da2e9d5879]
Login

test_dal.py at [da2e9d5879]

File gluon/tests/test_dal.py artifact ace25276bd part of check-in da2e9d5879


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
    Unit tests for gluon.sql
"""

import sys
import os
if os.path.isdir('gluon'):
    sys.path.append(os.path.realpath('gluon'))
else:
    sys.path.append(os.path.realpath('../'))

import unittest
import datetime
from dal import DAL, Field, Table, SQLALL

ALLOWED_DATATYPES = [
    'string',
    'text',
    'integer',
    'boolean',
    'double',
    'blob',
    'date',
    'time',
    'datetime',
    'upload',
    'password',
    ]


def setUpModule():
    pass

def tearDownModule():
    if os.path.isfile('sql.log'):
        os.unlink('sql.log')


class TestFields(unittest.TestCase):

    def testFieldName(self):

        # Check that Fields cannot start with underscores
        self.assertRaises(SyntaxError, Field, '_abc', 'string')

        # Check that Fields cannot contain punctuation other than underscores
        self.assertRaises(SyntaxError, Field, 'a.bc', 'string')

        # Check that Fields cannot be a name of a method or property of Table
        for x in ['drop', 'on', 'truncate']:
            self.assertRaises(SyntaxError, Field, x, 'string')

        # Check that Fields allows underscores in the body of a field name.
        self.assert_(Field('a_bc', 'string'),
            "Field isn't allowing underscores in fieldnames.  It should.")

    def testFieldTypes(self):

        # Check that string, text, and password default length is 512
        for typ in ['string', 'password']:
            self.assert_(Field('abc', typ).length == 512,
                         "Default length for type '%s' is not 512 or 255" % typ)

        # Check that upload default length is 512
        self.assert_(Field('abc', 'upload').length == 512,
                     "Default length for type 'upload' is not 128")

        # Check that Tables passed in the type creates a reference
        self.assert_(Field('abc', Table(None, 'temp')).type
                      == 'reference temp',
                     'Passing an Table does not result in a reference type.')

    def testFieldLabels(self):

        # Check that a label is successfully built from the supplied fieldname
        self.assert_(Field('abc', 'string').label == 'Abc',
                     'Label built is incorrect')
        self.assert_(Field('abc_def', 'string').label == 'Abc Def',
                     'Label built is incorrect')

    def testFieldFormatters(self):  # Formatter should be called Validator

        # Test the default formatters
        for typ in ALLOWED_DATATYPES:
            f = Field('abc', typ)
            if typ not in ['date', 'time', 'datetime']:
                isinstance(f.formatter('test'), str)
            else:
                isinstance(f.formatter(datetime.datetime.now()), str)

    def testRun(self):
        db = DAL('sqlite:memory:')
        for ft in ['string', 'text', 'password', 'upload', 'blob']:
            db.define_table('t', Field('a', ft, default=''))
            self.assertEqual(db.t.insert(a='x'), 1)
            self.assertEqual(db().select(db.t.a)[0].a, 'x')
            db.t.drop()
        db.define_table('t', Field('a', 'integer', default=1))
        self.assertEqual(db.t.insert(a=3), 1)
        self.assertEqual(db().select(db.t.a)[0].a, 3)
        db.t.drop()
        db.define_table('t', Field('a', 'double', default=1))
        self.assertEqual(db.t.insert(a=3.1), 1)
        self.assertEqual(db().select(db.t.a)[0].a, 3.1)
        db.t.drop()
        db.define_table('t', Field('a', 'boolean', default=True))
        self.assertEqual(db.t.insert(a=True), 1)
        self.assertEqual(db().select(db.t.a)[0].a, True)
        db.t.drop()
        db.define_table('t', Field('a', 'date',
                        default=datetime.date.today()))
        t0 = datetime.date.today()
        self.assertEqual(db.t.insert(a=t0), 1)
        self.assertEqual(db().select(db.t.a)[0].a, t0)
        db.t.drop()
        db.define_table('t', Field('a', 'datetime',
                        default=datetime.datetime.today()))
        t0 = datetime.datetime(
            1971,
            12,
            21,
            10,
            30,
            55,
            0,
            )
        self.assertEqual(db.t.insert(a=t0), 1)
        self.assertEqual(db().select(db.t.a)[0].a, t0)
        db.t.drop()
        db.define_table('t', Field('a', 'time', default='11:30'))
        t0 = datetime.time(10, 30, 55)
        self.assertEqual(db.t.insert(a=t0), 1)
        self.assertEqual(db().select(db.t.a)[0].a, t0)
        db.t.drop()


class TestAll(unittest.TestCase):

    def setUp(self):
        self.pt = Table(None,'PseudoTable',Field('name'),Field('birthdate'))

    def testSQLALL(self):
        ans = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate'
        self.assertEqual(str(SQLALL(self.pt)), ans)


class TestTable(unittest.TestCase):

    def testTableCreation(self):

        # Check for error when not passing type other than Field or Table

        self.assertRaises(SyntaxError, Table, None, 'test', None)

        persons = Table(None, 'persons',
                        Field('firstname','string'), 
                        Field('lastname', 'string'))
        
        # Does it have the correct fields?

        self.assert_(set(persons.fields).issuperset(set(['firstname',
                                                         'lastname'])))

        # ALL is set correctly

        self.assert_('persons.firstname, persons.lastname'
                      in str(persons.ALL))

    def testTableAlias(self):
        db = DAL('sqlite:memory:')
        persons = Table(db, 'persons', Field('firstname',
                           'string'), Field('lastname', 'string'))
        aliens = persons.with_alias('aliens')

        # Are the different table instances with the same fields

        self.assert_(persons is not aliens)
        self.assert_(set(persons.fields) == set(aliens.fields))

    def testTableInheritance(self):
        persons = Table(None, 'persons', Field('firstname',
                           'string'), Field('lastname', 'string'))
        customers = Table(None, 'customers',
                             Field('items_purchased', 'integer'),
                             persons)
        self.assert_(set(customers.fields).issuperset(set(
            ['items_purchased', 'firstname', 'lastname'])))


class TestInsert(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        self.assertEqual(db.t.insert(a='1'), 1)
        self.assertEqual(db.t.insert(a='1'), 2)
        self.assertEqual(db.t.insert(a='1'), 3)
        self.assertEqual(db(db.t.a == '1').count(), 3)
        self.assertEqual(db(db.t.a == '1').update(a='2'), 3)
        self.assertEqual(db(db.t.a == '2').count(), 3)
        self.assertEqual(db(db.t.a == '2').delete(), 3)
        db.t.drop()


class TestSelect(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        self.assertEqual(db.t.insert(a='1'), 1)
        self.assertEqual(db.t.insert(a='2'), 2)
        self.assertEqual(db.t.insert(a='3'), 3)
        self.assertEqual(len(db(db.t.id > 0).select()), 3)
        self.assertEqual(db(db.t.id > 0).select(orderby=~db.t.a
                          | db.t.id)[0].a, '3')
        self.assertEqual(len(db(db.t.id > 0).select(limitby=(1, 2))), 1)
        self.assertEqual(db(db.t.id > 0).select(limitby=(1, 2))[0].a,
                         '2')
        self.assertEqual(len(db().select(db.t.ALL)), 3)
        self.assertEqual(len(db(db.t.a == None).select()), 0)
        self.assertEqual(len(db(db.t.a != None).select()), 3)
        self.assertEqual(len(db(db.t.a > '1').select()), 2)
        self.assertEqual(len(db(db.t.a >= '1').select()), 3)
        self.assertEqual(len(db(db.t.a == '1').select()), 1)
        self.assertEqual(len(db(db.t.a != '1').select()), 2)
        self.assertEqual(len(db(db.t.a < '3').select()), 2)
        self.assertEqual(len(db(db.t.a <= '3').select()), 3)
        self.assertEqual(len(db(db.t.a > '1')(db.t.a < '3').select()), 1)
        self.assertEqual(len(db((db.t.a > '1') & (db.t.a < '3')).select()), 1)
        self.assertEqual(len(db((db.t.a > '1') | (db.t.a < '3')).select()), 3)
        self.assertEqual(len(db((db.t.a > '1') & ~(db.t.a > '2')).select()), 1)
        self.assertEqual(len(db(~(db.t.a > '1') & (db.t.a > '2')).select()), 0)
        db.t.drop()


class TestBelongs(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        self.assertEqual(db.t.insert(a='1'), 1)
        self.assertEqual(db.t.insert(a='2'), 2)
        self.assertEqual(db.t.insert(a='3'), 3)
        self.assertEqual(len(db(db.t.a.belongs(('1', '3'))).select()),
                         2)
        self.assertEqual(len(db(db.t.a.belongs(db(db.t.id
                          > 2)._select(db.t.a))).select()), 1)
        self.assertEqual(len(db(db.t.a.belongs(db(db.t.a.belongs(('1',
                         '3')))._select(db.t.a))).select()), 2)
        self.assertEqual(len(db(db.t.a.belongs(db(db.t.a.belongs(db
                         (db.t.a.belongs(('1', '3')))._select(db.t.a)))._select(
                         db.t.a))).select()),
                         2)
        db.t.drop()


class TestLike(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        self.assertEqual(db.t.insert(a='abc'), 1)
        self.assertEqual(len(db(db.t.a.like('a%')).select()), 1)
        self.assertEqual(len(db(db.t.a.like('%b%')).select()), 1)
        self.assertEqual(len(db(db.t.a.like('%c')).select()), 1)
        self.assertEqual(len(db(db.t.a.like('%d%')).select()), 0)
        self.assertEqual(len(db(db.t.a.lower().like('A%')).select()), 1)
        self.assertEqual(len(db(db.t.a.lower().like('%B%')).select()),
                         1)
        self.assertEqual(len(db(db.t.a.lower().like('%C')).select()), 1)
        self.assertEqual(len(db(db.t.a.upper().like('A%')).select()), 1)
        self.assertEqual(len(db(db.t.a.upper().like('%B%')).select()),
                         1)
        self.assertEqual(len(db(db.t.a.upper().like('%C')).select()), 1)
        db.t.drop()


class TestDatetime(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a', 'datetime'))
        self.assertEqual(db.t.insert(a=datetime.datetime(1971, 12, 21,
                         11, 30)), 1)
        self.assertEqual(db.t.insert(a=datetime.datetime(1971, 11, 21,
                         10, 30)), 2)
        self.assertEqual(db.t.insert(a=datetime.datetime(1970, 12, 21,
                         9, 30)), 3)
        self.assertEqual(len(db(db.t.a == datetime.datetime(1971, 12,
                         21, 11, 30)).select()), 1)
        self.assertEqual(len(db(db.t.a.year() == 1971).select()), 2)
        self.assertEqual(len(db(db.t.a.month() == 12).select()), 2)
        self.assertEqual(len(db(db.t.a.day() == 21).select()), 3)
        self.assertEqual(len(db(db.t.a.hour() == 11).select()), 1)
        self.assertEqual(len(db(db.t.a.minutes() == 30).select()), 3)
        self.assertEqual(len(db(db.t.a.seconds() == 0).select()), 3)
        db.t.drop()


class TestExpressions(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a', 'integer'))
        self.assertEqual(db.t.insert(a=1), 1)
        self.assertEqual(db.t.insert(a=2), 2)
        self.assertEqual(db.t.insert(a=3), 3)
        self.assertEqual(db(db.t.a == 3).update(a=db.t.a + 1), 1)
        self.assertEqual(len(db(db.t.a == 4).select()), 1)
        db.t.drop()


class TestJoin(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t1', Field('a'))
        db.define_table('t2', Field('a'), Field('b', db.t1))
        i1 = db.t1.insert(a='1')
        i2 = db.t1.insert(a='2')
        i3 = db.t1.insert(a='3')
        db.t2.insert(a='4', b=i1)
        db.t2.insert(a='5', b=i2)
        db.t2.insert(a='6', b=i2)
        self.assertEqual(len(db(db.t1.id
                          == db.t2.b).select(orderby=db.t1.a
                          | db.t2.a)), 3)
        self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.a
                          | db.t2.a)[2].t1.a, '2')
        self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.a
                          | db.t2.a)[2].t2.a, '6')
        self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL,
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a)), 4)
        self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a)[2].t1.a, '2')
        self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a)[2].t2.a, '6')
        self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a)[3].t1.a, '3')
        self.assertEqual(db().select(db.t1.ALL, db.t2.ALL,
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a)[3].t2.a, None)
        self.assertEqual(len(db().select(db.t1.ALL, db.t2.id.count(),
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a, groupby=db.t1.a)),
                         3)
        self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(),
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a,
                         groupby=db.t1.a)[0]._extra[db.t2.id.count()],
                         1)
        self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(),
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a,
                         groupby=db.t1.a)[1]._extra[db.t2.id.count()],
                         2)
        self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(),
                         left=db.t2.on(db.t1.id == db.t2.b),
                         orderby=db.t1.a | db.t2.a,
                         groupby=db.t1.a)[2]._extra[db.t2.id.count()],
                         0)
        db.t1.drop()
        db.t2.drop()


class TestMinMaxSum(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a', 'integer'))
        self.assertEqual(db.t.insert(a=1), 1)
        self.assertEqual(db.t.insert(a=2), 2)
        self.assertEqual(db.t.insert(a=3), 3)
        s = db.t.a.min()
        self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 1)
        s = db.t.a.max()
        self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 3)
        s = db.t.a.sum()
        self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 6)
        s = db.t.a.count()
        self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 3)
        db.t.drop()


#class TestCache(unittest.
#    def testRun(self):
#        cache = cache.ram
#        db = DAL('sqlite:memory:')
#        db.define_table('t', Field('a'))
#        db.t.insert(a='1')
#        r1 = db().select(db.t.ALL, cache=(cache, 1000))
#        db.t.insert(a='1')
#        r2 = db().select(db.t.ALL, cache=(cache, 1000))
#        self.assertEqual(r1.response, r2.response)
#        db.t.drop()


class TestMigrations(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite://.storage.db')
        db.define_table('t', Field('a'), migrate='.storage.table')
        db.commit()
        db = DAL('sqlite://.storage.db')
        db.define_table('t', Field('a'), Field('b'),
                        migrate='.storage.table')
        db.commit()
        db = DAL('sqlite://.storage.db')
        db.define_table('t', Field('a'), Field('b', 'text'),
                        migrate='.storage.table')
        db.commit()
        db = DAL('sqlite://.storage.db')
        db.define_table('t', Field('a'), migrate='.storage.table')
        db.t.drop()
        db.commit()

    def tearDown(self):
        os.unlink('.storage.db')

class TestReferece(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('name'), Field('a','reference t'))
        db.commit()
        x = db.t.insert(name='max')
        assert x.id == 1
        assert x['id'] == 1
        x.a = x
        assert x.a == 1
        x.update_record()
        y = db.t[1]
        assert y.a == 1
        assert y.a.a.a.a.a.a.name == 'max'
        z=db.t.insert(name='xxx', a = y)
        assert z.a == y.id
        db.t.drop()
        db.commit()

class TestClientLevelOps(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        db.commit()
        db.t.insert(a="test")
        rows1 = db(db.t.id>0).select()
        rows2 = db(db.t.id>0).select()
        rows3 = rows1 & rows2
        assert len(rows3) == 2
        rows4 = rows1 | rows2
        assert len(rows4) == 1
        rows5 = rows1.find(lambda row: row.a=="test")
        assert len(rows5) == 1
        rows6 = rows2.exclude(lambda row: row.a=="test")
        assert len(rows6) == 1
        rows7 = rows5.sort(lambda row: row.a)
        assert len(rows7) == 1
        db.t.drop()
        db.commit()


class TestVirtualFields(unittest.TestCase):

    def testRun(self):
        db = DAL('sqlite:memory:')
        db.define_table('t', Field('a'))
        db.commit()
        db.t.insert(a="test")
        class Compute:
            def a_upper(row): return row.t.a.upper()
        db.t.virtualfields.append(Compute())
        assert db(db.t.id>0).select().first().a_upper == 'TEST'
        db.t.drop()
        db.commit()


if __name__ == '__main__':
    unittest.main()
    tearDownModule()