| File: | np/ufunc/workqueue.c | 
| Warning: | line 587, column 28 PyObject ownership leak with reference count of 1  | 
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
| 1 | /* | |||
| 2 | Implement parallel vectorize workqueue. | |||
| 3 | ||||
| 4 | This keeps a set of worker threads running all the time. | |||
| 5 | They wait and spin on a task queue for jobs. | |||
| 6 | ||||
| 7 | **WARNING** | |||
| 8 | This module is not thread-safe. Adding task to queue is not protected from | |||
| 9 | race conditions. | |||
| 10 | */ | |||
| 11 | #include "../../_pymodule.h" | |||
| 12 | #ifdef _POSIX_C_SOURCE | |||
| 13 | #undef _POSIX_C_SOURCE | |||
| 14 | #endif | |||
| 15 | #ifdef _XOPEN_SOURCE | |||
| 16 | #undef _XOPEN_SOURCE | |||
| 17 | #endif | |||
| 18 | ||||
| 19 | #ifdef _MSC_VER | |||
| 20 | /* Windows */ | |||
| 21 | #include <windows.h> | |||
| 22 | #include <process.h> | |||
| 23 | #include <malloc.h> | |||
| 24 | #include <signal.h> | |||
| 25 | #define NUMBA_WINTHREAD | |||
| 26 | #else | |||
| 27 | /* PThread */ | |||
| 28 | #include <pthread.h> | |||
| 29 | #include <unistd.h> | |||
| 30 | ||||
| 31 | #if defined(__FreeBSD__) | |||
| 32 | #include <stdlib.h> | |||
| 33 | #else | |||
| 34 | #include <alloca.h> | |||
| 35 | #endif | |||
| 36 | ||||
| 37 | #include <sys/types.h> | |||
| 38 | #include <unistd.h> | |||
| 39 | #include <signal.h> | |||
| 40 | #define NUMBA_PTHREAD | |||
| 41 | #endif | |||
| 42 | ||||
| 43 | #include <string.h> | |||
| 44 | #include <stddef.h> | |||
| 45 | #include <stdio.h> | |||
| 46 | #include "workqueue.h" | |||
| 47 | #include "gufunc_scheduler.h" | |||
| 48 | ||||
| 49 | #define _DEBUG0 0 | |||
| 50 | ||||
| 51 | /* workqueue is not threadsafe, so we use DSO globals to flag and update various | |||
| 52 | * states. | |||
| 53 | */ | |||
| 54 | /* This variable is the nesting level, it's incremented at the start of each | |||
| 55 | * parallel region and decremented at the end, if parallel regions are nested | |||
| 56 | * on entry the value == 1 and workqueue will abort (this in preference to just | |||
| 57 | * hanging or segfaulting). | |||
| 58 | */ | |||
| 59 | static int _nesting_level = 0; | |||
| 60 | ||||
| 61 | /* As the thread-pool isn't inherited by children, | |||
| 62 | free the task-queue, too. */ | |||
| 63 | static void reset_after_fork(void); | |||
| 64 | ||||
| 65 | /* PThread */ | |||
| 66 | #ifdef NUMBA_PTHREAD | |||
| 67 | ||||
typedef struct
{
    pthread_cond_t cond;
    pthread_mutex_t mutex;
} queue_condition_t;

/* Initialise the condition variable and its guarding mutex.
 * Returns 0 on success, otherwise the first pthread error code seen. */
static int
queue_condition_init(queue_condition_t *qc)
{
    int status = pthread_cond_init(&qc->cond, NULL);
    if (status != 0)
        return status;
    return pthread_mutex_init(&qc->mutex, NULL);
}

/* Acquire the mutex guarding the queue state.  (XXX errors ignored.) */
static void
queue_condition_lock(queue_condition_t *qc)
{
    pthread_mutex_lock(&qc->mutex);
}

/* Release the mutex guarding the queue state.  (XXX errors ignored.) */
static void
queue_condition_unlock(queue_condition_t *qc)
{
    pthread_mutex_unlock(&qc->mutex);
}

/* Wake one thread waiting on the condition.  (XXX errors ignored.) */
static void
queue_condition_signal(queue_condition_t *qc)
{
    pthread_cond_signal(&qc->cond);
}

