view main.c @ 21:08b37152b48d

cleanup
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 13 Mar 2012 19:09:11 +0100
parents 29b273cf3b1f
children
line source
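/*
 * Broadcast benchmark for the VPThread (VMS) runtime: one worker per core;
 * the current broadcaster sends to all peers over LossyCom, counts the ACKs,
 * then hands the broadcaster role to a random peer until the message budget
 * set by -n is used up.
 */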
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <ctype.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <unistd.h>
#include "VMS_Implementations/Vthread_impl/VPThread.h"
#include "C_Libraries/Queue_impl/PrivateQueue.h"
#include "C_Libraries/DynArray/DynArray.h"
#include "C_Libraries/BestEffortMessaging/LossyCom.h"

#include <linux/perf_event.h>
#include <linux/prctl.h>
#include <sys/syscall.h>
/* Build switches. DEBUG is forced off below this point; uncomment the
   MEASURE_PERF line to compile in the perf_event cycle measurements. */
#undef DEBUG
//#define DEBUG

//#define MEASURE_PERF

/* Mach (macOS) does not predefine the UNIX macros; provide them so the
   platform tests further down take the UNIX branches there as well. */
#if !defined(unix) && !defined(__unix__) && defined(__MACH__)
#define unix 1
#define __unix__ 1
#endif

/* Explicitly sized integer types:
 *  - C99, GNU libc and Mach libc all ship <stdint.h>;
 *  - other UNIX systems tend to declare them in <sys/types.h>;
 *  - on Windows, build the two we need from the __intN keywords. */
#if (__STDC_VERSION__ >= 199900) || defined(__GLIBC__) || defined(__MACH__)
#include <stdint.h>
#elif defined(unix) || defined(__unix__)
#include <sys/types.h>
#elif defined(__WIN32__) || defined(WIN32)
typedef unsigned __int8 uint8_t;
typedef unsigned __int32 uint32_t;
#endif
/* provide a millisecond-resolution timer for each system */
#if defined(unix) || defined(__unix__)
#include <time.h>
#include <sys/time.h>
/*
 * Returns the number of milliseconds elapsed since the first call to
 * get_msec(); the first call itself returns 0.
 *
 * Seconds and microseconds are combined into one microsecond count before
 * dividing, so the result is truncated once rather than per field. The clock
 * is gettimeofday() (wall time); if it steps backwards, 0 is returned instead
 * of a wrapped-around huge value.
 */
unsigned long get_msec(void) {
    static struct timeval first_timeval; /* origin, captured on the first call */
    struct timeval now;
    long long elapsed_usec;

    gettimeofday(&now, 0);
    if (first_timeval.tv_sec == 0) {
        first_timeval = now;
        return 0;
    }
    elapsed_usec = (long long)(now.tv_sec - first_timeval.tv_sec) * 1000000LL
                 + (long long)(now.tv_usec - first_timeval.tv_usec);
    if (elapsed_usec < 0)
        return 0;
    return (unsigned long)(elapsed_usec / 1000);
}
#elif defined(__WIN32__) || defined(WIN32)
#include <windows.h>
/* Milliseconds since system start, as reported by GetTickCount(). */
unsigned long get_msec(void) {
    return GetTickCount();
}
#else
//#error "I don't know how to measure time on your platform"
#endif
67 //======================== Defines =========================
68 typedef struct perfData measurement_t;
69 struct perfData{
70 uint64 cycles;
71 uint64 instructions;
72 };
/* Help text: printed to stdout for -h and to stderr after an unrecognized
   argument. */
const char *usage =
    "Usage: msg_passing_test [options]\n"
    " Starts threads equal to the number of cores and sends\n"
    " messages to random receivers\n\n"
    "Options:\n"
    " -n <num> This specifies the number of sends done by each thread.\n"
    " -h this help screen\n\n";
83 /***************************
84 * Barrier Implementation
85 ***************************/
87 struct barrier_t
88 {
89 int counter;
90 int nthreads;
91 int32 mutex;
92 int32 cond;
93 measurement_t endBarrierCycles;
95 };
96 typedef struct barrier_t barrier;
98 void inline barrier_init(barrier *barr, int nthreads, VirtProcr *animatingPr)
99 {
100 barr->counter = 0;
101 barr->nthreads = nthreads;
102 barr->mutex = VPThread__make_mutex(animatingPr);
103 barr->cond = VPThread__make_cond(barr->mutex, animatingPr);
104 }
106 int cycles_counter_main_fd;
107 void inline barrier_wait(barrier *barr, VirtProcr *animatingPr)
108 { int i;
110 VPThread__mutex_lock(barr->mutex, animatingPr);
111 barr->counter++;
112 if(barr->counter == barr->nthreads)
113 {
114 #ifdef MEASURE_PERF
115 read(cycles_counter_main_fd, &(barr->endBarrierCycles.cycles), \
116 sizeof(barr->endBarrierCycles.cycles));
117 #endif
119 barr->counter = 0;
120 for(i=0; i < barr->nthreads; i++)
121 VPThread__cond_signal(barr->cond, animatingPr);
122 }
123 else
124 { VPThread__cond_wait(barr->cond, animatingPr);
125 }
126 VPThread__mutex_unlock(barr->mutex, animatingPr);
127 }
130 /**************************
131 * Worker Parameters
132 **************************/
133 typedef struct
134 { struct barrier_t* barrier;
135 uint64_t totalWorkCycles;
136 uint64_t totalBadCycles;
137 uint64_t totalSyncCycles;
138 uint64_t totalBadSyncCycles;
139 uint64 numGoodSyncs;
140 uint64 numGoodTasks;
141 uint64_t coreID;
142 lossyCom__endpoint_t* localEndpoint;
143 lossyCom__exchange_t* centralMsgExchange;
144 unsigned int receivedACKs;
145 unsigned int broadcasterStatus;
146 unsigned int terminate;
147 }
148 WorkerParams;
150 typedef struct
151 { measurement_t *startExeCycles;
152 measurement_t *endExeCycles;
153 }
154 BenchParams;
156 typedef struct
157 {
158 lossyCom__endpointID_t receiverID;
159 lossyCom__msgBody_t msg;
160 } savedMsg_t;
162 //======================== Globals =========================
163 char __ProgrammName[] = "overhead_test";
164 char __DataSet[255];
166 int num_msg_to_send;
167 size_t chunk_size = 0;
169 int cycles_counter_fd[NUM_CORES];
170 struct perf_event_attr* hw_event;
172 WorkerParams *workerParamsArray;
174 // init random number
175 uint32_t seed1;
176 uint32_t seed2;
178 //======================== App Code =========================
179 /*
180 * Workload
181 */
183 #define saveCyclesAndInstrs(core,cycles) do{ \
184 int cycles_fd = cycles_counter_fd[core]; \
185 int nread; \
186 \
187 nread = read(cycles_fd,&(cycles),sizeof(cycles)); \
188 if(nread<0){ \
189 perror("Error reading cycles counter"); \
190 cycles = 0; \
191 } \
192 } while (0) //macro magic for scoping
194 extern inline uint32_t
195 randomNumber(uint32_t* seed1, uint32_t* seed2);
197 #define BROADCAST BROADCAST_ID
198 #define BROADCAST_ACK BROADCAST_ID-1
199 #define TERMINATE BROADCAST_ID-2
201 #define RECEIVING_BROADCAST 0
202 #define BROADCASTING 1
203 #define RECEIVING_ACK 2
205 /*
206 * Message Handler Function
207 */
208 void msgHandler(lossyCom__endpointID_t senderID, lossyCom__msgBody_t msg, void* data)
209 {
210 WorkerParams* threadData = (WorkerParams*)data;
211 lossyCom__endpoint_t* comEndpoint = threadData->localEndpoint;
212 lossyCom__endpointID_t receiverID;
214 if(msg == BROADCAST_ID) //answer broadcast message
215 {
216 lossyCom__sendMsg(comEndpoint, senderID, BROADCAST_ACK);
217 return;
218 }
219 if(msg == (BROADCAST_ACK) && threadData->broadcasterStatus == RECEIVING_ACK)
220 {
221 threadData->receivedACKs++;
222 if(threadData->receivedACKs == NUM_CORES-2)//chose next broadcaster
223 {
224 do{
225 receiverID = randomNumber(&seed1, &seed2) % NUM_CORES;
226 }while(receiverID == comEndpoint->endpointID);
228 //send the receiverID to the receiver to notify him that he is next
229 lossyCom__sendMsg(comEndpoint, receiverID, receiverID);
230 threadData->broadcasterStatus = RECEIVING_BROADCAST;
231 }
232 return;
233 }
234 if(msg == TERMINATE) //termination message
235 {
236 printf("endpoint %d received termination request\n", comEndpoint->endpointID);
237 threadData->terminate = TRUE;
238 return;
239 }
240 //I'm the next broadcaster!
241 threadData->broadcasterStatus = BROADCASTING;
242 }
244 unsigned int global_broadcast_counter;
246 double
247 worker_TLF(void* _params, VirtProcr* animatingPr)
248 {
249 unsigned int msgCounter;
250 WorkerParams* params = (WorkerParams*)_params;
251 unsigned int totalWorkCycles = 0, totalBadCycles = 0;
252 unsigned int totalSyncCycles = 0, totalBadSyncCycles = 0;
253 unsigned int workspace1=0, numGoodSyncs = 0, numGoodTasks = 0;
254 double workspace2=0.0;
256 //core 0 always starts
257 params->broadcasterStatus = params->coreID==0?BROADCASTING:RECEIVING_BROADCAST;
259 /*
260 int32 privateMutex = VPThread__make_mutex(animatingPr);
262 int cpuid = sched_getcpu();
264 measurement_t startWorkload, endWorkload, startWorkload2, endWorkload2;
265 uint64 numCycles;
266 */
267 #ifdef MEASURE_PERF
268 saveCyclesAndInstrs(cpuid,startWorkload.cycles);
269 #endif
271 //initialize endpoint for communication
272 lossyCom__endpoint_t comEndpoint;
273 params->localEndpoint = &comEndpoint;
274 lossyCom__initialize_endpoint(&comEndpoint,
275 params->centralMsgExchange,
276 params->coreID,
277 msgHandler,
278 params);
280 msgCounter = 0;
281 while(msgCounter <= num_msg_to_send)
282 {
283 if(params->broadcasterStatus == BROADCASTING)
284 {
285 if(msgCounter == num_msg_to_send)//send termination msg
286 {
287 lossyCom__broadcastMsg(&comEndpoint, TERMINATE);
288 break;
289 }else{ //generate and send random message
290 params->receivedACKs = 0;
291 lossyCom__broadcastMsg(&comEndpoint, BROADCAST);
292 global_broadcast_counter++;
293 if(global_broadcast_counter % 1000 == 0){
294 printf("broadcast count: %d\n", global_broadcast_counter);
295 }
296 params->broadcasterStatus = RECEIVING_ACK; //mark msg as send
297 msgCounter++;
298 }
299 }
301 //check if the benchmark should terminate
302 if(params->terminate)
303 break;
305 //receive msg
306 lossyCom__receiveMsg(&comEndpoint);
307 }
310 #ifdef MEASURE_PERF
311 saveCyclesAndInstrs(cpuid,endWorkload.cycles);
312 numCycles = endWorkload.cycles - startWorkload.cycles;
313 //sanity check (400K is about 20K iters)
314 if( numCycles < 400000 ) {totalWorkCycles += numCycles; numGoodTasks++;}
315 else {totalBadCycles += numCycles; }
316 #endif
318 //wait for all threads to finish
319 barrier_wait(params->barrier, animatingPr);
321 params->totalWorkCycles = totalWorkCycles;
322 params->totalBadCycles = totalBadCycles;
323 params->numGoodTasks = numGoodTasks;
324 params->totalSyncCycles = totalSyncCycles;
325 params->totalBadSyncCycles = totalBadSyncCycles;
326 params->numGoodSyncs = numGoodSyncs;
327 /*
328 params->totalSyncCycles = VMS__give_num_plugin_cycles();
329 params->totalBadSyncCycles = 0;
330 params->numGoodSyncs = VMS__give_num_plugin_animations();
331 */
332 //Shutdown worker
333 VPThread__dissipate_thread(animatingPr);
335 //below return never reached --> there for gcc
336 return (workspace1 + workspace2); //to prevent gcc from optimizing work out
337 }
340 /* this is run after the VMS is set up*/
341 void benchmark(void *_params, VirtProcr *animatingPr)
342 {
343 int i;
344 struct barrier_t barr;
345 BenchParams *params;
347 params = (BenchParams *)_params;
349 barrier_init(&barr, NUM_CORES+1, animatingPr);
351 //Init central communication exchange
352 lossyCom__exchange_t* centralMsgExchange = lossyCom__initialize(NUM_CORES);
354 //prepare input
355 for(i=0; i<NUM_CORES; i++)
356 {
357 workerParamsArray[i].barrier = &barr;
358 workerParamsArray[i].coreID = i;
359 workerParamsArray[i].centralMsgExchange = centralMsgExchange;
360 workerParamsArray[i].terminate = FALSE;
361 }
362 global_broadcast_counter = 0;
364 // init random number generator for wait and msg content
365 seed1 = rand()%1000;
366 seed2 = rand()%1000;
368 #ifdef MEASURE_PERF
369 //save cycles before execution of threads, to get total exe cycles
370 measurement_t *startExeCycles, *endExeCycles;
371 startExeCycles = params->startExeCycles;
374 int nread = read(cycles_counter_main_fd, &(startExeCycles->cycles),
375 sizeof(startExeCycles->cycles));
376 if(nread<0) perror("Error reading cycles counter");
377 #endif
379 //create (which starts running) all threads
380 for(i=NUM_CORES-1; i>=0; i--)
381 {
382 VPThread__create_thread_with_affinity((VirtProcrFnPtr)worker_TLF,
383 &(workerParamsArray[i]),
384 animatingPr,
385 i);//schedule to core i
386 }
388 #ifdef MEASURE_PERF
389 //endBarrierCycles read in barrier_wait()! Merten, email me if want to chg
390 params->endExeCycles->cycles = barr.endBarrierCycles.cycles;
391 #endif
393 barrier_wait(&barr, animatingPr);
394 printf("Total broadcast count: %d\n", global_broadcast_counter);
396 //print send msgs
397 /*
398 printf("sendMsgs = []\n");
399 for(i = 0; i<NUM_CORES; i++)
400 {
401 printf("sendMsgs.append([");
402 for(idx = 0; idx< workerParamsArray[i].sendMsgs->numInArray; idx++)
403 {
404 printf("(%lu, %lu),",
405 (uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) & 0xFFFFFFFF,
406 ((uint64_t)(workerParamsArray[i].ptrToArrayOfSendMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
407 }
408 printf("])\n");
409 }
412 //print received msgs
413 printf("receivedMsgs = []\n");
414 for(i = 0; i<NUM_CORES; i++)
415 {
416 printf("receivedMsgs.append([");
417 for(idx = 0; idx< workerParamsArray[i].receivedMsgs->numInArray; idx++)
418 {
419 printf("(%lu, %lu),",
420 (uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) & 0xFFFFFFFF,
421 ((uint64_t)(workerParamsArray[i].ptrToArrayOfReceivedMsgs[idx]) >> 32 ) & 0xFFFFFFFF);
422 }
423 printf("])\n");
424 }*/
426 /*
427 uint64_t overallWorkCycles = 0;
428 for(i=0; i<num_threads; i++){
429 printf("WorkCycles: %lu\n",input[i].totalWorkCycles);
430 overallWorkCycles += input[i].totalWorkCycles;
431 }
433 printf("Sum across threads of work cycles: %lu\n", overallWorkCycles);
434 printf("Total Execution: %lu\n", endBenchTime.cycles-startBenchTime.cycles);
435 printf("Runtime/Workcycle Ratio %lu\n",
436 ((endBenchTime.cycles-startBenchTime.cycles)*100)/overallWorkCycles);
437 */
439 //======================================================
441 VPThread__dissipate_thread(animatingPr);
442 }
444 int main(int argc, char **argv)
445 {
446 int i;
448 //set global static variables, based on cmd-line args
449 for(i=1; i<argc; i++)
450 {
451 if(argv[i][0] == '-' && argv[i][2] == 0)
452 {
453 switch(argv[i][1])
454 {
455 case 'n':
456 if(!isdigit(argv[++i][0]))
457 {
458 fprintf(stderr, "-t must be followed by the number messages to send per core\n");
459 return EXIT_FAILURE;
460 }
461 num_msg_to_send = atoi(argv[i]);
462 if(!num_msg_to_send)
463 {
464 fprintf(stderr, "invalid number of messages to send: %d\n", num_msg_to_send);
465 return EXIT_FAILURE;
466 }
467 break;
468 case 'h':
469 fputs(usage, stdout);
470 return 0;
471 default:
472 fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
473 fputs(usage, stderr);
474 return EXIT_FAILURE;
475 }//switch
476 }//if arg
477 else
478 {
479 fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
480 fputs(usage, stderr);
481 return EXIT_FAILURE;
482 }
483 }//for
486 #ifdef MEASURE_PERF
487 //setup performance counters
488 hw_event = malloc(sizeof(struct perf_event_attr));
489 memset(hw_event,0,sizeof(struct perf_event_attr));
491 hw_event->type = PERF_TYPE_HARDWARE;
492 hw_event->size = sizeof(hw_event);
493 hw_event->disabled = 0;
494 hw_event->freq = 0;
495 hw_event->inherit = 1; /* children inherit it */
496 hw_event->pinned = 1; /* says this virt counter must always be on HW */
497 hw_event->exclusive = 0; /* only group on PMU */
498 hw_event->exclude_user = 0; /* don't count user */
499 hw_event->exclude_kernel = 1; /* don't count kernel */
500 hw_event->exclude_hv = 1; /* ditto hypervisor */
501 hw_event->exclude_idle = 1; /* don't count when idle */
502 hw_event->mmap = 0; /* include mmap data */
503 hw_event->comm = 0; /* include comm data */
505 hw_event->config = PERF_COUNT_HW_CPU_CYCLES; //cycles
507 int cpuID, retries;
509 for( cpuID = 0; cpuID < NUM_CORES; cpuID++ )
510 { retries = 0;
511 do
512 { retries += 1;
513 cycles_counter_fd[cpuID] =
514 syscall(__NR_perf_event_open, hw_event,
515 0,//pid_t: 0 is "pid of calling process"
516 cpuID,//int: cpu, the value returned by "CPUID" instr(?)
517 -1,//int: group_fd, -1 is "leader" or independent
518 0//unsigned long: flags
519 );
520 }
521 while(cycles_counter_fd[cpuID]<0 && retries < 100);
522 if(retries >= 100)
523 {
524 fprintf(stderr,"On core %d: ",cpuID);
525 perror("Failed to open cycles counter");
526 }
527 }
529 //Set up counter to accumulate total cycles to process, across all CPUs
531 retries = 0;
532 do
533 { retries += 1;
534 cycles_counter_main_fd =
535 syscall(__NR_perf_event_open, hw_event,
536 0,//pid_t: 0 is "pid of calling process"
537 -1,//int: cpu, -1 means accumulate from all cores
538 -1,//int: group_fd, -1 is "leader" == independent
539 0//unsigned long: flags
540 );
541 }
542 while(cycles_counter_main_fd<0 && retries < 100);
543 if(retries >= 100)
544 {
545 fprintf(stderr,"in main ");
546 perror("Failed to open cycles counter");
547 }
548 #endif
550 measurement_t startExeCycles, endExeCycles;
551 BenchParams *benchParams;
553 benchParams = malloc(sizeof(BenchParams));
555 benchParams->startExeCycles = &startExeCycles;
556 benchParams->endExeCycles = &endExeCycles;
558 workerParamsArray = (WorkerParams *)malloc( (NUM_CORES) * sizeof(WorkerParams) );
559 if(workerParamsArray == NULL ) printf("error mallocing worker params array\n");
562 //This is the transition to the VMS runtime
563 VPThread__create_seed_procr_and_do_work( &benchmark, benchParams );
565 #ifdef MEASURE_PERF
566 uint64_t totalWorkCyclesAcrossCores = 0, totalBadCyclesAcrossCores = 0;
567 uint64_t totalSyncCyclesAcrossCores = 0, totalBadSyncCyclesAcrossCores = 0;
568 for(i=0; i<num_threads; i++){
569 printf("WorkCycles: %lu\n",workerParamsArray[i].totalWorkCycles);
570 // printf("Num Good Tasks: %lu\n",workerParamsArray[i].numGoodTasks);
571 // printf("SyncCycles: %lu\n",workerParamsArray[i].totalSyncCycles);
572 // printf("Num Good Syncs: %lu\n",workerParamsArray[i].numGoodSyncs);
573 totalWorkCyclesAcrossCores += workerParamsArray[i].totalWorkCycles;
574 totalBadCyclesAcrossCores += workerParamsArray[i].totalBadCycles;
575 totalSyncCyclesAcrossCores += workerParamsArray[i].totalSyncCycles;
576 totalBadSyncCyclesAcrossCores += workerParamsArray[i].totalBadSyncCycles;
577 }
579 uint64_t totalExeCycles = endExeCycles.cycles - startExeCycles.cycles;
580 totalExeCycles -= totalBadCyclesAcrossCores;
581 uint64 totalOverhead = totalExeCycles - totalWorkCyclesAcrossCores;
582 int32 numSyncs = outer_iters * num_threads * 2;
583 printf("Total Execution Cycles: %lu\n", totalExeCycles);
584 printf("Sum across threads of work cycles: %lu\n", totalWorkCyclesAcrossCores);
585 printf("Sum across threads of bad work cycles: %lu\n", totalBadCyclesAcrossCores);
586 // printf("Sum across threads of Bad Sync cycles: %lu\n", totalBadSyncCyclesAcrossCores);
587 printf("Overhead per sync: %f\n", (double)totalOverhead / (double)numSyncs );
588 printf("ExeCycles/WorkCycles Ratio %f\n",
589 (double)totalExeCycles / (double)totalWorkCyclesAcrossCores);
590 #else
591 printf("#No measurement done!\n");
592 #endif
593 return 0;
594 }