Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
view src/Application/producer.c @ 1:88db7b62b961
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children |
line source
1 /*
2 *
3 */
5 #include "main.h"
6 #include <pthread.h>
7 #include <sched.h>
9 /*
10 * Producer.
11 *
12 * Birth function for thread that performs the producer behavior
13 */
14 void*
15 producer_birthFn( void* _params )
16 {
17 cpu_set_t cpuinfo;
18 int lastTupleIter, oldConsumerReceivedACKNum;
20 ProducerParams *params = (ProducerParams *)_params;
22 lastTupleIter = 0; //compared to global tupleIter while waiting
24 /* --------------------------------------------------
25 * Pin thread to core, the producers are divided
26 * equally over all cores. Pinning prohibits the
27 * switching of cores so that perf counter and TSC values remain
28 * from the same core between readings. Pinning shouldn't
29 * affect results.. may be odd case when num thds doesn't divide into
30 * num Cores
31 * --------------------------------------------------
32 */
33 /*
34 CPU_ZERO( &cpuinfo );
35 CPU_SET( params->coreID, &cpuinfo );
36 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
37 pthread_yield(); //get off the core, so next can be created on it
38 uint32_t cpuid = sched_getcpu();
39 */
42 /*Protocol:
43 * wait for change in tupleIter (save updated tuple num for next time)
44 * Get queue lock
45 * write into queue
46 * release queue lock
47 * if not done, repeat
48 */
49 while( lastTupleIter < params->numTuplesToCreate )
50 {
51 //wait for change in tupleNum (save updated tuple num for next time)
52 pthread_mutex_lock( &tupleIterLock );
53 while( lastTupleIter == tupleIter )
54 {
55 pthread_cond_wait( &tupleIterCond,
56 &tupleIterLock );
57 }
58 pthread_mutex_unlock( &tupleIterLock );
60 lastTupleIter = tupleIter; //save for next time through loop
62 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
64 //Q used to comm with consumer, protected with a lock
66 //Get producer lock
67 pthread_mutex_lock( &queueAccessLock );
68 writePrivQ( params, commQ ); //params is just a dummy pointer
69 pthread_mutex_unlock( &queueAccessLock );
70 } //if not done, do again
72 //Shutdown producer
73 pthread_exit(NULL);
75 }