/* Wait on the condition; the mutex must be held on entry and is held
 * again on return.  (XXX errors ignored.) */
static void
queue_condition_wait(queue_condition_t *qc)
{
    pthread_cond_wait(&qc->cond, &qc->mutex);
}
| 112 | ||||
| 113 | static thread_pointer | |||
| 114 | numba_new_thread(void *worker, void *arg) | |||
| 115 | { | |||
| 116 | int status; | |||
| 117 | pthread_attr_t attr; | |||
| 118 | pthread_t th; | |||
| 119 | ||||
| 120 | pthread_atfork(0, 0, reset_after_fork); | |||
| 121 | ||||
| 122 | /* Create detached threads */ | |||
| 123 | pthread_attr_init(&attr); | |||
| 124 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHEDPTHREAD_CREATE_DETACHED); | |||
| 125 | ||||
| 126 | status = pthread_create(&th, &attr, worker, arg); | |||
| 127 | ||||
| 128 | if (status != 0) | |||
| 129 | { | |||
| 130 | return NULL((void*)0); | |||
| 131 | } | |||
| 132 | ||||
| 133 | pthread_attr_destroy(&attr); | |||
| 134 | return (thread_pointer)th; | |||
| 135 | } | |||
| 136 | ||||
/* Return an integer identifier for the calling thread.
 * NOTE(review): pthread_t is an opaque type; truncating it to int is not
 * portable and distinct threads may collide — acceptable only as a
 * debugging/diagnostic aid, confirm no caller relies on uniqueness. */
static int
get_thread_id(void)
{
    return (int)pthread_self();
}
| 142 | ||||
| 143 | #endif | |||
| 144 | ||||
| 145 | /* Win Thread */ | |||
| 146 | #ifdef NUMBA_WINTHREAD | |||
| 147 | ||||
/* Win32 flavour of the queue condition: a condition variable paired with
 * the critical section that guards the queue state. */
typedef struct
{
    CONDITION_VARIABLE cv;
    CRITICAL_SECTION cs;
} queue_condition_t;

/* Initialise the condition variable and critical section.
 * Always returns 0: the Win32 init routines used here return void. */
static int
queue_condition_init(queue_condition_t *qc)
{
    InitializeConditionVariable(&qc->cv);
    InitializeCriticalSection(&qc->cs);
    return 0;
}

/* Acquire the critical section guarding the queue state. */
static void
queue_condition_lock(queue_condition_t *qc)
{
    EnterCriticalSection(&qc->cs);
}

/* Release the critical section guarding the queue state. */
static void
queue_condition_unlock(queue_condition_t *qc)
{
    LeaveCriticalSection(&qc->cs);
}

/* Wake one thread waiting on the condition variable. */
static void
queue_condition_signal(queue_condition_t *qc)
{
    WakeConditionVariable(&qc->cv);
}

/* Atomically release the critical section and wait (no timeout) until the
 * condition is signalled; the section is re-entered before returning. */
static void
queue_condition_wait(queue_condition_t *qc)
{
    SleepConditionVariableCS(&qc->cv, &qc->cs, INFINITE);
}
| 185 | ||||
/* Adapted from Python/thread_nt.h */
/* Heap-allocated closure handed to the thread entry point: the worker
 * function plus its single argument. */
typedef struct
{
    void (*func)(void*);
    void *arg;
} callobj;

/* Thread entry point: unpack the callobj, free it (ownership transfers
 * from numba_new_thread to this thread), then run the worker. */
static unsigned __stdcall
bootstrap(void *call)
{
    callobj *obj = (callobj*)call;
    void (*func)(void*) = obj->func;
    void *arg = obj->arg;
    HeapFree(GetProcessHeap(), 0, obj);
    func(arg);
    _endthreadex(0);
    return 0;
}
| 204 | ||||
| 205 | static thread_pointer | |||
| 206 | numba_new_thread(void *worker, void *arg) | |||
| 207 | { | |||
| 208 | uintptr_t handle; | |||
| 209 | unsigned threadID; | |||
| 210 | callobj *obj; | |||
| 211 | ||||
| 212 | if (sizeof(handle) > sizeof(void*)) | |||
| 213 | return 0; | |||
| 214 | ||||
| 215 | obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj)); | |||
| 216 | if (!obj) | |||
| 217 | return NULL((void*)0); | |||
| 218 | ||||
| 219 | obj->func = worker; | |||
| 220 | obj->arg = arg; | |||
| 221 | ||||
| 222 | handle = _beginthreadex(NULL((void*)0), 0, bootstrap, obj, 0, &threadID); | |||
| 223 | if (handle == -1) | |||
| 224 | return 0; | |||
| 225 | return (thread_pointer)handle; | |||
| 226 | } | |||
| 227 | ||||
/* Return the Win32 thread identifier of the calling thread. */
static int
get_thread_id(void)
{
    return GetCurrentThreadId();
}
| 233 | ||||
| 234 | #endif | |||
| 235 | ||||
/* A unit of work: the gufunc-style kernel plus its four argument blocks. */
typedef struct Task
{
    void (*func)(void *args, void *dims, void *steps, void *data);
    void *args, *dims, *steps, *data;
} Task;

/* One slot per worker thread: a state word (IDLE/READY/RUNNING/DONE —
 * presumably defined in workqueue.h; zero-init must correspond to IDLE,
 * see launch_threads) guarded by `cond`, and the single pending task. */
typedef struct
{
    queue_condition_t cond;
    int state;
    Task task;
} Queue;


static Queue *queues = NULL;    /* one Queue per worker thread (launch_threads) */
static int queue_count;         /* number of queues currently being driven */
static int queue_pivot = 0;     /* round-robin cursor used by add_task */
static int NUM_THREADS = -1;    /* pool size, set once by launch_threads */
| 254 | ||||
/* Block until `queue` reaches state `old`, then install state `repl` and
 * signal, all under the queue's mutex.  This is the single primitive the
 * producer (ready/synchronize) and the workers use to hand the queue back
 * and forth through the IDLE -> READY -> RUNNING -> DONE cycle. */
static void
queue_state_wait(Queue *queue, int old, int repl)
{
    queue_condition_t *cond = &queue->cond;

    queue_condition_lock(cond);
    while (queue->state != old)
    {
        queue_condition_wait(cond);
    }
    queue->state = repl;
    /* Wake the peer that is waiting for the state we just installed. */
    queue_condition_signal(cond);
    queue_condition_unlock(cond);
}
| 269 | ||||
// break on this for debug
/* Empty function: set a breakpoint here to observe parallel_for entry. */
void debug_marker(void);
/* FIX: declare `(void)` (an empty parameter list is an old-style,
 * unprototyped declaration) and drop the stray `;` after the body, which
 * is not valid at file scope in strict C. */
void debug_marker(void) {}
| 273 | ||||
| 274 | ||||
/* Portable thread-local storage qualifier. */
#ifdef _MSC_VER
#define THREAD_LOCAL(ty) __declspec(thread) ty
#else
/* Non-standard C99 extension that's understood by gcc and clang */
#define THREAD_LOCAL(ty) __thread ty
#endif

// This is the number of threads that is default, it is set on initialisation of
// the threading backend via the launch_threads() call
static int _INIT_NUM_THREADS = -1;

