seanhalle@0: /* seanhalle@0: * seanhalle@0: */ seanhalle@0: seanhalle@0: #include "main.h" seanhalle@0: #include seanhalle@0: #include seanhalle@0: seanhalle@0: /* seanhalle@0: * Producer. seanhalle@0: * seanhalle@0: * Birth function for thread that performs the producer behavior seanhalle@0: * seanhalle@0: * Note: is pinned to a core, to facilitate collecting measurements seanhalle@0: */ seanhalle@0: void* seanhalle@0: producer_birthFn( void* _params ) seanhalle@0: { seanhalle@0: cpu_set_t cpuinfo; seanhalle@0: int lastTupleIter, oldConsumerReceivedACKNum; seanhalle@0: seanhalle@0: ProducerParams *params = (ProducerParams *)_params; seanhalle@0: seanhalle@0: lastTupleIter = 0; //compared to global tupleIter while waiting seanhalle@0: oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive seanhalle@0: seanhalle@0: /* -------------------------------------------------- seanhalle@0: * Pin thread to core, the producers are divided seanhalle@0: * equally over all cores. Pinning prohibits the seanhalle@0: * switching of cores so that perf counter and TSC values remain seanhalle@0: * from the same core between readings. Pinning shouldn't seanhalle@0: * affect results.. may be odd case when num thds doesn't divide into seanhalle@0: * num Cores seanhalle@0: * -------------------------------------------------- seanhalle@0: */ seanhalle@0: /* seanhalle@0: CPU_ZERO( &cpuinfo ); seanhalle@0: CPU_SET( params->coreID, &cpuinfo ); seanhalle@0: pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo ); seanhalle@0: pthread_yield(); //get off the core, so next can be created on it seanhalle@0: uint32_t cpuid = sched_getcpu(); seanhalle@0: */ seanhalle@0: seanhalle@0: seanhalle@0: /*Protocol: seanhalle@0: * wait for change in tupleIter (save updated tuple num for next time) seanhalle@0: * Get producer lock (only one producer at a time) seanhalle@0: * write into comm vars seanhalle@0: * get current ACK number seanhalle@0: * notify consumer seanhalle@0: * wait for ACK (get ACK lock, check on change in ACK number) seanhalle@0: * release producer lock seanhalle@0: * if not done, repeat seanhalle@0: */ seanhalle@0: while( lastTupleIter < params->numTuplesToCreate ) seanhalle@0: { seanhalle@0: //wait for change in tupleNum (save updated tuple num for next time) seanhalle@0: pthread_mutex_lock( &tupleIterLock ); seanhalle@0: while( lastTupleIter == tupleIter ) seanhalle@0: { seanhalle@0: pthread_cond_wait( &tupleIterCond, seanhalle@0: &tupleIterLock ); seanhalle@0: } seanhalle@0: pthread_mutex_unlock( &tupleIterLock ); seanhalle@0: seanhalle@0: lastTupleIter = tupleIter; //save for next time through loop seanhalle@0: seanhalle@0: DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); seanhalle@0: seanhalle@0: //Two vars used to comm with consumer. One holds message to send, seanhalle@0: // other holds ID of producer sending. seanhalle@0: //Protect the two variables with a lock, that only one seanhalle@0: // producer can get. Update the variable with the message to be seanhalle@0: // communicated, and write ID of sender in second var. seanhalle@0: seanhalle@0: //Get producer lock seanhalle@0: pthread_mutex_lock( &producerAccessMutex ); seanhalle@0: seanhalle@0: // write into comm vars seanhalle@0: producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing seanhalle@0: currProductionNum += 1; seanhalle@0: seanhalle@0: // get current ACK number seanhalle@0: oldConsumerReceivedACKNum = currConsumerReceivedACKNum; seanhalle@0: seanhalle@0: // notify consumer (don't think need the cond lock here -- teeter-totter) seanhalle@0: pthread_mutex_lock( &productionReadyLock ); seanhalle@0: DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); seanhalle@0: pthread_cond_broadcast( &productionReadyCond ); seanhalle@0: pthread_mutex_unlock( &productionReadyLock ); seanhalle@0: seanhalle@0: // wait for ACK (get ACK lock, check on change in ACK number) seanhalle@0: pthread_mutex_lock( &consumerReceivedAckLock ); seanhalle@0: while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) seanhalle@0: { seanhalle@0: pthread_cond_wait( &consumerReceivedAckCond, seanhalle@0: &consumerReceivedAckLock ); seanhalle@0: } seanhalle@0: pthread_mutex_unlock( &consumerReceivedAckLock ); seanhalle@0: DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); seanhalle@0: seanhalle@0: // release producer lock (so different producer can get and send) seanhalle@0: pthread_mutex_unlock( &producerAccessMutex ); seanhalle@0: } //if not done, do again seanhalle@0: seanhalle@0: //Shutdown producer seanhalle@0: pthread_exit(NULL); seanhalle@0: seanhalle@0: } seanhalle@0: