Artifact [01b4f6f057]
Not logged in

Artifact 01b4f6f05733aba53750f5abd2629ea41a66c66b:


/*
 * New Datakit protocol
 */

#include "../h/param.h"
#include "../h/stream.h"
#include "../h/conf.h"
#include "../h/ioctl.h"
#include "../h/dkstat.h"
#include "../h/ttyld.h"
#include "dkp.h"

#if 	NDKP>0
#ifdef	CAREFUL
#define	TRC(c)	*dkptrp++ = c; if (dkptrp>=&dkptrb[1024]) dkptrp=dkptrb
char	dkptrb[1024]; char *dkptrp = dkptrb;
#else
#define	TRC(c)
#endif

struct dkp {
	struct	queue	*rdq;	/* associated read queue */
	struct	block	*inp;	/* msg being collected */
	struct	block	*inpe;	/*  end of msg */
	short	state;		/* flags */
	short	trx;		/* # bytes in trailer being collected */
	short	indata;		/* # bytes in message being collected */
	u_char	iseq;		/* last good input sequence number */
	u_char	lastecho;	/* last echo/rej sent */
	char	WS;		/* first non-consumed message */
	char	WACK;		/* first non-acknowledged message */
	char	WNX;		/* next message to be sent */
	u_char	XW;		/* size of xmit window */
	u_char	timer;		/* timeout for xmit */
	u_char	outcnt;		/* count output chars for char mode */
	u_char	trbuf[3];	/* trailer being collected */
	struct	block *xb[8];	/* the xmit window buffer */
};

extern	struct	dkstat dkstat;

/*
 *  Protocol control bytes
 */
#define	SEQ	0010		/* sequence number, ends trailers */
#undef	ECHO
#define	ECHO	0020		/* echos, data given to next queue */
#define	REJ	0030		/* rejections, transmission error */
#define	ACK	0040		/* acknowledgments */
#define	BOT	0050		/* beginning of trailer */
#define	BOTM	0051		/* beginning of trailer, more data follows */
#define	BOTS	0052		/* seq update algorithm on this trailer */
#define	SOU	0053		/* start of unsequenced trailer */
#define	EOU	0054		/* end of unsequenced trailer */
#define	ENQ	0055		/* xmitter requests flow/error status */
#define	CHECK	0056		/* xmitter requests error status */
#define	INITREQ 0057		/* request initialization */
#define	INIT0	0060		/* disable trailer processing */
#define	INIT1	0061		/* enable trailer procesing */
#define	AINIT	0062		/* response to INIT0/INIT1 */
#undef	DELAY
#define	DELAY	0100		/* real-time printing delay */
#define	BREAK	0110		/* Send/receive break (new style) */

#define	OPEN	01
#define	LCLOSE	02
#define	RCLOSE	04
#define	XCHARMODE 010
#define	OPENING	020
#define	RJING	040
#define	STOPPED	0100
#define	RCHARMODE 0200

#define	DKPPRI	28
#define	DKPTIME	2

struct	dkp	dkp[NDKP];

int	dkpiput(), dkpisrv(), dkpoput(), dkposrv(), dkpopen(), cdkpopen();
int	dkpclose();
static	struct qinit cdkprinit = { dkpiput,dkpisrv,cdkpopen,dkpclose,512,64 };
static	struct qinit dkprinit = { dkpiput,dkpisrv,dkpopen,dkpclose,512,64 };
static	struct qinit dkpwinit = { dkpoput,dkposrv,dkpopen,dkpclose,128,65 };
struct	streamtab dkpinfo = { &dkprinit, &dkpwinit };
struct	streamtab cdkpinfo = { &cdkprinit, &dkpwinit };

dkpopen(q)
{
	return(rdkpopen(q, !XCHARMODE));
}

cdkpopen(q)
{
	return(rdkpopen(q, XCHARMODE));
}

rdkpopen(q, mode)
register struct queue *q;
{
	register struct dkp *dkpp;
	static timer = 0;
	int dkptimer();

	if (timer == 0) {
		timer = 1;
		timeout(dkptimer, (caddr_t)NULL, 60);
	}
	if (q->ptr)
		dkpp = (struct dkp *)q->ptr;
	else {
		for (dkpp = dkp; dkpp->state!=0; dkpp++)
			if (dkpp >= &dkp[NDKP])
				return(0);
		dkpp->rdq = q;
		q->ptr = (caddr_t)dkpp;
		WR(q)->ptr = (caddr_t)dkpp;
		WR(q)->flag |= QNOENB;
		putctl(q->next, M_FLUSH);
		dkpp->timer = DKPTIME;
		dkpp->trx = 0;
		dkpp->iseq = 0;
		dkpp->lastecho = ECHO+0;
		dkpp->WS = 1;
		dkpp->WACK = 1;
		dkpp->WNX = 1;
		dkpp->XW = 3;
		if (mode!=XCHARMODE) {
			WR(q)->flag |= QDELIM;
			dkpp->state = OPENING | RCHARMODE;
			putctl1(WR(q)->next, M_CTL, INIT1);
		} else {
			dkpp->XW = 1;
			dkpp->state = RCHARMODE | XCHARMODE | OPEN;
			putctl1(WR(q)->next, M_CTL, INIT0);
		}
	}
	return(1);
}

/*
 * Shut it down.
 *  The problem is to dispose of unacked stuff in the window.
 *   -- no real solution; the receiver might hang on for hours.
 *   Give it 15 seconds.
 */
dkpclose(q)
register struct queue *q;
{
	register struct dkp *dkpp;
	register s = spl5();
	register i;
	register struct block *bp;

	dkpp = (struct dkp *)q->ptr;
	dkpp->state |= LCLOSE;
	flushq(q, 1);
	for (i=0; dkpp->WACK < dkpp->WNX && i<15; i++)
		tsleep((caddr_t)dkpp, DKPPRI, 1);
	if (dkpp->WACK < dkpp->WNX)
		dkprack(dkpp, ACK+((dkpp->WNX-1) & 07));
	dkpinflush(dkpp);
	splx(s);
	dkpp->state = 0;
	flushq(WR(q), 1);
}


/*
 * Process a bunch of input
 *   -- for now, ignore strange control bytes
 */
dkpisrv(q)
register struct queue *q;
{
	register struct dkp *dkpp = (struct dkp *)q->ptr;
	register struct block *bp;
	register c;

	while (bp = getq(q)) {
		if (bp->type == M_CTL) {
			c = *bp->rptr & 0370;
			if (c==REJ || c==ECHO) {
				dkpp->lastecho = *bp->rptr;
				(*WR(q)->next->qinfo->putp)(WR(q)->next, bp);
			} else
				freeb(bp);
			continue;
		}
		if ((q->next->flag&QFULL)==0 || bp->type>=QPCTL
		 || dkpp->state&RCLOSE) {
			TRC('G'); TRC(*bp->rptr);
			(*q->next->qinfo->putp)(q->next, bp);
		} else {
			putbq(q, bp);
			return;
		}
	}
}

/*
 * Packet arrives.
 */
