Trial tracemalloc as memory tracker (replacement v2 PR) by pp-mo · Pull Request #5946 · SciTools/iris

Trial tracemalloc as memory tracker (replacement v2 PR) #5946

Status: Closed · wants to merge 4 commits

49 changes: 46 additions & 3 deletions benchmarks/benchmarks/__init__.py
@@ -6,6 +6,9 @@

from os import environ
import resource
import tracemalloc

import numpy as np

ARTIFICIAL_DIM_SIZE = int(10e3) # For all artificial cubes, coords etc.

@@ -66,24 +69,44 @@ class TrackAddedMemoryAllocation:

"""

RESULT_MINIMUM_MB = 5.0
_DEFAULT_RESULT_MINIMUM_MB = 5.0
_DEFAULT_RESULT_ROUND_DP = 1

def __init__(self, use_tracemalloc=False, result_min_mb=None, result_round_dp=None):
self._use_tracemalloc = use_tracemalloc
if result_min_mb is None:
result_min_mb = self._DEFAULT_RESULT_MINIMUM_MB
self.RESULT_MINIMUM_MB = result_min_mb
if result_round_dp is None:
result_round_dp = self._DEFAULT_RESULT_ROUND_DP
self.RESULT_ROUND_DP = result_round_dp

@staticmethod
def process_resident_memory_mb():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

def __enter__(self):
self.mb_before = self.process_resident_memory_mb()
if self._use_tracemalloc:
self.mb_before = 0
tracemalloc.start()
else:
self.mb_before = self.process_resident_memory_mb()
return self

def __exit__(self, *_):
self.mb_after = self.process_resident_memory_mb()
if self._use_tracemalloc:
_, peak_mem = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.mb_after = peak_mem * 1.0 / 1024**2
else:
self.mb_after = self.process_resident_memory_mb()

def addedmem_mb(self):
"""Return measured memory growth, in Mb."""
result = self.mb_after - self.mb_before
# Small results are too vulnerable to noise being interpreted as signal.
result = max(self.RESULT_MINIMUM_MB, result)
result = np.round(result, self.RESULT_ROUND_DP)
return result

@staticmethod
@@ -124,3 +147,23 @@ def on_demand_benchmark(benchmark_object):
"""
if "ON_DEMAND_BENCHMARKS" in environ:
return benchmark_object


def memtrace_benchmark(use_tracemalloc=False, result_min_mb=None):
    # Call which returns a decorator == 'decorator with args'.
    # N.B. embeds the call arguments in the environment of the returned decorator.
    from functools import wraps

    def decorator(decorated_func):
        assert decorated_func.__name__[:6] == "track_"

        @wraps(decorated_func)
        def wrapper(*args, **kwargs):
            with TrackAddedMemoryAllocation(
                use_tracemalloc=use_tracemalloc, result_min_mb=result_min_mb
            ) as tracer:
                decorated_func(*args, **kwargs)
            # An ASV 'track_' benchmark reports its return value as the result.
            return tracer.addedmem_mb()

        return wrapper

    return decorator
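
Usage note (an illustration, not part of the diff): both the TrackAddedMemoryAllocation context manager and the memtrace_benchmark decorator are aimed at ASV "track_" benchmarks, which report whatever number the method returns. A minimal sketch of the two styles follows, assuming a hypothetical module placed alongside memtrace_evaluation; the ExampleMemcheck class and its numpy workload are invented for illustration only.

import numpy as np

from .. import TrackAddedMemoryAllocation, memtrace_benchmark


class ExampleMemcheck:
    # Context-manager form: the benchmark itself returns the measurement.
    def track_addedmem_alloc(self):
        with TrackAddedMemoryAllocation(use_tracemalloc=True, result_min_mb=0.0) as tracer:
            self.data = np.zeros((2000, 2000))  # an arbitrary ~30 MiB workload
        return tracer.addedmem_mb()

    # Decorator form: the wrapper runs the workload inside the same context
    # manager and returns the measured growth on the benchmark's behalf.
    @memtrace_benchmark(use_tracemalloc=True, result_min_mb=0.0)
    def track_addedmem_alloc_decorated(self):
        self.data = np.zeros((2000, 2000))
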
152 changes: 152 additions & 0 deletions benchmarks/benchmarks/memtrace_evaluation/__init__.py
@@ -0,0 +1,152 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Benchmarks to evaluate tracemalloc/rss methods of memory measurement."""

import numpy as np

from .. import TrackAddedMemoryAllocation
from .memory_exercising_task import SampleParallelTask


class MemcheckCommon:
    # Basic controls over the test calculation
    default_params = {
        "measure": "tracemalloc",  # alternate: "rss"
        "runtype": "threads",  # alternate: "processes"
        "ysize": 10000,
        "nx": 2000,
        "nblocks": 6,
        "nworkers": 3,
    }

    def _setup(self, **kwargs):
        params = self.default_params.copy()
        params.update(kwargs)
        measure = params["measure"]
        runtype = params["runtype"]
        ysize = params["ysize"]
        nx = params["nx"]
        nblocks = params["nblocks"]
        nworkers = params["nworkers"]

        ny_task = ysize // nblocks
        use_processes = {"threads": False, "processes": True}[runtype]
        self.task = SampleParallelTask(
            n_blocks=nblocks,
            outerdim=ny_task,
            innerdim=nx,
            n_workers=nworkers,
            use_process_workers=use_processes,
        )
        self.use_tracemalloc = {"tracemalloc": True, "rss": False}[measure]

    def run_time_calc(self):
        # This usage is a bit clumsy, as we don't really care about the runtype here.
        self.task.perform()

    def run_addedmem_calc(self):
        with TrackAddedMemoryAllocation(
            use_tracemalloc=self.use_tracemalloc,
            result_min_mb=0.0,
        ) as tracer:
            self.task.perform()
        return tracer.addedmem_mb()


def memory_units_mib(func):
    func.unit = "MiB"
    return func


class MemcheckRunstyles(MemcheckCommon):
    # only some are parametrised, or it's just too complicated!
    param_names = [
        "measure",
        "runtype",
        "ysize",
    ]
    params = [
        # measure
        ["tracemalloc", "rss"],
        # runtype
        ["threads", "processes"],
        # ysize
        [10000, 40000],
    ]

    def setup(self, measure, runtype, ysize):
        self._setup(measure=measure, runtype=runtype, ysize=ysize)

    def time_calc(self, measure, runtype, ysize):
        self.run_time_calc()

    @memory_units_mib
    def track_addmem_calc(self, measure, runtype, ysize):
        return self.run_addedmem_calc()


class MemcheckBlocksAndWorkers(MemcheckCommon):
    # only some are parametrised, or it's just too complicated!
    param_names = [
        "nblocks",
        "nworkers",
    ]
    params = [
        # nblocks
        [1, 4, 9],
        # nworkers
        [1, 2, 3, 4],
    ]

    def setup(self, nblocks, nworkers):
        self.default_params["ysize"] = 20000
        self._setup(
            nblocks=nblocks,
            nworkers=nworkers,
        )

    def time_calc(self, nblocks, nworkers):
        self.run_time_calc()

    @memory_units_mib
    def track_addmem_calc(self, nblocks, nworkers):
        return self.run_addedmem_calc()


class MemcheckBlocksAndWorkers_processes(MemcheckBlocksAndWorkers):
    def setup(self, nblocks, nworkers):
        self.default_params["runtype"] = "processes"
        super().setup(nblocks, nworkers)


class MemcheckBlocksAndWorkers_Rss(MemcheckBlocksAndWorkers):
    def setup(self, nblocks, nworkers):
        self.default_params["measure"] = "rss"
        super().setup(
            nblocks=nblocks,
            nworkers=nworkers,
        )


class MemcheckTaskRepeats(MemcheckCommon):
    param_names = ["nreps"]
    params = [1, 2, 3, 4]

    def setup(self, nreps):
        self._extra_allocated_mem = []
        self._setup()

    def _test_task(self):
        odd_array = np.zeros([1000, 1000], dtype=np.float32)
        odd_array[1, 1] = 1
        self._extra_allocated_mem.extend(odd_array)
        self.task.perform()

    @memory_units_mib
    def track_mem(self, nreps):
        with TrackAddedMemoryAllocation() as tracer:
            for _ in range(nreps):
                self._test_task()
        return tracer.addedmem_mb()
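
Background to what these benchmarks compare (an illustration, not part of the diff): tracemalloc records allocations made through Python's allocator in the current process only (recent numpy versions register their array buffers with it), while the ru_maxrss figure behind the "rss" measure is a process-lifetime high-water mark of resident memory, which never decreases and also counts memory not allocated through Python. A small standalone sketch contrasting the two, using the same standard-library calls as the tracker above; the function name and sizes are arbitrary.

import resource
import tracemalloc

import numpy as np


def compare_measures(n=5000):
    """Allocate and release one large array, then report both measures."""
    tracemalloc.start()
    rss_before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0  # MiB (Linux reports KiB)

    data = np.zeros((n, n))  # ~ 8 * n * n bytes, released again straight away
    data[0, 0] = 1.0
    del data

    _, traced_peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    rss_after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

    # tracemalloc still sees the transient allocation in its peak figure;
    # ru_maxrss growth depends on how high the process high-water mark already was.
    print(f"tracemalloc peak : {traced_peak / 1024**2:8.1f} MiB")
    print(f"max-RSS increase : {rss_after - rss_before:8.1f} MiB")


if __name__ == "__main__":
    compare_measures()
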
94 changes: 94 additions & 0 deletions benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py
@@ -0,0 +1,94 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Provide a standard parallel calculation for testing the memory tracing."""

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np

"""
the basic operation is to for each worker to construct a (NY, NX) numpy
random array, of which it calculates and returns the mean(axis=0)
--> (NX,) result
The results are then collected --> (N_BLOCKS, NX),
and a mean over all calculated --> (NX,)
The final (single-value) result is the *minimum* of that.
"""

# _SHOW_DEBUG = True
_SHOW_DEBUG = False


def debug(msg):
    if _SHOW_DEBUG:
        print(msg)


def subtask_operation(arg):
    i_task, ny, nx = arg
    debug(f"\nRunning #{i_task}({ny}, {nx}) ..")
    data = np.random.uniform(0.0, 1.0, size=(ny, nx))  # noqa: NPY002
    sub_result = data.mean(axis=0)
    debug(f"\n.. completed #{i_task}")
    return sub_result


class SampleParallelTask:
    def __init__(
        self,
        n_blocks=5,
        outerdim=1000,
        innerdim=250,
        n_workers=4,
        use_process_workers=False,
    ):
        self.n_blocks = n_blocks
        self.outerdim = outerdim
        self.innerdim = innerdim
        self.n_workers = n_workers
        if use_process_workers:
            self.pool_type = ProcessPoolExecutor
        else:
            self.pool_type = ThreadPoolExecutor
        self._setup_calc()

    def _setup_calc(self):
        self._pool = self.pool_type(self.n_workers)

    def perform(self):
        partial_results = self._pool.map(
            subtask_operation,
            [
                (i_task + 1, self.outerdim, self.innerdim)
                for i_task in range(self.n_blocks)
            ],
        )
        combined = np.stack(list(partial_results))
        result = np.mean(combined, axis=0)
        result = result.min()
        return result


if __name__ == "__main__":
    nb = 12
    nw = 3
    ny, nx = 1000, 200
    dims = (ny, nx)
    use_processes = False
    typ = "process" if use_processes else "thread"
    msg = f"Starting: blocks={nb} workers={nw} size={dims} type={typ}"
    print(msg)
    calc = SampleParallelTask(
        n_blocks=nb,
        outerdim=ny,
        innerdim=nx,
        n_workers=nw,
        use_process_workers=use_processes,
    )
    debug("Created.")
    debug("Run..")
    result = calc.perform()
    debug("\n.. Run DONE.")
    debug(f"result = {result}")
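
One further point relevant to the "runtype" parameter above (an observation with an illustrative sketch, not part of the diff): tracemalloc only traces allocations made in the interpreter that called tracemalloc.start(), so with process workers the per-block arrays live in the child processes and do not contribute to the parent's traced peak, whereas with thread workers they do. The helper below is invented purely to show that contrast.

import tracemalloc
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np


def allocate_block(n):
    # Each worker builds an (n, n) float64 array (~8 * n * n bytes) and reduces it.
    return np.zeros((n, n)).mean(axis=0)


def traced_peak_mib(pool_cls, n=2000):
    tracemalloc.start()
    with pool_cls(max_workers=2) as pool:
        list(pool.map(allocate_block, [n, n]))
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak / 1024**2


if __name__ == "__main__":
    # Thread workers allocate in this process, so the ~30 MiB blocks are traced;
    # process workers allocate in child interpreters, so the parent's peak stays
    # small (only the returned (n,) results are counted here).
    print("threads:  ", round(traced_peak_mib(ThreadPoolExecutor), 1), "MiB")
    print("processes:", round(traced_peak_mib(ProcessPoolExecutor), 1), "MiB")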