annotate src/Application/producer.c @ 1:88db7b62b961

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
rev   line source
seanhalle@0 1 /*
seanhalle@0 2 *
seanhalle@0 3 */
seanhalle@0 4
seanhalle@0 5 #include "main.h"
seanhalle@0 6 #include <pthread.h>
seanhalle@0 7 #include <sched.h>
seanhalle@0 8
seanhalle@0 9 /*
seanhalle@0 10 * Producer.
seanhalle@0 11 *
seanhalle@0 12 * Birth function for thread that performs the producer behavior
seanhalle@0 13 */
seanhalle@0 14 void*
seanhalle@0 15 producer_birthFn( void* _params )
seanhalle@0 16 {
seanhalle@0 17 cpu_set_t cpuinfo;
seanhalle@0 18 int lastTupleIter, oldConsumerReceivedACKNum;
seanhalle@0 19
seanhalle@0 20 ProducerParams *params = (ProducerParams *)_params;
seanhalle@0 21
seanhalle@0 22 lastTupleIter = 0; //compared to global tupleIter while waiting
seanhalle@0 23
seanhalle@0 24 /* --------------------------------------------------
seanhalle@0 25 * Pin thread to core, the producers are divided
seanhalle@0 26 * equally over all cores. Pinning prohibits the
seanhalle@0 27 * switching of cores so that perf counter and TSC values remain
seanhalle@0 28 * from the same core between readings. Pinning shouldn't
seanhalle@0 29 * affect results.. may be odd case when num thds doesn't divide into
seanhalle@0 30 * num Cores
seanhalle@0 31 * --------------------------------------------------
seanhalle@0 32 */
seanhalle@0 33 /*
seanhalle@0 34 CPU_ZERO( &cpuinfo );
seanhalle@0 35 CPU_SET( params->coreID, &cpuinfo );
seanhalle@0 36 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
seanhalle@0 37 pthread_yield(); //get off the core, so next can be created on it
seanhalle@0 38 uint32_t cpuid = sched_getcpu();
seanhalle@0 39 */
seanhalle@0 40
seanhalle@0 41
seanhalle@0 42 /*Protocol:
seanhalle@0 43 * wait for change in tupleIter (save updated tuple num for next time)
seanhalle@1 44 * Get queue lock
seanhalle@1 45 * write into queue
seanhalle@1 46 * release queue lock
seanhalle@0 47 * if not done, repeat
seanhalle@0 48 */
seanhalle@0 49 while( lastTupleIter < params->numTuplesToCreate )
seanhalle@0 50 {
seanhalle@0 51 //wait for change in tupleNum (save updated tuple num for next time)
seanhalle@0 52 pthread_mutex_lock( &tupleIterLock );
seanhalle@0 53 while( lastTupleIter == tupleIter )
seanhalle@0 54 {
seanhalle@0 55 pthread_cond_wait( &tupleIterCond,
seanhalle@0 56 &tupleIterLock );
seanhalle@0 57 }
seanhalle@1 58 pthread_mutex_unlock( &tupleIterLock );
seanhalle@0 59
seanhalle@0 60 lastTupleIter = tupleIter; //save for next time through loop
seanhalle@0 61
seanhalle@0 62 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
seanhalle@0 63
seanhalle@1 64 //Q used to comm with consumer, protected with a lock
seanhalle@0 65
seanhalle@0 66 //Get producer lock
seanhalle@1 67 pthread_mutex_lock( &queueAccessLock );
seanhalle@1 68 writePrivQ( params, commQ ); //params is just a dummy pointer
seanhalle@1 69 pthread_mutex_unlock( &queueAccessLock );
seanhalle@0 70 } //if not done, do again
seanhalle@0 71
seanhalle@0 72 //Shutdown producer
seanhalle@0 73 pthread_exit(NULL);
seanhalle@0 74
seanhalle@0 75 }
seanhalle@0 76