I/O Layering Sample
The following example demonstrates the use of layered I/O protocols. It
also depicts appropriate use of command line processing and thread managment.
The source in this chapter is heavily annotated and for purposes of presentation, the source may be presented out of sequence. The sequence number are accurate and may be correlated to the actual code.
Preliminary requirements
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* 3 * The contents of this file are subject to the Netscape Public License 4 * Version 1.0 (the "NPL"); you may not use this file except in 5 * compliance with the NPL. You may obtain a copy of the NPL at 6 * http://www.mozilla.org/NPL/ 7 * 8 * Software distributed under the NPL is distributed on an "AS IS" basis, 9 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the NPL 10 * for the specific language governing rights and limitations under the 11 * NPL. 12 * 13 * The Initial Developer of this code under the NPL is Netscape 14 * Communications Corporation. Portions created by Netscape are 15 * Copyright (C) 1998 Netscape Communications Corporation. All Rights 16 * Reserved. 17 */ 18 19 #include <prio.h> 20 #include <prprf.h> 21 #include <prlog.h> 22 #include <prnetdb.h> 23 #include <prthread.h> 24 25 #include <plerror.h> 26 #include <plgetopt.h> 27 #include <prwin16.h> 28 29 #include <stdlib.h> 30 31 /* 32 ** Testing layering of I/O 33 ** 34 ** The layered server 35 ** A thread that acts as a server. It creates a TCP listener with a dummy 36 ** layer pushed on top. Then listens for incoming connections. Each connection 37 ** request for connection will be layered as well, accept one request, echo 38 ** it back and close. 39 ** 40 ** The layered client 41 ** Pretty much what you'd expect. 42 */ 43 44 static PRFileDesc *logFile; 45 static PRDescIdentity identity; 46 static PRNetAddr server_address; 47 48 static PRIOMethods myMethods; 49 50 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity; 51 52 static PRIntn minor_iterations = 5; 53 static PRIntn major_iterations = 1; 54 static Verbosity verbosity = quiet; 55 static PRUint16 default_port = 12273;
Utility function
The following function is a workaround for a feature of C++' treatment of enumerateds. It is not something one would normally hold up for review in front of a collection of language lawyers. But it is used by main() and therefore included here.223 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) 224 { 225 PRIntn verbage = (PRIntn)verbosity + delta; 226 if (verbage < (PRIntn)silent) verbage = (PRIntn)silent; 227 else if (verbage > (PRIntn)noisy) verbage = (PRIntn)noisy; 228 return (Verbosity)verbage; 229 } /* ChangeVerbosity */ 230
Application's main
The main function's duties are to process the command line options create the layered protocol stacks and the threads that are to operate on those stacks. The goal of the test is to verify that the clients of the layered protocol stacks can remain unaware of the workings of the layers making up the stack.231 PRIntn main(PRIntn argc, char **argv) 232 { 233 PRStatus rv; 234 PRIntn mits; 235 PLOptStatus os; 236 PRFileDesc *client, *service; 237 const char *server_name = NULL; 238 const PRIOMethods *stubMethods; 239 PRThread *client_thread, *server_thread; 240 PRThreadScope thread_scope = PR_LOCAL_THREAD; 241 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGC:c:p:"); 242 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) 243 { 244 if (PL_OPT_BAD == os) continue; 245 switch (opt->option) 246 { 247 case 0: 248 server_name = opt->value; 249 break; 250 case 'd': /* debug mode */ 251 if (verbosity < noisy) 252 verbosity = ChangeVerbosity(verbosity, 1); 253 break; 254 case 'q': /* debug mode */ 255 if (verbosity > silent) 256 verbosity = ChangeVerbosity(verbosity, -1); 257 break; 258 case 'G': /* use global threads */ 259 thread_scope = PR_GLOBAL_THREAD; 260 break; 261 case 'C': /* number of threads waiting */ 262 major_iterations = atoi(opt->value); 263 break; 264 case 'c': /* number of client threads */ 265 minor_iterations = atoi(opt->value); 266 break; 267 case 'p': /* default port */ 268 default_port = atoi(opt->value); 269 break; 270 default: 271 break; 272 } 273 } 274 PL_DestroyOptState(opt); 275 PR_STDIO_INIT(); 276 277 logFile = PR_GetSpecialFD(PR_StandardError); 278 279 identity = PR_GetUniqueIdentity("Dummy"); 280 stubMethods = PR_GetDefaultIOMethods(); 281 282 /* 283 ** The protocol we're going to implement is one where in order to initiate 284 ** a send, the sender must first solicit permission. Therefore, every 285 ** send is really a send - receive - send sequence. 286 */ 287 myMethods = *stubMethods; /* first get the entire batch */ 288 myMethods.recv = MyRecv; /* then override the ones we care about */ 289 myMethods.send = MySend; /* then override the ones we care about */ 290 291 if (NULL == server_name) 292 rv = PR_InitializeNetAddr( 293 PR_IpAddrLoopback, default_port, &server_address); 294 else 295 { 296 rv = PR_StringToNetAddr(server_name, &server_address); 297 PR_ASSERT(PR_SUCCESS == rv); 298 rv = PR_InitializeNetAddr( 299 PR_IpAddrNull, default_port, &server_address); 300 } 301 PR_ASSERT(PR_SUCCESS == rv); 302 303 /* one type w/o layering */ 304 305 mits = minor_iterations; 306 while (major_iterations-- > 0) 307 { 308 if (verbosity > silent) 309 PR_fprintf(logFile, "Beginning non-layered test\n"); 310 client = PR_NewTCPSocket(); PR_ASSERT(NULL != client); 311 service = PR_NewTCPSocket(); PR_ASSERT(NULL != service); 312 313 minor_iterations = mits; 314 server_thread = PR_CreateThread( 315 PR_USER_THREAD, Server, service, 316 PR_PRIORITY_HIGH, thread_scope, 317 PR_JOINABLE_THREAD, 16 * 1024); 318 PR_ASSERT(NULL != server_thread); 319 320 client_thread = PR_CreateThread( 321 PR_USER_THREAD, Client, client, 322 PR_PRIORITY_NORMAL, thread_scope, 323 PR_JOINABLE_THREAD, 16 * 1024); 324 PR_ASSERT(NULL != client_thread); 325 326 rv = PR_JoinThread(client_thread); 327 PR_ASSERT(PR_SUCCESS == rv); 328 rv = PR_JoinThread(server_thread); 329 PR_ASSERT(PR_SUCCESS == rv); 330 331 rv = PR_Close(client); PR_ASSERT(PR_SUCCESS == rv); 332 rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv); 333 if (verbosity > silent) 334 PR_fprintf(logFile, "Ending non-layered test\n"); 335 336 /* with layering */ 337 if (verbosity > silent) 338 PR_fprintf(logFile, "Beginning layered test\n"); 339 client = PR_NewTCPSocket(); PR_ASSERT(NULL != client); 340 service = PR_NewTCPSocket(); PR_ASSERT(NULL != service); 341 342 minor_iterations = mits; 343 server_thread = PR_CreateThread( 344 PR_USER_THREAD, Server, PushLayer(service), 345 PR_PRIORITY_HIGH, thread_scope, 346 PR_JOINABLE_THREAD, 16 * 1024); 347 PR_ASSERT(NULL != server_thread); 348 349 client_thread = PR_CreateThread( 350 PR_USER_THREAD, Client, PushLayer(client), 351 PR_PRIORITY_NORMAL, thread_scope, 352 PR_JOINABLE_THREAD, 16 * 1024); 353 PR_ASSERT(NULL != client_thread); 354 355 rv = PR_JoinThread(client_thread); 356 PR_ASSERT(PR_SUCCESS == rv); 357 rv = PR_JoinThread(server_thread); 358 PR_ASSERT(PR_SUCCESS == rv); 359 360 rv = PR_Close(client); PR_ASSERT(PR_SUCCESS == rv); 361 rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv); 362 if (verbosity > silent) 363 PR_fprintf(logFile, "Ending layered test\n"); 364 } 365 return 0; 366 } /* main */
Lines 241 through 274 deal with the processing of the command line arguments. The facility used is one available (plgetopt.h) from one of the NSPR libraries.
The PR_STDIO_INIT macro in line 275 handles some of the differences between systems that define a convenient console I/O facility and systems that don't. (PR_STDIO_Init and other WIN-16-specific macros are defined in prwin16.h.) The macro must be invoked before any attempt to use the special I/O functions, including getting the file descriptor to access the console I/O. Special I/O functions are those that locate "special" file descriptors. At the present time, those are the Unix equivalents of stdin, stdout, and stderr. They are all console I/O devices, unique to Unix and therefore special.
It is strongly recommended that clients do not assume the makeup of network addresses. Even though Internet Protocol version 4 (IPv4) is the most popular protocol at this time, IPv6 is looming on the horizon. Using the API provided by NSPR rather than accessing the address fields directly isolates your code from the transition. Lines 291 through 300 illustrate the use of this API. For more information, see Network Address Types and Initializing a Network Address.
Lines 310 and 311 illustrate the use of PR_NewTCPSocket to create new network file descriptors. These sockets are bound to the IP/TCP protocol family, but not to a specific address. To create a UDP file descriptor, use PR_NewUDPSocket.
Lines 314 through 324 illustrate the use of PR_CreateThread to create a new thread that runs parallel to its parent. NSPR threads have no hierarchy. Once created, they have no defined relationship to their creator. Any thread can create another thread, and the life expectancy of the two threads is afterwards unrelated. This is true of all threads except the primordial thread (the thread that calls main). Should the primordial thread exit its root function--that is, main--all threads in the process will be abruptly terminated. Facilities are available to synchronize the termination of the other threads in a process (see Thread Synchronization Sample).
The newly created thread begins execution at the entry to the function noted in the start parameter of PR_CreateThread--in this example, Server and Client. NSPR does not define when that root function is actually entered relative to PR_CreateThread returning.
Lines 326 through 329 illustrate the use of PR_JoinThread to synchronize the termination of the calling thread with the termination of some other thread. The caller of PR_JoinThread is blocked until the target of the join operation terminates (actually, until it returns from the root function). Once PR_JoinThread returns, all references to the joined thread (the argument of PR_JoinThread) are no longer valid.
Lines 331 and 332 call PR_Close to close the sockets opened earlier with PR_NewTCPSocket. This operation ensures that all resources associated with the file descriptor are released. Once the call to PR_Close returns, the file descriptor is no longer valid.
Lines 308 though 334 run the newly created TCP connections in an unlayered mode. Lines 343 through 361 simple repeat the same test, but with layered protocol stacks.
PushLayer function
The PushLayer function simply takes the exiting protocol stack, creates a new layer, modifies the new layer appropriately, and pushes it on top of the existing stack. The result that is returned is the top of the stack, which coincides to the value of stack passed in.
57 static PRFileDesc *PushLayer(PRFileDesc *stack) 58 { 59 PRFileDesc *layer = PR_CreateIOLayerStub(identity, &myMethods); 60 PRStatus rv = PR_PushIOLayer(stack, PR_GetLayersIdentity(stack), layer); 61 if (verbosity > quiet) 62 PR_fprintf(logFile, "Pushed layer(0x%x) onto stack(0x%x)\n", layer, stack); 63 PR_ASSERT(PR_SUCCESS == rv); 64 return stack; 65 } /* PushLayer */
The modifications to the default layer object returned by PR_CreateIOLayerStub are to replace the methods table with one appropriate for this test. In this particular instance, the only methods replaced are the send() and recv() methods, all others having appropriate default behavior.
PopLayer function
The PopLayer function isn't actually used by this test program. It would be approprate if an application wished to modify the behvior of an existing protocol stack, then reinstate it to it's original condition. In the case of this program, the desire was to test PR_Close's ability to transparently deal with arbitrary layers.
68 static PRFileDesc *PopLayer(PRFileDesc *stack) 69 { 70 PRFileDesc *popped = PR_PopIOLayer(stack, identity); 71 if (verbosity > quiet) 72 PR_fprintf(logFile, "Popped layer(0x%x) from stack(0x%x)\n", popped, stack); 73 popped->dtor(popped); 74 75 return stack; 76 } /* PopLayer */
The argument passed to PopLayer is the address of the layer to be removed from the existing protocol stack. Once it is removed, the layer's destructor is called at line (A HREF="prlayer.html#Layer073">73. The value returned (unnecessarily from the function is the same that was passed in. PR_PopIOLayer() always preserves the stack's top address even if it is the layer popped.
Client callback function
The callback function Client is referenced from line 350 of main(). Client is the root function of the client thread. The argument passed the the thread is the address of the top layer of the protocol stack. Client is not aware of whether the stack is layered.
79 static void PR_CALLBACK Client(void *arg) 80 { 81 PRStatus rv; 82 PRUint8 buffer[100]; 83 PRIntn empty_flags = 0; 84 PRIntn bytes_read, bytes_sent; 85 PRFileDesc *stack = (PRFileDesc*)arg; 86 87 rv = PR_Connect(stack, &server_address, PR_INTERVAL_NO_TIMEOUT); 88 PR_ASSERT(PR_SUCCESS == rv); 89 while (minor_iterations-- > 0) 90 { 91 bytes_sent = PR_Send( 92 stack, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); 93 PR_ASSERT(sizeof(buffer) == bytes_sent); 94 if (verbosity > chatty) 95 PR_fprintf(logFile, "Client sending %d bytes\n", bytes_sent); 96 bytes_read = PR_Recv( 97 stack, buffer, bytes_sent, empty_flags, PR_INTERVAL_NO_TIMEOUT); 98 if (verbosity > chatty) 99 PR_fprintf(logFile, "Client receiving %d bytes\n", bytes_read); 100 PR_ASSERT(bytes_read == bytes_sent); 101 } 102 103 if (verbosity > quiet) 104 PR_fprintf(logFile, "Client shutting down stack\n"); 105 106 rv = PR_Shutdown(stack, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv); 107 } /* Client */The call to PR_Connect in line 87 binds the address the connection and establishes the virtual circuit to the peer. The use of PR_INTERVAL_NO_TIMEOUT as the second parameter to PR_Connect is risky. It indicates that the operation will either succeed or die trying.
Send operations like that shown at line 91 either completes (transmit all the data) or fails before returning. The use of PR_INTERVAL_NO_TIMEOUT here is also risky, though not as risky as in the call to PR_Connect. A transport may fail for a number of reasons. The timeout interval (had it been used) would apply to the interval allowed to send the entire buffer (sizeof(buffer) in this case).
A read operation like that shown at line 96 may actually return with fewer bytes than were asked for, thus requiring resubmission of the request. This is a reflection of the TCP and BSD approach to sockets. The call to PR_Recv is also using an infinite timeout. Since the PR_Recv function will return with one or more bytes, the timeout would be applied to the reception of the first byte of an arbitrary number of bytes.
Once the exchange has been accomplished, the transport is shut down with a call to PR_Shutdown, as shown in line 106. This operation effectively disables any further use of the file descriptor for anything other than closing it. The call to PR_Shutdown does not, however, eliminate the need to close the descriptor. The call to PR_Close is in main() (the creator the the TCP stream) and is required to reclaim the resources used by the runtime to support the connection.
Server callback function
The callback function Server is referenced from line 344 of main(). Like Client it is the root function of the server thread. Like the Client function, the argument passed to Server is the top layer of an arbitrary protocol stack. The server thread uses this stack to listen for incoming connections as a well known port number.
109 static void PR_CALLBACK Server(void *arg) 110 { 111 PRStatus rv; 112 PRUint8 buffer[100]; 113 PRFileDesc *service; 114 PRUintn empty_flags = 0; 115 PRIntn bytes_read, bytes_sent; 116 PRFileDesc *stack = (PRFileDesc*)arg; 117 PRNetAddr any_address, client_address; 118 119 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &any_address); 120 PR_ASSERT(PR_SUCCESS == rv); 121 122 rv = PR_Bind(stack, &any_address); PR_ASSERT(PR_SUCCESS == rv); 123 rv = PR_Listen(stack, 10); PR_ASSERT(PR_SUCCESS == rv); 124 125 service = PR_Accept(stack, &client_address, PR_INTERVAL_NO_TIMEOUT); 126 if (verbosity > quiet) 127 PR_fprintf(logFile, "Server accepting connection\n"); 128 129 do 130 { 131 bytes_read = PR_Recv( 132 service, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); 133 if (0 != bytes_read) 134 { 135 if (verbosity > chatty) 136 PR_fprintf(logFile, "Server receiving %d bytes\n", bytes_read); 137 PR_ASSERT(bytes_read > 0); 138 bytes_sent = PR_Send( 139 service, buffer, bytes_read, empty_flags, PR_INTERVAL_NO_TIMEOUT); 140 if (verbosity > chatty) 141 PR_fprintf(logFile, "Server sending %d bytes\n", bytes_sent); 142 PR_ASSERT(bytes_read == bytes_sent); 143 } 144 145 } while (0 != bytes_read); 146 147 if (verbosity > quiet) 148 PR_fprintf(logFile, "Server shutting down and closing stack\n"); 149 rv = PR_Shutdown(service, PR_SHUTDOWN_BOTH); PR_ASSERT(PR_SUCCESS == rv); 150 rv = PR_Close(service); PR_ASSERT(PR_SUCCESS == rv); 151 152 } /* Server */
At line 112 is the allocation of a 100 byte buffer, an automatic variable. According the memo describing the use of such variables, this will fail miserably. It is insanely stupid. Don't ever do this if you want if you want your code to be portable to WIN-16.
Line 119 correctly uses PR_InitializeNetAddr() to initialize a network address suitable for listening for incoming connection requests and a specified port number. That address is then bound to the transport using PR_Bind() and then conditioned for listening using PR_Listen(). At that point the server is ready to accept the connection.
A new connection results from the successful completion of PR_Accept(). This connection is then used as the target of a PR_Recv(), followed by a PR_Send(). This pair of operations will continue until zero bytes are received, indicating an end of file. At that time the stream is shutown, and then since the server thread actually did create this file descriptor, the descriptor is closed as well.
MyRecv callback function
The MyRecv function is a replacement for the default I/O stub's recv() I/O method. Any time a receive operation is performed by a higher layer or the ultimate client of the protocol stack, this function will get invoked. When this function finishes with the duties defined by the layer, it must then call the next lower layer in the protocol stack.
154 static PRInt32 PR_CALLBACK MyRecv( 155 PRFileDesc *fd, void *buf, PRInt32 amount, 156 PRIntn flags, PRIntervalTime timeout) 157 { 158 char *b = (char*)buf; 159 PRFileDesc *lo = fd->lower; 160 PRInt32 rv, readin = 0, request; 161 rv = lo->methods->recv(lo, &request, sizeof(request), flags, timeout); 162 if (verbosity > chatty) PR_fprintf( 163 logFile, "MyRecv sending permission for %d bytes\n", request); 164 if (0 < rv) 165 { 166 if (verbosity > chatty) PR_fprintf( 167 logFile, "MyRecv received permission request for %d bytes\n", request); 168 rv = lo->methods->send( 169 lo, &request, sizeof(request), flags, timeout); 170 if (0 < rv) 171 { 172 if (verbosity > chatty) PR_fprintf( 173 logFile, "MyRecv sending permission for %d bytes\n", request); 174 while (readin < request) 175 { 176 rv = lo->methods->recv( 177 lo, b + readin, amount - readin, flags, timeout); 178 if (rv <= 0) break; 179 if (verbosity > chatty) PR_fprintf( 180 logFile, "MyRecv received %d bytes\n", rv); 181 readin += rv; 182 } 183 rv = readin; 184 } 185 } 186 return rv; 187 } /* MyRecv */
In this sample, the intent of the layer is to implement a protocol that requires the sender and receiver to share the knowledge about how many bytes are going to be transferred. The implication is that in order to do a receive, the layer must first do a receive of four bytes, those bytes being the number of bytes that the sender wishes to send. Furthermore, the reciever must transmit to the peer authorization. Then the method expects to receive the agreed amount of data. Thus, ever apparent read is really two reads, with an intervening write.
MySend callback function
The MySend function is the parallel of MyRecv. Before actually send the requested data, it must ask permission of the receiver.
189 static PRInt32 PR_CALLBACK MySend( 190 PRFileDesc *fd, const void *buf, PRInt32 amount, 191 PRIntn flags, PRIntervalTime timeout) 192 { 193 PRFileDesc *lo = fd->lower; 194 const char *b = (const char*)buf; 195 PRInt32 rv, wroteout = 0, request; 196 if (verbosity > chatty) PR_fprintf( 197 logFile, "MySend asking permission to send %d bytes\n", amount); 198 rv = lo->methods->send(lo, &amount, sizeof(amount), flags, timeout); 199 if (0 < rv) 200 { 201 rv = lo->methods->recv( 202 lo, &request, sizeof(request), flags, timeout); 203 if (0 < rv) 204 { 205 PR_ASSERT(request == amount); 206 if (verbosity > chatty) PR_fprintf( 207 logFile, "MySend got permission to send %d bytes\n", request); 208 while (wroteout < request) 209 { 210 rv = lo->methods->send( 211 lo, b + wroteout, request - wroteout, flags, timeout); 212 if (rv <= 0) break; 213 if (verbosity > chatty) PR_fprintf( 214 logFile, "MySend wrote %d bytes\n", rv); 215 wroteout += rv; 216 } 217 rv = amount; 218 } 219 } 220 return rv; 221 } /* MySend */
The protocol first asks permission by sending a message of well known size, four bytes (the size of a PRInt32). Once that is sent, it anticipates receiving a four byte message in response providing the permission. Once that exchange is complete, the actual data is sent as a single operation.