next up previous contents
Next: 7.6 Packet Sublayer Up: 7 Source Code Presentation Previous: 7.4 User Space Implementation:

7.5 Parcel Sublayer

 

7.5.1 xudp_queueparcel function

int xudp_queueparcel(struct XUDPCB *cb, u_char type);

 1 int xudp_queueparcel(struct XUDPCB *cb, u_char type)
 2 {
 3     /** Reads one parcel from the local client and queues it for */
 4     /** transmission.  The parcel's byte count and time to obsolescence */
 5     /** are read from the control descriptor, its data from the fdVideo */
 6     /** descriptor; "type" is stored in the parcel's options field. */
 7     /** Returns 0 once the parcel is on cb->pOutHead, or -1 if the data */
 8     /** buffer could not be allocated. */
 9 
10     u_short              wDataWaiting;
11     char                 *buffer;
12     struct XUDPParcel    *pParcel;
13     Timestamp            tto;
14 
15     /** Read in the number of bytes in the parcel waiting on the data */
16     /** socket */
17 
18     streamRead(cb->localclient->fdControl, (char *)&wDataWaiting,
19                sizeof(u_short));
20 
21     /** Read in the time to obsolescence of the parcel */
22 
23     streamRead(cb->localclient->fdControl, (char *) &tto,
24                sizeof(Timestamp));
25 
26     /** malloc(0) may legitimately return NULL, so a NULL result only */
27     /** counts as a failure when there is data to hold.  Bail out after */
28     /** reporting it rather than reading into a NULL buffer (in case */
29     /** xudpError returns). */
30 
31     buffer=malloc(wDataWaiting);
32     if ((buffer==NULL) && (wDataWaiting>0))
33     {
34         xudpError("xudp_queueparcel: Allocating RAM");
35         return -1;
36     }
37 
38     /** Read in the parcel's data */
39 
40     streamRead(cb->localclient->fdVideo, buffer, wDataWaiting);
41 
42     /** Set it up and queue it on the queue of parcels waiting to be */
43     /** fragmented and sent.  "pos" starts at the head of the buffer. */
44 
45     pParcel=makeParcel();
46     pParcel->length=wDataWaiting;
47     pParcel->options=type;
48     pParcel->tto=tto;
49     pParcel->buffer=buffer;
50     pParcel->pos=buffer;
51     addParcel(cb->pOutHead, pParcel);
52 
53     return 0;
54 }

7.5.2 xudp_dequeueparcel function

int xudp_dequeueparcel(struct XUDPCB *cb);

 1 int xudp_dequeueparcel(struct XUDPCB *cb)
 2 {
 3     struct XUDPParcel *pParcel, *pParcelTmp;
 4     int err;
 5 
 6     /** Delivers every assembled parcel on cb->pInHead to the local */
 7     /** client: the data is written to fdVideo, then F_RECVVIDEO and */
 8     /** the parcel length are passed to xudpSendFunction.  Returns 0 */
 9     /** when the list has been drained, -1 as soon as a write fails or */
10     /** comes up short (that parcel and any after it stay queued). */
11 
12     pParcel=cb->pInHead->next;
13     while (pParcel!=NULL)
14     {
15         /** Remember the successor now; the parcel is deleted below */
16 
17         pParcelTmp=pParcel->next;
18 
19         /** Hits the earliest parcel first! */
20         /** Write the parcel's data to the appropriate socket stream */
21 
22         err=write(cb->localclient->fdVideo, pParcel->buffer,
23                   pParcel->length);
24 
25         /** If the data didn't go out in full, return immediately and */
26         /** do NOT announce it: the client would otherwise be told */
27         /** about bytes that never arrived, and told again when the */
28         /** parcel is retried on a later call. */
29 
30         if (err!=pParcel->length)
31             return -1;
32 
33         /** Send the appropriate command and size over the control */
34         /** socket connection */
35 
36         xudpSendFunction(cb->localclient, F_RECVVIDEO,
37                          (char *)&pParcel->length);
38 
39         /** The parcel was delivered, so remove it from the queue */
40 
41         deleteParcel(pParcel);
42         pParcel=pParcelTmp;
43     }
44     return 0;
45 }

7.5.3 xudp_makecommandparcel function

