mirror of
https://github.com/Telecominfraproject/ols-nos.git
synced 2025-11-01 18:48:05 +00:00
[ruijie] Replace os.system and remove subprocess with shell=True (#12107)
Signed-off-by: maipbui <maibui@microsoft.com> Dependency: [https://github.com/sonic-net/sonic-buildimage/pull/12065](https://github.com/sonic-net/sonic-buildimage/pull/12065) #### Why I did it 1. `getstatusoutput` is used without a static string and it uses `shell=True` 2. `subprocess()` - when using with `shell=True` is dangerous. Using subprocess function without a static string can lead to command injection. 3. `os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content. #### How I did it 1. use `getstatusoutput` without shell=True 2. `subprocess()` - use `shell=False` instead. use an array string. Ref: [https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation](https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation) 3. `os` - use with `subprocess`
This commit is contained in:
@@ -6,7 +6,7 @@ import os
|
||||
import subprocess
|
||||
import time
|
||||
from ruijieconfig import GLOBALCONFIG, GLOBALINITPARAM, GLOBALINITCOMMAND, MAC_LED_RESET, STARTMODULE, i2ccheck_params
|
||||
|
||||
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe
|
||||
from ruijieutil import rjpciwr
|
||||
|
||||
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
|
||||
@@ -46,7 +46,7 @@ def write_sysfs_value(reg_name, value):
|
||||
|
||||
def check_driver():
|
||||
u'''whether there is driver start with rg'''
|
||||
status, output = log_os_system("lsmod | grep rg | wc -l")
|
||||
status, output = getstatusoutput_noshell_pipe(["lsmod"], ["grep", "rg"], ["wc", "-l"])
|
||||
#System execution error
|
||||
if status:
|
||||
return False
|
||||
@@ -70,61 +70,59 @@ def i2c_getPid(name):
|
||||
return ret
|
||||
|
||||
def startAvscontrol():
|
||||
cmd = "nohup avscontrol.py start >/dev/null 2>&1 &"
|
||||
cmd = ["avscontrol.py", "start"]
|
||||
rets = i2c_getPid("avscontrol.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
def startFanctrol():
|
||||
if STARTMODULE['fancontrol'] == 1:
|
||||
cmd = "nohup fancontrol.py start >/dev/null 2>&1 &"
|
||||
cmd = ["fancontrol.py", "start"]
|
||||
rets = i2c_getPid("fancontrol.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
def starthal_fanctrl():
|
||||
if STARTMODULE.get('hal_fanctrl',0) == 1:
|
||||
cmd = "nohup hal_fanctrl.py start >/dev/null 2>&1 &"
|
||||
cmd = ["hal_fanctrl.py", "start"]
|
||||
rets = i2c_getPid("hal_fanctrl.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
def starthal_ledctrl():
|
||||
if STARTMODULE.get('hal_ledctrl',0) == 1:
|
||||
cmd = "nohup hal_ledctrl.py start >/dev/null 2>&1 &"
|
||||
cmd = ["hal_ledctrl.py", "start"]
|
||||
rets = i2c_getPid("hal_ledctrl.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
|
||||
subprocess.Popen(cmd)
|
||||
def startDevmonitor():
|
||||
if STARTMODULE.get('dev_monitor',0) == 1:
|
||||
cmd = "nohup dev_monitor.py start >/dev/null 2>&1 &"
|
||||
cmd = ["dev_monitor.py", "start"]
|
||||
rets = i2c_getPid("dev_monitor.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
|
||||
subprocess.Popen(cmd)
|
||||
def startSlotmonitor():
|
||||
if STARTMODULE.get('slot_monitor',0) == 1:
|
||||
cmd = "nohup slot_monitor.py start >/dev/null 2>&1 &"
|
||||
cmd = ["slot_monitor.py", "start"]
|
||||
rets = i2c_getPid("slot_monitor.py")
|
||||
if len(rets) == 0:
|
||||
os.system(cmd)
|
||||
subprocess.Popen(cmd)
|
||||
|
||||
def stopFanctrol():
|
||||
u'''disable fan timer service'''
|
||||
if STARTMODULE['fancontrol'] == 1:
|
||||
rets = i2c_getPid("fancontrol.py") #
|
||||
for ret in rets:
|
||||
cmd = "kill "+ ret
|
||||
os.system(cmd)
|
||||
cmd = ["kill", ret]
|
||||
subprocess.call(cmd)
|
||||
return True
|
||||
|
||||
def stophal_ledctrl():
|
||||
if STARTMODULE.get('hal_ledctrl',0) == 1:
|
||||
rets = i2c_getPid("hal_ledctrl.py")
|
||||
for ret in rets:
|
||||
cmd = "kill "+ ret
|
||||
os.system(cmd)
|
||||
cmd = ["kill", ret]
|
||||
subprocess.call(cmd)
|
||||
return True
|
||||
|
||||
|
||||
@@ -133,8 +131,8 @@ def stopDevmonitor():
|
||||
if STARTMODULE.get('dev_monitor',0) == 1:
|
||||
rets = i2c_getPid("dev_monitor.py") #
|
||||
for ret in rets:
|
||||
cmd = "kill "+ ret
|
||||
os.system(cmd)
|
||||
cmd = ["kill", ret]
|
||||
subprocess.call(cmd)
|
||||
return True
|
||||
|
||||
def stopSlotmonitor():
|
||||
@@ -142,15 +140,16 @@ def stopSlotmonitor():
|
||||
if STARTMODULE.get('slot_monitor',0) == 1:
|
||||
rets = i2c_getPid("slot_monitor.py") #
|
||||
for ret in rets:
|
||||
cmd = "kill "+ ret
|
||||
os.system(cmd)
|
||||
cmd = ["kill", ret]
|
||||
subprocess.call(cmd)
|
||||
return True
|
||||
|
||||
def removeDev(bus, loc):
|
||||
cmd = "echo 0x%02x > /sys/bus/i2c/devices/i2c-%d/delete_device" % (loc, bus)
|
||||
devpath = "/sys/bus/i2c/devices/%d-%04x"%(bus, loc)
|
||||
if os.path.exists(devpath):
|
||||
log_os_system(cmd)
|
||||
file = "/sys/bus/i2c/devices/i2c-%d/delete_device" % bus
|
||||
with open(file, 'w') as f:
|
||||
f.write('0x%02x\n'%str(bus))
|
||||
|
||||
def addDev(name, bus, loc):
|
||||
if name == "lm75":
|
||||
@@ -163,10 +162,11 @@ def addDev(name, bus, loc):
|
||||
if i % 10 == 0:
|
||||
click.echo("%%DEVICE_I2C-INIT: %s not found, wait 0.1 second ! i %d " % (pdevpath,i))
|
||||
|
||||
cmd = "echo %s 0x%02x > /sys/bus/i2c/devices/i2c-%d/new_device" % (name, loc, bus)
|
||||
devpath = "/sys/bus/i2c/devices/%d-%04x"%(bus, loc)
|
||||
if os.path.exists(devpath) == False:
|
||||
os.system(cmd)
|
||||
file = "/sys/bus/i2c/devices/i2c-%d/new_device" % bus
|
||||
with open(file, 'w') as f:
|
||||
f.write('%s 0x%02x\n' % (name, loc))
|
||||
|
||||
def removedevs():
|
||||
devs = GLOBALCONFIG["DEVS"]
|
||||
@@ -179,8 +179,7 @@ def adddevs():
|
||||
addDev(devs[dev]["name"], devs[dev]["bus"] , devs[dev]["loc"])
|
||||
|
||||
def checksignaldriver(name):
|
||||
modisexistcmd = "lsmod | grep %s | wc -l" % name
|
||||
status, output = log_os_system(modisexistcmd)
|
||||
status, output = getstatusoutput_noshell_pipe(["lsmod"], ["grep", name], ["wc", "-l"])
|
||||
#System execution error
|
||||
if status:
|
||||
return False
|
||||
@@ -190,17 +189,17 @@ def checksignaldriver(name):
|
||||
return False
|
||||
|
||||
def adddriver(name, delay):
|
||||
cmd = "modprobe %s" % name
|
||||
cmd = ["modprobe", name]
|
||||
if delay != 0:
|
||||
time.sleep(delay)
|
||||
if checksignaldriver(name) != True:
|
||||
log_os_system(cmd)
|
||||
getstatusoutput_noshell(cmd)
|
||||
|
||||
def removedriver(name, delay):
|
||||
realname = name.lstrip().split(" ")[0];
|
||||
cmd = "rmmod -f %s" % realname
|
||||
cmd = ["rmmod", "-f", realname]
|
||||
if checksignaldriver(realname):
|
||||
log_os_system(cmd)
|
||||
getstatusoutput_noshell(cmd)
|
||||
|
||||
def removedrivers():
|
||||
u'''remove all drivers'''
|
||||
|
||||
Reference in New Issue
Block a user