diff --git a/collectd-extensions/src/collectd.service b/collectd-extensions/src/collectd.service index f2b62cd..508b423 100644 --- a/collectd-extensions/src/collectd.service +++ b/collectd-extensions/src/collectd.service @@ -12,5 +12,9 @@ ExecStart=/usr/sbin/collectd ExecStartPost=/bin/bash -c 'echo $MAINPID > /var/run/collectd.pid' ExecStopPost=/bin/rm -f /var/run/collectd.pid +# cgroup performance engineering +# - smooth out CPU impulse from poorly behaved plugin +CPUShares=256 + [Install] WantedBy=multi-user.target diff --git a/collectd-extensions/src/cpu.py b/collectd-extensions/src/cpu.py index f81cbff..f279ec2 100755 --- a/collectd-extensions/src/cpu.py +++ b/collectd-extensions/src/cpu.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2018-2021 Wind River Systems, Inc. +# Copyright (c) 2018-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -17,6 +17,7 @@ ############################################################################ import collectd import copy +import numpy as np import os import plugin_common as pc import re @@ -26,8 +27,13 @@ import tsconfig.tsconfig as tsc from kubernetes.client.rest import ApiException -PLUGIN = 'platform cpu usage plugin' +#PLUGIN = 'platform cpu usage plugin' +PLUGIN = 'platform cpu' +PLUGIN_HISTOGRAM = 'histogram' PLUGIN_DEBUG = 'DEBUG platform cpu' +PLUGIN_HIRES_INTERVAL = 1 # hi-resolution sample interval in secs +PLUGIN_DISPATCH_INTERVAL = 30 # dispatch interval in secs +PLUGIN_HISTOGRAM_INTERVAL = 300 # histogram interval in secs TIMESTAMP = 'timestamp' PLATFORM_CPU_PERCENT = 'platform-occupancy' @@ -42,25 +48,38 @@ SCHEDSTAT = '/proc/schedstat' CPUACCT = pc.CGROUP_ROOT + '/cpuacct' CPUACCT_USAGE = 'cpuacct.usage' CPUACCT_USAGE_PERCPU = 'cpuacct.usage_percpu' +CPU_STAT = 'cpu.stat' # Common regex pattern match groups re_uid = re.compile(r'^pod(\S+)') re_processor = re.compile(r'^[Pp]rocessor\s+:\s+(\d+)') -re_schedstat = re.compile(r'^cpu(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+') 
+re_schedstat = re.compile(r'^cpu(\d+)\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+\d+\s+(\d+)\s+(\d+)\s+') re_schedstat_version = re.compile(r'^version\s+(\d+)') re_keyquoteval = re.compile(r'^\s*(\S+)\s*[=:]\s*\"(\S+)\"\s*') +re_cpu_wait_sum = re.compile(r'^wait_sum\s+(\d+)') # hirunner minimum cpu occupancy threshold HIRUNNER_MINIMUM_CPU_PERCENT = 0.1 +# Set numpy format for printing bins +np.set_printoptions(formatter={'int': '{: 4d}'.format}) + # Plugin specific control class and object. class CPU_object(pc.PluginObject): def __init__(self): super(CPU_object, self).__init__(PLUGIN, '') + # CPU Plugin flags + self.dispatch = False # print occupancy and dispatch this sample + self.histogram = False # print occupancy histogram this sample + + # CPU plugin configurable settings self.debug = True self.verbose = True + self.hires = False + + # Cache Kubernetes pods data self._cache = {} self._k8s_client = pc.K8sClient() self.k8s_pods = set() @@ -69,15 +88,50 @@ class CPU_object(pc.PluginObject): self.schedstat_supported = True self.number_platform_cpus = 0 - # Platform CPU monitor now = time.time() # epoch time in floating seconds - self._t0 = {} # cputime state info at start of sample interval - self._t0[TIMESTAMP] = now - self._t0_cpuacct = {} - self._data = {} # derived measurements at end of sample interval - self._data[PLATFORM_CPU_PERCENT] = 0.0 - self.elapsed_ms = 0.0 + # CPU State information at start of dispatch interval + self.d_t0 = {} # per-cpu cputime at dispatch time 0 + self.d_w0 = {} # per-cpu cpuwait at dispatch time 0 + self.d_t0[TIMESTAMP] = now # timestamp dispatch time 0 + self.d_w0[TIMESTAMP] = now # timestamp dispatch time 0 + self.d_t0_cpuacct = {} # per-cgroup cpuacct at dispatch time 0 + self.d_t0_cpuwait = {} # per-cgroup cpuwait at dispatch time 0 + + # Derived measurements over dispatch interval + self.d_occ = {} # dispatch occupancy per cgroup or derived aggregate + self.d_occw = {} # dispatch occupancy wait per cgroup or derived aggregate + 
self.d_occ[PLATFORM_CPU_PERCENT] = 0.0 # dispatch platform occupancy + self.d_occw[PLATFORM_CPU_PERCENT] = 0.0 # dispatch platform occupancy wait + for g in pc.OVERALL_GROUPS: + self.d_occ[g] = 0.0 + self.d_occw[g] = 0.0 + self.d_elapsed_ms = 0.0 # dispatch elapsed time + + # CPU State information at start of read sample interval + self._t0 = {} # per-cpu cputime at time 0 + self._w0 = {} # per-cpu cpuwait at time 0 + self._t0[TIMESTAMP] = now # timestamp time 0 + self._w0[TIMESTAMP] = now # timestamp time 0 + self._t0_cpuacct = {} # per-cgroup cpuacct at time 0 + self._t0_cpuwait = {} # per-cgroup cpuwait at time 0 + + # Derived measurements over read sample interval + self._occ = {} # occupancy per cgroup or derived aggregate + self._occw = {} # occupancy wait per cgroup or derived aggregate + self._occ[PLATFORM_CPU_PERCENT] = 0.0 # platform occupancy + self._occw[PLATFORM_CPU_PERCENT] = 0.0 # platform occupancy wait + for g in pc.OVERALL_GROUPS: + self._occ[g] = 0.0 + self._occw[g] = 0.0 + self.elapsed_ms = 0.0 # elapsed time + + # Derived measurements over histogram interval + self.hist_t0 = now # histogram timestamp time 0 + self.hist_elapsed_ms = 0.0 # histogram elapsed time + self.hist_occ = {} # histogram bin counts per cgroup or derived aggregate + self.shared_bins = np.histogram_bin_edges( + np.array([0, 100], dtype=np.float64), bins=10, range=(0, 100)) # Instantiate the class @@ -87,13 +141,17 @@ obj = CPU_object() def read_schedstat(): """Read current hiresolution times per cpu from /proc/schedstats. - Return dictionary of cputimes in nanoseconds per cpu. + Return dictionary of cputimes in nanoseconds per cpu, + dictionary of cpuwaits in nanoseconds per cpu. """ cputime = {} + cpuwait = {} - # Obtain cumulative cputime (nanoseconds) from 7th field of - # /proc/schedstat. This is the time running tasks on this cpu. 
+ # Obtain cumulative cputime (nanoseconds) from 7th field, + # and cumulative cpuwait (nanoseconds) from 8th field, + # from /proc/schedstat. This is the time running and waiting + # for tasks on this cpu. try: with open(SCHEDSTAT, 'r') as f: for line in f: @@ -101,11 +159,13 @@ def read_schedstat(): if match: k = int(match.group(1)) v = int(match.group(2)) + w = int(match.group(3)) cputime[k] = v + cpuwait[k] = w except Exception as err: collectd.error('%s Cannot read schedstat, error=%s' % (PLUGIN, err)) - return cputime + return cputime, cpuwait def get_logical_cpus(): @@ -202,8 +262,36 @@ def get_cgroup_cpuacct(path, cpulist=None): return acct +def get_cgroup_cpu_wait_sum(path): + """Get cgroup cpu.stat wait_sum usage for a specific cgroup path. + + This represents the aggregate of all tasks wait time cfs_rq. + This tells us how suffering a task group is in the fight of + cpu resources. + + Returns cumulative wait_sum in nanoseconds. + """ + + wait_sum = 0 + + # Get the aggregate wait_sum for all cpus + fstat = '/'.join([path, CPU_STAT]) + try: + with open(fstat, 'r') as f: + for line in f: + match = re_cpu_wait_sum.search(line) + if match: + v = int(match.group(1)) + wait_sum = int(v) + except IOError: + # Silently ignore IO errors. It is likely the cgroup disappeared. 
+ pass + + return wait_sum + + def get_cpuacct(): - """Get cpuacct usage based on cgroup hierarchy.""" + """Get cpuacct usage and wait_sum based on cgroup hierarchy.""" cpuacct = {} cpuacct[pc.GROUP_OVERALL] = {} @@ -211,48 +299,86 @@ def get_cpuacct(): cpuacct[pc.GROUP_PODS] = {} cpuacct[pc.CGROUP_SYSTEM] = {} cpuacct[pc.CGROUP_USER] = {} + cpuacct[pc.CGROUP_INIT] = {} + cpuacct[pc.CGROUP_K8SPLATFORM] = {} + + cpuwait = {} + cpuwait[pc.GROUP_OVERALL] = {} + cpuwait[pc.GROUP_FIRST] = {} + cpuwait[pc.GROUP_PODS] = {} + cpuwait[pc.CGROUP_SYSTEM] = {} + cpuwait[pc.CGROUP_USER] = {} + cpuwait[pc.CGROUP_INIT] = {} + cpuwait[pc.CGROUP_K8SPLATFORM] = {} + + exclude_types = ['.mount'] # Overall cpuacct usage acct = get_cgroup_cpuacct(CPUACCT, cpulist=obj.cpu_list) + wait = get_cgroup_cpu_wait_sum(CPUACCT) cpuacct[pc.GROUP_OVERALL][pc.GROUP_TOTAL] = acct + cpuwait[pc.GROUP_OVERALL][pc.GROUP_TOTAL] = wait # Initialize 'overhead' time (derived measurement). This will contain # the remaining cputime not specifically tracked by first-level cgroups. cpuacct[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] = acct + cpuwait[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] = wait # Walk the first level cgroups and get cpuacct usage # (e.g., docker, k8s-infra, user.slice, system.slice, machine.slice) dir_list = next(os.walk(CPUACCT))[1] for name in dir_list: - if any(name.endswith(x) for x in ['.mount', '.scope']): + if any(name.endswith(x) for x in exclude_types): continue cg_path = '/'.join([CPUACCT, name]) acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list) + wait = get_cgroup_cpu_wait_sum(cg_path) cpuacct[pc.GROUP_FIRST][name] = acct + cpuwait[pc.GROUP_FIRST][name] = wait # Subtract out first-level cgroups. The remaining cputime represents # systemd 'init' pid and kthreads on Platform cpus. 
cpuacct[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] -= acct + cpuwait[pc.GROUP_OVERALL][pc.GROUP_OVERHEAD] -= wait # Walk the system.slice cgroups and get cpuacct usage path = '/'.join([CPUACCT, pc.CGROUP_SYSTEM]) dir_list = next(os.walk(path))[1] for name in dir_list: - if any(name.endswith(x) for x in ['.mount', '.scope']): + if any(name.endswith(x) for x in exclude_types): continue cg_path = '/'.join([path, name]) acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list) + wait = get_cgroup_cpu_wait_sum(cg_path) cpuacct[pc.CGROUP_SYSTEM][name] = acct + cpuwait[pc.CGROUP_SYSTEM][name] = wait + + # Walk the k8splatform.slice cgroups and get cpuacct usage + path = '/'.join([CPUACCT, pc.CGROUP_K8SPLATFORM]) + if os.path.isdir(path): + dir_list = next(os.walk(path))[1] + else: + dir_list = [] + for name in dir_list: + if any(name.endswith(x) for x in exclude_types): + continue + cg_path = '/'.join([path, name]) + acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list) + wait = get_cgroup_cpu_wait_sum(cg_path) + cpuacct[pc.CGROUP_K8SPLATFORM][name] = acct + cpuwait[pc.CGROUP_K8SPLATFORM][name] = wait # Walk the user.slice cgroups and get cpuacct usage path = '/'.join([CPUACCT, pc.CGROUP_USER]) dir_list = next(os.walk(path))[1] for name in dir_list: - if any(name.endswith(x) for x in ['.mount', '.scope']): + if any(name.endswith(x) for x in exclude_types): continue cg_path = '/'.join([path, name]) acct = get_cgroup_cpuacct(cg_path, cpulist=obj.cpu_list) + wait = get_cgroup_cpu_wait_sum(cg_path) cpuacct[pc.CGROUP_USER][name] = acct + cpuwait[pc.CGROUP_USER][name] = wait # Walk the kubepods hierarchy to the pod level and get cpuacct usage. # We can safely ignore reading this if the path does not exist. 
@@ -268,8 +394,357 @@ def get_cpuacct(): uid = match.group(1) cg_path = os.path.join(root, name) acct = get_cgroup_cpuacct(cg_path) + wait = get_cgroup_cpu_wait_sum(cg_path) cpuacct[pc.GROUP_PODS][uid] = acct - return cpuacct + cpuwait[pc.GROUP_PODS][uid] = wait + return cpuacct, cpuwait + + +def calculate_occupancy( + prefix, hires, dispatch, + cache, + t0, t1, + w0, w1, + t0_cpuacct, t1_cpuacct, + t0_cpuwait, t1_cpuwait, + occ, occw, + elapsed_ms, + number_platform_cpus, + cpu_list, debug): + """Calculate average occupancy and wait for platform cpus and cgroups. + + This calculates: + - per-cpu cputime delta between time 0 and time 1 (ms) + - per-cpu cpuwait delta between time 0 and time 1 (ms) + - average platform occupancy based on cputime (%) + - average platform occupancy wait based on cpuwait (%) + - per-cgroup cpuacct delta between time 0 and time 1 + - per-cgroup cpuwait delta between time 0 and time 1 + - average per-cgroup occupancy based on cpuacct (%) + - average per-cgroup occupancy wait based on cpuwait (%) + - aggregate occupancy of specific cgroup groupings (%) + - aggregate occupancy wait of specific cgroup groupings (%) + + This logs platform occupancy and aggregate cgroup groupings. + This logs of hirunner occupancy for base cgroups. + """ + + # Aggregate cputime and cpuwait delta for platform logical cpus + cputime_ms = 0.0 + cpuwait_ms = 0.0 + for cpu in cpu_list: + # Paranoia check, we should never hit this. 
+ if cpu not in t0 or cpu not in w0: + collectd.error('%s cputime initialization error' % (PLUGIN)) + break + cputime_ms += float(t1[cpu] - t0[cpu]) + cpuwait_ms += float(w1[cpu] - w0[cpu]) + cputime_ms /= float(pc.ONE_MILLION) + cpuwait_ms /= float(pc.ONE_MILLION) + + # Calculate average occupancy and wait of platform logical cpus + p_occ = 0.0 + p_occw = 0.0 + if number_platform_cpus > 0 and elapsed_ms > 0: + p_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + p_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + else: + p_occ = 0.0 + p_occw = 0.0 + + if debug: + collectd.info('%s %s %s elapsed = %.1f ms, ' + 'cputime = %.1f ms, cpuwait = %.1f ms, ' + 'n_cpus = %d, ' + 'occupancy = %.2f %%, wait = %.2f %%' + % (PLUGIN_DEBUG, + prefix, + PLATFORM_CPU_PERCENT, + elapsed_ms, + cputime_ms, cpuwait_ms, + number_platform_cpus, + p_occ, p_occw)) + + occ[PLATFORM_CPU_PERCENT] = p_occ + occw[PLATFORM_CPU_PERCENT] = p_occw + + # Calculate cpuacct and cpuwait delta for cgroup hierarchy, dropping transient cgroups + cpuacct = {} + for i in t1_cpuacct.keys(): + cpuacct[i] = {} + for k, v in t1_cpuacct[i].items(): + if i in t0_cpuacct.keys() and k in t0_cpuacct[i].keys(): + cpuacct[i][k] = v - t0_cpuacct[i][k] + else: + cpuacct[i][k] = v + cpuwait = {} + for i in t1_cpuwait.keys(): + cpuwait[i] = {} + for k, v in t1_cpuwait[i].items(): + if i in t0_cpuwait.keys() and k in t0_cpuwait[i].keys(): + cpuwait[i][k] = v - t0_cpuwait[i][k] + else: + cpuwait[i][k] = v + + # Summarize cpuacct usage for various groupings we aggregate + for g in pc.GROUPS_AGGREGATED: + cpuacct[pc.GROUP_OVERALL][g] = 0.0 + cpuwait[pc.GROUP_OVERALL][g] = 0.0 + + # Aggregate cpuacct usage by K8S pod + for uid in cpuacct[pc.GROUP_PODS]: + acct = cpuacct[pc.GROUP_PODS][uid] + wait = cpuwait[pc.GROUP_PODS][uid] + if uid in cache: + pod = cache[uid] + else: + collectd.warning('%s uid %s not found' % (PLUGIN, uid)) + 
continue + + # K8S platform system usage, i.e., essential: kube-system + # check for component label app.starlingx.io/component=platform + if pod.is_platform_resource(): + cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += acct + cpuwait[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += wait + + # K8S platform addons usage, i.e., non-essential: monitor, openstack + if pod.namespace in pc.K8S_NAMESPACE_ADDON: + cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += acct + cpuwait[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += wait + + # Calculate base cpuacct usage (i.e., base tasks, exclude K8S and VMs) + # e.g., docker, system.slice, user.slice, init.scope + for name in cpuacct[pc.GROUP_FIRST].keys(): + if name in pc.BASE_GROUPS: + cpuacct[pc.GROUP_OVERALL][pc.GROUP_BASE] += \ + cpuacct[pc.GROUP_FIRST][name] + cpuwait[pc.GROUP_OVERALL][pc.GROUP_BASE] += \ + cpuwait[pc.GROUP_FIRST][name] + elif name not in pc.BASE_GROUPS_EXCLUDE: + collectd.warning('%s could not find cgroup: %s' % (PLUGIN, name)) + + # Calculate system.slice container cpuacct usage + for g in pc.CONTAINERS_CGROUPS: + if g in cpuacct[pc.CGROUP_SYSTEM].keys(): + cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \ + cpuacct[pc.CGROUP_SYSTEM][g] + cpuwait[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \ + cpuwait[pc.CGROUP_SYSTEM][g] + if g in cpuacct[pc.CGROUP_K8SPLATFORM].keys(): + cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \ + cpuacct[pc.CGROUP_K8SPLATFORM][g] + cpuwait[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \ + cpuwait[pc.CGROUP_K8SPLATFORM][g] + + # Calculate platform cpuacct usage (this excludes apps) + for g in pc.PLATFORM_GROUPS: + cpuacct[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \ + cpuacct[pc.GROUP_OVERALL][g] + cpuwait[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \ + cpuwait[pc.GROUP_OVERALL][g] + + # Calculate cgroup based occupancy and wait for overall groupings + for g in pc.OVERALL_GROUPS: + cputime_ms = \ + float(cpuacct[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION) + g_occ = float(pc.ONE_HUNDRED) * 
float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + occ[g] = g_occ + cpuwait_ms = \ + float(cpuwait[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION) + g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + occw[g] = g_occw + if obj.debug: + collectd.info('%s %s %s elapsed = %.1f ms, ' + 'cputime = %.1f ms, cpuwait = %.1f ms, ' + 'n_cpus = %d, ' + 'occupancy = %.2f %%, wait = %.2f %%' + % (PLUGIN_DEBUG, + prefix, + g, + elapsed_ms, + cputime_ms, cpuwait_ms, + number_platform_cpus, + g_occ, g_occw)) + + # Store occupancy hirunners + h_occ = {} + h_occw = {} + + # Calculate cgroup based occupancy for first-level groupings + for g in cpuacct[pc.GROUP_FIRST]: + cputime_ms = \ + float(cpuacct[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION) + g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + occ[g] = g_occ + cpuwait_ms = \ + float(cpuwait[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION) + g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + occw[g] = g_occw + + if g != pc.CGROUP_INIT: + continue + + # Keep hirunners exceeding minimum threshold. + if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occ[g] = g_occ + if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occw[g] = g_occw + + # Calculate cgroup based occupancy for cgroups within system.slice. + for g in cpuacct[pc.CGROUP_SYSTEM]: + cputime_ms = \ + float(cpuacct[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION) + g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + occ[g] = g_occ + cpuwait_ms = \ + float(cpuwait[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION) + g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + occw[g] = g_occw + + # Keep hirunners exceeding minimum threshold. 
+ if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occ[g] = g_occ + if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occw[g] = g_occw + + # Calculate cgroup based occupancy for cgroups within k8splatform.slice. + if pc.CGROUP_K8SPLATFORM in cpuacct.keys(): + for g in cpuacct[pc.CGROUP_K8SPLATFORM]: + cputime_ms = \ + float(cpuacct[pc.CGROUP_K8SPLATFORM][g]) / float(pc.ONE_MILLION) + g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + occ[g] = g_occ + cpuwait_ms = \ + float(cpuwait[pc.CGROUP_K8SPLATFORM][g]) / float(pc.ONE_MILLION) + g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + occw[g] = g_occw + + # Keep hirunners exceeding minimum threshold. + if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occ[g] = g_occ + if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occw[g] = g_occw + + # Calculate cgroup based occupancy for cgroups within user.slice. + for g in cpuacct[pc.CGROUP_USER]: + cputime_ms = \ + float(cpuacct[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION) + g_occ = float(pc.ONE_HUNDRED) * float(cputime_ms) \ + / float(elapsed_ms) / number_platform_cpus + occ[g] = g_occ + cpuwait_ms = \ + float(cpuwait[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION) + g_occw = float(pc.ONE_HUNDRED) * float(cpuwait_ms) \ + / float(elapsed_ms) / number_platform_cpus + occw[g] = g_occw + + # Keep hirunners exceeding minimum threshold. 
+ if g_occ >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occ[g] = g_occ + if g_occw >= HIRUNNER_MINIMUM_CPU_PERCENT: + h_occw[g] = g_occw + + if (hires and prefix == 'hires') or (dispatch and prefix == 'dispatch'): + # Print cpu occupancy usage for high-level groupings + collectd.info('%s %s Usage: %.1f%% (avg per cpu); ' + 'cpus: %d, Platform: %.1f%% ' + '(Base: %.1f, k8s-system: %.1f), k8s-addon: %.1f, ' + '%s: %.1f, %s: %.1f' + % (PLUGIN, prefix, + occ[PLATFORM_CPU_PERCENT], + number_platform_cpus, + occ[pc.GROUP_PLATFORM], + occ[pc.GROUP_BASE], + occ[pc.GROUP_K8S_SYSTEM], + occ[pc.GROUP_K8S_ADDON], + pc.GROUP_CONTAINERS, + occ[pc.GROUP_CONTAINERS], + pc.GROUP_OVERHEAD, + occ[pc.GROUP_OVERHEAD])) + + # Print hirunner cpu occupancy usage for base cgroups + occs = ', '.join( + '{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted( + h_occ.items(), key=lambda t: -float(t[1])) + ) + collectd.info('%s %s %s: %.1f%%; cpus: %d, (%s)' + % (PLUGIN, + prefix, 'Base usage', + occ[pc.GROUP_BASE], + number_platform_cpus, + occs)) + + # Print hirunner cpu wait for base cgroups + occws = ', '.join( + '{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted( + h_occw.items(), key=lambda t: -float(t[1])) + ) + collectd.info('%s %s %s: %.1f%%; cpus: %d, (%s)' + % (PLUGIN, + prefix, 'Base wait', + occw[pc.GROUP_BASE], + number_platform_cpus, + occws)) + + +def aggregate_histogram(histogram, occ, shared_bins, hist_occ, debug): + """Aggregate occupancy histogram bins for platform cpus and cgroups. + + This aggregates occupancy histogram bins for each key measurement. 
+ + When 'histogram' flag is True, this will: + - calculate mean, 95th-percentime, and max statistics, and bins + the measurements + - log histograms and statistics per measurement in hirunner order + """ + + # Aggregate each key, value into histogram bins + for k, v in occ.items(): + # Get abbreviated name (excludes: .service, .scope, .socket, .mount) + # eg, 'k8splatform.slice' will shorten to 'k8splatform' + key = k.split('.', 1)[0] + if key not in hist_occ: + hist_occ[key] = np.array([], dtype=np.float64) + if v is not None: + hist_occ[key] = np.append(hist_occ[key], v) + + if histogram: + # Calculate histograms and statistics for each key measurement + H = {} + for k, v in hist_occ.items(): + H[k] = {} + H[k]['count'] = hist_occ[k].size + if H[k]['count'] > 0: + H[k]['mean'] = np.mean(hist_occ[k]) + H[k]['p95'] = np.percentile(hist_occ[k], 95) + H[k]['pmax'] = np.max(hist_occ[k]) + H[k]['hist'], _ = np.histogram(hist_occ[k], bins=shared_bins) + else: + H[k]['mean'] = 0 + H[k]['p95'] = 0.0 + H[k]['pmax'] = 0.0 + H[k]['hist'] = [] + + # Print out each histogram, sort by cpu occupancy hirunners + bins = ' '.join('{:4d}'.format(int(x)) for x in shared_bins[1:]) + collectd.info('%s: %26.26s : bins=[%s]' + % (PLUGIN_HISTOGRAM, 'component', bins)) + for k, v in sorted(H.items(), key=lambda t: -float(t[1]['mean'])): + if v['mean'] > HIRUNNER_MINIMUM_CPU_PERCENT: + collectd.info('%s: %26.26s : hist=%s : cnt: %3d, ' + 'mean: %5.1f %%, p95: %5.1f %%, max: %5.1f %%' + % (PLUGIN_HISTOGRAM, k, v['hist'], v['count'], + v['mean'], v['p95'], v['pmax'])) def update_cpu_data(init=False): @@ -287,23 +762,36 @@ def update_cpu_data(init=False): # Calculate elapsed time delta since last run obj.elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj._t0[TIMESTAMP]) + obj.d_elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj.d_t0[TIMESTAMP]) + obj.hist_elapsed_ms = float(pc.ONE_THOUSAND) * (now - obj.hist_t0) # Prevent calling this routine too frequently (<= 1 sec) if not init and 
obj.elapsed_ms <= 1000.0: return + # Check whether this is a dispatch interval + if obj.d_elapsed_ms >= 1000.0 * PLUGIN_DISPATCH_INTERVAL: + obj.dispatch = True + + # Check whether this is a histogram interval + if obj.hist_elapsed_ms >= 1000.0 * PLUGIN_HISTOGRAM_INTERVAL: + obj.histogram = True + t1 = {} + w1 = {} t1[TIMESTAMP] = now + w1[TIMESTAMP] = now if obj.schedstat_supported: # Get current per-cpu cumulative cputime usage from /proc/schedstat. - cputimes = read_schedstat() + cputime, cpuwait = read_schedstat() for cpu in obj.cpu_list: - t1[cpu] = cputimes[cpu] + t1[cpu] = cputime[cpu] + w1[cpu] = cpuwait[cpu] else: return - # Get current cpuacct usages based on cgroup hierarchy - t1_cpuacct = get_cpuacct() + # Get current cpuacct usages and wait_sum based on cgroup hierarchy + t1_cpuacct, t1_cpuwait = get_cpuacct() # Refresh the k8s pod information if we have discovered new cgroups cg_pods = set(t1_cpuacct[pc.GROUP_PODS].keys()) @@ -350,154 +838,73 @@ def update_cpu_data(init=False): del obj._cache[uid] except ApiException: # continue with remainder of calculations, keeping cache - collectd.warning("cpu plugin encountered kube ApiException") + collectd.warning('%s encountered kube ApiException' % (PLUGIN)) pass # Save initial state information if init: + obj.d_t0 = copy.deepcopy(t1) + obj.d_w0 = copy.deepcopy(w1) + obj.d_t0_cpuacct = copy.deepcopy(t1_cpuacct) + obj.d_t0_cpuwait = copy.deepcopy(t1_cpuwait) + obj._t0 = copy.deepcopy(t1) + obj._w0 = copy.deepcopy(w1) obj._t0_cpuacct = copy.deepcopy(t1_cpuacct) + obj._t0_cpuwait = copy.deepcopy(t1_cpuwait) return - # Aggregate cputime delta for platform logical cpus using integer math - cputime_ms = 0.0 - for cpu in obj.cpu_list: - # Paranoia check, we should never hit this. 
- if cpu not in obj._t0: - collectd.error('%s cputime initialization error' % (PLUGIN)) - break - cputime_ms += float(t1[cpu] - obj._t0[cpu]) - cputime_ms /= float(pc.ONE_MILLION) + # Calculate average cpu occupancy for hi-resolution read sample + prefix = 'hires' + calculate_occupancy( + prefix, obj.hires, obj.dispatch, + obj._cache, + obj._t0, t1, + obj._w0, w1, + obj._t0_cpuacct, t1_cpuacct, + obj._t0_cpuwait, t1_cpuwait, + obj._occ, obj._occw, + obj.elapsed_ms, + obj.number_platform_cpus, + obj.cpu_list, + obj.debug) - # Calculate average occupancy of platform logical cpus - occupancy = 0.0 - if obj.number_platform_cpus > 0 and obj.elapsed_ms > 0: - occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \ - / float(obj.elapsed_ms) / obj.number_platform_cpus - else: - occupancy = 0.0 - obj._data[PLATFORM_CPU_PERCENT] = occupancy - if obj.debug: - collectd.info('%s %s elapsed = %.1f ms, cputime = %.1f ms, ' - 'n_cpus = %d, occupancy = %.2f %%' - % (PLUGIN_DEBUG, - PLATFORM_CPU_PERCENT, - obj.elapsed_ms, - cputime_ms, - obj.number_platform_cpus, - occupancy)) + # Aggregate occupancy histogram bins + aggregate_histogram( + obj.histogram, obj._occ, obj.shared_bins, obj.hist_occ, obj.debug) - # Calculate cpuacct delta for cgroup hierarchy, dropping transient cgroups - cpuacct = {} - for i in t1_cpuacct.keys(): - cpuacct[i] = {} - for k, v in t1_cpuacct[i].items(): - if i in obj._t0_cpuacct and k in obj._t0_cpuacct[i]: - cpuacct[i][k] = v - obj._t0_cpuacct[i][k] - else: - cpuacct[i][k] = v + # Clear histogram data for next interval + if obj.histogram: + obj.histogram = False + obj.hist_occ = {} + obj.hist_t0 = now - # Summarize cpuacct usage for various groupings we aggregate - for g in pc.GROUPS_AGGREGATED: - cpuacct[pc.GROUP_OVERALL][g] = 0.0 - - # Aggregate cpuacct usage by K8S pod - for uid in cpuacct[pc.GROUP_PODS]: - acct = cpuacct[pc.GROUP_PODS][uid] - if uid in obj._cache: - pod = obj._cache[uid] - else: - collectd.warning('%s uid %s not found' % (PLUGIN, uid)) 
- continue - - # K8S platform system usage, i.e., essential: kube-system - # check for component label app.starlingx.io/component=platform - if pod.is_platform_resource(): - cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_SYSTEM] += acct - - # K8S platform addons usage, i.e., non-essential: monitor, openstack - if pod.namespace in pc.K8S_NAMESPACE_ADDON: - cpuacct[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += acct - - # Calculate base cpuacct usage (i.e., base tasks, exclude K8S and VMs) - # e.g., docker, system.slice, user.slice - for name in cpuacct[pc.GROUP_FIRST]: - if name in pc.BASE_GROUPS: - cpuacct[pc.GROUP_OVERALL][pc.GROUP_BASE] += \ - cpuacct[pc.GROUP_FIRST][name] - elif name not in pc.BASE_GROUPS_EXCLUDE: - collectd.warning('%s could not find cgroup: %s' % (PLUGIN, name)) - - # Calculate system.slice container cpuacct usage - for g in pc.CONTAINERS_CGROUPS: - if g in cpuacct[pc.CGROUP_SYSTEM]: - cpuacct[pc.GROUP_OVERALL][pc.GROUP_CONTAINERS] += \ - cpuacct[pc.CGROUP_SYSTEM][g] - - # Calculate platform cpuacct usage (this excludes apps) - for g in pc.PLATFORM_GROUPS: - cpuacct[pc.GROUP_OVERALL][pc.GROUP_PLATFORM] += \ - cpuacct[pc.GROUP_OVERALL][g] - - # Calculate cgroup based occupancy for overall groupings - for g in pc.OVERALL_GROUPS: - cputime_ms = \ - float(cpuacct[pc.GROUP_OVERALL][g]) / float(pc.ONE_MILLION) - occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \ - / float(obj.elapsed_ms) / obj.number_platform_cpus - obj._data[g] = occupancy - if obj.debug: - collectd.info('%s %s elapsed = %.1f ms, cputime = %.1f ms, ' - 'n_cpus = %d, occupancy = %.2f %%' - % (PLUGIN_DEBUG, - g, - obj.elapsed_ms, - cputime_ms, - obj.number_platform_cpus, - occupancy)) - - # Calculate cgroup based occupancy for first-level groupings - for g in cpuacct[pc.GROUP_FIRST]: - cputime_ms = \ - float(cpuacct[pc.GROUP_FIRST][g]) / float(pc.ONE_MILLION) - occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \ - / float(obj.elapsed_ms) / obj.number_platform_cpus - obj._data[g] = 
occupancy - - # Calculate cgroup based occupancy for cgroups within - # system.slice and user.slice, keeping the hirunners - # exceeding minimum threshold. - occ = {} - for g in cpuacct[pc.CGROUP_SYSTEM]: - cputime_ms = \ - float(cpuacct[pc.CGROUP_SYSTEM][g]) / float(pc.ONE_MILLION) - occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \ - / float(obj.elapsed_ms) / obj.number_platform_cpus - obj._data[g] = occupancy - if occupancy >= HIRUNNER_MINIMUM_CPU_PERCENT: - occ[g] = occupancy - for g in cpuacct[pc.CGROUP_USER]: - cputime_ms = \ - float(cpuacct[pc.CGROUP_USER][g]) / float(pc.ONE_MILLION) - occupancy = float(pc.ONE_HUNDRED) * float(cputime_ms) \ - / float(obj.elapsed_ms) / obj.number_platform_cpus - obj._data[g] = occupancy - if occupancy >= HIRUNNER_MINIMUM_CPU_PERCENT: - occ[g] = occupancy - occs = ', '.join( - '{}: {:.1f}'.format(k.split('.', 1)[0], v) for k, v in sorted( - occ.items(), key=lambda t: -float(t[1])) - ) - collectd.info('%s %s: %.1f%%; cpus: %d, (%s)' - % (PLUGIN, - 'Base usage', - obj._data[pc.GROUP_BASE], - obj.number_platform_cpus, - occs)) + # Calculate average cpu occupancy for dispatch interval + if obj.dispatch: + prefix = 'dispatch' + calculate_occupancy( + prefix, obj.hires, obj.dispatch, + obj._cache, + obj.d_t0, t1, + obj.d_w0, w1, + obj.d_t0_cpuacct, t1_cpuacct, + obj.d_t0_cpuwait, t1_cpuwait, + obj.d_occ, obj.d_occw, + obj.d_elapsed_ms, + obj.number_platform_cpus, + obj.cpu_list, + obj.debug) # Update t0 state for the next sample collection obj._t0 = copy.deepcopy(t1) + obj._w0 = copy.deepcopy(w1) obj._t0_cpuacct = copy.deepcopy(t1_cpuacct) + obj._t0_cpuwait = copy.deepcopy(t1_cpuwait) + if obj.dispatch: + obj.d_t0 = copy.deepcopy(t1) + obj.d_w0 = copy.deepcopy(w1) + obj.d_t0_cpuacct = copy.deepcopy(t1_cpuacct) + obj.d_t0_cpuwait = copy.deepcopy(t1_cpuwait) def config_func(config): @@ -510,9 +917,11 @@ def config_func(config): obj.debug = pc.convert2boolean(val) elif key == 'verbose': obj.verbose = pc.convert2boolean(val) + 
elif key == 'hires': + obj.hires = pc.convert2boolean(val) - collectd.info('%s debug=%s, verbose=%s' - % (PLUGIN, obj.debug, obj.verbose)) + collectd.info('%s debug=%s, verbose=%s, hires=%s' + % (PLUGIN, obj.debug, obj.verbose, obj.hires)) return pc.PLUGIN_PASS @@ -598,55 +1007,41 @@ def read_func(): collectd.info('%s no cpus to monitor' % PLUGIN) return pc.PLUGIN_PASS - # Gather current cputime state information, and calculate occupancy since - # this routine was last run. + # Gather current cputime state information, and calculate occupancy + # since this routine was last run. update_cpu_data() # Prevent dispatching measurements at plugin startup - if obj.elapsed_ms <= 1000.0: + if obj.elapsed_ms <= 500.0: return pc.PLUGIN_PASS - if obj.verbose: - collectd.info('%s Usage: %.1f%% (avg per cpu); ' - 'cpus: %d, Platform: %.1f%% ' - '(Base: %.1f, k8s-system: %.1f), k8s-addon: %.1f, ' - '%s: %.1f, %s: %.1f' - % (PLUGIN, obj._data[PLATFORM_CPU_PERCENT], - obj.number_platform_cpus, - obj._data[pc.GROUP_PLATFORM], - obj._data[pc.GROUP_BASE], - obj._data[pc.GROUP_K8S_SYSTEM], - obj._data[pc.GROUP_K8S_ADDON], - pc.GROUP_CONTAINERS, - obj._data[pc.GROUP_CONTAINERS], - pc.GROUP_OVERHEAD, - obj._data[pc.GROUP_OVERHEAD])) - # Fault insertion code to assis in regression UT # # if os.path.exists('/var/run/fit/cpu_data'): # with open('/var/run/fit/cpu_data', 'r') as infile: # for line in infile: - # obj._data[PLATFORM_CPU_PERCENT] = float(line) + # obj._occ[PLATFORM_CPU_PERCENT] = float(line) # collectd.info("%s using FIT data:%.2f" % - # (PLUGIN, obj._data[PLATFORM_CPU_PERCENT] )) + # (PLUGIN, obj._occ[PLATFORM_CPU_PERCENT] )) # break - # Dispatch overall platform cpu usage percent value - val = collectd.Values(host=obj.hostname) - val.plugin = 'cpu' - val.type = 'percent' - val.type_instance = 'used' - val.dispatch(values=[obj._data[PLATFORM_CPU_PERCENT]]) + if obj.dispatch: + # Dispatch overall platform cpu usage percent value + val = collectd.Values(host=obj.hostname) + 
val.plugin = 'cpu' + val.type = 'percent' + val.type_instance = 'used' + val.dispatch(values=[obj.d_occ[PLATFORM_CPU_PERCENT]]) - # Dispatch grouped platform cpu usage values - val = collectd.Values(host=obj.hostname) - val.plugin = 'cpu' - val.type = 'percent' - val.type_instance = 'occupancy' - for g in pc.OVERALL_GROUPS: - val.plugin_instance = g - val.dispatch(values=[obj._data[g]]) + # Dispatch grouped platform cpu usage values + val = collectd.Values(host=obj.hostname) + val.plugin = 'cpu' + val.type = 'percent' + val.type_instance = 'occupancy' + for g in pc.OVERALL_GROUPS: + val.plugin_instance = g + val.dispatch(values=[obj.d_occ[g]]) + obj.dispatch = False # Calculate overhead cost of gathering metrics if obj.debug: @@ -661,4 +1056,4 @@ def read_func(): # Register the config, init and read functions collectd.register_config(config_func) collectd.register_init(init_func) -collectd.register_read(read_func) +collectd.register_read(read_func, interval=PLUGIN_HIRES_INTERVAL) diff --git a/collectd-extensions/src/memory.py b/collectd-extensions/src/memory.py index 6544dc7..114411a 100755 --- a/collectd-extensions/src/memory.py +++ b/collectd-extensions/src/memory.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2018-2022 Wind River Systems, Inc. +# Copyright (c) 2018-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -618,22 +618,23 @@ def output_top_10_pids(pid_dict, message): """Outputs the top 10 pids with the formatted message. Args: - pid_dict: Dict The Dictionary of PIDs with Name and RSS - message: Formatted String, the template message to be output. + pid_dict: dictionary {pid: {'name': name, 'rss: value} + message: Formatted String, template output message """ # Check that pid_dict has values if not pid_dict: return - proc = [] - # Sort the dict based on Rss value from highest to lowest. 
- sorted_pid_dict = sorted(pid_dict.items(), key=lambda x: x[1]['rss'], - reverse=True) - # Convert sorted_pid_dict into a list - [proc.append((i[1].get('name'), format_iec(i[1].get('rss')))) for i in - sorted_pid_dict] - # Output top 10 entries of the list - collectd.info(message % (str(proc[:10]))) + + # Output top 10 RSS usage entries + mems = ', '.join( + '{}: {}'.format( + v.get('name', '-'), + format_iec(v.get('rss', 0.0))) for k, v in sorted( + pid_dict.items(), + key=lambda t: -float(t[1]['rss']))[:10] + ) + collectd.info(message % (mems)) def config_func(config): @@ -777,10 +778,10 @@ def read_func(): # K8S platform addons usage, i.e., non-essential: monitor, openstack if pod.namespace in pc.K8S_NAMESPACE_ADDON: memory[pc.GROUP_OVERALL][pc.GROUP_K8S_ADDON] += MiB - # Limit output to every 5 minutes and after 29 seconds to avoid duplication - if datetime.datetime.now().minute % 5 == 0 and datetime.datetime.now( - ).second > 29: + # Get per-process and per-pod RSS memory every 5 minutes + now = datetime.datetime.now() + if now.minute % 5 == 0 and now.second > 29: # Populate the memory per process dictionary to output results pids = get_platform_memory_per_process() @@ -795,13 +796,21 @@ def read_func(): for uid in group_pods: if uid in obj._cache: pod = obj._cache[uid] - # Ensure pods outside of Kube-System and Kube-Addon are only logged every 30 min - if datetime.datetime.now().minute % 30 == 0 and datetime.datetime.now().second > 29: - collectd.info(f'The pod:{pod.name} running in namespace:{pod.namespace} ' - f'has the following processes{group_pods[uid]}') + # Log detailed memory usage of all pods every 30 minutes + if now.minute % 30 == 0 and now.second > 29: + mems = ', '.join( + '{}({}): {}'.format( + v.get('name', '-'), + k, + format_iec(v.get('rss', 0.0))) for k, v in sorted( + group_pods[uid].items(), + key=lambda t: -float(t[1]['rss'])) + ) + collectd.info(f'memory usage: Pod: {pod.name}, ' + f'Namespace: {pod.namespace}, ' + f'pids: {mems}') 
else: - collectd.warning('%s: uid %s for pod %s not found in namespace %s' % ( - PLUGIN, uid, pod.name, pod.namespace)) + collectd.warning('%s: uid %s for pod not found' % (PLUGIN, uid)) continue # K8S platform system usage, i.e., essential: kube-system @@ -815,16 +824,16 @@ def read_func(): for key in group_pods[uid]: k8s_addon[key] = group_pods[uid][key] - message = 'The top 10 memory rss processes for the platform are : %s' + message = 'Top 10 memory usage pids: platform: %s' output_top_10_pids(platform, message) - message = 'The top 10 memory rss processes for the Kubernetes System are :%s' + message = 'Top 10 memory usage pids: Kubernetes System: %s' output_top_10_pids(k8s_system, message) - message = 'The top 10 memory rss processes Kubernetes Addon are :%s' + message = 'Top 10 memory usage pids: Kubernetes Addon: %s' output_top_10_pids(k8s_addon, message) - message = 'The top 10 memory rss processes overall are :%s' + message = 'Top 10 memory usage pids: overall: %s' output_top_10_pids(overall, message) # Calculate base memory usage (i.e., normal memory, exclude K8S and VMs) diff --git a/collectd-extensions/src/ovs_interface.py b/collectd-extensions/src/ovs_interface.py index 7033f66..7c443a1 100755 --- a/collectd-extensions/src/ovs_interface.py +++ b/collectd-extensions/src/ovs_interface.py @@ -1,7 +1,7 @@ # # SPDX-License-Identifier: Apache-2.0 # -# Copyright (C) 2019 Intel Corporation +# Copyright (C) 2019-2024 Intel Corporation # ############################################################################ # @@ -741,7 +741,7 @@ def parse_ovs_appctl_bond_list(buf): buf = buf.strip().split("\n") result = {} for idx, line in enumerate(buf): - if idx is 0: + if idx == 0: continue line = line.strip() @@ -837,7 +837,7 @@ def compare_interfaces(interfaces1, interfaces2): len1 = len(set1 - set2) len2 = len(set2 - set1) - if len1 is 0 and len2 is 0: + if len1 == 0 and len2 == 0: return True else: return False diff --git a/collectd-extensions/src/plugin_common.py 
b/collectd-extensions/src/plugin_common.py index f632a8c..1bc8f6d 100644 --- a/collectd-extensions/src/plugin_common.py +++ b/collectd-extensions/src/plugin_common.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2019-2022 Wind River Systems, Inc. +# Copyright (c) 2019-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -40,6 +40,7 @@ MIN_AUDITS_B4_FIRST_QUERY = 2 K8S_MODULE_MAJOR_VERSION = int(K8S_MODULE_VERSION.split('.')[0]) KUBELET_CONF = '/etc/kubernetes/kubelet.conf' SSL_TLS_SUPPRESS = True +K8S_TIMEOUT = 2 # Standard units' conversion parameters (mebi, kibi) # Reference: https://en.wikipedia.org/wiki/Binary_prefix @@ -83,9 +84,11 @@ GROUPS_AGGREGATED = [GROUP_PLATFORM, GROUP_BASE, GROUP_K8S_SYSTEM, GROUP_K8S_ADDON, GROUP_CONTAINERS] # First level cgroups -- these are the groups we know about +CGROUP_INIT = 'init.scope' CGROUP_SYSTEM = 'system.slice' CGROUP_USER = 'user.slice' CGROUP_MACHINE = 'machine.slice' +CGROUP_K8SPLATFORM = 'k8splatform.slice' CGROUP_DOCKER = 'docker' CGROUP_K8S = K8S_ROOT @@ -98,7 +101,8 @@ CONTAINERS_CGROUPS = [CGROUP_SYSTEM_CONTAINERD, CGROUP_SYSTEM_DOCKER, CGROUP_SYSTEM_KUBELET, CGROUP_SYSTEM_ETCD] # Groupings by first level cgroup -BASE_GROUPS = [CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER] +BASE_GROUPS = [CGROUP_INIT, CGROUP_DOCKER, CGROUP_SYSTEM, CGROUP_USER, + CGROUP_K8SPLATFORM] BASE_GROUPS_EXCLUDE = [CGROUP_K8S, CGROUP_MACHINE] # Groupings of pods by kubernetes namespace @@ -750,18 +754,28 @@ class K8sClient(object): # Debian # kubectl --kubeconfig KUBELET_CONF get pods --all-namespaces \ # --selector spec.nodeName=the_host -o json - kube_results = subprocess.check_output( - ['kubectl', '--kubeconfig', KUBELET_CONF, - '--field-selector', field_selector, - 'get', 'pods', '--all-namespaces', - '-o', 'json' - ]).decode() - json_results = json.loads(kube_results) + try: + kube_results = subprocess.check_output( + ['kubectl', '--kubeconfig', KUBELET_CONF, + '--field-selector', field_selector, + 'get', 'pods', 
'--all-namespaces', + '-o', 'json', + ], timeout=K8S_TIMEOUT).decode() + json_results = json.loads(kube_results) + except subprocess.TimeoutExpired: + collectd.error('kube_get_local_pods: Timeout') + return [] + except json.JSONDecodeError as e: + collectd.error('kube_get_local_pods: Could not parse json output, error=%s' % (str(e))) + return [] + except subprocess.CalledProcessError as e: + collectd.error('kube_get_local_pods: Could not get pods, error=%s' % (str(e))) + return [] # convert the items to: kubernetes.client.V1Pod api_items = [self._as_kube_pod(x) for x in json_results['items']] return api_items except Exception as err: - collectd.error("kube_get_local_pods: %s" % (err)) + collectd.error("kube_get_local_pods: error=%s" % (str(err))) raise @@ -783,7 +797,8 @@ class POD_object: """Check whether pod contains platform namespace or platform label""" if (self.namespace in K8S_NAMESPACE_SYSTEM - or self.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM): + or (self.labels is not None and + self.labels.get(PLATFORM_LABEL_KEY) == GROUP_PLATFORM)): return True return False diff --git a/collectd-extensions/src/python_plugins.conf b/collectd-extensions/src/python_plugins.conf index 240c042..cecf211 100644 --- a/collectd-extensions/src/python_plugins.conf +++ b/collectd-extensions/src/python_plugins.conf @@ -5,6 +5,7 @@ LoadPlugin python debug = false verbose = true + hires = false Import "memory" @@ -21,5 +22,4 @@ LoadPlugin python Import "remotels" Import "service_res" LogTraces = true - Encoding "utf-8" diff --git a/monitor-tools/debian/deb_folder/changelog b/monitor-tools/debian/deb_folder/changelog index 66d25c3..1b20e73 100644 --- a/monitor-tools/debian/deb_folder/changelog +++ b/monitor-tools/debian/deb_folder/changelog @@ -1,3 +1,10 @@ +monitor-tools (1.0-2) unstable; urgency=medium + + * Update schedtop to display cgroups from systemd services and Kubernetes pods + * Add watchpids to find created processes, typically short-lived + + -- Jim Gauld Thu, 12 
Sep 2024 09:54:55 -0400 + monitor-tools (1.0-1) unstable; urgency=medium * Initial release. diff --git a/monitor-tools/debian/deb_folder/control b/monitor-tools/debian/deb_folder/control index 4f3ca01..29a2474 100644 --- a/monitor-tools/debian/deb_folder/control +++ b/monitor-tools/debian/deb_folder/control @@ -13,4 +13,5 @@ Description: Monitor tools package This package contains data collection tools to monitor host performance. Tools are general purpose engineering and debugging related. Includes overall memory, cpu occupancy, per-task cpu, - per-task scheduling, per-task io. + per-task scheduling, per-task io, newly created short-lived-processes, + local port scanning. diff --git a/monitor-tools/debian/deb_folder/copyright b/monitor-tools/debian/deb_folder/copyright index 3d5ab57..90e2761 100644 --- a/monitor-tools/debian/deb_folder/copyright +++ b/monitor-tools/debian/deb_folder/copyright @@ -5,7 +5,7 @@ Source: https://opendev.org/starlingx/utilities Files: * Copyright: - (c) 2013-2021 Wind River Systems, Inc + (c) 2013-2024 Wind River Systems, Inc (c) Others (See individual files for more details) License: Apache-2 Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,7 +26,7 @@ License: Apache-2 # If you want to use GPL v2 or later for the /debian/* files use # the following clauses, or change it to suit. Delete these two lines Files: debian/* -Copyright: 2021 Wind River Systems, Inc +Copyright: 2024 Wind River Systems, Inc License: Apache-2 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/monitor-tools/debian/deb_folder/rules b/monitor-tools/debian/deb_folder/rules index 8511f7f..160c68b 100755 --- a/monitor-tools/debian/deb_folder/rules +++ b/monitor-tools/debian/deb_folder/rules @@ -10,5 +10,8 @@ override_dh_install: install -p memtop $(ROOT)/usr/bin install -p schedtop $(ROOT)/usr/bin install -p occtop $(ROOT)/usr/bin + install -p k8smetrics $(ROOT)/usr/bin + install -p portscanner $(ROOT)/usr/bin + install -p watchpids $(ROOT)/usr/bin dh_install diff --git a/monitor-tools/debian/meta_data.yaml b/monitor-tools/debian/meta_data.yaml index 09f5c27..771c33e 100644 --- a/monitor-tools/debian/meta_data.yaml +++ b/monitor-tools/debian/meta_data.yaml @@ -1,6 +1,6 @@ --- debname: monitor-tools -debver: 1.0-1 +debver: 1.0-2 src_path: scripts revision: dist: $STX_DIST diff --git a/monitor-tools/scripts/k8smetrics b/monitor-tools/scripts/k8smetrics new file mode 100755 index 0000000..7c1964a --- /dev/null +++ b/monitor-tools/scripts/k8smetrics @@ -0,0 +1,292 @@ +#!/usr/bin/env python + +######################################################################## +# +# Copyright (c) 2024 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# +######################################################################## +# +# Calculate Kubernetes latency percentile metrics (50%, 95, and 99%) for +# etcdserver and kube-apiserver. This is based on Prometheus format raw +# metrics histograms within kube-apiserver. +# +# This obtains current Kubernetes raw metrics cumulative counters, +# (e.g., kubectl get --raw /metrics). The counters represent cumulative +# frequency of delays <= value. This calculates the delta from previous, +# and does percentile calculation. 
+# +# Example: +# kubectl get --raw /metrics +# +# To see API calls: +# kubectl get --raw /metrics -v 6 +# +# This does minimal parsing and aggregation to yield equivalent of the +# following Prometheus PromQL queries using data over a time-window: +# histogram_quantile(0.95, sum(rate(etcd_request_duration_seconds_bucket[5m])) by (le)) +# histogram_quantile(0.95, sum(rate(apiserver_request_duration_seconds_bucket{verb!~"CONNECT|WATCH|WATCH|PROXY"}[5m])) by (le)) +# histogram_quantile(0.95, sum(rate(workqueue_queue_duration_seconds_bucket[5m])) by (le)) +# histogram_quantile(0.95, sum(rate(rest_client_request_duration_seconds[5m])) by (le)) +# +# Specific verbs are excluded to eliminate tooling anomolies, otherwise +# histogram are polluted with >>40second delays. +# +# TODO(jgauld): Migrate code to use prometheus_client API; +# prometheus_clientthat is not currently installed. +# + +import argparse +from copy import deepcopy +from datetime import datetime, timedelta +import logging +import logging.handlers +import os +import pprint +import re +import subprocess +import sys +import tempfile +import time + +LOG = logging.getLogger(__name__) + +KUBECONFIG = '/etc/kubernetes/admin.conf' +re_bucket = re.compile(r'^([a-zA-Z0-9:_]+)_bucket{(.*)}\s+(\d+)') + +def get_raw_metrics(rawfile=None): + if rawfile is None: + fd, rawfile = tempfile.mkstemp(dir='/tmp', prefix='k8s-prom-raw-', suffix='.log') + with os.fdopen(fd, 'w') as f: + cmd = ['kubectl', '--kubeconfig={}'.format(KUBECONFIG), + 'get', '--raw', '/metrics'] + try: + subprocess.check_call(cmd, stdout=f, timeout=5) + except subprocess.TimeoutExpired as e: + LOG.error('get_raw_metrics: error=%s' % (str(e))) + except subprocess.CalledProcessError as e: + LOG.error('get_raw_metrics: error=%s' % (str(e))) + except Exception as e: + LOG.error('get_raw_metrics: error=%s' % (str(e))) + return rawfile + +def read_raw_metrics(rawfile=None): + patterns = { + 'apiserver_request_duration_seconds': {'exclude_verbs': ['CONNECT', 
'WATCH', 'WATCHLIST', 'PROXY']}, + 'etcd_request_duration_seconds': {}, + 'workqueue_queue_duration_seconds': {}, + 'rest_client_request_duration_seconds': {}, + } + names = patterns.keys() + + # Store aggregate bucket values metric[name][le] + metrics = {} + for name in names: + metrics[name] = {} + + cleanup = False + if rawfile is None: + cleanup = True + rawfile = get_raw_metrics() + + with open(rawfile) as f: + for l in f: + if l.startswith(tuple(names)): + # THIS IS TOO VERBOSE FOR TYPICAL DEBUG + #LOG.debug(l.rstrip()) + + match = re_bucket.search(l) + if match: + name = match.group(1) + tags = match.group(2) + count = int(match.group(3)) + + D = {} + for key_value in tags.split(','): + key, value = key_value.split('=') + value = value.replace('"', '') + D.update({key: value}) + + # make sure we have a valid "le" bucket + bucket = D.get('le') + if bucket is None: + continue + + # filter out specific verbs + exclude_verbs = patterns[name].get('exclude_verbs', {}) + if 'verb' in D and D['verb'] in exclude_verbs: + continue + + # Aggregate metric for matching name and "le" bucket + if bucket not in metrics[name]: + metrics[name][bucket] = 0 + metrics[name][bucket] += count + + if cleanup: + os.unlink(rawfile) + + return metrics + + +def percentile(hist, q=0.95): + # Input: dictionary hist[le_bin] = freq + + # these are sorted + le_bins = sorted(list(hist.keys()), key=float) + + # Calculate number of binned samples + count = 0 + for x in le_bins: + count += hist[x] + + p0 = 0.0 + x0 = 0.0 + + for x in le_bins: + x1 = float(x) + p = float(hist[x]) / float(count) + p1 = p0 + p + if p1 >= q: + percentile = x0 + (x1 - x0) * (q - p0) / (p1 - p0) + break + p0 = p1 + percentile = x1 + + return percentile + +def k8smetrics(args=None): + # Read prometheus raw metrics snapshot at time t1 + now = datetime.now() + tstamp1 = now + t1 = read_raw_metrics() + if args.debug: + LOG.debug("t1:") + pprint.pprint(t1, indent=1) + + start_time = now + while now - start_time < 
timedelta(minutes=args.period_min): + # Copy all state information for time t0 + t0 = deepcopy(t1) + tstamp0 = tstamp1 + + time.sleep(args.interval_min*60) + + # Read prometheus raw metrics snapshot at time t1 + now = datetime.now() + tstamp1 = now + t1 = read_raw_metrics() + if args.debug: + LOG.debug("t1:") + pprint.pprint(t1, indent=1) + + # Print tool header for this interval + duration = tstamp1 - tstamp0 + LOG.info('Samples from: %s - %s, duration: %s' + % (tstamp0, tstamp1, duration)) + + # Calculate delta between cumulative snapshots + delta = {} + for name in t1.keys(): + delta[name] = {} + for bucket in t1[name]: + v0 = t0[name].get(bucket, 0) + delta[name][bucket] = t1[name][bucket] - v0 + + # NOTE: le="+Inf" is identical to value of x_count + # le="y" is upper-bound of the bucket + hist = {} + for name in delta.keys(): + hist[name] = {} + inf = delta[name].pop('+Inf', None) + if inf is None: + continue + buckets = sorted(list(delta[name].keys()), key=float) + + # Calculate frequency distribution from cumulative frequency + maxbin = 0.0 + v0 = 0 + for x in buckets: + v = delta[name][x] + d = v - v0 + # in the case of anomolous value (yeah, we going crazy) + if d < 0: + if args.debug: + LOG.debug('d<0: x=%s, v0=%s, v=%s, d=%s, inf=%s' % (x, v0, v, d, inf)) + d = 0 + if d > 0: + maxbin = float(x) + v0 = v + hist[name][x] = d + + index = name.rfind('_seconds') + text = name[:index] + percentile_50 = 1000.0*percentile(hist[name], q=0.50) + percentile_95 = 1000.0*percentile(hist[name], q=0.95) + percentile_99 = 1000.0*percentile(hist[name], q=0.99) + + # Print histogram summary and percentiles for each metric + print("{} : count: {}, p50: {:.0f} ms, p95: {:.0f} ms, p99: {:.0f} ms, maxbin: {:.0f} ms".format( + text, inf, percentile_50, percentile_95, percentile_99, 1000.0*maxbin)) + print('bins:', end=' ') + [print('{0:5g}'.format(1000.0*float(x)), end=' ') for x in buckets] + print() + print(' <=:', end=' ') + [print('{0:5.0f}'.format(delta[name][x]), end=' 
') for x in buckets] + print() + print('hist:', end=' ') + [print('{0:5.0f}'.format(hist[name][x]), end=' ') for x in buckets] + print() + + # blank line between metrics + print() + + return 0 + +def main(): + # Instantiate the parser + parser = argparse.ArgumentParser(description='Kubernetes latency percentile metrics') + + # Optional argument + parser.add_argument('--period_min', type=int, default=1, + help='sampling period in minutes') + parser.add_argument('--interval_min', type=int, default=1, + help='sampling interval in minutes') + parser.add_argument('--debug', action='store_true', + help='enable tool debug') + + args = parser.parse_args() + + # Configure logging + if args.debug: + level = logging.DEBUG + else: + level = logging.INFO + out_hdlr = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter( + '%(asctime)s %(process)s %(levelname)s %(module)s: %(message)s') + out_hdlr.setFormatter(formatter) + out_hdlr.setLevel(level) + LOG.addHandler(out_hdlr) + LOG.setLevel(level) + + LOG.info("Kubernetes latency percentiles: period:%s mins, interval=%s mins", + args.period_min, args.interval_min) + + try: + ret = k8smetrics(args=args) + sys.exit(ret) + + except KeyboardInterrupt as e: + LOG.info('caught: %r, shutting down', e) + sys.exit(0) + + except IOError: + sys.exit(0) + + except Exception as e: + LOG.error('exception: %r', e, exc_info=1) + sys.exit(-4) + +if __name__ == '__main__': + main() diff --git a/monitor-tools/scripts/portscanner b/monitor-tools/scripts/portscanner new file mode 100755 index 0000000..c87aa8a --- /dev/null +++ b/monitor-tools/scripts/portscanner @@ -0,0 +1,153 @@ +#!/usr/bin/env python +######################################################################## +# +# Copyright (c) 2024 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# +######################################################################## + +import argparse +import psutil +from psutil._common import addr +from datetime import datetime, timedelta +import logging +import logging.handlers +import time +import os +import sys +import time + +LOG = logging.getLogger(__name__) + +# Global variables +seen_connections = dict() +seen_local_ports = dict() + +def connections_summary(): + # Print overall connections summary + sorted_dict = sorted(seen_connections.items(), key=lambda item: item[1]) + sorted_dict = reversed(sorted_dict) + print('\nSUMMARY: Total connections') + for key, value in sorted_dict: + print("%7d %s" % (value, key)) + +def portscan(args=None): + + match_status = ['ESTABLISHED'] + + LOG.info("Scanning for connections on port:%s, matching status:%s, for %d minutes.", + match_status, args.port, args.duration_min) + + now = datetime.now() + start_time = now + while now - start_time < timedelta(minutes=args.duration_min): + now = datetime.now() + + try: + connections = psutil.net_connections(kind='tcp') + except psutil.Error as error: + LOG.error("Error: %s", str(error)) + connections = [] + time.sleep(1) + + matches = [] + for conn in connections: + if (isinstance(conn.raddr, addr) and + (conn.raddr.port == args.port) and + (any(s == conn.status for s in match_status)) and + str(conn.laddr.port) + str(conn.pid) not in seen_local_ports): + local_port_pid = str(conn.laddr.port) + str(conn.pid) + seen_local_ports[local_port_pid] = seen_local_ports.get(local_port_pid, 0) + 1 + matches.append(conn) + + if matches: + tstamp = now.strftime("%Y-%m-%d %H:%M:%S") + for conn in matches: + try: + p = psutil.Process(pid=conn.pid) + except psutil.Error as error: + LOG.debug("Error: %s", str(error)) + continue + + d = p.as_dict() + pid = conn.pid + r_ip = conn.raddr.ip + + new_match = False + summary_key = '{} {} {} {}'.format( + r_ip, pid, d['name'],' '.join(d['cmdline'])) + if summary_key 
not in seen_connections: + new_match = True + + # Increment connection counts based on unique key + seen_connections[summary_key] = seen_connections.get(summary_key, 0) + 1 + + # d['environ'] -- too verbose + if new_match: + print("{} Local:{}:{} Remote: {}:{} status:{} ppid:{}, pid:{}, threads:{}, user:{}, name:{}, cmdline:{}".format( + tstamp, + conn.laddr.ip, conn.laddr.port, + conn.raddr.ip, conn.raddr.port, + conn.status, + d['ppid'], d['pid'], d['num_threads'], + d['username'], d['name'],' '.join(d['cmdline']))) + + time.sleep(args.delay) + +def main(): + """Main program.""" + + # Instantiate the parser + parser = argparse.ArgumentParser( + description='Scan processes matching net_connection port') + + # Optional argument + parser.add_argument('--duration_min', type=int, default=5, + help='duration to collect in minutes') + parser.add_argument('--port', type=int, default=5000, + help='specific port to scan') + parser.add_argument('--delay', type=float, default=0.2, + help='scanning delay in seconds') + parser.add_argument('--debug', action='store_true', + help='enable tool debug') + + args = parser.parse_args() + + # Configure logging + if args.debug: + level = logging.DEBUG + else: + level = logging.INFO + out_hdlr = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter( + '%(asctime)s %(process)s %(levelname)s %(module)s: %(message)s') + out_hdlr.setFormatter(formatter) + out_hdlr.setLevel(level) + LOG.addHandler(out_hdlr) + LOG.setLevel(level) + + # Limit access of this tool. 
+ if os.geteuid() != 0: + LOG.error('Require sudo/root.') + sys.exit(1) + + try: + ret = portscan(args=args) + connections_summary() + sys.exit(ret) + + except KeyboardInterrupt as e: + LOG.info('caught: %r, shutting down', e) + connections_summary() + sys.exit(0) + + except IOError: + sys.exit(0) + + except Exception as e: + LOG.error('exception: %r', e, exc_info=1) + sys.exit(-4) + +if __name__ == '__main__': + main() diff --git a/monitor-tools/scripts/schedtop b/monitor-tools/scripts/schedtop index 49408bd..6a9c410 100755 --- a/monitor-tools/scripts/schedtop +++ b/monitor-tools/scripts/schedtop @@ -1,7 +1,7 @@ #!/usr/bin/perl ######################################################################## # -# Copyright (c) 2015-2021 Wind River Systems, Inc. +# Copyright (c) 2015-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -16,7 +16,8 @@ # Usage: schedtop OPTIONS # [--delay=] [--repeat=] [--period=] # [--reset-hwm] [--idle] [--sort=] -# [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-only] [--watch-quiet] +# [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-cgroup=cgroup1,...] 
+# [--watch-only] [--watch-quiet] # [--trig-delay=time] # [--help] @@ -28,6 +29,7 @@ use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC CLOCK_REALTIME); use Benchmark ':hireswallclock'; use Carp qw(croak carp); use Math::BigInt; +use File::Find (); # Define toolname our $TOOLNAME = "schedtop"; @@ -73,6 +75,10 @@ our $USER_HZ = 100; # no easy way to get this our $CLOCK_NS = SI_G / $USER_HZ; our $print_host = 1; +our @cgroup_procs_paths = (); +our @cgroup_procs_match = (); +our @cgroup_tids = (); + # Print options our ($P_none, $P_lite, $P_brief, $P_full) = (0, 1, 2, 3); our ($P_ps, $P_cpu, $P_del, $P_io, $P_id, $P_cmd) = (0, 1, 2, 3, 4, 5); @@ -88,6 +94,7 @@ our ($arg_debug, $arg_sort, $arg_print, @arg_watch_cmd, + @arg_watch_cgroup, $arg_watch_only, $arg_watch_quiet, $arg_trig_delay, @@ -120,7 +127,7 @@ my @delta_list = ( my @state_list = ( 'exec_max', 'wait_max', 'block_max', - 'pid', 'ppid', 'state', 'comm', 'cmdline', 'wchan', 'affinity', + 'pid', 'ppid', 'state', 'cgroup', 'comm', 'cmdline', 'wchan', 'affinity', 'VmSize', 'VmRSS', 'start_time', 'nice', 'policy', 'priority', 'rt_priority', 'task_cpu' ); @@ -142,6 +149,7 @@ $| = 1; \$::arg_sort, \$::arg_print, \@::arg_watch_cmd, + \@::arg_watch_cgroup, \$::arg_watch_only, \$::arg_watch_quiet, \$::arg_trig_delay, @@ -198,8 +206,9 @@ printf "selected options: ". $::arg_sort, $::arg_print; if (@::arg_watch_cmd) { printf "selected watch/trigger options: ". - "watch-cmd=%s, only=%s, quiet=%s, delay=%d ms\n", + "watch-cmd=%s, watch-cgroup=%s, only=%s, quiet=%s, delay=%d ms\n", join(',', @::arg_watch_cmd), + join(',', @::arg_watch_cgroup), (defined $::arg_watch_only ? 'true' : 'false'), (defined $::arg_watch_quiet ? 
'true' : 'false'), $::arg_trig_delay; @@ -218,6 +227,12 @@ for (my $i=0; $i < $::num_cpus; $i++) { } $w_aff = &max(length 'AFF', length $::affinity_mask->as_hex()); +# Find cgroup.proc paths matching specified cgroup patterns +&find_matching_cgroup_procs(\@::cgroup_procs_match, \@::arg_watch_cgroup); +for my $file (@::cgroup_procs_match) { + print "matched cgroup:", $file, "\n"; +} + # Reset scheduling hi-water marks if (defined $::arg_reset_hwm) { &get_tids(\%::tids_1); @@ -246,7 +261,7 @@ if ($is_schedstat) { # Get current scheduling and io info for all tids &read_sched(\%::tids_1, \%::task_1); # Track watched tids for monitoring -&track_watched_tids(\%::tids_1, \%::tids_w, \%::task_1, \@::arg_watch_cmd); +&track_watched_tids(\%::tids_1, \%::tids_w, \%::task_1, \@::arg_watch_cmd, \@::arg_watch_cgroup); # determine column sort order my $s_keyw = 'watched'; @@ -295,11 +310,46 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) { &read_stat(\%::percpu_1); } if (defined $::arg_watch_only) { - # Get list of pids and tids from watched commands; - # this reduces cpu impact dramatically - foreach my $tid (keys %::tids_w) { - $::tids_1{$tid} = $::tids_w{$tid}; + # This determines a subset of pids and tids + # based on previous watched tids and matching cgroups. + # This should reduce cpu impact dramatically. 
+ + # Get list of pids and tids + &get_tids(\%::tids_1); + + # Get array of tids corresponding to matching cgroups + &read_cgroup_procs(\@::cgroup_tids, \@::cgroup_procs_match); + my %cgroup_tids_h = map { $_ => 1 } @::cgroup_tids; + + # Keep previous watched tids and find new matches from cgroup.procs + my @del_tids = (); + foreach my $tid (keys %::tids_1) { + my $pid = $::tids_1{$tid}; + next if (exists $::tids_w{$tid}); + if (exists $cgroup_tids_h{$tid}) { + $::tids_w{$tid} = $pid; + printf "ADD watching: tid=%7d\n", $tid; + next; + } + push(@del_tids, $tid); } + + # Prune tids not actually being watched + foreach my $tid (@del_tids) { + delete $::tids_1{$tid}; + } + + # Prune watched tids that not longer exist + my @del_tids_w = (); + foreach my $tid (keys %::tids_w) { + next if (exists $::tids_1{$tid}); + push(@del_tids_w, $tid); + } + foreach my $tid (@del_tids_w) { + printf "REM watching: tid=%7d\n", $tid; + delete $::tids_w{$tid}; + } + } else { # Get list of pids and tids &get_tids(\%::tids_1); @@ -462,7 +512,7 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) { if ($::opt_P{$::P_cmd} == $::P_brief) { $L .= sprintf "%s", "cmdline"; } elsif ($::opt_P{$::P_cmd} == $::P_full) { - $L .= sprintf "%-15s %s", "comm", "cmdline"; + $L .= sprintf "%-16s %-15s %s", "cgroup", "comm", "cmdline"; } print $L, "\n"; @@ -526,7 +576,8 @@ REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) { if ($::opt_P{$::P_cmd} == $::P_brief) { $L .= sprintf "%s", $::D_task{$tid}{'cmdline'}; } elsif ($::opt_P{$::P_cmd} == $::P_full) { - $L .= sprintf "%-15s %s", + $L .= sprintf "%-16s %-15s %s", + substr($::D_task{$tid}{'cgroup'}, 0, 16), substr($::D_task{$tid}{'comm'}, 0, 15), $::D_task{$tid}{'cmdline'}; } @@ -625,17 +676,36 @@ sub get_tids } # Reset scheduling hi-water-marks +# NOTE: Reset by write 0 to sched is finicky; use brute force sub reset_sched_hwm { (local *::tids) = @_; # reset scheduling hi-water-marks by writing '0' to each task + my 
(%pids_) = (); + foreach my $tid (keys %::tids) { + my $pid = $::tids{$tid}; + $pids_{$pid} = 1; + } + foreach my $pid (keys %pids_) { + my $file = '/proc/' . $pid . '/sched'; + open(my $fh, "> $file") || next; + print $fh "0\n"; + close($fh); + } foreach my $tid (keys %::tids) { my $file = '/proc/' . $tid . '/sched'; open(my $fh, "> $file") || next; print $fh "0\n"; close($fh); } + foreach my $tid (keys %::tids) { + my $pid = $::tids{$tid}; + my $file = '/proc/' . $pid . '/task/' . $tid . '/sched'; + open(my $fh, "> $file") || next; + print $fh "0\n"; + close($fh); + } } # Trigger a crash dump via sysrq, result in /var/crash . @@ -658,22 +728,81 @@ sub sysrq_trigger_crash # Track watched tids for monitoring sub track_watched_tids { - (local *::tids, local *::tids_w, local *::task, local *::arg_watch_cmd) = @_; + (local *::tids, local *::tids_w, local *::task, local *::arg_watch_cmd, local *::arg_watch_cgroup) = @_; foreach my $tid (keys %::tids) { my $pid = $::tids{$tid}; my $comm = $::task{$tid}{'comm'}; + my $cgroup = $::task{$tid}{'cgroup'}; + my $cmdline = $::task{$tid}{'cmdline'}; my $watched = 0; + + next if (exists $::tids_w{$tid}); + foreach my $cmd (@::arg_watch_cmd) { - if (($cmd =~ /^\d+$/) && ($tid == $cmd)) { + if (($cmd =~ /^\d+$/) && (($tid == $cmd) || ($pid == $cmd))) { $::tids_w{$tid} = $pid; - printf "watching: tid=%7d, comm=%s\n", $tid, $comm; + printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline; } if ((defined $comm) && ($comm =~ /^\Q$cmd\E/)) { $::tids_w{$tid} = $pid; - printf "watching: tid=%7d, comm=%s\n", $tid, $comm; + printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline; } } + foreach my $cg (@::arg_watch_cgroup) { + if ((defined $cgroup) && ($cgroup =~ /^\Q$cg\E/)) { + $::tids_w{$tid} = $pid; + printf "watching: tid=%7d, cgroup=%s, comm=%s, cmdline=%.40s\n", $tid, $cgroup, $comm, $cmdline; + } + } + } +} + +# Find module difficult, storing result in 
global variable +sub wanted_cgroup_procs { + my $F = $File::Find::name; + if ($_ eq 'cgroup.procs') { + push @::cgroup_procs_paths, $F; + } +} + +# Find cgroup.proc paths matching specified cgroup patterns +sub find_matching_cgroup_procs +{ + (local *::cgroup_procs_match, local *::arg_watch_cgroup) = @_; + + # Find all cgroup.procs paths for the pids cgroup controller + File::Find::find(\&wanted_cgroup_procs, '/sys/fs/cgroup/pids'); + + foreach my $file (@::cgroup_procs_paths) { + foreach my $cg (@::arg_watch_cgroup) { + if ($file =~ /\Q$cg\E(\.service|\.scope)/) { + push(@::cgroup_procs_match, $file); + } elsif ($file =~ /kubepods\/\w+\/\Q$cg\E/) { + push(@::cgroup_procs_match, $file); + } + } + } +} + +# Get array of tids corresponding to matching cgroups +sub read_cgroup_procs +{ + (local *::tids, local *::cgroup_procs_match) = @_; + + my $tid = (); + + # reset scheduling hi-water-marks by writing '0' to each task + foreach my $cgroup_procs (@::cgroup_procs_match) { + open(my $fh, $cgroup_procs) || goto SKIP_PROCS; + while (<$fh>) { + if (/^(\d+)$/) { + $tid = $1; + push @::tids, $tid; + } + } + close($fh); + SKIP_PROCS:; } } @@ -703,7 +832,7 @@ sub read_sched $gtime, $cgtime, $start_data, $end_data, $start_brk, $arg_start, $arg_end, $env_start, $env_end, $exit_code) = (); - + my ($cgroup) = (); my ($nr_switches, $nr_migrations) = (0,0); my ($exec_runtime, $exec_max) = (0.0, 0.0); my ($wait_max, $wait_sum, $wait_count) = (0.0, 0.0, 0); @@ -716,7 +845,7 @@ sub read_sched $cancelled_write_bytes) = (0,0,0,0,0,0,0); my ($sched_valid, $io_valid, $status_valid, $cmdline_valid, - $wchan_valid, $stat_valid) = (); + $wchan_valid, $stat_valid, $cgroup_valid) = (); $pid = $::tids{$tid}; @@ -765,6 +894,67 @@ sub read_sched #prio : 120 #clock-delta : 28 + # Changes for 6.6.0 kernel + #cat /proc/1/sched + #systemd (1, #threads: 1) + #------------------------------------------------------------------- + #se.exec_start : 251536392.418317 + #se.vruntime : 542073.435409 + 
#se.sum_exec_runtime : 1097697.572750 + #se.nr_migrations : 35039 + #sum_sleep_runtime : 249925608.224346 + #sum_block_runtime : 234992.983051 + #wait_start : 0.000000 + #sleep_start : 251536392.418317 + #block_start : 0.000000 + #sleep_max : 11967.794377 + #block_max : 1230.041276 + #exec_max : 147.808142 + #slice_max : 78.070544 + #wait_max : 180.271599 + #wait_sum : 440802.706697 + #wait_count : 1022180 + #iowait_sum : 81.179285 + #iowait_count : 63 + #nr_migrations_cold : 0 + #nr_failed_migrations_affine : 145872 + #nr_failed_migrations_running : 67209 + #nr_failed_migrations_hot : 82715 + #nr_forced_migrations : 12 + #nr_wakeups : 264124 + #nr_wakeups_sync : 41 + #nr_wakeups_migrate : 205 + #nr_wakeups_local : 146458 + #nr_wakeups_remote : 117666 + #nr_wakeups_affine : 204 + #nr_wakeups_affine_attempts : 409 + #nr_wakeups_passive : 0 + #nr_wakeups_idle : 0 + #avg_atom : 1.072258 + #avg_per_cpu : 31.327879 + #nr_switches : 1023725 + #nr_voluntary_switches : 264916 + #nr_involuntary_switches : 758809 + #se.load.weight : 1048576 + #se.avg.load_sum : 1490 + #se.avg.runnable_sum : 1526937 + #se.avg.util_sum : 365568 + #se.avg.load_avg : 32 + #se.avg.runnable_avg : 32 + #se.avg.util_avg : 7 + #se.avg.last_update_time : 251536392418304 + #se.avg.util_est.ewma : 163 + #se.avg.util_est.enqueued : 7 + #policy : 0 + #prio : 120 + #clock-delta : 112 + #mm->numa_scan_seq : 0 + #numa_pages_migrated : 0 + #numa_preferred_nid : -1 + #total_numa_faults : 0 + #current_node=0, numa_group_id=0 + #numa_faults node=0 task_private=0 task_shared=0 group_private=0 group_shared=0 + # parse /proc//task//sched $file = '/proc/' . $pid . '/task/' . $tid . 
'/sched'; open($fh, $file) || goto SKIP_SCHED; @@ -774,19 +964,19 @@ sub read_sched } my ($k, $v, $c0); LOOP_SCHED: while (<$fh>) { - if (/^se\.statistics.{1,2}wait_max\s+:\s+(\S+)/) { + if (/^wait_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_max\s+:\s+(\S+)/) { $wait_max = $1; - } elsif (/^se\.statistics.{1,2}block_max\s+:\s+(\S+)/) { + } elsif (/^block_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}block_max\s+:\s+(\S+)/) { $block_max = $1; - } elsif (/^se\.statistics.{1,2}wait_sum\s+:\s+(\S+)/) { + } elsif (/^wait_sum\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_sum\s+:\s+(\S+)/) { $wait_sum = $1; - } elsif (/^se\.statistics.{1,2}wait_count\s+:\s+(\S+)/) { + } elsif (/^wait_count\s+:\s+(\S+)/ || /^se\.statistics.{1,2}wait_count\s+:\s+(\S+)/) { $wait_count = $1; - } elsif (/^se\.statistics.{1,2}exec_max\s+:\s+(\S+)/) { + } elsif (/^exec_max\s+:\s+(\S+)/ || /^se\.statistics.{1,2}exec_max\s+:\s+(\S+)/) { $exec_max = $1; - } elsif (/^se\.statistics.{1,2}iowait_sum\s+:\s+(\S+)/) { + } elsif (/^iowait_sum\s+:\s+(\S+)/ || /^se\.statistics.{1,2}iowait_sum\s+:\s+(\S+)/) { $iowait_sum = $1; - } elsif (/^se\.statistics.{1,2}iowait_count\s+:\s+(\S+)/) { + } elsif (/^iowait_count\s+:\s+(\S+)/ || /^se\.statistics.{1,2}iowait_count\s+:\s+(\S+)/) { $iowait_count = $1; } elsif (/^se\.sum_exec_runtime\s+:\s+(\S+)/) { $exec_runtime = $1; @@ -967,6 +1157,46 @@ sub read_sched $stat_valid = 1; close($fh); + #cat /proc/1/task/1/cgroup + #12:cpu,cpuacct:/init.scope + #11:pids:/init.scope + #10:hugetlb:/ + #9:memory:/init.scope + #8:rdma:/ + #7:cpuset:/ + #6:net_cls,net_prio:/ + #5:devices:/init.scope + #4:blkio:/init.scope + #3:freezer:/ + #2:perf_event:/ + #1:name=systemd:/init.scope + #0::/init.scope + + # Extract the pod id: + # /k8s-infra/kubepods/burstable/pode84531c2-0bb1-45f8-b27f-e779b858552d/fdeaea0e577a525a3d9e41655ee05dd9b4edf17ce4b1bf95803cae1518f43ca2 + # Extract *.service or *.scope name: + # /system.slice/acpid.service + # /system.slice/system-ceph.slice/ceph-mds.scope + + # 
+            } elsif (/\/([a-zA-Z0-9_@:-]+)\.\w+$/) {
warn "$::TOOLNAME: Input error: --watch-quiet requires --watch-cmd or --watch-cgroup option.\n"; } - if ((defined $::arg_trig_delay) && !(@::arg_watch_cmd)) { + if ((defined $::arg_trig_delay) && !(@::arg_watch_cmd || @::arg_watch_cgroup)) { $fail = 1; - warn "$::TOOLNAME: Input error: --trig-delay requires --watch-cmd option.\n"; + warn "$::TOOLNAME: Input error: --trig-delay requires --watch-cmd or --watch-cgroup option.\n"; } if ((defined $::arg_trig_delay) && ($::arg_trig_delay < 1)) { $fail = 1; @@ -1407,6 +1647,14 @@ sub parse_schedtop_args { push(@::arg_watch_cmd, split(',', $cmd)); } } + if (@::arg_watch_cgroup) { + my @cgroups = @::arg_watch_cgroup; + @::arg_watch_cgroup = (); + for my $cgroup (@cgroups) { + push(@::arg_watch_cgroup, split(',', $cgroup)); + } + } + if (@::ARGV) { $fail = 1; warn "$::TOOLNAME: Input error: not expecting these options: '@::ARGV'.\n"; @@ -1443,7 +1691,8 @@ sub Usage { printf "Usage: $::TOOLNAME OPTIONS\n"; printf " [--delay=] [--repeat=] [--period=]\n"; printf " [--reset-hwm] [--idle] [--sort=] [--print=]\n"; - printf " [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-only] [--watch-quiet]\n"; + printf " [--watch-cmd=tid1,cmd1,cmd2,...] [--watch-cgroup=cgroup1,...]\n"; + printf " [--watch-only] [--watch-quiet]\n"; printf " [--trig-delay=time]\n"; printf " [--help]\n"; @@ -1465,6 +1714,8 @@ sub ListHelp { printf("Watch specific tasks or commands:\n"); printf(" --watch-cmd=tid1,cmd1,... : watch specific tids or 'comm' names\n"); printf(" (matches from beginning of comm with partial name, eg, --watch-cmd=sirq)\n"); + printf(" --watch-cgroup=cgroup1,... 
+# processes.
eg, 0x1000001 ) + +use strict; +use warnings; +use Data::Dumper; +use POSIX qw(uname strftime); +use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC CLOCK_REALTIME); +use Benchmark ':hireswallclock'; +use Carp qw(croak carp); +use Math::BigInt; +use File::Find (); + +# Define toolname +our $TOOLNAME = "watchpids"; +our $VERSION = "0.1"; + +# Constants +use constant SI_k => 1.0E3; +use constant SI_M => 1.0E6; +use constant SI_G => 1.0E9; +use constant Ki => 1024.0; +use constant Mi => 1024.0*1024.0; +use constant Gi => 1024.0*1024.0*1024.0; + +# Globals +our %opt_V = (); +our %opt_P = (); +our %percpu_0 = (); +our %percpu_1 = (); +our %task_0 = (); +our %task_1 = (); +our %task_n = (); +our %tids_0 = (); +our %tids_1 = (); +our %tids_w = (); +our %loadavg = (); +our $tm_0 = (); +our $tm_1 = (); +our $tr_0 = (); +our $tr_1 = (); +our $tm_elapsed = (); +our $tr_elapsed = (); +our $tm_final = (); +our $uptime = (); +our $num_cpus = 1; +our $affinity_mask = Math::BigInt->new('0'); +our $w_aff = 10; +our $num_tasks = 0; +our $num_blk = 0; +our $num_state_D = 0; +our $USER_HZ = 100; # no easy way to get this +our $CLOCK_NS = SI_G / $USER_HZ; +our $print_host = 1; + +our @cgroup_procs_paths = (); +our @cgroup_procs_match = (); +our @cgroup_tids = (); + + +# Argument list parameters +our ($arg_debug, + $arg_delay, + $arg_repeat, + $arg_period, + $arg_tids, + ) = (); + +#------------------------------------------------------------------------------- +# MAIN Program +#------------------------------------------------------------------------------- +my $ONE_BILLION = 1.0E9; +my $MIN_DELAY = 0.001; +my $MAX_DELAY = 0.001; + +# benchmark variables +my ($bd, $b0, $b1); +my @policies = ('OT', 'FF', 'RR', 'BA', 'ID', 'UN', 'UN'); + +my @state_list = ( + 'pid', 'ppid', 'state', 'cgroup', 'comm', 'cmdline', 'affinity', + 'VmSize', 'VmRSS', 'start_time', + 'nice', 'policy', 'priority', 'rt_priority', 'task_cpu' +); + +# Autoflush output +select(STDERR); +$| = 1; +select(STDOUT); # 
default +$| = 1; + +# Parse input arguments and print tool usage if necessary +&parse_watchpids_args( + \$::arg_debug, + \$::arg_delay, + \$::arg_repeat, + \$::arg_period, + \$::arg_tids, +); + +# Check for root user +if ($>) { + warn "$::TOOLNAME: requires root/sudo.\n"; + exit 1; +} + +# Print out some debugging information +if (defined $::arg_debug) { + $Data::Dumper::Indent = 1; +} + +# Print out selected options +printf "selected options: delay = %.3fs, repeat = %d, tids = %s\n", + $::arg_delay, $::arg_repeat, $::arg_tids ? 'true' : 'false'; + +# Capture timestamp +$b0 = new Benchmark; + +# Get number of logical cpus +&get_num_logical_cpus(\$::num_cpus); +$::affinity_mask = Math::BigInt->new('0'); +for (my $i=0; $i < $::num_cpus; $i++) { + my $y = Math::BigInt->new('1'); + $y->blsft($i); + $::affinity_mask->bior($y); +} +$w_aff = &max(length 'AFF', length $::affinity_mask->as_hex()); + +# Find cgroup.proc paths matching specified cgroup patterns +&find_matching_cgroup_procs(\@::cgroup_procs_match, \@::arg_watch_cgroup); +for my $file (@::cgroup_procs_match) { + print "matched cgroup:", $file, "\n"; +} + +# Get current hires epoc timestamp +$::tm_1 = clock_gettime(CLOCK_MONOTONIC); +$::tr_1 = clock_gettime(CLOCK_REALTIME); +$::tm_final = $::tm_1 + $::arg_delay*$::arg_repeat; + +# Set initial delay +$::tm_elapsed = $::arg_delay; +$MAX_DELAY = $::arg_delay + $MIN_DELAY; + + +# Get list of pids and tids +&get_tids(\%::tids_1, \$::arg_tids); + +# Get current scheduling info for all tids if new or requiring refresh +&read_sched(\%::tids_1, \%::task_0, \%::task_1, \%::task_n); + +# Get current uptime +&get_uptime(\$::uptime); + +# determine column sort order +my ($s_key1, $s_key2, $s_key3) = (); +($s_key1, $s_key2, $s_key3) = ('ppid', 'pid', 'tid'); + + +# Main loop +REPEAT_LOOP: for (my $repeat=1; $repeat <= $::arg_repeat; $repeat++) { + + # copy all state variables + $::tm_0 = (); $::tr_0 = (); %::tids_0 = (); %::task_0 = (); + $::tm_0 = $::tm_1; $::tr_0 = $::tr_1; 
+    # clamp the sleep delay between MIN_DELAY and MAX_DELAY to approximate
+    # the desired sample interarrival time.
+ #foreach my $tid (sort {($::task_1{$b}{$s_key1} <=> $::task_1{$a}{$s_key1}) or + # ($::task_1{$b}{$s_key2} <=> $::task_1{$a}{$s_key2}) or + # ($::task_1{$b}{$s_key3} <=> $::task_1{$a}{$s_key3})} keys %::task_n) { + # TODO: JGAULD make this configurable maybe? --long + my $COMMAND_LEN = 120; + + # TODO: JGAULD -- add option for parent calling forest tree + + foreach my $tid (keys %::task_n) { + + # TODO: JGAULD : UNDEFINDED AFFINITY + my $aff = Math::BigInt->new('0')->as_hex(); + if (defined $::task_1{$tid}{'affinity'}) { + $aff = $::task_1{$tid}{'affinity'}->as_hex(); + } else { + # TODO: JGAULD -- DEBUG -- only field is 'age', no other keys + if (defined $::arg_debug) { + print Data::Dumper->Dump([\%::task_1{$tid}], [qw(task_1)]); + } + next; + } + + # Build up output line by specific area + my $L = (); + $L = ''; + if ($::arg_tids) { + $L .= sprintf "%7d %7d %7d ", + $::task_1{$tid}{'ppid'}, $::task_1{$tid}{'pid'}, $tid; + } else { + $L .= sprintf "%7d %7d ", + $::task_1{$tid}{'ppid'}, $::task_1{$tid}{'pid'}; + } + $L .= sprintf "%1s %2d %*s %2s %3d %4d ", + $::task_1{$tid}{'state'}, $::task_1{$tid}{'task_cpu'}, $w_aff, $aff, + $policies[$::task_1{$tid}{'policy'}], $::task_1{$tid}{'nice'}, + $::task_1{$tid}{'priority'}; + $L .= sprintf "%-16s %-15s %s", + substr($::task_1{$tid}{'cgroup'}, 0, 16), + substr($::task_1{$tid}{'comm'}, 0, 15), + substr($::task_1{$tid}{'cmdline'}, 0, $COMMAND_LEN); + # JGAULD: SHORTEN: $::task_1{$tid}{'cmdline'}; + + print $L, "\n"; + } + if ($num_new) { + print "\n"; + } + + # exit repeat loop if we have exceeded overall time + last if ($::tm_1 > $::tm_final); + +} # REPEAT LOOP + +# Print that tool has finished +print "done\n"; + +# Capture timestamp and report delta +$b1 = new Benchmark; $bd = Benchmark::timediff($b1, $b0); +printf "processing time: %s\n", timestr($bd); +exit 0; + + +#------------------------------------------------------------------------------- +# Convert a number to SI unit xxx.yyyG +sub format_SI +{ + (my 
$value) = @_; + if ($value >= SI_G) { + return sprintf("%.3fG", $value/SI_G); + } elsif ($value >= SI_M) { + return sprintf("%.3fM", $value/SI_M); + } elsif ($value >= SI_k) { + return sprintf("%.3fk", $value/SI_k); + } else { + return sprintf("%.0f", $value); + } +} + +# Convert to IEC binary unit xxx.yyyGi +# Since underlying memory units are in pages, don't need decimals for Ki +sub format_IEC +{ + (my $value) = @_; + if ($value >= Gi) { + return sprintf("%.3fGi", $value/Gi); + } elsif ($value >= Mi) { + return sprintf("%.3fMi", $value/Mi); + } elsif ($value >= Ki) { + return sprintf("%.0fKi", $value/Ki); + } else { + return sprintf("%.0f", $value); + } +} + +# Determine max of array +sub max { + my ($max, @vars) = @_; + for (@vars) { + $max = $_ if $_ > $max; + } + return $max; +} + +# Determine tids and pid mapping by walking /proc//task/ +sub get_tids +{ + (local *::tids, *::arg_tids) = @_; + my (@pids_, @tids_) = (); + my ($dh, $pid, $tid); + + # get pid list + my $dir = '/proc'; + opendir($dh, $dir) || croak "Cannot open directory: $dir ($!)"; + @pids_ = grep { /^\d+$/ && -d "$dir/$_" } readdir($dh); + closedir $dh; + + if ($::arg_tids) { + # get tid list + foreach $pid (@pids_) { + $dir = '/proc/' . $pid . 
+    # collect the tids listed in each matching cgroup.procs file
missing ./stat, ./status, ./cmdline, ./wchan) +# +sub read_sched +{ + (local *::tids, local *::task_0, local *::task, local *::task_n) = @_; + + # TODO: JGAULD -- consider changing this to global; + # maybe it has to be input option; very unlikely folks + # dynamically changing scheduling attributes + my $TASK_REFRESH_INTERVAL = 100; + + %::task = (); + %::task_n = (); + foreach my $tid (keys %::tids) { + my ($fh, $file, $pid, $comm, $cmdline, $wchan, $id) = (); + my ($tpid, $tcomm, $state, $ppid, $pgrp, $sid, + $tty_nr, $tty_pgrp, $flags, + $min_flt, $cmin_flt, $maj_flt, $cmaj_flt, + $utime, $stime, $cutime, $cstime, + $priority, $nice, $num_threads, + $it_real_value, $start_time, + $vsize, $rss, $rsslim, + $start_code, $end_code, $start_stack, $esp, $eip, + $pending, $blocked, $sigign, $sigcatch, $wchan_addr, + $dum1, $dum2, $exit_signal, $task_cpu, + $rt_priority, $policy, $blkio_ticks, + $gtime, $cgtime, + $start_data, $end_data, $start_brk, $arg_start, $arg_end, + $env_start, $env_end, $exit_code) = (); + my ($cgroup) = (); + my ($VmSize, $VmRSS) = (); + my $Cpus_allowed = Math::BigInt->new('0'); + my $affinity = Math::BigInt->new('0'); + + my ($status_valid, $cmdline_valid, $stat_valid, $cgroup_valid) = (); + + $pid = $::tids{$tid}; + + # JGAULD: read stuff if new, else skip + my $bypass_refresh = 1; + if (exists $::task_0{$tid}) { + # Copy previous values. + foreach my $var (keys %{$::task_0{$tid}}) { + $::task{$tid}{$var} = $::task_0{$tid}{$var}; + } + $::task{$tid}{'age'} = $::task_0{$tid}{'age'} + 1; + if ($::task{$tid}{'age'} == $TASK_REFRESH_INTERVAL) { + $::task{$tid}{'age'} = 0; + $bypass_refresh = 0; + } + } else { + $::task_n{$tid} = 1; + $::task{$tid}{'age'} = 0; + $bypass_refresh = 0; + } + + next if ($bypass_refresh); + + # parse /proc//task//status + $file = '/proc/' . $pid . '/task/' . $tid . 
'/status'; + open($fh, $file) || next; + LOOP_STATUS: while (<$fh>) { + if (/^Name:\s+(.*)/) { + $comm = $1; + } elsif (/^State:\s+(\S+)/) { + $state = $1; + } elsif (/^PPid:\s+(\S+)/) { + $ppid = $1; + } elsif (/^VmSize:\s+(\S+)/) { + $VmSize = $1; + } elsif (/^VmRSS:\s+(\S+)/) { + $VmRSS = $1; + } elsif (/^Cpus_allowed:\s+([0]+,)*(\S+)/) { + my $h = $2; $h =~ tr/,/_/; + $Cpus_allowed = Math::BigInt->from_hex($h); + $affinity = $Cpus_allowed->band($::affinity_mask); + $status_valid = 1; + last LOOP_STATUS; + } + } + close($fh); + + # parse /proc//task//cmdline + $file = '/proc/' . $pid . '/task/' . $tid . '/cmdline'; + open($fh, $file) || next; + LOOP_CMDLINE: while (<$fh>) { + if (/^(.*)$/) { + $cmdline = $1; + $cmdline =~ s/\000/ /g; + $cmdline_valid = 1; + last LOOP_CMDLINE; + } + } + if (!$cmdline_valid) { + $cmdline_valid = 1; + $cmdline = $comm; + } + close($fh); + + + #Table 1-4: Contents of the stat files (as of 2.6.30-rc7) + #.............................................................................. 
+ # Field Content + # tpid process id (or tid, if /proc//task//stat) + # tcomm filename of the executable + # state state (R is running, S is sleeping, D is sleeping in an + # uninterruptible wait, Z is zombie, T is traced or stopped) + # ppid process id of the parent process + # pgrp pgrp of the process + # sid session id + # tty_nr tty the process uses + # tty_pgrp pgrp of the tty + # flags task flags + # min_flt number of minor faults + # cmin_flt number of minor faults with child's + # maj_flt number of major faults + # cmaj_flt number of major faults with child's + # utime user mode jiffies + # stime kernel mode jiffies + # cutime user mode jiffies with child's + # cstime kernel mode jiffies with child's + # priority priority level + # nice nice level + # num_threads number of threads + # it_real_value (obsolete, always 0) + # start_time time the process started after system boot + # vsize virtual memory size + # rss resident set memory size + # rsslim current limit in bytes on the rss + # start_code address above which program text can run + # end_code address below which program text can run + # start_stack address of the start of the main process stack + # esp current value of ESP + # eip current value of EIP + # pending bitmap of pending signals + # blocked bitmap of blocked signals + # sigign bitmap of ignored signals + # sigcatch bitmap of catched signals + # wchan address where process went to sleep + # 0 (place holder) + # 0 (place holder) + # exit_signal signal to send to parent thread on exit + # task_cpu which CPU the task is scheduled on + # rt_priority realtime priority + # policy scheduling policy (man sched_setscheduler) + # blkio_ticks time spent waiting for block IO + # gtime guest time of the task in jiffies + # cgtime guest time of the task children in jiffies + # start_data address above which program data+bss is placed + # end_data address below which program data+bss is placed + # start_brk address above which program heap can be expanded 
with brk() + # arg_start address above which program command line is placed + # arg_end address below which program command line is placed + # env_start address above which program environment is placed + # env_end address below which program environment is placed + # exit_code the thread's exit_code in the form reported by the waitpid system call + + # parse /proc//task//stat + $file = '/proc/' . $pid . '/task/' . $tid . '/stat'; + my $dummy; + open($fh, $file) || next; + $_ = <$fh>; + ($tpid, $tcomm, $dummy) = /^(\d+)\s+\((.*)\)\s+(.*)/; + ($state, $ppid, $pgrp, $sid, + $tty_nr, $tty_pgrp, $flags, + $min_flt, $cmin_flt, $maj_flt, $cmaj_flt, + $utime, $stime, $cutime, $cstime, + $priority, $nice, $num_threads, + $it_real_value, $start_time, + $vsize, $rss, $rsslim, + $start_code, $end_code, $start_stack, $esp, $eip, + $pending, $blocked, $sigign, $sigcatch, $wchan_addr, + $dum1, $dum2, $exit_signal, $task_cpu, + $rt_priority, $policy, $blkio_ticks, $gtime, $cgtime, + $start_data, $end_data, $start_brk, $arg_start, $arg_end, + $env_start, $env_end, $exit_code) = split(/\s+/, $dummy); + $stat_valid = 1; + close($fh); + + #cat /proc/1/task/1/cgroup + #12:cpu,cpuacct:/init.scope + #11:pids:/init.scope + #10:hugetlb:/ + #9:memory:/init.scope + #8:rdma:/ + #7:cpuset:/ + #6:net_cls,net_prio:/ + #5:devices:/init.scope + #4:blkio:/init.scope + #3:freezer:/ + #2:perf_event:/ + #1:name=systemd:/init.scope + #0::/init.scope + + # Extract the pod id: + # /k8s-infra/kubepods/burstable/pode84531c2-0bb1-45f8-b27f-e779b858552d/fdeaea0e577a525a3d9e41655ee05dd9b4edf17ce4b1bf95803cae1518f43ca2 + # Extract *.service or *.scope name: + # /system.slice/acpid.service + # /system.slice/system-ceph.slice/ceph-mds.scope + + # parse /proc//task//cgroup + $file = '/proc/' . $pid . '/task/' . $tid . 
+            } elsif (/\/([a-zA-Z0-9_@:-]+)\.\w+$/) {
get_num_logical_cpus { + (local *::num_cpus) = @_; + $::num_cpus = 0; + + my $file = "/proc/cpuinfo"; + open(my $fh, $file) || croak "Cannot open file: $file ($!)"; + LOOP_CPUINFO: while (<$fh>) { + if (/^[Pp]rocessor\s+:\s\d+/) { + $::num_cpus++; + } + } + close($fh); +} + +# Print header +sub watchpids_header { + (local *::tr_1, + local *::tm_elapsed, + local *::tr_elapsed, + local *::uptime, + ) = @_; + + # process epoch to get current timestamp + my $mm_in_s = 60; + my $hh_in_s = 60*60; + my $dd_in_s = 24*60*60; + my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst); + ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($::tr_1); + my $msec = 1000.0*($::tr_1 - int($::tr_1)); + + # convert uptime to elapsed ::: + my ($up, $up_dd, $up_hh, $up_mm, $up_ss); + $up = int($::uptime); + $up_dd = int($up/$dd_in_s); + $up -= $dd_in_s*$up_dd; + $up_hh = int($up/$hh_in_s); + $up -= $hh_in_s*$up_hh; + $up_mm = int($up/$mm_in_s); + $up -= $mm_in_s*$up_mm; + $up_ss = $up; + + #watchpids -- 2014/03/03 02:00:21.357 dt:2050.003 ms up:6:13:00:56 + printf "%s %s -- ". + "%4d-%02d-%02d %02d:%02d:%02d.%03d ". + "dt:%.3f ms ". 
+ "up:%d:%02d:%02d:%02d\n", + $::TOOLNAME, $::VERSION, + 1900+$year, 1+$mon, $mday, $hour, $min, $sec, $msec, + $::tm_elapsed*1000.0, + $up_dd, $up_hh, $up_mm, $up_ss; +} + +# Parse and validate command line arguments +sub parse_watchpids_args { + (local *::arg_debug, + local *::arg_delay, + local *::arg_repeat, + local *::arg_period, + local *::arg_tids, + ) = @_; + + # Local variables + my ($fail, $arg_help); + + # Use the Argument processing module + use Getopt::Long; + + # Print usage if no arguments + if (!@::ARGV) { + &Usage(); + exit 0; + } + + # Process input arguments + $fail = 0; + GetOptions( + "debug:i", \$::arg_debug, + "delay=f", \$::arg_delay, + "period=i", \$::arg_period, + "repeat=i", \$::arg_repeat, + "tids", \$::arg_tids, + "help|h", \$arg_help + ) || GetOptionsMessage(); + + # Print help documentation if user has selected --help + &ListHelp() if (defined $arg_help); + + # Validate options + if ((defined $::arg_repeat) && (defined $::arg_period)) { + $fail = 1; + warn "$::TOOLNAME: Input error: cannot specify both --repeat and --period options.\n"; + } + if ((defined $::arg_delay) && ($::arg_delay < 0.01)) { + $fail = 1; + warn "$::TOOLNAME: Input error: --delay %f is less than 0.01.\n", + $::arg_delay; + } + $::arg_tids = (defined $::arg_tids) ? 
+    printf "$::TOOLNAME -- display information for newly created processes\n";