Bug Summary

File: numba/np/ufunc/workqueue.c
Warning: line 585, column 28
PyObject ownership leak with reference count of 1

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -cc1 -triple x86_64-unknown-linux-gnu -analyze -disable-free -disable-llvm-verifier -discard-value-names -main-file-name workqueue.c -analyzer-store=region -analyzer-opt-analyze-nested-blocks -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -analyzer-output=html -analyzer-checker=python -analyzer-disable-checker=deadcode -analyzer-config prune-paths=true,suppress-c++-stdlib=true,suppress-null-return-paths=false,crosscheck-with-z3=true,model-path=/opt/pyrefcon/lib/pyrefcon/models/models -analyzer-config experimental-enable-naive-ctu-analysis=true,ctu-dir=/tmp/pyrefcon/numba/csa-scan,ctu-index-name=/tmp/pyrefcon/numba/csa-scan/externalDefMap.txt,ctu-invocation-list=/tmp/pyrefcon/numba/csa-scan/invocations.yaml,display-ctu-progress=false -setup-static-analyzer -analyzer-config-compatibility-mode=true -mrelocation-model pic -pic-level 2 -fhalf-no-semantic-interposition -mframe-pointer=none -fmath-errno -fno-rounding-math -mconstructor-aliases -munwind-tables -target-cpu x86-64 -tune-cpu generic -debug-info-kind=limited -dwarf-version=4 -debugger-tuning=gdb -fcoverage-compilation-dir=/tmp/pyrefcon/numba -resource-dir /opt/pyrefcon/lib/clang/13.0.0 -isystem /opt/pyrefcon/lib/pyrefcon/models/python3.8 -D NDEBUG -D _FORTIFY_SOURCE=2 -internal-isystem /opt/pyrefcon/lib/clang/13.0.0/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/gcc/x86_64-linux-gnu/10/../../../../x86_64-linux-gnu/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include 
-internal-externc-isystem /usr/include -O2 -Wno-unused-result -Wsign-compare -Wall -Wformat -Werror=format-security -Wformat -Werror=format-security -Wdate-time -fdebug-compilation-dir=/tmp/pyrefcon/numba -ferror-limit 19 -fwrapv -pthread -stack-protector 2 -fgnuc-version=4.2.1 -vectorize-loops -vectorize-slp -faddrsig -D__GCC_HAVE_DWARF2_CFI_ASM=1 -o /tmp/pyrefcon/numba/csa-scan/reports -x c numba/np/ufunc/workqueue.c

numba/np/ufunc/workqueue.c