int xudp_makecommandparcel(struct XUDPCB *cb);

 1 int xudp_makecommandparcel(struct XUDPCB *cb)
 2 {
 3     struct XUDPParcel *pCmdParcel=makeParcel();
 4 
 5     /** A command parcel carries no data of its own: no buffer, zero */
 6     /** length, marked COMMAND with a RELIABLE time to obsolescence */
 7 
 8     pCmdParcel->buffer=NULL;
 9     pCmdParcel->pos=NULL;
10     pCmdParcel->length=0;
11     pCmdParcel->tto=RELIABLE;
12     pCmdParcel->options=COMMAND;
13 
14     /** Append it to the list of parcels waiting to be fragmented and */
15     /** raise the push flag on the control block */
16 
17     addParcel(cb->pOutHead, pCmdParcel);
18     cb->push=TRUE;
19 
20     return 0;
21 }

7.5.4 xudp_fragmentparcel function

int xudp_fragmentparcel(struct XUDPCB *cb);

 1 int xudp_fragmentparcel(struct XUDPCB *cb)
 2 {
 3     /** Cuts the parcels queued on cb->pOutHead into packets and appends */
 4     /** them to cb->pXmitQueue.  Each packet is filled with commands */
 5     /** from cb->pCmdQueue first, then with as much parcel data as still */
 6     /** fits in cb->mps.  Packets are only created while packetsInPipe */
 7     /** is below min(winRemote, winCongestion) or cb->push is set; push */
 8     /** is cleared after the first packet.  Always returns 0. */
 9 
10     static Sequence      sequence=0;
11     struct XUDPParcel *pParcel, *pParcelTmp;
12     struct XUDPPacket    *pHead, *pPacket;
13     struct XUDPCmd       *pCmd, *pCmdTmp;
14     u_char               *pPayload;
15     u_short              cmds=0;
16     u_short              temp;
17     u_short              length=0;
18     u_short              datalen=0;
19     
20     pHead=cb->pXmitQueue;
21     pPacket=NULL;
22 
23     pParcel=cb->pOutHead->next;
24     pCmd=cb->pCmdQueue->next;
25 
26     /** We're only going to fragment parcels to create packets while */
27     /** there is room available on the network */
28     /***  PARCEL LEVEL: Traverses the list of parcels waiting to be */
29     /***                fragmented */
30 
31     while ((pParcel!=NULL) &&
32            (cb->packetsInPipe < (min(cb->winRemote, cb->winCongestion))
33             || cb->push))
34     { 
35         pParcelTmp=pParcel->next;
36 
37         /** Test again to make sure we still have room on the net. */
38         /** This may create a packet with data from this parcel, with */
39         /** data from commands that are queued up to be sent (even a */
40         /** zero-length payload with only commands is okay). */
41         /*** PACKET LEVEL: Creates a new packet with either commands, */
42         /***               payload or both */
43 
44         while (((pParcel->length) || (pCmd!=NULL)) &&
45                ((cb->packetsInPipe < min(cb->winRemote, cb->winCongestion)) ||
46                 cb->push))
47         {
48             cmds=0;
49             length=0;
50             datalen=0;
51 
52             pPacket=makePacket();
53             addPacket(pHead, pPacket);  /* Tack this new packet onto the */
54                                         /* Xmit queue */ 
55 
56             pPayload=(pPacket->payload)+HEADERSIZE; /* Skip over the space */
57                                                   /* where the header goes */
58 
59             if (pParcel->buffer==pParcel->pos) sequence=0;
60                                /* if we're at the start of a parcel, reset */
61                                /* packet sequence */
62             
63             /* Go through the list of queued up commands, and add up all */
64             /* the commands and lengths of arguments */
65 
66             pCmdTmp=pCmd;
67             temp=0;
68             while (pCmdTmp!=NULL)
69             {
70                 temp+=1+pCmdTmp->length;
71                 pCmdTmp=pCmdTmp->next;
72             }
73             /** "temp" now holds the total length of all of the */
74             /** commands in the command queue */
75 
76             /* Sees if this is going to be the last packet in the parcel, */
77             /* if so, queues up a EOP command for it */
78             /** Commands actually take precedence to data here, so all */
79             /** commands that are queued up when xudp_fragmentparcel */
80             /** is called get sent before any data */
81 
82             if ((temp+HEADERSIZE+pParcel->length+1)<=cb->mps)
83             {
84                 pCmdTmp=makeCmd();
85                 pCmdTmp->cmd=C_EOP;
86                 pCmdTmp->length=0;
87                 addCmd(cb->pCmdQueue, pCmdTmp);
88                 pCmd=cb->pCmdQueue->next;
89             }
90 
91             /** COMMAND LEVEL: Traverse list of commands waiting to be */
92             /**                sent, preparing and de-queueing them */
93             /**    Add commands to packet until we've reached the end */
94             /**    of the list or we've reached the maximum allowable */
95             /**    commands in a packet.  If the packet is a COMMAND */
96             /**    packet, then there isn't a limit. */
97 
98             while ((pCmd!=NULL) && ((cmds<=MAXCMD) ||
99                                     (pParcel->options==COMMAND)))
100             {
101                 pCmdTmp=pCmd;
102                 
103                 /** Make sure we have space left in the packet!  A command */
104                 /** takes its command byte plus pCmd->length argument bytes */
105                 if ((length + 1 + pCmd->length) <= (cb->mps - HEADERSIZE))
106                 {
107                     *(pPayload++)=pCmd->cmd;
108                     if (pCmd->length>0)
109                         memcpy(pPayload, pCmd->argbuffer, pCmd->length); 
110                     pPayload+=pCmd->length;
111                     length+=(pCmd->length)+1;
112                     cmds++;
113                     pCmd=pCmd->next;
114                     deleteCmd(pCmdTmp); /* Delete old commands */
115                 }
116                 else pCmd=pCmd->next; /* Doesn't fit: leave it queued */
117             }
118                 
119             /** Finish setting up the packet headers and stuff */
120             /*** "temp" is now the minimum of the maximum remaining */
121             /*** payload capacity of this packet and the remaining */
122             /*** bytes in the current parcel */
123 
124             temp=min((cb->mps - HEADERSIZE - length), pParcel->length);
125             length+=temp;
126             datalen+=temp;
127 
128             /** Copy the payload out of the parcel into the packet */
129 
130             if (temp) memcpy(pPayload, pParcel->pos, temp);
131             
132             pPacket->header.sequence=sequence;
133             pPacket->header.parcel=pParcel->parcel;
134             pPacket->header.length=datalen; /* Header length field only */
135                                             /* represents the data */
136                                             /* payload (no commands) */
137             pPacket->header.window=cb->winAdvertised;
138             pPacket->header.timestamp=makeTimestamp();
139             pPacket->header.options=pParcel->options;
140             pPacket->header.tto=pParcel->tto;
141             pPacket->header.cmds=cmds;
142             pPacket->length=length+HEADERSIZE;  /* Packet length field */
143                                                 /* represents both */
144                                                 /* header and payload */
145 
146             /** Pop data off the top of the parcel's payload */
147 
148             pParcel->length-=datalen;
149             pParcel->pos+=datalen;
150 
151             if (++sequence==65535) sequence=0; /* Wrap sequence numbers */
152             
153             cb->packetsInPipe++;
154             cb->push=FALSE;
155 
156 #ifdef DEBUG2
157             show_status(cb);
158 #endif
159         }
160 
161         if (pParcel->length==0) deleteParcel(pParcel);
162 
163         pParcel=pParcelTmp;
164     }
165 
166     return 0;
167 }

