diff src/Application/consumer.c @ 0:9cf9b2091eeb

working condition variable version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:13:46 -0700
parents
children 88db7b62b961
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/Application/consumer.c	Wed Jul 10 14:13:46 2013 -0700
     1.3 @@ -0,0 +1,94 @@
     1.4 +/* 
     1.5 + * 
     1.6 + */
     1.7 +
     1.8 +#include "main.h"
     1.9 +#include <pthread.h>
    1.10 +
    1.11 +/*
    1.12 + * Consumer.
    1.13 + * 
    1.14 + * Birth function for thread that performs the consumer behavior
    1.15 + * 
    1.16 + *Here's the protocol: 
    1.17 + *Consumer is born waiting for some producer to send it a production.
    1.18 + *When it wakes, it reads the variables used for communication, and packages
    1.19 + * the information into a tuple it is constructing.
    1.20 + *Then it wakes the producer, who is waiting to be sure that send was received.
    1.21 + *If the tuple is not yet complete, it loops back for another production.
    1.22 + *When tuple complete, it adds that tuple to the output
    1.23 + *If that's the last tuple, it ends itself
    1.24 + *If not, then wakes all the producers, who go to the next iteration.
    1.25 + *Then loops back to wait for some producer to send it a production
    1.26 + */
    1.27 +void* 
    1.28 +consumer_birthFn( void* _params )
    1.29 + {
    1.30 +   int lastSeenProductionNum, numProducts;
    1.31 +   
    1.32 +   ConsumerParams* params = (ConsumerParams *)_params;
    1.33 +       
    1.34 +   
    1.35 +   /*The consumer does two loops.  
    1.36 +    * The outside loop counts the number of tuples created.
    1.37 +    * The inside loop collects the products for one tuple.
    1.38 +    * 
    1.39 +    *Protocol:
    1.40 +    * increment tupleIter
    1.41 +    * wake producers for next tuple
    1.42 +    * wait on production ready
    1.43 +    * read comm vars and add msg to current tuple
    1.44 +    * increment ACK count
    1.45 +    * wake producer (who is waiting for ack)
    1.46 +    * if more productions for current tuple, repeat
    1.47 +    * have all productions for current tuple, so add tuple to output
    1.48 +    * if have all tuples are going to produce, end
    1.49 +    * else more, so  repeat
    1.50 +    */
    1.51 +   while( tupleIter < params->numTuplesToCreate )
    1.52 +    {
    1.53 +      // increment tupleIter (global shared)
    1.54 +      // wake producers for next iter (don't need cond lock? -- teeter totter)
    1.55 +      DEBUG__printf("consumer broadcast for next iter\n");
    1.56 +      pthread_mutex_lock(&tupleIterLock);
    1.57 +      tupleIter += 1;
    1.58 +      pthread_cond_broadcast( &tupleIterCond );
    1.59 +      pthread_mutex_unlock(&tupleIterLock);
    1.60 +	  
    1.61 +      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);       
    1.62 +         
    1.63 +      for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
    1.64 +       { 
    1.65 +         //wait on productionReadyCond  (suspend until there is a production)
    1.66 +         pthread_mutex_lock( &productionReadyLock );
    1.67 +         while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
    1.68 +          {
    1.69 +            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
    1.70 +          }
    1.71 +         DEBUG__printf1( "consumer got production %d\n", currProductionNum );
    1.72 +         lastSeenProductionNum = currProductionNum;
    1.73 +         pthread_mutex_unlock( &productionReadyLock );
    1.74 +                  
    1.75 +         //Read comm vars and add msg to current tuple
    1.76 +         //add production to tuple -- overhead meas, do nothing
    1.77 +      
    1.78 +         // increment ACK count
    1.79 +         pthread_mutex_lock(     &consumerReceivedAckLock  );
    1.80 +         currConsumerReceivedACKNum += 1;//make different than last time prod saw
    1.81 +         
    1.82 +         // wake producer (who is waiting to be sure that send was received)
    1.83 +         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
    1.84 +         pthread_cond_broadcast( &consumerReceivedAckCond );
    1.85 +         pthread_mutex_unlock(     &consumerReceivedAckLock  );
    1.86 +    } //if more productions for current tuple, repeat
    1.87 +    
    1.88 +      // have all productions for current tuple, so add tuple to output
    1.89 +      //add tuple to output and malloc new tuple array -- overhead meas, do nothing
    1.90 +      
    1.91 +    } // if have all tuples are going to produce, end
    1.92 +
    1.93 +   //Shutdown consumer thread
    1.94 +   pthread_exit(NULL);
    1.95 +    
    1.96 + }
    1.97 +