Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Overview
| Comment: | Refinements of FlushChannel() and its callers. Notably includes removal of the flag BUFFER_READY. |
|---|---|
| Timelines: | family | ancestors | descendants | both | core-8-5-branch |
| Files: | files | file ages | folders |
| SHA1: |
0ddf09e8bdcdb9228b9853301cba0ea1 |
| User & Date: | dgp 2014-05-28 18:58:51.035 |
Context
|
2014-05-31
| ||
| 02:30 | Correct the interest masks in the Tcl_CreateFileHandler() calls in PipeWatchProc(). When we are int... check-in: 74a2238ecc user: dgp tags: core-8-5-branch | |
|
2014-05-30
| ||
| 10:36 | win socket -async: do not lose connect notification by temporarily stopping connect monitoring. Bug [33... check-in: 6ecb583012 user: oehhar tags: core-8-5-branch | |
|
2014-05-29
| ||
| 15:04 | Refinements of FlushChannel() and its callers. Notably includes removal of the flag BUFFER_READY. check-in: 37bcb4b42d user: dgp tags: trunk | |
| 14:53 | merge core-8-5-branch check-in: 4f0f24c6c3 user: oehhar tags: bug-336441ed59 | |
|
2014-05-28
| ||
| 18:58 | Refinements of FlushChannel() and its callers. Notably includes removal of the flag BUFFER_READY. check-in: 0ddf09e8bd user: dgp tags: core-8-5-branch | |
| 18:49 | Update comment to explain assumptions. Closed-Leaf check-in: b7bbbbc6ed user: dgp tags: dgp-flush-channel | |
|
2014-05-24
| ||
| 19:56 | Comment out lines of test io-53.4 that appear to do nothing of any value. check-in: fb90b0f1fd user: dgp tags: core-8-5-branch | |
Changes
Changes to generic/tclIO.c.
| ︙ | ︙ | |||
280 281 282 283 284 285 286 | #define SpaceLeft(bufPtr) ((bufPtr)->bufLength - (bufPtr)->nextAdded) #define IsBufferReady(bufPtr) ((bufPtr)->nextAdded > (bufPtr)->nextRemoved) #define IsBufferEmpty(bufPtr) ((bufPtr)->nextAdded == (bufPtr)->nextRemoved) | | | 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 | #define SpaceLeft(bufPtr) ((bufPtr)->bufLength - (bufPtr)->nextAdded) #define IsBufferReady(bufPtr) ((bufPtr)->nextAdded > (bufPtr)->nextRemoved) #define IsBufferEmpty(bufPtr) ((bufPtr)->nextAdded == (bufPtr)->nextRemoved) #define IsBufferFull(bufPtr) ((bufPtr) && (bufPtr)->nextAdded >= (bufPtr)->bufLength) #define IsBufferOverflowing(bufPtr) ((bufPtr)->nextAdded > (bufPtr)->bufLength) #define InsertPoint(bufPtr) ((bufPtr)->buf + (bufPtr)->nextAdded) #define RemovePoint(bufPtr) ((bufPtr)->buf + (bufPtr)->nextRemoved) |
| ︙ | ︙ | |||
1144 1145 1146 1147 1148 1149 1150 |
CheckForStdChannelsBeingClosed(chan);
/*
* If the refCount reached zero, close the actual channel.
*/
if (statePtr->refCount <= 0) {
| < < < < < < < < < | 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 |
CheckForStdChannelsBeingClosed(chan);
/*
* If the refCount reached zero, close the actual channel.
*/
if (statePtr->refCount <= 0) {
Tcl_Preserve((ClientData)statePtr);
if (!GotFlag(statePtr, BG_FLUSH_SCHEDULED)) {
/*
* We don't want to re-enter Tcl_Close().
*/
if (!GotFlag(statePtr, CHANNEL_CLOSED)) {
|
| ︙ | ︙ | |||
2486 2487 2488 2489 2490 2491 2492 |
Channel *chanPtr, /* The channel to flush on. */
int calledFromAsyncFlush) /* If nonzero then we are being called from an
* asynchronous flush callback. */
{
ChannelState *statePtr = chanPtr->state;
/* State of the channel stack. */
ChannelBuffer *bufPtr; /* Iterates over buffered output queue. */
| < < | < < | < < | > | > > > > > | | < | < > | < < | | | | | | | | > > > > > > > > > > > > > > > > > > < < < < < < < < < < < < < < < < < < < < < | | < | 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 |
Channel *chanPtr, /* The channel to flush on. */
int calledFromAsyncFlush) /* If nonzero then we are being called from an
* asynchronous flush callback. */
{
ChannelState *statePtr = chanPtr->state;
/* State of the channel stack. */
ChannelBuffer *bufPtr; /* Iterates over buffered output queue. */
int written; /* Amount of output data actually written in
* current round. */
int errorCode = 0; /* Stores POSIX error codes from channel
* driver operations. */
int wroteSome = 0; /* Set to one if any data was written to the
* driver. */
/*
* Prevent writing on a dead channel -- a channel that has been closed but
* not yet deallocated. This can occur if the exit handler for the channel
* deallocation runs before all channels are deregistered in all
* interpreters.
*/
if (CheckForDeadChannel(interp, statePtr)) {
return -1;
}
/*
* Should we shift the current output buffer over to the output queue?
* First check that there are bytes in it. If so then...
* If the output queue is empty, then yes, trusting the caller called
* us only when written bytes ought to be flushed.
* If the current output buffer is full, then yes, so we can meet
* the post-condition that on a successful return to caller we've
* left space in the current output buffer for more writing (the flush
* call was to make new room).
* Otherwise, no. Keep the current output buffer where it is so more
* can be written to it, possibly filling it, to promote more efficient
* buffer usage.
*/
bufPtr = statePtr->curOutPtr;
if (bufPtr && BytesLeft(bufPtr) && /* Keep empties off queue */
(statePtr->outQueueHead == NULL || IsBufferFull(bufPtr))) {
if (statePtr->outQueueHead == NULL) {
statePtr->outQueueHead = bufPtr;
} else {
statePtr->outQueueTail->nextPtr = bufPtr;
}
statePtr->outQueueTail = bufPtr;
statePtr->curOutPtr = NULL;
}
assert(!IsBufferFull(statePtr->curOutPtr));
/*
* If we are not being called from an async flush and an async flush
* is active, we just return without producing any output.
*/
if (!calledFromAsyncFlush && GotFlag(statePtr, BG_FLUSH_SCHEDULED)) {
return 0;
}
/*
* Loop over the queued buffers and attempt to flush as much as possible
* of the queued output to the channel.
*/
while (statePtr->outQueueHead) {
bufPtr = statePtr->outQueueHead;
/*
* Produce the output on the channel.
*/
PreserveChannelBuffer(bufPtr);
written = (chanPtr->typePtr->outputProc)(chanPtr->instanceData,
RemovePoint(bufPtr), BytesLeft(bufPtr), &errorCode);
/*
* If the write failed completely attempt to start the asynchronous
* flush mechanism and break out of this loop - do not attempt to
* write any more output at this time.
*/
|
| ︙ | ︙ | |||
2668 2669 2670 2671 2672 2673 2674 | /* * When we get an error we throw away all the output currently * queued. */ DiscardOutputQueued(statePtr); ReleaseChannelBuffer(bufPtr); | | > > | > > > > > > | 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 |
/*
* When we get an error we throw away all the output currently
* queued.
*/
DiscardOutputQueued(statePtr);
ReleaseChannelBuffer(bufPtr);
break;
} else {
/* TODO: Consider detecting and reacting to short writes
* on blocking channels. Ought not happen. See iocmd-24.2. */
wroteSome = 1;
}
bufPtr->nextRemoved += written;
/*
* If this buffer is now empty, recycle it.
*/
if (IsBufferEmpty(bufPtr)) {
statePtr->outQueueHead = bufPtr->nextPtr;
if (statePtr->outQueueHead == NULL) {
statePtr->outQueueTail = NULL;
}
RecycleBuffer(statePtr, bufPtr, 0);
}
ReleaseChannelBuffer(bufPtr);
} /* Closes "while". */
/*
* If we wrote some data while flushing in the background, we are done.
* We can't finish the background flush until we run out of data and the
* channel becomes writable again. This ensures that all of the pending
* data has been flushed at the system level.
*/
if (GotFlag(statePtr, BG_FLUSH_SCHEDULED)) {
if (wroteSome) {
return errorCode;
} else if (statePtr->outQueueHead == NULL) {
ResetFlag(statePtr, BG_FLUSH_SCHEDULED);
(chanPtr->typePtr->watchProc)(chanPtr->instanceData,
statePtr->interestMask);
} else {
/* TODO: If code reaches this point, it means a writable
* event is being handled on the channel, but the channel
* could not in fact be written to. This ought not happen,
* but Unix pipes appear to act this way (see io-53.4).
* Also can imagine broken reflected channels. */
}
}
/*
* If the channel is flagged as closed, delete it when the refCount drops
* to zero, the output queue is empty and there is no output in the
* current output buffer.
|
| ︙ | ︙ | |||
3247 3248 3249 3250 3251 3252 3253 |
statePtr->closeCbPtr = cbPtr->nextPtr;
(cbPtr->proc)(cbPtr->clientData);
ckfree((char *) cbPtr);
}
ResetFlag(statePtr, CHANNEL_INCLOSE);
| < < < < < < < < | 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 |
statePtr->closeCbPtr = cbPtr->nextPtr;
(cbPtr->proc)(cbPtr->clientData);
ckfree((char *) cbPtr);
}
ResetFlag(statePtr, CHANNEL_INCLOSE);
/*
* If this channel supports it, close the read side, since we don't need
* it anymore and this will help avoid deadlocks on some channel types.
*/
if (chanPtr->typePtr->closeProc == TCL_CLOSE2PROC) {
result = (chanPtr->typePtr->close2Proc)(chanPtr->instanceData, interp,
|
| ︙ | ︙ | |||
3665 3666 3667 3668 3669 3670 3671 |
/* Prevent read attempts on a closed channel */
DiscardInputQueued(chanPtr->state, 0);
Tcl_SetErrno(EINVAL);
return -1;
}
if ((chanPtr->typePtr->seekProc != NULL)
&& (Tcl_OutputBuffered((Tcl_Channel) chanPtr) > 0)) {
| | < > | > > > > > > > | | 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 |
/* Prevent read attempts on a closed channel */
DiscardInputQueued(chanPtr->state, 0);
Tcl_SetErrno(EINVAL);
return -1;
}
if ((chanPtr->typePtr->seekProc != NULL)
&& (Tcl_OutputBuffered((Tcl_Channel) chanPtr) > 0)) {
/*
* CAVEAT - The assumption here is that FlushChannel() will
* push out the bytes of any writes that are in progress.
* Since this is a seekable channel, we assume it is not one
* that can block and force bg flushing. Channels we know that
* can do that -- sockets, pipes -- are not seekable. If the
* assumption is wrong, more drastic measures may be required here
* like temporarily setting the channel into blocking mode.
*/
if (FlushChannel(NULL, chanPtr, 0) != 0) {
return -1;
}
}
return 0;
}
|
| ︙ | ︙ | |||
3850 3851 3852 3853 3854 3855 3856 |
needNlFlush = 0;
}
}
ReleaseChannelBuffer(bufPtr);
}
if ((flushed < total) && (GotFlag(statePtr, CHANNEL_UNBUFFERED) ||
(needNlFlush && GotFlag(statePtr, CHANNEL_LINEBUFFERED)))) {
| < | 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 |
needNlFlush = 0;
}
}
ReleaseChannelBuffer(bufPtr);
}
if ((flushed < total) && (GotFlag(statePtr, CHANNEL_UNBUFFERED) ||
(needNlFlush && GotFlag(statePtr, CHANNEL_LINEBUFFERED)))) {
if (FlushChannel(NULL, chanPtr, 0) != 0) {
return -1;
}
}
return total;
}
|
| ︙ | ︙ | |||
5969 5970 5971 5972 5973 5974 5975 |
chanPtr = statePtr->topChanPtr;
if (CheckChannelErrors(statePtr, TCL_WRITABLE) != 0) {
return -1;
}
| < < < < < < < < | 5959 5960 5961 5962 5963 5964 5965 5966 5967 5968 5969 5970 5971 5972 |
chanPtr = statePtr->topChanPtr;
if (CheckChannelErrors(statePtr, TCL_WRITABLE) != 0) {
return -1;
}
result = FlushChannel(NULL, chanPtr, 0);
if (result != 0) {
return TCL_ERROR;
}
return TCL_OK;
}
|
| ︙ | ︙ | |||
6116 6117 6118 6119 6120 6121 6122 |
/*
* See if we can fill an existing buffer. If we can, read only as much as
* will fit in it. Otherwise allocate a new buffer, add it to the input
* queue and attempt to fill it to the max.
*/
bufPtr = statePtr->inQueueTail;
| | | < | 6098 6099 6100 6101 6102 6103 6104 6105 6106 6107 6108 6109 6110 6111 6112 6113 |
/*
* See if we can fill an existing buffer. If we can, read only as much as
* will fit in it. Otherwise allocate a new buffer, add it to the input
* queue and attempt to fill it to the max.
*/
bufPtr = statePtr->inQueueTail;
if ((bufPtr == NULL) || IsBufferFull(bufPtr)) {
bufPtr = statePtr->saveInBufPtr;
statePtr->saveInBufPtr = NULL;
/*
* Check the actual buffersize against the requested buffersize.
* Saved buffers of the wrong size are squashed. This is done
* to honor dynamic changes of the buffersize made by the user.
|
| ︙ | ︙ | |||
6148 6149 6150 6151 6152 6153 6154 6155 6156 6157 6158 6159 6160 6161 |
if (statePtr->inQueueTail == NULL) {
statePtr->inQueueHead = bufPtr;
} else {
statePtr->inQueueTail->nextPtr = bufPtr;
}
statePtr->inQueueTail = bufPtr;
}
PreserveChannelBuffer(bufPtr);
nread = ChanRead(chanPtr, InsertPoint(bufPtr), toRead);
if (nread < 0) {
result = Tcl_GetErrno();
| > > | 6129 6130 6131 6132 6133 6134 6135 6136 6137 6138 6139 6140 6141 6142 6143 6144 |
if (statePtr->inQueueTail == NULL) {
statePtr->inQueueHead = bufPtr;
} else {
statePtr->inQueueTail->nextPtr = bufPtr;
}
statePtr->inQueueTail = bufPtr;
} else {
toRead = SpaceLeft(bufPtr);
}
PreserveChannelBuffer(bufPtr);
nread = ChanRead(chanPtr, InsertPoint(bufPtr), toRead);
if (nread < 0) {
result = Tcl_GetErrno();
|
| ︙ | ︙ | |||
6288 6289 6290 6291 6292 6293 6294 |
}
ResetFlag(statePtr, CHANNEL_NONBLOCKING);
if (GotFlag(statePtr, BG_FLUSH_SCHEDULED)) {
ResetFlag(statePtr, BG_FLUSH_SCHEDULED);
}
}
| < < < < < < < < < | 6271 6272 6273 6274 6275 6276 6277 6278 6279 6280 6281 6282 6283 6284 |
}
ResetFlag(statePtr, CHANNEL_NONBLOCKING);
if (GotFlag(statePtr, BG_FLUSH_SCHEDULED)) {
ResetFlag(statePtr, BG_FLUSH_SCHEDULED);
}
}
/*
* If the flush fails we cannot recover the original position. In that
* case the seek is not attempted because we do not know where the access
* position is - instead we return the error. FlushChannel has already
* called Tcl_SetErrno() to report the error upwards. If the flush
* succeeds we do the seek also.
*/
|
| ︙ | ︙ | |||
8821 8822 8823 8824 8825 8826 8827 |
if (GotFlag(statePtr, CHANNEL_EOF)
&& (bufPtr == NULL || IsBufferEmpty(bufPtr))) {
break;
}
/* If there is no full buffer, attempt to create and/or fill one. */
| | | 8795 8796 8797 8798 8799 8800 8801 8802 8803 8804 8805 8806 8807 8808 8809 |
if (GotFlag(statePtr, CHANNEL_EOF)
&& (bufPtr == NULL || IsBufferEmpty(bufPtr))) {
break;
}
/* If there is no full buffer, attempt to create and/or fill one. */
while (!IsBufferFull(bufPtr)) {
int code;
moreData:
code = GetInput(chanPtr);
bufPtr = statePtr->inQueueHead;
assert (bufPtr != NULL);
|
| ︙ | ︙ | |||
10376 10377 10378 10379 10380 10381 10382 |
#define ChanFlag(chr,bit) (buf[i++] = ((flags & (bit)) ? (chr) : '_'))
ChanFlag('r', TCL_READABLE);
ChanFlag('w', TCL_WRITABLE);
ChanFlag('n', CHANNEL_NONBLOCKING);
ChanFlag('l', CHANNEL_LINEBUFFERED);
ChanFlag('u', CHANNEL_UNBUFFERED);
| < | 10350 10351 10352 10353 10354 10355 10356 10357 10358 10359 10360 10361 10362 10363 |
#define ChanFlag(chr,bit) (buf[i++] = ((flags & (bit)) ? (chr) : '_'))
ChanFlag('r', TCL_READABLE);
ChanFlag('w', TCL_WRITABLE);
ChanFlag('n', CHANNEL_NONBLOCKING);
ChanFlag('l', CHANNEL_LINEBUFFERED);
ChanFlag('u', CHANNEL_UNBUFFERED);
ChanFlag('F', BG_FLUSH_SCHEDULED);
ChanFlag('c', CHANNEL_CLOSED);
ChanFlag('E', CHANNEL_EOF);
ChanFlag('S', CHANNEL_STICKY_EOF);
ChanFlag('B', CHANNEL_BLOCKED);
ChanFlag('/', INPUT_SAW_CR);
ChanFlag('D', CHANNEL_DEAD);
|
| ︙ | ︙ |
Changes to generic/tclIO.h.
| ︙ | ︙ | |||
223 224 225 226 227 228 229 | #define CHANNEL_NONBLOCKING (1<<3) /* Channel is currently in nonblocking * mode. */ #define CHANNEL_LINEBUFFERED (1<<4) /* Output to the channel must be * flushed after every newline. */ #define CHANNEL_UNBUFFERED (1<<5) /* Output to the channel must always * be flushed immediately. */ | < < < < < | 223 224 225 226 227 228 229 230 231 232 233 234 235 236 | #define CHANNEL_NONBLOCKING (1<<3) /* Channel is currently in nonblocking * mode. */ #define CHANNEL_LINEBUFFERED (1<<4) /* Output to the channel must be * flushed after every newline. */ #define CHANNEL_UNBUFFERED (1<<5) /* Output to the channel must always * be flushed immediately. */ #define BG_FLUSH_SCHEDULED (1<<7) /* A background flush of the queued * output buffers has been * scheduled. */ #define CHANNEL_CLOSED (1<<8) /* Channel has been closed. No further * Tcl-level IO on the channel is * allowed. */ #define CHANNEL_EOF (1<<9) /* EOF occurred on this channel. This |
| ︙ | ︙ |
Changes to tests/io.test.
| ︙ | ︙ | |||
2782 2783 2784 2785 2786 2787 2788 |
set r
} "hello\nbye\nstrange\n"
test io-29.34 {Tcl_Close, async flush on close, using sockets} {socket tempNotMac fileevent} {
variable c 0
variable x running
set l abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
proc writelots {s l} {
| | | 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 |
set r
} "hello\nbye\nstrange\n"
test io-29.34 {Tcl_Close, async flush on close, using sockets} {socket tempNotMac fileevent} {
variable c 0
variable x running
set l abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
proc writelots {s l} {
for {set i 0} {$i < 9000} {incr i} {
puts $s $l
}
}
proc accept {s a p} {
variable x
fileevent $s readable [namespace code [list readit $s]]
fconfigure $s -blocking off
|
| ︙ | ︙ | |||
2813 2814 2815 2816 2817 2818 2819 |
vwait [namespace which -variable x]
fconfigure $cs -blocking off
writelots $cs $l
close $cs
close $ss
vwait [namespace which -variable x]
set c
| | | 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 |
vwait [namespace which -variable x]
fconfigure $cs -blocking off
writelots $cs $l
close $cs
close $ss
vwait [namespace which -variable x]
set c
} 9000
test io-29.35 {Tcl_Close vs fileevent vs multiple interpreters} {socket tempNotMac fileevent} {
# On Mac, this test screws up sockets such that subsequent tests using port 2828
# either cause errors or panic().
catch {interp delete x}
catch {interp delete y}
interp create x
|
| ︙ | ︙ |