segment.io

This commit is contained in:
Michael Gummelt
2015-04-22 11:31:53 -07:00
parent dfcc1560b9
commit f985e9fbbd
8 changed files with 340 additions and 144 deletions

View File

@@ -3,7 +3,7 @@
if [ -n "$BASH_SOURCE" ] ; then
BIN_DIR=$(dirname "$BASH_SOURCE")
elif [ $(basename -- "$0") = "env-setup" ]; then
elif [ $(basename -- "$0") = "env-setup-dev" ]; then
BIN_DIR=$(dirname "$0")
else
BIN_DIR=$PWD/bin

215
cli/dcoscli/analytics.py Normal file
View File

@@ -0,0 +1,215 @@
import json
import logging
import os
import sys
import uuid
import dcoscli
import requests
import rollbar
from dcos.api import config, constants
from dcoscli.constants import (ROLLBAR_SERVER_POST_KEY,
SEGMENT_IO_CLI_ERROR_EVENT,
SEGMENT_IO_CLI_EVENT, SEGMENT_IO_WRITE_KEY_DEV,
SEGMENT_IO_WRITE_KEY_PROD, SEGMENT_URL)
from futures import ThreadPoolExecutor
from requests.auth import HTTPBasicAuth
logger = logging.getLogger(__name__)
session_id = uuid.uuid4().hex
def wait_and_track(subproc):
"""
Run a command and report it to analytics services.
:param subproc: Subprocess to capture
:type subproc: Popen
:returns: exit code of subproc
:rtype: int
"""
rollbar.init(ROLLBAR_SERVER_POST_KEY,
'prod' if _is_prod() else 'dev')
conf = _conf()
report = conf.get('core.reporting', True)
with ThreadPoolExecutor(max_workers=2) as pool:
if report:
_segment_track_cli(pool, conf)
exit_code, err = _wait_and_capture(subproc)
# We only want to catch exceptions, not other stderr messages
# (such as "task does not exist", so we look for the 'Traceback'
# string. This only works for python, so we'll need to revisit
# this in the future when we support subcommands written in other
# languages.
if report and 'Traceback' in err:
_track_err(pool, exit_code, err, conf)
return exit_code
def _send_segment_event(event, properties):
"""
Send a segment event
:param event: name of event
:type event: string
:param properties: event properties
:type properties: dict
:rtype: None
"""
data = {'anonymousId': session_id,
'event': event,
'properties': properties}
key = SEGMENT_IO_WRITE_KEY_PROD if _is_prod() else \
SEGMENT_IO_WRITE_KEY_DEV
try:
requests.post(SEGMENT_URL,
json=data,
auth=HTTPBasicAuth(key, ''),
timeout=3)
except Exception as e:
logger.exception(e)
def _is_prod():
""" True if this process is in production. """
return os.environ.get('DCOS_PRODUCTION', 'true') != 'false'
def _conf():
"""
Get config file.
:rtype: Toml
"""
return config.load_from_path(
os.environ[constants.DCOS_CONFIG_ENV])
def _wait_and_capture(subproc):
"""
Run a subprocess and capture its stderr.
:param subproc: Subprocess to capture
:type subproc: Popen
:returns: exit code of subproc
:rtype: int
"""
err = ''
while subproc.poll() is None:
err_buff = subproc.stderr.read().decode('utf-8')
sys.stderr.write(err_buff)
err += err_buff
exit_code = subproc.poll()
return exit_code, err
def _track_err(pool, exit_code, err, conf):
"""
Report error details to analytics services.
:param pool: thread pool
:type pool: ThreadPoolExecutor
:param exit_code: exit code of tracked process
:type exit_code: int
:param err: stderr of tracked process
:type err: str
:param conf: dcos config file
:type conf: Toml
:rtype: None
"""
# Segment.io calls are async, but rollbar is not, so for
# parallelism, we must call segment first.
_segment_track_err(pool, conf, err, exit_code)
_rollbar_track_err(conf, err, exit_code)
def _segment_track_cli(pool, conf):
"""
Send segment.io cli event.
:param pool: thread pool
:type pool: ThreadPoolExecutor
:param conf: dcos config file
:type conf: Toml
:rtype: None
"""
props = _base_properties(conf)
pool.submit(_send_segment_event, SEGMENT_IO_CLI_EVENT, props)
def _segment_track_err(pool, conf, err, exit_code):
"""
Send segment.io error event.
:param pool: thread pool
:type segment: ThreadPoolExecutor
:param conf: dcos config file
:type conf: Toml
:param err: stderr of tracked process
:type err: str
:param exit_code: exit code of tracked process
:type exit_code: int
:rtype: None
"""
props = _base_properties(conf)
props['err'] = err
props['exit_code'] = exit_code
pool.submit(_send_segment_event, SEGMENT_IO_CLI_ERROR_EVENT, props)
def _rollbar_track_err(conf, err, exit_code):
"""
Report to rollbar. Synchronous.
:param exit_code: exit code of tracked process
:type exit_code: int
:param err: stderr of tracked process
:type err: str
:param conf: dcos config file
:type conf: Toml
:rtype: None
"""
props = _base_properties(conf)
props['exit_code'] = exit_code
try:
rollbar.report_message(err, 'error', extra_data=props)
except Exception as e:
logger.exception(e)
def _base_properties(conf=None):
"""
These properties are sent with every analytics event.
:param conf: dcos config file
:type conf: Toml
:rtype: dict
"""
if not conf:
conf = _conf()
cmd = 'dcos' + (' {}'.format(sys.argv[1]) if len(sys.argv) > 1 else '')
return {
'cmd': cmd,
'full_cmd': ' '.join(sys.argv),
'dcoscli.version': dcoscli.version,
'python_version': str(sys.version_info),
'config': json.dumps(list(conf.property_items()))
}