7.5.5 xudp_reassembleparcel function

int xudp_reassembleparcel(struct XUDPCB *cb);

 1 int xudp_reassembleparcel(struct XUDPCB *cb)
 2 {
 3     struct XUDPFragParcel    *pFrag, *pFragNext;
 4     struct XUDPPacket        *pPkt, *pPktNext;
 5     struct XUDPParcel        *pRaw;
 6     char                     *dst;
 7 
 8     /** Walk the fragmented parcels received from the network.  The */
 9     /** successor is fetched first because the current entry is */
10     /** deleted before the end of the pass. */
11 
12     for (pFrag=cb->pRecvWaitQueue->next; pFrag!=NULL; pFrag=pFragNext)
13     {
14         pFragNext=pFrag->next;
15 
16         /** Parcels that haven't been acknowledged yet are left alone */
17 
18         if (!pFrag->ack)
19             continue;
20 
21         /** A head packet without data means a command parcel: there */
22         /** is nothing to reassemble for those */
23 
24         if (pFrag->headPacket->header.length>0)
25         {
26             /** Start a raw parcel that takes over the fragmented */
27             /** parcel's number, options and time to obsolescence */
28 
29             pRaw=makeParcel();
30             pRaw->parcel=pFrag->parcel;
31             pRaw->tto=pFrag->tto;
32             pRaw->options=pFrag->options;
33             pRaw->length=0;
34 
35             /** First pass: total the data lengths to size the buffer */
36 
37             for (pPkt=pFrag->headPacket; pPkt!=NULL; pPkt=pPkt->next)
38                 pRaw->length+=pPkt->header.length;
39 
40             pRaw->buffer=malloc(pRaw->length);
41             if (pRaw->buffer==NULL)
42                 xudpError("xudp_reassembleparcel: Allocating RAM");
43             dst=pRaw->buffer;
44             pRaw->pos=dst;
45 
46             /** Second pass: append each packet's payload to the buffer */
47             /** and dispose of the packet */
48 
49             for (pPkt=pFrag->headPacket; pPkt!=NULL; pPkt=pPktNext)
50             {
51                 pPktNext=pPkt->next;
52                 memcpy(dst, pPkt->payload, pPkt->header.length);
53                 dst+=pPkt->header.length;
54                 deletePacket(pPkt);
55             }
56 
57             /** Put the raw parcel on the list of parcels waiting to be */
58             /** read by the application */
59 
60             addParcel(cb->pInHead, pRaw);
61         }
62 
63         /** Record that this parcel number has been used */
64         cb->usedparcels[pFrag->parcel]=TRUE;
65 
66         /** Wipe the fragmented version */
67         deleteFragParcel(pFrag);
68 
69 #ifdef DEBUG2
70         show_status(cb);
71 #endif
72     }
73 
74     return 0;
75 }