1/*
2Implement parallel vectorize workqueue.
3
4This keeps a set of worker threads running all the time.
5They wait and spin on a task queue for jobs.
6
7**WARNING**
8This module is not thread-safe. Adding task to queue is not protected from
9race conditions.
10*/
11#include "../../_pymodule.h"
12#ifdef _POSIX_C_SOURCE
13#undef _POSIX_C_SOURCE
14#endif
15#ifdef _XOPEN_SOURCE
16#undef _XOPEN_SOURCE
17#endif
18
19#ifdef _MSC_VER
20/* Windows */
21#include <windows.h>
22#include <process.h>
23#include <malloc.h>
24#include <signal.h>
25#define NUMBA_WINTHREAD
26#else
27/* PThread */
28#include <pthread.h>
29#include <unistd.h>
30
31#if defined(__FreeBSD__)
32#include <stdlib.h>
33#else
34#include <alloca.h>
35#endif
36
37#include <sys/types.h>
38#include <unistd.h>
39#include <signal.h>
40#define NUMBA_PTHREAD
41#endif
42
43#include <string.h>
44#include <stddef.h>
45#include <stdio.h>
46#include "workqueue.h"
47#include "gufunc_scheduler.h"
48
49#define _DEBUG0 0
50
51/* workqueue is not threadsafe, so we use DSO globals to flag and update various
52 * states.
53 */
54/* This variable is the nesting level, it's incremented at the start of each
55 * parallel region and decremented at the end, if parallel regions are nested
56 * on entry the value == 1 and workqueue will abort (this in preference to just
57 * hanging or segfaulting).
58 */
59static int _nesting_level = 0;
60
61/* As the thread-pool isn't inherited by children,
62 free the task-queue, too. */
63static void reset_after_fork(void);
64
65/* PThread */
66#ifdef NUMBA_PTHREAD
67
68typedef struct
69{
70 pthread_cond_t cond;
71 pthread_mutex_t mutex;
72} queue_condition_t;
73
74static int
75queue_condition_init(queue_condition_t *qc)
76{
77 int r;
78 if ((r = pthread_cond_init(&qc->cond, NULL((void*)0))))
79 return r;
80 if ((r = pthread_mutex_init(&qc->mutex, NULL((void*)0))))
81 return r;
82 return 0;
83}
84
85static void
86queue_condition_lock(queue_condition_t *qc)
87{
88 /* XXX errors? */
89 pthread_mutex_lock(&qc->mutex);
90}
91
92static void
93queue_condition_unlock(queue_condition_t *qc)
94{
95 /* XXX errors? */
96 pthread_mutex_unlock(&qc->mutex);
97}
98
99static void
100queue_condition_signal(queue_condition_t *qc)
101{
102 /* XXX errors? */
103 pthread_cond_signal(&qc->cond);
104}
105
106static void
107queue_condition_wait(queue_condition_t *qc)
108{
109 /* XXX errors? */
110 pthread_cond_wait(&qc->cond, &qc->mutex);
111}
112
113static thread_pointer
114numba_new_thread(void *worker, void *arg)
115{
116 int status;
117 pthread_attr_t attr;
118 pthread_t th;
119
120 pthread_atfork(0, 0, reset_after_fork);
121
122 /* Create detached threads */
123 pthread_attr_init(&attr);
124 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHEDPTHREAD_CREATE_DETACHED);
125
126 status = pthread_create(&th, &attr, worker, arg);
127
128 if (status != 0)
129 {
130 return NULL((void*)0);
131 }
132
133 pthread_attr_destroy(&attr);
134 return (thread_pointer)th;
135}
136
static int
get_thread_id(void)
{
    /* NOTE(review): pthread_t is an opaque type; narrowing it to int gives
       a best-effort identifier only — uniqueness is not guaranteed. */
    pthread_t self = pthread_self();
    return (int)self;
}
142
143#endif
144
145/* Win Thread */
146#ifdef NUMBA_WINTHREAD
147
148typedef struct
149{
150 CONDITION_VARIABLE cv;
151 CRITICAL_SECTION cs;
152} queue_condition_t;
153
154static int
155queue_condition_init(queue_condition_t *qc)
156{
157 InitializeConditionVariable(&qc->cv);
158 InitializeCriticalSection(&qc->cs);
159 return 0;
160}
161
162static void
163queue_condition_lock(queue_condition_t *qc)
164{
165 EnterCriticalSection(&qc->cs);
166}
167
168static void
169queue_condition_unlock(queue_condition_t *qc)
170{
171 LeaveCriticalSection(&qc->cs);
172}
173
174static void
175queue_condition_signal(queue_condition_t *qc)
176{
177 WakeConditionVariable(&qc->cv);
178}
179
180static void
181queue_condition_wait(queue_condition_t *qc)
182{
183 SleepConditionVariableCS(&qc->cv, &qc->cs, INFINITE);
184}
185
186/* Adapted from Python/thread_nt.h */
187typedef struct
188{
189 void (*func)(void*);
190 void *arg;
191} callobj;
192
193static unsigned __stdcall
194bootstrap(void *call)
195{
196 callobj *obj = (callobj*)call;
197 void (*func)(void*) = obj->func;
198 void *arg = obj->arg;
199 HeapFree(GetProcessHeap(), 0, obj);
200 func(arg);
201 _endthreadex(0);
202 return 0;
203}
204
205static thread_pointer
206numba_new_thread(void *worker, void *arg)
207{
208 uintptr_t handle;
209 unsigned threadID;
210 callobj *obj;
211
212 if (sizeof(handle) > sizeof(void*))
213 return 0;
214
215 obj = (callobj*)HeapAlloc(GetProcessHeap(), 0, sizeof(*obj));
216 if (!obj)
217 return NULL((void*)0);
218
219 obj->func = worker;
220 obj->arg = arg;
221
222 handle = _beginthreadex(NULL((void*)0), 0, bootstrap, obj, 0, &threadID);
223 if (handle == -1)
224 return 0;
225 return (thread_pointer)handle;
226}
227
static int
get_thread_id(void)
{
    /* GetCurrentThreadId returns a DWORD; narrow to int for the common
       backend interface. */
    return (int)GetCurrentThreadId();
}
233
234#endif
235
236typedef struct Task
237{
238 void (*func)(void *args, void *dims, void *steps, void *data);
239 void *args, *dims, *steps, *data;
240} Task;
241
242typedef struct
243{
244 queue_condition_t cond;
245 int state;
246 Task task;
247} Queue;
248
249
250static Queue *queues = NULL((void*)0);
251static int queue_count;
252static int queue_pivot = 0;
253static int NUM_THREADS = -1;
254
255static void
256queue_state_wait(Queue *queue, int old, int repl)
257{
258 queue_condition_t *cond = &queue->cond;
259
260 queue_condition_lock(cond);
261 while (queue->state != old)
262 {
263 queue_condition_wait(cond);
264 }
265 queue->state = repl;
266 queue_condition_signal(cond);
267 queue_condition_unlock(cond);
268}
269
/* No-op hook: set a debugger breakpoint here to catch parallel_for entry. */
void debug_marker(void);
void debug_marker(void) {}
273
274
275#ifdef _MSC_VER
276#define THREAD_LOCAL(ty)__thread ty __declspec(thread) ty
277#else
278/* Non-standard C99 extension that's understood by gcc and clang */
279#define THREAD_LOCAL(ty)__thread ty __thread ty
280#endif
281
282// This is the number of threads that is default, it is set on initialisation of
283// the threading backend via the launch_threads() call
284static int _INIT_NUM_THREADS = -1;
285
286// This is the per-thread thread mask, each thread can carry its own mask.
287static THREAD_LOCAL(int)__thread int _TLS_num_threads = 0;
288
289static void
290set_num_threads(int count)
291{
292 _TLS_num_threads = count;
293}
294
295static int
296get_num_threads(void)
297{
298 // This is purely to permit the implementation to survive to the point
299 // where it can exit cleanly as multiple threads cannot be used with this
300 // backend
301 if (_TLS_num_threads == 0)
302 {
303 // This is a thread that did not call launch_threads() but is still a
304 // "main" thread, probably from e.g. threading.Thread() use, it still
305 // has a TLS slot which is 0 from the lack of launch_threads() call
306 _TLS_num_threads = _INIT_NUM_THREADS;
307 }
308 return _TLS_num_threads;
309}
310
311
312// this complies to a launchable function from `add_task` like:
313// add_task(nopfn, NULL, NULL, NULL, NULL)
314// useful if you want to limit the number of threads locally
315// static void nopfn(void *args, void *dims, void *steps, void *data) {};
316
317
318// synchronize the TLS num_threads slot to value args[0]
319static void sync_tls(void *args, void *dims, void *steps, void *data) {
320 int nthreads = *((int *)(args));
321 _TLS_num_threads = nthreads;
322};
323
324
325static void
326parallel_for(void *fn, char **args, size_t *dimensions, size_t *steps, void *data,
327 size_t inner_ndim, size_t array_count, int num_threads)
328{
329
330 // args = <ir.Argument '.1' of type i8**>,
331 // dimensions = <ir.Argument '.2' of type i64*>
332 // steps = <ir.Argument '.3' of type i64*>
333 // data = <ir.Argument '.4' of type i8*>
334
335 // check the nesting level, if it's already 1, abort, workqueue cannot
336 // handle nesting.
337 if (_nesting_level >= 1){
338 fprintf(stderr, "%s", "Terminating: Nested parallel kernel launch "__fprintf_chk (stderr, 2 - 1, "%s", "Terminating: Nested parallel kernel launch "
"detected, the workqueue threading layer does " "not supported nested parallelism. Try the TBB "
"threading layer.\n")
339 "detected, the workqueue threading layer does "__fprintf_chk (stderr, 2 - 1, "%s", "Terminating: Nested parallel kernel launch "
"detected, the workqueue threading layer does " "not supported nested parallelism. Try the TBB "
"threading layer.\n")
340 "not supported nested parallelism. Try the TBB "__fprintf_chk (stderr, 2 - 1, "%s", "Terminating: Nested parallel kernel launch "
"detected, the workqueue threading layer does " "not supported nested parallelism. Try the TBB "
"threading layer.\n")
341 "threading layer.\n")__fprintf_chk (stderr, 2 - 1, "%s", "Terminating: Nested parallel kernel launch "
"detected, the workqueue threading layer does " "not supported nested parallelism. Try the TBB "
"threading layer.\n")
;
342 raise(SIGABRT6);
343 return;
344 }
345
346 // increment the nest level
347 _nesting_level += 1;
348
349 size_t * count_space = NULL((void*)0);
350 char ** array_arg_space = NULL((void*)0);
351 const size_t arg_len = (inner_ndim + 1);
352 int i; // induction var for chunking, thread count unlikely to overflow int
353 size_t j, count, remain, total;
354
355 ptrdiff_t offset;
356 char * base;
357 int old_queue_count = -1;
358
359 size_t step;
360
361 debug_marker();
362
363 total = *((size_t *)dimensions);
364 count = total / num_threads;
365 remain = total;
366
367 if(_DEBUG0)
368 {
369 printf("inner_ndim: %zd\n",inner_ndim)__printf_chk (2 - 1, "inner_ndim: %zd\n",inner_ndim);
370 printf("arg_len: %zd\n", arg_len)__printf_chk (2 - 1, "arg_len: %zd\n", arg_len);
371 printf("total: %zd\n", total)__printf_chk (2 - 1, "total: %zd\n", total);
372 printf("count: %zd\n", count)__printf_chk (2 - 1, "count: %zd\n", count);
373
374 printf("dimensions: ")__printf_chk (2 - 1, "dimensions: ");
375 for(j = 0; j < arg_len; j++)
376 {
377 printf("%zd, ", ((size_t *)dimensions)[j])__printf_chk (2 - 1, "%zd, ", ((size_t *)dimensions)[j]);
378 }
379 printf("\n")__printf_chk (2 - 1, "\n");
380
381 printf("steps: ")__printf_chk (2 - 1, "steps: ");
382 for(j = 0; j < array_count; j++)
383 {
384 printf("%zd, ", steps[j])__printf_chk (2 - 1, "%zd, ", steps[j]);
385 }
386 printf("\n")__printf_chk (2 - 1, "\n");
387
388 printf("*args: ")__printf_chk (2 - 1, "*args: ");
389 for(j = 0; j < array_count; j++)
390 {
391 printf("%p, ", (void *)args[j])__printf_chk (2 - 1, "%p, ", (void *)args[j]);
392 }
393 }
394
395 // sync the thread pool TLS slots, sync all slots, we don't know which
396 // threads will end up running.
397 for (i = 0; i < NUM_THREADS; i++)
398 {
399 add_task(sync_tls, (void *)(&num_threads), NULL((void*)0), NULL((void*)0), NULL((void*)0));
400 }
401 ready();
402 synchronize();
403
404 // This backend isn't threadsafe so just mutate the global
405 old_queue_count = queue_count;
406 queue_count = num_threads;
407
408 for (i = 0; i < num_threads; i++)
409 {
410 count_space = (size_t *)alloca(sizeof(size_t) * arg_len)__builtin_alloca (sizeof(size_t) * arg_len);
411 memcpy(count_space, dimensions, arg_len * sizeof(size_t));
412 if(i == num_threads - 1)
413 {
414 // Last thread takes all leftover
415 count_space[0] = remain;
416 }
417 else
418 {
419 count_space[0] = count;
420 remain = remain - count;
421 }
422
423 if(_DEBUG0)
424 {
425 printf("\n=================== THREAD %d ===================\n", i)__printf_chk (2 - 1, "\n=================== THREAD %d ===================\n"
, i)
;
426 printf("\ncount_space: ")__printf_chk (2 - 1, "\ncount_space: ");
427 for(j = 0; j < arg_len; j++)
428 {
429 printf("%zd, ", count_space[j])__printf_chk (2 - 1, "%zd, ", count_space[j]);
430 }
431 printf("\n")__printf_chk (2 - 1, "\n");
432 }
433
434 array_arg_space = alloca(sizeof(char*) * array_count)__builtin_alloca (sizeof(char*) * array_count);
435
436 for(j = 0; j < array_count; j++)
437 {
438 base = args[j];
439 step = steps[j];
440 offset = step * count * i;
441 array_arg_space[j] = (char *)(base + offset);
442
443 if(_DEBUG0)
444 {
445 printf("Index %zd\n", j)__printf_chk (2 - 1, "Index %zd\n", j);
446 printf("-->Got base %p\n", (void *)base)__printf_chk (2 - 1, "-->Got base %p\n", (void *)base);
447 printf("-->Got step %zd\n", step)__printf_chk (2 - 1, "-->Got step %zd\n", step);
448 printf("-->Got offset %td\n", offset)__printf_chk (2 - 1, "-->Got offset %td\n", offset);
449 printf("-->Got addr %p\n", (void *)array_arg_space[j])__printf_chk (2 - 1, "-->Got addr %p\n", (void *)array_arg_space
[j])
;
450 }
451 }
452
453 if(_DEBUG0)
454 {
455 printf("\narray_arg_space: ")__printf_chk (2 - 1, "\narray_arg_space: ");
456 for(j = 0; j < array_count; j++)
457 {
458 printf("%p, ", (void *)array_arg_space[j])__printf_chk (2 - 1, "%p, ", (void *)array_arg_space[j]);
459 }
460 }
461 add_task(fn, (void *)array_arg_space, (void *)count_space, steps, data);
462 }
463
464 ready();
465 synchronize();
466
467 queue_count = old_queue_count;
468 // decrement the nest level
469 _nesting_level -= 1;
470}
471
472static void
473add_task(void *fn, void *args, void *dims, void *steps, void *data)
474{
475 void (*func)(void *args, void *dims, void *steps, void *data) = fn;
476
477 Queue *queue = &queues[queue_pivot];
478
479 Task *task = &queue->task;
480 task->func = func;
481 task->args = args;
482 task->dims = dims;
483 task->steps = steps;
484 task->data = data;
485
486 /* Move pivot */
487 if ( ++queue_pivot == queue_count )
488 {
489 queue_pivot = 0;
490 }
491}
492
493static
494void thread_worker(void *arg)
495{
496 Queue *queue = (Queue*)arg;
497 Task *task;
498
499 while (1)
500 {
501 /* Wait for the queue to be in READY state (i.e. for some task
502 * to need running), and switch it to RUNNING.
503 */
504 queue_state_wait(queue, READY, RUNNING);
505
506 task = &queue->task;
507 task->func(task->args, task->dims, task->steps, task->data);
508
509 /* Task is done. */
510 queue_state_wait(queue, RUNNING, DONE);
511 }
512}
513
514static void launch_threads(int count)
515{
516 if (!queues)
517 {
518 /* If queues are not yet allocated,
519 create them, one for each thread. */
520 int i;
521 size_t sz = sizeof(Queue) * count;
522
523 /* set for use in parallel_for */
524 NUM_THREADS = count;
525 queues = malloc(sz); /* this memory will leak */
526 /* Note this initializes the state to IDLE */
527 memset(queues, 0, sz);
528 queue_count = count;
529
530 for (i = 0; i < count; ++i)
531 {
532 queue_condition_init(&queues[i].cond);
533 numba_new_thread(thread_worker, &queues[i]);
534 }
535
536 _INIT_NUM_THREADS = count;
537 }
538}
539
540static void synchronize(void)
541{
542 int i;
543 for (i = 0; i < queue_count; ++i)
544 {
545 queue_state_wait(&queues[i], DONE, IDLE);
546 }
547}
548
549static void ready(void)
550{
551 int i;
552 for (i = 0; i < queue_count; ++i)
553 {
554 queue_state_wait(&queues[i], IDLE, READY);
555 }
556}
557
558static void reset_after_fork(void)
559{
560 free(queues);
561 queues = NULL((void*)0);
562 NUM_THREADS = -1;
563 _INIT_NUM_THREADS = -1;
564 _nesting_level = 0;
565}
566
567MOD_INIT(workqueue)PyObject* PyInit_workqueue(void)
568{
569 PyObject *m;
570 MOD_DEF(m, "workqueue", "No docs", NULL){ static struct PyModuleDef moduledef = { { { 1, ((void*)0) }
, ((void*)0), 0, ((void*)0), }, "workqueue", "No docs", -1, (
(void*)0), ((void*)0), ((void*)0), ((void*)0), ((void*)0) }; m
= PyModule_Create2(&moduledef, 1013); }
571 if (m == NULL((void*)0))
1
Assuming 'm' is not equal to NULL
2
Taking false branch
572 return MOD_ERROR_VAL((void*)0);
573
574 PyObject_SetAttrString(m, "launch_threads",
575 PyLong_FromVoidPtr(&launch_threads));
576 PyObject_SetAttrString(m, "synchronize",
577 PyLong_FromVoidPtr(&synchronize));
578 PyObject_SetAttrString(m, "ready",
579 PyLong_FromVoidPtr(&ready));
580 PyObject_SetAttrString(m, "add_task",
581 PyLong_FromVoidPtr(&add_task));
582 PyObject_SetAttrString(m, "parallel_for",
583 PyLong_FromVoidPtr(&parallel_for));
584 PyObject_SetAttrString(m, "do_scheduling_signed",
585 PyLong_FromVoidPtr(&do_scheduling_signed));
3
Calling 'PyLong_FromVoidPtr'
5
Returning from 'PyLong_FromVoidPtr'
6
PyObject ownership leak with reference count of 1
586 PyObject_SetAttrString(m, "do_scheduling_unsigned",
587 PyLong_FromVoidPtr(&do_scheduling_unsigned));
588 PyObject_SetAttrString(m, "set_num_threads",
589 PyLong_FromVoidPtr((void*)&set_num_threads));
590 PyObject_SetAttrString(m, "get_num_threads",
591 PyLong_FromVoidPtr((void*)&get_num_threads));
592 PyObject_SetAttrString(m, "get_thread_id",
593 PyLong_FromVoidPtr((void*)&get_thread_id));
594 return MOD_SUCCESS_VAL(m)m;
595}

/opt/pyrefcon/lib/pyrefcon/models/models/PyLong_FromVoidPtr.model

1#ifndef PyLong_FromVoidPtr
2PyObject* clang_analyzer_PyObject_New_Reference();
3PyObject* PyLong_FromVoidPtr(void *p) {
4 return clang_analyzer_PyObject_New_Reference();
4
Setting reference count to 1
5}
6#else
7#warning "API PyLong_FromVoidPtr is defined as a macro."
8#endif