annotate src/Application/producer.c @ 0:9cf9b2091eeb

working condition variable version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:13:46 -0700
parents
children 88db7b62b961
rev   line source
seanhalle@0 1 /*
seanhalle@0 2 *
seanhalle@0 3 */
seanhalle@0 4
seanhalle@0 5 #include "main.h"
seanhalle@0 6 #include <pthread.h>
seanhalle@0 7 #include <sched.h>
seanhalle@0 8
seanhalle@0 9 /*
seanhalle@0 10 * Producer.
seanhalle@0 11 *
seanhalle@0 12 * Birth function for thread that performs the producer behavior
seanhalle@0 13 *
seanhalle@0 14 * Note: is pinned to a core, to facilitate collecting measurements
seanhalle@0 15 */
seanhalle@0 16 void*
seanhalle@0 17 producer_birthFn( void* _params )
seanhalle@0 18 {
seanhalle@0 19 cpu_set_t cpuinfo;
seanhalle@0 20 int lastTupleIter, oldConsumerReceivedACKNum;
seanhalle@0 21
seanhalle@0 22 ProducerParams *params = (ProducerParams *)_params;
seanhalle@0 23
seanhalle@0 24 lastTupleIter = 0; //compared to global tupleIter while waiting
seanhalle@0 25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
seanhalle@0 26
seanhalle@0 27 /* --------------------------------------------------
seanhalle@0 28 * Pin thread to core, the producers are divided
seanhalle@0 29 * equally over all cores. Pinning prohibits the
seanhalle@0 30 * switching of cores so that perf counter and TSC values remain
seanhalle@0 31 * from the same core between readings. Pinning shouldn't
seanhalle@0 32 * affect results.. may be odd case when num thds doesn't divide into
seanhalle@0 33 * num Cores
seanhalle@0 34 * --------------------------------------------------
seanhalle@0 35 */
seanhalle@0 36 /*
seanhalle@0 37 CPU_ZERO( &cpuinfo );
seanhalle@0 38 CPU_SET( params->coreID, &cpuinfo );
seanhalle@0 39 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
seanhalle@0 40 pthread_yield(); //get off the core, so next can be created on it
seanhalle@0 41 uint32_t cpuid = sched_getcpu();
seanhalle@0 42 */
seanhalle@0 43
seanhalle@0 44
seanhalle@0 45 /*Protocol:
seanhalle@0 46 * wait for change in tupleIter (save updated tuple num for next time)
seanhalle@0 47 * Get producer lock (only one producer at a time)
seanhalle@0 48 * write into comm vars
seanhalle@0 49 * get current ACK number
seanhalle@0 50 * notify consumer
seanhalle@0 51 * wait for ACK (get ACK lock, check on change in ACK number)
seanhalle@0 52 * release producer lock
seanhalle@0 53 * if not done, repeat
seanhalle@0 54 */
seanhalle@0 55 while( lastTupleIter < params->numTuplesToCreate )
seanhalle@0 56 {
seanhalle@0 57 //wait for change in tupleNum (save updated tuple num for next time)
seanhalle@0 58 pthread_mutex_lock( &tupleIterLock );
seanhalle@0 59 while( lastTupleIter == tupleIter )
seanhalle@0 60 {
seanhalle@0 61 pthread_cond_wait( &tupleIterCond,
seanhalle@0 62 &tupleIterLock );
seanhalle@0 63 }
seanhalle@0 64 pthread_mutex_unlock( &tupleIterLock );
seanhalle@0 65
seanhalle@0 66 lastTupleIter = tupleIter; //save for next time through loop
seanhalle@0 67
seanhalle@0 68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
seanhalle@0 69
seanhalle@0 70 //Two vars used to comm with consumer. One holds message to send,
seanhalle@0 71 // other holds ID of producer sending.
seanhalle@0 72 //Protect the two variables with a lock, that only one
seanhalle@0 73 // producer can get. Update the variable with the message to be
seanhalle@0 74 // communicated, and write ID of sender in second var.
seanhalle@0 75
seanhalle@0 76 //Get producer lock
seanhalle@0 77 pthread_mutex_lock( &producerAccessMutex );
seanhalle@0 78
seanhalle@0 79 // write into comm vars
seanhalle@0 80 producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing
seanhalle@0 81 currProductionNum += 1;
seanhalle@0 82
seanhalle@0 83 // get current ACK number
seanhalle@0 84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
seanhalle@0 85
seanhalle@0 86 // notify consumer (don't think need the cond lock here -- teeter-totter)
seanhalle@0 87 pthread_mutex_lock( &productionReadyLock );
seanhalle@0 88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
seanhalle@0 89 pthread_cond_broadcast( &productionReadyCond );
seanhalle@0 90 pthread_mutex_unlock( &productionReadyLock );
seanhalle@0 91
seanhalle@0 92 // wait for ACK (get ACK lock, check on change in ACK number)
seanhalle@0 93 pthread_mutex_lock( &consumerReceivedAckLock );
seanhalle@0 94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
seanhalle@0 95 {
seanhalle@0 96 pthread_cond_wait( &consumerReceivedAckCond,
seanhalle@0 97 &consumerReceivedAckLock );
seanhalle@0 98 }
seanhalle@0 99 pthread_mutex_unlock( &consumerReceivedAckLock );
seanhalle@0 100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
seanhalle@0 101
seanhalle@0 102 // release producer lock (so different producer can get and send)
seanhalle@0 103 pthread_mutex_unlock( &producerAccessMutex );
seanhalle@0 104 } //if not done, do again
seanhalle@0 105
seanhalle@0 106 //Shutdown producer
seanhalle@0 107 pthread_exit(NULL);
seanhalle@0 108
seanhalle@0 109 }
seanhalle@0 110