Index: src/copas.lua ================================================================== --- src/copas.lua +++ src/copas.lua @@ -8,22 +8,20 @@ -- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho, -- Thomas Harning Jr., and Gary NG -- -- Copyright 2005 - Kepler Project (www.keplerproject.org) -- --- Based on https://github.com/keplerproject/copas --- coroutines can now sleep() +-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $ ------------------------------------------------------------------------------- if package.loaded["socket.http"] then error("you must require copas before require'ing socket.http") end local socket = require "socket" -local gettime = socket.gettime() -require "coxpcall" +local coxpcall = require "coxpcall" local WATCH_DOG_TIMEOUT = 120 local UDP_DATAGRAM_MAX = 8192 -- Redefines LuaSocket functions with coroutine safe versions @@ -38,37 +36,36 @@ end end function socket.protect(func) return function (...) - return statusHandler(copcall(func, ...)) + return statusHandler(coxpcall.pcall(func, ...)) end end function socket.newtry(finalizer) return function (...) local status = (...) if not status then - copcall(finalizer, select(2, ...)) + coxpcall.pcall(finalizer, select(2, ...)) error({ (select(2, ...)) }, 0) end return ... end end -- end of LuaSocket redefinitions - -module ("copas", package.seeall) +local copas = {} -- Meta information is public even if beginning with an "_" -_COPYRIGHT = "Copyright (C) 2005-2010 Kepler Project" -_DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services" -_VERSION = "Copas 1.1.7" +copas._COPYRIGHT = "Copyright (C) 2005-2010 Kepler Project" +copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services" +copas._VERSION = "Copas 1.1.7" -- Close the socket associated with the current connection after the handler finishes -autoclose = true +copas.autoclose = true ------------------------------------------------------------------------------- -- Simple set implementation based on LuaSocket's tinyirc.lua example -- adds a FIFO queue for each value in the set ------------------------------------------------------------------------------- @@ -119,46 +116,59 @@ }}) return set end local fnil = function()end -local _sleeping = { - times = {}, cos = {} - - , insert = fnil, remove = fnil - --нить должна проснуться после времени whenwakeup - , push = function(self, whenwakeup, co) - if not co then return end - local t, c = self.times, self.cos - local i, cou = 1, #t - --TODO: сделать бинарный поиск - while i<=cou and t[i] 90) then - _writing_log[client] = gettime() + _writing_log[client] = os.time() coroutine.yield(client, _writing) end if s or err ~= "timeout" then _writing_log[client] = nil return s, err,lastIndex end - _writing_log[client] = gettime() + _writing_log[client] = os.time() coroutine.yield(client, _writing) until false end -- sends data to a client over UDP. Not available for TCP. -- (this is a copy of send() method, adapted for sendto() use) -function sendto(client,data, ip, port) +function copas.sendto(client, data, ip, port) local s, err,sent repeat s, err = client:sendto(data, ip, port) -- adds extra corrotine swap -- garantees that high throuput dont take other threads to starvation if (math.random(100) > 90) then - _writing_log[client] = gettime() + _writing_log[client] = os.time() coroutine.yield(client, _writing) end if s or err ~= "timeout" then _writing_log[client] = nil return s, err end - _writing_log[client] = gettime() + _writing_log[client] = os.time() coroutine.yield(client, _writing) until false end -- waits until connection is completed -function connect(skt, host, port) +function copas.connect(skt, host, port) skt:settimeout(0) local ret, err repeat ret, err = skt:connect (host, port) if ret or err ~= "timeout" then _writing_log[skt] = nil return ret, err end - _writing_log[skt] = gettime() + _writing_log[skt] = os.time() coroutine.yield(skt, _writing) until false return ret, err end -- flushes a client write buffer (deprecated) -function flush(client) +function copas.flush(client) end -- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout) local _skt_mt = {__index = { send = function (self, data, from, to) - return send (self.socket, data, from, to) + return copas.send (self.socket, data, from, to) end, receive = function (self, pattern) if (self.timeout==0) then - return receivePartial(self.socket, pattern) + return copas.receivePartial(self.socket, pattern) end - return receive (self.socket, pattern) + return copas.receive(self.socket, pattern) end, flush = function (self) - return flush (self.socket) + return copas.flush(self.socket) end, settimeout = function (self,time) self.timeout=time return @@ -311,36 +321,36 @@ -- wraps a UDP socket, copy of TCP one adapted for UDP. -- Mainly adds sendto() and receivefrom() local _skt_mt_udp = {__index = { send = function (self, data) - return send (self.socket, data) + return copas.send (self.socket, data) end, sendto = function (self, data, ip, port) - return sendto (self.socket, data, ip, port) + return copas.sendto (self.socket, data, ip, port) end, receive = function (self, size) - return receive (self.socket, (size or UDP_DATAGRAM_MAX)) + return copas.receive (self.socket, (size or UDP_DATAGRAM_MAX)) end, receivefrom = function (self, size) - return receivefrom (self.socket, (size or UDP_DATAGRAM_MAX)) + return copas.receivefrom (self.socket, (size or UDP_DATAGRAM_MAX)) end, flush = function (self) - return flush (self.socket) + return copas.flush (self.socket) end, settimeout = function (self,time) self.timeout=time return end, }} -function wrap (skt) +function copas.wrap (skt) if string.sub(tostring(skt),1,3) == "udp" then return setmetatable ({socket = skt}, _skt_mt_udp) else return setmetatable ({socket = skt}, _skt_mt) end @@ -350,11 +360,11 @@ -- Error handling -------------------------------------------------- local _errhandlers = {} -- error handler per coroutine -function setErrorHandler (err) +function copas.setErrorHandler (err) local co = coroutine.running() if co then _errhandlers [co] = err end end @@ -374,12 +384,12 @@ if ok and res and new_q then new_q:insert (res) new_q:push (res, co) else - if not ok then copcall (_errhandlers [co] or _deferror, res, co, skt) end - if skt and autoclose then skt:close() end + if not ok then coxpcall.pcall (_errhandlers [co] or _deferror, res, co, skt) end + if skt and copas.autoclose then skt:close() end _errhandlers [co] = nil end end -- accepts a connection on socket input @@ -417,21 +427,21 @@ local co = coroutine.create(handler) _reading:insert(server) _doTick (co, server) end -function addserver(server, handler, timeout) +function copas.addserver(server, handler, timeout) if string.sub(tostring(server),1,3) == "udp" then addUDPserver(server, handler, timeout) else addTCPserver(server, handler, timeout) end end ------------------------------------------------------------------------------- -- Adds an new courotine thread to Copas dispatcher ------------------------------------------------------------------------------- -function addthread(thread, ...) +function copas.addthread(thread, ...) if type(thread) ~= "thread" then thread = coroutine.create(thread) end _doTick (thread, nil, ...) return thread @@ -503,23 +513,34 @@ self.def_tick (output) end } addtaskWrite (_writable_t) +-- +--sleeping threads task +local _sleeping_t = { + tick = function (self, time, ...) + _doTick(_sleeping:pop(time)) + end +} + +function copas.sleep(sleeptime) + coroutine.yield((sleeptime or 0), _sleeping) +end + +function copas.wakeup(co) + _sleeping:wakeup(co) +end local last_cleansing = 0 ------------------------------------------------------------------------------- -- Checks for reads and writes on sockets ------------------------------------------------------------------------------- local function _select (timeout) local err - local readable={} - local writable={} - local r={} - local w={} - local now = gettime() + local now = os.time() local duration = os.difftime _readable_t._evs, _writable_t._evs, err = socket.select(_reading, _writing, timeout) local r_evs, w_evs = _readable_t._evs, _writable_t._evs @@ -554,13 +575,12 @@ -- Dispatcher loop step. -- Listen to client requests and handles them -- Returns false if no data was handled (timeout), or true if there was data -- handled (or nil + error message) ------------------------------------------------------------------------------- -function step(timeout) - _sleeping_t:tick(gettime()) - +function copas.step(timeout) + _sleeping_t:tick(os.time()) local err = _select (timeout) if err == "timeout" then return false end if err then error(err) @@ -576,11 +596,12 @@ ------------------------------------------------------------------------------- -- Dispatcher endless loop. -- Listen to client requests and handles them forever ------------------------------------------------------------------------------- -function loop(timeout) +function copas.loop(timeout) while true do - step(timeout) + copas.step(timeout) end end +return copas