Python-2.7.3/Modules/_multiprocessing/semaphore.c

No issues found

  1 /*
  2  * A type which wraps a semaphore
  3  *
  4  * semaphore.c
  5  *
  6  * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
  7  */
  8 
  9 #include "multiprocessing.h"
 10 
/* Values accepted for SemLockObject.kind (validated in semlock_new). */
enum { RECURSIVE_MUTEX, SEMAPHORE };
 12 
/*
 * Python object wrapping a platform semaphore handle together with the
 * per-process bookkeeping used by the acquire/release methods.
 */
typedef struct {
    PyObject_HEAD
    SEM_HANDLE handle;  /* underlying semaphore; closed in dealloc unless SEM_FAILED */
    long last_tid;      /* thread ident stored by the latest successful acquire */
    int count;          /* acquires minus releases made through this object */
    int maxvalue;       /* maximum value supplied when the object was created */
    int kind;           /* RECURSIVE_MUTEX or SEMAPHORE */
} SemLockObject;
 21 
 22 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
 23 
 24 
 25 #ifdef MS_WINDOWS
 26 
 27 /*
 28  * Windows definitions
 29  */
 30 
/* A NULL handle plays the role of POSIX SEM_FAILED on Windows. */
#define SEM_FAILED NULL

/* Platform-neutral wrappers used by the code shared between platforms. */
#define SEM_CLEAR_ERROR() SetLastError(0)
#define SEM_GET_LAST_ERROR() GetLastError()
/* The name argument is not used here: NULL is passed as the semaphore name. */
#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
/* Map CloseHandle()'s boolean result onto 0 (success) / -1 (failure). */
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
/* Nothing to unlink on Windows; always evaluates to 0. */
#define SEM_UNLINK(name) 0
 39 
 40 static int
 41 _GetSemaphoreValue(HANDLE handle, long *value)
 42 {
 43     long previous;
 44 
 45     switch (WaitForSingleObject(handle, 0)) {
 46     case WAIT_OBJECT_0:
 47         if (!ReleaseSemaphore(handle, 1, &previous))
 48             return MP_STANDARD_ERROR;
 49         *value = previous + 1;
 50         return 0;
 51     case WAIT_TIMEOUT:
 52         *value = 0;
 53         return 0;
 54     default:
 55         return MP_STANDARD_ERROR;
 56     }
 57 }
 58 
/*
 * acquire(block=1, timeout=None) -- Windows implementation.
 *
 * Returns True when the semaphore/lock was acquired and False when a
 * non-blocking attempt failed or the timeout expired.  Returns NULL with an
 * exception set on argument errors, an over-large timeout, a pending signal
 * reported by PyErr_CheckSignals(), or a failed wait.
 */
