00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "pStatReader.h"
00020 #include "pStatServer.h"
00021 #include "pStatMonitor.h"
00022
00023 #include <pStatClientControlMessage.h>
00024 #include <pStatServerControlMessage.h>
00025 #include <pStatFrameData.h>
00026 #include <pStatProperties.h>
00027 #include <datagram.h>
00028 #include <datagramIterator.h>
00029 #include <connectionManager.h>
00030
00031
00032
00033
00034
00035
00036 PStatReader::
00037 PStatReader(PStatServer *manager, PStatMonitor *monitor) :
00038 ConnectionReader(manager, monitor->is_thread_safe() ? 1 : 0),
00039 _manager(manager),
00040 _monitor(monitor),
00041 _writer(manager, 0)
00042 {
00043 _udp_port = 0;
00044 _client_data = new PStatClientData(this);
00045 _monitor->set_client_data(_client_data);
00046 }
00047
00048
00049
00050
00051
00052
00053 PStatReader::
00054 ~PStatReader() {
00055 _manager->release_udp_port(_udp_port);
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066 void PStatReader::
00067 close() {
00068 _manager->remove_reader(_tcp_connection, this);
00069 lost_connection();
00070 }
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080 void PStatReader::
00081 set_tcp_connection(Connection *tcp_connection) {
00082 _tcp_connection = tcp_connection;
00083 add_connection(_tcp_connection);
00084
00085 _udp_port = _manager->get_udp_port();
00086 _udp_connection = _manager->open_UDP_connection(_udp_port);
00087 while (_udp_connection.is_null()) {
00088
00089 _udp_port = _manager->get_udp_port();
00090 _udp_connection = _manager->open_UDP_connection(_udp_port);
00091 }
00092
00093 add_connection(_udp_connection);
00094
00095 send_hello();
00096 }
00097
00098
00099
00100
00101
00102
00103
00104
00105 void PStatReader::
00106 lost_connection() {
00107 _client_data->_is_alive = false;
00108 _monitor->lost_connection();
00109 _client_data.clear();
00110
00111 _manager->close_connection(_tcp_connection);
00112 _manager->close_connection(_udp_connection);
00113 _tcp_connection.clear();
00114 _udp_connection.clear();
00115 }
00116
00117
00118
00119
00120
00121
00122
00123 void PStatReader::
00124 idle() {
00125 dequeue_frame_data();
00126 _monitor->idle();
00127 }
00128
00129
00130
00131
00132
00133
00134 string PStatReader::
00135 get_hostname() {
00136 if (_hostname.empty()) {
00137 _hostname = ConnectionManager::get_host_name();
00138 if (_hostname.empty()) {
00139 _hostname = "unknown";
00140 }
00141 }
00142 return _hostname;
00143 }
00144
00145
00146
00147
00148
00149
00150 void PStatReader::
00151 send_hello() {
00152 PStatServerControlMessage message;
00153 message._type = PStatServerControlMessage::T_hello;
00154 message._server_hostname = get_hostname();
00155 message._server_progname = _monitor->get_monitor_name();
00156 message._udp_port = _udp_port;
00157
00158 Datagram datagram;
00159 message.encode(datagram);
00160 _writer.send(datagram, _tcp_connection);
00161 }
00162
00163
00164
00165
00166
00167
00168
00169 void PStatReader::
00170 receive_datagram(const NetDatagram &datagram) {
00171 Connection *connection = datagram.get_connection();
00172
00173 if (connection == _tcp_connection) {
00174 PStatClientControlMessage message;
00175 if (message.decode(datagram, _client_data)) {
00176 handle_client_control_message(message);
00177
00178 } else if (message._type == PStatClientControlMessage::T_datagram) {
00179 handle_client_udp_data(datagram);
00180
00181 } else {
00182 nout << "Got unexpected message from client.\n";
00183 }
00184
00185 } else if (connection == _udp_connection) {
00186 handle_client_udp_data(datagram);
00187
00188 } else {
00189 nout << "Got datagram from unexpected socket.\n";
00190 }
00191 }
00192
00193
00194
00195
00196
00197
00198
00199 void PStatReader::
00200 handle_client_control_message(const PStatClientControlMessage &message) {
00201 switch (message._type) {
00202 case PStatClientControlMessage::T_hello:
00203 {
00204 _client_data->set_version(message._major_version, message._minor_version);
00205 int server_major_version = get_current_pstat_major_version();
00206 int server_minor_version = get_current_pstat_minor_version();
00207
00208 if (message._major_version != server_major_version ||
00209 (message._major_version == server_major_version &&
00210 message._minor_version > server_minor_version)) {
00211 _monitor->bad_version(message._client_hostname, message._client_progname,
00212 message._major_version, message._minor_version,
00213 server_major_version, server_minor_version);
00214 _monitor->close();
00215 } else {
00216 _monitor->hello_from(message._client_hostname, message._client_progname);
00217 }
00218 }
00219 break;
00220
00221 case PStatClientControlMessage::T_define_collectors:
00222 {
00223 for (int i = 0; i < (int)message._collectors.size(); i++) {
00224 _client_data->add_collector(message._collectors[i]);
00225 _monitor->new_collector(message._collectors[i]->_index);
00226 }
00227 }
00228 break;
00229
00230 case PStatClientControlMessage::T_define_threads:
00231 {
00232 for (int i = 0; i < (int)message._names.size(); i++) {
00233 int thread_index = message._first_thread_index + i;
00234 string name = message._names[i];
00235 _client_data->define_thread(thread_index, name);
00236 _monitor->new_thread(thread_index);
00237 }
00238 }
00239 break;
00240
00241 default:
00242 nout << "Invalid control message received from client.\n";
00243 }
00244 }
00245
00246
00247
00248
00249
00250
00251
00252
00253 void PStatReader::
00254 handle_client_udp_data(const Datagram &datagram) {
00255 if (!_monitor->is_client_known()) {
00256
00257
00258
00259
00260 return;
00261 }
00262
00263 DatagramIterator source(datagram);
00264
00265 if (_client_data->is_at_least(2, 1)) {
00266
00267 int initial_byte = source.get_uint8();
00268 nassertv(initial_byte == 0);
00269 }
00270
00271 if (!_queued_frame_data.full()) {
00272 FrameData data;
00273 data._thread_index = source.get_uint16();
00274 data._frame_number = source.get_uint32();
00275 data._frame_data = new PStatFrameData;
00276 data._frame_data->read_datagram(source, _client_data);
00277
00278
00279
00280 _queued_frame_data.push_back(data);
00281 }
00282 }
00283
00284
00285
00286
00287
00288
00289
00290
00291 void PStatReader::
00292 dequeue_frame_data() {
00293 while (!_queued_frame_data.empty()) {
00294 const FrameData &data = _queued_frame_data.front();
00295
00296
00297 int num_levels = data._frame_data->get_num_levels();
00298 for (int i = 0; i < num_levels; i++) {
00299 int collector_index = data._frame_data->get_level_collector(i);
00300 if (!_client_data->get_collector_has_level(collector_index)) {
00301
00302
00303 _client_data->set_collector_has_level(collector_index, true);
00304 _monitor->new_collector(collector_index);
00305 }
00306 }
00307
00308 _client_data->record_new_frame(data._thread_index,
00309 data._frame_number,
00310 data._frame_data);
00311 _monitor->new_data(data._thread_index, data._frame_number);
00312
00313 _queued_frame_data.pop_front();
00314 }
00315 }
00316