// This is the per-thread thread mask, each thread can carry its own mask.
// 0 means "not yet initialised for this thread"; see get_num_threads().
static THREAD_LOCAL(int) _TLS_num_threads = 0;
| 288 | ||||
/* Set the calling thread's thread-mask TLS slot to `count`. */
static void
set_num_threads(int count)
{
    _TLS_num_threads = count;
}
| 294 | ||||
| 295 | static int | |||
| 296 | get_num_threads(void) | |||
| 297 | { | |||
| 298 | // This is purely to permit the implementation to survive to the point | |||
| 299 | // where it can exit cleanly as multiple threads cannot be used with this | |||
| 300 | // backend | |||
| 301 | if (_TLS_num_threads == 0) | |||
| 302 | { | |||
| 303 | // This is a thread that did not call launch_threads() but is still a | |||
| 304 | // "main" thread, probably from e.g. threading.Thread() use, it still | |||
| 305 | // has a TLS slot which is 0 from the lack of launch_threads() call | |||
| 306 | _TLS_num_threads = _INIT_NUM_THREADS; | |||
| 307 | } | |||
| 308 | return _TLS_num_threads; | |||
| 309 | } | |||
| 310 | ||||
| 311 | ||||
| 312 | // this complies to a launchable function from `add_task` like: | |||
| 313 | // add_task(nopfn, NULL, NULL, NULL, NULL) | |||
| 314 | // useful if you want to limit the number of threads locally | |||
| 315 | // static void nopfn(void *args, void *dims, void *steps, void *data) {}; | |||
| 316 | ||||
| 317 | ||||
| 318 | // synchronize the TLS num_threads slot to value args[0] | |||
| 319 | static void sync_tls(void *args, void *dims, void *steps, void *data) { | |||
| 320 | int nthreads = *((int *)(args)); | |||
| 321 | _TLS_num_threads = nthreads; | |||
| 322 | }; | |||
| 323 | ||||
| 324 | ||||
/* Split the outer gufunc loop (dimensions[0]) into `num_threads` chunks
 * and run kernel `fn` on each chunk via the worker queues.
 *
 * Parameters mirror the NumPy gufunc inner-loop convention:
 *   fn          - kernel with signature (args, dims, steps, data)
 *   args        - array of base data pointers, one per array argument
 *   dimensions  - loop sizes; dimensions[0] is the parallelised extent
 *   steps       - byte strides, one per array argument
 *   data        - opaque user data forwarded to the kernel
 *   inner_ndim  - number of inner dimensions (arg_len = inner_ndim + 1)
 *   array_count - number of entries in args/steps
 *   num_threads - number of chunks/tasks to enqueue
 *
 * Not thread-safe and not nestable: a nested call aborts the process. */
static void
parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data,
             size_t inner_ndim, size_t array_count, int num_threads)
{

    // args = <ir.Argument '.1' of type i8**>,
    // dimensions = <ir.Argument '.2' of type i64*>
    // steps = <ir.Argument '.3' of type i64*>
    // data = <ir.Argument '.4' of type i8*>

    // check the nesting level, if it's already 1, abort, workqueue cannot
    // handle nesting.
    if (_nesting_level >= 1){
        fprintf(stderr, "%s", "Terminating: Nested parallel kernel launch "
                              "detected, the workqueue threading layer does "
                              "not supported nested parallelism. Try the TBB "
                              "threading layer.\n");
        raise(SIGABRT);
        return;
    }

    // increment the nest level
    _nesting_level += 1;

    size_t * count_space = NULL;
    char ** array_arg_space = NULL;
    const size_t arg_len = (inner_ndim + 1);
    int i; // induction var for chunking, thread count unlikely to overflow int
    size_t j, count, remain, total;

    ptrdiff_t offset;
    char * base;
    int old_queue_count = -1;

    size_t step;

    debug_marker();

    // Even split of the outer extent; the last chunk absorbs the remainder.
    total = *((size_t *)dimensions);
    count = total / num_threads;
    remain = total;

    if(_DEBUG)
    {
        printf("inner_ndim: %zd\n",inner_ndim);
        printf("arg_len: %zd\n", arg_len);
        printf("total: %zd\n", total);
        printf("count: %zd\n", count);

        printf("dimensions: ");
        for(j = 0; j < arg_len; j++)
        {
            printf("%zd, ", ((size_t *)dimensions)[j]);
        }
        printf("\n");

        printf("steps: ");
        for(j = 0; j < array_count; j++)
        {
            printf("%zd, ", steps[j]);
        }
        printf("\n");

        printf("*args: ");
        for(j = 0; j < array_count; j++)
        {
            printf("%p, ", (void *)args[j]);
        }
    }

    // sync the thread pool TLS slots, sync all slots, we don't know which
    // threads will end up running.
    // NOTE: &num_threads (a stack parameter) is read by the workers; this is
    // safe only because synchronize() below completes before it goes away.
    for (i = 0; i < NUM_THREADS; i++)
    {
        add_task(sync_tls, (void *)(&num_threads), NULL, NULL, NULL);
    }
    ready();
    synchronize();

    // This backend isn't threadsafe so just mutate the global
    old_queue_count = queue_count;
    queue_count = num_threads;

    for (i = 0; i < num_threads; i++)
    {
        // Per-chunk copy of `dimensions` with [0] replaced by this chunk's
        // size.  alloca allocations persist until this function returns,
        // which outlives the workers' use (they finish at synchronize()).
        count_space = (size_t *)alloca(sizeof(size_t) * arg_len);
        memcpy(count_space, dimensions, arg_len * sizeof(size_t));
        if(i == num_threads - 1)
        {
            // Last thread takes all leftover
            count_space[0] = remain;
        }
        else
        {
            count_space[0] = count;
            remain = remain - count;
        }

        if(_DEBUG)
        {
            printf("\n=================== THREAD %d ===================\n", i);
            printf("\ncount_space: ");
            for(j = 0; j < arg_len; j++)
            {
                printf("%zd, ", count_space[j]);
            }
            printf("\n");
        }

        // Per-chunk argument pointers, offset into each array by the work
        // already assigned to earlier chunks (step * count * i bytes).
        array_arg_space = alloca(sizeof(char*) * array_count);

        for(j = 0; j < array_count; j++)
        {
            base = args[j];
            step = steps[j];
            offset = step * count * i;
            array_arg_space[j] = (char *)(base + offset);

            if(_DEBUG)
            {
                printf("Index %zd\n", j);
                printf("-->Got base %p\n", (void *)base);
                printf("-->Got step %zd\n", step);
                printf("-->Got offset %td\n", offset);
                printf("-->Got addr %p\n", (void *)array_arg_space[j]);
            }
        }

        if(_DEBUG)
        {
            printf("\narray_arg_space: ");
            for(j = 0; j < array_count; j++)
            {
                printf("%p, ", (void *)array_arg_space[j]);
            }
        }
        add_task(fn, (void *)array_arg_space, (void *)count_space, steps, data);
    }

    // Kick all queues and wait for every chunk to finish.
    ready();
    synchronize();

    queue_count = old_queue_count;
    // decrement the nest level
    _nesting_level -= 1;
}
| 471 | ||||
/* Enqueue one task into the next queue slot (round-robin via queue_pivot).
 * WARNING: deliberately unsynchronised — the module header documents that
 * adding tasks is not protected from races; callers must only enqueue
 * while the queues are idle, then trigger them with ready(). */