7.5.6 xudp_recvackparcels function

int xudp_recvackparcels(struct XUDPCB *cb);

 1 int xudp_recvackparcels(struct XUDPCB *cb)
 2 {
 3     struct XUDPCmd        *pCmd, *pCmdNext;
 4     struct XUDPFragParcel *pFrag;
 5     struct XUDPPacket     *pPkt, *pPktNext;
 6     Sequence              parcel;
 7     Timestamp             now;
 8     Boolean               anyacks=FALSE;
 9 
10     /** Look through the incoming command queue for C_ACKPARCEL */
11     /** commands; any other command is left where it is */
12 
13     for (pCmd=cb->pCmdInQueue->next; pCmd!=NULL; pCmd=pCmdNext)
14     {
15         pCmdNext=pCmd->next;
16         if (pCmd->cmd!=C_ACKPARCEL)
17             continue;
18 
19         /** The argument is the acknowledged parcel number; convert it */
20         /** from network to host byte order */
21 
22         memcpy(&parcel, pCmd->argbuffer, sizeof(Sequence));
23         parcel=ntohs(parcel);
24 
25         /** Find the first parcel with that number on the ACK wait */
26         /** queue; at most one parcel is released per command */
27 
28         for (pFrag=cb->pAckWaitQueue->next; pFrag!=NULL; pFrag=pFrag->next)
29         {
30             if (pFrag->parcel!=parcel)
31                 continue;
32 
33             /** Release its packets.  Every packet not yet marked as */
34             /** acknowledged leaves the pipe now and feeds the byte */
35             /** count and round trip time figures */
36 
37             for (pPkt=pFrag->headPacket; pPkt!=NULL; pPkt=pPktNext)
38             {
39                 pPktNext=pPkt->next;
40                 if (!pPkt->ack)
41                 {
42                     anyacks=TRUE;
43                     if (cb->packetsInPipe>0)
44                         cb->packetsInPipe--;
45                     now=makeTimestamp();
46                     cb->timeElapsed=now;
47                     cb->bytesACKed+=pPkt->length;
48                     cb->rtt=now-pPkt->header.timestamp;
49                     xudp_rttmath(cb);
50                 }
51                 deletePacket(pPkt);
52             }
53             deleteFragParcel(pFrag);
54             break;
55         }
56 
57         /** The command has been handled, whether or not a parcel */
58         /** matched it */
59 
60         deleteCmd(pCmd);
61 #ifdef DEBUG2
62         show_status(cb);
63 #endif
64     }
65 
66     /** Only recompute the window if something new was acknowledged */
67 
68     if (anyacks)
69         xudp_windowchange(cb);
70 
71     return 0;
72 }

