00001 // Filename: connectionWriter.cxx 00002 // Created by: drose (08Feb00) 00003 // 00004 //////////////////////////////////////////////////////////////////// 00005 // 00006 // PANDA 3D SOFTWARE 00007 // Copyright (c) 2001, Disney Enterprises, Inc. All rights reserved 00008 // 00009 // All use of this software is subject to the terms of the Panda 3d 00010 // Software license. You should have received a copy of this license 00011 // along with this source code; you will also find a current copy of 00012 // the license at http://www.panda3d.org/license.txt . 00013 // 00014 // To contact the maintainers of this program write to 00015 // panda3d@yahoogroups.com . 00016 // 00017 //////////////////////////////////////////////////////////////////// 00018 00019 #include "connectionWriter.h" 00020 #include "connectionManager.h" 00021 #include "pprerror.h" 00022 #include "config_net.h" 00023 00024 #include <notify.h> 00025 #include <prerror.h> 00026 00027 //////////////////////////////////////////////////////////////////// 00028 // Function: ConnectionWriter::Constructor 00029 // Access: Public 00030 // Description: Creates a new ConnectionWriter with the indicated 00031 // number of threads to handle output. 00032 // 00033 // If num_threads is 0, all datagrams will be sent 00034 // immediately instead of queueing for later 00035 // transmission by a thread. 00036 //////////////////////////////////////////////////////////////////// 00037 ConnectionWriter:: 00038 ConnectionWriter(ConnectionManager *manager, int num_threads) : 00039 _manager(manager) 00040 { 00041 _raw_mode = false; 00042 _immediate = (num_threads <= 0); 00043 00044 for (int i = 0; i < num_threads; i++) { 00045 PRThread *thread = 00046 PR_CreateThread(PR_USER_THREAD, 00047 thread_start, (void *)this, 00048 PR_PRIORITY_NORMAL, 00049 PR_GLOBAL_THREAD, // Since thread will mostly do I/O. 00050 PR_JOINABLE_THREAD, 00051 0); // Select a suitable stack size. 00052 00053 nassertv(thread != (PRThread *)NULL); 00054 _threads.push_back(thread); 00055 } 00056 00057 _manager->add_writer(this); 00058 } 00059 00060 //////////////////////////////////////////////////////////////////// 00061 // Function: ConnectionWriter::Destructor 00062 // Access: Public 00063 // Description: 00064 //////////////////////////////////////////////////////////////////// 00065 ConnectionWriter:: 00066 ~ConnectionWriter() { 00067 if (_manager != (ConnectionManager *)NULL) { 00068 _manager->remove_writer(this); 00069 } 00070 00071 // First, shutdown the queue. This will tell our threads they're 00072 // done. 00073 _queue.shutdown(); 00074 00075 // Now wait for all threads to terminate. 00076 Threads::iterator ti; 00077 for (ti = _threads.begin(); ti != _threads.end(); ++ti) { 00078 // Interrupt the thread just in case it was stuck waiting for I/O. 00079 PRStatus result = PR_Interrupt(*ti); 00080 if (result != PR_SUCCESS) { 00081 pprerror("PR_Interrupt"); 00082 } 00083 00084 result = PR_JoinThread(*ti); 00085 if (result != PR_SUCCESS) { 00086 pprerror("PR_JoinThread"); 00087 } 00088 } 00089 } 00090 00091 00092 //////////////////////////////////////////////////////////////////// 00093 // Function: ConnectionWriter::send 00094 // Access: Public 00095 // Description: Enqueues a datagram for transmittal on the indicated 00096 // socket. Since the host address is not specified with 00097 // this form, this function should only be used for 00098 // sending TCP packets. Use the other send() method for 00099 // sending UDP packets. 00100 // 00101 // Returns true if successful, false if there was an 00102 // error. In the normal, threaded case, this function 00103 // only returns false if the send queue is filled; it's 00104 // impossible to detect a transmission error at this 00105 // point. 00106 //////////////////////////////////////////////////////////////////// 00107 bool ConnectionWriter:: 00108 send(const Datagram &datagram, const PT(Connection) &connection) { 00109 nassertr(connection != (Connection *)NULL, false); 00110 nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_TCP, false); 00111 00112 NetDatagram copy(datagram); 00113 copy.set_connection(connection); 00114 00115 if (_immediate) { 00116 if (_raw_mode) { 00117 return connection->send_raw_datagram(copy); 00118 } else { 00119 return connection->send_datagram(copy); 00120 } 00121 } else { 00122 return _queue.insert(copy); 00123 } 00124 } 00125 00126 00127 //////////////////////////////////////////////////////////////////// 00128 // Function: ConnectionWriter::send 00129 // Access: Public 00130 // Description: Enqueues a datagram for transmittal on the indicated 00131 // socket. This form of the function allows the 00132 // specification of a destination host address, and so 00133 // is appropriate for UDP packets. Use the other send() 00134 // method for sending TCP packets. 00135 // 00136 // Returns true if successful, false if there was an 00137 // error. In the normal, threaded case, this function 00138 // only returns false if the send queue is filled; it's 00139 // impossible to detect a transmission error at this 00140 // point. 00141 //////////////////////////////////////////////////////////////////// 00142 bool ConnectionWriter:: 00143 send(const Datagram &datagram, const PT(Connection) &connection, 00144 const NetAddress &address) { 00145 nassertr(connection != (Connection *)NULL, false); 00146 nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP, false); 00147 00148 if (PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP && 00149 (int)datagram.get_length() > maximum_udp_datagram) { 00150 net_cat.warning() 00151 << "Attempt to send UDP datagram of " << datagram.get_length() 00152 << " bytes, more than the\n" 00153 << "currently defined maximum of " << maximum_udp_datagram 00154 << " bytes.\n"; 00155 } 00156 00157 NetDatagram copy(datagram); 00158 copy.set_connection(connection); 00159 copy.set_address(address); 00160 00161 if (_immediate) { 00162 if (_raw_mode) { 00163 return connection->send_raw_datagram(copy); 00164 } else { 00165 return connection->send_datagram(copy); 00166 } 00167 } else { 00168 return _queue.insert(copy); 00169 } 00170 } 00171 00172 //////////////////////////////////////////////////////////////////// 00173 // Function: ConnectionWriter::is_valid_for_udp 00174 // Access: Public 00175 // Description: Returns true if the datagram is small enough to be 00176 // sent over a UDP packet, false otherwise. 00177 //////////////////////////////////////////////////////////////////// 00178 bool ConnectionWriter:: 00179 is_valid_for_udp(const Datagram &datagram) const { 00180 return (int)datagram.get_length() <= maximum_udp_datagram; 00181 } 00182 00183 //////////////////////////////////////////////////////////////////// 00184 // Function: ConnectionWriter::get_manager 00185 // Access: Public 00186 // Description: Returns a pointer to the ConnectionManager object 00187 // that serves this ConnectionWriter. 00188 //////////////////////////////////////////////////////////////////// 00189 ConnectionManager *ConnectionWriter:: 00190 get_manager() const { 00191 return _manager; 00192 } 00193 00194 //////////////////////////////////////////////////////////////////// 00195 // Function: ConnectionWriter::is_immediate 00196 // Access: Public 00197 // Description: Returns true if the writer is an immediate writer, 00198 // i.e. it has no threads. 00199 //////////////////////////////////////////////////////////////////// 00200 bool ConnectionWriter:: 00201 is_immediate() const { 00202 return _immediate; 00203 } 00204 00205 //////////////////////////////////////////////////////////////////// 00206 // Function: ConnectionWriter::get_num_threads 00207 // Access: Public 00208 // Description: Returns the number of threads the ConnectionWriter 00209 // has been created with. 00210 //////////////////////////////////////////////////////////////////// 00211 int ConnectionWriter:: 00212 get_num_threads() const { 00213 return _threads.size(); 00214 } 00215 00216 //////////////////////////////////////////////////////////////////// 00217 // Function: ConnectionWriter::set_raw_mode 00218 // Access: Public 00219 // Description: Sets the ConnectionWriter into raw mode (or turns off 00220 // raw mode). In raw mode, datagrams are not sent along 00221 // with their headers; the bytes in the datagram are 00222 // simply sent down the pipe. 00223 // 00224 // Setting the ConnectionWriter to raw mode must be done 00225 // with care. This can only be done when the matching 00226 // ConnectionReader is also set to raw mode, or when the 00227 // ConnectionWriter is communicating to a process that 00228 // does not expect datagrams. 00229 //////////////////////////////////////////////////////////////////// 00230 void ConnectionWriter:: 00231 set_raw_mode(bool mode) { 00232 _raw_mode = mode; 00233 } 00234 00235 //////////////////////////////////////////////////////////////////// 00236 // Function: ConnectionWriter::get_raw_mode 00237 // Access: Public 00238 // Description: Returns the current setting of the raw mode flag. 00239 // See set_raw_mode(). 00240 //////////////////////////////////////////////////////////////////// 00241 bool ConnectionWriter:: 00242 get_raw_mode() const { 00243 return _raw_mode; 00244 } 00245 00246 //////////////////////////////////////////////////////////////////// 00247 // Function: ConnectionWriter::clear_manager 00248 // Access: Protected 00249 // Description: This should normally only be called when the 00250 // associated ConnectionManager destructs. It resets 00251 // the ConnectionManager pointer to NULL so we don't 00252 // have a floating pointer. This makes the 00253 // ConnectionWriter invalid; presumably it also will be 00254 // destructed momentarily. 00255 //////////////////////////////////////////////////////////////////// 00256 void ConnectionWriter:: 00257 clear_manager() { 00258 _manager = (ConnectionManager *)NULL; 00259 } 00260 00261 //////////////////////////////////////////////////////////////////// 00262 // Function: ConnectionWriter::thread_start 00263 // Access: Private, Static 00264 // Description: The static wrapper around the thread's executing 00265 // function. This must be a static member function 00266 // because it is passed through the C interface to 00267 // PR_CreateThread(). 00268 //////////////////////////////////////////////////////////////////// 00269 void ConnectionWriter:: 00270 thread_start(void *data) { 00271 ((ConnectionWriter *)data)->thread_run(); 00272 } 00273 00274 //////////////////////////////////////////////////////////////////// 00275 // Function: ConnectionWriter::thread_run 00276 // Access: Private 00277 // Description: This is the actual executing function for each 00278 // thread. 00279 //////////////////////////////////////////////////////////////////// 00280 void ConnectionWriter:: 00281 thread_run() { 00282 nassertv(!_immediate); 00283 00284 NetDatagram datagram; 00285 while (_queue.extract(datagram)) { 00286 if (_raw_mode) { 00287 datagram.get_connection()->send_raw_datagram(datagram); 00288 } else { 00289 datagram.get_connection()->send_datagram(datagram); 00290 } 00291 } 00292 }