Source code for mpire.dashboard.utils

import getpass
import inspect
import socket
from functools import partial
from typing import Callable, Dict, List, Sequence, Tuple, Union
import types

DASHBOARD_FUNCTION_STACKLEVEL = 1


def get_two_available_ports(port_range: Sequence) -> Tuple[int, int]:
    """
    Get two available ports, one from the start and one from the end of the range

    :param port_range: Port range to try. Reverses the list and will then pick the first one available
    :raises OSError: If there are not enough ports available
    :return: Two available ports
    """
    def _port_available(port_nr: int) -> bool:
        """
        Checks if a port is available

        :param port_nr: Port number to check
        :return: True if available, False otherwise
        """
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(('', port_nr))
            s.close()
            return True
        except OSError:
            return False

    available_ports = set()
    for port_nr in port_range:
        if _port_available(port_nr):
            available_ports.add(port_nr)
            break

    for port_nr in reversed(port_range):
        if _port_available(port_nr):
            available_ports.add(port_nr)
            break

    if len(available_ports) != 2:
        raise OSError(f"Dashboard Manager Server: there are not enough ports available: {port_range}")

    return tuple(sorted(available_ports))


[docs]def get_stacklevel() -> int: """ Gets the stack level to use when obtaining function details (used for the dashboard) :return: Stack level """ return DASHBOARD_FUNCTION_STACKLEVEL
[docs]def set_stacklevel(stacklevel: int) -> None: """ Sets the stack level to use when obtaining function details (used for the dashboard) :param stacklevel: Stack level """ global DASHBOARD_FUNCTION_STACKLEVEL DASHBOARD_FUNCTION_STACKLEVEL = stacklevel
def get_function_details(func: Callable) -> Dict[str, Union[str, int]]: """ Obtain function details, including: - function filename - function line number - function name - invoked from filename - invoked from line number - invoked code context :param func: Function to call each time new task arguments become available. When passing on the worker ID the function should receive the worker ID as its first argument. If shared objects are provided the function should receive those as the next argument. If the worker state has been enabled it should receive a state variable as the next argument :return: Function details dictionary """ # Get the frame in which the pool.map(...) was called. We obtain the current stack and skip all frames which # involve the current mpire module. If the desired stack level is higher than 1, we continue until we've reached # the desired stack level. We then obtain the code context of that frame. invoked_frame = None stacklevel = 0 for frame_info in inspect.stack(): if frame_info.frame.f_globals['__name__'].split('.')[0] != 'mpire' or stacklevel > 0: invoked_frame = frame_info stacklevel += 1 if stacklevel == DASHBOARD_FUNCTION_STACKLEVEL: break # Obtain proper code context. Usually the last line of the invoked code is returned, but we want the complete # code snippet that called this function. That's why we increase the context size and need to find the start and # ending of the snippet. A context size of 10 should suffice. The end of the snippet is where we encounter the # line found when context=1 (i.e., what is returned in invoked_frame.code_context). The start is where we see # something along the lines of `.[i]map[_unordered](`. code_context = inspect.getframeinfo(invoked_frame.frame, context=10).code_context if code_context is not None: code_context = code_context[:code_context.index(invoked_frame.code_context[0]) + 1] code_context = find_calling_lines(code_context) invoked_line_no = invoked_frame.lineno - (len(code_context) - 1) code_context = ' '.join(line.strip() for line in code_context) else: invoked_line_no = 'N/A' if isinstance(func, partial): # If we're dealing with a partial, obtain the function within func = func.func elif hasattr(func, '__call__') and not isinstance(func, (type, types.FunctionType, types.MethodType)): # If we're dealing with a callable class instance, use its __call__ method func = func.__call__ # We use a try/except block as some constructs don't allow this. E.g., in the case the function is a MagicMock # (i.e., in unit tests) these inspections will fail try: function_filename = inspect.getabsfile(func) function_line_no = func.__code__.co_firstlineno function_name = func.__name__ except: function_filename = 'n/a' function_line_no = 'n/a' function_name = 'n/a' # Obtain user. This can fail when the current uid refers to a non-existing user, which can happen when running in a # container as a non-root user. See https://github.com/sybrenjansen/mpire/issues/128. try: user = getpass.getuser() except KeyError: user = "n/a" # Populate details func_details = {'user': f'{user}@{socket.gethostname()}', 'function_filename': function_filename, 'function_line_no': function_line_no, 'function_name': function_name, 'invoked_filename': invoked_frame.filename, 'invoked_line_no': invoked_line_no, 'invoked_code_context': code_context} return func_details def find_calling_lines(code_context: List[str]) -> List[str]: """ Tries to find the lines corresponding to the calling function :param code_context: List of code lines :return: List of code lines """ # Traverse the lines in reverse order. We need a closing bracket to indicate the end of the calling function. From # that point on we work our way backward until we find the corresponding opening bracket. There can be more bracket # groups in between, so we have to keep counting brackets until we've found the right one. n_parentheses_groups = 0 found_parentheses_group = False inside_string = False inside_string_ch = None line_nr = 1 for line_nr, line in enumerate(reversed(code_context), start=1): for ch in reversed(line): # If we're inside a string keep ignoring characters until we find the closing string character if inside_string: if ch == inside_string_ch: inside_string = False # Check if a string has started elif ch in {'"', "'"}: inside_string = True inside_string_ch = ch # Closing parenthesis group elif ch == ')': n_parentheses_groups += 1 found_parentheses_group = True # Starting parenthesis group elif ch == '(': n_parentheses_groups -= 1 # Check if we've found the corresponding opening bracket if found_parentheses_group and n_parentheses_groups == 0: break return code_context[-line_nr:]