diff src/Application/consumer.c @ 1:88db7b62b961

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
line diff
     1.1 --- a/src/Application/consumer.c	Wed Jul 10 14:13:46 2013 -0700
     1.2 +++ b/src/Application/consumer.c	Wed Jul 10 14:17:04 2013 -0700
     1.3 @@ -11,20 +11,16 @@
     1.4   * Birth function for thread that performs the consumer behavior
     1.5   * 
     1.6   *Here's the protocol: 
     1.7 - *Consumer is born waiting for some producer to send it a production.
     1.8 - *When it wakes, it reads the variables used for communication, and packages
     1.9 - * the information into a tuple it is constructing.
    1.10 - *Then it wakes the producer, who is waiting to be sure that send was received.
    1.11 - *If the tuple is not yet complete, it loops back for another production.
    1.12 - *When tuple complete, it adds that tuple to the output
    1.13 - *If that's the last tuple, it ends itself
    1.14 - *If not, then wakes all the producers, who go to the next iteration.
    1.15 - *Then loops back to wait for some producer to send it a production
    1.16 + * Reads a production from the shared Q. 
    1.17 + *   If empty, yields and tries again.
    1.18 + * When has a production from every producer, broadcasts next iter to producers
    1.19 + * When has all the tuples, end
    1.20   */
    1.21  void* 
    1.22  consumer_birthFn( void* _params )
    1.23   {
    1.24 -   int lastSeenProductionNum, numProducts;
    1.25 +   int numProducts;
    1.26 +   void *production; //dummy ptr
    1.27     
    1.28     ConsumerParams* params = (ConsumerParams *)_params;
    1.29         
    1.30 @@ -36,51 +32,36 @@
    1.31      *Protocol:
    1.32      * increment tupleIter
    1.33      * wake producers for next tuple
    1.34 -    * wait on production ready
    1.35 -    * read comm vars and add msg to current tuple
    1.36 -    * increment ACK count
    1.37 -    * wake producer (who is waiting for ack)
    1.38 -    * if more productions for current tuple, repeat
    1.39 -    * have all productions for current tuple, so add tuple to output
    1.40 -    * if have all tuples are going to produce, end
    1.41 -    * else more, so  repeat
    1.42 +    * Reads a production from the shared Q. 
    1.43 +    *   If empty, yields and tries again.
    1.44 +    * if more productions in tuple, repeat
    1.45 +    * if have more tuples, repeat
    1.46 +    * end thread
    1.47      */
    1.48     while( tupleIter < params->numTuplesToCreate )
    1.49      {
    1.50 -      // increment tupleIter (global shared)
    1.51 -      // wake producers for next iter (don't need cond lock? -- teeter totter)
    1.52 +      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
    1.53 +       
    1.54 +      // wake producers for next iter
    1.55        DEBUG__printf("consumer broadcast for next iter\n");
    1.56        pthread_mutex_lock(&tupleIterLock);
    1.57 -      tupleIter += 1;
    1.58 +      tupleIter += 1; // increment tupleIter (global shared)
    1.59        pthread_cond_broadcast( &tupleIterCond );
    1.60        pthread_mutex_unlock(&tupleIterLock);
    1.61 -	  
    1.62 -      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);       
    1.63           
    1.64        for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
    1.65         { 
    1.66 -         //wait on productionReadyCond  (suspend until there is a production)
    1.67 -         pthread_mutex_lock( &productionReadyLock );
    1.68 -         while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID
    1.69 -          {
    1.70 -            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
    1.71 +         // read a production from the shared Q.
    1.72 +         production = NULL;
    1.73 +         while( production == NULL )
    1.74 +          { pthread_mutex_lock( &queueAccessLock );
    1.75 +            production = readPrivQ( commQ );
    1.76 +            pthread_mutex_unlock( &queueAccessLock );
    1.77 +            // If empty, yields and tries again.
    1.78 +            if( production == NULL) sched_yield();
    1.79            }
    1.80           DEBUG__printf1( "consumer got production %d\n", currProductionNum );
    1.81 -         lastSeenProductionNum = currProductionNum;
    1.82 -         pthread_mutex_unlock( &productionReadyLock );
    1.83 -                  
    1.84 -         //Read comm vars and add msg to current tuple
    1.85 -         //add production to tuple -- overhead meas, do nothing
    1.86 -      
    1.87 -         // increment ACK count
    1.88 -         pthread_mutex_lock(     &consumerReceivedAckLock  );
    1.89 -         currConsumerReceivedACKNum += 1;//make different than last time prod saw
    1.90 -         
    1.91 -         // wake producer (who is waiting to be sure that send was received)
    1.92 -         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
    1.93 -         pthread_cond_broadcast( &consumerReceivedAckCond );
    1.94 -         pthread_mutex_unlock(     &consumerReceivedAckLock  );
    1.95 -    } //if more productions for current tuple, repeat
    1.96 +       } //if more productions for current tuple, repeat
    1.97      
    1.98        // have all productions for current tuple, so add tuple to output
    1.99        //add tuple to output and malloc new tuple array -- overhead meas, do nothing