dkpiput(q, bp)
struct queue *q;
register struct block *bp;
{
	register struct dkp *dkpp;
	register i;
	register struct block *nbp;

	if ((dkpp = (struct dkp *)q->ptr)==NULL) {
		freeb(bp);
		return;
	}
	switch (bp->type) {

	moredata:
		bp->rptr++;
		bp->type = M_DATA;
	case M_DATA:
		if (bp->rptr >= bp->wptr||q->flag&QFULL||dkpp->state&LCLOSE) {
			freeb(bp);
			return;
		}
		if (dkpp->state & RCHARMODE) {
			putq(q, bp);
			return;
		}
		switch (dkpp->trx) {

			case 1:
			case 2:
				dkpp->trbuf[dkpp->trx++] = *bp->rptr;
				goto moredata;
			
			default:
				dkpp->trx = 0;
			case 0:
				break;
		}
		bp->next = NULL;
		if (dkpp->indata > 256) { 	/* protect against garbage */
			freeb(bp);
			return;
		}
		if (dkpp->inp) {
			dkpp->inpe->next = bp;
			dkpp->inpe = bp;
		} else {
			dkpp->inp = bp;
			dkpp->inpe = bp;
		}
		dkpp->indata += bp->wptr - bp->rptr;
		return;

	case M_CTL:
		switch (*bp->rptr) {

		case ENQ:
			putctl1(WR(q)->next, M_CTL, dkpp->lastecho);
		case CHECK:
			putctl1(WR(q)->next, M_CTL, ACK+dkpp->iseq);
			dkpinflush(dkpp);
			goto moredata;

		case AINIT:
			dkpp->state &= ~OPENING;
			dkpp->state |= OPEN;
			qenable(WR(q));
			dkpinflush(dkpp);
			goto moredata;

		case INIT0:
		case INIT1:
			putctl1(WR(q)->next, M_CTL, AINIT);
			if (*bp->rptr==INIT0 && (dkpp->state&RCHARMODE)==0) {
				dkpp->state |= RCHARMODE;
				dkpp->XW = 1;
				q->flag &= ~QDELIM;
			} else if (*bp->rptr==INIT1 && (dkpp->state&RCHARMODE)){
				dkpp->state &= ~RCHARMODE;
				dkpp->XW = 3;
				q->flag |= QDELIM;
			}
			dkpinflush(dkpp);
			dkpp->iseq = 0;
			wakeup(dkpp);
			goto moredata;

		case INITREQ:
			if (dkpp->state&XCHARMODE)
				putctl1(WR(q)->next, M_CTL, INIT0);
			else {
				if (dkpp->WS < dkpp->WNX)
					dkprack(dkpp, ECHO+((dkpp->WNX-1)&07));
				dkpp->WS = 1;
				dkpp->WACK = 1;
				dkpp->WNX = 1;
				putctl1(WR(q)->next, M_CTL, INIT1);
			}
			dkpinflush(dkpp);
			goto moredata;

		case BREAK:
			qpctl(q, M_BREAK);
			dkpp->indata++;
			goto moredata;

		case BOT:
		case BOTS:
		case BOTM:
			dkpp->trx = 1;
			dkpp->trbuf[0] = *bp->rptr;
			goto moredata;

		case REJ+0: case REJ+1: case REJ+2: case REJ+3:
		case REJ+4: case REJ+5: case REJ+6: case REJ+7:
			if (dkpp->state&RCHARMODE)
				goto moredata;
			TRC('r');
			if (((*bp->rptr+1)&07) == (dkpp->WACK&07)
			 && (dkpp->state&RJING) == 0) {
				dkstat.dkprxmit++;
				for (i=dkpp->WACK; i<dkpp->WNX; i++) {
					TRC('Z');
					TRC('0' + (i&07));
					dkpp->state |= RJING;
					dkpxmit(WR(q), dkpp->xb[i&07], i);
				}
			}
			goto moredata;
		
		case ACK+0: case ACK+1: case ACK+2: case ACK+3:
		case ACK+4: case ACK+5: case ACK+6: case ACK+7:
		case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
		case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
			dkprack(dkpp, *bp->rptr);
			goto moredata;

		case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
		case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
			i = *bp->rptr & 07;
			if (dkpp->state & RCHARMODE) {
				TRC('e');
				qpctl1(q, M_CTL, ECHO+i);
				goto moredata;
			}
			if (dkpp->trx !=3
			 || dkpp->indata != dkpp->trbuf[1] + (dkpp->trbuf[2]<<8)
			 || i != ((dkpp->iseq+1)&07)) {	/* reject? */
				if (dkpp->trx != 3)
					dkstat.dkprjtrs++;
				else if (i != ((dkpp->iseq+1)&07))
					dkstat.dkprjseq++;
				else
					dkstat.dkprjpks++;
				dkpinflush(dkpp);
				if (dkpp->trbuf[0]==BOTS)
					dkpp->iseq = i;
				TRC('R'); TRC('0'+dkpp->iseq);
				TRC(dkpp->trx!=3?'t':(i!=(dkpp->iseq+1)&07?'s':'c'));
				qpctl1(q, M_CTL, REJ+dkpp->iseq);
				goto moredata;
			}
			/* accept */
			while (nbp = dkpp->inp) {
				dkpp->inp = nbp->next;
				putq(q, nbp);
			}
			TRC('A'); TRC('0'+i);
			dkpp->inpe = NULL;
			dkpp->trx = 0;
			dkpp->indata = 0;
			dkpp->iseq = i;
			qpctl1(q, M_CTL, ECHO+i);
			if (dkpp->trbuf[0] != BOTM)
				qpctl(q, M_DELIM);
			goto moredata;

		default:
			if (*bp->rptr < 0200)	/* non-supervisory */
				dkpp->indata++;
			qpctl1(q, M_CTL, *bp->rptr);
			goto moredata;
		}

	case M_HANGUP:
		dkpp->state |= RCLOSE;
		flushq(WR(q), 1);
		dkprack(dkpp, ECHO+((dkpp->WNX-1) & 07));
		putq(q, bp);
		return;

	case M_IOCACK:
	case M_IOCNAK:
	case M_CLOSE:
		(*q->next->qinfo->putp)(q->next, bp);
		return;

	default:
		freeb(bp);
		return;
	}
}

