Guilherme Santos 122f45ca75 C-state Management Application Source Code
This commit adds the `cstate-management` application's source
code.
Subsequent commits will be added to effectively establish
`cstate-management` as a StarlingX application.

Test Plan:
PASS: Check whether tox code runs successfully.

Story: 2011105
Task: 50175

Authored-By: Kaige Sun <kaige.sun@windriver.com>
Authored-By: Jackie Huang <jackie.huang@windriver.com>
Co-Authored-By: Guilherme Santos <guilherme.santos@windriver.com>
Co-Authored-By: Vinicius Lobo <vinicius.rochalobo@windriver.com>

Change-Id: If24698e454f13e3590fe04f398ba196749678c75
Signed-off-by: Guilherme Santos <guilherme.santos@windriver.com>
2024-05-28 11:55:58 -03:00

176 lines
5.6 KiB
Python

#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import logging
import os
import re
LOG = logging.getLogger(__name__)
def getfileval(stateFileName):
    """Return the first line of ``stateFileName`` with newlines stripped.

    Only newline characters at either end of the line are removed; any
    other whitespace is kept as-is.
    """
    with open(stateFileName, 'r') as handle:
        first_line = handle.readline()
    return first_line.strip("\n")
def get_current_driver():
    """Get current cpu idle driver.

    Reads the first line of
    ``/sys/devices/system/cpu/cpuidle/current_driver``.

    Returns:
        The driver name read from that file, or ``'acpi_idle'`` when the
        file cannot be opened or read (the error is logged).
    """
    current_driver = 'acpi_idle'
    current_driver_path = "/sys/devices/system/cpu/cpuidle/current_driver"
    try:
        # The context manager guarantees the handle is closed; previously
        # the file was opened and never closed.
        with open(current_driver_path, 'r') as current_driver_file:
            current_driver = current_driver_file.readline().strip("\n")
    except OSError as ex:
        LOG.error(f"Exception:{type(ex)}@{str(ex)}")
    return current_driver
def get_available_cstates(cpu=0):
    """Get the list of available cstate of core.

    Args:
        cpu: Core number used to build
            ``/sys/devices/system/cpu/cpu<cpu>/cpuidle/``.

    Returns:
        A list with the first line of each state's ``name`` file, in the
        order ``os.listdir`` reports the state directories. The list is
        empty when the cpuidle directory cannot be listed (the error is
        logged).

    Raises:
        OSError: If a state's ``name`` file cannot be opened or read.
    """
    cpuidle_path = f"/sys/devices/system/cpu/cpu{str(cpu)}/cpuidle/"
    try:
        states = os.listdir(cpuidle_path)
    except OSError as ex:
        LOG.error(f"Exception:{type(ex)}@{str(ex)}")
        return []

    state_list = []
    for state in states:
        state_file_name = f"{cpuidle_path}{state}/name"
        # ``with`` closes the handle even if reading raises.
        with open(state_file_name, 'r') as state_file:
            state_list.append(state_file.readline().strip("\n"))
    return state_list
def get_cstates(cpu=0):
    """Get the list of available cstate with latency of core.

    Args:
        cpu: Core number used to build
            ``/sys/devices/system/cpu/cpu<cpu>/cpuidle/``.

    Returns:
        A list with one dict per state directory, in the order
        ``os.listdir`` reports them. Each dict has the keys:

        - ``'stateName'``: first line of the state's ``name`` file.
        - ``'stateLatency'``: first line of the state's ``latency`` file,
          kept as a string.
        - ``'state'``: the state directory name.

        The list is empty when the cpuidle directory cannot be listed
        (the error is logged).

    Raises:
        OSError: If a ``name`` or ``latency`` file cannot be opened or read.
    """
    cpuidle_path = f"/sys/devices/system/cpu/cpu{str(cpu)}/cpuidle/"
    try:
        states = os.listdir(cpuidle_path)
    except OSError as ex:
        LOG.error(f"Exception:{type(ex)}@{str(ex)}")
        return []

    state_list = []
    for state in states:
        state_dir = f"{cpuidle_path}{state}/"
        # Both handles are managed together so neither leaks if the second
        # open or either read fails.
        with open(state_dir + 'name', 'r') as name_file, \
                open(state_dir + 'latency', 'r') as latency_file:
            state_name = name_file.readline().strip("\n")
            state_latency = latency_file.readline().strip("\n")
        state_list.append({
            'stateName': state_name,
            'stateLatency': state_latency,
            'state': state,
        })
    return state_list
