Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
diff src/Application/consumer.c @ 0:9cf9b2091eeb
working condition variable version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:13:46 -0700 |
| parents | |
| children | 88db7b62b961 |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/Application/consumer.c Wed Jul 10 14:13:46 2013 -0700 1.3 @@ -0,0 +1,94 @@ 1.4 +/* 1.5 + * 1.6 + */ 1.7 + 1.8 +#include "main.h" 1.9 +#include <pthread.h> 1.10 + 1.11 +/* 1.12 + * Consumer. 1.13 + * 1.14 + * Birth function for thread that performs the consumer behavior 1.15 + * 1.16 + *Here's the protocol: 1.17 + *Consumer is born waiting for some producer to send it a production. 1.18 + *When it wakes, it reads the variables used for communication, and packages 1.19 + * the information into a tuple it is constructing. 1.20 + *Then it wakes the producer, who is waiting to be sure that send was received. 1.21 + *If the tuple is not yet complete, it loops back for another production. 1.22 + *When tuple complete, it adds that tuple to the output 1.23 + *If that's the last tuple, it ends itself 1.24 + *If not, then wakes all the producers, who go to the next iteration. 1.25 + *Then loops back to wait for some producer to send it a production 1.26 + */ 1.27 +void* 1.28 +consumer_birthFn( void* _params ) 1.29 + { 1.30 + int lastSeenProductionNum, numProducts; 1.31 + 1.32 + ConsumerParams* params = (ConsumerParams *)_params; 1.33 + 1.34 + 1.35 + /*The consumer does two loops. 1.36 + * The outside loop counts the number of tuples created. 1.37 + * The inside loop collects the products for one tuple. 1.38 + * 1.39 + *Protocol: 1.40 + * increment tupleIter 1.41 + * wake producers for next tuple 1.42 + * wait on production ready 1.43 + * read comm vars and add msg to current tuple 1.44 + * increment ACK count 1.45 + * wake producer (who is waiting for ack) 1.46 + * if more productions for current tuple, repeat 1.47 + * have all productions for current tuple, so add tuple to output 1.48 + * if have all tuples are going to produce, end 1.49 + * else more, so repeat 1.50 + */ 1.51 + while( tupleIter < params->numTuplesToCreate ) 1.52 + { 1.53 + // increment tupleIter (global shared) 1.54 + // wake producers for next iter (don't need cond lock? -- teeter totter) 1.55 + DEBUG__printf("consumer broadcast for next iter\n"); 1.56 + pthread_mutex_lock(&tupleIterLock); 1.57 + tupleIter += 1; 1.58 + pthread_cond_broadcast( &tupleIterCond ); 1.59 + pthread_mutex_unlock(&tupleIterLock); 1.60 + 1.61 + if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 1.62 + 1.63 + for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) 1.64 + { 1.65 + //wait on productionReadyCond (suspend until there is a production) 1.66 + pthread_mutex_lock( &productionReadyLock ); 1.67 + while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID 1.68 + { 1.69 + pthread_cond_wait( &productionReadyCond, &productionReadyLock ); 1.70 + } 1.71 + DEBUG__printf1( "consumer got production %d\n", currProductionNum ); 1.72 + lastSeenProductionNum = currProductionNum; 1.73 + pthread_mutex_unlock( &productionReadyLock ); 1.74 + 1.75 + //Read comm vars and add msg to current tuple 1.76 + //add production to tuple -- overhead meas, do nothing 1.77 + 1.78 + // increment ACK count 1.79 + pthread_mutex_lock( &consumerReceivedAckLock ); 1.80 + currConsumerReceivedACKNum += 1;//make different than last time prod saw 1.81 + 1.82 + // wake producer (who is waiting to be sure that send was received) 1.83 + DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); 1.84 + pthread_cond_broadcast( &consumerReceivedAckCond ); 1.85 + pthread_mutex_unlock( &consumerReceivedAckLock ); 1.86 + } //if more productions for current tuple, repeat 1.87 + 1.88 + // have all productions for current tuple, so add tuple to output 1.89 + //add tuple to output and malloc new tuple array -- overhead meas, do nothing 1.90 + 1.91 + } // if have all tuples are going to produce, end 1.92 + 1.93 + //Shutdown consumer thread 1.94 + pthread_exit(NULL); 1.95 + 1.96 + } 1.97 +
