/* 
 * 
 */

#include "main.h"
#include <pthread.h>

/*
 * Consumer.
 *
 * Birth function for the thread that performs the consumer behavior.
 *
 * Protocol:
 *  - The consumer advances the shared tuple iteration counter and wakes all
 *    producers so they start the next iteration.
 *  - It then waits for some producer to send it a production.
 *  - When it wakes, it reads the variables used for communication and
 *    packages the information into the tuple it is constructing.
 *  - It then wakes the producer, who is waiting to be sure that the send was
 *    received.
 *  - If the tuple is not yet complete, it loops back for another production.
 *  - When the tuple is complete, it adds that tuple to the output.
 *  - If that was the last tuple, it ends itself; otherwise it starts the next
 *    iteration.
 *
 * _params: pointer to a ConsumerParams; the fields read here are
 *          numTuplesToCreate and numProducers.
 *
 * Does not return normally: the thread is ended with pthread_exit(NULL).
 */
void*
consumer_birthFn( void* _params )
 {
   /* Starts at 0 so the first comparison against currProductionNum is well
    * defined (it was previously read uninitialized).  The protocol notes say
    * there is no producer ID 0, so 0 acts as "nothing seen yet".
    * NOTE(review): the original comment also says "new tuple sets to 0";
    * confirm against the producer side whether a per-tuple reset is needed.
    */
   int lastSeenProductionNum = 0;
   int numProducts;

   ConsumerParams* params = (ConsumerParams *)_params;

   /*The consumer does two loops.
    * The outside loop counts the number of tuples created.
    * The inside loop collects the products for one tuple.
    *
    *Protocol:
    * increment tupleIter
    * wake producers for next tuple
    * wait on production ready
    * read comm vars and add msg to current tuple
    * increment ACK count
    * wake producer (who is waiting for ack)
    * if more productions for current tuple, repeat
    * have all productions for current tuple, so add tuple to output
    * if all tuples have been produced, end
    * else more, so repeat
    */
   while( tupleIter < params->numTuplesToCreate )
    {
      // Advance the shared iteration counter and wake every producer waiting
      // on it; both are done while holding tupleIterLock.
      DEBUG__printf("consumer broadcast for next iter\n");
      pthread_mutex_lock( &tupleIterLock );
      tupleIter += 1;
      pthread_cond_broadcast( &tupleIterCond );
      pthread_mutex_unlock( &tupleIterLock );

      if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);

      // Collect one production per producer for the current tuple.
      for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
       {
         // Wait on productionReadyCond (suspend until there is a production).
         // The predicate is re-checked in a loop so that a wakeup without a
         // changed currProductionNum simply waits again.
         pthread_mutex_lock( &productionReadyLock );
         while( lastSeenProductionNum == currProductionNum )
          {
            pthread_cond_wait( &productionReadyCond, &productionReadyLock );
          }
         DEBUG__printf1( "consumer got production %d\n", currProductionNum );
         lastSeenProductionNum = currProductionNum;
         pthread_mutex_unlock( &productionReadyLock );

         // Read comm vars and add msg to current tuple
         // add production to tuple -- overhead meas, do nothing

         // Increment the ACK count so it differs from the value the producer
         // last saw, then wake the producer (who is waiting to be sure that
         // the send was received).
         pthread_mutex_lock( &consumerReceivedAckLock );
         currConsumerReceivedACKNum += 1;
         DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
         pthread_cond_broadcast( &consumerReceivedAckCond );
         pthread_mutex_unlock( &consumerReceivedAckLock );
       } // if more productions for current tuple, repeat

      // have all productions for current tuple, so add tuple to output
      // add tuple to output and malloc new tuple array -- overhead meas, do nothing

    } // if all tuples have been produced, end

   // Shutdown consumer thread
   pthread_exit(NULL);

 }

