ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/gclib/gclib/GYarn.cpp
Revision: 107
Committed: Tue Oct 11 12:31:50 2011 UTC (8 years ago) by gpertea
File size: 10703 byte(s)
Log Message:
wip GYarn (threads)

Line File contents
1 /* cpp-ification of yarn.c -- generic thread operations implemented using pthread functions
2 * Copyright (C) 2008 Mark Adler
3 * Version 1.1 26 Oct 2008 Mark Adler
4 * For conditions of distribution and use, see copyright notice in yarn.h
5 */
6
7 /* Basic thread operations implemented using the POSIX pthread library. All
8 pthread references are isolated within this module to allow alternate
9 implementations with other thread libraries. See yarn.h for the description
10 of these operations. */
11
12 /* Version history:
13 1.0 19 Oct 2008 First version
14 1.1 26 Oct 2008 No need to set the stack size -- remove
15 Add yarn_abort() function for clean-up on error exit
16 */
17
18 /* for thread portability */
19 #define _POSIX_PTHREAD_SEMANTICS
20 #define _REENTRANT
21
22 /* external libraries and entities referenced */
23 #include <stdio.h> /* fprintf(), stderr */
24 #include <stdlib.h> /* exit(), malloc(), free(), NULL */
25 #include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */
26 /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
27 PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
28 pthread_self(), pthread_equal(),
29 pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
30 pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
31 pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
32 pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
33 #include <errno.h> /* ENOMEM, EAGAIN, EINVAL */
34
35 /* interface definition */
36 #include "GYarn.h"
37
38 /* constants */
39 #define local static /* for non-exported functions and globals */
40
41 /* error handling external globals, resettable by application */
42 char *yarn_prefix = "yarn";
43 void (*yarn_abort)(int) = NULL;
44
45
46 /* immediately exit -- use for errors that shouldn't ever happen */
47 local void fail(int err)
48 {
49 fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
50 err == ENOMEM ? "out of memory" : "internal pthread error", err);
51 if (yarn_abort != NULL)
52 yarn_abort(err);
53 exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
54 }
55
/* allocation hooks -- these start out as malloc() and free(), which are then
   assumed to be thread-safe, and can be swapped for user-supplied routines
   through yarn_mem() */
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;

/* install lease and vacate as the allocation and release routines used by
   this module in place of malloc() and free() */
