Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
diff src/Application/consumer.c @ 1:88db7b62b961
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children |
line diff
1.1 --- a/src/Application/consumer.c Wed Jul 10 14:13:46 2013 -0700 1.2 +++ b/src/Application/consumer.c Wed Jul 10 14:17:04 2013 -0700 1.3 @@ -11,20 +11,16 @@ 1.4 * Birth function for thread that performs the consumer behavior 1.5 * 1.6 *Here's the protocol: 1.7 - *Consumer is born waiting for some producer to send it a production. 1.8 - *When it wakes, it reads the variables used for communication, and packages 1.9 - * the information into a tuple it is constructing. 1.10 - *Then it wakes the producer, who is waiting to be sure that send was received. 1.11 - *If the tuple is not yet complete, it loops back for another production. 1.12 - *When tuple complete, it adds that tuple to the output 1.13 - *If that's the last tuple, it ends itself 1.14 - *If not, then wakes all the producers, who go to the next iteration. 1.15 - *Then loops back to wait for some producer to send it a production 1.16 + * Reads a production from the shared Q. 1.17 + * If empty, yields and tries again. 1.18 + * When has a production from every producer, broadcasts next iter to producers 1.19 + * When has all the tuples, end 1.20 */ 1.21 void* 1.22 consumer_birthFn( void* _params ) 1.23 { 1.24 - int lastSeenProductionNum, numProducts; 1.25 + int numProducts; 1.26 + void *production; //dummy ptr 1.27 1.28 ConsumerParams* params = (ConsumerParams *)_params; 1.29 1.30 @@ -36,51 +32,36 @@ 1.31 *Protocol: 1.32 * increment tupleIter 1.33 * wake producers for next tuple 1.34 - * wait on production ready 1.35 - * read comm vars and add msg to current tuple 1.36 - * increment ACK count 1.37 - * wake producer (who is waiting for ack) 1.38 - * if more productions for current tuple, repeat 1.39 - * have all productions for current tuple, so add tuple to output 1.40 - * if have all tuples are going to produce, end 1.41 - * else more, so repeat 1.42 + * Reads a production from the shared Q. 1.43 + * If empty, yields and tries again. 1.44 + * if more productions in tuple, repeat 1.45 + * if have more tuples, repeat 1.46 + * end thread 1.47 */ 1.48 while( tupleIter < params->numTuplesToCreate ) 1.49 { 1.50 - // increment tupleIter (global shared) 1.51 - // wake producers for next iter (don't need cond lock? -- teeter totter) 1.52 + if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 1.53 + 1.54 + // wake producers for next iter 1.55 DEBUG__printf("consumer broadcast for next iter\n"); 1.56 pthread_mutex_lock(&tupleIterLock); 1.57 - tupleIter += 1; 1.58 + tupleIter += 1; // increment tupleIter (global shared) 1.59 pthread_cond_broadcast( &tupleIterCond ); 1.60 pthread_mutex_unlock(&tupleIterLock); 1.61 - 1.62 - if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); 1.63 1.64 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) 1.65 { 1.66 - //wait on productionReadyCond (suspend until there is a production) 1.67 - pthread_mutex_lock( &productionReadyLock ); 1.68 - while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID 1.69 - { 1.70 - pthread_cond_wait( &productionReadyCond, &productionReadyLock ); 1.71 + // read a production from the shared Q. 1.72 + production = NULL; 1.73 + while( production == NULL ) 1.74 + { pthread_mutex_lock( &queueAccessLock ); 1.75 + production = readPrivQ( commQ ); 1.76 + pthread_mutex_unlock( &queueAccessLock ); 1.77 + // If empty, yields and tries again. 1.78 + if( production == NULL) sched_yield(); 1.79 } 1.80 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); 1.81 - lastSeenProductionNum = currProductionNum; 1.82 - pthread_mutex_unlock( &productionReadyLock ); 1.83 - 1.84 - //Read comm vars and add msg to current tuple 1.85 - //add production to tuple -- overhead meas, do nothing 1.86 - 1.87 - // increment ACK count 1.88 - pthread_mutex_lock( &consumerReceivedAckLock ); 1.89 - currConsumerReceivedACKNum += 1;//make different than last time prod saw 1.90 - 1.91 - // wake producer (who is waiting to be sure that send was received) 1.92 - DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); 1.93 - pthread_cond_broadcast( &consumerReceivedAckCond ); 1.94 - pthread_mutex_unlock( &consumerReceivedAckLock ); 1.95 - } //if more productions for current tuple, repeat 1.96 + } //if more productions for current tuple, repeat 1.97 1.98 // have all productions for current tuple, so add tuple to output 1.99 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
