# HG changeset patch
# User Sean Halle
# Date 1373490826 25200
# Node ID 9cf9b2091eeb768d45cc73d819062f0c9ceb823f
working condition variable version

diff -r 000000000000 -r 9cf9b2091eeb .hgignore
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Wed Jul 10 14:13:46 2013 -0700
@@ -0,0 +1,3 @@
+nbproject
+task_size_vs_exe_time
+glob: *.o
diff -r 000000000000 -r 9cf9b2091eeb src/Application/consumer.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Application/consumer.c	Wed Jul 10 14:13:46 2013 -0700
@@ -0,0 +1,101 @@
+/*
+ * consumer.c -- consumer thread of the k-tuple handshake benchmark.
+ */
+
+#include "main.h"
+#include <pthread.h> //NOTE(review): header name was lost in extraction -- confirm
+
+/*
+ * Consumer.
+ *
+ * Birth function for thread that performs the consumer behavior
+ *
+ *Here's the protocol:
+ *Consumer is born waiting for some producer to send it a production.
+ *When it wakes, it reads the variables used for communication, and packages
+ * the information into a tuple it is constructing.
+ *Then it wakes the producer, who is waiting to be sure that send was received.
+ *If the tuple is not yet complete, it loops back for another production.
+ *When tuple complete, it adds that tuple to the output
+ *If that's the last tuple, it ends itself
+ *If not, then wakes all the producers, who go to the next iteration.
+ *Then loops back to wait for some producer to send it a production
+ *
+ *_params points to a ConsumerParams; numProducers and numTuplesToCreate are read.
+ *Never returns normally -- the thread ends through pthread_exit(NULL).
+ */
+void*
+consumer_birthFn( void* _params )
+ {
+   int numProducts;
+   //Starts equal to currProductionNum (init_stuff sets that to 0), so the first
+   // wait below blocks until a producer has published. Was uninitialized before.
+   int lastSeenProductionNum = 0;
+
+   ConsumerParams* params = (ConsumerParams *)_params;
+
+
+   /*The consumer does two loops.
+    * The outside loop counts the number of tuples created.
+    * The inside loop collects the products for one tuple.
+    *
+    *Protocol:
+    * increment tupleIter
+    * wake producers for next tuple
+    * wait on production ready
+    * read comm vars and add msg to current tuple
+    * increment ACK count
+    * wake producer (who is waiting for ack)
+    * if more productions for current tuple, repeat
+    * have all productions for current tuple, so add tuple to output
+    * if have all tuples are going to produce, end
+    * else more, so repeat
+    */
+   //tupleIter is only written by this thread, so reading it unlocked here is safe
+   while( tupleIter < params->numTuplesToCreate )
+    {
+      // increment tupleIter (global shared)
+      // wake producers for next iter (don't need cond lock? -- teeter totter)
+      DEBUG__printf("consumer broadcast for next iter\n");
+      pthread_mutex_lock(&tupleIterLock);
+      tupleIter += 1;
+      pthread_cond_broadcast( &tupleIterCond );
+      pthread_mutex_unlock(&tupleIterLock);
+
+      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
+
+      for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
+       {
+         //wait on productionReadyCond (suspend until there is a production)
+         pthread_mutex_lock( &productionReadyLock );
+         while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
+          {
+            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
+          }
+         DEBUG__printf1( "consumer got production %d\n", currProductionNum );
+         lastSeenProductionNum = currProductionNum;
+         pthread_mutex_unlock( &productionReadyLock );
+
+         //Read comm vars and add msg to current tuple
+         //add production to tuple -- overhead meas, do nothing
+
+         // increment ACK count
+         pthread_mutex_lock( &consumerReceivedAckLock );
+         currConsumerReceivedACKNum += 1;//make different than last time prod saw
+
+         // wake producer (who is waiting to be sure that send was received)
+         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
+         pthread_cond_broadcast( &consumerReceivedAckCond );
+         pthread_mutex_unlock( &consumerReceivedAckLock );
+       } //if more productions for current tuple, repeat
+
+      // have all productions for current tuple, so add tuple to output
+      //add tuple to output and malloc new tuple array -- overhead meas, do nothing
+
+    } // if have all tuples are going to produce, end
+
+   //Shutdown consumer thread
+   pthread_exit(NULL);
+
+ }
+
diff -r 000000000000 -r 9cf9b2091eeb src/Application/main.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Application/main.c	Wed Jul 10 14:13:46 2013 -0700
@@ -0,0 +1,337 @@
+/*
+ * main.c -- argument parsing, shared-state setup and timing harness for the
+ * k-tuple condition-variable benchmark.
+ */
+
+#include "main.h"
+
+//========== Global Vars ===========
+
+const char *usage =
+ {
+   "Usage: k_tuple_async [options]\n"
+   " Creates a number of workers, and one consumer that packages productions "
+   " into a tuple.\n\n"
+   "Options:\n"
+   " -p The number of producer threads to create.\n"
+   " -t the number of tuples to create\n"
+   " -h this help screen\n\n"
+ };
+
+char __ProgrammName[] = "K-tuple_async";
+char __DataSet[255];
+
+#ifdef MEASURE_PERF
+int cycles_counter_fd[NUM_CORES];
+int instrs_counter_fd[NUM_CORES];
+int cycles_counter_main_fd;
+#endif
+
+pthread_mutex_t waitForAllDoneLock;
+pthread_cond_t waitForAllDoneCond;
+
+//The single definitions of the handshake state that main.h declares extern
+pthread_mutex_t tupleIterLock;
+pthread_cond_t tupleIterCond;
+int tupleIter;
+
+pthread_mutex_t producerAccessMutex;
+pthread_mutex_t productionReadyLock;
+pthread_cond_t productionReadyCond;
+int currProductionNum;
+int producerMessage;
+
+pthread_mutex_t consumerReceivedAckLock;
+pthread_cond_t consumerReceivedAckCond;
+int currConsumerReceivedACKNum;
+
+
+//===================================
+/* provide a millisecond-resolution timer for each system */
+//NOTE(review): the header names in this section were lost in extraction and
+// are restored from the calls made below -- confirm against the repository
+#if defined(unix) || defined(__unix__)
+#include <time.h>
+#include <sys/time.h>
+unsigned long get_msec(void) {
+   static struct timeval timeval, first_timeval;
+
+   gettimeofday(&timeval, 0);
+   if(first_timeval.tv_sec == 0) {
+      first_timeval = timeval;
+      return 0;
+   }
+   return (timeval.tv_sec - first_timeval.tv_sec) * 1000 + (timeval.tv_usec - first_timeval.tv_usec) / 1000;
+}
+#elif defined(__WIN32__) || defined(WIN32)
+#include <windows.h>
+unsigned long get_msec(void) {
+   return GetTickCount();
+}
+#else
+#error "I don't know how to measure time on your platform"
+#endif
+
+/*Initializes the performance counters, and opens the file descriptors used
+ * to read from the performance counters
+ *
+ *Does nothing unless MEASURE_PERF is defined. A counter that fails to open
+ * is reported on stderr and its descriptor is left negative.
+ */
+void
+set_up_performance_counters()
+ {
+#ifdef MEASURE_PERF
+   int i;
+
+   //setup performance counters
+   struct perf_event_attr hw_event;
+   memset(&hw_event,0,sizeof(hw_event));
+   hw_event.type = PERF_TYPE_HARDWARE;
+   hw_event.size = sizeof(hw_event);
+   hw_event.disabled = 0;
+   hw_event.freq = 0;
+   hw_event.inherit = 1; /* children inherit it */
+   hw_event.pinned = 1; /* must always be on PMU */
+   hw_event.exclusive = 0; /* only group on PMU */
+   hw_event.exclude_user = 0; /* don't count user */
+   hw_event.exclude_kernel = 1; /* ditto kernel */
+   hw_event.exclude_hv = 1; /* ditto hypervisor */
+   hw_event.exclude_idle = 1; /* don't count when idle */
+   hw_event.mmap = 0; /* include mmap data */
+   hw_event.comm = 0; /* include comm data */
+
+
+   for( i = 0; i < NUM_CORES; i++ )
+    {
+      hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles
+      cycles_counter_fd[i] = syscall(__NR_perf_event_open, &hw_event,
+         0,//pid_t pid,
+         i,//int cpu,
+         -1,//int group_fd,
+         0//unsigned long flags
+      );
+      if (cycles_counter_fd[i]<0){
+         fprintf(stderr,"On core %d: ",i);
+         perror("Failed to open cycles counter");
+      }
+    }
+
+   //Stores into the file-scope cycles_counter_main_fd. A local of the same
+   // name used to shadow it here, so the global never received the descriptor.
+   hw_event.config = PERF_COUNT_HW_CPU_CYCLES; //cycles
+   hw_event.exclude_kernel=0;
+   cycles_counter_main_fd = syscall(__NR_perf_event_open, &hw_event,
+      0,//pid_t pid,
+      -1,//int cpu,
+      -1,//int group_fd,
+      0//unsigned long flags
+   );
+   if (cycles_counter_main_fd<0){
+      perror("Failed to open main cycles counter");
+   }
+
+#endif
+ }
+
+
+/*Initializes every mutex and condition variable of the handshake and zeroes
+ * the three counters. Must run before any producer or consumer is created.
+ */
+void
+init_stuff()
+ {
+   pthread_mutex_init(&tupleIterLock, NULL);
+   pthread_cond_init( &tupleIterCond, NULL );
+   tupleIter = 0;
+
+   pthread_mutex_init(&producerAccessMutex, NULL);
+   pthread_mutex_init(&productionReadyLock, NULL);
+   pthread_cond_init( &productionReadyCond, NULL );
+   currProductionNum = 0;
+
+   pthread_mutex_init(&consumerReceivedAckLock, NULL);
+   pthread_cond_init( &consumerReceivedAckCond, NULL );
+   currConsumerReceivedACKNum = 0;
+ }
+
+
+typedef struct
+ {
+   int numProducers;
+   int numTuplesToCreate;
+ }
+ParsedArgs;
+
+/*The benchmark Fn creates the producers and the consumer, then waits for all
+ * of them to finish. It measures from just before the first thread is created
+ * until the last thread has been joined, and prints the difference.
+ */
+void
+benchmark( ParsedArgs *args )
+ {
+   int i;
+   ProducerParams producerParams[args->numProducers];
+   pthread_t producerThds[args->numProducers];
+   pthread_t consumerThd;
+
+   ConsumerParams consumerParams;
+
+   //Set up the param structs for producers.. gives them the mutex and cond var
+   // to communicate with consumer
+   //Also the core the producer should pin its thread to
+   for(i=0; i < args->numProducers; i++)
+    {
+      producerParams[i].producerID = i + 1; //no ID of 0, a fact used in handshake
+      producerParams[i].numTuplesToCreate = args->numTuplesToCreate;
+      producerParams[i].coreID = i % NUM_CORES;
+    }
+
+   consumerParams.numProducers = args->numProducers;
+   consumerParams.numTuplesToCreate = args->numTuplesToCreate;
+
+   //take measurement before creation of threads, to get total exetime
+   MeasStruct benchStartMeas, benchEndMeas;
+
+   takeAMeas(0, benchStartMeas);
+
+   //A thread that fails to start would leave the others waiting forever on
+   // the handshake, so give up right away
+   for(i=0; i < args->numProducers; i++)
+    { if( pthread_create( &producerThds[i], NULL, &producer_birthFn, (void*)&producerParams[i]) != 0 )
+       { fputs("failed to create producer thread\n", stderr);
+         exit(EXIT_FAILURE);
+       }
+    }
+
+   if( pthread_create( &consumerThd, NULL, &consumer_birthFn, (void*)&consumerParams ) != 0 )
+    { fputs("failed to create consumer thread\n", stderr);
+      exit(EXIT_FAILURE);
+    }
+
+   for(i=0; i < args->numProducers; i++)
+    { pthread_join( producerThds[i], NULL );
+    }
+   pthread_join( consumerThd, NULL );
+
+   //work is all done, so take a measurement snapshot at end
+   takeAMeas(0, benchEndMeas);
+
+
+   //uint64_t is not unsigned long on every platform, so convert for printf
+#ifdef MEASURE_PERF
+   uint64_t totalExeCycles = ( benchEndMeas.cycles - benchStartMeas.cycles);
+   printf("Total Execution: %llu\n", (unsigned long long)totalExeCycles);
+#else
+   uint64_t totalExeCycles = ( benchEndMeas.total - benchStartMeas.total);
+   printf("Total Cycles of Execution: %llu\n", (unsigned long long)totalExeCycles);
+#endif
+
+   //======================================================
+ }
+
+
+/*This parses the command line arguments and returns the values in a struct
+ * Command line args should be a '-' followed by a single letter, then a value
+ *
+ *Returns a malloc'd ParsedArgs the caller owns, or NULL after printing a
+ * message when the arguments are unusable. Both -p and -t are required and
+ * must be positive. -h prints the usage text and exits with success.
+ */
+ParsedArgs *
+parse_arguments( int argc, char **argv )
+ { ParsedArgs *parsedArgs;
+   int i;
+
+   if(argc < 2)
+    { fprintf(stdout, "must give arguments");
+      fputs(usage, stdout);
+      return NULL;
+    }
+
+   //calloc so that an option that is never given reads as 0 and is rejected below
+   parsedArgs = calloc(1, sizeof *parsedArgs);
+   if( parsedArgs == NULL )
+    { perror("calloc");
+      return NULL;
+    }
+
+   for( i=1; i < argc; i++ )
+    { //test [1] before [2] so a bare "-" is never read past its terminator
+      if( argv[i][0] != '-' || argv[i][1] == 0 || argv[i][2] != 0 )
+       { fprintf(stdout, "unrecognized argument: %s\n", argv[i]);
+         fputs(usage, stdout);
+         goto fail;
+       }
+      switch(argv[i][1])
+       { case 'p':
+          { //argv[argc] is NULL, so make sure a value follows before reading it
+            if( i + 1 >= argc || !isdigit((unsigned char)argv[i + 1][0]) )
+             { fprintf(stderr, "-p must be followed by the number of producer threads to spawn\n");
+               goto fail;
+             }
+            parsedArgs->numProducers = atoi(argv[++i]);
+            if( parsedArgs->numProducers <= 0 )
+             { fprintf(stderr, "invalid number of producers specified: %d\n", parsedArgs->numProducers);
+               goto fail;
+             }
+            DEBUG__printf1("num producers: %d\n", parsedArgs->numProducers );
+          }
+         break;
+         case 't':
+          { if( i + 1 >= argc || !isdigit((unsigned char)argv[i + 1][0]) )
+             { fputs("-t must be followed by a number\n", stderr);
+               goto fail;
+             }
+            parsedArgs->numTuplesToCreate = atoi(argv[++i]);
+            DEBUG__printf1("num tuples to produce: %d\n", parsedArgs->numTuplesToCreate );
+          }
+         break;
+         case 'h':
+          { fputs(usage, stdout);
+            free(parsedArgs);
+            exit(EXIT_SUCCESS);
+          }
+         default:
+          { fprintf(stderr, "unrecognized argument: %s\n", argv[i]);
+            fputs(usage, stderr);
+            goto fail;
+          }
+       }
+    }//for
+
+   //With no producers or no tuples the threads would have nothing to hand
+   // over, and a missing option would otherwise size the arrays in benchmark
+   if( parsedArgs->numProducers <= 0 || parsedArgs->numTuplesToCreate <= 0 )
+    { fputs("both -p and -t must be given, with values greater than zero\n", stderr);
+      fputs(usage, stderr);
+      goto fail;
+    }
+   return parsedArgs;
+
+fail:
+   free(parsedArgs);
+   return NULL;
+ }
+
+/*Sets up counters and shared state, parses the command line and runs the
+ * benchmark. Returns EXIT_FAILURE when the arguments could not be used.
+ */
+int main(int argc, char **argv)
+ { ParsedArgs *args;
+
+
+   set_up_performance_counters();
+
+   init_stuff();
+
+   args = parse_arguments( argc, argv);
+
+   if( args == NULL ) return EXIT_FAILURE; //non-zero exit when parsing went wrong
+
+   benchmark( args );
+
+   free( args );
+   return 0;
+ }
+
diff -r 000000000000 -r 9cf9b2091eeb src/Application/main.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Application/main.h	Wed Jul 10 14:13:46 2013 -0700
@@ -0,0 +1,181 @@
+/*
+ * main.h -- shared types, measurement macros and handshake state for the
+ * k-tuple condition-variable benchmark.
+ */
+#ifndef K_TUPLE_MAIN_H
+#define K_TUPLE_MAIN_H
+
+//NOTE(review): the header names of this include block were lost in extraction.
+// The list below covers what the code in this changeset calls -- confirm it
+// against the repository.
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <ctype.h>
+#include <pthread.h>
+
+//==========================
+//#define TURN_ON_DEBUG
+
+//==========================
+#define NUM_CORES 4
+
+//==========================
+
+//SELECT how the measurement is done
+//only one must be enabled
+#define MEASURE_TSC
+//#define MEASURE_PERF
+
+#if defined(MEASURE_TSC) == defined(MEASURE_PERF)
+#error "enable exactly one of MEASURE_TSC and MEASURE_PERF"
+#endif
+
+//only the perf-counter path needs read(), syscall() and the perf_event types
+#ifdef MEASURE_PERF
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <linux/perf_event.h>
+#endif
+
+
+#if !defined(unix) && !defined(__unix__)
+#ifdef __MACH__
+#define unix 1
+#define __unix__ 1
+#endif /* __MACH__ */
+#endif /* unix */
+
+/* find the appropriate way to define explicitly sized types */
+/* for C99 or GNU libc (also mach's libc) we can use stdint.h */
+#if (__STDC_VERSION__ >= 199900) || defined(__GLIBC__) || defined(__MACH__)
+#include <stdint.h>
+#elif defined(unix) || defined(__unix__) /* some UNIX systems have them in sys/types.h */
+#include <sys/types.h>
+#elif defined(__WIN32__) || defined(WIN32) /* the nameless one */
+typedef unsigned __int8 uint8_t;
+typedef unsigned __int32 uint32_t;
+#endif /* sized type detection */
+
+
+//==================
+#ifdef TURN_ON_DEBUG
+   #define DEBUG__printf(msg) printf(msg)
+   #define DEBUG__printf1(msg, arg1) printf(msg, arg1)
+   #define DEBUG__printf2(msg, arg1, arg2) printf(msg, arg1, arg2)
+#else
+   #define DEBUG__printf(msg)
+   #define DEBUG__printf1(msg, arg1)
+   #define DEBUG__printf2(msg, arg1, arg2)
+#endif
+//===== RDTSC wrapper ===== //Does work for x86_64 compile
+
+#define saveTimeStampCountInto(low, high) \
+   asm volatile("RDTSC; \
+                 movl %%eax, %0; \
+                 movl %%edx, %1;" \
+   /* outputs */ : "=m" (low), "=m" (high)\
+   /* inputs */ : \
+   /* clobber */ : "%eax", "%edx" \
+   );
+
+#define saveLowTimeStampCountInto(low) \
+   asm volatile("RDTSC; \
+                 movl %%eax, %0;" \
+   /* outputs */ : "=m" (low) \
+   /* inputs */ : \
+   /* clobber */ : "%eax", "%edx" \
+   );
+
+//====================
+
+union timeStamp
+ {
+   uint32_t lowHigh[2]; //lowHigh[0] is low, lowHigh[1] is high
+   uint64_t total;
+ };
+
+struct perfData
+ {
+   uint64_t cycles;
+   uint64_t instructions;
+ };
+
+//MEASURE_TSC should be mutually exclusive with MEASURE_PERF
+#ifdef MEASURE_TSC
+   typedef union timeStamp MeasStruct;
+#else
+   #ifdef MEASURE_PERF
+   typedef struct perfData MeasStruct;
+   #endif
+#endif
+
+//fast way to collect time intervals, by putting into hist right away
+#define makeAMeasHist( idx, name, numBins, startVal, binWidth ) \
+   makeHighestDynArrayIndexBeAtLeast( _VMSMasterEnv->measHistsInfo, idx ); \
+   _VMSMasterEnv->measHists[idx] = \
+      makeFixedBinHist( numBins, startVal, binWidth, name );
+
+//read and save current perf-counter readings for cycles and instrs
+#ifdef MEASURE_PERF
+   //on a failed read the cycles field is zeroed (this used to assign an
+   // undeclared variable, which did not compile)
+   #define takeAMeas(core, perfDataStruct) do{ \
+      int cycles_fd = cycles_counter_fd[core]; \
+      int nread; \
+      \
+      nread = read(cycles_fd,&(perfDataStruct.cycles),sizeof(perfDataStruct.cycles)); \
+      if(nread<0){ \
+         perror("Error reading cycles counter"); \
+         perfDataStruct.cycles = 0; \
+      } \
+   } while (0) //macro magic for scoping
+#else
+   #define takeAMeas(core, timeStampStruct) do{ \
+      saveTimeStampCountInto(timeStampStruct.lowHigh[0], timeStampStruct.lowHigh[1]);\
+   } while (0) //macro magic for scoping
+#endif
+
+
+typedef struct
+ {
+   int coreID;
+   int numTuplesToCreate;
+   int producerID;
+
+ }
+ProducerParams;
+
+typedef struct
+ {
+   int coreID;
+   int numTuplesToCreate;
+   int numProducers;
+
+ }
+ConsumerParams;
+
+//=========== Global Vars =============
+//Declarations only -- the objects themselves are defined once, in main.c.
+// Defining them here put a copy in every file that includes this header.
+extern pthread_mutex_t tupleIterLock;
+extern pthread_cond_t tupleIterCond;
+extern int tupleIter;
+
+extern pthread_mutex_t producerAccessMutex;
+extern pthread_mutex_t productionReadyLock;
+extern pthread_cond_t productionReadyCond;
+extern int currProductionNum;
+extern int producerMessage;
+
+extern pthread_mutex_t consumerReceivedAckLock;
+extern pthread_cond_t consumerReceivedAckCond;
+extern int currConsumerReceivedACKNum;
+
+//=======
+void*
+producer_birthFn( void* _params );
+void*
+consumer_birthFn( void* _params );
+
+#endif /* K_TUPLE_MAIN_H */
diff -r 000000000000 -r 9cf9b2091eeb src/Application/producer.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Application/producer.c	Wed Jul 10 14:13:46 2013 -0700
@@ -0,0 +1,119 @@
+/*
+ * producer.c -- producer threads of the k-tuple handshake benchmark.
+ */
+
+#include "main.h"
+//NOTE(review): two header names were lost in extraction here. These are what
+// the thread calls and the disabled pinning code need -- confirm.
+#include <pthread.h>
+#include <sched.h>
+
+/*
+ * Producer.
+ *
+ * Birth function for thread that performs the producer behavior
+ *
+ * Note: pinning to a core (to facilitate collecting measurements) is
+ *  currently disabled -- see the commented-out block below
+ *
+ * _params points to a ProducerParams; numTuplesToCreate and producerID are read.
+ * Never returns normally -- the thread ends through pthread_exit(NULL).
+ */
+void*
+producer_birthFn( void* _params )
+ {
+   int lastTupleIter, oldConsumerReceivedACKNum;
+
+   ProducerParams *params = (ProducerParams *)_params;
+
+   lastTupleIter = 0; //compared to global tupleIter while waiting
+   oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
+
+   /* --------------------------------------------------
+    * Pin thread to core, the producers are divided
+    * equally over all cores. Pinning prohibits the
+    * switching of cores so that perf counter and TSC values remain
+    * from the same core between readings. Pinning shouldn't
+    * affect results.. may be odd case when num thds doesn't divide into
+    * num Cores
+    * --------------------------------------------------
+    */
+   /*
+   cpu_set_t cpuinfo;
+   CPU_ZERO( &cpuinfo );
+   CPU_SET( params->coreID, &cpuinfo );
+   pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
+   pthread_yield(); //get off the core, so next can be created on it
+   uint32_t cpuid = sched_getcpu();
+   */
+
+
+   /*Protocol:
+    * wait for change in tupleIter (save updated tuple num for next time)
+    * Get producer lock (only one producer at a time)
+    * write into comm vars
+    * get current ACK number
+    * notify consumer
+    * wait for ACK (get ACK lock, check on change in ACK number)
+    * release producer lock
+    * if not done, repeat
+    */
+   while( lastTupleIter < params->numTuplesToCreate )
+    {
+      //wait for change in tupleNum (save updated tuple num for next time)
+      pthread_mutex_lock( &tupleIterLock );
+      while( lastTupleIter == tupleIter )
+       {
+         pthread_cond_wait( &tupleIterCond,
+                            &tupleIterLock );
+       }
+      //copy while the lock is still held; every later use goes through the
+      // private copy instead of re-reading the shared counter unlocked
+      lastTupleIter = tupleIter; //save for next time through loop
+      pthread_mutex_unlock( &tupleIterLock );
+
+      DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, lastTupleIter);
+
+      //Two vars used to comm with consumer. One holds message to send,
+      // other holds ID of producer sending.
+      //Protect the two variables with a lock, that only one
+      // producer can get. Update the variable with the message to be
+      // communicated, and write ID of sender in second var.
+
+      //Get producer lock
+      pthread_mutex_lock( &producerAccessMutex );
+
+      // write into comm vars
+      producerMessage = lastTupleIter; //just a dummy -- overhead meas, do nothing
+
+      // get current ACK number -- has to be sampled before the production is
+      //  published, or the consumer's ACK could land first and be missed
+      oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
+
+      // publish and notify consumer. The counter is the predicate the consumer
+      //  tests under productionReadyLock, so it is changed under that lock too
+      pthread_mutex_lock( &productionReadyLock );
+      currProductionNum += 1;
+      DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
+      pthread_cond_broadcast( &productionReadyCond );
+      pthread_mutex_unlock( &productionReadyLock );
+
+      // wait for ACK (get ACK lock, check on change in ACK number)
+      pthread_mutex_lock( &consumerReceivedAckLock );
+      while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
+       {
+         pthread_cond_wait( &consumerReceivedAckCond,
+                            &consumerReceivedAckLock );
+       }
+      pthread_mutex_unlock( &consumerReceivedAckLock );
+      DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
+
+      // release producer lock (so different producer can get and send)
+      pthread_mutex_unlock( &producerAccessMutex );
+    } //if not done, do again
+
+   //Shutdown producer
+   pthread_exit(NULL);
+
+ }
+