view pthreads_kmeans.c @ 2:467746c73fd0

DataSet print
author Merten Sach <msach@mailbox.tu-berlin.de>
date Wed, 28 Sep 2011 15:07:32 +0200
parents e69e4c2d612a
children
line source
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* File: pthreads_kmeans.c (OpenMP version) */
3 /* Description: Implementation of simple k-means clustering algorithm */
4 /* This program takes an array of N data objects, each with */
5 /* M coordinates and performs a k-means clustering given a */
6 /* user-provided value of the number of clusters (K). The */
7 /* clustering results are saved in 2 arrays: */
8 /* 1. a returned array of size [K][N] indicating the center */
9 /* coordinates of K clusters */
10 /* 2. membership[N] stores the cluster center ids, each */
11 /* corresponding to the cluster a data object is assigned */
12 /* */
13 /* Author: Wei-keng Liao */
14 /* ECE Department, Northwestern University */
15 /* email: wkliao@ece.northwestern.edu */
16 /* Copyright, 2005, Wei-keng Liao */
17 /* */
18 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <pthread.h>
23 #include <time.h>
24 #include <math.h>
25 #include "kmeans.h"
27 #include "VPThread_lib/VPThread.h"
29 #define PREC 300
31 struct barrier_t
32 {
33 int counter;
34 int nthreads;
35 int32 mutex;
36 int32 cond;
37 };
38 typedef struct barrier_t barrier;
40 extern int nthreads; /* Thread count */
41 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
42 volatile int finished;
44 barrier barr;
45 int32 lock1;
46 pthread_attr_t attr;
48 void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
49 {
50 barr->counter = 0;
51 barr->nthreads = nthreads;
52 barr->mutex = VPThread__make_mutex(VProc);
53 barr->cond = VPThread__make_cond(barr->mutex, VProc);
54 }
56 void inline barrier_wait(barrier *barr, VirtProcr *VProc)
57 {
58 int i;
60 VPThread__mutex_lock(barr->mutex, VProc);
61 barr->counter++;
62 if(barr->counter == barr->nthreads)
63 {
64 barr->counter = 0;
65 for(i=0; i < barr->nthreads; i++)
66 VPThread__cond_signal(barr->cond, VProc);
67 }
68 else
69 {
70 VPThread__cond_wait(barr->cond, VProc);
71 }
72 VPThread__mutex_unlock(barr->mutex, VProc);
73 }
75 /*
76 * Struct: input
77 * -------------
78 * Encapsulates all the input data for the benchmark, i.e. the object list,
79 * clustering output and the input statistics.
80 */
81 struct input {
82 int t;
83 double **objects; /* Object list */
84 double **clusters; /* Cluster list, out: [numClusters][numCoords] */
85 int *membership; /* For each object, contains the index of the cluster it currently belongs to */
86 int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */
87 double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */
88 int numObjs,numClusters,numCoords;
89 };
91 /*
92 * Function: euclid_dist_2
93 * -----------------------
94 * Computes the square of the euclidean distance between two multi-dimensional points.
95 */
96 __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
97 int i;
98 double ans=0.0;
100 for (i=0; i<numdims; i++)
101 ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);
103 return(ans);
104 }
106 /*
107 * Function: find_nearest_cluster
108 * ------------------------------
109 * Function determining the cluster center which is closest to the given object.
110 * Returns the index of that cluster center.
111 */
112 __inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
113 int index, i;
114 double dist, min_dist;
116 /* find the cluster id that has min distance to object */
117 index = 0;
118 min_dist = euclid_dist_2(numCoords, object, clusters[0]);
119 for (i=1; i<numClusters; i++) {
120 dist = euclid_dist_2(numCoords, object, clusters[i]);
122 /* no need square root */
123 if (dist < min_dist) { /* find the min and its array index */
124 min_dist = dist;
125 index = i;
126 }
127 }
128 return index;
129 }
131 void work(struct input *x, VirtProcr *VProc){
132 int tid = x->t;
133 double local_delta=0;
134 int i;
135 for (i = tid; i < x->numObjs; i += nthreads) {
136 /* find the array index of nearest cluster center */
137 int index = find_nearest_cluster(x->numClusters, x->numCoords,
138 x->objects[i], x->clusters);
140 /* if membership changes, increase delta by 1 */
141 if (x->membership[i] != index)
142 local_delta += 1.0;
143 /* assign the membership to object i */
144 x->membership[i] = index;
146 /* update new cluster centers : sum of all objects located
147 within (average will be performed later) */
148 x->local_newClusterSize[tid][index]++;
149 int j;
150 for (j=0; j < x->numCoords; j++)
151 x->local_newClusters[tid][index][j] += x->objects[i][j];
153 }
154 VPThread__mutex_lock(lock1, VProc);
155 delta +=local_delta;
156 VPThread__mutex_unlock(lock1, VProc);
157 }
159 /*
160 * Function: thread function work
161 * --------------
162 * Worker function for threading. Work distribution is done so that each thread computers
163 */
164 void tfwork(void *ip, VirtProcr *VProc)
165 {
166 struct input *x;
167 x = (struct input *)ip;
169 for(;;){
170 barrier_wait(&barr, VProc);
171 if (finished){
172 break;
173 }
174 work(x, VProc);
175 barrier_wait(&barr, VProc);
176 }
178 //pthread_exit(NULL);
179 VPThread__dissipate_thread(VProc);
180 }
182 /*
183 * Function: create_array_2d_f
184 * --------------------------
185 * Allocates memory for a 2-dim double array as needed for the algorithm.
186 */
187 double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
188 double** ptr;
189 int i;
190 ptr = VPThread__malloc(height * sizeof(double*), VProc);
191 assert(ptr != NULL);
192 ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
193 assert(ptr[0] != NULL);
194 /* Assign pointers correctly */
195 for(i = 1; i < height; i++)
196 ptr[i] = ptr[i-1] + width;
197 return ptr;
198 }
200 /*
201 * Function: create_array_2Dd_i
202 * --------------------------
203 * Allocates memory for a 2-dim integer array as needed for the algorithm.
204 */
205 int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
206 int** ptr;
207 int i;
208 ptr = VPThread__malloc(height * sizeof(int*), VProc);
209 assert(ptr != NULL);
210 ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
211 assert(ptr[0] != NULL);
212 /* Assign pointers correctly */
213 for(i = 1; i < height; i++)
214 ptr[i] = ptr[i-1] + width;
215 return ptr;
216 }
218 /*
219 * Function: pthreads_kmeans
220 * -------------------------
221 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
222 */
223 void pthreads_kmeans(void *data, VirtProcr *VProc)
224 {
225 struct call_data *cluster_data = (struct call_data*)data;
226 //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */
227 double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */
228 int numCoords = cluster_data->numCoords; /* no. coordinates */
229 int numObjs = cluster_data->numObjs; /* no. objects */
230 int numClusters = cluster_data->numClusters; /* no. clusters */
231 double threshold = cluster_data->threshold; /* % objects change membership */
232 int *membership = cluster_data->membership; /* out: [numObjs] */
234 int i, j, k, loop = 0;
235 int *newClusterSize; /* [numClusters]: no. objects assigned in each
236 new cluster */
237 double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */
238 double **newClusters; /* [numClusters][numCoords] */
239 //double timing;
240 int **local_newClusterSize; /* [nthreads][numClusters] */
241 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */
243 VirtProcr **thread;
245 /* === MEMORY SETUP === */
247 /* [numClusters] clusters of [numCoords] double coordinates each */
248 //Set pointers
249 for(i = 1; i < numClusters; i++)
250 clusters[i] = clusters[i-1] + numCoords;
252 /* Pick first numClusters elements of objects[] as initial cluster centers */
253 for (i=0; i < numClusters; i++)
254 for (j=0; j < numCoords; j++)
255 clusters[i][j] = objects[i][j];
257 /* Initialize membership, no object belongs to any cluster yet */
258 for (i = 0; i < numObjs; i++)
259 membership[i] = -1;
261 /* newClusterSize holds information on the count of members in each cluster */
262 newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
263 assert(newClusterSize != NULL);
265 /* newClusters holds the coordinates of the freshly created clusters */
266 newClusters = create_array_2d_f(numClusters, numCoords, VProc);
267 local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
269 /* local_newClusters is a 3D array */
270 local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
271 assert(local_newClusters != NULL);
272 local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
273 assert(local_newClusters[0] != NULL);
275 /* Set up the pointers */
276 for (i = 1; i < nthreads; i++)
277 local_newClusters[i] = local_newClusters[i-1] + numClusters;
279 for (i = 0; i < nthreads; i++) {
280 for (j = 0; j < numClusters; j++) {
281 local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
282 assert(local_newClusters[i][j] != NULL);
283 }
284 }
285 /* Perform thread setup */
286 thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
288 barrier_init(&barr, nthreads, VProc);
289 lock1 = VPThread__make_mutex(VProc);
290 finished=0;
292 struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
293 /* Provide thread-safe memory locations for each worker */
294 for(i = 0; i < nthreads; i++){
295 ip[i].t = i;
296 ip[i].objects=objects;
297 ip[i].clusters=clusters;
298 ip[i].membership=membership;
299 ip[i].local_newClusterSize=local_newClusterSize;
300 ip[i].local_newClusters=local_newClusters;
301 ip[i].numObjs=numObjs;
302 ip[i].numClusters=numClusters;
303 ip[i].numCoords=numCoords;
305 if (i>0){
306 thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
307 }
308 }
310 /* === COMPUTATIONAL PHASE === */
312 do {
313 delta = 0.0;
314 barrier_wait(&barr, VProc);
315 work(&ip[0], VProc);
317 barrier_wait(&barr, VProc);
318 /* Let the main thread perform the array reduction */
319 for (i = 0; i < numClusters; i++) {
320 for (j = 0; j < nthreads; j++) {
321 newClusterSize[i] += local_newClusterSize[j][i];
322 local_newClusterSize[j][i] = 0.0;
323 for (k = 0; k < numCoords; k++) {
324 newClusters[i][k] += local_newClusters[j][i][k];
325 local_newClusters[j][i][k] = 0.0;
326 }
327 }
328 }
330 /* Average the sum and replace old cluster centers with newClusters */
331 for (i = 0; i < numClusters; i++) {
332 for (j = 0; j < numCoords; j++) {
333 if (newClusterSize[i] > 1)
334 clusters[i][j] = newClusters[i][j] / newClusterSize[i];
335 newClusters[i][j] = 0.0; /* set back to 0 */
336 }
337 newClusterSize[i] = 0; /* set back to 0 */
338 }
340 delta /= numObjs;
341 } while (loop++ < PREC && delta > threshold);
343 // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
344 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
345 // iterations, therefore making benchmarking completely unreliable.
347 finished=1;
348 barrier_wait(&barr, VProc);
351 VPThread__free(ip, VProc);
352 VPThread__free(thread, VProc);
354 VPThread__free(local_newClusterSize[0], VProc);
355 VPThread__free(local_newClusterSize, VProc);
357 for (i = 0; i < nthreads; i++)
358 for (j = 0; j < numClusters; j++)
359 VPThread__free(local_newClusters[i][j], VProc);
360 VPThread__free(local_newClusters[0], VProc);
361 VPThread__free(local_newClusters, VProc);
363 VPThread__free(newClusters[0], VProc);
364 VPThread__free(newClusters, VProc);
365 VPThread__free(newClusterSize, VProc);
367 (cluster_data)->clusters = clusters;
369 VPThread__dissipate_thread(VProc);
370 }