Replace the os.open method with safer way

Write files using a safer open for writing operations[1].
Modifided the file permission from 666 to 644 in base.j2 to make sure
consistenacy between docs and real file permission.
Edited unit tests based on the new method.

[1] https://security.openstack.org/guidelines/
            dg_apply-restrictive-file-permissions.html

Closes-Bug: #1548552
Change-Id: If3f315005fcd22afc2f24da527da08175e230bb1
This commit is contained in:
minwang 2016-03-03 17:08:21 -08:00
parent 2ac777aadc
commit bd04981021
7 changed files with 213 additions and 157 deletions

View File

@ -13,12 +13,11 @@
# under the License.
import os
import stat
import flask
from oslo_config import cfg
MODE_OWNER = 0o600
BUFFER = 1024
CONF = cfg.CONF
@ -27,12 +26,15 @@ CONF.import_group('amphora_agent', 'octavia.common.config')
def upload_server_cert():
stream = flask.request.stream
with open(CONF.amphora_agent.agent_server_cert, 'w') as crt_file:
file_path = CONF.amphora_agent.agent_server_cert
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00600
mode = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(file_path, flags, mode), 'w') as crt_file:
b = stream.read(BUFFER)
while b:
crt_file.write(b)
b = stream.read(BUFFER)
os.fchmod(crt_file.fileno(), MODE_OWNER) # only accessible by owner
return flask.make_response(flask.jsonify({
'message': 'OK'}), 202)

View File

@ -14,6 +14,7 @@
import logging
import os
import stat
import subprocess
import flask
@ -42,14 +43,21 @@ def upload_keepalived_config():
os.makedirs(util.keepalived_check_scripts_dir())
conf_file = util.keepalived_cfg_path()
with open(conf_file, 'w') as f:
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00644
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
with os.fdopen(os.open(conf_file, flags, mode), 'w') as f:
b = stream.read(BUFFER)
while b:
f.write(b)
b = stream.read(BUFFER)
if not os.path.exists(util.keepalived_init_path()):
with open(util.keepalived_init_path(), 'w') as text_file:
file_path = util.keepalived_init_path()
# mode 00755
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
if not os.path.exists(file_path):
with os.fdopen(os.open(file_path, flags, mode), 'w') as text_file:
text = template.render(
keepalived_pid=util.keepalived_pid_path(),
keepalived_cmd=consts.KEEPALIVED_CMD,
@ -57,33 +65,15 @@ def upload_keepalived_config():
keepalived_log=util.keepalived_log_path()
)
text_file.write(text)
cmd = "chmod +x {file}".format(file=util.keepalived_init_path())
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to upload keepalived configuration. "
"Unable to chmod init script.")
return flask.make_response(flask.jsonify(dict(
message="Failed to upload keepalived configuration. "
"Unable to chmod init script.",
details=e.output)), 500)
# Renders the Keepalived check script
with open(util.keepalived_check_script_path(), 'w') as text_file:
keepalived_path = util.keepalived_check_script_path()
open_obj = os.open(keepalived_path, flags, mode)
with os.fdopen(open_obj, 'w') as text_file:
text = check_script_template.render(
check_scripts_dir=util.keepalived_check_scripts_dir()
)
text_file.write(text)
cmd = ("chmod +x {file}".format(
file=util.keepalived_check_script_path()))
try:
subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to upload keepalived configuration. "
"Unable to chmod check script.")
return flask.make_response(flask.jsonify(dict(
message="Failed to upload keepalived configuration. "
"Unable to chmod check script.",
details=e.output)), 500)
res = flask.make_response(flask.jsonify({
'message': 'OK'}), 200)

View File

