#! /usr/bin/env tclsh
package require {ycl proc}
[yclprefix] proc alias alias [yclprefix] proc alias
alias aliases [yclprefix] proc aliases
aliases {
{ycl chan chan} {
command
}
{ycl coro call} {
hi
}
{ycl dict deep}
{ycl dict} {
ddict deep
}
{ycl db sqlite util} {
column
table
dbget_ get
lossless
minpagesize
explain_pretty
}
{ycl list} {
take
}
{ycl math rand} {
randbytes
}
{ycl package} {
vcomp
}
{ycl proc} {
optswitch
}
{ycl string printable}
{ycl struct tree}
}
package require sha256c
package require sqlite3
proc .id {. _ args} {
dbget_ [list $_ db] {select v from system where e = 0 and a = 'id'}
}
.my .method .id
proc .init {. _ args} {
variable defaults
variable magic
variable peertypeb
variable version
$_ .vars connections connectmethods counter dbcreated hostsnode \
initialized newrepos pathsnode repositoriesnode topnode workdir
dict size $args
set counter [clock milliseconds]
set create 0
set newrepos 0
set topnode {}
foreach {key val} $args {
switch $key {
create - workdir {
set $key $val
}
default {
error [list {unknown option} $key]
}
}
}
dict set connectmethods sqlite_compressed connect_sqlite_compressed
dict set connectmethods $peertypeb connect_peer
set varns [$_ .namespace]::var
set dbroutine ${varns}::db
set dbname [file join $workdir system]
if {$create} {
file mkdir $workdir
}
$_ = defaults $defaults
$_ = peertypeb $peertypeb
sqlite3 $dbroutine $dbname -create $create
namespace eval $varns [list $_ .routine db]
$_ db transaction {
$_ dbsetup
tree .new ${varns}::tree
namespace eval $varns [list $_ .routine tree]
$_ tree .init dbconn $dbroutine
$_ tree db transaction {
set magicnode [$_ tree node pivot? $topnode $magic]
if {$magicnode eq {}} {
if {$create} {
$_ tree node forge $topnode $magic
$_ tree node forge $topnode paths
$_ tree node forge $topnode hosts
$_ tree node forge $topnode repositories
} else {
error [list {not a keep database}]
}
}
if {$dbcreated} {
$_ version {*}$version
} else {
$_ versionupdate
}
set pathsnode [$_ tree node pivot $topnode paths]
set hostsnode [$_ tree node pivot $topnode hosts]
set repositoriesnode [$_ tree node pivot $topnode repositories]
}
}
set connections {}
$_ connections
set initialized 1
return $_
}
.my .method .init
proc addpath {. _ path} {
$_ .vars pathsnode
$_ tree node forge $pathsnode $path
}
.my .method addpath
proc addrepotype {. _ type} {
variable sql_repotype_rowid_by_name
variable sql_repotype_insert
$_ db transaction {
set found [$_ db onecolumn $sql_repotype_rowid_by_name]
if {$found eq {}} {
set created 1
$_ db eval $sql_repotype_insert
set found [$_ db onecolumn $sql_repotype_rowid_by_name]
} else {
set created 0
}
}
return [list {repository type} id $found created $created]
}
.my .method addrepotype
proc call {. _ pubkey host port args} {
set chan [socket -async $host $port]
try {
set key [$_ pkikey]
ddict set header counter [$_ counter]
ddict set header id [$_ .id]
set msg [list $header $args]
set msg [pki::encrypt -pad -binary -pub $msg $pubkey]
ddict set header2 encrypted 1
set msg [list [list $header2 $msg]]
puts $chan $msg
flush $chan
$_ call_readresponse $chan response
lassign $response[set response {}] header response
if {![dict exists $header encrypted]} {
error [list {received unencrypted response}]
}
set response [pki::decrypt -unpad -binary -pub $response[
set response {}] $pubkey]
lassign $response[set response {}] header response
lassign $response[set response {}] res opts
} finally {
if {$chan in [chan names]} {
close $chan
}
}
return -options $opts $res
}
.my .method call
proc call_insecure {. _ host port args} {
set chan [socket -async $host $port]
chan configure $chan -encoding utf-8
$_ waitconn $chan
try {
set msg [list [list {} $args]]
puts [list calling host $host port $port chan $chan msg $msg]
puts $chan $msg
flush $chan
$_ call_readresponse $chan response
lassign $response[set response {}] header response
lassign $response[set response {}] res opts
} finally {
if {$chan in [chan names]} {
close $chan
}
}
return -options $opts $res
}
.my .method call_insecure
proc call_readresponse {. _ chan responsevar} {
upvar 1 $responsevar var
puts [list {waiting for response}]
command $chan response
if {[llength $response]} {
# only set res if there was a real response
set var $response
} else {
if {[eof $chan]} {
set error {end of channel}
} else {
set error [chan configure $chan -error]
}
error [list chan $chan error $error]
}
}
.my .method call_readresponse
proc connect {. _ type instance args} {
$_ .vars connectmethods
set routine [dict get $connectmethods $type]
puts [list connection routine is $routine]
$_ $routine $instance {*}$args
}
.my .method connect
proc connect_peer {. _ instance args} {
$_ .vars peertypeb repositoriesnode
set conn [$_ .namespace]::[info cmdcount]_conn
$_ .eval [list $_ .routine $conn]
alias $conn $_ peer_interface $instance
return $conn
}
.my .method connect_peer
proc peer_interface {. _ instance args} {
$_ .vars defaults repositoriesnode
foreach arg $args {
puts [list jub $arg]
}
$_ tree node ls& $repositoriesnode {
upvar args args defaults defaults _ _
set id [$_ tree node last $node id]
set host [$_ tree node last $node host]
set portnode [$_ tree node pivot? $node port]
if {$portnode eq {}} {
set port [dict get $defaults listen port]
} else {
set port [$_ tree node last $portnode]
}
set pubkey [$_ tree node last $node pubkey]
foreach arg $args {
puts [list funk [printable tcl 0 ascii 0 $arg]]
}
error [list piddle [printable tcl 0 ascii 0 $pubkey] $host $port]
}
return
}
.my .method peer_interface
proc connect_sqlite_compressed {. _ instance args} {
package require {ycl struct map sqlite_compressed}
while {[llength $args]} {
take args arg
optswitch $arg {
path {
take args path
}
}
}
if {![info exists path]} {
set path [$_ db onecolumn {
select path from repositories where instance = @instance
}]
}
set syspath [file join $path system]
set connection [$_ .namespace]::[info cmdcount]_conn
[yclprefix] struct map sqlite_compressed .new $connection
$_ .eval [list $_ .routine $connection]
$_ $connection .init path $syspath
set signature [$_ $connection .id]
if {$signature ne $instance} {
error [list {wrong instance}]
}
return $connection
}
.my .method connect_sqlite_compressed
proc connections {. _} {
$_ .vars connections newrepos pathsnode
set conncount 0
$_ db transaction {
$_ db eval {
select repositories.rowid as rowid ,instance ,path,
repotype.name as type from repositories
join repotype on repositories.typeid = repotype.rowid
} {
puts [list {attempt to connect to keep repository} $rowid $path]
if {![dict exists $connections $rowid]} {
set success 0
try {
set connection [$_ connect $type $instance]
dict set connections $rowid $connection
} on error {tres topts} {
puts [list {keep connection error}]
puts [dict get $topts -errorinfo]
$_ db eval {
update repositories set accessfailtime = date('now')
, connectfailure = connectfailure + 1
where instance = $instance
; insert into repository_errors (
rowid , date , repo , type , error , errorinfo
) values (
null ,date('now') ,$instance , 'connect' ,$tres
,$topts
)
}
set success 0
$_ tree node ls $pathsnode {
set path $value
try {
$_ connect $type $instance path $path
} on error {tres topts} {
lappend errors $tres
} on ok {} {
set success 1
break
}
}
} on ok {cres copts} {
puts [list {keep connection to} $path]
set success 1
}
if {$success} {
incr conncount
set origpath [$_ db onecolumn {
update repositories set accesstime = date('now')
, connectsuccess = connectsuccess + 1
where instance = $instance
; select path from repositories where instance = $instance
}]
puts [list {connected to keep repository} $path]
if {$path ne $origpath} {
$_ db eval {
update repositories set path = $path
, connectsuccess = connectsuccess + 1
where instance = $instance
}
}
}
}
}
}
set newrepos 0
return
}
.my .method connections
proc counter {. _} {
$_ .vars counter
incr counter
}
.my .method counter
proc db_table_repositories {. _} {
if {![column exists [list $_ db] repositories accessfailtime]} {
$_ db eval {
alter table repositories add column accessfailtime numeric
}
}
if {![column exists [list $_ db] repositories connectattempts]} {
$_ db eval {
alter table repositories add column connectattempts numeric
}
}
if {![column exists [list $_ db] repositories lasterror]} {
$_ db eval {
alter table repositories add column lasterror
}
}
if {![column exists [list $_ db] repositories readattempts]} {
$_ db eval {
alter table repositories add column readattempts numeric
}
}
if {![column exists [list $_ db] repositories writeattempts]} {
$_ db eval {
alter table repositories add column writeattempts numeric
}
}
return
}
.my .method db_table_repositories
proc db_table_repository_errors {. _} {
if {![table exists [list $_ db] repository_errors]} {
$_ db eval {
; create table repository_errors (
rowid integer primary key autoincrement
, date
, repo
, type
, error
, errorinfo
)
}
} elseif {![column exists [list $_ db] repository_errors errorinfo]} {
update table repository_errors add column errorinfo
}
return
}
.my .method db_table_repository_errors
proc dbsetup {. _} {
variable magicb
$_ .vars dbcreated workdir
set dbcreated 0
if {[$_ db exists {select 1 from sqlite_master where type = 'table'}]} {
if {![table exists [list $_ db] system]
|| ![$_ db exists {
select 1 from system
where e = 0 and a = 'typeid' and v = @magicb}]
} {
error [list {not a valid keep work directory}]
}
} else {
minpagesize [list $_ db] 8192
}
$_ db transaction {
$_ db eval {
create table if not exists system (
rowid integer primary key
,e ,a ,v
)
; create table if not exists keys (
rowid integer primary key
, key unique not null
)
; create unique index if not exists idx_keys_key_unique on keys (
key
)
; create table if not exists holdings (
rowid integer primary key
, keyid integer not null
, repositoryid integer not null
)
; create unique index if not exists index_holdings
on holdings (
keyid ,repositoryid
)
; create table if not exists repositories (
rowid integer primary key autoincrement
, typeid integer not null
, instance unique
, path
, connectattempts numeric
, connectsuccess numeric
, connectfailure numeric
, readattempts numeric
, readsuccess numeric
, readfailure numeric
, writeattempts numeric
, writesuccess numeric
, writefailure numeric
, readcorrupt numeric
, accesstime numeric
, accessfailtime numeric
, lasterror
)
;create table if not exists repotype (
rowid integer primary key
, name
)
}
if {![$_ db exists {select 1 from system where e = 0 and a = 'typeid'}]} {
set id [$_ randbytes]
$_ db transaction {
$_ db eval {
insert into system (rowid ,e ,a ,v) values (null , 0 ,'typeid' ,@magicb)
; insert into system (rowid ,e ,a ,v) values (null , 0 ,'id',@id)
; insert into system (rowid ,e ,a ,v) values (null ,0 ,'version' ,0.1)
; insert into system (rowid ,e ,a ,v) values (null , 0 ,'epoch', 0)
}
}
set dbcreated 1
}
$_ db_table_repositories
$_ db_table_repository_errors
}
if {![$_ db exists {
select * from system
where e = 0 and a = 'typeid' and v = @magicb
}]} {
error [list {not a valid keep work directory} $workdir]
}
return
}
.my .method dbsetup
proc defaults {. _} {
$_ $ defaults
}
.my .method defaults
proc dispatch {. _ usercmd args} {
switch $usercmd {
.id - distribute - missing - pubkey - retrieve - retrievem - set
- setbatch {
set cmd $usercmd
}
default {
error [list {command not allowed} $usercmd]
}
}
$_ $cmd {*}$args
}
.my .method dispatch
proc distribute {. _} {
$_ .vars connections
puts [list distributing]
set conncount [dict size $connections]
set batchsize [expr {1 << 12}]
set stored 0
set existing 0
set total 0
$_ holdingsreport
dict for {repo connection} $connections {
set othercons [dict filter $connections script {key val} {
if {$key eq $repo} continue
lindex 1
}]
puts [list {distributing from repo}]
set repoinfo [$_ repository info repo $repo]
foreach {key val} $repoinfo {
puts [list repoinfo $key [printable tcl 0 $val]]
}
puts {}
set batch {}
$_ db eval {
select h1.rowid as rowid ,h1.keyid , keys.key
,(
select count(*) from holdings as h2
where h2.keyid = h1.keyid
) as count
from holdings as h1
join keys on h1.keyid = keys.rowid
where h1.repositoryid = $repo
and count < $conncount
order by count
} {
lappend batch $key
if {[llength $batch] >= $batchsize} {
lassign [$_ distributebatch $repo $batch] \
stored1 existing1
incr stored $stored1
incr existing $existing1
incr total [llength $batch]
set batch {}
}
}
if {[llength $batch]} {
$_ distributebatch $repo $batch
incr total [llength $batch]
set batch {}
}
}
puts [list distributed cuts $total stored $stored existing $existing]
$_ holdingsreport
return
}
.my .method distribute
proc distributebatch {. _ repo batch} {
set batch [$_ retrievem $batch[set batch {}] repo $repo]
if {[llength $batch]} {
$_ setbatch $batch
} else {
return [list 0 0]
}
}
.my .method distributebatch
proc existing {. _ keys} {
set res {}
$_ db transaction {
foreach key $keys {
if {[$_ db exists {
select 1 from keys where key = @key
}]} {
lappend res 1
} else {
lappend res 0
}
}
}
return $res
}
.my .method existing
proc exists {. _ key} {
set res [$_ db exists {
select 1 from keys where key = @key
}]
return $res
}
.my .method exists
proc inventory {. _} {
$_ .vars connections
$_ db transaction {
set keys [$_ .namespace]::[info cmdcount]_keys
puts [list bleep $connections]
dict for {repo connection} $connections {
$_ $connection keys $keys
while 1 {
if {[incr i] % 10000 == 0} {
puts [list {keep inventory processed} $i]
}
set returned [yieldto $keys [info coroutine]]
set key [return -level 0 {*}$returned]
$_ updateholdings $repo [list $key]
}
}
}
return
}
.my .method inventory
proc holdingsreport {. _ } {
$_ .vars connections
puts [list {holdings report}]
$_ db eval {
select repositoryid ,count(keyid) as count
from holdings
group by repositoryid
} {
if {[dict exists $connections $repositoryid]} {
set conn [dict get $connections $repositoryid]
set size [$_ $conn size]
set maxsize [$_ $conn maxsize]
} else {
set size disconnected
set maxsize disconnected
}
set path [$_ db onecolumn {
select path from repositories
where rowid = $repositoryid
}]
puts [list repository [printable $repositoryid] path $path items $count \
size $size maxsize $maxsize]
}
return
}
.my .method holdingsreport
proc open {. _ key} {
$_ .vars connections
dict for {repo connection} $connections {
try {
set res [$_ $connection open $key]
} on error {cres copts} {
# to do
# log this error
continue
}
return $res
}
error [list {could not retrieve chunk}]
}
.my .method open
proc pkikey {. _} {
$_ .vars pkinode topnode
package require pki
if {[info exists pkinode]} {
set key [$_ tree node last $pkinode key]
} else {
set key [$_ tree node pivot? $topnode pki key]
if {$key eq {}} {
set key [pki::rsa::generate 2048]
set cert [self_sign $key CN me]
set public [pki::x509::parse_cert $cert]
$_ tree db transaction {
lassign [$_ tree node forge $topnode pki] pkinode
$_ tree node set $pkinode key $key
$_ tree node set $pkinode cert $cert
$_ tree node set $pkinode public $public
}
}
set pkinode [$_ tree node pivot $topnode pki]
}
return $key
}
.my .method pkikey
proc addpubkey {. _ args} {
$_ .vars repositoriesnode
while {[llength $args]} {
take args arg
optswitch $arg {
id {
take args id
}
pubkey {
take args pubkey
}
}
}
$_ tree node transaction {
set found [$_ tree node findeq& $repositoriesnode $id]
optswitch [llength $found] {
0 {
set peer [$_tree node new $repositoriesnode]
$_ tree node set $peer id $id
}
1 {
set peer [lindex $found 0]
}
}
$_ tree node set $peer pubkey $pubkey
}
return
}
.my .method addpubkey
proc getpubkey {. _ id} {
$_ .vars repositoriesnode
set idnode [$_ tree node findeq $repositoriesnode $id]
set peernode [$_ tree node up& $idnode]
set pubkey [$_ tree node last $peernode pubkey]
return $pubkey
}
.my .method getpubkey
proc pubkey {. _} {
$_ pkikey
$_ .vars pkinode
set public [$_ tree node last $pkinode public]
return $public
}
.my .method pubkey
proc self_sign {key args} {
set csr [pki::pkcs::create_csr $key $args 1]
set csr [pki::pkcs::parse_csr $csr]
dict for {n v} $args {lappend subject "$n=$v"}
lappend key subject [join $subject ", "]
set crt [::pki::x509::create_cert $csr $key 1 [clock seconds] [clock seconds] 1 [list] 1]
}
proc randbytes_ {. _} {
randbytes 32
}
.my .method randbytes randbytes_
namespace eval repository {
namespace ensemble create -parameters {. _} -map {
add add
add_peer add_peer
add_sqlite add_sqlite
info info_
rm rm
}
namespace path [list [namespace parent]]
proc add {. _ type args} {
$_ .vars newrepos
dict size $args
switch $type {
peer {
$_ repository add_peer
}
sqlite {
foreach {opt val} $args {
switch $opt {
path {
set $opt $val
}
default {
error [list {unknown option} $opt]
}
}
}
$_ repository add_sqlite $path
}
default {
error [list {unknown type} $type]
}
}
incr newrepos
}
proc add_peer {. _ args} {
namespace upvar [namespace parent] sql_peer_add sql_peer_add
$_ .vars hostsnode peertypeb repositoriesnode
set trusted 0
$_ db transaction {
while {[llength $args]} {
take args arg
optswitch $arg {
name - id - host - port - pubkey {
take args $arg
}
trusted {
take args trusted
set trusted [expr {!!$trusted}]
}
}
}
set keys [list $host]
if {[info exists port]} {
set port1 $port
lappend keys $port
} else {
$_ .vars defaults
set port1 $defaults
ddict get port1 listen port
}
set attributes [list name $name type $peertypeb \
host $host trusted $trusted]
if {![info exists pubkey]} {
set pubkey [$_ call_insecure $host $port1 pubkey]
}
lappend attributes pubkey $pubkey
if {![info exists id]} {
set id [$_ call $pubkey $host $port1 .id]
}
lappend attributes id $id
$_ tree node forge $hostsnode {*}$keys
$_ repository_entry $peertypeb $id {}
if {[info exists port]} {
lappend attributes port $port
}
set new [$_ tree node new $repositoriesnode {}]
$_ tree node setd $new {*}$attributes
$_ addrepotype $peertypeb
set instance [$_ randbytes]
$_ db eval $sql_peer_add
}
}
proc add_sqlite {. _ path} {
$_ .vars connections
package require {ycl struct map sqlite_compressed}
# preserive any final symlink
set path [file normalize $path]
set system [file join $path system]
file mkdir $path
set repo [$_ .namespace]::[info cmdcount]_repo
[yclprefix] struct map sqlite_compressed .new $repo
$_ .eval [list $_ .routine $repo]
set newid [$_ randbytes]
$_ $repo .init path $system id $newid create 1
set instance [$_ $repo .id]
$_ db transaction {
$_ addrepotype sqlite_compressed
$_ repository_entry sqlite_compressed $instance $path
set rowid [$_ db onecolumn {
select rowid from repositories
where instance = @instance
}]
if {[dict exists $connections $rowid]} {
rename $repo {}
$_ .rm $repo
} else {
dict set connections $rowid $repo
}
$_ addpath $path
}
ddict set res {new repository} $rowid
return $res
}
proc info_ {. _ args} {
set repos {}
set res {}
while {[llength $args]} {
take args arg
optswitch $arg {
repo {
take args arg
lappend repos $arg
}
repos {
take args arg
lappend repos {*}$arg
}
}
}
if {![llength $repos]} {
set repos [$_ db eval {
select rowid from repositories order by rowid
}]
}
foreach repo $repos {
set res1 {}
$_ db eval {
select * from repositories where rowid = $repo
} {
foreach key {
typeid
instance
path
connectattempts
connectsuccess
connectfailure
readattempts
readsuccess
readfailure
writeattempts
writesuccess
writefailure
readcorrupt
accesstime
accessfailtime
lasterror
} {
ddict set res $rowid $key [set $key]
}
}
}
return $res
}
proc rm {. _ rowid} {
$_ .vars repositoriesnode
set holdings [$_ db exists {
select 1 from holdings where repositoryid = $rowid
}]
if {$holdings} {
error [list {holdings exist for this repository}]
}
set exists [$_ db exists {
select 1 from repositories where rowid = $rowid limit 1
}]
if {!$exists} {
error [list {no such repository} $rowid]
}
$_ db eval {
delete from repositories where rowid = $rowid
}
return [list deleted $rowid]
}
}
.my .method repository
proc repository_entry {. _ type instance path} {
$_ db transaction {
set rowid [$_ db onecolumn {select max(rowid) + 1 from repositories}]
$_ db eval "
insert or ignore into repositories (
rowid
, typeid
, instance
, path
, connectattempts
, connectsuccess
, connectfailure
, readattempts
, readsuccess
, readfailure
, writeattempts
, writesuccess
, writefailure
, readcorrupt
, accesstime
, accessfailtime
, lasterror
) values (
$rowid
, (
select rowid from repotype where name =
[lossless \$type]
)
, @instance
, [lossless \$path]
, 0 , 0 , 0 , 0 , 0 , 0 , 0 ,0 ,0 ,0
, date('now')
, ''
, ''
)
"
}
return $rowid
}
.my .method repository_entry
proc retrieve {. _ key} {
$_ .vars connections
# save time by checking our own records first
set failures 0
if {[$_ exists $key]} {
dict for {repo connection} $connections {
try {
set res [$_ $connection get $key]
} on error {cres copts} {
# to do
# log this
#puts stderr [list {keep retrieval failure}]
#puts stderr [dict get $copts -errorinfo]
incr failures
continue
}
return $res
}
}
error [list {failed to retrieve chunk} connections $failures]
}
.my .method retrieve
proc retrievem {. _ keys args} {
$_ .vars connections
set res {}
set priority {}
while {[llength $args]} {
take args arg
optswitch $arg {
repo {
take args repo
lappend priority $repo
}
}
}
# save time by checking our own records first
lappend repos {*}$priority {*}[lmap {repo conn} $connections {
if {$conn in $priority} continue
set repo
}]
puts [list {retrievem looking through} [llength $repos] connections \
for [llength $keys] keys]
foreach repo $repos {
set repoinfo [$_ repository info repo $repo]
foreach {key val} $repoinfo {
puts [list repoinfo $key [printable tcl 0 $val]]
}
set connection [dict get $connections $repo]
try {
set res1 [$_ $connection getm $keys]
} on error {cres copts} {
# to do
# log this
puts [list error in getm $cres]
set res1 {}
continue
}
puts [list retrievem {got an answer of} [llength [dict keys $res1]]]
foreach {key val} $res1 {
lappend res $key $val
dict unset keys $key
}
if {![dict size $keys]} break
}
return $res
}
.my .method retrievem
proc repositories {. _} {
coroutine [$_ .namespace]::[info cmdcount]_repositories ::apply [list _ {
yield [info coroutine]
$_ db transaction {
$_ db eval {
select rowid ,typeid ,instance ,path ,lastaccess
,lastaccessattempt ,lasterror
from repositories
} {
yield [dict create rowid $rowid typeid $typeid instance \
$instance path $path lastaccess $lastaccess \
lastaccessattemp $lastaccessattempt lasterror $lasterror]
}
}
rename [info coroutine] {}
return -code break
} [namespace current]] $_
}
.my .method repositories
proc safeprint data {
binary scan $data H* res
return $res
}
proc set_ {. _ key value} {
$_ .vars connections newrepos
if {$newrepos} {
$_ connections
}
set stored 0
dict for {repo connection} $connections {
try {
$_ storeat $repo $connection $key $value
} on ok {} {
incr stored
} on error {tres topts} {
$_ writefailure $repo $tres $topts
}
}
if {!$stored} {
error [list {could not store item}]
}
return $stored
}
.my .method set set_
proc storeat {. _ repo connection key value} {
$_ db transaction {
set new [$_ $connection set $key $value]
$_ updateholdings $repo [list $key]
if {$new} {
$_ db eval {
; update repositories set writesuccess = writesuccess + 1
where rowid = $repo
}
}
}
return $new
}
.my .method storeat
proc setbatch {. _ map} {
$_ .vars connections newrepos repositoriesnode
set existing 0
set success 0
set stored 0
set keys [dict keys $map]
if {$newrepos} {
$_ connections
}
$_ db transaction {
dict for {repo connection} $connections {
puts [list {storing batch to} $repo]
set repoinfo [$_ repository info repo $repo]
foreach {key val} $repoinfo {
puts [list repoinfo $key [printable tcl 0 $val]]
}
try {
puts [list {checking missing} {batch size} [llength $keys]]
set missing [$_ $connection missing $keys]
puts [list {missing} [llength $missing] repository $repo]
set newmap {}
foreach key $missing {
dict set newmap $key [dict get $map $key]
}
lassign [$_ $connection setbatch $newmap[set newmap {}]] stored1 existing1
} on ok {} {
puts [list stored cuts $stored1 repo $repo]
incr stored $stored1
incr existing $existing1
$_ updateholdings $repo [dict keys $map]
} on error {tres topts} {
puts stderr [list {error storing batch} repository $repo]
puts stderr [dict get $topts -errorinfo]
$_ writefailure $repo $tres $topts
#return -options $topts $tres
}
}
}
return [list $stored $existing]
}
.my .method setbatch
proc search_prefix {. _ prefix} {
coroutine [$_ .namespace]::[
info cmdcount]_search_prefix $_ search_prefix_coro $prefix
}
.my .method search_prefix
proc search_prefix_coro {. _ prefix} {
hi
dict for {repo connection} $connections {
set token
reply [$_ connection search $prefix]
}
bye
}
.my .method search_prefix_coro
proc version {. _ args} {
$_ .vars topnode
lassign [$_ tree node forge $topnode version] versionnode
if {[llength $args]} {
$_ tree node clear $versionnode
foreach arg $args {
$_ tree node new $versionnode $arg
}
}
$_ tree node ls $versionnode
}
.my .method version
proc versionupdate {. _} {
variable versionmap
set version [$_ version]
if {$version eq {}} {
set version [$_ db onecolumn {
select v from system where e = 0 and a = 'version'
}]
# only version 0.1 applies here
$_ versionupdate0.1
} else {
while 1 {
set exists [dict exists $versionmap {*}$version]
if {$exists} {
$_ "versionupdate $version"
} else break
set version [$_ version]
}
}
}
.my .method versionupdate
proc versionupdate0.1 {. _} {
$_ .vars pathsnode
$_ db transaction {
puts [list updating to version 0 2]
$_ db eval {
select repositories.rowid as rowid ,instance ,path,
repotype.name as type from repositories
join repotype on repositories.typeid = repotype.rowid
} {
$_ tree node forge $pathsnode $path
}
$_ version 0 2
puts [list updated to version 0 2]
}
}
.my .method versionupdate0.1
proc {versionupdate 0 2} {. _} {
$_ db transaction {
$_ db eval {
select repositories.*, repotype.name from repositories join repotype
on repositories.typeid = repotype.rowid
} {
if {$name eq {sqlite_compressed}} {
set path [file dirname $path]
puts [list updating to $path]
set newpath [$_ db onecolumn {
update repositories set path = $path
where rowid = $rowid
; select path from repositories where rowid = $rowid
}]
if {$path ne $newpath} {
error [list {could not update path}]
}
}
}
$_ version 0 3
}
}
.my .method {versionupdate 0 2}
proc {versionupdate 0 3} {. _} {
$_ db transaction {
$_ .vars topnode
$_ tree node forge $topnode repositories
$_ version 0 4
}
}
.my .method {versionupdate 0 3}
proc {versionupdate 0 4} {. _} {
$_ db transaction {
$_ .vars topnode
$_ tree node forge $topnode hosts
$_ version 0 5
}
}
.my .method {versionupdate 0 4}
proc {versionupdate 0 5} {. _} {
$_ db transaction {
$_ .vars topnode
$_ db eval {
alter table repositories
rename column read to readcorrupt
}
$_ version 0 6
}
}
.my .method {versionupdate 0 5}
proc updateholdings {. _ repo keys} {
foreach key $keys {
$_ db eval {
insert or ignore into keys values (null ,@key)
; insert or ignore into holdings values (
null
, (select rowid from keys where key = @key)
, $repo
)
}
}
$_ db eval {
; update system set v = v + 1 where e = 0 and a = 'epoch'
}
return
}
.my .method updateholdings
proc waitconn {. _ chan} {
chan event $chan writable [list [info coroutine]]
yield
set error [chan configure $chan -error]
if {$error ne {}} {
error [list chan $chan error $error]
}
set connecting [chan configure $chan -connecting]
if {$connecting} {
error [list chan $chan connecting $connected]
}
return
}
.my .method waitconn
proc writefailure {. _ repo tres topts} {
set errorinfo [dict get $topts -errorinfo]
puts stderr [list {write failure} resp [printable tcl 0 $repo] res $tres]
#puts stderr $errorinfo
$_ db eval {
update repositories set writefailure = writefailure + 1
, lasterror = $errorinfo
where rowid = $repo
}
return
}
.my .method writefailure
variable magic 6051da36483cdd9c7790cb936c9941fb63dbc134872472796e64c8fba5094df2
set magicb [binary format H* $magic]
variable peertype 0844d993cf1efcd669fed0e417e2495cefa0f2f06fbd7b00166f3380cd2ba863
variable peertypeb [binary format H* $peertype]
variable defaults {
listen {
port 24414
}
}
variable sql_repotype_rowid_by_name "
select rowid from repotype where name = [lossless \$type]
"
variable sql_repotype_insert "
insert into repotype (rowid ,name) values (null ,[lossless \$type])
"
variable sql_peer_add "
insert into repositories (
rowid
, typeid
, instance
, path
, connectattempts
, connectsuccess
, connectfailure
, readattempts
, readsuccess
, readfailure
, writeattempts
, writesuccess
, writefailure
, readcorrupt
, accesstime
, accessfailtime
, lasterror
) values (
null
, (
select rowid from repotype where name =
[lossless \$peertypeb]
)
, @instance
, ''
, 0 , 0 , 0 , 0 , 0 , 0 , 0 ,0 ,0 ,0
, date('now')
, ''
, ''
)
"
variable version {0 3}
variable versionmap {
0 {
2 {}
3 {}
4 {}
5 {}
}
}