/*
 * --- Output processor
 */

/*
 * accept data from writer
 *  -- handle most non-data messages
 */
dkpoput(q, bp)
register struct queue *q;
register struct block *bp;
{
	register struct dkp *dkpp = (struct dkp *)q->ptr;
	register union stmsg *sp;
	register x;

	if (dkpp->state & RCLOSE) {
		freeb(bp);
		return;
	}
	switch (bp->type) {

	case M_STOP:
		dkpp->state |= STOPPED;
		freeb(bp);
		return;

	case M_START:
		dkpp->state &= ~STOPPED;
		freeb(bp);
		qenable(q);
		return;

	case M_FLUSH:
		flushq(q, 0);
		freeb(bp);
		return;

	case M_IOCTL:
		sp = (union stmsg *)bp->rptr;
		switch (sp->ioc0.com) {

		case TIOCSETP:
		case TIOCSETN:
			x = sp->ioc1.sb.sg_ispeed;
			bp->wptr = bp->rptr;
			bp->type = M_IOCACK;
			qreply(q, bp);
			if (x==0)
				putctl(OTHERQ(q), M_HANGUP);
			return;

		case TIOCGETP:
			sp->ioc1.sb.sg_ispeed =
			  sp->ioc1.sb.sg_ospeed = B9600;
			bp->type = M_IOCACK;
			qreply(q, bp);
			return;

		case DIOCSTREAM:
			RD(q)->flag &= ~QDELIM;
			bp->wptr = bp->rptr;
			bp->type = M_IOCACK;
			qreply(q, bp);
			return;
		
		case DIOCRECORD:
			if ((dkpp->state&RCHARMODE) == 0) {
				RD(q)->flag | = QDELIM;
				bp->wptr = bp->rptr;
				bp->type = M_IOCACK;
			} else
				bp->type = M_IOCNAK;
			qreply(q, bp);
			return;

		case KIOCINIT:
			if (dkpp->state&XCHARMODE)
				putctl1(q->next, M_CTL, INIT0);
			else {
				if (dkpp->WS < dkpp->WNX)
					dkprack(dkpp, ECHO+((dkpp->WNX-1)&07));
				dkpp->WS = 1;
				dkpp->WACK = 1;
				dkpp->WNX = 1;
				putctl1(q->next, M_CTL, INIT1);
			}
			bp->wptr = bp->rptr;
			bp->type = M_IOCACK;
			qreply(q, bp);
			return;

		case KIOCISURP:
			bp->wptr = bp->rptr;
			bp->type = M_IOCACK;
			qreply(q, bp);
			return;

		default:
			(*q->next->qinfo->putp)(q->next, bp);
			return;
		}

	case M_DELAY:
		x = *bp->rptr;
		*bp->rptr = DELAY;
		bp->type = M_CTL;
		while (x) {
			(*bp->rptr)++;
			x >>= 1;
		}
		goto putonq;

	case M_CLOSE:
		(*q->next->qinfo->putp)(q->next, bp);
		return;

	default:
		freeb(bp);
		return;

	case M_BREAK:
		bp->type = M_CTL;
		*bp->wptr++ = BREAK;
	case M_DELIM:
	case M_DATA:
	putonq:
		putq(q, bp);
		if (dkpp->WNX < dkpp->WS+dkpp->XW)
			qenable(q);
		return;
	}
}

/*
 * Out server:
 *  if space in window, process queue
 */
dkposrv(q)
register struct queue *q;
{
	register struct dkp *dkpp = (struct dkp *)q->ptr;
	register struct block *bp, *xbp;
	int c;

	if (dkpp->state & (STOPPED|OPENING))
		return;
	while (dkpp->WNX < dkpp->WS+dkpp->XW) {
		if ((bp = getq(q)) == NULL)
			break;
		if (dkpp->state & XCHARMODE) {
			switch (bp->type) {

			case M_DELIM:
				freeb(bp);
				continue;

			default:
				dkpp->outcnt += bp->wptr - bp->rptr;
				(*q->next->qinfo->putp)(q->next, bp);
				if (dkpp->outcnt >= 64) {
					putctl1(q->next, M_CTL,
					   SEQ+(dkpp->WNX&07));
					dkpp->WNX++;
					dkpp->WACK = dkpp->WNX;
					dkpp->outcnt = 0;
				}
				continue;
			}
		}
		/*
		 * look ahead for delimiters.
		 * Don't transmit a single data block,
		 * to avoid 0-len block next time
		 */
		if (bp->type==M_DATA && (xbp = q->first) && xbp->type==M_DELIM){
			xbp = getq(q);
			if (xbp)
				freeb(xbp);	/* toss delimiter */
			bp->type = M_DELIM;
		}
		if (bp->type==M_DATA && q->first==NULL) {
			putbq(q, bp);
			return;
		}
		TRC('x'); TRC('0'+dkpp->WS/10); TRC('0'+dkpp->WS%10);
		TRC('.'); TRC('0'+dkpp->WNX/10); TRC('0'+dkpp->WNX%10);
		if (dkpp->xb[dkpp->WNX&07]) {
			freeb(dkpp->xb[dkpp->WNX&07]);
			printf("dkp losing block");
		}
		dkpp->xb[dkpp->WNX & 07] = bp;
		dkpxmit(q, bp, dkpp->WNX);
		dkpp->WNX++;
	}
}

/*
 *  Send out a message, with trailer.
 */
