00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "connectionReader.h"
00020 #include "connectionManager.h"
00021 #include "netDatagram.h"
00022 #include "datagramTCPHeader.h"
00023 #include "datagramUDPHeader.h"
00024 #include "pprerror.h"
00025 #include "config_net.h"
00026
00027 #include <notify.h>
00028 #include <prerror.h>
00029 #include <pratom.h>
00030 #include <algorithm>
00031
00032 static const int read_buffer_size = maximum_udp_datagram + datagram_udp_header_size;
00033
00034
00035
00036 static const PRUint32 max_timeout_ms = 100;
00037
00038
00039
00040
00041
00042
00043 ConnectionReader::SocketInfo::
00044 SocketInfo(const PT(Connection) &connection) :
00045 _connection(connection)
00046 {
00047 _busy = false;
00048 _error = false;
00049 }
00050
00051
00052
00053
00054
00055
00056 bool ConnectionReader::SocketInfo::
00057 is_udp() const {
00058 return (PR_GetDescType(_connection->get_socket()) == PR_DESC_SOCKET_UDP);
00059 }
00060
00061
00062
00063
00064
00065
00066 PRFileDesc *ConnectionReader::SocketInfo::
00067 get_socket() const {
00068 return _connection->get_socket();
00069 }
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080 ConnectionReader::
00081 ConnectionReader(ConnectionManager *manager, int num_threads) :
00082 _manager(manager)
00083 {
00084 _raw_mode = false;
00085 _polling = (num_threads <= 0);
00086
00087 _shutdown = false;
00088 _startup_mutex = PR_NewLock();
00089
00090 _next_index = 0;
00091 _num_results = 0;
00092 _select_mutex = PR_NewLock();
00093
00094 _currently_polling_thread = -1;
00095
00096 _reexamine_sockets = false;
00097 _sockets_mutex = PR_NewLock();
00098
00099
00100
00101
00102 PR_Lock(_startup_mutex);
00103
00104 for (int i = 0; i < num_threads; i++) {
00105 PRThread *thread =
00106 PR_CreateThread(PR_USER_THREAD,
00107 thread_start, (void *)this,
00108 PR_PRIORITY_NORMAL,
00109 PR_GLOBAL_THREAD,
00110 PR_JOINABLE_THREAD,
00111 0);
00112
00113 nassertv(thread != (PRThread *)NULL);
00114 _threads.push_back(thread);
00115 }
00116
00117 PR_Unlock(_startup_mutex);
00118
00119 _manager->add_reader(this);
00120 }
00121
00122
00123
00124
00125
00126
00127 ConnectionReader::
00128 ~ConnectionReader() {
00129 if (_manager != (ConnectionManager *)NULL) {
00130 _manager->remove_reader(this);
00131 }
00132
00133 shutdown();
00134
00135
00136 Sockets::iterator si;
00137 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00138 delete (*si);
00139 }
00140 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00141 SocketInfo *sinfo = (*si);
00142 if (!sinfo->_busy) {
00143 delete sinfo;
00144 } else {
00145 net_cat.error()
00146 << "Reentrant deletion of ConnectionReader--don't delete these\n"
00147 << "in response to connection_reset().\n";
00148
00149
00150 sinfo->_connection.clear();
00151 }
00152 }
00153
00154 PR_DestroyLock(_startup_mutex);
00155 PR_DestroyLock(_select_mutex);
00156 PR_DestroyLock(_sockets_mutex);
00157 }
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176 bool ConnectionReader::
00177 add_connection(const PT(Connection) &connection) {
00178 nassertr(connection != (Connection *)NULL, false);
00179
00180 PR_Lock(_sockets_mutex);
00181
00182
00183 Sockets::const_iterator si;
00184 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00185 if ((*si)->_connection == connection) {
00186
00187 PR_Unlock(_sockets_mutex);
00188 return false;
00189 }
00190 }
00191
00192 _sockets.push_back(new SocketInfo(connection));
00193 _reexamine_sockets = true;
00194 PR_Unlock(_sockets_mutex);
00195
00196 return true;
00197 }
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210 bool ConnectionReader::
00211 remove_connection(const PT(Connection) &connection) {
00212 PR_Lock(_sockets_mutex);
00213
00214
00215 Sockets::iterator si;
00216 si = _sockets.begin();
00217 while (si != _sockets.end() && (*si)->_connection != connection) {
00218 ++si;
00219 }
00220 if (si == _sockets.end()) {
00221 PR_Unlock(_sockets_mutex);
00222 return false;
00223 }
00224
00225 _removed_sockets.push_back(*si);
00226 _sockets.erase(si);
00227 _reexamine_sockets = true;
00228 PR_Unlock(_sockets_mutex);
00229
00230 return true;
00231 }
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244 bool ConnectionReader::
00245 is_connection_ok(const PT(Connection) &connection) {
00246 PR_Lock(_sockets_mutex);
00247
00248
00249
00250 Sockets::iterator si;
00251 si = _sockets.begin();
00252 while (si != _sockets.end() && (*si)->_connection != connection) {
00253 ++si;
00254 }
00255 if (si == _sockets.end()) {
00256
00257 PR_Unlock(_sockets_mutex);
00258 return false;
00259 }
00260
00261 SocketInfo *sinfo = (*si);
00262 bool is_ok = !sinfo->_error;
00263 PR_Unlock(_sockets_mutex);
00264
00265 return is_ok;
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280 void ConnectionReader::
00281 poll() {
00282 if (!_polling) {
00283 return;
00284 }
00285
00286 SocketInfo *sinfo = get_next_available_socket(PR_INTERVAL_NO_WAIT, -2);
00287 if (sinfo != (SocketInfo *)NULL) {
00288 process_incoming_data(sinfo);
00289 }
00290 }
00291
00292
00293
00294
00295
00296
00297
00298 ConnectionManager *ConnectionReader::
00299 get_manager() const {
00300 return _manager;
00301 }
00302
00303
00304
00305
00306
00307
00308
00309 bool ConnectionReader::
00310 is_polling() const {
00311 return _polling;
00312 }
00313
00314
00315
00316
00317
00318
00319
00320 int ConnectionReader::
00321 get_num_threads() const {
00322 return _threads.size();
00323 }
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333 void ConnectionReader::
00334 set_raw_mode(bool mode) {
00335 _raw_mode = mode;
00336 }
00337
00338
00339
00340
00341
00342
00343
00344 bool ConnectionReader::
00345 get_raw_mode() const {
00346 return _raw_mode;
00347 }
00348
00349
00350
00351
00352
00353
00354
00355 void ConnectionReader::
00356 shutdown() {
00357 if (_shutdown) {
00358 return;
00359 }
00360
00361
00362
00363 _shutdown = true;
00364
00365
00366 Threads::iterator ti;
00367 for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
00368
00369 PRStatus result = PR_Interrupt(*ti);
00370 if (result != PR_SUCCESS) {
00371 pprerror("PR_Interrupt");
00372 }
00373
00374 result = PR_JoinThread(*ti);
00375 if (result != PR_SUCCESS) {
00376 pprerror("PR_JoinThread");
00377 }
00378 }
00379 }
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 void ConnectionReader::
00392 clear_manager() {
00393 _manager = (ConnectionManager *)NULL;
00394 }
00395
00396
00397
00398
00399
00400
00401
00402 void ConnectionReader::
00403 finish_socket(SocketInfo *sinfo) {
00404 nassertv(sinfo->_busy);
00405
00406
00407
00408 sinfo->_busy = false;
00409 _reexamine_sockets = true;
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429 }
00430
00431
00432
00433
00434
00435
00436
00437
00438 void ConnectionReader::
00439 process_incoming_data(SocketInfo *sinfo) {
00440 if (_raw_mode) {
00441 if (sinfo->is_udp()) {
00442 process_raw_incoming_udp_data(sinfo);
00443 } else {
00444 process_raw_incoming_tcp_data(sinfo);
00445 }
00446 } else {
00447 if (sinfo->is_udp()) {
00448 process_incoming_udp_data(sinfo);
00449 } else {
00450 process_incoming_tcp_data(sinfo);
00451 }
00452 }
00453 }
00454
00455
00456
00457
00458
00459
00460 void ConnectionReader::
00461 process_incoming_udp_data(SocketInfo *sinfo) {
00462 PRFileDesc *socket = sinfo->get_socket();
00463 PRNetAddr addr;
00464
00465
00466 PRInt8 buffer[read_buffer_size];
00467 PRInt32 bytes_read;
00468
00469 bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
00470 &addr, PR_INTERVAL_NO_TIMEOUT);
00471
00472 if (bytes_read < 0) {
00473 PRErrorCode errcode = PR_GetError();
00474 if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00475 pprerror("PR_RecvFrom");
00476 }
00477 finish_socket(sinfo);
00478 return;
00479
00480 } else if (bytes_read == 0) {
00481
00482
00483 if (_manager != (ConnectionManager *)NULL) {
00484 _manager->connection_reset(sinfo->_connection);
00485 }
00486 finish_socket(sinfo);
00487 return;
00488 }
00489
00490
00491
00492
00493 if (bytes_read < datagram_udp_header_size) {
00494 net_cat.error()
00495 << "Did not read entire header, discarding UDP datagram.\n";
00496 finish_socket(sinfo);
00497 return;
00498 }
00499
00500 DatagramUDPHeader header(buffer);
00501
00502 PRInt8 *dp = buffer + datagram_udp_header_size;
00503 bytes_read -= datagram_udp_header_size;
00504
00505 NetDatagram datagram(dp, bytes_read);
00506
00507
00508
00509 finish_socket(sinfo);
00510
00511 if (_shutdown) {
00512 return;
00513 }
00514
00515
00516 if (!header.verify_datagram(datagram)) {
00517 net_cat.error()
00518 << "Ignoring invalid UDP datagram.\n";
00519 } else {
00520 datagram.set_connection(sinfo->_connection);
00521 datagram.set_address(NetAddress(addr));
00522 receive_datagram(datagram);
00523 }
00524 }
00525
00526
00527
00528
00529
00530
00531 void ConnectionReader::
00532 process_incoming_tcp_data(SocketInfo *sinfo) {
00533 PRFileDesc *socket = sinfo->get_socket();
00534 PRNetAddr addr;
00535
00536
00537 PRInt8 buffer[read_buffer_size];
00538 PRInt32 header_bytes_read = 0;
00539
00540 if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
00541 pprerror("PR_GetSockName");
00542 }
00543
00544
00545 while (header_bytes_read < datagram_tcp_header_size) {
00546 PRInt32 bytes_read =
00547 PR_Recv(socket, buffer + header_bytes_read,
00548 datagram_tcp_header_size - header_bytes_read, 0,
00549 PR_INTERVAL_NO_TIMEOUT);
00550
00551 if (bytes_read < 0) {
00552 PRErrorCode errcode = PR_GetError();
00553 if (errcode == PR_CONNECT_RESET_ERROR
00554 #ifdef PR_SOCKET_SHUTDOWN_ERROR
00555 || errcode == PR_SOCKET_SHUTDOWN_ERROR
00556 || errcode == PR_CONNECT_ABORTED_ERROR
00557 #endif
00558 ) {
00559
00560 if (_manager != (ConnectionManager *)NULL) {
00561 _manager->connection_reset(sinfo->_connection);
00562 }
00563
00564 } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00565 pprerror("PR_Recv");
00566 }
00567 finish_socket(sinfo);
00568 return;
00569
00570 } else if (bytes_read == 0) {
00571
00572 if (_manager != (ConnectionManager *)NULL) {
00573 _manager->connection_reset(sinfo->_connection);
00574 }
00575 finish_socket(sinfo);
00576 return;
00577 }
00578
00579 header_bytes_read += bytes_read;
00580 }
00581
00582
00583
00584 if (header_bytes_read != datagram_tcp_header_size) {
00585
00586
00587 net_cat.error()
00588 << "Did not read entire header, discarding TCP datagram.\n";
00589 finish_socket(sinfo);
00590 return;
00591 }
00592
00593 DatagramTCPHeader header(buffer);
00594 PRInt32 size = header.get_datagram_size();
00595
00596
00597 NetDatagram datagram;
00598
00599 while (!_shutdown && (int)datagram.get_length() < size) {
00600 PRInt32 bytes_read;
00601
00602 bytes_read =
00603 PR_Recv(socket, buffer,
00604 min((PRInt32)read_buffer_size,
00605 (PRInt32)(size - datagram.get_length())),
00606 0, PR_INTERVAL_NO_TIMEOUT);
00607 PRInt8 *dp = buffer;
00608
00609 if (bytes_read < 0) {
00610 PRErrorCode errcode = PR_GetError();
00611 if (errcode == PR_CONNECT_RESET_ERROR
00612 #ifdef PR_SOCKET_SHUTDOWN_ERROR
00613 || errcode == PR_SOCKET_SHUTDOWN_ERROR
00614 || errcode == PR_CONNECT_ABORTED_ERROR
00615 #endif
00616 ) {
00617
00618 if (_manager != (ConnectionManager *)NULL) {
00619 _manager->connection_reset(sinfo->_connection);
00620 }
00621
00622 } else if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00623 pprerror("PR_Recv");
00624 }
00625 finish_socket(sinfo);
00626 return;
00627
00628 } else if (bytes_read == 0) {
00629
00630 if (_manager != (ConnectionManager *)NULL) {
00631 _manager->connection_reset(sinfo->_connection);
00632 }
00633 finish_socket(sinfo);
00634 return;
00635 }
00636
00637 PRInt32 datagram_bytes =
00638 min(bytes_read, (PRInt32)(size - datagram.get_length()));
00639 datagram.append_data(dp, datagram_bytes);
00640
00641 if (bytes_read > datagram_bytes) {
00642
00643
00644 net_cat.error()
00645 << "Discarding " << bytes_read - datagram_bytes
00646 << " bytes following TCP datagram.\n";
00647 }
00648 }
00649
00650
00651
00652 finish_socket(sinfo);
00653
00654 if (_shutdown) {
00655 return;
00656 }
00657
00658
00659 if (!header.verify_datagram(datagram)) {
00660 net_cat.error()
00661 << "Ignoring invalid TCP datagram.\n";
00662 } else {
00663 datagram.set_connection(sinfo->_connection);
00664 datagram.set_address(NetAddress(addr));
00665 receive_datagram(datagram);
00666 }
00667 }
00668
00669
00670
00671
00672
00673
00674 void ConnectionReader::
00675 process_raw_incoming_udp_data(SocketInfo *sinfo) {
00676 PRFileDesc *socket = sinfo->get_socket();
00677 PRNetAddr addr;
00678
00679
00680 PRInt8 buffer[read_buffer_size];
00681 PRInt32 bytes_read;
00682
00683 bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
00684 &addr, PR_INTERVAL_NO_TIMEOUT);
00685
00686 if (bytes_read < 0) {
00687 PRErrorCode errcode = PR_GetError();
00688 if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00689 pprerror("PR_RecvFrom");
00690 }
00691 finish_socket(sinfo);
00692 return;
00693
00694 } else if (bytes_read == 0) {
00695
00696
00697 if (_manager != (ConnectionManager *)NULL) {
00698 _manager->connection_reset(sinfo->_connection);
00699 }
00700 finish_socket(sinfo);
00701 return;
00702 }
00703
00704
00705
00706 NetDatagram datagram(buffer, bytes_read);
00707
00708
00709
00710 finish_socket(sinfo);
00711
00712 if (_shutdown) {
00713 return;
00714 }
00715
00716 datagram.set_connection(sinfo->_connection);
00717 datagram.set_address(NetAddress(addr));
00718 receive_datagram(datagram);
00719 }
00720
00721
00722
00723
00724
00725
00726 void ConnectionReader::
00727 process_raw_incoming_tcp_data(SocketInfo *sinfo) {
00728 PRFileDesc *socket = sinfo->get_socket();
00729 PRNetAddr addr;
00730
00731
00732 PRInt8 buffer[read_buffer_size];
00733 PRInt32 bytes_read;
00734
00735 if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
00736 pprerror("PR_GetSockName");
00737 }
00738
00739 bytes_read = PR_Recv(socket, buffer, read_buffer_size, 0,
00740 PR_INTERVAL_NO_TIMEOUT);
00741
00742 if (bytes_read < 0) {
00743 PRErrorCode errcode = PR_GetError();
00744 if (errcode != PR_PENDING_INTERRUPT_ERROR) {
00745 pprerror("PR_RecvFrom");
00746 }
00747 finish_socket(sinfo);
00748 return;
00749
00750 } else if (bytes_read == 0) {
00751
00752 if (_manager != (ConnectionManager *)NULL) {
00753 _manager->connection_reset(sinfo->_connection);
00754 }
00755 finish_socket(sinfo);
00756 return;
00757 }
00758
00759
00760
00761 NetDatagram datagram(buffer, bytes_read);
00762
00763
00764
00765 finish_socket(sinfo);
00766
00767 if (_shutdown) {
00768 return;
00769 }
00770
00771 datagram.set_connection(sinfo->_connection);
00772 datagram.set_address(NetAddress(addr));
00773 receive_datagram(datagram);
00774 }
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785 void ConnectionReader::
00786 thread_start(void *data) {
00787 ((ConnectionReader *)data)->thread_run();
00788 }
00789
00790
00791
00792
00793
00794
00795
00796 void ConnectionReader::
00797 thread_run() {
00798 nassertv(!_polling);
00799
00800
00801 PR_Lock(_startup_mutex);
00802 Threads::const_iterator ti =
00803 find(_threads.begin(), _threads.end(), PR_GetCurrentThread());
00804
00805 nassertv(ti != _threads.end());
00806 PRInt32 current_thread_index = (ti - _threads.begin());
00807
00808 nassertv(_threads[current_thread_index] == PR_GetCurrentThread());
00809 PR_Unlock(_startup_mutex);
00810
00811 while (!_shutdown) {
00812 SocketInfo *sinfo =
00813 get_next_available_socket(PR_INTERVAL_NO_TIMEOUT,
00814 current_thread_index);
00815 if (sinfo != (SocketInfo *)NULL) {
00816 process_incoming_data(sinfo);
00817 }
00818 }
00819 }
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834 ConnectionReader::SocketInfo *ConnectionReader::
00835 get_next_available_socket(PRIntervalTime timeout,
00836 PRInt32 current_thread_index) {
00837
00838
00839 PR_Lock(_select_mutex);
00840
00841 int num_sockets = _polled_sockets.size();
00842 nassertr(num_sockets == (int)_poll.size(), NULL);
00843
00844 do {
00845
00846
00847 while (!_shutdown && _num_results > 0) {
00848 nassertr(_next_index < num_sockets, NULL);
00849 int i = _next_index;
00850 _next_index++;
00851
00852 if (_poll[i].out_flags != 0) {
00853 _num_results--;
00854 SocketInfo *sinfo = _polled_sockets[i];
00855
00856 if ((_poll[i].out_flags & PR_POLL_READ) != 0) {
00857
00858 sinfo->_busy = true;
00859 _reexamine_sockets = true;
00860 PR_Unlock(_select_mutex);
00861 PR_Sleep(PR_INTERVAL_NO_WAIT);
00862 return sinfo;
00863
00864 } else if ((_poll[i].out_flags &
00865 (PR_POLL_ERR | PR_POLL_NVAL | PR_POLL_HUP)) != 0) {
00866
00867
00868 if (_manager != (ConnectionManager *)NULL) {
00869 _manager->connection_reset(sinfo->_connection);
00870 }
00871 sinfo->_error = true;
00872 _reexamine_sockets = true;
00873 }
00874 }
00875 }
00876
00877 bool interrupted;
00878 do {
00879 interrupted = false;
00880
00881
00882
00883
00884
00885
00886
00887 PR_AtomicSet(&_currently_polling_thread, current_thread_index);
00888
00889 if (_reexamine_sockets) {
00890 _reexamine_sockets = false;
00891 rebuild_poll_list();
00892 num_sockets = _polled_sockets.size();
00893 nassertr(num_sockets == (int)_poll.size(), NULL);
00894 }
00895
00896
00897
00898 _num_results = 0;
00899 _next_index = 0;
00900
00901 if (!_shutdown) {
00902 PRIntervalTime poll_timeout =
00903 PR_MillisecondsToInterval(max_timeout_ms);
00904 if (timeout != PR_INTERVAL_NO_TIMEOUT) {
00905 poll_timeout = min(timeout, poll_timeout);
00906 }
00907
00908 _num_results = PR_Poll(&_poll[0], num_sockets, poll_timeout);
00909 }
00910
00911 if (_num_results == 0 && timeout == PR_INTERVAL_NO_TIMEOUT) {
00912
00913
00914
00915
00916 interrupted = true;
00917
00918 } else if (_num_results < 0) {
00919
00920
00921 PRErrorCode errcode = PR_GetError();
00922 if (errcode == PR_PENDING_INTERRUPT_ERROR) {
00923 interrupted = true;
00924 } else {
00925 pprerror("PR_Poll");
00926 }
00927 }
00928 } while (!_shutdown && interrupted);
00929
00930 PR_AtomicSet(&_currently_polling_thread, -1);
00931
00932
00933 PR_ClearInterrupt();
00934
00935
00936
00937 } while (!_shutdown && _num_results > 0);
00938
00939 PR_Unlock(_select_mutex);
00940 return (SocketInfo *)NULL;
00941 }
00942
00943
00944
00945
00946
00947
00948
00949
00950
00951 void ConnectionReader::
00952 rebuild_poll_list() {
00953 _poll.clear();
00954 _polled_sockets.clear();
00955
00956 PR_Lock(_sockets_mutex);
00957 Sockets::const_iterator si;
00958 for (si = _sockets.begin(); si != _sockets.end(); ++si) {
00959 SocketInfo *sinfo = (*si);
00960 if (!sinfo->_busy && !sinfo->_error) {
00961 PRPollDesc pd;
00962 pd.fd = sinfo->get_socket();
00963 pd.in_flags = PR_POLL_READ;
00964 pd.out_flags = 0;
00965
00966 _poll.push_back(pd);
00967 _polled_sockets.push_back(sinfo);
00968 }
00969 }
00970
00971
00972
00973 if (!_removed_sockets.empty()) {
00974 Sockets still_busy_sockets;
00975 for (si = _removed_sockets.begin(); si != _removed_sockets.end(); ++si) {
00976 SocketInfo *sinfo = (*si);
00977 if (sinfo->_busy) {
00978 still_busy_sockets.push_back(sinfo);
00979 } else {
00980 delete sinfo;
00981 }
00982 }
00983 _removed_sockets.swap(still_busy_sockets);
00984 }
00985
00986 PR_Unlock(_sockets_mutex);
00987 }