Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
comparison src/Application/producer.c @ 1:88db7b62b961
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children | |
Comparison legend: equal, deleted, inserted, replaced
| 0:ee1e1e746014 | 1:70163cf233d8 |
|---|---|
| 8 | 8 |
| 9 /* | 9 /* |
| 10 * Producer. | 10 * Producer. |
| 11 * | 11 * |
| 12 * Birth function for thread that performs the producer behavior | 12 * Birth function for thread that performs the producer behavior |
| 13 * | |
| 14 * Note: is pinned to a core, to facilitate collecting measurements | |
| 15 */ | 13 */ |
| 16 void* | 14 void* |
| 17 producer_birthFn( void* _params ) | 15 producer_birthFn( void* _params ) |
| 18 { | 16 { |
| 19 cpu_set_t cpuinfo; | 17 cpu_set_t cpuinfo; |
| 20 int lastTupleIter, oldConsumerReceivedACKNum; | 18 int lastTupleIter, oldConsumerReceivedACKNum; |
| 21 | 19 |
| 22 ProducerParams *params = (ProducerParams *)_params; | 20 ProducerParams *params = (ProducerParams *)_params; |
| 23 | 21 |
| 24 lastTupleIter = 0; //compared to global tupleIter while waiting | 22 lastTupleIter = 0; //compared to global tupleIter while waiting |
| 25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive | |
| 26 | 23 |
| 27 /* -------------------------------------------------- | 24 /* -------------------------------------------------- |
| 28 * Pin thread to core, the producers are divided | 25 * Pin thread to core, the producers are divided |
| 29 * equally over all cores. Pinning prohibits the | 26 * equally over all cores. Pinning prohibits the |
| 30 * switching of cores so that perf counter and TSC values remain | 27 * switching of cores so that perf counter and TSC values remain |
| 42 */ | 39 */ |
| 43 | 40 |
| 44 | 41 |
| 45 /*Protocol: | 42 /*Protocol: |
| 46 * wait for change in tupleIter (save updated tuple num for next time) | 43 * wait for change in tupleIter (save updated tuple num for next time) |
| 47 * Get producer lock (only one producer at a time) | 44 * Get queue lock |
| 48 * write into comm vars | 45 * write into queue |
| 49 * get current ACK number | 46 * release queue lock |
| 50 * notify consumer | |
| 51 * wait for ACK (get ACK lock, check on change in ACK number) | |
| 52 * release producer lock | |
| 53 * if not done, repeat | 47 * if not done, repeat |
| 54 */ | 48 */ |
| 55 while( lastTupleIter < params->numTuplesToCreate ) | 49 while( lastTupleIter < params->numTuplesToCreate ) |
| 56 { | 50 { |
| 57 //wait for change in tupleNum (save updated tuple num for next time) | 51 //wait for change in tupleNum (save updated tuple num for next time) |
| 59 while( lastTupleIter == tupleIter ) | 53 while( lastTupleIter == tupleIter ) |
| 60 { | 54 { |
| 61 pthread_cond_wait( &tupleIterCond, | 55 pthread_cond_wait( &tupleIterCond, |
| 62 &tupleIterLock ); | 56 &tupleIterLock ); |
| 63 } | 57 } |
| 64 pthread_mutex_unlock( &tupleIterLock ); | 58 pthread_mutex_unlock( &tupleIterLock ); |
| 65 | 59 |
| 66 lastTupleIter = tupleIter; //save for next time through loop | 60 lastTupleIter = tupleIter; //save for next time through loop |
| 67 | 61 |
| 68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); | 62 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); |
| 69 | 63 |
| 70 //Two vars used to comm with consumer. One holds message to send, | 64 //Q used to comm with consumer, protected with a lock |
| 71 // other holds ID of producer sending. | |
| 72 //Protect the two variables with a lock, that only one | |
| 73 // producer can get. Update the variable with the message to be | |
| 74 // communicated, and write ID of sender in second var. | |
| 75 | 65 |
| 76 //Get producer lock | 66 //Get producer lock |
| 77 pthread_mutex_lock( &producerAccessMutex ); | 67 pthread_mutex_lock( &queueAccessLock ); |
| 78 | 68 writePrivQ( params, commQ ); //params is just a dummy pointer |
| 79 // write into comm vars | 69 pthread_mutex_unlock( &queueAccessLock ); |
| 80 producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing | |
| 81 currProductionNum += 1; | |
| 82 | |
| 83 // get current ACK number | |
| 84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum; | |
| 85 | |
| 86 // notify consumer (don't think need the cond lock here -- teeter-totter) | |
| 87 pthread_mutex_lock( &productionReadyLock ); | |
| 88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID ); | |
| 89 pthread_cond_broadcast( &productionReadyCond ); | |
| 90 pthread_mutex_unlock( &productionReadyLock ); | |
| 91 | |
| 92 // wait for ACK (get ACK lock, check on change in ACK number) | |
| 93 pthread_mutex_lock( &consumerReceivedAckLock ); | |
| 94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum ) | |
| 95 { | |
| 96 pthread_cond_wait( &consumerReceivedAckCond, | |
| 97 &consumerReceivedAckLock ); | |
| 98 } | |
| 99 pthread_mutex_unlock( &consumerReceivedAckLock ); | |
| 100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum ); | |
| 101 | |
| 102 // release producer lock (so different producer can get and send) | |
| 103 pthread_mutex_unlock( &producerAccessMutex ); | |
| 104 } //if not done, do again | 70 } //if not done, do again |
| 105 | 71 |
| 106 //Shutdown producer | 72 //Shutdown producer |
| 107 pthread_exit(NULL); | 73 pthread_exit(NULL); |
| 108 | 74 |