dkpxmit(q, bp, seqno)
struct queue *q;
register struct block *bp;
{
	register struct dkp *dkpp = (struct dkp *)q->ptr;
	register type;
	register size;
	register struct block *xbp;

	if (bp==NULL) {
		printf("null bp in dkpxmit\n");
		return;
	}
	type = bp->type;
	size = bp->wptr - bp->rptr;
	seqno &= 07;
	/* send ptr to block, if non-empty */
	if (size) {
		if ((xbp = allocb(0)) == NULL)
			return;
		TRC('X'); TRC('0'+seqno);
		xbp->rptr = bp->rptr;
		xbp->wptr = bp->wptr;
		if (type!=M_DELIM)
			xbp->type = type;
		(*q->next->qinfo->putp)(q->next, xbp);
	}
	/* send trailer */
	if ((xbp = allocb(3)) == NULL)
		return;
	xbp->type = M_CTL;
	*xbp->wptr++ = type==M_DATA? BOTM: BOT;
	*xbp->wptr++ = size;
	*xbp->wptr++ = size >> 8;
	(*q->next->qinfo->putp)(q->next, xbp);
	putctl1(q->next, M_CTL, SEQ + seqno);
	dkpp->timer = DKPTIME;
}

/*
 * Receive an ack of some sort for a transmitted message.
 *  Advance various windows.
 */
dkprack(dkpp, msg)
register struct dkp *dkpp;
{
	register struct block **bpp;
	register seqno, i;

	seqno = msg & 07;
	msg &= 0370;
	/* invariants: 0 <= WS <= WACK <= WNX; seqno maximal < WNX; WS < 8 */
	if (seqno >= dkpp->WNX)
		seqno -= 8;
	else if (seqno+8 < dkpp->WNX)
		seqno += 8;
	dkpp->state &= ~RJING;
	for (i=dkpp->WS; i<=seqno; i++) {
		bpp = &dkpp->xb[i&07];
		if (*bpp) {
			freeb(*bpp);
			*bpp = NULL;
		}
	}
	if ((int)dkpp->WACK <= seqno)
		dkpp->WACK = seqno+1;
	if (msg==ECHO) {
		TRC('E'); TRC('0'+(seqno&07));
		if (dkpp->WS <= seqno) {
			dkpp->timer = DKPTIME;	/* push off timeout */
			dkpp->WS = seqno+1;
			if (dkpp->WNX<dkpp->WS+dkpp->XW && WR(dkpp->rdq)->count)
				qenable(WR(dkpp->rdq));
		}
	} else {
		for (i=dkpp->WACK; i<dkpp->WNX; i++) {
			if (dkpp->xb[i&07]==0)
			 printf("WS %d WACK %d WNX %d i %d seqno %d\n",
			   dkpp->WS, dkpp->WACK, dkpp->WNX, i, seqno);
			dkpxmit(WR(dkpp->rdq), dkpp->xb[i&07], i);
			dkstat.dkprxmit++;
		}
	}
	if (dkpp->WS >= 8) {
		dkpp->WS -= 8;
		dkpp->WACK -= 8;
		dkpp->WNX -= 8;
	}
}

dkptimer()
{
	register struct dkp *dkpp;
	register struct queue *q;

	for (dkpp = dkp; dkpp < &dkp[NDKP]; dkpp++) {
		if ((dkpp->state&(OPEN|OPENING)) == 0)
			continue;
		if (--dkpp->timer>0)
			continue;
		q = WR(dkpp->rdq)->next;
		if (q->flag&QFULL)
			continue;
		if (dkpp->state & XCHARMODE) {
			if (dkpp->WS < dkpp->WNX)
				putctl1(q, M_CTL, SEQ+((dkpp->WNX-1)&07));
			dkpp->timer = 10;
			continue;
		}
		if (dkpp->state&OPENING)
			putctl1(q, M_CTL, INIT1);
		if (dkpp->WS != dkpp->WNX)
			putctl1(q, M_CTL, ENQ);
		dkpp->timer = DKPTIME;
	}
	timeout(dkptimer, (caddr_t)NULL, 60);
}

/*
 * throw away data in front of the barrier, and clear the trailer buffer
 */
dkpinflush(dkpp)
register struct dkp *dkpp;
{
	register struct block *bp;

	while (bp = dkpp->inp) {
		dkpp->inp = bp->next;
		freeb(bp);
	}
	dkpp->inpe = NULL;
	dkpp->trx = 0;
	dkpp->indata = 0;
}
#endif