craftbeerpi4-pione/venv/lib/python3.8/site-packages/astroid/brain/brain_multiprocessing.py

107 lines
3 KiB
Python

# Copyright (c) 2016, 2018 Claudiu Popa <pcmanticore@gmail.com>
# Copyright (c) 2019 Hugo van Kemenade <hugovk@users.noreply.github.com>
# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
# For details: https://github.com/PyCQA/astroid/blob/master/COPYING.LESSER
import sys
import astroid
from astroid import exceptions
def _multiprocessing_transform():
module = astroid.parse(
"""
from multiprocessing.managers import SyncManager
def Manager():
return SyncManager()
"""
)
# Multiprocessing uses a getattr lookup inside contexts,
# in order to get the attributes they need. Since it's extremely
# dynamic, we use this approach to fake it.
node = astroid.parse(
"""
from multiprocessing.context import DefaultContext, BaseContext
default = DefaultContext()
base = BaseContext()
"""
)
try:
context = next(node["default"].infer())
base = next(node["base"].infer())
except exceptions.InferenceError:
return module
for node in (context, base):
for key, value in node.locals.items():
if key.startswith("_"):
continue
value = value[0]
if isinstance(value, astroid.FunctionDef):
# We need to rebound this, since otherwise
# it will have an extra argument (self).
value = astroid.BoundMethod(value, node)
module[key] = value
return module
def _multiprocessing_managers_transform():
return astroid.parse(
"""
import array
import threading
import multiprocessing.pool as pool
import six
class Namespace(object):
pass
class Value(object):
def __init__(self, typecode, value, lock=True):
self._typecode = typecode
self._value = value
def get(self):
return self._value
def set(self, value):
self._value = value
def __repr__(self):
return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
value = property(get, set)
def Array(typecode, sequence, lock=True):
return array.array(typecode, sequence)
class SyncManager(object):
Queue = JoinableQueue = six.moves.queue.Queue
Event = threading.Event
RLock = threading.RLock
BoundedSemaphore = threading.BoundedSemaphore
Condition = threading.Condition
Barrier = threading.Barrier
Pool = pool.Pool
list = list
dict = dict
Value = Value
Array = Array
Namespace = Namespace
__enter__ = lambda self: self
__exit__ = lambda *args: args
def start(self, initializer=None, initargs=None):
pass
def shutdown(self):
pass
"""
)
astroid.register_module_extender(
astroid.MANAGER, "multiprocessing.managers", _multiprocessing_managers_transform
)
astroid.register_module_extender(
astroid.MANAGER, "multiprocessing", _multiprocessing_transform
)