PAASS
Software suite to Acquire and Analyze Data from Pixie16
listener.cpp
Go to the documentation of this file.
1 
12 #include <iostream>
13 #include <string.h>
14 #include <chrono>
15 #include <cmath>
16 
17 #include "poll2_socket.h"
18 
/// Clock used to measure the time between received packets.
using hr_clock = std::chrono::high_resolution_clock;
/// A point in time as reported by hr_clock.
using hr_time = hr_clock::time_point;
21 
/// Find the order of magnitude (base-10 exponent) of an input double.
///
/// The sign of the input is ignored. Exponents are scanned upwards, starting
/// at 0 for magnitudes >= 1 and at -100 otherwise; the first exponent e for
/// which |input_| / 10^e drops below 1 yields the result e - 1.
///
/// @param input_ Value whose order of magnitude is wanted.
/// @return The exponent found by the scan (zero, and anything smaller than
///         1e-100, gives -101), or 999 if no exponent in the scanned range
///         satisfies the test (very large values, NaN, infinity).
int order(double input_){
    const double absolute = (input_ < 0.0) ? -input_ : input_;

    // Values of at least one scan 0..100, everything else scans -100..1.
    const bool at_least_one = (absolute >= 1.0);
    int exponent = at_least_one ? 0 : -100;
    const int last_exponent = at_least_one ? 100 : 1;

    while(exponent <= last_exponent){
        if(absolute/std::pow(10.0, (double)exponent) < 1.0){
            return exponent - 1;
        }
        ++exponent;
    }

    return 999; // Nothing in the scanned range matched
}
41 
42 int main(){
43  char buffer[1024];
44  Server poll_server;
45 
46  size_t total_size;
47  int spillID, buffSize;
48  int end_packet_flag;
49  std::streampos file_size;
50  std::streampos new_size;
51 
52  hr_time clock1;
53  hr_time clock2;
54  double dT;
55  std::chrono::duration<double> time_span;
56 
57  size_t size_of_int; // The first byte is always the size of an integer on the sending machine
58  size_t size_of_spos; // The second byte is always the size of a std::streampos type on the sending machine
59 
60  std::cout << " Size of integer on this machine: " << sizeof(int) << " bytes\n";
61  std::cout << " Size of streampos on this machine: " << sizeof(std::streampos) << " bytes\n";
62 
63  size_t min_pack_size = 2 + 2*sizeof(int);
64  std::cout << " Minimum packet size: " << min_pack_size << " bytes\n\n";
65 
66  bool first_packet = true;
67  if(poll_server.Init(5555)){
68  while(true){
69  int recv_bytes = poll_server.RecvMessage(buffer, 1024);
70 
71  if(strcmp(buffer, "$CLOSE_FILE") == 0){
72  std::cout << " Received CLOSE_FILE flag...\n\n";
73  first_packet = true;
74  continue;
75  }
76  else if(strcmp(buffer, "$OPEN_FILE") == 0){
77  std::cout << " Received OPEN_FILE flag...\n\n";
78  first_packet = true;
79  continue;
80  }
81  else if(strcmp(buffer, "$KILL_SOCKET") == 0){
82  std::cout << " Received KILL_SOCKET flag...\n\n";
83  break;
84  }
85 
86  // decode the packet
87  if(first_packet){
88  size_of_int = (size_t)buffer[0];
89  size_of_spos = (size_t)buffer[1];
90 
91  if(size_of_int != sizeof(int) || size_of_spos != sizeof(std::streampos)){
92  std::cout << " Warning! basic type size on remote machine does not match local size\n";
93  std::cout << " Size of integer on remote machine: " << size_of_int << " bytes\n";
94  std::cout << " Size of streampos on remote machine: " << size_of_spos << " bytes\n\n";
95  break;
96  }
97  }
98  else{
99  clock2 = hr_clock::now();
100  time_span = std::chrono::duration_cast<std::chrono::duration<double> >(clock2 - clock1); // Time between packets in seconds
101  if(time_span.count() < 2.0){ continue; }
102  }
103 
104  // The third byte is the start of an integer specifying the total length of the packet
105  // (in bytes) including itself and the first two size bytes.
106  memcpy((char *)&total_size, &buffer[2], size_of_int);
107 
108  if(total_size <= min_pack_size){
109  // Below is the buffer packet structure
110  // ------------------------------------
111  // 1 byte size of integer (may not be the same on a different machine)
112  // 1 byte size of streampos (may not be the same on a different machine)
113  // 4 byte packet length (inclusive, also includes the end packet flag)
114  // 4 byte begin packet flag (0xFFFFFFFF)
115  std::cout << " Null packet...\n\n";
116  }
117  else{
118  // Below is the buffer packet structure
119  // ------------------------------------
120  // 1 byte size of integer (may not be the same on a different machine)
121  // 1 byte size of streampos (may not be the same on a different machine)
122  // 4 byte packet length (inclusive, also includes the end packet flag)
123  // x byte file path (no size limit)
124  // 8 byte file size streampos (long long)
125  // 4 byte spill number ID (unsigned int)
126  // 4 byte buffer size (unsigned int)
127  // 4 byte end packet flag (0xFFFFFFFF)
128  size_t fname_size = total_size - (2 + 4*size_of_int) - size_of_spos; // Size of the filename (in bytes)
129  char *fname = new char[fname_size+1];
130 
131  unsigned int index = 2 + size_of_int;
132  memcpy(fname, (char *)&buffer[index], fname_size); index += fname_size; // Copy the file name
133  memcpy((char *)&new_size, (char *)&buffer[index], size_of_spos); index += size_of_spos; // Copy the file size
134  memcpy((char *)&spillID, (char *)&buffer[index], size_of_int); index += size_of_int; // Copy the spill ID
135  memcpy((char *)&buffSize, (char *)&buffer[index], size_of_int); index += size_of_int; // Copy the buffer size
136  memcpy((char *)&end_packet_flag, (char *)&buffer[index], size_of_int); // Copy the end packet flag
137  fname[fname_size] = '\0'; // Terminate the filename string
138 
139  system("clear");
140 
141  std::cout << " recv: " << recv_bytes << " bytes\n";
142  std::cout << " Packet length: " << total_size << " bytes\n";
143  std::cout << " Poll2 filename: " << fname << "\n";
144 
145  int magnitude = order(new_size);
146  if(magnitude < 3){ std::cout << " Total file size: " << new_size << " B\n"; } // B
147  else if(magnitude >= 3 && magnitude < 6){ std::cout << " Total file size: " << new_size/1E3 << " kB\n"; } // kB
148  else if(magnitude >= 6 && magnitude < 9){ std::cout << " Total file size: " << new_size/1E6 << " MB\n"; } // MB
149  else{ std::cout << " Total file size: " << new_size/1E9 << " GB\n"; } // GB
150 
151  std::cout << " Spill number ID: " << spillID << "\n";
152  std::cout << " Buffer size: " << buffSize << " words\n";
153  std::cout << " End packet: " << end_packet_flag << "\n";
154 
155  if(!first_packet){
156  dT = time_span.count();
157  double rate = ((double)(new_size - file_size))/dT;
158 
159  magnitude = order(dT);
160  if(magnitude > -3){ std::cout << " Time diff: " << dT << " s\n"; } // s
161  else if(magnitude <= -3 && magnitude > -6){ std::cout << " Time diff: " << dT/1E-3 << " ms\n"; } // ms
162  else if(magnitude <= -6 && magnitude > -9){ std::cout << " Time diff: " << dT/1E-6 << " us\n"; } // us
163  else{ std::cout << " Time diff: " << dT/1E-9 << " ns\n"; } // ns
164 
165  magnitude = order(rate);
166  if(magnitude < 3){ std::cout << " Data rate: " << rate << " B/s\n"; } // B/s
167  else if(magnitude >= 3 && magnitude < 6){ std::cout << " Data rate: " << rate/1E3 << " kB/s\n"; } // kB/s
168  else if(magnitude >= 6 && magnitude < 9){ std::cout << " Data rate: " << rate/1E6 << " MB/s\n"; } // MB/s
169  else{ std::cout << " Data rate: " << rate/1E9 << " GB/s\n"; } // GB/s
170  }
171  else{ first_packet = false; }
172  std::cout << std::endl;
173 
174  file_size = new_size;
175  delete[] fname;
176  }
177  clock1 = std::chrono::high_resolution_clock::now();
178  }
179  }
180  else{ return 1; }
181  poll_server.Close();
182 
183  return 0;
184 }
void Close()
Close the socket.
int order(double input_)
Find the order of magnitude of an input double.
Definition: listener.cpp:23
Provides network connectivity for poll2.
int RecvMessage(char *message_, size_t length_)
int main()
Definition: listener.cpp:42
std::chrono::high_resolution_clock hr_clock
Definition: listener.cpp:19
bool Init(int port_, int sec_=10, int usec_=0)
Initialize the serv object and open a specified port. Returns false if the socket fails to open or th...
std::chrono::high_resolution_clock::time_point hr_time
Definition: listener.cpp:20