static PyObject *
semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
{
    int blocking = 1;
    double timeout;
    PyObject *timeout_obj = Py_None;
    DWORD res, full_msecs, msecs, start, ticks;

    static char *kwlist[] = {"block", "timeout", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
                                     &blocking, &timeout_obj))
        return NULL;

    /* calculate timeout: 0 for non-blocking, INFINITE when no timeout
       was given, otherwise the requested seconds in milliseconds */
    if (!blocking) {
        full_msecs = 0;
    } else if (timeout_obj == Py_None) {
        full_msecs = INFINITE;
    } else {
        timeout = PyFloat_AsDouble(timeout_obj);
        if (PyErr_Occurred())
            return NULL;
        timeout *= 1000.0;      /* convert to millisecs */
        if (timeout < 0.0) {
            /* negative timeouts behave like a zero timeout */
            timeout = 0.0;
        } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
            PyErr_SetString(PyExc_OverflowError,
                            "timeout is too large");
            return NULL;
        }
        /* round to the nearest millisecond */
        full_msecs = (DWORD)(timeout + 0.5);
    }

    /* check whether we already own the lock; a recursive mutex held by
       this thread only bumps the count without touching the semaphore */
    if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
        ++self->count;
        Py_RETURN_TRUE;
    }

    /* check whether we can acquire without blocking */
    if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
        self->last_tid = GetCurrentThreadId();
        ++self->count;
        Py_RETURN_TRUE;
    }

    /* remember the starting tick so the remaining time can be
       recomputed after each interruption */
    msecs = full_msecs;
    start = GetTickCount();

    for ( ; ; ) {
        /* wait on the semaphore and on the SIGINT event together */
        HANDLE handles[2] = {self->handle, sigint_event};

        /* do the wait */
        Py_BEGIN_ALLOW_THREADS
        ResetEvent(sigint_event);
        res = WaitForMultipleObjects(2, handles, FALSE, msecs);
        Py_END_ALLOW_THREADS

        /* handle result: anything other than the SIGINT event (index 1)
           ends the loop and is decoded by the switch below */
        if (res != WAIT_OBJECT_0 + 1)
            break;

        /* got SIGINT so give signal handler a chance to run */
        Sleep(1);

        /* if this is main thread let KeyboardInterrupt be raised */
        if (PyErr_CheckSignals())
            return NULL;

        /* recalculate timeout */
        if (msecs != INFINITE) {
            ticks = GetTickCount();
            /* unsigned subtraction keeps this valid across tick wraparound */
            if ((DWORD)(ticks - start) >= full_msecs)
                Py_RETURN_FALSE;
            msecs = full_msecs - (ticks - start);
        }
    }

    /* handle result */
    switch (res) {
    case WAIT_TIMEOUT:
        Py_RETURN_FALSE;
    case WAIT_OBJECT_0:
        /* the semaphore itself was signalled: record ownership */
        self->last_tid = GetCurrentThreadId();
        ++self->count;
        Py_RETURN_TRUE;
    case WAIT_FAILED:
        return PyErr_SetFromWindowsErr(0);
    default:
        PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
                     "WaitForMultipleObjects() gave unrecognized "
                     "value %d", res);
        return NULL;
    }
}
155 
156 static PyObject *
157 semlock_release(SemLockObject *self, PyObject *args)
158 {
159     if (self->kind == RECURSIVE_MUTEX) {
160         if (!ISMINE(self)) {
161             PyErr_SetString(PyExc_AssertionError, "attempt to "
162                             "release recursive lock not owned "
163                             "by thread");
164             return NULL;
165         }
166         if (self->count > 1) {
167             --self->count;
168             Py_RETURN_NONE;
169         }
170         assert(self->count == 1);
171     }
172 
173     if (!ReleaseSemaphore(self->handle, 1, NULL)) {
174         if (GetLastError() == ERROR_TOO_MANY_POSTS) {
175             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
176                             "released too many times");
177             return NULL;
178         } else {
179             return PyErr_SetFromWindowsErr(0);
180         }
181     }
182 
183     --self->count;
184     Py_RETURN_NONE;
185 }
186 
187 #else /* !MS_WINDOWS */
188 
189 /*
190  * Unix definitions
191  */
192 
/* The Windows "last error" helpers have no POSIX counterpart here:
   clearing expands to nothing and reading always yields 0. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR() 0
/* Create a new named semaphore; O_EXCL is passed so an existing name is
   not reused.  The max argument is not used by this expansion. */
#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
#define SEM_UNLINK(name) sem_unlink(name)

/* When sem_unlink() is unavailable, unlinking becomes a no-op yielding 0. */
#ifndef HAVE_SEM_UNLINK
#  define sem_unlink(name) 0
#endif
203 
204 #ifndef HAVE_SEM_TIMEDWAIT
/* Route sem_timedwait() calls to the polling fallback below.  The expansion
   names `_save`, so it must be used where such a variable is in scope
   (presumably between Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS). */
#  define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)

/*
 * Polling replacement for sem_timedwait().
 *
 * Repeatedly tries sem_trywait(), sleeping between attempts with a delay
 * that grows by 1000 usec per iteration up to 20000 usec and never beyond
 * the time left until `deadline`.
 *
 * Returns 0 once the semaphore is acquired.  Returns MP_STANDARD_ERROR with
 * errno set to ETIMEDOUT when the deadline has passed, or with the errno of
 * a failing sem_trywait()/gettimeofday()/select().  Returns
 * MP_EXCEPTION_HAS_BEEN_SET with errno set to EINTR when
 * PyErr_CheckSignals() reports a pending exception.
 *
 * `_save` is the thread state used by Py_BLOCK_THREADS / Py_UNBLOCK_THREADS
 * around the signal check.
 */
