comparison main.c @ 20:29b273cf3b1f

Benchmark that tests msg loss
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 13 Mar 2012 12:25:48 +0100
parents fdc2f264f3d6
children 08b37152b48d
comparison
equal deleted inserted replaced
3:cf8ba4d052e6 4:c24d544a2a0a
9 #include <errno.h> 9 #include <errno.h>
10 #include <pthread.h> 10 #include <pthread.h>
11 #include <unistd.h> 11 #include <unistd.h>
12 #include "VMS_Implementations/Vthread_impl/VPThread.h" 12 #include "VMS_Implementations/Vthread_impl/VPThread.h"
13 #include "C_Libraries/Queue_impl/PrivateQueue.h" 13 #include "C_Libraries/Queue_impl/PrivateQueue.h"
14 #include "C_Libraries/DynArray/DynArray.h"
15 #include "C_Libraries/BestEffortMessaging/LossyCom.h"
14 16
15 #include <linux/perf_event.h> 17 #include <linux/perf_event.h>
16 #include <linux/prctl.h> 18 #include <linux/prctl.h>
17 #include <sys/syscall.h> 19 #include <sys/syscall.h>
18 20
19 #undef DEBUG 21 #undef DEBUG
20 //#define DEBUG 22 //#define DEBUG
21 23
22 #define MEASURE_PERF 24 //#define MEASURE_PERF
23 25
24 #if !defined(unix) && !defined(__unix__) 26 #if !defined(unix) && !defined(__unix__)
25 #ifdef __MACH__ 27 #ifdef __MACH__
26 #define unix 1 28 #define unix 1
27 #define __unix__ 1 29 #define __unix__ 1
68 uint64 cycles; 70 uint64 cycles;
69 uint64 instructions; 71 uint64 instructions;
70 }; 72 };
71 73
72 const char *usage = { 74 const char *usage = {
73 "Usage: malloc_test [options]\n" 75 "Usage: msg_passing_test [options]\n"
74 " Spwans a number of threads and allocates memory.\n\n" 76 " Starts threads equal to the number of cores and sends\n"
77 " messages to random receivers\n\n"
75 "Options:\n" 78 "Options:\n"
76 " -t <num> how many threads to use (default: 1). This is internaly multiplied by the number of cores.\n" 79 " -n <num> This specifies the number of sends done by each thread.\n"
77 " -o <num> repeat workload and sync operation <m> times\n"
78 " -i <num> size of workload, repeat <n> times\n"
79 " -h this help screen\n\n" 80 " -h this help screen\n\n"
80 }; 81 };
82
83 /***************************
84 * Barrier Implementation
85 ***************************/
81 86
82 struct barrier_t 87 struct barrier_t
83 { 88 {
84 int counter; 89 int counter;
85 int nthreads; 90 int nthreads;
120 } 125 }
121 VPThread__mutex_unlock(barr->mutex, animatingPr); 126 VPThread__mutex_unlock(barr->mutex, animatingPr);
122 } 127 }
123 128
124 129
125 130 /**************************
131 * Worker Parameters
132 **************************/
126 typedef struct 133 typedef struct
127 { struct barrier_t* barrier; 134 { struct barrier_t* barrier;
128 uint64_t totalWorkCycles; 135 uint64_t totalWorkCycles;
129 uint64_t totalBadCycles; 136 uint64_t totalBadCycles;
130 uint64_t totalSyncCycles; 137 uint64_t totalSyncCycles;
131 uint64_t totalBadSyncCycles; 138 uint64_t totalBadSyncCycles;
132 uint64 numGoodSyncs; 139 uint64 numGoodSyncs;
133 uint64 numGoodTasks; 140 uint64 numGoodTasks;
141 uint64_t coreID;
142 lossyCom__endpoint_t* localEndpoint;
143 lossyCom__exchange_t* centralMsgExchange;
144 unsigned int receivedACKs;
145 unsigned int broadcasterStatus;
146 unsigned int terminate;
134 } 147 }
135 WorkerParams; 148 WorkerParams;
136
137 149
138 typedef struct 150 typedef struct
139 { measurement_t *startExeCycles; 151 { measurement_t *startExeCycles;
140 measurement_t *endExeCycles; 152 measurement_t *endExeCycles;
141 } 153 }
142 BenchParams; 154 BenchParams;
143 155
// One (receiver, message) pair. Not referenced anywhere in the visible part of
// this file; presumably meant for recording sent messages -- confirm before removing.
156 typedef struct
157 {
158 lossyCom__endpointID_t receiverID; // ID of the receiving endpoint
159 lossyCom__msgBody_t msg; // message body
160 } savedMsg_t;
161
144 //======================== Globals ========================= 162 //======================== Globals =========================
145 char __ProgrammName[] = "overhead_test"; 163 char __ProgrammName[] = "overhead_test";
146 char __DataSet[255]; 164 char __DataSet[255];
147 165
148 int outer_iters, inner_iters, num_threads; 166 int num_msg_to_send;
149 size_t chunk_size = 0; 167 size_t chunk_size = 0;
150 168
151 int cycles_counter_fd[NUM_CORES]; 169 int cycles_counter_fd[NUM_CORES];
152 struct perf_event_attr* hw_event; 170 struct perf_event_attr* hw_event;
153 171
154 WorkerParams *workerParamsArray; 172 WorkerParams *workerParamsArray;
173
174 // Seeds for randomNumber(): set in benchmark() from rand()%1000 and passed
// by address to randomNumber() in msgHandler() when choosing the next broadcaster.
// NOTE(review): file-scope and reachable from every worker's message handler with
// no locking visible here -- confirm randomNumber() tolerates concurrent callers.
175 uint32_t seed1;
176 uint32_t seed2;
155 177
156 //======================== App Code ========================= 178 //======================== App Code =========================
157 /* 179 /*
158 * Workload 180 * Workload
159 */ 181 */
167 perror("Error reading cycles counter"); \ 189 perror("Error reading cycles counter"); \
168 cycles = 0; \ 190 cycles = 0; \
169 } \ 191 } \
170 } while (0) //macro magic for scoping 192 } while (0) //macro magic for scoping
171 193
// Declaration only -- the definition is not in the visible part of this file.
// msgHandler() calls it with the addresses of the global seed1/seed2 and
// reduces the result modulo NUM_CORES.
// NOTE(review): `extern inline` means different things under gnu89 and C99
// inline rules -- confirm the build's -std setting matches the definition.
194 extern inline uint32_t
195 randomNumber(uint32_t* seed1, uint32_t* seed2);
196
// Control message values. BROADCAST_ID is not defined in the visible part of
// this file (presumably it comes from LossyCom.h -- confirm).
// NOTE(review): the two derived values expand without parentheses, so e.g.
// `2*BROADCAST_ACK` would parse as `2*BROADCAST_ID - 1`; msgHandler() wraps
// one use in parentheses by hand. Consider defining them as `(BROADCAST_ID-1)`
// and `(BROADCAST_ID-2)`.
197 #define BROADCAST BROADCAST_ID // message sent by worker_TLF() for each broadcast round
198 #define BROADCAST_ACK BROADCAST_ID-1 // reply msgHandler() sends back to the broadcaster
199 #define TERMINATE BROADCAST_ID-2 // makes msgHandler() set WorkerParams.terminate
200
// Values stored in WorkerParams.broadcasterStatus.
201 #define RECEIVING_BROADCAST 0 // not broadcasting: initial state of every core but 0, and state after a hand-off
202 #define BROADCASTING 1 // worker_TLF() sends a broadcast (or TERMINATE) on its next loop pass
203 #define RECEIVING_ACK 2 // broadcast sent; msgHandler() is counting BROADCAST_ACKs
204
205 /*
206 * Message Handler Function
 *
 * Passed to lossyCom__initialize_endpoint() by worker_TLF(); `data` is the
 * WorkerParams of the worker that owns the endpoint. Dispatch on `msg`:
 *   BROADCAST_ID   - reply to senderID with BROADCAST_ACK.
 *   BROADCAST_ACK  - only while broadcasterStatus == RECEIVING_ACK: count it and,
 *                    on reaching NUM_CORES/2, hand the broadcaster role to a
 *                    randomly chosen other endpoint.
 *   TERMINATE      - log and set the terminate flag.
 *   anything else  - become the broadcaster.
 *
 * NOTE(review): a BROADCAST_ACK that arrives when broadcasterStatus is not
 * RECEIVING_ACK fails the second test, is not TERMINATE, and therefore reaches
 * the final assignment and sets BROADCASTING. If late ACKs after a hand-off are
 * possible, this re-activates the previous broadcaster -- confirm this is intended.
207 */
208 void msgHandler(lossyCom__endpointID_t senderID, lossyCom__msgBody_t msg, void* data)
209 {
210 WorkerParams* threadData = (WorkerParams*)data;
211 lossyCom__endpoint_t* comEndpoint = threadData->localEndpoint;
212 lossyCom__endpointID_t receiverID;
213
214 if(msg == BROADCAST_ID) //answer broadcast message
215 {
216 lossyCom__sendMsg(comEndpoint, senderID, BROADCAST_ACK);
217 return;
218 }
219 if(msg == (BROADCAST_ACK) && threadData->broadcasterStatus == RECEIVING_ACK)
220 {
221 threadData->receivedACKs++;
222 if(threadData->receivedACKs == NUM_CORES/2)//choose next broadcaster
223 {
224 do{ //redraw until the pick is not this endpoint
225 receiverID = randomNumber(&seed1, &seed2) % NUM_CORES;
226 }while(receiverID == comEndpoint->endpointID);
227
228 //the receiver's own ID is used as the message body to tell it that it is next
229 lossyCom__sendMsg(comEndpoint, receiverID, receiverID);
230 threadData->broadcasterStatus = RECEIVING_BROADCAST;
231 }
232 return;
233 }
234 if(msg == TERMINATE) //termination message
235 {
236 printf("endpoint %d received termination request\n", comEndpoint->endpointID); //NOTE(review): %d assumes endpointID is passed as int -- confirm its type
237 threadData->terminate = TRUE;
238 return;
239 }
240 //any other value (e.g. this endpoint's own ID, as sent above by the previous broadcaster)
241 threadData->broadcasterStatus = BROADCASTING;
242 }
243
// Number of broadcasts sent so far across all workers: zeroed in benchmark(),
// incremented in worker_TLF() for each broadcast, printed every 1000 and at the end.
// NOTE(review): plain unsigned int incremented from every worker with no lock
// visible here -- confirm workers cannot run concurrently, or make it atomic.
244 unsigned int global_broadcast_counter;
172 245
173 double 246 double
174 worker_TLF(void* _params, VirtProcr* animatingPr) 247 worker_TLF(void* _params, VirtProcr* animatingPr)
175 { 248 {
176 int i,o; 249 unsigned int msgCounter;
250 unsigned int broadcaster;
251 uint32_t wait_iterations;
177 WorkerParams* params = (WorkerParams*)_params; 252 WorkerParams* params = (WorkerParams*)_params;
178 unsigned int totalWorkCycles = 0, totalBadCycles = 0; 253 unsigned int totalWorkCycles = 0, totalBadCycles = 0;
179 unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0; 254 unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0;
180 unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0; 255 unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0;
181 double workspace2=0.0; 256 double workspace2=0.0;
257
258 //core 0 always starts
259 params->broadcasterStatus = params->coreID==0?BROADCASTING:RECEIVING_BROADCAST;
260
261 /*
182 int32 privateMutex = VPThread__make_mutex(animatingPr); 262 int32 privateMutex = VPThread__make_mutex(animatingPr);
183 263
184 int cpuid = sched_getcpu(); 264 int cpuid = sched_getcpu();
185 265
186 measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2; 266 measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2;
187 uint64 numCycles; 267 uint64 numCycles;
188 for(o=0; o < outer_iters; o++) 268 */
189 {
190 #ifdef MEASURE_PERF 269 #ifdef MEASURE_PERF
191 saveCyclesAndInstrs(cpuid,startWorkload.cycles); 270 saveCyclesAndInstrs(cpuid,startWorkload.cycles);
192 #endif 271 #endif
272
273 //initialize endpoint for communication
274 lossyCom__endpoint_t comEndpoint;
275 params->localEndpoint = &comEndpoint;
276 lossyCom__initialize_endpoint(&comEndpoint,
277 params->centralMsgExchange,
278 params->coreID,
279 msgHandler,
280 params);
281
282 lossyCom__endpointID_t receiverID;
283 msgCounter = 0;
284 while(msgCounter <= num_msg_to_send)
285 {
286 int i;
193 287
194 //workltask 288 if(params->broadcasterStatus == BROADCASTING)
195 for(i=0; i < inner_iters; i++)
196 { 289 {
197 workspace1 += (workspace1 + 32)/2; 290 if(msgCounter == num_msg_to_send)//send termination msg
198 workspace2 += (workspace2 + 23.2)/1.4; 291 {
292 lossyCom__sendMsg(&comEndpoint,BROADCAST_ID, TERMINATE);
293 break;
294 }else{ //generate and send random message
295 params->receivedACKs = 0;
296 lossyCom__sendMsg(&comEndpoint, BROADCAST_ID, BROADCAST);
297 global_broadcast_counter++;
298 if(global_broadcast_counter % 1000 == 0){
299 printf("broadcast count: %d\n", global_broadcast_counter);
300 }
301 params->broadcasterStatus = RECEIVING_ACK; //mark msg as send
302 msgCounter++;
303 }
199 } 304 }
305
306 //check if the benchmark should terminate
307 if(params->terminate)
308 break;
309
310 //receive msg
311 lossyCom__receiveMsg(&comEndpoint);
312 }
313
200 314
201 #ifdef MEASURE_PERF 315 #ifdef MEASURE_PERF
202 saveCyclesAndInstrs(cpuid,endWorkload.cycles); 316 saveCyclesAndInstrs(cpuid,endWorkload.cycles);
203 numCycles = endWorkload.cycles - startWorkload.cycles; 317 numCycles = endWorkload.cycles - startWorkload.cycles;
204 //sanity check (400K is about 20K iters) 318 //sanity check (400K is about 20K iters)
205 if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;} 319 if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;}
206 else {totalBadCycles += numCycles; } 320 else {totalBadCycles += numCycles; }
207 #endif 321 #endif
208 322
209 //mutex access often causes switch to different Slave VP 323 barrier_wait(params->barrier, animatingPr);
210 VPThread__mutex_lock(privateMutex, animatingPr);
211
212 /*
213 saveCyclesAndInstrs(cpuid,startWorkload2.cycles);
214 //Task
215 for(i=0; i < inner_iters; i++)
216 {
217 workspace1 += (workspace1 + 32)/2;
218 workspace2 += (workspace2 + 23.2)/1.4;
219 }
220
221 saveCyclesAndInstrs(cpuid,endWorkload2.cycles);
222 numCycles = endWorkload2.cycles - startWorkload2.cycles;
223 //sanity check (400K is about 20K iters)
224 if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;}
225 else {totalBadCycles += numCycles; }
226
227 */
228 VPThread__mutex_unlock(privateMutex, animatingPr);
229 }
230 324
231 params->totalWorkCycles = totalWorkCycles; 325 params->totalWorkCycles = totalWorkCycles;
232 params->totalBadCycles = totalBadCycles; 326 params->totalBadCycles = totalBadCycles;
233 params->numGoodTasks = numGoodTasks; 327 params->numGoodTasks = numGoodTasks;
234 params->totalSyncCycles = totalSyncCycles; 328 params->totalSyncCycles = totalSyncCycles;
237 /* 331 /*
238 params->totalSyncCycles = VMS__give_num_plugin_cycles(); 332 params->totalSyncCycles = VMS__give_num_plugin_cycles();
239 params->totalBadSyncCycles = 0; 333 params->totalBadSyncCycles = 0;
240 params->numGoodSyncs = VMS__give_num_plugin_animations(); 334 params->numGoodSyncs = VMS__give_num_plugin_animations();
241 */ 335 */
242
243
244 //Wait for all threads to end
245 barrier_wait(params->barrier, animatingPr);
246
247 //Shutdown worker 336 //Shutdown worker
248 VPThread__dissipate_thread(animatingPr); 337 VPThread__dissipate_thread(animatingPr);
249 338
250 //below return never reached --> there for gcc 339 //below return never reached --> there for gcc
251 return (workspace1 + workspace2); //to prevent gcc from optimizing work out 340 return (workspace1 + workspace2); //to prevent gcc from optimizing work out
253 342
254 343
255 /* this is run after the VMS is set up*/ 344 /* this is run after the VMS is set up*/
256 void benchmark(void *_params, VirtProcr *animatingPr) 345 void benchmark(void *_params, VirtProcr *animatingPr)
257 { 346 {
258 int i, cpuID; 347 int i, cpuID, idx;
259 struct barrier_t barr; 348 struct barrier_t barr;
260 BenchParams *params; 349 BenchParams *params;
261 350
262 params = (BenchParams *)_params; 351 params = (BenchParams *)_params;
263 352
264 barrier_init(&barr, num_threads+1, animatingPr); 353 barrier_init(&barr, NUM_CORES+1, animatingPr);
265 354
355 //Init central communication exchange
356 lossyCom__exchange_t* centralMsgExchange = lossyCom__initialize(NUM_CORES);
357
266 //prepare input 358 //prepare input
267 for(i=0; i<num_threads; i++) 359 for(i=0; i<NUM_CORES; i++)
268 { 360 {
269 workerParamsArray[i].barrier = &barr; 361 workerParamsArray[i].barrier = &barr;
270 } 362 workerParamsArray[i].coreID = i;
363 workerParamsArray[i].centralMsgExchange = centralMsgExchange;
364 workerParamsArray[i].terminate = FALSE;
365 }
366 global_broadcast_counter = 0;
367
368 // init random number generator for wait and msg content
369 seed1 = rand()%1000;
370 seed2 = rand()%1000;
271 371
272 //save cycles before execution of threads, to get total exe cycles 372 //save cycles before execution of threads, to get total exe cycles
273 measurement_t *startExeCycles, *endExeCycles; 373 measurement_t *startExeCycles, *endExeCycles;
274 startExeCycles = params->startExeCycles; 374 startExeCycles = params->startExeCycles;
275 375
278 sizeof(startExeCycles->cycles)); 378 sizeof(startExeCycles->cycles));
279 if(nread<0) perror("Error reading cycles counter"); 379 if(nread<0) perror("Error reading cycles counter");
280 #endif 380 #endif
281 381
282 //create (which starts running) all threads 382 //create (which starts running) all threads
283 for(i=0; i<num_threads; i++) 383 for(i=NUM_CORES-1; i>=0; i--)
284 { VPThread__create_thread((VirtProcrFnPtr)worker_TLF, &(workerParamsArray[i]), animatingPr); 384 {
285 } 385 VPThread__create_thread_with_affinity((VirtProcrFnPtr)worker_TLF,
286 //wait for all threads to finish 386 &(workerParamsArray[i]),
287 barrier_wait(&barr, animatingPr); 387 animatingPr,
388 i);//schedule to core i
389 }
288 390
289 #ifdef MEASURE_PERF 391 #ifdef MEASURE_PERF
290 //endBarrierCycles read in barrier_wait()! Merten, email me if want to chg 392 //endBarrierCycles read in barrier_wait()! Merten, email me if want to chg
291 params->endExeCycles->cycles = barr.endBarrierCycles.cycles; 393 params->endExeCycles->cycles = barr.endBarrierCycles.cycles;
292 #endif 394 #endif
293 395
396 barrier_wait(&barr, animatingPr);
397 printf("Total broadcast count: %d\n", global_broadcast_counter);
398
399 //print send msgs
400 /*
401 printf("sendMsgs = []\n");
402 for(i = 0; i<NUM_CORES; i++)
403 {
404 printf("sendMsgs.append([");
405 for(idx = 0; idx< workerParamsArray[i].sendMsgs->numInArray; idx++)
406 {
407 printf("(%lu, %lu),",
408 (uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) & 0xFFFFFFFF,
409 ((uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
410 }
411 printf("])\n");
412 }
413
414
415 //print received msgs
416 printf("receivedMsgs = []\n");
417 for(i = 0; i<NUM_CORES; i++)
418 {
419 printf("receivedMsgs.append([");
420 for(idx = 0; idx< workerParamsArray[i].receivedMsgs->numInArray; idx++)
421 {
422 printf("(%lu, %lu),",
423 (uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) & 0xFFFFFFFF,
424 ((uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
425 }
426 printf("])\n");
427 }*/
294 428
295 /* 429 /*
296 uint64_t overallWorkCycles = 0; 430 uint64_t overallWorkCycles = 0;
297 for(i=0; i<num_threads; i++){ 431 for(i=0; i<num_threads; i++){
298 printf("WorkCycles: %lu\n",input[i].totalWorkCycles); 432 printf("WorkCycles: %lu\n",input[i].totalWorkCycles);
319 { 453 {
320 if(argv[i][0] == '-' && argv[i][2] == 0) 454 if(argv[i][0] == '-' && argv[i][2] == 0)
321 { 455 {
322 switch(argv[i][1]) 456 switch(argv[i][1])
323 { 457 {
324 case 't': 458 case 'n':
325 if(!isdigit(argv[++i][0])) 459 if(!isdigit(argv[++i][0]))
326 { 460 {
327 fprintf(stderr, "-t must be followed by the number of worker threads to spawn\n"); 461 fprintf(stderr, "-t must be followed by the number messages to send per core\n");
328 return EXIT_FAILURE; 462 return EXIT_FAILURE;
329 } 463 }
330 num_threads = atoi(argv[i]); 464 num_msg_to_send = atoi(argv[i]);
331 if(!num_threads) 465 if(!num_msg_to_send)
332 { 466 {
333 fprintf(stderr, "invalid number of threads specified: %d\n", num_threads); 467 fprintf(stderr, "invalid number of messages to send: %d\n", num_msg_to_send);
334 return EXIT_FAILURE; 468 return EXIT_FAILURE;
335 } 469 }
336 break; 470 break;
337 case 'o':
338 if(!isdigit(argv[++i][0]))
339 {
340 fputs("-i must be followed by a number\n", stderr);
341 return EXIT_FAILURE;
342 }
343 outer_iters = atoi(argv[i]);
344 break;
345 case 'i':
346 if(!isdigit(argv[++i][0]))
347 {
348 fputs("-o must be followed by a number (workload size)\n", stderr);
349 return EXIT_FAILURE;
350 }
351 inner_iters = atoi(argv[i]);
352 break;
353 case 'h': 471 case 'h':
354 fputs(usage, stdout); 472 fputs(usage, stdout);
355 return 0; 473 return 0;
356
357 default: 474 default:
358 fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 475 fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
359 fputs(usage, stderr); 476 fputs(usage, stderr);
360 return EXIT_FAILURE; 477 return EXIT_FAILURE;
361 }//switch 478 }//switch
362 }//if arg 479 }//if arg
363 else 480 else
364 { 481 {
365 fprintf(stderr, "unrecognized argument: %s\n", argv[i]); 482 fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
366 fputs(usage, stderr); 483 fputs(usage, stderr);
367 return EXIT_FAILURE; 484 return EXIT_FAILURE;
368 } 485 }
369 }//for 486 }//for
370 487
371 488
372 #ifdef MEASURE_PERF 489 #ifdef MEASURE_PERF
439 benchParams = malloc(sizeof(BenchParams)); 556 benchParams = malloc(sizeof(BenchParams));
440 557
441 benchParams->startExeCycles = &startExeCycles; 558 benchParams->startExeCycles = &startExeCycles;
442 benchParams->endExeCycles = &endExeCycles; 559 benchParams->endExeCycles = &endExeCycles;
443 560
444 workerParamsArray = (WorkerParams *)malloc( (num_threads + 1) * sizeof(WorkerParams) ); 561 workerParamsArray = (WorkerParams *)malloc( (NUM_CORES) * sizeof(WorkerParams) );
445 if(workerParamsArray == NULL ) printf("error mallocing worker params array\n"); 562 if(workerParamsArray == NULL ) printf("error mallocing worker params array\n");
446 563
447 564
448 //This is the transition to the VMS runtime 565 //This is the transition to the VMS runtime
449 VPThread__create_seed_procr_and_do_work( &benchmark, benchParams ); 566 VPThread__create_seed_procr_and_do_work( &benchmark, benchParams );
472 // printf("Sum across threads of Bad Sync cycles: %lu\n", totalBadSyncCyclesAcrossCores); 589 // printf("Sum across threads of Bad Sync cycles: %lu\n", totalBadSyncCyclesAcrossCores);
473 printf("Overhead per sync: %f\n", (double)totalOverhead / (double)numSyncs ); 590 printf("Overhead per sync: %f\n", (double)totalOverhead / (double)numSyncs );
474 printf("ExeCycles/WorkCycles Ratio %f\n", 591 printf("ExeCycles/WorkCycles Ratio %f\n",
475 (double)totalExeCycles / (double)totalWorkCyclesAcrossCores); 592 (double)totalExeCycles / (double)totalWorkCyclesAcrossCores);
476 #else 593 #else
477 printf("No measurement done!\n"); 594 printf("#No measurement done!\n");
478 #endif 595 #endif
479 return 0; 596 return 0;
480 } 597 }