7.5.7 xudp_ackparcels function

int xudp_ackparcels(struct XUDPCB *cb);

 1 int xudp_ackparcels(struct XUDPCB *cb)
 2 {
 3     struct XUDPFragParcel *pFrag;
 4     struct XUDPPacket     *pPkt;
 5     struct XUDPCmd        *pAck;
 6     Sequence              expected;
 7     Sequence              netParcel;
 8 
 9     /** Check each received fragmented parcel for completeness */
10 
11     for (pFrag=cb->pRecvWaitQueue->next; pFrag!=NULL; pFrag=pFrag->next)
12     {
13         /** Skip parcels whose End-Of-Parcel marker hasn't arrived (end */
14         /** marker not below 65535) and parcels acknowledged earlier */
15 
16         if ((pFrag->end>=65535) || pFrag->ack)
17             continue;
18 
19         /** Step through the packets for as long as their sequence */
20         /** numbers count up from zero without a hole, stopping once */
21         /** the parcel's end marker is reached */
22 
23         expected=0;
24         pPkt=pFrag->headPacket;
25         while ((pPkt!=NULL) && (expected!=pFrag->end) &&
26                (pPkt->header.sequence==expected))
27         {
28             expected++;
29             pPkt=pPkt->next;
30         }
31 
32         /** We have the entire parcel only if we stopped on a packet */
33         /** that carries the end sequence number */
34 
35         if ((pPkt!=NULL) && (expected==pFrag->end) &&
36             (pPkt->header.sequence==expected))
37         {
38             /** Acknowledge as either a command only parcel or a */
39             /** regular parcel, depending on whether the end packet */
40             /** carries data */
41 
42             pAck=makeCmd();
43             if (pPkt->header.length==0)
44                 pAck->cmd=C_ACKCMDPCL;
45             else
46                 pAck->cmd=C_ACKPARCEL;
47 
48             /** The argument is the parcel number in network byte */
49             /** order.  Queue the command and mark the parcel as */
50             /** acknowledged. */
51 
52             pAck->length=sizeof(Sequence);
53             netParcel=pFrag->parcel;
54             netParcel=htons(netParcel);
55             memcpy(pAck->argbuffer, &netParcel, sizeof(Sequence));
56             addCmd(cb->pCmdQueue, pAck);
57             pFrag->ack=TRUE;
58 
59             /** Lower the remote packetsInPipe count once for every */
60             /** packet of this parcel that wasn't individually ACKed, */
61             /** never going below zero */
62 
63             for (pPkt=pFrag->headPacket; pPkt!=NULL; pPkt=pPkt->next)
64                 if (!(pPkt->ack) && (cb->packetsInPipeRmt>0))
65                     cb->packetsInPipeRmt--;
66         }
67 #ifdef DEBUG2
68         show_status(cb);
69 #endif
70     }
71 
72     return 0;
73 }

7.5.8 xudp_endparcel function

int xudp_endparcel(struct XUDPCB *cb);

 1 int xudp_endparcel(struct XUDPCB *cb)
 2 {
 3     struct XUDPCmd        *pCmd, *pCmdNext;
 4     struct XUDPFragParcel *pFrag;
 5     Sequence              parcel;
 6 
 7     /*** Handle every End Of Parcel command on the incoming command */
 8     /*** queue: find the parcel it names on the Receive Wait Queue and */
 9     /*** record the sequence number of that parcel's ending packet. */
10 
11     for (pCmd=cb->pCmdInQueue->next; pCmd!=NULL; pCmd=pCmdNext)
12     {
13         /** Fetch the successor first, the command may be deleted */
14 
15         pCmdNext=pCmd->next;
16 
17         if (pCmd->cmd!=C_EOP)
18             continue;
19 
20         /** The first argument is the parcel number */
21 
22         memcpy(&parcel, pCmd->argbuffer, sizeof(Sequence));
23 
24         /** Search the parcels that are in the process of receiving */
25         /** packets; pFrag ends up NULL if none of them matches */
26 
27         pFrag=cb->pRecvWaitQueue->next;
28         while ((pFrag!=NULL) && (pFrag->parcel!=parcel))
29             pFrag=pFrag->next;
30 
31         /** The second argument is the end packet sequence number */
32 
33         if (pFrag!=NULL)
34             memcpy(&pFrag->end,
35                    pCmd->argbuffer+sizeof(Sequence),
36                    sizeof(Sequence));
37 
38         /** De-queue the command: it was either used or it refers to */
39         /** a parcel that we don't have any data for */
40 
41         deleteCmd(pCmd);
42     }
43     return 0;
44 }

