Newsgrouper

distcl.tcl at tip
Login

distcl.tcl at tip

File scripts/distcl.tcl from the latest check-in


# DisTcl - Distributed Programming Infrastructure for Tcl

# DisTcl provides a communication channel between clients and named services.
# Each named service can process certain forms of request.  A service may be
# implemented by multiple server processes and used by multiple client
# processes.  All these processes may be on different machines.  Redis
# (or one of the compatible forks) is used to communicate between clients
# and servers, and to cache results.


# Details of operation:
#
# A request to service "abc" for key "def" will proceed as follows:
# The value may be requested by writing def to redis queue q:abc .
# A prefetch may be requested by writing def to redis queue p:abc .
#
# A server reads "def" from q:abc or p:abc, and pushes 0 (for a prefetch)
# or 1 (for a get) onto list r:abc:def. If this list already had an entry,
# this request is already being computed by another server so skip it, otherwise...
#
# The server computes value "ghi" with status s from key "def".
# If the request processor returned a -secs2keep option, the server writes
# "s:ghi" to the Value cache v:abc:def with that expiry time in seconds.
# The server pushes "s:ghi" to the Waitlist w:abc:def as many times as there
# are get requests recorded in r:abc:def.
# Each client which requested the value for def reads it from queue w:abc:def.
# After this, while the value remains cached, any client which requests it
# will read it from v:abc:def.
#
# Each server also monitors an individual control queue z:id through which
# it can be requested to shut down cleanly.

package require retcl

namespace eval distcl {

# Loop serving requests, will continue until told 'stop' via the control queue.
#
# Each iteration blocks (for up to 60 seconds) on three Redis lists: this
# instance's control queue z:<id>, the get queue q:<service> and the prefetch
# queue p:<service>.
#
# redis - a retcl connection to redis, authenticated if necessary;
# service - name of the service being provided;
# proc - command to call to process the request and return its value.  It is
#        called with the request expanded into separate arguments.  If it
#        returns with a -secs2keep option, the result is cached in
#        v:<service>:<request> for that many seconds;
# id - optional identifier for this service instance, defaults to the
#      process id.
#
# Control requests are matched as glob patterns: "stop" ends the loop,
# "v*" turns verbose logging on and "q*" turns it off.  Anything else
# arriving on the control queue is ignored.
proc serve {redis service proc {id {}}} {
    set reqqueue q:$service
    set prequeue p:$service
    if {$id eq {}} {set id [pid]}
    set ctlqueue z:$id
    puts stderr "Control queue is '$ctlqueue'"
    set verbose 0

    while 1 {
        # wait for a request to appear on one of the queues
        set qreq [$redis -sync blpop $ctlqueue $reqqueue $prequeue 60]
        if {$qreq eq "(nil)"} {
            # timed out with nothing to do, just wait again
            continue
        }
        lassign $qreq queue request
        if {$verbose} {puts "QUEUE $queue REQUEST '$request'"}

        # server control request?
        if {$queue eq $ctlqueue} {
            switch -glob -- $request {
                stop break
                v* {set verbose 1}
                q* {set verbose 0}
            }
            continue
        }

        # request is get (recorded as 1) or prefetch (recorded as 0)
        set is_get [expr {$queue eq $reqqueue}]
        set runlist r:${service}:$request
        set runcount [$redis -sync rpush $runlist $is_get]
        # is the same request already running on another server?
        if {$runcount > 1} continue

        # call the request processor; any completion code is captured
        # and sent back to the clients as the status prefix of the result
        $redis -sync expire $runlist 10
        set status [catch {$proc {*}$request} value options]
        set result ${status}:$value

        # cache the result if an expiry time was specified
        if {[dict exists $options -secs2keep]} {
            set expiry [dict get $options -secs2keep]
            $redis -sync set v:${service}:$request $result ex $expiry
        }

        # Collect the requests recorded while we were computing.  The run
        # list may already have expired, in which case nobody is counted.
        set requests [$redis -sync lpop $runlist 999]
        if {$requests eq {(nil)}} {set requests {}}
        # each get contributed a 1 and each prefetch a 0, so the sum is
        # the number of clients blocked on the waitlist
        set waiters [tcl::mathop::+ {*}$requests]
        if {$waiters > 0} {
            set waitlist w:${service}:$request
            # push one copy per waiting client in a single atomic RPUSH
            # rather than one round trip per client
            $redis -sync rpush $waitlist {*}[lrepeat $waiters $result]
            $redis -sync expire $waitlist 10
        }
    }
}


# Request the data computed by service for these arguments.
#
# The cache entry v:<service>:<args> is tried first.  On a miss the arguments
# are pushed onto the request queue q:<service> and the reply is awaited on
# the waitlist w:<service>:<args> for up to 10 seconds.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service to call;
# args - one or more arguments to pass to the service.
#
# The result has the form "<status>:<value>", where <status> is the numeric
# completion code stored by the server.  This proc returns <value> using that
# completion code, so an error raised by the request processor is re-raised
# in the caller.  An error is also raised if the request times out or the
# result is malformed.
proc get {redis service args} {
    set key v:${service}:$args
    # try to read the data from the cache
    set res [$redis -sync get $key]
    if {$res eq "(nil)"} {
        # data not in cache, send a request for it
        $redis -sync rpush q:$service $args

        # wait for the data to be returned in the waitlist
        set qres [$redis -sync blpop w:${service}:$args 10]
        # if 10 second timeout expired, report error
        if {$qres eq "(nil)"} {error "Request for '$key' timed out."}
        set res [lindex $qres 1]
    }
    # Split on the first colon rather than assuming a one-character status:
    # completion codes may have several digits, and the value itself may
    # contain colons.
    set sep [string first : $res]
    set status [string range $res 0 [expr {$sep - 1}]]
    if {![regexp {^[0-9]+$} $status]} {error "Malformed result for '$key'."}
    set value [string range $res [expr {$sep + 1}] end]
    return -code $status $value
}

# Ask for a data item to be computed ahead of time because it will be
# needed soon.  No reply is awaited, so several prefetches can be issued
# back to back and worked on in parallel when more than one server is
# available.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service to call;
# args - one or more arguments to pass to the service.
proc prefetch {redis service args} {
    set cachekey v:${service}:$args
    # nothing to do when the value is already in the cache
    if {[$redis -sync exists $cachekey]} return
    # otherwise queue a request for it to be precomputed
    $redis -sync rpush p:$service $args
}

# Drop a previously computed data item from the cache.
#
# redis - a retcl connection to Redis, authenticated if necessary;
# service - name of the service;
# args - one or more arguments.
proc forget {redis service args} {
    # the cache entry and the waitlist share the same "service:args" suffix
    set suffix ${service}:$args
    # a crash may have left the waitlist behind, so it is removed
    # together with the cached value
    $redis -sync del v:$suffix w:$suffix
}

}