From 94494fbf447df012a10e06ec10efedbd6f098bbc Mon Sep 17 00:00:00 2001 From: Patrick Peglar Date: Thu, 9 May 2024 12:34:40 +0100 Subject: [PATCH 1/4] Trial tracemalloc as alternative memory tracker. --- benchmarks/benchmarks/__init__.py | 49 +++++++- .../memtrace_evaluation/__init__.py | 113 ++++++++++++++++++ .../memory_exercising_task.py | 94 +++++++++++++++ 3 files changed, 253 insertions(+), 3 deletions(-) create mode 100644 benchmarks/benchmarks/memtrace_evaluation/__init__.py create mode 100644 benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py diff --git a/benchmarks/benchmarks/__init__.py b/benchmarks/benchmarks/__init__.py index 14b28b3070..ca07ec2933 100644 --- a/benchmarks/benchmarks/__init__.py +++ b/benchmarks/benchmarks/__init__.py @@ -6,6 +6,9 @@ from os import environ import resource +import tracemalloc + +import numpy as np ARTIFICIAL_DIM_SIZE = int(10e3) # For all artificial cubes, coords etc. @@ -66,24 +69,44 @@ class TrackAddedMemoryAllocation: """ - RESULT_MINIMUM_MB = 5.0 + _DEFAULT_RESULT_MINIMUM_MB = 5.0 + _DEFAULT_RESULT_ROUND_DP = 1 + + def __init__(self, use_tracemalloc=False, result_min_mb=None, result_round_dp=None): + self._use_tracemalloc = use_tracemalloc + if result_min_mb is None: + result_min_mb = self._DEFAULT_RESULT_MINIMUM_MB + self.RESULT_MINIMUM_MB = result_min_mb + if result_round_dp is None: + result_round_dp = self._DEFAULT_RESULT_ROUND_DP + self.RESULT_ROUND_DP = result_round_dp @staticmethod def process_resident_memory_mb(): return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0 def __enter__(self): - self.mb_before = self.process_resident_memory_mb() + if self._use_tracemalloc: + self.mb_before = 0 + tracemalloc.start() + else: + self.mb_before = self.process_resident_memory_mb() return self def __exit__(self, *_): - self.mb_after = self.process_resident_memory_mb() + if self._use_tracemalloc: + _, peak_mem = tracemalloc.get_traced_memory() + tracemalloc.stop() + self.mb_after = peak_mem * 1.0 / 1024**2 + else: + self.mb_after = self.process_resident_memory_mb() def addedmem_mb(self): """Return measured memory growth, in Mb.""" result = self.mb_after - self.mb_before # Small results are too vulnerable to noise being interpreted as signal. result = max(self.RESULT_MINIMUM_MB, result) + result = np.round(result, self.RESULT_ROUND_DP) return result @staticmethod @@ -124,3 +147,23 @@ def on_demand_benchmark(benchmark_object): """ if "ON_DEMAND_BENCHMARKS" in environ: return benchmark_object + + +def memtrace_benchmark(use_tracemalloc=False, result_min_mb=None): + # Call which returns a decorator == 'decorator with args'. + # N.B. embeds the the call argument in the env of the decorator returned + from functools import wraps + + def decorator(decorated_func): + assert decorated_func.__name__[:6] == "track_" + + @wraps(decorated_func) + def wrapper(*args, **kwargs): + with TrackAddedMemoryAllocation( + _use_tracemalloc=use_tracemalloc, result_min_mb=result_min_mb + ): + result = decorated_func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/benchmarks/benchmarks/memtrace_evaluation/__init__.py b/benchmarks/benchmarks/memtrace_evaluation/__init__.py new file mode 100644 index 0000000000..9a8953c83b --- /dev/null +++ b/benchmarks/benchmarks/memtrace_evaluation/__init__.py @@ -0,0 +1,113 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the BSD license. +# See LICENSE in the root of the repository for full licensing details. +"""Benchmarks to evaluate tracemalloc/rss methods of memory measurement.""" + +from .. import TrackAddedMemoryAllocation +from .memory_exercising_task import SampleParallelTask + + +class MemcheckCommon: + # Basic controls over the test calculation + default_params = { + "measure": "tracemalloc", # alternate: "rss" + "runtype": "threads", # alternate: "processes" + "ysize": 10000, + "nx": 2000, + "nblocks": 6, + "nworkers": 4, + } + + def _setup(self, **kwargs): + params = self.default_params.copy() + params.update(kwargs) + measure = params["measure"] + runtype = params["runtype"] + ysize = params["ysize"] + nx = params["nx"] + nblocks = params["nblocks"] + nworkers = params["nworkers"] + + nyfull = ysize // nblocks + use_processes = {"threads": False, "processes": True}[runtype] + self.task = SampleParallelTask( + n_blocks=nblocks, + outerdim=nyfull // nblocks, + innerdim=nx, + n_workers=nworkers, + use_process_workers=use_processes, + ) + self.use_tracemalloc = {"tracemalloc": True, "rss": False}[measure] + + def run_time_calc(self): + # This usage is a bit crap, as we don't really care about the runtype. + self.task.perform() + + def run_addedmem_calc(self): + with TrackAddedMemoryAllocation( + use_tracemalloc=self.use_tracemalloc, + result_min_mb=0.0, + ) as tracer: + self.task.perform() + return tracer.addedmem_mb() + + +def memory_units_mib(func): + func.unit = "Mib" + return func + + +class MemcheckRunstyles(MemcheckCommon): + # only some are parametrised, or it's just too complicated! + param_names = [ + "measure", + "runtype", + "ysize", + ] + params = [ + # measure + ["tracemalloc", "rss"], + # runtype + ["threads", "processes"], + # ysize + [10000, 40000], + ] + + def setup(self, measure, runtype, ysize): + self._setup(measure=measure, runtype=runtype, ysize=ysize) + + def time_calc(self, measure, runtype, ysize): + self.run_time_calc() + + @memory_units_mib + def track_addmem_calc(self, measure, runtype, ysize): + return self.run_addedmem_calc() + + +class MemcheckBlocksAndWorkers(MemcheckCommon): + # only some are parametrised, or it's just too complicated! + param_names = [ + "nblocks", + "nworkers", + ] + params = [ + # nblocks + [1, 4, 9], + # nworkers + [1, 4, 9], + ] + + def setup(self, nblocks, nworkers): + self.default_params["ysize"] = 20000 + self._setup( + nblocks=nblocks, + nworkers=nworkers, + ) + + def time_calc(self, nblocks, nworkers): + self.run_time_calc() + + @memory_units_mib + def track_addmem_calc(self, nblocks, nworkers): + return self.run_addedmem_calc() diff --git a/benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py b/benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py new file mode 100644 index 0000000000..c7a04387e0 --- /dev/null +++ b/benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py @@ -0,0 +1,94 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the BSD license. +# See LICENSE in the root of the repository for full licensing details. +"""Provide a standard parallel calculation for testing the memory tracing.""" + +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor + +import numpy as np + +""" +the basic operation is to for each worker to construct a (NY, NX) numpy +random array, of which it calculates and returns the mean(axis=0) + --> (NX,) result +The results are then collected --> (N_BLOCKS, NX), +and a mean over all calculated --> (NX,) +The final (single-value) result is the *minimum* of that. +""" + +# _SHOW_DEBUG = True +_SHOW_DEBUG = False + + +def debug(msg): + if _SHOW_DEBUG: + print(msg) + + +def subtask_operation(arg): + i_task, ny, nx = arg + debug(f"\nRunning #{i_task}({ny}, {nx}) ..") + data = np.random.uniform(0.0, 1.0, size=(ny, nx)) # noqa: NPY002 + sub_result = data.mean(axis=0) + debug(f"\n.. completed #{i_task}") + return sub_result + + +class SampleParallelTask: + def __init__( + self, + n_blocks=5, + outerdim=1000, + innerdim=250, + n_workers=4, + use_process_workers=False, + ): + self.n_blocks = n_blocks + self.outerdim = outerdim + self.innerdim = innerdim + self.n_workers = n_workers + if use_process_workers: + self.pool_type = ProcessPoolExecutor + else: + self.pool_type = ThreadPoolExecutor + self._setup_calc() + + def _setup_calc(self): + self._pool = self.pool_type(self.n_workers) + + def perform(self): + partial_results = self._pool.map( + subtask_operation, + [ + (i_task + 1, self.outerdim, self.innerdim) + for i_task in range(self.n_blocks) + ], + ) + combined = np.stack(list(partial_results)) + result = np.mean(combined, axis=0) + result = result.min() + return result + + +if __name__ == "__main__": + nb = 12 + nw = 3 + ny, nx = 1000, 200 + dims = (ny, nx) + use_processes = False + typ = "process" if use_processes else "thread" + msg = f"Starting: blocks={nb} workers={nw} size={dims} type={typ}" + print(msg) + calc = SampleParallelTask( + n_blocks=nb, + outerdim=ny, + innerdim=nx, + n_workers=nw, + use_process_workers=use_processes, + ) + debug("Created.") + debug("Run..") + result = calc.perform() + debug("\n.. Run DONE.") + debug(f"result = {result}") From d039ad03bc2b131a9691f4f6e7e2bdbb3b74f9cb Mon Sep 17 00:00:00 2001 From: Patrick Peglar Date: Thu, 9 May 2024 17:04:27 +0100 Subject: [PATCH 2/4] Use more practical nworkers params. --- .../benchmarks/memtrace_evaluation/__init__.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks/memtrace_evaluation/__init__.py b/benchmarks/benchmarks/memtrace_evaluation/__init__.py index 9a8953c83b..25aaa4ffa2 100644 --- a/benchmarks/benchmarks/memtrace_evaluation/__init__.py +++ b/benchmarks/benchmarks/memtrace_evaluation/__init__.py @@ -16,7 +16,7 @@ class MemcheckCommon: "ysize": 10000, "nx": 2000, "nblocks": 6, - "nworkers": 4, + "nworkers": 3, } def _setup(self, **kwargs): @@ -95,7 +95,7 @@ class MemcheckBlocksAndWorkers(MemcheckCommon): # nblocks [1, 4, 9], # nworkers - [1, 4, 9], + [1, 2, 3, 4], ] def setup(self, nblocks, nworkers): @@ -111,3 +111,12 @@ def time_calc(self, nblocks, nworkers): @memory_units_mib def track_addmem_calc(self, nblocks, nworkers): return self.run_addedmem_calc() + + +class MemcheckBlocksAndWorkers_Rss(MemcheckBlocksAndWorkers): + def setup(self, nblocks, nworkers): + self.default_params["measure"] = "rss" + super().setup( + nblocks=nblocks, + nworkers=nworkers, + ) From 84ff4ad5d6cf9c8b95a204dd563764c36ec27f69 Mon Sep 17 00:00:00 2001 From: Patrick Peglar Date: Thu, 16 May 2024 15:12:44 +0100 Subject: [PATCH 3/4] Fix task scaling; test WorkersAndBlocks with process-based tasks. --- benchmarks/benchmarks/memtrace_evaluation/__init__.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks/memtrace_evaluation/__init__.py b/benchmarks/benchmarks/memtrace_evaluation/__init__.py index 25aaa4ffa2..d6145f2782 100644 --- a/benchmarks/benchmarks/memtrace_evaluation/__init__.py +++ b/benchmarks/benchmarks/memtrace_evaluation/__init__.py @@ -29,11 +29,11 @@ def _setup(self, **kwargs): nblocks = params["nblocks"] nworkers = params["nworkers"] - nyfull = ysize // nblocks + ny_task = ysize // nblocks use_processes = {"threads": False, "processes": True}[runtype] self.task = SampleParallelTask( n_blocks=nblocks, - outerdim=nyfull // nblocks, + outerdim=ny_task, innerdim=nx, n_workers=nworkers, use_process_workers=use_processes, @@ -113,6 +113,12 @@ def track_addmem_calc(self, nblocks, nworkers): return self.run_addedmem_calc() +class MemcheckBlocksAndWorkers_processes(MemcheckBlocksAndWorkers): + def setup(self, nblocks, nworkers): + self.default_params["runtype"] = "processes" + super().setup(nblocks, nworkers) + + class MemcheckBlocksAndWorkers_Rss(MemcheckBlocksAndWorkers): def setup(self, nblocks, nworkers): self.default_params["measure"] = "rss" From 8f7166e7bfdb5c0efbef1e2b902ac1bbfc9fa2b5 Mon Sep 17 00:00:00 2001 From: Patrick Peglar Date: Tue, 21 May 2024 00:28:32 +0100 Subject: [PATCH 4/4] Investigate repeated ops with simulated memory 'leak'. --- .../memtrace_evaluation/__init__.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/benchmarks/benchmarks/memtrace_evaluation/__init__.py b/benchmarks/benchmarks/memtrace_evaluation/__init__.py index d6145f2782..58934d4968 100644 --- a/benchmarks/benchmarks/memtrace_evaluation/__init__.py +++ b/benchmarks/benchmarks/memtrace_evaluation/__init__.py @@ -4,6 +4,8 @@ # See LICENSE in the root of the repository for full licensing details. """Benchmarks to evaluate tracemalloc/rss methods of memory measurement.""" +import numpy as np + from .. import TrackAddedMemoryAllocation from .memory_exercising_task import SampleParallelTask @@ -126,3 +128,25 @@ def setup(self, nblocks, nworkers): nblocks=nblocks, nworkers=nworkers, ) + + +class MemcheckTaskRepeats(MemcheckCommon): + param_names = ["nreps"] + params = [1, 2, 3, 4] + + def setup(self, nreps): + self._extra_allocated_mem = [] + self._setup() + + def _test_task(self): + odd_array = np.zeros([1000, 1000], dtype=np.float32) + odd_array[1, 1] = 1 + self._extra_allocated_mem.extend(odd_array) + self.task.perform() + + @memory_units_mib + def track_mem(self, nreps): + with TrackAddedMemoryAllocation() as tracer: + for _ in range(nreps): + self._test_task() + return tracer.addedmem_mb()