static void
add_task(void *fn, void *args, void *dims, void *steps, void *data)
{
    void (*func)(void *args, void *dims, void *steps, void *data) = fn;

    Queue *queue = &queues[queue_pivot];

    /* Overwrite the slot's single Task in place; the state machine ensures
     * the worker only reads it after ready() flips the queue to READY. */
    Task *task = &queue->task;
    task->func = func;
    task->args = args;
    task->dims = dims;
    task->steps = steps;
    task->data = data;

    /* Move pivot */
    if ( ++queue_pivot == queue_count )
    {
        queue_pivot = 0;
    }
}
| 492 | ||||
/* Worker thread main loop: runs forever (threads are detached and never
 * joined), executing one task per READY->RUNNING->DONE cycle on the one
 * Queue slot passed in as `arg`. */
static
void thread_worker(void *arg)
{
    Queue *queue = (Queue*)arg;
    Task *task;

    while (1)
    {
        /* Wait for the queue to be in READY state (i.e. for some task
         * to need running), and switch it to RUNNING.
         */
        queue_state_wait(queue, READY, RUNNING);

        task = &queue->task;
        task->func(task->args, task->dims, task->steps, task->data);

        /* Task is done. */
        queue_state_wait(queue, RUNNING, DONE);
    }
}
| 513 | ||||
| 514 | static void launch_threads(int count) | |||
| 515 | { | |||
| 516 | if (!queues) | |||
| 517 | { | |||
| 518 | /* If queues are not yet allocated, | |||
| 519 | create them, one for each thread. */ | |||
| 520 | int i; | |||
| 521 | size_t sz = sizeof(Queue) * count; | |||
| 522 | ||||
| 523 | /* set for use in parallel_for */ | |||
| 524 | NUM_THREADS = count; | |||
| 525 | queues = malloc(sz); /* this memory will leak */ | |||
| 526 | /* Note this initializes the state to IDLE */ | |||
| 527 | memset(queues, 0, sz); | |||
| 528 | queue_count = count; | |||
| 529 | ||||
| 530 | for (i = 0; i < count; ++i) | |||
| 531 | { | |||
| 532 | queue_condition_init(&queues[i].cond); | |||
| 533 | numba_new_thread(thread_worker, &queues[i]); | |||
| 534 | } | |||
| 535 | ||||
| 536 | _INIT_NUM_THREADS = count; | |||
| 537 | } | |||
| 538 | } | |||
| 539 | ||||
| 540 | static void synchronize(void) | |||
| 541 | { | |||
| 542 | int i; | |||
| 543 | for (i = 0; i < queue_count; ++i) | |||
| 544 | { | |||
| 545 | queue_state_wait(&queues[i], DONE, IDLE); | |||
| 546 | } | |||
| 547 | } | |||
| 548 | ||||
| 549 | static void ready(void) | |||
| 550 | { | |||
| 551 | int i; | |||
| 552 | for (i = 0; i < queue_count; ++i) | |||
| 553 | { | |||
| 554 | queue_state_wait(&queues[i], IDLE, READY); | |||
| 555 | } | |||
| 556 | } | |||
| 557 | ||||
| 558 | static void reset_after_fork(void) | |||
| 559 | { | |||
| 560 | free(queues); | |||
| 561 | queues = NULL((void*)0); | |||
| 562 | NUM_THREADS = -1; | |||
| 563 | _INIT_NUM_THREADS = -1; | |||
| 564 | _nesting_level = 0; | |||
| 565 | } | |||
| 566 | ||||
| 567 | MOD_INIT(workqueue)PyObject* PyInit_workqueue(void) | |||
| 568 | { | |||
| 569 | PyObject *m; | |||
| 570 |     MOD_DEF(m, "workqueue", "No docs", NULL){ static struct PyModuleDef moduledef = { { { 1, ((void*)0) } , ((void*)0), 0, ((void*)0), }, "workqueue", "No docs", -1, ( (void*)0), ((void*)0), ((void*)0), ((void*)0), ((void*)0) }; m = PyModule_Create2(&moduledef, 1013); }  | |||
| 571 | if (m == NULL((void*)0)) | |||
  | ||||
| 572 | return MOD_ERROR_VAL((void*)0); | |||
| 573 | ||||
| 574 | PyObject_SetAttrString(m, "launch_threads", | |||
| 575 | PyLong_FromVoidPtr(&launch_threads)); | |||
| 576 | PyObject_SetAttrString(m, "synchronize", | |||
| 577 | PyLong_FromVoidPtr(&synchronize)); | |||
| 578 | PyObject_SetAttrString(m, "ready", | |||
| 579 | PyLong_FromVoidPtr(&ready)); | |||
| 580 | PyObject_SetAttrString(m, "add_task", | |||
| 581 | PyLong_FromVoidPtr(&add_task)); | |||
| 582 | PyObject_SetAttrString(m, "parallel_for", | |||
| 583 | PyLong_FromVoidPtr(¶llel_for)); | |||
| 584 | PyObject_SetAttrString(m, "do_scheduling_signed", | |||
| 585 | PyLong_FromVoidPtr(&do_scheduling_signed)); | |||
| 586 | PyObject_SetAttrString(m, "do_scheduling_unsigned", | |||
| 587 | PyLong_FromVoidPtr(&do_scheduling_unsigned)); | |||
  | ||||
| 588 | PyObject_SetAttrString(m, "set_num_threads", | |||
| 589 | PyLong_FromVoidPtr((void*)&set_num_threads)); | |||
| 590 | PyObject_SetAttrString(m, "get_num_threads", | |||
| 591 | PyLong_FromVoidPtr((void*)&get_num_threads)); | |||
| 592 | PyObject_SetAttrString(m, "get_thread_id", | |||
| 593 | PyLong_FromVoidPtr((void*)&get_thread_id)); | |||
| 594 | return MOD_SUCCESS_VAL(m)m; | |||
| 595 | } | 
/* Static-analysis harness only (appended by the checker, not part of the
 * runtime module): when PyLong_FromVoidPtr is a real function rather than
 * a macro, provide a model that returns a fresh reference so the analyzer
 * can track object ownership through it. */
#ifndef PyLong_FromVoidPtr
PyObject* clang_analyzer_PyObject_New_Reference();
PyObject* PyLong_FromVoidPtr(void *p) {
    return clang_analyzer_PyObject_New_Reference();
}
#else
#warning "API PyLong_FromVoidPtr is defined as a macro."
#endif