int
sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
{
    int res;
    unsigned long delay, difference;
    struct timeval now, tvdeadline, tvdelay;

    errno = 0;
    /* convert the timespec deadline to timeval (microsecond) resolution */
    tvdeadline.tv_sec = deadline->tv_sec;
    tvdeadline.tv_usec = deadline->tv_nsec / 1000;

    for (delay = 0 ; ; delay += 1000) {
        /* poll */
        if (sem_trywait(sem) == 0)
            return 0;
        else if (errno != EAGAIN)
            return MP_STANDARD_ERROR;

        /* get current time */
        if (gettimeofday(&now, NULL) < 0)
            return MP_STANDARD_ERROR;

        /* check for timeout */
        if (tvdeadline.tv_sec < now.tv_sec ||
            (tvdeadline.tv_sec == now.tv_sec &&
             tvdeadline.tv_usec <= now.tv_usec)) {
            errno = ETIMEDOUT;
            return MP_STANDARD_ERROR;
        }

        /* calculate how much time is left (in microseconds) */
        difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
            (tvdeadline.tv_usec - now.tv_usec);

        /* check delay not too long -- maximum is 20 msecs */
        if (delay > 20000)
            delay = 20000;
        if (delay > difference)
            delay = difference;

        /* sleep: select() with no descriptors is used as a timed wait */
        tvdelay.tv_sec = delay / 1000000;
        tvdelay.tv_usec = delay % 1000000;
        if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
            return MP_STANDARD_ERROR;

        /* check for signals */
        Py_BLOCK_THREADS
        res = PyErr_CheckSignals();
        Py_UNBLOCK_THREADS

        if (res) {
            /* report the interruption through both errno and the result */
            errno = EINTR;
            return MP_EXCEPTION_HAS_BEEN_SET;
        }
    }
}
264 
265 #endif /* !HAVE_SEM_TIMEDWAIT */
266 
267 static PyObject *
268 semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
269 {
270     int blocking = 1, res;
271     double timeout;
272     PyObject *timeout_obj = Py_None;
273     struct timespec deadline = {0};
274     struct timeval now;
275     long sec, nsec;
276 
277     static char *kwlist[] = {"block", "timeout", NULL};
278 
279     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
280                                      &blocking, &timeout_obj))
281         return NULL;
282 
283     if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
284         ++self->count;
285         Py_RETURN_TRUE;
286     }
287 
288     if (timeout_obj != Py_None) {
289         timeout = PyFloat_AsDouble(timeout_obj);
290         if (PyErr_Occurred())
291             return NULL;
292         if (timeout < 0.0)
293             timeout = 0.0;
294 
295         if (gettimeofday(&now, NULL) < 0) {
296             PyErr_SetFromErrno(PyExc_OSError);
297             return NULL;
298         }
299         sec = (long) timeout;
300         nsec = (long) (1e9 * (timeout - sec) + 0.5);
301         deadline.tv_sec = now.tv_sec + sec;
302         deadline.tv_nsec = now.tv_usec * 1000 + nsec;
303         deadline.tv_sec += (deadline.tv_nsec / 1000000000);
304         deadline.tv_nsec %= 1000000000;
305     }
306 
307     do {
308         Py_BEGIN_ALLOW_THREADS
309         if (blocking && timeout_obj == Py_None)
310             res = sem_wait(self->handle);
311         else if (!blocking)
312             res = sem_trywait(self->handle);
313         else
314             res = sem_timedwait(self->handle, &deadline);
315         Py_END_ALLOW_THREADS
316         if (res == MP_EXCEPTION_HAS_BEEN_SET)
317             break;
318     } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
319 
320     if (res < 0) {
321         if (errno == EAGAIN || errno == ETIMEDOUT)
322             Py_RETURN_FALSE;
323         else if (errno == EINTR)
324             return NULL;
325         else
326             return PyErr_SetFromErrno(PyExc_OSError);
327     }
328 
329     ++self->count;
330     self->last_tid = PyThread_get_thread_ident();
331 
332     Py_RETURN_TRUE;
333 }
334 
335 static PyObject *
336 semlock_release(SemLockObject *self, PyObject *args)
337 {
338     if (self->kind == RECURSIVE_MUTEX) {
339         if (!ISMINE(self)) {
340             PyErr_SetString(PyExc_AssertionError, "attempt to "
341                             "release recursive lock not owned "
342                             "by thread");
343             return NULL;
344         }
345         if (self->count > 1) {
346             --self->count;
347             Py_RETURN_NONE;
348         }
349         assert(self->count == 1);
350     } else {
351 #ifdef HAVE_BROKEN_SEM_GETVALUE
352         /* We will only check properly the maxvalue == 1 case */
353         if (self->maxvalue == 1) {
354             /* make sure that already locked */
355             if (sem_trywait(self->handle) < 0) {
356                 if (errno != EAGAIN) {
357                     PyErr_SetFromErrno(PyExc_OSError);
358                     return NULL;
359                 }
360                 /* it is already locked as expected */
361             } else {
362                 /* it was not locked so undo wait and raise  */
363                 if (sem_post(self->handle) < 0) {
364                     PyErr_SetFromErrno(PyExc_OSError);
365                     return NULL;
366                 }
367                 PyErr_SetString(PyExc_ValueError, "semaphore "
368                                 "or lock released too many "
369                                 "times");
370                 return NULL;
371             }
372         }
373 #else
374         int sval;
375 
376         /* This check is not an absolute guarantee that the semaphore
377            does not rise above maxvalue. */
378         if (sem_getvalue(self->handle, &sval) < 0) {
379             return PyErr_SetFromErrno(PyExc_OSError);
380         } else if (sval >= self->maxvalue) {
381             PyErr_SetString(PyExc_ValueError, "semaphore or lock "
382                             "released too many times");
383             return NULL;
384         }
385 #endif
386     }
387 
388     if (sem_post(self->handle) < 0)
389         return PyErr_SetFromErrno(PyExc_OSError);
390 
391     --self->count;
392     Py_RETURN_NONE;
393 }
394 
395 #endif /* !MS_WINDOWS */
396 
397 /*
398  * All platforms
399  */
400 
401 static PyObject *
402 newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
403 {
404     SemLockObject *self;
405 
406     self = PyObject_New(SemLockObject, type);
407     if (!self)
408         return NULL;
409     self->handle = handle;
410     self->kind = kind;
411     self->count = 0;
412     self->last_tid = 0;
413     self->maxvalue = maxvalue;
414     return (PyObject*)self;
415 }
416 
417 static PyObject *
418 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
419 {
420     char buffer[256];
421     SEM_HANDLE handle = SEM_FAILED;
422     int kind, maxvalue, value;
423     PyObject *result;
424     static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
425     static int counter = 0;
426 
427     if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
428                                      &kind, &value, &maxvalue))
429         return NULL;
430 
431     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
432         PyErr_SetString(PyExc_ValueError, "unrecognized kind");
433         return NULL;
434     }
435 
436     PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
437 
438     SEM_CLEAR_ERROR();
439     handle = SEM_CREATE(buffer, value, maxvalue);
440     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
441     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
442         goto failure;
443 
444     if (SEM_UNLINK(buffer) < 0)
445         goto failure;
446 
447     result = newsemlockobject(type, handle, kind, maxvalue);
448     if (!result)
449         goto failure;
450 
451     return result;
452 
453   failure:
454     if (handle != SEM_FAILED)
455         SEM_CLOSE(handle);
456     mp_SetError(NULL, MP_STANDARD_ERROR);
457     return NULL;
458 }
459 
460 static PyObject *
461 semlock_rebuild(PyTypeObject *type, PyObject *args)
462 {
463     SEM_HANDLE handle;
464     int kind, maxvalue;
465 
466     if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
467                           &handle, &kind, &maxvalue))
468         return NULL;
469 
470     return newsemlockobject(type, handle, kind, maxvalue);
471 }
472 
473 static void
474 semlock_dealloc(SemLockObject* self)
475 {
476     if (self->handle != SEM_FAILED)
477         SEM_CLOSE(self->handle);
478     PyObject_Del(self);
479 }
480 
481 static PyObject *
482 semlock_count(SemLockObject *self)
483 {
484     return PyInt_FromLong((long)self->count);
485 }
486 
487 static PyObject *
488 semlock_ismine(SemLockObject *self)
489 {
490     /* only makes sense for a lock */
491     return PyBool_FromLong(ISMINE(self));
492 }
493 
494 static PyObject *
495 semlock_getvalue(SemLockObject *self)
496 {
497 #ifdef HAVE_BROKEN_SEM_GETVALUE
498     PyErr_SetNone(PyExc_NotImplementedError);
499     return NULL;
500 #else
501     int sval;
502     if (SEM_GETVALUE(self->handle, &sval) < 0)
503         return mp_SetError(NULL, MP_STANDARD_ERROR);
504     /* some posix implementations use negative numbers to indicate
505        the number of waiting threads */
506     if (sval < 0)
507         sval = 0;
508     return PyInt_FromLong((long)sval);
509 #endif
510 }
511 
512 static PyObject *
513 semlock_iszero(SemLockObject *self)
514 {
515 #ifdef HAVE_BROKEN_SEM_GETVALUE
516     if (sem_trywait(self->handle) < 0) {
517         if (errno == EAGAIN)
518             Py_RETURN_TRUE;
519         return mp_SetError(NULL, MP_STANDARD_ERROR);
520     } else {
521         if (sem_post(self->handle) < 0)
522             return mp_SetError(NULL, MP_STANDARD_ERROR);
523         Py_RETURN_FALSE;
524     }
525 #else
526     int sval;
527     if (SEM_GETVALUE(self->handle, &sval) < 0)
528         return mp_SetError(NULL, MP_STANDARD_ERROR);
529     return PyBool_FromLong((long)sval == 0);
530 #endif
531 }
532 
533 static PyObject *
534 semlock_afterfork(SemLockObject *self)
535 {
536     self->count = 0;
537     Py_RETURN_NONE;
538 }
539 
540 /*
541  * Semaphore methods
542  */
543 
/*
 * Method table for SemLock.  __enter__ and __exit__ map onto the same C
 * functions as acquire and release; __exit__ is registered with
 * METH_VARARGS so that arguments passed to it are accepted.
 */
