Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

panda/src/net/connectionReader.cxx

Go to the documentation of this file.
00001 // Filename: connectionReader.cxx
00002 // Created by:  drose (08Feb00)
00003 //
00004 ////////////////////////////////////////////////////////////////////
00005 //
00006 // PANDA 3D SOFTWARE
00007 // Copyright (c) 2001, Disney Enterprises, Inc.  All rights reserved
00008 //
00009 // All use of this software is subject to the terms of the Panda 3d
00010 // Software license.  You should have received a copy of this license
00011 // along with this source code; you will also find a current copy of
00012 // the license at http://www.panda3d.org/license.txt .
00013 //
00014 // To contact the maintainers of this program write to
00015 // panda3d@yahoogroups.com .
00016 //
00017 ////////////////////////////////////////////////////////////////////
00018 
00019 #include "connectionReader.h"
00020 #include "connectionManager.h"
00021 #include "netDatagram.h"
00022 #include "datagramTCPHeader.h"
00023 #include "datagramUDPHeader.h"
00024 #include "pprerror.h"
00025 #include "config_net.h"
00026 
00027 #include <notify.h>
00028 #include <prerror.h>
00029 #include <pratom.h>
00030 #include <algorithm>
00031 
00032 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
00033 
00034 // We have to impose a maximum timeout on the PR_Poll() call because
00035 // PR_Poll() doesn't seem to be interruptible! (!)
00036 static const PRUint32 max_timeout_ms = 100;
00037 
00038 ////////////////////////////////////////////////////////////////////
00039 //     Function: ConnectionReader::SocketInfo::Constructor
00040 //       Access: Public
00041 //  Description:
00042 ////////////////////////////////////////////////////////////////////
00043 ConnectionReader::SocketInfo::
00044 SocketInfo(const PT(Connection) &connection) :
00045   _connection(connection)
00046 {
00047   _busy = false;
00048   _error = false;
00049 }
00050 
00051 ////////////////////////////////////////////////////////////////////
00052 //     Function: ConnectionReader::SocketInfo::is_udp
00053 //       Access: Public
00054 //  Description:
00055 ////////////////////////////////////////////////////////////////////
00056 bool ConnectionReader::SocketInfo::
00057 is_udp() const {
00058   return (PR_GetDescType(_connection->get_socket()) == PR_DESC_SOCKET_UDP);
00059 }
00060 
00061 ////////////////////////////////////////////////////////////////////
00062 //     Function: ConnectionReader::SocketInfo::get_socket
00063 //       Access: Public
00064 //  Description:
00065 ////////////////////////////////////////////////////////////////////
00066 PRFileDesc *ConnectionReader::SocketInfo::
00067 get_socket() const {
00068   return _connection->get_socket();
00069 }
00070 
00071 ////////////////////////////////////////////////////////////////////
00072 //     Function: ConnectionReader::Constructor
00073 //       Access: Public
00074 //  Description: Creates a new ConnectionReader with the indicated
00075 //               number of threads to handle requests.  If num_threads
00076 //               is 0, the sockets will only be read by polling,
00077 //               during an explicit poll() call.
00078 //               (QueuedConnectionReader will do this automatically.)
00079 ////////////////////////////////////////////////////////////////////
00080 ConnectionReader::
00081 ConnectionReader(ConnectionManager *manager, int num_threads) :
00082   _manager(manager)
00083 {
00084   _raw_mode = false;
00085   _polling = (num_threads <= 0);
00086 
00087   _shutdown = false;
00088   _startup_mutex = PR_NewLock();
00089 
00090   _next_index = 0;
00091   _num_results = 0;
00092   _select_mutex = PR_NewLock();
00093 
00094   _currently_polling_thread = -1;
00095 
00096   _reexamine_sockets = false;
00097   _sockets_mutex = PR_NewLock();
00098 
00099   // Before we create all the threads, grab _startup_mutex.  That will
00100   // prevent our new threads from trying to look themselves up in the
00101   // _threads array before we have filled it up.
00102   PR_Lock(_startup_mutex);
00103 
00104   for (int i = 0; i < num_threads; i++) {
00105     PRThread *thread =
00106       PR_CreateThread(PR_USER_THREAD,
00107                       thread_start, (void *)this,
00108                       PR_PRIORITY_NORMAL,
00109                       PR_GLOBAL_THREAD, // Since thread will mostly do I/O.
00110                       PR_JOINABLE_THREAD,
00111                       0);  // Select a suitable stack size.
00112 
00113     nassertv(thread != (PRThread *)NULL);
00114     _threads.push_back(thread);
00115   }
00116 
00117   PR_Unlock(_startup_mutex);
00118 
00119   _manager->add_reader(this);
00120 }
00121 
00122 ////////////////////////////////////////////////////////////////////
00123 //     Function: ConnectionReader::Destructor
00124 //       Access: Public, Virtual
00125 //  Description:
00126 ////////////////////////////////////////////////////////////////////
00127 ConnectionReader::
00128 ~ConnectionReader() {
00129   if (_manager != (ConnectionManager *)NULL) {
00130     _manager->remove_reader(this);
00131   }
00132 
00133   shutdown();
00134 
00135   // Delete all of our old sockets.
00136   Sockets::iterator si;
00137   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00138     delete (*si);
00139   }
00140   for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00141     SocketInfo *sinfo = (*si);
00142     if (!sinfo->_busy) {
00143       delete sinfo;
00144     } else {
00145       net_cat.error()
00146         << "Reentrant deletion of ConnectionReader--don't delete these\n"
00147         << "in response to connection_reset().\n";
00148 
00149       // We'll have to do the best we can to recover.
00150       sinfo->_connection.clear();
00151     }
00152   }
00153 
00154   PR_DestroyLock(_startup_mutex);
00155   PR_DestroyLock(_select_mutex);
00156   PR_DestroyLock(_sockets_mutex);
00157 }
00158 
00159 ////////////////////////////////////////////////////////////////////
00160 //     Function: ConnectionReader::add_connection
00161 //       Access: Public
00162 //  Description: Adds a new socket to the list of sockets the
00163 //               ConnectionReader will monitor.  A datagram that comes
00164 //               in on any of the monitored sockets will be reported.
00165 //               In the case of a ConnectionListener, this adds a new
00166 //               rendezvous socket; any activity on any of the
00167 //               monitored sockets will cause a connection to be
00168 //               accepted.
00169 //
00170 //               The return value is true if the connection was added,
00171 //               false if it was already there.
00172 //
00173 //               add_connection() is thread-safe, and may be called at
00174 //               will by any thread.
00175 ////////////////////////////////////////////////////////////////////
00176 bool ConnectionReader::
00177 add_connection(const PT(Connection) &connection) {
00178   nassertr(connection != (Connection *)NULL, false);
00179 
00180   PR_Lock(_sockets_mutex);
00181 
00182   // Make sure it's not already on the _sockets list.
00183   Sockets::const_iterator si;
00184   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00185     if ((*si)->_connection == connection) {
00186       // Whoops, already there.
00187       PR_Unlock(_sockets_mutex);
00188       return false;
00189     }
00190   }
00191 
00192   _sockets.push_back(new SocketInfo(connection));
00193   _reexamine_sockets = true;
00194   PR_Unlock(_sockets_mutex);
00195 
00196   return true;
00197 }
00198 
00199 ////////////////////////////////////////////////////////////////////
00200 //     Function: ConnectionReader::remove_connection
00201 //       Access: Public
00202 //  Description: Removes a socket from the list of sockets being
00203 //               monitored.  Returns true if the socket was correctly
00204 //               removed, false if it was not on the list in the first
00205 //               place.
00206 //
00207 //               remove_connection() is thread-safe, and may be called
00208 //               at will by any thread.
00209 ////////////////////////////////////////////////////////////////////
00210 bool ConnectionReader::
00211 remove_connection(const PT(Connection) &connection) {
00212   PR_Lock(_sockets_mutex);
00213 
00214   // Walk through the list of sockets to find the one we're removing.
00215   Sockets::iterator si;
00216   si = _sockets.begin();
00217   while (si != _sockets.end() && (*si)->_connection != connection) {
00218     ++si;
00219   }
00220   if (si == _sockets.end()) {
00221     PR_Unlock(_sockets_mutex);
00222     return false;
00223   }
00224 
00225   _removed_sockets.push_back(*si);
00226   _sockets.erase(si);
00227   _reexamine_sockets = true;
00228   PR_Unlock(_sockets_mutex);
00229 
00230   return true;
00231 }
00232 
00233 ////////////////////////////////////////////////////////////////////
00234 //     Function: ConnectionReader::is_connection_ok
00235 //       Access: Public
00236 //  Description: Returns true if the indicated connection has been
00237 //               added to the ConnectionReader and is being monitored
00238 //               properly, false if it is not known, or if there was
00239 //               some error condition detected on the connection.  (If
00240 //               there was an error condition, normally the
00241 //               ConnectionManager would have been informed and closed
00242 //               the connection.)
00243 ////////////////////////////////////////////////////////////////////
00244 bool ConnectionReader::
00245 is_connection_ok(const PT(Connection) &connection) {
00246   PR_Lock(_sockets_mutex);
00247 
00248   // Walk through the list of sockets to find the one we're asking
00249   // about.
00250   Sockets::iterator si;
00251   si = _sockets.begin();
00252   while (si != _sockets.end() && (*si)->_connection != connection) {
00253     ++si;
00254   }
00255   if (si == _sockets.end()) {
00256     // Don't know that connection.
00257     PR_Unlock(_sockets_mutex);
00258     return false;
00259   }
00260 
00261   SocketInfo *sinfo = (*si);
00262   bool is_ok = !sinfo->_error;
00263   PR_Unlock(_sockets_mutex);
00264 
00265   return is_ok;
00266 }
00267 
00268 ////////////////////////////////////////////////////////////////////
00269 //     Function: ConnectionReader::poll
00270 //       Access: Public
00271 //  Description: Explicitly polls the available sockets to see if any
00272 //               of them have any noise.  This function does nothing
00273 //               unless this is a polling-type ConnectionReader,
00274 //               i.e. it was created with zero threads (and
00275 //               is_polling() will return true).
00276 //
00277 //               It is not necessary to call this explicitly for a
00278 //               QueuedConnectionReader.
00279 ////////////////////////////////////////////////////////////////////
00280 void ConnectionReader::
00281 poll() {
00282   if (!_polling) {
00283     return;
00284   }
00285 
00286   SocketInfo *sinfo = get_next_available_socket(PR_INTERVAL_NO_WAIT, -2);
00287   if (sinfo != (SocketInfo *)NULL) {
00288     process_incoming_data(sinfo);
00289   }
00290 }
00291 
00292 ////////////////////////////////////////////////////////////////////
00293 //     Function: ConnectionReader::get_manager
00294 //       Access: Public
00295 //  Description: Returns a pointer to the ConnectionManager object
00296 //               that serves this ConnectionReader.
00297 ////////////////////////////////////////////////////////////////////
00298 ConnectionManager *ConnectionReader::
00299 get_manager() const {
00300   return _manager;
00301 }
00302 
00303 ////////////////////////////////////////////////////////////////////
00304 //     Function: ConnectionReader::is_polling
00305 //       Access: Public
00306 //  Description: Returns true if the reader is a polling reader,
00307 //               i.e. it has no threads.
00308 ////////////////////////////////////////////////////////////////////
00309 bool ConnectionReader::
00310 is_polling() const {
00311   return _polling;
00312 }
00313 
00314 ////////////////////////////////////////////////////////////////////
00315 //     Function: ConnectionReader::get_num_threads
00316 //       Access: Public
00317 //  Description: Returns the number of threads the ConnectionReader
00318 //               has been created with.
00319 ////////////////////////////////////////////////////////////////////
00320 int ConnectionReader::
00321 get_num_threads() const {
00322   return _threads.size();
00323 }
00324 
00325 ////////////////////////////////////////////////////////////////////
00326 //     Function: ConnectionReader::set_raw_mode
00327 //       Access: Public
00328 //  Description: Sets the ConnectionReader into raw mode (or turns off
00329 //               raw mode).  In raw mode, datagram headers are not
00330 //               expected; instead, all the data available on the pipe
00331 //               is treated as a single datagram.
00332 ////////////////////////////////////////////////////////////////////
00333 void ConnectionReader::
00334 set_raw_mode(bool mode) {
00335   _raw_mode = mode;
00336 }
00337 
00338 ////////////////////////////////////////////////////////////////////
00339 //     Function: ConnectionReader::get_raw_mode
00340 //       Access: Public
00341 //  Description: Returns the current setting of the raw mode flag.
00342 //               See set_raw_mode().
00343 ////////////////////////////////////////////////////////////////////
00344 bool ConnectionReader::
00345 get_raw_mode() const {
00346   return _raw_mode;
00347 }
00348 
00349 ////////////////////////////////////////////////////////////////////
00350 //     Function: ConnectionReader::shutdown
00351 //       Access: Protected
00352 //  Description: Terminates all threads cleanly.  Normally this is
00353 //               only called by the destructor.
00354 ////////////////////////////////////////////////////////////////////
00355 void ConnectionReader::
00356 shutdown() {
00357   if (_shutdown) {
00358     return;
00359   }
00360 
00361   // First, begin the shutdown.  This will tell our threads we want
00362   // them to quit.
00363   _shutdown = true;
00364 
00365   // Now wait for all of our threads to terminate.
00366   Threads::iterator ti;
00367   for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00368     // Interrupt the thread so it can notice the shutdown.
00369     PRStatus result = PR_Interrupt(*ti);
00370     if (result != PR_SUCCESS) {
00371       pprerror("PR_Interrupt");
00372     }
00373 
00374     result = PR_JoinThread(*ti);
00375     if (result != PR_SUCCESS) {
00376       pprerror("PR_JoinThread");
00377     }
00378   }
00379 }
00380 
00381 ////////////////////////////////////////////////////////////////////
00382 //     Function: ConnectionReader::clear_manager
00383 //       Access: Protected
00384 //  Description: This should normally only be called when the
00385 //               associated ConnectionManager destructs.  It resets
00386 //               the ConnectionManager pointer to NULL so we don't
00387 //               have a floating pointer.  This makes the
00388 //               ConnectionReader invalid; presumably it also will be
00389 //               destructed momentarily.
00390 ////////////////////////////////////////////////////////////////////
00391 void ConnectionReader::
00392 clear_manager() {
00393   _manager = (ConnectionManager *)NULL;
00394 }
00395 
00396 ////////////////////////////////////////////////////////////////////
00397 //     Function: ConnectionReader::finish_socket
00398 //       Access: Protected
00399 //  Description: To be called when a socket has been fully read and is
00400 //               ready for polling for additional data.
00401 ////////////////////////////////////////////////////////////////////
00402 void ConnectionReader::
00403 finish_socket(SocketInfo *sinfo) {
00404   nassertv(sinfo->_busy);
00405 
00406   // By marking the SocketInfo nonbusy, we make it available for
00407   // future polls.
00408   sinfo->_busy = false;
00409   _reexamine_sockets = true;
00410 
00411   // However, someone might be already blocking on an
00412   // earlier-established PR_Poll() that doesn't involve this socket.
00413   // That complicates things.  It means we'll have to wake that thread
00414   // up so it can rebuild the poll with the new socket.
00415 
00416   // Actually, don't bother, since it turns out that PR_Poll() isn't
00417   // interruptible anyway.  Sigh.  Maybe we'll revisit this later, but
00418   // in the meantime it means we have to rig up the PR_Poll() to
00419   // return every so often and check the _reexamine_sockets flag by
00420   // itself.
00421 
00422   /*
00423   int thread = _currently_polling_thread;
00424   if (thread != -1) {
00425     nassertv(thread >= 0 && thread < _threads.size());
00426     PR_Interrupt(_threads[thread]);
00427   }
00428   */
00429 }
00430 
00431 ////////////////////////////////////////////////////////////////////
00432 //     Function: ConnectionReader::process_incoming_data
00433 //       Access: Protected, Virtual
00434 //  Description: This is run within a thread when the call to
00435 //               PR_Poll() indicates there is data available
00436 //               on a socket.
00437 ////////////////////////////////////////////////////////////////////
00438 void ConnectionReader::
00439 process_incoming_data(SocketInfo *sinfo) {
00440   if (_raw_mode) {
00441     if (sinfo->is_udp()) {
00442       process_raw_incoming_udp_data(sinfo);
00443     } else {
00444       process_raw_incoming_tcp_data(sinfo);
00445     }
00446   } else {
00447     if (sinfo->is_udp()) {
00448       process_incoming_udp_data(sinfo);
00449     } else {
00450       process_incoming_tcp_data(sinfo);
00451     }
00452   }
00453 }
00454 
00455 ////////////////////////////////////////////////////////////////////
00456 //     Function: ConnectionReader::process_incoming_udp_data
00457 //       Access: Protected
00458 //  Description:
00459 ////////////////////////////////////////////////////////////////////
00460 void ConnectionReader::
00461 process_incoming_udp_data(SocketInfo *sinfo) {
00462   PRFileDesc *socket = sinfo->get_socket();
00463   PRNetAddr addr;
00464 
00465   // Read as many bytes as we can.
00466   PRInt8 buffer[read_buffer_size];
00467   PRInt32 bytes_read;
00468 
00469   bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
00470                            &addr, PR_INTERVAL_NO_TIMEOUT);
00471 
00472   if (bytes_read < 0) {
00473     PRErrorCode errcode = PR_GetError();
00474     if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00475       pprerror("PR_RecvFrom");
00476     }
00477     finish_socket(sinfo);
00478     return;
00479 
00480   } else if (bytes_read == 0) {
00481     // The socket was closed (!).  This shouldn't happen with a UDP
00482     // connection.  Oh well.  Report that and return.
00483     if (_manager != (ConnectionManager *)NULL) {
00484       _manager->connection_reset(sinfo->_connection);
00485     }
00486     finish_socket(sinfo);
00487     return;
00488   }
00489 
00490   // Since we are not running in raw mode, we decode the header to
00491   // determine how big the datagram is.  This means we must have read
00492   // at least a full header.
00493   if (bytes_read < datagram_udp_header_size) {
00494     net_cat.error()
00495       << "Did not read entire header, discarding UDP datagram.\n";
00496     finish_socket(sinfo);
00497     return;
00498   }
00499   
00500   DatagramUDPHeader header(buffer);
00501   
00502   PRInt8 *dp = buffer + datagram_udp_header_size;
00503   bytes_read -= datagram_udp_header_size;
00504   
00505   NetDatagram datagram(dp, bytes_read);
00506   
00507   // Now that we've read all the data, it's time to finish the socket
00508   // so another thread can read the next datagram.
00509   finish_socket(sinfo);
00510   
00511   if (_shutdown) {
00512     return;
00513   }
00514   
00515   // And now do whatever we need to do to process the datagram.
00516   if (!header.verify_datagram(datagram)) {
00517     net_cat.error()
00518       << "Ignoring invalid UDP datagram.\n";
00519   } else {
00520     datagram.set_connection(sinfo->_connection);
00521     datagram.set_address(NetAddress(addr));
00522     receive_datagram(datagram);
00523   }
00524 }
00525 
00526 ////////////////////////////////////////////////////////////////////
00527 //     Function: ConnectionReader::process_incoming_tcp_data
00528 //       Access: Protected
00529 //  Description:
00530 ////////////////////////////////////////////////////////////////////
00531 void ConnectionReader::
00532 process_incoming_tcp_data(SocketInfo *sinfo) {
00533   PRFileDesc *socket = sinfo->get_socket();
00534   PRNetAddr addr;
00535 
00536   // Read only the header bytes to start with.
00537   PRInt8 buffer[read_buffer_size];
00538   PRInt32 header_bytes_read = 0;
00539 
00540   if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
00541     pprerror("PR_GetSockName");
00542   }
00543 
00544   // First, we have to read the first datagram_tcp_header_size bytes.
00545   while (header_bytes_read < datagram_tcp_header_size) {
00546     PRInt32 bytes_read =
00547       PR_Recv(socket, buffer + header_bytes_read,
00548               datagram_tcp_header_size - header_bytes_read, 0,
00549               PR_INTERVAL_NO_TIMEOUT);
00550 
00551     if (bytes_read < 0) {
00552       PRErrorCode errcode = PR_GetError();
00553       if (errcode == PR_CONNECT_RESET_ERROR
00554 #ifdef PR_SOCKET_SHUTDOWN_ERROR
00555           || errcode == PR_SOCKET_SHUTDOWN_ERROR
00556           || errcode == PR_CONNECT_ABORTED_ERROR
00557 #endif
00558           ) {
00559         // The socket was closed.
00560         if (_manager != (ConnectionManager *)NULL) {
00561           _manager->connection_reset(sinfo->_connection);
00562         }
00563 
00564       } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00565         pprerror("PR_Recv");
00566       }
00567       finish_socket(sinfo);
00568       return;
00569 
00570     } else if (bytes_read == 0) {
00571       // The socket was closed.  Report that and return.
00572       if (_manager != (ConnectionManager *)NULL) {
00573         _manager->connection_reset(sinfo->_connection);
00574       }
00575       finish_socket(sinfo);
00576       return;
00577     }
00578 
00579     header_bytes_read += bytes_read;
00580   }
00581 
00582   // Now we must decode the header to determine how big the datagram
00583   // is.  This means we must have read at least a full header.
00584   if (header_bytes_read != datagram_tcp_header_size) {
00585     // This should actually be impossible, by the read-loop logic
00586     // above.
00587     net_cat.error()
00588       << "Did not read entire header, discarding TCP datagram.\n";
00589     finish_socket(sinfo);
00590     return;
00591   }
00592 
00593   DatagramTCPHeader header(buffer);
00594   PRInt32 size = header.get_datagram_size();
00595 
00596   // We have to loop until the entire datagram is read.
00597   NetDatagram datagram;
00598 
00599   while (!_shutdown && (int)datagram.get_length() < size) {
00600     PRInt32 bytes_read;
00601 
00602     bytes_read =
00603       PR_Recv(socket, buffer,
00604               min((PRInt32)read_buffer_size,
00605                   (PRInt32)(size - datagram.get_length())),
00606               0, PR_INTERVAL_NO_TIMEOUT);
00607     PRInt8 *dp = buffer;
00608 
00609     if (bytes_read < 0) {
00610       PRErrorCode errcode = PR_GetError();
00611       if (errcode == PR_CONNECT_RESET_ERROR
00612 #ifdef PR_SOCKET_SHUTDOWN_ERROR
00613           || errcode == PR_SOCKET_SHUTDOWN_ERROR
00614           || errcode == PR_CONNECT_ABORTED_ERROR
00615 #endif
00616           ) {
00617         // The socket was closed.
00618         if (_manager != (ConnectionManager *)NULL) {
00619           _manager->connection_reset(sinfo->_connection);
00620         }
00621 
00622       } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00623         pprerror("PR_Recv");
00624       }
00625       finish_socket(sinfo);
00626       return;
00627 
00628     } else if (bytes_read == 0) {
00629       // The socket was closed.  Report that and return.
00630       if (_manager != (ConnectionManager *)NULL) {
00631         _manager->connection_reset(sinfo->_connection);
00632       }
00633       finish_socket(sinfo);
00634       return;
00635     }
00636 
00637     PRInt32 datagram_bytes =
00638       min(bytes_read, (PRInt32)(size - datagram.get_length()));
00639     datagram.append_data(dp, datagram_bytes);
00640 
00641     if (bytes_read > datagram_bytes) {
00642       // There were some extra bytes at the end of the datagram.  Maybe
00643       // the beginning of the next datagram?  Huh.
00644       net_cat.error()
00645         << "Discarding " << bytes_read - datagram_bytes
00646         << " bytes following TCP datagram.\n";
00647     }
00648   }
00649 
00650   // Now that we've read all the data, it's time to finish the socket
00651   // so another thread can read the next datagram.
00652   finish_socket(sinfo);
00653 
00654   if (_shutdown) {
00655     return;
00656   }
00657 
00658   // And now do whatever we need to do to process the datagram.
00659   if (!header.verify_datagram(datagram)) {
00660     net_cat.error()
00661       << "Ignoring invalid TCP datagram.\n";
00662   } else {
00663     datagram.set_connection(sinfo->_connection);
00664     datagram.set_address(NetAddress(addr));
00665     receive_datagram(datagram);
00666   }
00667 }
00668 
00669 ////////////////////////////////////////////////////////////////////
00670 //     Function: ConnectionReader::process_raw_incoming_udp_data
00671 //       Access: Protected
00672 //  Description:
00673 ////////////////////////////////////////////////////////////////////
00674 void ConnectionReader::
00675 process_raw_incoming_udp_data(SocketInfo *sinfo) {
00676   PRFileDesc *socket = sinfo->get_socket();
00677   PRNetAddr addr;
00678 
00679   // Read as many bytes as we can.
00680   PRInt8 buffer[read_buffer_size];
00681   PRInt32 bytes_read;
00682 
00683   bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
00684                            &addr, PR_INTERVAL_NO_TIMEOUT);
00685 
00686   if (bytes_read < 0) {
00687     PRErrorCode errcode = PR_GetError();
00688     if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00689       pprerror("PR_RecvFrom");
00690     }
00691     finish_socket(sinfo);
00692     return;
00693 
00694   } else if (bytes_read == 0) {
00695     // The socket was closed (!).  This shouldn't happen with a UDP
00696     // connection.  Oh well.  Report that and return.
00697     if (_manager != (ConnectionManager *)NULL) {
00698       _manager->connection_reset(sinfo->_connection);
00699     }
00700     finish_socket(sinfo);
00701     return;
00702   }
00703 
00704   // In raw mode, we simply extract all the bytes and make that a
00705   // datagram.
00706   NetDatagram datagram(buffer, bytes_read);
00707   
00708   // Now that we've read all the data, it's time to finish the socket
00709   // so another thread can read the next datagram.
00710   finish_socket(sinfo);
00711   
00712   if (_shutdown) {
00713     return;
00714   }
00715   
00716   datagram.set_connection(sinfo->_connection);
00717   datagram.set_address(NetAddress(addr));
00718   receive_datagram(datagram);
00719 }
00720 
00721 ////////////////////////////////////////////////////////////////////
00722 //     Function: ConnectionReader::process_raw_incoming_tcp_data
00723 //       Access: Protected
00724 //  Description:
00725 ////////////////////////////////////////////////////////////////////
00726 void ConnectionReader::
00727 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
00728   PRFileDesc *socket = sinfo->get_socket();
00729   PRNetAddr addr;
00730 
00731   // Read as many bytes as we can.
00732   PRInt8 buffer[read_buffer_size];
00733   PRInt32 bytes_read;
00734 
00735   if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
00736     pprerror("PR_GetSockName");
00737   }
00738 
00739   bytes_read = PR_Recv(socket, buffer, read_buffer_size, 0,
00740                        PR_INTERVAL_NO_TIMEOUT);
00741 
00742   if (bytes_read < 0) {
00743     PRErrorCode errcode = PR_GetError();
00744     if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00745       pprerror("PR_RecvFrom");
00746     }
00747     finish_socket(sinfo);
00748     return;
00749 
00750   } else if (bytes_read == 0) {
00751     // The socket was closed.  Report that and return.
00752     if (_manager != (ConnectionManager *)NULL) {
00753       _manager->connection_reset(sinfo->_connection);
00754     }
00755     finish_socket(sinfo);
00756     return;
00757   }
00758 
00759   // In raw mode, we simply extract all the bytes and make that a
00760   // datagram.
00761   NetDatagram datagram(buffer, bytes_read);
00762   
00763   // Now that we've read all the data, it's time to finish the socket
00764   // so another thread can read the next datagram.
00765   finish_socket(sinfo);
00766   
00767   if (_shutdown) {
00768     return;
00769   }
00770   
00771   datagram.set_connection(sinfo->_connection);
00772   datagram.set_address(NetAddress(addr));
00773   receive_datagram(datagram);
00774 }
00775 
00776 
00777 ////////////////////////////////////////////////////////////////////
00778 //     Function: ConnectionReader::thread_start
00779 //       Access: Private, Static
00780 //  Description: The static wrapper around the thread's executing
00781 //               function.  This must be a static member function
00782 //               because it is passed through the C interface to
00783 //               PR_CreateThread().
00784 ////////////////////////////////////////////////////////////////////
00785 void ConnectionReader::
00786 thread_start(void *data) {
00787   ((ConnectionReader *)data)->thread_run();
00788 }
00789 
00790 ////////////////////////////////////////////////////////////////////
00791 //     Function: ConnectionReader::thread_run
00792 //       Access: Private
00793 //  Description: This is the actual executing function for each
00794 //               thread.
00795 ////////////////////////////////////////////////////////////////////
00796 void ConnectionReader::
00797 thread_run() {
00798   nassertv(!_polling);
00799 
00800   // First determine our own thread index.
00801   PR_Lock(_startup_mutex);
00802   Threads::const_iterator ti =
00803     find(_threads.begin(), _threads.end(), PR_GetCurrentThread());
00804 
00805   nassertv(ti != _threads.end());
00806   PRInt32 current_thread_index = (ti - _threads.begin());
00807 
00808   nassertv(_threads[current_thread_index] == PR_GetCurrentThread());
00809   PR_Unlock(_startup_mutex);
00810 
00811   while (!_shutdown) {
00812     SocketInfo *sinfo =
00813       get_next_available_socket(PR_INTERVAL_NO_TIMEOUT,
00814                                 current_thread_index);
00815     if (sinfo != (SocketInfo *)NULL) {
00816       process_incoming_data(sinfo);
00817     }
00818   }
00819 }
00820 
00821 
00822 ////////////////////////////////////////////////////////////////////
00823 //     Function: ConnectionReader::get_next_available_socket
00824 //       Access: Private
00825 //  Description: Polls the known connections for activity and returns
00826 //               the next one known to have activity, or NULL if no
00827 //               activity is detected within the timeout interval.
00828 //
00829 //               This function may block indefinitely if it is being
00830 //               called by multiple threads; if there are no other
00831 //               threads, it may block only if timeout !=
00832 //               PR_INTERVAL_NO_WAIT.
00833 ////////////////////////////////////////////////////////////////////
00834 ConnectionReader::SocketInfo *ConnectionReader::
00835 get_next_available_socket(PRIntervalTime timeout,
00836                           PRInt32 current_thread_index) {
00837   // Go to sleep on the select() mutex.  This guarantees that only one
00838   // thread is in this function at a time.
00839   PR_Lock(_select_mutex);
00840 
00841   int num_sockets = _polled_sockets.size();
00842   nassertr(num_sockets == (int)_poll.size(), NULL);
00843 
00844   do {
00845     // First, check the result from the previous PR_Poll() call.  If
00846     // there are any sockets remaining there, process them first.
00847     while (!_shutdown && _num_results > 0) {
00848       nassertr(_next_index < num_sockets, NULL);
00849       int i = _next_index;
00850       _next_index++;
00851 
00852       if (_poll[i].out_flags != 0) {
00853         _num_results--;
00854         SocketInfo *sinfo = _polled_sockets[i];
00855 
00856         if ((_poll[i].out_flags & PR_POLL_READ) != 0) {
00857           // Some genuine noise on the port.
00858           sinfo->_busy = true;
00859           _reexamine_sockets = true;
00860           PR_Unlock(_select_mutex);
00861           PR_Sleep(PR_INTERVAL_NO_WAIT);
00862           return sinfo;
00863 
00864         } else if ((_poll[i].out_flags &
00865                     (PR_POLL_ERR | PR_POLL_NVAL | PR_POLL_HUP)) != 0) {
00866           // Something bad happened to this socket.  Tell the
00867           // ConnectionManager to drop it.
00868           if (_manager != (ConnectionManager *)NULL) {
00869             _manager->connection_reset(sinfo->_connection);
00870           }
00871           sinfo->_error = true;
00872           _reexamine_sockets = true;
00873         }
00874       }
00875     }
00876 
00877     bool interrupted;
00878     do {
00879       interrupted = false;
00880 
00881       // Ok, no results from previous PR_Poll() calls.  Prepare to set
00882       // up for a new poll.
00883 
00884       // First, report to anyone else who cares that we're the thread
00885       // about to do the poll.  That way, if any new sockets come
00886       // available while we're polling, we can service them.
00887       PR_AtomicSet(&_currently_polling_thread, current_thread_index);
00888 
00889       if (_reexamine_sockets) {
00890         _reexamine_sockets = false;
00891         rebuild_poll_list();
00892         num_sockets = _polled_sockets.size();
00893         nassertr(num_sockets == (int)_poll.size(), NULL);
00894       }
00895 
00896       // Now we can execute PR_Poll().  This basically maps to a Unix
00897       // select() call.
00898       _num_results = 0;
00899       _next_index = 0;
00900 
00901       if (!_shutdown) {
00902         PRIntervalTime poll_timeout =
00903           PR_MillisecondsToInterval(max_timeout_ms);
00904         if (timeout != PR_INTERVAL_NO_TIMEOUT) {
00905           poll_timeout = min(timeout, poll_timeout);
00906         }
00907 
00908         _num_results = PR_Poll(&_poll[0], num_sockets, poll_timeout);
00909       }
00910 
00911       if (_num_results == 0 && timeout == PR_INTERVAL_NO_TIMEOUT) {
00912         // If we reached max_timeout_ms, but the caller didn't request
00913         // a timeout, consider that an interrupt: go back and
00914         // reconsider.  (This is a kludge around the fact that
00915         // PR_Poll() appears to be non-interruptible.)
00916         interrupted = true;
00917 
00918       } else if (_num_results < 0) {
00919         // If our poll was interrupted by another thread, rebuild the
00920         // list and poll again.
00921         PRErrorCode errcode = PR_GetError();
00922         if (errcode == PR_PENDING_INTERRUPT_ERROR) {
00923           interrupted = true;
00924         } else {
00925           pprerror("PR_Poll");
00926         }
00927       }
00928     } while (!_shutdown && interrupted);
00929 
00930     PR_AtomicSet(&_currently_polling_thread, -1);
00931     // Just in case someone interrupted us while we were polling and
00932     // we didn't catch it, clear it now--we don't care any more.
00933     PR_ClearInterrupt();
00934 
00935     // Repeat the above until we (a) find a socket with actual noise
00936     // on it, or (b) return from PR_Poll() with no sockets available.
00937   } while (!_shutdown && _num_results > 0);
00938 
00939   PR_Unlock(_select_mutex);
00940   return (SocketInfo *)NULL;
00941 }
00942 
00943 
00944 ////////////////////////////////////////////////////////////////////
00945 //     Function: ConnectionReader::rebuild_poll_list
00946 //       Access: Private
00947 //  Description: Rebuilds the _poll and _polled_sockets arrays based
00948 //               on the sockets that are currently available for
00949 //               polling.
00950 ////////////////////////////////////////////////////////////////////
00951 void ConnectionReader::
00952 rebuild_poll_list() {
00953   _poll.clear();
00954   _polled_sockets.clear();
00955 
00956   PR_Lock(_sockets_mutex);
00957   Sockets::const_iterator si;
00958   for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00959     SocketInfo *sinfo = (*si);
00960     if (!sinfo->_busy && !sinfo->_error) {
00961       PRPollDesc pd;
00962       pd.fd = sinfo->get_socket();
00963       pd.in_flags = PR_POLL_READ;
00964       pd.out_flags = 0;
00965 
00966       _poll.push_back(pd);
00967       _polled_sockets.push_back(sinfo);
00968     }
00969   }
00970 
00971   // This is also a fine time to delete the contents of the
00972   // _removed_sockets list.
00973   if (!_removed_sockets.empty()) {
00974     Sockets still_busy_sockets;
00975     for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00976       SocketInfo *sinfo = (*si);
00977       if (sinfo->_busy) {
00978         still_busy_sockets.push_back(sinfo);
00979       } else {
00980         delete sinfo;
00981       }
00982     }
00983     _removed_sockets.swap(still_busy_sockets);
00984   }
00985 
00986   PR_Unlock(_sockets_mutex);
00987 }

Generated on Fri May 2 00:40:33 2003 for Panda by doxygen1.3