Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
comparison src/Application/consumer.c @ 0:9cf9b2091eeb
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:41753310be14 |
|---|---|
| 1 /* | |
| 2 * | |
| 3 */ | |
| 4 | |
| 5 #include "main.h" | |
| 6 #include <pthread.h> | |
| 7 | |
| 8 /* | |
| 9 * Consumer. | |
| 10 * | |
| 11 * Birth function for thread that performs the consumer behavior | |
| 12 * | |
| 13 *Here's the protocol: | |
| 14 *Consumer is born waiting for some producer to send it a production. | |
| 15 *When it wakes, it reads the variables used for communication, and packages | |
| 16 * the information into a tuple it is constructing. | |
| 17 *Then it wakes the producer, who is waiting to be sure that send was received. | |
| 18 *If the tuple is not yet complete, it loops back for another production. | |
| 19 *When tuple complete, it adds that tuple to the output | |
| 20 *If that's the last tuple, it ends itself | |
| 21 *If not, then wakes all the producers, who go to the next iteration. | |
| 22 *Then loops back to wait for some producer to send it a production | |
| 23 */ | |
| 24 void* | |
| 25 consumer_birthFn( void* _params ) | |
| 26 { | |
| 27 int lastSeenProductionNum, numProducts; | |
| 28 | |
| 29 ConsumerParams* params = (ConsumerParams *)_params; | |
| 30 | |
| 31 | |
| 32 /*The consumer does two loops. | |
| 33 * The outside loop counts the number of tuples created. | |
| 34 * The inside loop collects the products for one tuple. | |
| 35 * | |
| 36 *Protocol: | |
| 37 * increment tupleIter | |
| 38 * wake producers for next tuple | |
| 39 * wait on production ready | |
| 40 * read comm vars and add msg to current tuple | |
| 41 * increment ACK count | |
| 42 * wake producer (who is waiting for ack) | |
| 43 * if more productions for current tuple, repeat | |
| 44 * have all productions for current tuple, so add tuple to output | |
| 45 * if have all tuples are going to produce, end | |
| 46 * else more, so repeat | |
| 47 */ | |
| 48 while( tupleIter < params->numTuplesToCreate ) | |
| 49 { | |
| 50 // increment tupleIter (global shared) | |
| 51 // wake producers for next iter (don't need cond lock? -- teeter totter) | |
| 52 DEBUG__printf("consumer broadcast for next iter\n"); | |
| 53 pthread_mutex_lock(&tupleIterLock); | |
| 54 tupleIter += 1; | |
| 55 pthread_cond_broadcast( &tupleIterCond ); | |
| 56 pthread_mutex_unlock(&tupleIterLock); | |
| 57 | |
| 58 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); | |
| 59 | |
| 60 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) | |
| 61 { | |
| 62 //wait on productionReadyCond (suspend until there is a production) | |
| 63 pthread_mutex_lock( &productionReadyLock ); | |
| 64 while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID | |
| 65 { | |
| 66 pthread_cond_wait( &productionReadyCond, &productionReadyLock ); | |
| 67 } | |
| 68 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); | |
| 69 lastSeenProductionNum = currProductionNum; | |
| 70 pthread_mutex_unlock( &productionReadyLock ); | |
| 71 | |
| 72 //Read comm vars and add msg to current tuple | |
| 73 //add production to tuple -- overhead meas, do nothing | |
| 74 | |
| 75 // increment ACK count | |
| 76 pthread_mutex_lock( &consumerReceivedAckLock ); | |
| 77 currConsumerReceivedACKNum += 1;//make different than last time prod saw | |
| 78 | |
| 79 // wake producer (who is waiting to be sure that send was received) | |
| 80 DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); | |
| 81 pthread_cond_broadcast( &consumerReceivedAckCond ); | |
| 82 pthread_mutex_unlock( &consumerReceivedAckLock ); | |
| 83 } //if more productions for current tuple, repeat | |
| 84 | |
| 85 // have all productions for current tuple, so add tuple to output | |
| 86 //add tuple to output and malloc new tuple array -- overhead meas, do nothing | |
| 87 | |
| 88 } // if have all tuples are going to produce, end | |
| 89 | |
| 90 //Shutdown consumer thread | |
| 91 pthread_exit(NULL); | |
| 92 | |
| 93 } | |
| 94 |