def set_cstate(cpurange, disable, cstate):
    """Write ``disable`` to the ``disable`` file of a named c-state.

    The state directories whose ``name`` file equals ``cstate`` are looked
    up once under cpu0; the same directory names are then used for every
    core in ``cpurange``. A ``disable`` file that cannot be opened is
    logged and skipped.

    parameters:
    - cpurange: [0,1,2,3,4,10]
    - disable: 0 or 1
    - cstate: POLL, C1 or other

    Raises:
        OSError: If the cpu0 cpuidle directory cannot be listed, a cpu0
            ``name`` file cannot be read, or writing to an opened
            ``disable`` file fails.
    """
    cpu0_cpuidle = "/sys/devices/system/cpu/cpu0/cpuidle"
    cstates = os.listdir(cpu0_cpuidle)
    # The name lookup only touches cpu0, so resolve it once instead of
    # re-reading every name file for each core.
    matching_states = [
        y for y in cstates
        if getfileval(cpu0_cpuidle + "/" + y + "/name") == cstate
    ]
    for core in cpurange:
        for y in matching_states:
            stateName = "/sys/devices/system/cpu/cpu" + \
                str(core) + "/cpuidle/" + str(y) + "/disable"
            try:
                stateFile = open(stateName, 'w')
            except OSError:
                LOG.error(
                    "Could not open '" + str(stateName) + "', skipping."
                )
                continue
            # Only the open is best-effort; write errors still propagate,
            # but the handle is now always closed.
            with stateFile:
                LOG.info("Writing '" + str(disable) + "' to " + stateName)
                stateFile.write(str(disable))
def get_cstates_configured(cpurange):
    """Get current c-state settings for specific cpurange.

    For every core, each state directory under
    ``/sys/devices/system/cpu/cpu<N>/cpuidle/`` is read. The reported value
    is the inverse of the state's ``disable`` file: ``'1'`` when ``disable``
    reads ``'0'``, otherwise ``'0'``.

    parameter:
    - cpurange: [0,1,2,3,4]

    Return cpu_configured_cstates:
    {'0': {'C1': '0', 'C2': '1', 'POLL': '0'},
     '1': {'C1': '0', 'C2': '1', 'POLL': '0'},
     '2': {'C1': '0', 'C2': '1', 'POLL': '0'},
     '3': {'C1': '0', 'C2': '1', 'POLL': '0'},
     '4': {'C1': '0', 'C2': '0', 'POLL': '0'}
    }

    Raises:
        OSError: If a cpuidle directory cannot be listed or a ``name`` /
            ``disable`` file cannot be opened or read.
    """
    cpu_configured_cstates = {}
    for cpu in cpurange:
        cpuidle_path = f"/sys/devices/system/cpu/cpu{str(cpu)}/cpuidle/"
        configured_cstates = {}
        for state in os.listdir(cpuidle_path):
            state_base_path = f"{cpuidle_path}{state}/"
            # Context managers close each handle even if a read fails.
            with open(state_base_path + 'name', 'r') as name_file:
                state_name = name_file.readline().strip("\n")
            with open(state_base_path + 'disable', 'r') as disable_file:
                state_disable = disable_file.readline().strip("\n")
            configured_cstates[state_name] = \
                '1' if state_disable == '0' else '0'
        cpu_configured_cstates[str(cpu)] = configured_cstates
    return cpu_configured_cstates
def _cstate_sort_key(name):
    """Return a natural-sort key so that e.g. 'C2' orders before 'C10'."""
    # re.split with a capturing group alternates text / digit chunks, so
    # keys always compare str with str and int with int position by position.
    return [
        int(chunk) if chunk.isdigit() else chunk
        for chunk in re.split(r'(\d+)', name)
    ]


def get_max_cstate(cpurange):
    """Return, per core, the highest-ordered enabled c-state name.

    The settings come from ``get_cstates_configured(cpurange)``, e.g.:
    {
        "5": {"C1": "1", "C2": "1", "POLL": "1"},
        "6": {"C1": "1", "C2": "1", "POLL": "1"}
    }

    For each core the states whose value is ``'1'`` are considered, with
    ``POLL`` left out. Names are ordered naturally (numeric parts compared
    as numbers), so ``'C10'`` ranks above ``'C9'``; a plain string sort
    would rank it below ``'C2'``.

    Args:
        cpurange: Iterable of core numbers, e.g. [5, 6].

    Returns:
        A dict mapping each core (as a string) to the selected state name,
        or to ``'POLL'`` when no other state has the value ``'1'``.
    """
    cstates_configured = {}
    for core, core_cstates in get_cstates_configured(cpurange).items():
        # core_cstates: {"C1": "1", "C2": "1", "POLL": "1"}
        enabled = [
            name for name, value in core_cstates.items()
            if value == '1' and name != 'POLL'
        ]
        if enabled:
            cstates_configured[core] = max(enabled, key=_cstate_sort_key)
        else:
            cstates_configured[core] = 'POLL'
    return cstates_configured