7.5.9 xudp_removeobsolete function

int xudp_removeobsolete(struct XUDPCB *cb);

 1 int xudp_removeobsolete(struct XUDPCB *cb)
 2 {
 3     struct XUDPParcel     *pParcel, *pParcelTmp;
 4     struct XUDPFragParcel *pFragParcel, *pFragParcelTmp;
 5     struct XUDPPacket     *pPacket;
 6     Timestamp             ts;
 7     int                   count;
 8 
 9     /** Deletes every parcel whose time to obsolescence has run out */
10     /** from the outgoing raw parcel queue, the ACK wait queue and the */
11     /** receive wait queue.  A parcel is obsolete once the time since */
12     /** its timestamp, plus half the average RTT, reaches its tto. */
13     /** Always returns 0. */
14 
15     /** Get the current time */
16     ts=makeTimestamp();
17     
18     /** Add 1/2 the recent average RTT to it (the delivery time) */
19     ts+=(int)(0.5*cb->rttavg);
20 
21     /** Traverse through the outgoing raw parcel queue */
22     
23     pParcel=cb->pOutHead->next;
24     while (pParcel!=NULL)
25     {
26         pParcelTmp=pParcel->next;
27 
28         /** Test if we've exceeded the "sell by" time on this parcel, if */
29         /** we have, delete it. */
30 
31         if (pParcel->tto<=(ts-pParcel->timestamp))
32             deleteParcel(pParcel);
33 
34         pParcel=pParcelTmp;
35     }
36 
37     /** Traverse the Outgoing ACK wait queue */
38 
39     pFragParcel=cb->pAckWaitQueue->next;
40     while (pFragParcel!=NULL)
41     {
42         pFragParcelTmp=pFragParcel->next;
43 
44         /** Test to see if we've exceeded the "sell by" time on this */
45         /** parcel */
46 
47         if (pFragParcel->tto<=(ts-pFragParcel->timestamp))
48         {
49             /** Traverse the list of packets that form the fragmented */
50             /** parcel */
51 
52             count=0;
53             pPacket=pFragParcel->headPacket;
54             while (pPacket!=NULL)
55             {
56                 /** Keep the accounting straight by incrementing a */
57                 /** counter variable every time we find a packet that */
58                 /** hasn't been acknowledged.  (i.e. we still think */
59                 /** it's in the network) */
60 
61                 if (!(pPacket->ack)) count++;
62 
63                 pPacket=pPacket->next;
64             }
65             
66             /** Delete the parcel (counted first, while its packet */
67             /** list can still be walked) */
68 
69             deleteFragParcel(pFragParcel);
70 
71             /** Realize that these packets can't be in the network */
72             /** anymore, and that they have lived out their */
73             /** usefulness.  Remove them from the accounting stuff, */
74             /** but never take packetsInPipe below zero: the ACK path */
75             /** guards its decrement the same way, and a wrapped or */
76             /** negative count would upset the window test used when */
77             /** fragmenting. */
78 
79             if (cb->packetsInPipe>=count)
80                 cb->packetsInPipe-=count;
81             else
82                 cb->packetsInPipe=0;
83         }
84         pFragParcel=pFragParcelTmp;
85     }
86 
87     /** Traverse the incoming queue of received fragmented parcels */
88 
89     pFragParcel=cb->pRecvWaitQueue->next;
90     while (pFragParcel!=NULL)
91     {
92         pFragParcelTmp=pFragParcel->next;
93 
94         /** Test to see if we've exceeded the "sell by" time on this */
95         /** parcel, if we have, delete it now. */
96         
97         if (pFragParcel->tto<=(ts-pFragParcel->timestamp))
98             deleteFragParcel(pFragParcel);
99         
100         pFragParcel=pFragParcelTmp;
101     }
102 
103     return 0;
104 }



Mike Andrews
Wed Mar 19 16:07:58 EST 1997