void yarn_mem(malloc_t lease, free_t vacate)
{
    my_free = vacate;
    my_malloc_f = lease;
}
69
70 /* memory allocation that cannot fail (from the point of view of the caller) */
71 local void *my_malloc(size_t size)
72 {
73 void *block;
74
75 if ((block = my_malloc_f(size)) == NULL)
76 fail(ENOMEM);
77 return block;
78 }
79
80 /* -- lock functions -- */
81
82 struct lock_s {
83 pthread_mutex_t mutex;
84 pthread_cond_t cond;
85 long value;
86 };
87
88 lock *new_lock(long initial)
89 {
90 int ret;
91 lock *bolt;
92
93 bolt = my_malloc(sizeof(struct lock_s));
94 if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
95 (ret = pthread_cond_init(&(bolt->cond), NULL)))
96 fail(ret);
97 bolt->value = initial;
98 return bolt;
99 }
100
101 void possess(lock *bolt)
102 {
103 int ret;
104
105 if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
106 fail(ret);
107 }
108
109 void release(lock *bolt)
110 {
111 int ret;
112
113 if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
114 fail(ret);
115 }
116
117 void twist(lock *bolt, enum twist_op op, long val)
118 {
119 int ret;
120
121 if (op == TO)
122 bolt->value = val;
123 else if (op == BY)
124 bolt->value += val;
125 if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
126 (ret = pthread_mutex_unlock(&(bolt->mutex))))
127 fail(ret);
128 }
129
130 #define until(a) while(!(a))
131
132 void wait_for(lock *bolt, enum wait_op op, long val)
133 {
134 int ret;
135
136 switch (op) {
137 case TO_BE:
138 until (bolt->value == val)
139 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
140 fail(ret);
141 break;
142 case NOT_TO_BE:
143 until (bolt->value != val)
144 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
145 fail(ret);
146 break;
147 case TO_BE_MORE_THAN:
148 until (bolt->value > val)
149 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
150 fail(ret);
151 break;
152 case TO_BE_LESS_THAN:
153 until (bolt->value < val)
154 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
155 fail(ret);
156 }
157 }
158
159 long peek_lock(lock *bolt)
160 {
161 return bolt->value;
162 }
163
164 void free_lock(lock *bolt)
165 {
166 int ret;
167 if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
168 (ret = pthread_mutex_destroy(&(bolt->mutex))))
169 fail(ret);
170 my_free(bolt);
171 }
172
173 /* -- thread functions (uses lock functions above) -- */
174
175 struct thread_s {
176 pthread_t id;
177 int done; /* true if this thread has exited */
178 thread *next; /* for list of all launched threads */
179 };
180
181 /* list of threads launched but not joined, count of threads exited but not
182 joined (incremented by ignition() just before exiting) */
183 local lock threads_lock = {
184 PTHREAD_MUTEX_INITIALIZER,
185 PTHREAD_COND_INITIALIZER,
186 0 /* number of threads exited but not joined */
187 };
188 local thread *threads = NULL; /* list of extant threads */
189
190 /* structure in which to pass the probe and its payload to ignition() */
191 struct capsule {
192 void (*probe)(void *);
193 void *payload;
194 };
195
196 /* mark the calling thread as done and alert join_all() */
197 local void reenter(void *dummy)
198 {
199 thread *match, **prior;
200 pthread_t me;
201
202 /* find this thread in the threads list by matching the thread id */
203 me = pthread_self();
204 possess(&(threads_lock));
205 prior = &(threads);
206 while ((match = *prior) != NULL) {
207 if (pthread_equal(match->id, me))
208 break;
209 prior = &(match->next);
210 }
211 if (match == NULL)
212 fail(EINVAL);
213
214 /* mark this thread as done and move it to the head of the list */
215 match->done = 1;
216 if (threads != match) {
217 *prior = match->next;
218 match->next = threads;
219 threads = match;
220 }
221
222 /* update the count of threads to be joined and alert join_all() */
223 twist(&(threads_lock), BY, +1);
224 }
225
226 /* all threads go through this routine so that just before the thread exits,
227 it marks itself as done in the threads list and alerts join_all() so that
228 the thread resources can be released -- use cleanup stack so that the
229 marking occurs even if the thread is cancelled */
230 local void *ignition(void *arg)
231 {
232 struct capsule *capsule = arg;
233
234 /* run reenter() before leaving */
235 pthread_cleanup_push(reenter, NULL);
236
237 /* execute the requested function with argument */
238 capsule->probe(capsule->payload);
239 my_free(capsule);
240
241 /* mark this thread as done and let join_all() know */
242 pthread_cleanup_pop(1);
243
244 /* exit thread */
245 return NULL;
246 }
247
248 /* not all POSIX implementations create threads as joinable by default, so that
249 is made explicit here */
250 thread *launch(void (*probe)(void *), void *payload)
251 {
252 int ret;
253 thread *th;
254 struct capsule *capsule;
255 pthread_attr_t attr;
256
257 /* construct the requested call and argument for the ignition() routine
258 (allocated instead of automatic so that we're sure this will still be
259 there when ignition() actually starts up -- ignition() will free this
260 allocation) */
261 capsule = my_malloc(sizeof(struct capsule));
262 capsule->probe = probe;
263 capsule->payload = payload;
264
265 /* assure this thread is in the list before join_all() or ignition() looks
266 for it */
267 possess(&(threads_lock));
268
269 /* create the thread and call ignition() from that thread */
270 th = my_malloc(sizeof(struct thread_s));
271 if ((ret = pthread_attr_init(&attr)) ||
272 (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
273 (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
274 (ret = pthread_attr_destroy(&attr)))
275 fail(ret);
276
277 /* put the thread in the threads list for join_all() */
278 th->done = 0;
279 th->next = threads;
280 threads = th;
281 release(&(threads_lock));
282 return th;
283 }
284
285 void join(thread *ally)
286 {
287 int ret;
288 thread *match, **prior;
289
290 /* wait for thread to exit and return its resources */
291 if ((ret = pthread_join(ally->id, NULL)) != 0)
292 fail(ret);
293
294 /* find the thread in the threads list */
295 possess(&(threads_lock));
296 prior = &(threads);
297 while ((match = *prior) != NULL) {
298 if (match == ally)
299 break;
300 prior = &(match->next);
301 }
302 if (match == NULL)
303 fail(EINVAL);
304
305 /* remove thread from list and update exited count, free thread */
306 if (match->done)
307 threads_lock.value--;
308 *prior = match->next;
309 release(&(threads_lock));
310 my_free(ally);
311 }
312
313 /* This implementation of join_all() only attempts to join threads that have
314 announced that they have exited (see ignition()). When there are many
315 threads, this is faster than waiting for some random thread to exit while a
316 bunch of other threads have already exited. */
317 int join_all(void)
318 {
319 int ret, count;
320 thread *match, **prior;
321
322 /* grab the threads list and initialize the joined count */
323 count = 0;
324 possess(&(threads_lock));
325
326 /* do until threads list is empty */
327 while (threads != NULL) {
328 /* wait until at least one thread has reentered */
329 wait_for(&(threads_lock), NOT_TO_BE, 0);
330
331 /* find the first thread marked done (should be at or near the top) */
332 prior = &(threads);
333 while ((match = *prior) != NULL) {
334 if (match->done)
335 break;
336 prior = &(match->next);
337 }
338 if (match == NULL)
339 fail(EINVAL);
340
341 /* join the thread (will be almost immediate), remove from the threads
342 list, update the reenter count, and free the thread */
343 if ((ret = pthread_join(match->id, NULL)) != 0)
344 fail(ret);
345 threads_lock.value--;
346 *prior = match->next;
347 my_free(match);
348 count++;
349 }
350
351 /* let go of the threads list and return the number of threads joined */
352 release(&(threads_lock));
353 return count;
354 }
355
356 /* cancel and join the thread -- the thread will cancel when it gets to a file
357 operation, a sleep or pause, or a condition wait */
358 void destruct(thread *off_course)
359 {
360 int ret;
361
362 if ((ret = pthread_cancel(off_course->id)) != 0)
363 fail(ret);
364 join(off_course);
365 }