@ -18,6 +18,7 @@ import logging
import os
import re
import shutil
import stat
import subprocess
import flask
@ -98,7 +99,10 @@ def upload_haproxy_config(amphora_id, listener_id):
os.makedirs(util.haproxy_dir(listener_id))
name = os.path.join(util.haproxy_dir(listener_id), 'haproxy.cfg.new')
with open(name, 'w') as file:
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00600
mode = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(name, flags, mode), 'w') as file:
b = stream.read(BUFFER)
while (b):
file.write(b)
@ -121,8 +125,12 @@ def upload_haproxy_config(amphora_id, listener_id):
os.rename(name, util.config_path(listener_id))
use_upstart = util.CONF.haproxy_amphora.use_upstart
if not os.path.exists(util.init_path(listener_id)):
with open(util.init_path(listener_id), 'w') as text_file:
file = util.init_path(listener_id)
# mode 00755
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
if not os.path.exists(file):
with os.fdopen(os.open(file, flags, mode), 'w') as text_file:
template = UPSTART_TEMPLATE if use_upstart else SYSVINIT_TEMPLATE
text = template.render(
peer_name=peer_name,
@ -135,13 +143,9 @@ def upload_haproxy_config(amphora_id, listener_id):
text_file.write(text)
if not use_upstart:
# make init.d script executable
file = util.init_path(listener_id)
permcmd = ("chmod 755 {file}".format(file=file))
insrvcmd = ("insserv {file}".format(file=file))
try:
subprocess.check_output(permcmd.split(), stderr=subprocess.STDOUT)
subprocess.check_output(insrvcmd.split(), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.debug("Failed to make %(file)s executable: %(err)s",
@ -313,12 +317,15 @@ def upload_certificate(listener_id, filename):
os.makedirs(_cert_dir(listener_id))
stream = Wrapped(flask.request.stream)
with open(_cert_file_path(listener_id, filename), 'w') as crt_file:
file = _cert_file_path(listener_id, filename)
flags = os.O_WRONLY | os.O_CREAT
# mode 00600
mode = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(file, flags, mode), 'w') as crt_file:
b = stream.read(BUFFER)
while (b):
crt_file.write(b)
b = stream.read(BUFFER)
os.fchmod(crt_file.fileno(), 0o600) # only accessible by owner
resp = flask.jsonify(dict(message='OK'))
resp.headers['ETag'] = stream.get_md5()

View File

@ -15,6 +15,7 @@
import logging
import os
import socket
import stat
import subprocess
import flask
@ -54,7 +55,11 @@ def plug_vip(vip, subnet_cidr, gateway, mac_address):
broadcast = '.'.join(sections)
# write interface file
with open(util.get_network_interface_file(interface), 'w') as text_file:
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
name = util.get_network_interface_file(interface)
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
with os.fdopen(os.open(name, flags, mode), 'w') as text_file:
text = template_vip.render(
interface=interface,
vip=vip,
@ -114,7 +119,12 @@ def plug_network(mac_address):
interface = _interface_by_mac(mac_address)
# write interface file
with open(util.get_network_interface_file(interface), 'w') as text_file:
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
# mode 00644
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
name = util.get_network_interface_file(interface)
with os.fdopen(os.open(name, flags, mode), 'w') as text_file:
text = template_port.render(interface=interface)
text_file.write(text)

View File

@ -125,7 +125,6 @@ class CertComputeCreate(ComputeCreate):
with open(CONF.controller_worker.client_ca, 'r') as client_ca:
ca = client_ca.read()
config_drive_files = {
# '/etc/octavia/octavia.conf'
'/etc/octavia/certs/server.pem': server_pem,
'/etc/octavia/certs/client_ca.pem': ca}
return super(CertComputeCreate, self).execute(

View File

@ -11,9 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import json
import os
import stat
import subprocess
import mock
@ -47,22 +48,28 @@ class ServerTestCase(base.TestCase):
@mock.patch('os.remove')
def test_haproxy(self, mock_remove, mock_subprocess, mock_rename,
mock_makedirs, mock_exists):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mock_exists.return_value = True
file_name = '/var/lib/octavia/123/haproxy.cfg.new'
m = mock.mock_open()
# happy case upstart file exists
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
mode = stat.S_IRUSR | stat.S_IWUSR
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/var/lib/octavia/123/haproxy.cfg.new', 'w')
handle = m()
handle.write.assert_called_once_with(six.b('test'))
mock_subprocess.assert_called_once_with(
"haproxy -c -L {peer} -f {config_file}".format(
config_file='/var/lib/octavia/123/haproxy.cfg.new',
config_file=file_name,
peer=(octavia_utils.
base64_sha1_string('amp_123').rstrip('='))).split(),
stderr=-2)
@ -73,7 +80,7 @@ class ServerTestCase(base.TestCase):
# exception writing
m = mock.mock_open()
m.side_effect = IOError() # open crashes
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
@ -81,17 +88,23 @@ class ServerTestCase(base.TestCase):
# check if files get created
mock_exists.return_value = False
init_path = '/etc/init/haproxy-123.conf'
m = mock.mock_open()
# happy case upstart file exists
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
self.assertEqual(202, rv.status_code)
m.assert_any_call('/var/lib/octavia/123/haproxy.cfg.new', 'w')
m.assert_any_call(util.UPSTART_DIR + '/haproxy-123.conf', 'w')
handle = m()
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
mock_open.assert_called_with(init_path, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = mock_fdopen()
handle.write.assert_any_call(six.b('test'))
# skip the template stuff
mock_makedirs.assert_called_with('/var/lib/octavia/123')
@ -100,7 +113,9 @@ class ServerTestCase(base.TestCase):
mock_exists.return_value = True
mock_subprocess.side_effect = [subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
@ -108,17 +123,18 @@ class ServerTestCase(base.TestCase):
self.assertEqual(
{'message': 'Invalid request', u'details': u'random error'},
json.loads(rv.data.decode('utf-8')))
m.assert_called_with('/var/lib/octavia/123/haproxy.cfg.new', 'w')
handle = m()
mode = stat.S_IRUSR | stat.S_IWUSR
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = mock_fdopen()
handle.write.assert_called_with(six.b('test'))
mock_subprocess.assert_called_with(
"haproxy -c -L {peer} -f {config_file}".format(
config_file='/var/lib/octavia/123/haproxy.cfg.new',
config_file=file_name,
peer=(octavia_utils.
base64_sha1_string('amp_123').rstrip('='))).split(),
stderr=-2)
mock_remove.assert_called_once_with(
'/var/lib/octavia/123/haproxy.cfg.new')
mock_remove.assert_called_once_with(file_name)
@mock.patch('os.path.exists')
@mock.patch('octavia.amphorae.backends.agent.api_server.listener.'
@ -417,10 +433,9 @@ class ServerTestCase(base.TestCase):
json.loads(rv.data.decode('utf-8')))
@mock.patch('os.path.exists')
@mock.patch('os.fchmod')
@mock.patch('os.makedirs')
def test_upload_certificate_md5(self, mock_makedir, mock_chmod,
mock_exists):
def test_upload_certificate_md5(self, mock_makedir, mock_exists):
# wrong file name
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.bla',
@ -430,7 +445,7 @@ class ServerTestCase(base.TestCase):
mock_exists.return_value = True
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.pem',
data='TestTest')
@ -438,12 +453,11 @@ class ServerTestCase(base.TestCase):
self.assertEqual(OK, json.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_called_once_with(six.b('TestTest'))
mock_chmod.assert_called_once_with(handle.fileno(), 0o600)
mock_exists.return_value = False
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.pem',
data='TestTest')
@ -453,12 +467,10 @@ class ServerTestCase(base.TestCase):
handle.write.assert_called_once_with(six.b('TestTest'))
mock_makedir.assert_called_once_with('/var/lib/octavia/certs/123')
@mock.patch('os.fchmod')
def test_upload_server_certificate(self, mock_chmod):
def test_upload_server_certificate(self):
certificate_update.BUFFER = 5 # test the while loop
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/certificate',
data='TestTest')
@ -467,7 +479,6 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(six.b('TestT'))
handle.write.assert_any_call(six.b('est'))
mock_chmod.assert_called_once_with(handle.fileno(), 0o600)
@mock.patch('netifaces.interfaces')
@mock.patch('netifaces.ifaddresses')
@ -500,14 +511,21 @@ class ServerTestCase(base.TestCase):
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
{netifaces.AF_LINK: [{'addr': '123'}]}]
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/network/interfaces.d/blah.cfg'
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
content_type='application/json',
data=json.dumps(port_info))
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/etc/network/interfaces.d/blah.cfg', 'w')
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = m()
handle.write.assert_called_once_with(
'\n# Generated by Octavia agent\n'
@ -524,7 +542,7 @@ class ServerTestCase(base.TestCase):
7, 'test', RANDOM_ERROR), subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
content_type='application/json',
data=json.dumps(port_info))
@ -578,15 +596,22 @@ class ServerTestCase(base.TestCase):
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
{netifaces.AF_LINK: [{'addr': '123'}]}]
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/network/interfaces.d/blah.cfg'
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.post('/' + api_server.VERSION +
"/plug/vip/203.0.113.2",
content_type='application/json',
data=json.dumps(subnet_info))
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/etc/network/interfaces.d/blah.cfg', 'w')
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = m()
handle.write.assert_called_once_with(
'\n# Generated by Octavia agent\n'
@ -607,8 +632,9 @@ class ServerTestCase(base.TestCase):
subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR), subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.post('/' + api_server.VERSION +
"/plug/vip/203.0.113.2",
content_type='application/json',
@ -652,42 +678,42 @@ class ServerTestCase(base.TestCase):
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.rename')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
def test_upload_keepalived_config(self, mock_remove, mock_subprocess,
def test_upload_keepalived_config(self, mock_remove,
mock_rename, mock_makedirs, mock_exists):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mock_exists.return_value = True
cfg_path = util.keepalived_cfg_path()
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
mock_open.assert_called_with(cfg_path, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
self.assertEqual(200, rv.status_code)
mock_exists.return_value = False
script_path = util.keepalived_check_script_path()
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
mock_open.assert_called_with(script_path, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
self.assertEqual(200, rv.status_code)
mock_subprocess.side_effect = subprocess.CalledProcessError(1,
'blah!')
with mock.patch.object(builtins, 'open', m):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
self.assertEqual(500, rv.status_code)
mock_subprocess.side_effect = [True,
subprocess.CalledProcessError(1,
'blah!')]
with mock.patch.object(builtins, 'open', m):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
self.assertEqual(500, rv.status_code)
@mock.patch('subprocess.check_output')
def test_manage_service_vrrp(self, mock_check_output):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/start')

View File

@ -11,9 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import json
import os
import stat
import subprocess
import mock
@ -49,27 +50,32 @@ class ServerTestCase(base.TestCase):
@mock.patch('os.remove')
def test_haproxy(self, mock_remove, mock_subprocess, mock_rename,
mock_makedirs, mock_exists):
mock_exists.return_value = True
m = mock.mock_open()
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
file_name = '/var/lib/octavia/123/haproxy.cfg.new'
# happy case init file exists
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
mode = stat.S_IRUSR | stat.S_IWUSR
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/var/lib/octavia/123/haproxy.cfg.new', 'w')
handle = m()
handle.write.assert_called_once_with(six.b('test'))
calls = [
mock.call("haproxy -c -L {peer} -f {config_file}".format(
config_file='/var/lib/octavia/123/haproxy.cfg.new',
config_file=file_name,
peer=(octavia_utils.base64_sha1_string('amp_123')
.rstrip('='))).split(), stderr=-2),
mock.call(['chmod', '755', '/etc/init.d/haproxy-123'],
stderr=-2),
mock.call(['insserv', '/etc/init.d/haproxy-123'], stderr=-2)
.rstrip('='))).split(), stderr=-2)
]
mock_subprocess.assert_has_calls(calls)
mock_rename.assert_called_once_with(
@ -79,7 +85,7 @@ class ServerTestCase(base.TestCase):
# exception writing
m = mock.mock_open()
m.side_effect = IOError() # open crashes
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
@ -90,14 +96,20 @@ class ServerTestCase(base.TestCase):
m = mock.mock_open()
# happy case init file exists
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
self.assertEqual(202, rv.status_code)
m.assert_any_call('/var/lib/octavia/123/haproxy.cfg.new', 'w')
m.assert_any_call(util.SYSVINIT_DIR + '/haproxy-123', 'w')
handle = m()
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
mock_open.assert_called_with(util.SYSVINIT_DIR + '/haproxy-123',
flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = mock_fdopen()
handle.write.assert_any_call(six.b('test'))
# skip the template stuff
mock_makedirs.assert_called_with('/var/lib/octavia/123')
@ -106,7 +118,10 @@ class ServerTestCase(base.TestCase):
mock_exists.return_value = True
mock_subprocess.side_effect = [subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION +
'/listeners/amp_123/123/haproxy',
data='test')
@ -114,17 +129,18 @@ class ServerTestCase(base.TestCase):
self.assertEqual(
{'message': 'Invalid request', u'details': u'random error'},
json.loads(rv.data.decode('utf-8')))
m.assert_called_with('/var/lib/octavia/123/haproxy.cfg.new', 'w')
handle = m()
mode = stat.S_IRUSR | stat.S_IWUSR
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = mock_fdopen()
handle.write.assert_called_with(six.b('test'))
mock_subprocess.assert_called_with(
"haproxy -c -L {peer} -f {config_file}".format(
config_file='/var/lib/octavia/123/haproxy.cfg.new',
config_file=file_name,
peer=(octavia_utils.
base64_sha1_string('amp_123').rstrip('='))).split(),
stderr=-2)
mock_remove.assert_called_once_with(
'/var/lib/octavia/123/haproxy.cfg.new')
mock_remove.assert_called_once_with(file_name)
@mock.patch('os.path.exists')
@mock.patch('octavia.amphorae.backends.agent.api_server.listener.'
@ -423,10 +439,8 @@ class ServerTestCase(base.TestCase):
json.loads(rv.data.decode('utf-8')))
@mock.patch('os.path.exists')
@mock.patch('os.fchmod')
@mock.patch('os.makedirs')
def test_upload_certificate_md5(self, mock_makedir, mock_chmod,
mock_exists):
def test_upload_certificate_md5(self, mock_makedir, mock_exists):
# wrong file name
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.bla',
@ -436,7 +450,7 @@ class ServerTestCase(base.TestCase):
mock_exists.return_value = True
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.pem',
data='TestTest')
@ -444,12 +458,11 @@ class ServerTestCase(base.TestCase):
self.assertEqual(OK, json.loads(rv.data.decode('utf-8')))
handle = m()
handle.write.assert_called_once_with(six.b('TestTest'))
mock_chmod.assert_called_once_with(handle.fileno(), 0o600)
mock_exists.return_value = False
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/listeners/123/certificates/test.pem',
data='TestTest')
@ -459,12 +472,10 @@ class ServerTestCase(base.TestCase):
handle.write.assert_called_once_with(six.b('TestTest'))
mock_makedir.assert_called_once_with('/var/lib/octavia/certs/123')
@mock.patch('os.fchmod')
def test_upload_server_certificate(self, mock_chmod):
def test_upload_server_certificate(self):
certificate_update.BUFFER = 5 # test the while loop
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.put('/' + api_server.VERSION +
'/certificate',
data='TestTest')
@ -473,7 +484,6 @@ class ServerTestCase(base.TestCase):
handle = m()
handle.write.assert_any_call(six.b('TestT'))
handle.write.assert_any_call(six.b('est'))
mock_chmod.assert_called_once_with(handle.fileno(), 0o600)
@mock.patch('netifaces.interfaces')
@mock.patch('netifaces.ifaddresses')
@ -506,14 +516,20 @@ class ServerTestCase(base.TestCase):
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
{netifaces.AF_LINK: [{'addr': '123'}]}]
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/network/interfaces.d/blah.cfg'
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
content_type='application/json',
data=json.dumps(port_info))
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/etc/network/interfaces.d/blah.cfg', 'w')
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = m()
handle.write.assert_called_once_with(
'\n# Generated by Octavia agent\n'
@ -530,7 +546,7 @@ class ServerTestCase(base.TestCase):
7, 'test', RANDOM_ERROR), subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.post('/' + api_server.VERSION + "/plug/network",
content_type='application/json',
data=json.dumps(port_info))
@ -584,15 +600,21 @@ class ServerTestCase(base.TestCase):
mock_interfaces.side_effect = [['blah']]
mock_ifaddress.side_effect = [[netifaces.AF_LINK],
{netifaces.AF_LINK: [{'addr': '123'}]}]
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
file_name = '/etc/network/interfaces.d/blah.cfg'
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.post('/' + api_server.VERSION +
"/plug/vip/203.0.113.2",
content_type='application/json',
data=json.dumps(subnet_info))
self.assertEqual(202, rv.status_code)
m.assert_called_once_with(
'/etc/network/interfaces.d/blah.cfg', 'w')
mock_open.assert_called_with(file_name, flags, mode)
mock_fdopen.assert_called_with(123, 'w')
handle = m()
handle.write.assert_called_once_with(
'\n# Generated by Octavia agent\n'
@ -614,7 +636,7 @@ class ServerTestCase(base.TestCase):
7, 'test', RANDOM_ERROR), subprocess.CalledProcessError(
7, 'test', RANDOM_ERROR)]
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m):
rv = self.app.post('/' + api_server.VERSION +
"/plug/vip/203.0.113.2",
content_type='application/json',
@ -658,42 +680,42 @@ class ServerTestCase(base.TestCase):
@mock.patch('os.path.exists')
@mock.patch('os.makedirs')
@mock.patch('os.rename')
@mock.patch('subprocess.check_output')
@mock.patch('os.remove')
def test_upload_keepalived_config(self, mock_remove, mock_subprocess,
def test_upload_keepalived_config(self, mock_remove,
mock_rename, mock_makedirs, mock_exists):
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
mock_exists.return_value = True
cfg_path = util.keepalived_cfg_path()
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
mock_open.assert_called_with(cfg_path, flags, mode)
mock_fdopen(123, 'w')
self.assertEqual(200, rv.status_code)
mock_exists.return_value = False
script_path = util.keepalived_check_script_path()
m = mock.mock_open()
with mock.patch.object(builtins, 'open', m):
with mock.patch('os.open') as mock_open, mock.patch.object(
os, 'fdopen', m) as mock_fdopen:
mock_open.return_value = 123
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH)
mock_open.assert_called_with(script_path, flags, mode)
mock_fdopen(123, 'w')
self.assertEqual(200, rv.status_code)
mock_subprocess.side_effect = subprocess.CalledProcessError(1,
'blah!')
with mock.patch.object(builtins, 'open', m):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
self.assertEqual(500, rv.status_code)
mock_subprocess.side_effect = [True,
subprocess.CalledProcessError(1,
'blah!')]
with mock.patch.object(builtins, 'open', m):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/upload',
data='test')
self.assertEqual(500, rv.status_code)
@mock.patch('subprocess.check_output')
def test_manage_service_vrrp(self, mock_check_output):
rv = self.app.put('/' + api_server.VERSION + '/vrrp/start')