static PyMethodDef semlock_methods[] = {
    {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
     "acquire the semaphore/lock"},
    {"release", (PyCFunction)semlock_release, METH_NOARGS,
     "release the semaphore/lock"},
    {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
     "enter the semaphore/lock"},
    {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
     "exit the semaphore/lock"},
    {"_count", (PyCFunction)semlock_count, METH_NOARGS,
     "num of `acquire()`s minus num of `release()`s for this process"},
    {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
     "whether the lock is owned by this thread"},
    {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
     "get the value of the semaphore"},
    {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
     "returns whether semaphore has value zero"},
    {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
     ""},
    {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
     "rezero the net acquisition count after fork()"},
    {NULL}  /* sentinel */
};
567 
568 /*
569  * Member table
570  */
571 
/* Read-only attributes exposing fields of SemLockObject to Python. */
static PyMemberDef semlock_members[] = {
    {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
     ""},
    {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
     ""},
    {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
     ""},
    {NULL}  /* sentinel */
};
581 
582 /*
583  * Semaphore type
584  */
585 
/*
 * Type object for _multiprocessing.SemLock.  Only deallocation, the method
 * and member tables, and tp_new are provided; every other slot is 0.
 * Py_TPFLAGS_BASETYPE allows the type to be subclassed.
 */
