Fix bandit error: Ascend driver:[B602:subprocess_popen_with_shell_equals_true]

This patch fixes one of the cyborg-tox-bandit Failures[0] in Zuul check.

[0]https://c3f93530c9211d80493c-aff6bc2b39e4d26360d25c473974606e.ssl.cf5.rackcdn.com/696089/16/check/cyborg-tox-bandit/e8316b7/job-output.txt

Change-Id: I60c246e835644d8f5b4f94ccfa0209989cf7236a
This commit is contained in:
Yumeng Bao 2020-04-16 05:33:27 -07:00
parent d8445acaf2
commit a5534682e2
2 changed files with 26 additions and 22 deletions

View File

@ -10,19 +10,19 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_concurrency import processutils
from oslo_serialization import jsonutils
import re
from cyborg.accelerator.common import utils
from cyborg.accelerator.drivers.driver import GenericDriver from cyborg.accelerator.drivers.driver import GenericDriver
from cyborg.common import constants
from cyborg.objects.driver_objects import driver_attach_handle from cyborg.objects.driver_objects import driver_attach_handle
from cyborg.objects.driver_objects import driver_controlpath_id from cyborg.objects.driver_objects import driver_controlpath_id
from cyborg.objects.driver_objects import driver_deployable from cyborg.objects.driver_objects import driver_deployable
from cyborg.objects.driver_objects import driver_device from cyborg.objects.driver_objects import driver_device
import cyborg.privsep
import re
import subprocess
from cyborg.accelerator.common import utils
from cyborg.common import constants
from oslo_serialization import jsonutils
PCI_INFO_PATTERN = re.compile(r"(?P<slot>[0-9a-f]{4}:[0-9a-f]{2}:" PCI_INFO_PATTERN = re.compile(r"(?P<slot>[0-9a-f]{4}:[0-9a-f]{2}:"
r"[0-9a-f]{2}\.[0-9a-f]) " r"[0-9a-f]{2}\.[0-9a-f]) "
@ -32,6 +32,12 @@ PCI_INFO_PATTERN = re.compile(r"(?P<slot>[0-9a-f]{4}:[0-9a-f]{2}:"
r"[(rev ](?P<revision>[0-9a-f]{2})") r"[(rev ](?P<revision>[0-9a-f]{2})")
@cyborg.privsep.sys_admin_pctxt.entrypoint
def lspci_privileged():
cmd = ['lspci', '-nnn', '-D']
return processutils.execute(*cmd)
class AscendDriver(GenericDriver): class AscendDriver(GenericDriver):
"""The class for Ascend AI Chip drivers. """The class for Ascend AI Chip drivers.
@ -66,14 +72,13 @@ class AscendDriver(GenericDriver):
# TODO(yikun): can be extracted into PCIDeviceDriver # TODO(yikun): can be extracted into PCIDeviceDriver
def _get_pci_lines(self, keywords=()): def _get_pci_lines(self, keywords=()):
cmd = "sudo lspci -nnn -D" pci_lines = []
if keywords: if keywords:
cmd += "| grep -E %s" % '|'.join(keywords) lspci_out = lspci_privileged()[0].split('\n')
# FIXME(wangzhh): Use oslo.privsep instead of subprocess here to for i in range(len(lspci_out)):
# prevent shell injection attacks. # filter out pci devices info that contains all keywords
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) if all([k in (lspci_out[i]) for k in keywords]):
p.wait() pci_lines.append(lspci_out[i])
pci_lines = p.stdout.readlines()
return pci_lines return pci_lines
def discover(self): def discover(self):

View File

@ -16,18 +16,17 @@ import mock
from cyborg.accelerator.drivers.aichip.huawei.ascend import AscendDriver from cyborg.accelerator.drivers.aichip.huawei.ascend import AscendDriver
from cyborg.tests import base from cyborg.tests import base
d100_pci_res = [ d100_pci_res = (
"0000:00:0c.0 Processing accelerators [1200]:" '0000:00:0c.0 Processing accelerators [1200]:'
" Device [19e5:d100] (rev 20)\n", ' Device [19e5:d100] (rev 20)\n'
"0000:00:0d.0 Processing accelerators [1200]:" '0000:00:0d.0 Processing accelerators [1200]:'
" Device [19e5:d100] (rev 20)" ' Device [19e5:d100] (rev 20)\n',)
]
class TestAscendDriver(base.TestCase): class TestAscendDriver(base.TestCase):
@mock.patch('cyborg.accelerator.drivers.aichip.' @mock.patch('cyborg.accelerator.drivers.aichip.'
'huawei.ascend.AscendDriver._get_pci_lines', 'huawei.ascend.lspci_privileged',
return_value=d100_pci_res) return_value=d100_pci_res)
def test_discover(self, mock_pci): def test_discover(self, mock_pci):
ascend_driver = AscendDriver() ascend_driver = AscendDriver()