View File

@@ -1 +1,9 @@
ROLLBAR_SERVER_POST_KEY = '62f87c5df3674629b143a137de3d3244'
SEGMENT_IO_WRITE_KEY_PROD = '51ybGTeFEFU1xo6u10XMDrr6kATFyRyh'
SEGMENT_IO_WRITE_KEY_DEV = '39uhSEOoRHMw6cMR6st9tYXDbAL3JSaP'
SEGMENT_IO_CLI_EVENT = 'dcos-cli'
SEGMENT_IO_CLI_ERROR_EVENT = 'dcos-cli-error'
SEGMENT_URL = 'https://api.segment.io/v1/track'
DCOS_PRODUCTION_ENV = 'DCOS_PRODUCTION'

View File

@@ -26,8 +26,6 @@ Environment Variables:
to read about a specific subcommand.
"""
import json
import logging
import os
import signal
import sys
@@ -35,12 +33,9 @@ from subprocess import PIPE, Popen
import dcoscli
import docopt
import rollbar
from dcos.api import (config, constants, emitting, errors, http, subcommand,
util)
from dcoscli.constants import ROLLBAR_SERVER_POST_KEY
from dcos.api import constants, emitting, errors, http, subcommand, util
from dcoscli import analytics
logger = logging.getLogger(__name__)
emitter = emitting.FlatEmitter()
@@ -77,78 +72,7 @@ def main():
subproc = Popen([executable, command] + args['<args>'],
stderr=PIPE)
prod = os.environ.get('DCOS_PRODUCTION', 'true') != 'false'
rollbar.init(ROLLBAR_SERVER_POST_KEY,
'prod' if prod else 'dev')
return _wait_and_track(subproc)
def _wait_and_capture(subproc):
"""
:param subproc: Subprocess to capture
:type subproc: Popen
:returns: exit code of subproc
:rtype: int
"""
# capture and print stderr
err = ''
while subproc.poll() is None:
err_buff = subproc.stderr.read().decode('utf-8')
sys.stderr.write(err_buff)
err += err_buff
exit_code = subproc.poll()
return exit_code, err
def _wait_and_track(subproc):
"""
:param subproc: Subprocess to capture
:type subproc: Popen
:returns: exit code of subproc
:rtype: int
"""
exit_code, err = _wait_and_capture(subproc)
conf = config.load_from_path(
os.environ[constants.DCOS_CONFIG_ENV])
# We only want to catch exceptions, not other stderr messages
# (such as "task does not exist", so we look for the 'Traceback'
# string. This only works for python, so we'll need to revisit
# this in the future when we support subcommands written in other
# languages.
if 'Traceback' in err and conf.get('core.reporting', True):
_track(exit_code, err, conf)
return exit_code
def _track(exit_code, err, conf):
"""
:param exit_code: exit code of tracked process
:type exit_code: int
:param err: stderr of tracked process
:type err: str
:param conf: dcos config file
:type conf: Toml
:rtype: None
"""
# rollbar analytics
try:
rollbar.report_message(err, 'error', extra_data={
'cmd': ' '.join(sys.argv),
'exit_code': exit_code,
'python_version': str(sys.version_info),
'dcoscli.version': dcoscli.version,
'config': json.dumps(list(conf.property_items()))
})
except Exception as e:
logger.exception(e)
return analytics.wait_and_track(subproc)
def _config_log_level_environ(log_level):

View File

@@ -67,7 +67,8 @@ setup(
'pkginfo',
'toml',
'virtualenv',
'rollbar'
'rollbar',
'futures'
],
# If there are data files included in your packages that need to be

View File

@@ -7,4 +7,4 @@ host = "localhost"
cache = "tmp/cache"
sources = [ "git://github.com/mesosphere/universe.git", "https://github.com/mesosphere/universe/archive/master.zip",]
[core]
reporting = true
reporting = false

View File

@@ -1,66 +1,136 @@
import json
import os
import sys
from functools import wraps
import dcoscli
import dcoscli.analytics
import requests
import rollbar
from dcos.api import config, constants, util
from dcoscli.constants import ROLLBAR_SERVER_POST_KEY
from dcos.api import constants, util
from dcoscli.analytics import _base_properties
from dcoscli.constants import (ROLLBAR_SERVER_POST_KEY,
SEGMENT_IO_CLI_ERROR_EVENT,
SEGMENT_IO_CLI_EVENT, SEGMENT_IO_WRITE_KEY_DEV,
SEGMENT_IO_WRITE_KEY_PROD, SEGMENT_URL)
from dcoscli.main import main
from mock import Mock, patch
from mock import patch
ANON_ID = 0
def _mock(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
with patch('rollbar.init'), \
patch('rollbar.report_message'), \
patch('requests.post'), \
patch('dcoscli.analytics.session_id'):
dcoscli.analytics.session_id = ANON_ID
fn()
return wrapper
@_mock
def test_no_exc():
'''Tests that a command which does not raise an exception does not
report an exception.
'''
# args
args = [util.which('dcos')]
exit_code = _mock_analytics_run(args)
env = _env_reporting()
assert rollbar.report_message.call_count == 0
assert exit_code == 0
with patch('sys.argv', args), patch.dict(os.environ, env):
assert main() == 0
# segment.io
args, kwargs = requests.post.call_args
assert args == (SEGMENT_URL,)
props = _base_properties()
assert kwargs['json'] == {'anonymousId': ANON_ID,
'event': SEGMENT_IO_CLI_EVENT,
'properties': props}
assert kwargs['timeout'] == 3
# rollbar
assert rollbar.report_message.call_count == 0
@_mock
def test_exc():
'''Tests that a command which does raise an exception does report an
exception.
'''
# args
args = [util.which('dcos')]
exit_code = _mock_analytics_run_exc(args)
env = _env_reporting()
props = _analytics_properties(args, exit_code=1)
rollbar.report_message.assert_called_with('Traceback', 'error',
extra_data=props)
assert exit_code == 1
with patch('sys.argv', args), \
patch.dict(os.environ, env), \
patch('dcoscli.analytics._wait_and_capture',
return_value=(1, 'Traceback')):
assert main() == 1
# segment.io
_, kwargs = requests.post.call_args_list[1]
props = _base_properties()
props['err'] = 'Traceback'
props['exit_code'] = 1
assert kwargs['json'] == {'anonymousId': ANON_ID,
'event': SEGMENT_IO_CLI_ERROR_EVENT,
'properties': props}
props = _base_properties()
props['exit_code'] = 1
rollbar.report_message.assert_called_with('Traceback', 'error',
extra_data=props)
@_mock
def test_config_reporting_false():
'''Test that "core.reporting = false" blocks exception reporting.'''
args = [util.which('dcos')]
exit_code = _mock_analytics_run_exc(args, False)
env = _env_no_reporting()
assert rollbar.report_message.call_count == 0
assert exit_code == 1
with patch('sys.argv', args), \
patch.dict(os.environ, env), \
patch('dcoscli.analytics._wait_and_capture',
return_value=(1, 'Traceback')):
assert main() == 1
assert rollbar.report_message.call_count == 0
assert requests.post.call_count == 0
@_mock
def test_production_setting_true():
'''Test that env var DCOS_PRODUCTION=true sends exceptions to
the 'prod' environment.
'''Test that env var DCOS_PRODUCTION as empty string sends exceptions
to the 'prod' environment.
'''
args = [util.which('dcos')]
with patch.dict(os.environ, {'DCOS_PRODUCTION': 'true'}):
_mock_analytics_run(args)
env = _env_reporting()
env['DCOS_PRODUCTION'] = ''
with patch('sys.argv', args), patch.dict(os.environ, env):
assert main() == 0
_, kwargs = requests.post.call_args_list[0]
assert kwargs['auth'].username == SEGMENT_IO_WRITE_KEY_PROD
rollbar.init.assert_called_with(ROLLBAR_SERVER_POST_KEY, 'prod')
@_mock
def test_production_setting_false():
'''Test that env var DCOS_PRODUCTION=false sends exceptions to
the 'dev' environment.
@@ -68,47 +138,23 @@ def test_production_setting_false():
'''
args = [util.which('dcos')]
with patch.dict(os.environ, {'DCOS_PRODUCTION': 'false'}):
_mock_analytics_run(args)
env = _env_reporting()
env['DCOS_PRODUCTION'] = 'false'
with patch('sys.argv', args), patch.dict(os.environ, env):
assert main() == 0
_, kwargs = requests.post.call_args_list[0]
assert kwargs['auth'].username == SEGMENT_IO_WRITE_KEY_DEV
rollbar.init.assert_called_with(ROLLBAR_SERVER_POST_KEY, 'dev')
def _config_path_reporting():
return os.path.join('tests', 'data', 'analytics', 'dcos_reporting.toml')
def _config_path_no_reporting():
return os.path.join('tests', 'data', 'analytics', 'dcos_no_reporting.toml')
def _env_reporting():
return {constants.DCOS_CONFIG_ENV: _config_path_reporting()}
path = os.path.join('tests', 'data', 'analytics', 'dcos_reporting.toml')
return {constants.DCOS_CONFIG_ENV: path}
def _env_no_reporting():
return {constants.DCOS_CONFIG_ENV: _config_path_no_reporting()}
def _mock_analytics_run_exc(args, reporting=True):
dcoscli.main._wait_and_capture = Mock(return_value=(1, 'Traceback'))
return _mock_analytics_run(args, reporting)
def _mock_analytics_run(args, reporting=True):
env = _env_reporting() if reporting else _env_no_reporting()
with patch('sys.argv', args), patch.dict(os.environ, env):
rollbar.init = Mock()
rollbar.report_message = Mock()
return main()
def _analytics_properties(sysargs, **kwargs):
conf = config.load_from_path(_config_path_reporting())
defaults = {'cmd': ' '.join(sysargs),
'exit_code': 0,
'dcoscli.version': dcoscli.version,
'python_version': str(sys.version_info),
'config': json.dumps(list(conf.property_items()))}
defaults.update(kwargs)
return defaults
path = os.path.join('tests', 'data', 'analytics', 'dcos_no_reporting.toml')
return {constants.DCOS_CONFIG_ENV: path}

View File

@@ -1,6 +1,7 @@
import json
import os
import dcoscli.constants as cli_constants
import six
from dcos.api import constants
@@ -12,7 +13,8 @@ from common import exec_command
def env():
return {
constants.PATH_ENV: os.environ[constants.PATH_ENV],
constants.DCOS_CONFIG_ENV: os.path.join("tests", "data", "dcos.toml")
constants.DCOS_CONFIG_ENV: os.path.join("tests", "data", "dcos.toml"),
cli_constants.DCOS_PRODUCTION_ENV: 'false'
}
@@ -67,7 +69,7 @@ def test_list_property(env):
env)
assert returncode == 0
assert stdout == b"""core.reporting=True
assert stdout == b"""core.reporting=False
marathon.host=localhost
marathon.port=8080
package.cache=tmp/cache
@@ -341,9 +343,9 @@ def test_set_missing_property(env):
def test_set_core_property(env):
_set_value('core.reporting', 'false', env)
_get_value('core.reporting', False, env)
_set_value('core.reporting', 'true', env)
_get_value('core.reporting', True, env)
_set_value('core.reporting', 'false', env)
def _set_value(key, value, env):