Merge "Adjust Python 2.6 OSerror-on-EPIPE workaround"
This commit is contained in:
@@ -65,6 +65,7 @@ def set_subprocess(_subprocess=None):
|
|||||||
|
|
||||||
def _check_files_accessible(files):
|
def _check_files_accessible(files):
|
||||||
err = None
|
err = None
|
||||||
|
retcode = -1
|
||||||
try:
|
try:
|
||||||
for try_file in files:
|
for try_file in files:
|
||||||
with open(try_file, 'r'):
|
with open(try_file, 'r'):
|
||||||
@@ -74,8 +75,19 @@ def _check_files_accessible(files):
|
|||||||
# the given file.
|
# the given file.
|
||||||
err = ('Hit OSError in _process_communicate_handle_oserror()\n'
|
err = ('Hit OSError in _process_communicate_handle_oserror()\n'
|
||||||
'Likely due to %s: %s') % (try_file, e.strerror)
|
'Likely due to %s: %s') % (try_file, e.strerror)
|
||||||
|
# Emulate openssl behavior, which returns with code 2 when
|
||||||
|
# access to a file failed:
|
||||||
|
|
||||||
return err
|
# You can get more from
|
||||||
|
# http://www.openssl.org/docs/apps/cms.html#EXIT_CODES
|
||||||
|
#
|
||||||
|
# $ openssl cms -verify -certfile not_exist_file -CAfile \
|
||||||
|
# not_exist_file -inform PEM -nosmimecap -nodetach \
|
||||||
|
# -nocerts -noattr
|
||||||
|
# Error opening certificate file not_exist_file
|
||||||
|
retcode = 2
|
||||||
|
|
||||||
|
return retcode, err
|
||||||
|
|
||||||
|
|
||||||
def _process_communicate_handle_oserror(process, data, files):
|
def _process_communicate_handle_oserror(process, data, files):
|
||||||
@@ -91,12 +103,11 @@ def _process_communicate_handle_oserror(process, data, files):
|
|||||||
|
|
||||||
# The quick exit is typically caused by the openssl command not being
|
# The quick exit is typically caused by the openssl command not being
|
||||||
# able to read an input file, so check ourselves if can't read a file.
|
# able to read an input file, so check ourselves if can't read a file.
|
||||||
err = _check_files_accessible(files)
|
retcode, err = _check_files_accessible(files)
|
||||||
if process.stderr:
|
if process.stderr:
|
||||||
msg = process.stderr.read()
|
msg = process.stderr.read()
|
||||||
err = err + msg.decode('utf-8')
|
err = err + msg.decode('utf-8')
|
||||||
output = ''
|
output = ''
|
||||||
retcode = -1
|
|
||||||
else:
|
else:
|
||||||
retcode = process.poll()
|
retcode = process.poll()
|
||||||
if err is not None:
|
if err is not None:
|
||||||
|
@@ -10,6 +10,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import errno
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
@@ -77,8 +78,6 @@ class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
|
|||||||
'/no/such/file', '/no/such/key')
|
'/no/such/file', '/no/such/key')
|
||||||
|
|
||||||
def test_cms_verify_token_no_oserror(self):
|
def test_cms_verify_token_no_oserror(self):
|
||||||
import errno
|
|
||||||
|
|
||||||
def raise_OSError(*args):
|
def raise_OSError(*args):
|
||||||
e = OSError()
|
e = OSError()
|
||||||
e.errno = errno.EPIPE
|
e.errno = errno.EPIPE
|
||||||
@@ -87,11 +86,11 @@ class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
|
|||||||
with mock.patch('subprocess.Popen.communicate', new=raise_OSError):
|
with mock.patch('subprocess.Popen.communicate', new=raise_OSError):
|
||||||
try:
|
try:
|
||||||
cms.cms_verify("x", '/no/such/file', '/no/such/key')
|
cms.cms_verify("x", '/no/such/file', '/no/such/key')
|
||||||
except subprocess.CalledProcessError as e:
|
except exceptions.CertificateConfigError as e:
|
||||||
self.assertIn('/no/such/file', e.output)
|
self.assertIn('/no/such/file', e.output)
|
||||||
self.assertIn('Hit OSError ', e.output)
|
self.assertIn('Hit OSError ', e.output)
|
||||||
else:
|
else:
|
||||||
self.fail('Expected subprocess.CalledProcessError')
|
self.fail('Expected exceptions.CertificateConfigError')
|
||||||
|
|
||||||
def test_cms_verify_token_scoped(self):
|
def test_cms_verify_token_scoped(self):
|
||||||
cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED)
|
cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED)
|
||||||
|
Reference in New Issue
Block a user