00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "connection.h"
00020 #include "connectionManager.h"
00021 #include "netDatagram.h"
00022 #include "datagramTCPHeader.h"
00023 #include "datagramUDPHeader.h"
00024 #include "pprerror.h"
00025 #include "config_net.h"
00026 #include "config_express.h"
00027 #include "clockObject.h"
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 Connection::
00039 Connection(ConnectionManager *manager, PRFileDesc *socket) :
00040 _manager(manager),
00041 _socket(socket)
00042 {
00043 _write_mutex = PR_NewLock();
00044 _collect_tcp = collect_tcp;
00045 _collect_tcp_interval = collect_tcp_interval;
00046 _queued_data_start = 0.0;
00047 _queued_count = 0;
00048 }
00049
00050
00051
00052
00053
00054
00055 Connection::
00056 ~Connection() {
00057 net_cat.info()
00058 << "Deleting connection " << (void *)this << "\n";
00059
00060 if (_socket != (PRFileDesc *)NULL) {
00061 flush();
00062
00063 PRStatus result = PR_Close(_socket);
00064 if (result != PR_SUCCESS) {
00065 pprerror("PR_Close");
00066 }
00067 }
00068
00069 PR_DestroyLock(_write_mutex);
00070 }
00071
00072
00073
00074
00075
00076
00077
00078 NetAddress Connection::
00079 get_address() const {
00080 PRNetAddr addr;
00081 if (PR_GetSockName(_socket, &addr) != PR_SUCCESS) {
00082 pprerror("PR_GetSockName");
00083 }
00084
00085 return NetAddress(addr);
00086 }
00087
00088
00089
00090
00091
00092
00093
00094 ConnectionManager *Connection::
00095 get_manager() const {
00096 return _manager;
00097 }
00098
00099
00100
00101
00102
00103
00104
00105 PRFileDesc *Connection::
00106 get_socket() const {
00107 return _socket;
00108 }
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 void Connection::
00132 set_collect_tcp(bool collect_tcp) {
00133 _collect_tcp = collect_tcp;
00134 }
00135
00136
00137
00138
00139
00140
00141
00142 bool Connection::
00143 get_collect_tcp() const {
00144 return _collect_tcp;
00145 }
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156 void Connection::
00157 set_collect_tcp_interval(double interval) {
00158 _collect_tcp_interval = interval;
00159 }
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170 double Connection::
00171 get_collect_tcp_interval() const {
00172 return _collect_tcp_interval;
00173 }
00174
00175
00176
00177
00178
00179
00180
00181
00182 bool Connection::
00183 consider_flush() {
00184 PR_Lock(_write_mutex);
00185
00186 if (!_collect_tcp ||
00187 ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
00188 return do_flush();
00189 }
00190
00191 PR_Unlock(_write_mutex);
00192 return true;
00193 }
00194
00195
00196
00197
00198
00199
00200
00201
00202 bool Connection::
00203 flush() {
00204 PR_Lock(_write_mutex);
00205 return do_flush();
00206 }
00207
00208
00209
00210
00211
00212
00213 void Connection::
00214 set_nonblock(bool flag) {
00215 PRSocketOptionData data;
00216 data.option = PR_SockOpt_Nonblocking;
00217 data.value.non_blocking = flag;
00218 PR_SetSocketOption(_socket, &data);
00219 }
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 void Connection::
00234 set_linger(bool flag, double time) {
00235 PRSocketOptionData data;
00236 data.option = PR_SockOpt_Linger;
00237 data.value.linger.polarity = flag;
00238 data.value.linger.linger = PRIntervalTime(time * PR_INTERVAL_MIN);
00239 PR_SetSocketOption(_socket, &data);
00240 }
00241
00242
00243
00244
00245
00246
00247 void Connection::
00248 set_reuse_addr(bool flag) {
00249 PRSocketOptionData data;
00250 data.option = PR_SockOpt_Reuseaddr;
00251 data.value.reuse_addr = flag;
00252 PR_SetSocketOption(_socket, &data);
00253 }
00254
00255
00256
00257
00258
00259
00260
00261 void Connection::
00262 set_keep_alive(bool flag) {
00263 PRSocketOptionData data;
00264 data.option = PR_SockOpt_Keepalive;
00265 data.value.keep_alive = flag;
00266 PR_SetSocketOption(_socket, &data);
00267 }
00268
00269
00270
00271
00272
00273
00274 void Connection::
00275 set_recv_buffer_size(int size) {
00276 PRSocketOptionData data;
00277 data.option = PR_SockOpt_RecvBufferSize;
00278 data.value.recv_buffer_size = size;
00279 PR_SetSocketOption(_socket, &data);
00280 }
00281
00282
00283
00284
00285
00286
00287 void Connection::
00288 set_send_buffer_size(int size) {
00289 PRSocketOptionData data;
00290 data.option = PR_SockOpt_SendBufferSize;
00291 data.value.send_buffer_size = size;
00292 PR_SetSocketOption(_socket, &data);
00293 }
00294
00295
00296
00297
00298
00299
00300 void Connection::
00301 set_ip_time_to_live(int ttl) {
00302 PRSocketOptionData data;
00303 data.option = PR_SockOpt_IpTimeToLive;
00304 data.value.ip_ttl = ttl;
00305 PR_SetSocketOption(_socket, &data);
00306 }
00307
00308
00309
00310
00311
00312
00313 void Connection::
00314 set_ip_type_of_service(int tos) {
00315 PRSocketOptionData data;
00316 data.option = PR_SockOpt_IpTypeOfService;
00317 data.value.tos = tos;
00318 PR_SetSocketOption(_socket, &data);
00319 }
00320
00321
00322
00323
00324
00325
00326
00327 void Connection::
00328 set_no_delay(bool flag) {
00329 PRSocketOptionData data;
00330 data.option = PR_SockOpt_NoDelay;
00331 data.value.no_delay = flag;
00332 PR_SetSocketOption(_socket, &data);
00333 }
00334
00335
00336
00337
00338
00339
00340 void Connection::
00341 set_max_segment(int size) {
00342 PRSocketOptionData data;
00343 data.option = PR_SockOpt_MaxSegment;
00344 data.value.max_segment = size;
00345 PR_SetSocketOption(_socket, &data);
00346 }
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 bool Connection::
00358 send_datagram(const NetDatagram &datagram) {
00359 nassertr(_socket != (PRFileDesc *)NULL, false);
00360
00361 if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) {
00362
00363 PR_Lock(_write_mutex);
00364 DatagramUDPHeader header(datagram);
00365 string data;
00366 data += header.get_header();
00367 data += datagram.get_message();
00368
00369 PRInt32 bytes_to_send = data.length();
00370 PRInt32 result;
00371 result = PR_SendTo(_socket,
00372 data.data(), bytes_to_send,
00373 0,
00374 datagram.get_address().get_addr(),
00375 PR_INTERVAL_NO_TIMEOUT);
00376 PRErrorCode errcode = PR_GetError();
00377
00378 if (net_cat.is_debug()) {
00379 header.verify_datagram(datagram);
00380 }
00381
00382 PR_Unlock(_write_mutex);
00383 return check_send_error(result, errcode, bytes_to_send);
00384 }
00385
00386
00387 DatagramTCPHeader header(datagram);
00388
00389 PR_Lock(_write_mutex);
00390 _queued_data += header.get_header();
00391 _queued_data += datagram.get_message();
00392 _queued_count++;
00393
00394 if (net_cat.is_debug()) {
00395 header.verify_datagram(datagram);
00396 }
00397
00398 if (!_collect_tcp ||
00399 ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
00400 return do_flush();
00401 }
00402
00403 PR_Unlock(_write_mutex);
00404 return true;
00405 }
00406
00407
00408
00409
00410
00411
00412
00413
00414 bool Connection::
00415 send_raw_datagram(const NetDatagram &datagram) {
00416 nassertr(_socket != (PRFileDesc *)NULL, false);
00417
00418 if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) {
00419
00420
00421 string data = datagram.get_message();
00422 PRInt32 bytes_to_send = data.length();
00423
00424 if (net_cat.is_spam()) {
00425 net_cat.spam()
00426 << "Sending UDP datagram with "
00427 << bytes_to_send << " bytes to " << (void *)this << "\n";
00428 }
00429
00430 PR_Lock(_write_mutex);
00431 PRInt32 result;
00432 result = PR_SendTo(_socket,
00433 data.data(), bytes_to_send,
00434 0,
00435 datagram.get_address().get_addr(),
00436 PR_INTERVAL_NO_TIMEOUT);
00437 PRErrorCode errcode = PR_GetError();
00438
00439 PR_Unlock(_write_mutex);
00440 return check_send_error(result, errcode, bytes_to_send);
00441 }
00442
00443
00444
00445 PR_Lock(_write_mutex);
00446 _queued_data += datagram.get_message();
00447 _queued_count++;
00448
00449 if (!_collect_tcp ||
00450 ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) {
00451 return do_flush();
00452 }
00453
00454 PR_Unlock(_write_mutex);
00455 return true;
00456 }
00457
00458
00459
00460
00461
00462
00463
00464
00465 bool Connection::
00466 do_flush() {
00467 PRInt32 bytes_to_send = _queued_data.length();
00468 if (bytes_to_send == 0) {
00469 _queued_count = 0;
00470 _queued_data_start = ClockObject::get_global_clock()->get_real_time();
00471 PR_Unlock(_write_mutex);
00472 return true;
00473 }
00474
00475 if (net_cat.is_spam()) {
00476 net_cat.spam()
00477 << "Sending " << _queued_count << " TCP datagram(s) with "
00478 << bytes_to_send << " total bytes to " << (void *)this << "\n";
00479 }
00480
00481 PRInt32 result;
00482 result = PR_Send(_socket,
00483 _queued_data.data(), bytes_to_send,
00484 0,
00485 PR_INTERVAL_NO_TIMEOUT);
00486 PRErrorCode errcode = PR_GetError();
00487
00488 _queued_data = string();
00489 _queued_count = 0;
00490 _queued_data_start = ClockObject::get_global_clock()->get_real_time();
00491
00492 PR_Unlock(_write_mutex);
00493
00494 return check_send_error(result, errcode, bytes_to_send);
00495 }
00496
00497
00498
00499
00500
00501
00502
00503 bool Connection::
00504 check_send_error(PRInt32 result, PRErrorCode errcode, PRInt32 bytes_to_send) {
00505 if (result < 0) {
00506 if (errcode == PR_CONNECT_RESET_ERROR
00507 #ifdef PR_SOCKET_SHUTDOWN_ERROR
00508 || errcode == PR_SOCKET_SHUTDOWN_ERROR
00509 || errcode == PR_CONNECT_ABORTED_ERROR
00510 #endif
00511 ) {
00512
00513
00514 if (_manager != (ConnectionManager *)NULL) {
00515 _manager->connection_reset(this);
00516 }
00517
00518 } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00519 pprerror("PR_SendTo");
00520 }
00521
00522 return false;
00523
00524 } else if (result != bytes_to_send) {
00525 net_cat.error() << "Not enough bytes sent to socket.\n";
00526 return false;
00527 }
00528
00529 return true;
00530 }