PyTypeObject SemLockType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    /* tp_name           */ "_multiprocessing.SemLock",
    /* tp_basicsize      */ sizeof(SemLockObject),
    /* tp_itemsize       */ 0,
    /* tp_dealloc        */ (destructor)semlock_dealloc,
    /* tp_print          */ 0,
    /* tp_getattr        */ 0,
    /* tp_setattr        */ 0,
    /* tp_compare        */ 0,
    /* tp_repr           */ 0,
    /* tp_as_number      */ 0,
    /* tp_as_sequence    */ 0,
    /* tp_as_mapping     */ 0,
    /* tp_hash           */ 0,
    /* tp_call           */ 0,
    /* tp_str            */ 0,
    /* tp_getattro       */ 0,
    /* tp_setattro       */ 0,
    /* tp_as_buffer      */ 0,
    /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
    /* tp_doc            */ "Semaphore/Mutex type",
    /* tp_traverse       */ 0,
    /* tp_clear          */ 0,
    /* tp_richcompare    */ 0,
    /* tp_weaklistoffset */ 0,
    /* tp_iter           */ 0,
    /* tp_iternext       */ 0,
    /* tp_methods        */ semlock_methods,
    /* tp_members        */ semlock_members,
    /* tp_getset         */ 0,
    /* tp_base           */ 0,
    /* tp_dict           */ 0,
    /* tp_descr_get      */ 0,
    /* tp_descr_set      */ 0,
    /* tp_dictoffset     */ 0,
    /* tp_init           */ 0,
    /* tp_alloc          */ 0,
    /* tp_new            */ semlock_new,
};