seanhalle@0: /* seanhalle@0: * seanhalle@0: */ seanhalle@0: seanhalle@0: #include "main.h" seanhalle@0: #include seanhalle@0: #include seanhalle@0: seanhalle@0: /* seanhalle@0: * Producer. seanhalle@0: * seanhalle@0: * Birth function for thread that performs the producer behavior seanhalle@0: */ seanhalle@0: void* seanhalle@0: producer_birthFn( void* _params ) seanhalle@0: { seanhalle@0: cpu_set_t cpuinfo; seanhalle@0: int lastTupleIter, oldConsumerReceivedACKNum; seanhalle@0: seanhalle@0: ProducerParams *params = (ProducerParams *)_params; seanhalle@0: seanhalle@0: lastTupleIter = 0; //compared to global tupleIter while waiting seanhalle@0: seanhalle@0: /* -------------------------------------------------- seanhalle@0: * Pin thread to core, the producers are divided seanhalle@0: * equally over all cores. Pinning prohibits the seanhalle@0: * switching of cores so that perf counter and TSC values remain seanhalle@0: * from the same core between readings. Pinning shouldn't seanhalle@0: * affect results.. may be odd case when num thds doesn't divide into seanhalle@0: * num Cores seanhalle@0: * -------------------------------------------------- seanhalle@0: */ seanhalle@0: /* seanhalle@0: CPU_ZERO( &cpuinfo ); seanhalle@0: CPU_SET( params->coreID, &cpuinfo ); seanhalle@0: pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo ); seanhalle@0: pthread_yield(); //get off the core, so next can be created on it seanhalle@0: uint32_t cpuid = sched_getcpu(); seanhalle@0: */ seanhalle@0: seanhalle@0: seanhalle@0: /*Protocol: seanhalle@0: * wait for change in tupleIter (save updated tuple num for next time) seanhalle@1: * Get queue lock seanhalle@1: * write into queue seanhalle@1: * release queue lock seanhalle@0: * if not done, repeat seanhalle@0: */ seanhalle@0: while( lastTupleIter < params->numTuplesToCreate ) seanhalle@0: { seanhalle@0: //wait for change in tupleNum (save updated tuple num for next time) seanhalle@0: pthread_mutex_lock( &tupleIterLock ); seanhalle@0: while( lastTupleIter == tupleIter ) seanhalle@0: { seanhalle@0: pthread_cond_wait( &tupleIterCond, seanhalle@0: &tupleIterLock ); seanhalle@0: } seanhalle@1: pthread_mutex_unlock( &tupleIterLock ); seanhalle@0: seanhalle@0: lastTupleIter = tupleIter; //save for next time through loop seanhalle@0: seanhalle@0: DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); seanhalle@0: seanhalle@1: //Q used to comm with consumer, protected with a lock seanhalle@0: seanhalle@0: //Get producer lock seanhalle@1: pthread_mutex_lock( &queueAccessLock ); seanhalle@1: writePrivQ( params, commQ ); //params is just a dummy pointer seanhalle@1: pthread_mutex_unlock( &queueAccessLock ); seanhalle@0: } //if not done, do again seanhalle@0: seanhalle@0: //Shutdown producer seanhalle@0: pthread_exit(NULL); seanhalle@0: seanhalle@0: } seanhalle@0: