diff --git a/setup.py b/setup.py index d69a949..fb2a512 100755 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setuptools.setup( name='wavefront-sdk-python', - version='1.8.7', # The version number. Update with each pull request. + version='1.8.8', # The version number. Update with each pull request. author='Wavefront by VMware', url='https://github.com/wavefrontHQ/wavefront-sdk-python', license='Apache-2.0', diff --git a/test/test_client.py b/test/test_client.py new file mode 100644 index 0000000..b3c52e3 --- /dev/null +++ b/test/test_client.py @@ -0,0 +1,136 @@ +"""Unit Tests for Wavefront Python SDK. + +@author Travis Keep (travisk@vmware.com) +""" + +import unittest +import uuid +from unittest.mock import ANY +from unittest.mock import Mock +from unittest.mock import patch + +import requests + +from wavefront_sdk.client import WavefrontClient +from wavefront_sdk.entities.tracing.span_log import SpanLog + + +class TestUtils(unittest.TestCase): + """Test direct ingestion.""" + + def setUp(self): + self._sender = WavefrontClient( + 'no_server', + 'no_token', + flush_interval_seconds=86400, # turn off auto flushing + enable_internal_metrics=False) + self._spans_log_buffer = self._sender._spans_log_buffer + self._tracing_spans_buffer = self._sender._tracing_spans_buffer + self._response = Mock() + self._response.status_code = 200 + + def test_send_span_with_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + [SpanLog( + 1635123789456000, + {"FooLogKey": "FooLogValue"})]) + + self.maxDiff = None + + # Verify span logs correctly emitted + actual_line = self._spans_log_buffer.get() + expected_line = ( + '{"traceId": "11111111-2222-3333-4444-555555555555", ' + '"spanId": "11111111-0000-0001-0002-123456789012", ' + '"logs": [{"timestamp": 1635123789456000, ' + '"fields": {"FooLogKey": "FooLogValue"}}], ' + '"span": "\\"spanName\\" source=\\"localhost\\" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '\\"application\\"=\\"Wavefront\\" \\"service\\"=\\"test-spans\\" ' + '\\"_spanLogs\\"=\\"true\\" 1635123456789 12345\\n"}\n') + self.assertEqual(expected_line, actual_line) + + # Verify span correctly emitted + actual_line = self._tracing_spans_buffer.get() + expected_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '"_spanLogs"="true" 1635123456789 12345\n') + self.assertEqual(expected_line, actual_line) + + def test_send_span_without_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + []) + + self.maxDiff = None + + # Assert no span logs emitted + self.assertTrue(self._spans_log_buffer.empty()) + + # Verify span correctly emitted + actual_line = self._tracing_spans_buffer.get() + expected_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '1635123456789 12345\n') + self.assertEqual(expected_line, actual_line) + + def test_report_event(self): + + with patch.object( + requests, 'post', return_value=self._response) as mock_post: + self._sender._report( + '', self._sender.WAVEFRONT_EVENT_FORMAT, '', Mock()) + + mock_post.assert_called_once_with( + ANY, + params=None, + headers=ANY, + data=ANY, + timeout=self._sender.HTTP_TIMEOUT) + + def test_report_non_event(self): + + with patch.object( + requests, 'post', return_value=self._response) as mock_post: + self._sender._report('', 'metric', '', Mock()) + + mock_post.assert_called_once_with( + ANY, + params=ANY, + headers=ANY, + data=ANY, + timeout=self._sender.HTTP_TIMEOUT) + + +if __name__ == '__main__': + # run 'python -m unittest discover' from top-level to run tests + unittest.main() diff --git a/test/test_direct.py b/test/test_direct.py new file mode 100644 index 0000000..7a27ba1 --- /dev/null +++ b/test/test_direct.py @@ -0,0 +1,136 @@ +"""Unit Tests for Wavefront Python SDK. + +@author Travis Keep (travisk@vmware.com) +""" + +import unittest +import uuid +from unittest.mock import ANY +from unittest.mock import Mock +from unittest.mock import patch + +import requests + +from wavefront_sdk.direct import WavefrontDirectClient +from wavefront_sdk.entities.tracing.span_log import SpanLog + + +class TestUtils(unittest.TestCase): + """Test direct ingestion.""" + + def setUp(self): + self._sender = WavefrontDirectClient( + 'no_server', + 'no_token', + flush_interval_seconds=86400, # turn off auto flushing + enable_internal_metrics=False) + self._spans_log_buffer = self._sender._spans_log_buffer + self._tracing_spans_buffer = self._sender._tracing_spans_buffer + self._response = Mock() + self._response.status_code = 200 + + def test_send_span_with_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + [SpanLog( + 1635123789456000, + {"FooLogKey": "FooLogValue"})]) + + self.maxDiff = None + + # Verify span logs correctly emitted + actual_line = self._spans_log_buffer.get() + expected_line = ( + '{"traceId": "11111111-2222-3333-4444-555555555555", ' + '"spanId": "11111111-0000-0001-0002-123456789012", ' + '"logs": [{"timestamp": 1635123789456000, ' + '"fields": {"FooLogKey": "FooLogValue"}}], ' + '"span": "\\"spanName\\" source=\\"localhost\\" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '\\"application\\"=\\"Wavefront\\" \\"service\\"=\\"test-spans\\" ' + '\\"_spanLogs\\"=\\"true\\" 1635123456789 12345\\n"}\n') + self.assertEqual(expected_line, actual_line) + + # Verify span correctly emitted + actual_line = self._tracing_spans_buffer.get() + expected_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '"_spanLogs"="true" 1635123456789 12345\n') + self.assertEqual(expected_line, actual_line) + + def test_send_span_without_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + []) + + self.maxDiff = None + + # Assert no span logs emitted + self.assertTrue(self._spans_log_buffer.empty()) + + # Verify span correctly emitted + actual_line = self._tracing_spans_buffer.get() + expected_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '1635123456789 12345\n') + self.assertEqual(expected_line, actual_line) + + def test_report_event(self): + + with patch.object( + requests, 'post', return_value=self._response) as mock_post: + self._sender._report( + '', self._sender.WAVEFRONT_EVENT_FORMAT, '', Mock()) + + mock_post.assert_called_once_with( + ANY, + params=None, + headers=ANY, + data=ANY, + timeout=self._sender.HTTP_TIMEOUT) + + def test_report_non_event(self): + + with patch.object( + requests, 'post', return_value=self._response) as mock_post: + self._sender._report('', 'metric', '', Mock()) + + mock_post.assert_called_once_with( + ANY, + params=ANY, + headers=ANY, + data=ANY, + timeout=self._sender.HTTP_TIMEOUT) + + +if __name__ == '__main__': + # run 'python -m unittest discover' from top-level to run tests + unittest.main() diff --git a/test/test_proxy.py b/test/test_proxy.py new file mode 100644 index 0000000..2f21cce --- /dev/null +++ b/test/test_proxy.py @@ -0,0 +1,95 @@ +"""Unit Tests for Wavefront Python SDK. + +@author Travis Keep (travisk@vmware.com) +""" + +import unittest +import uuid +from unittest.mock import Mock +from unittest.mock import call + +from wavefront_sdk.common.proxy_connection_handler import ( + ProxyConnectionHandler) +from wavefront_sdk.entities.tracing.span_log import SpanLog +from wavefront_sdk.proxy import WavefrontProxyClient + + +class TestUtils(unittest.TestCase): + """Test proxy ingestion.""" + + def setUp(self): + self._sender = WavefrontProxyClient( + 'no_host', + None, + None, + None, + enable_internal_metrics=False) + self._sender._tracing_proxy_connection_handler = ( + Mock(spec=ProxyConnectionHandler)) + self._tracing = self._sender._tracing_proxy_connection_handler + + def test_send_span_with_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + [SpanLog( + 1635123789456000, + {"FooLogKey": "FooLogValue"})]) + + expected_span_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '"_spanLogs"="true" 1635123456789 12345\n') + expected_span_log_line = ( + '{"traceId": "11111111-2222-3333-4444-555555555555", ' + '"spanId": "11111111-0000-0001-0002-123456789012", ' + '"logs": [{"timestamp": 1635123789456000, ' + '"fields": {"FooLogKey": "FooLogValue"}}], ' + '"span": "\\"spanName\\" source=\\"localhost\\" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '\\"application\\"=\\"Wavefront\\" \\"service\\"=\\"test-spans\\" ' + '\\"_spanLogs\\"=\\"true\\" 1635123456789 12345\\n"}\n') + self._tracing.send_data.assert_has_calls( + [call(expected_span_line), call(expected_span_log_line)]) + self.assertEqual(2, self._tracing.send_data.call_count) + + def test_send_span_without_span_logs(self): + + self._sender.send_span( + 'spanName', + 1635123456789, + 12345, + 'localhost', + uuid.UUID('11111111-2222-3333-4444-555555555555'), + uuid.UUID('11111111-0000-0001-0002-123456789012'), + [uuid.UUID('55555555-4444-3333-2222-111111111111')], + None, + [('application', 'Wavefront'), ('service', 'test-spans')], + []) + + expected_span_line = ( + '"spanName" source="localhost" ' + 'traceId=11111111-2222-3333-4444-555555555555 ' + 'spanId=11111111-0000-0001-0002-123456789012 ' + 'parent=55555555-4444-3333-2222-111111111111 ' + '"application"="Wavefront" "service"="test-spans" ' + '1635123456789 12345\n') + self._tracing.send_data.assert_called_once_with(expected_span_line) + + +if __name__ == '__main__': + # run 'python -m unittest discover' from top-level to run tests + unittest.main() diff --git a/wavefront_sdk/client.py b/wavefront_sdk/client.py index c653e5c..06ac9d5 100644 --- a/wavefront_sdk/client.py +++ b/wavefront_sdk/client.py @@ -41,6 +41,7 @@ class WavefrontClient(connection_handler.ConnectionHandler, REPORT_END_POINT = '/report' EVENT_END_POINT = '/api/v2/event' + HTTP_TIMEOUT = 60.0 def __init__(self, server, token, max_queue_size=50000, batch_size=10000, flush_interval_seconds=5, enable_internal_metrics=True, @@ -192,13 +193,15 @@ def _report(self, points, data_format, entity_prefix, report_errors): response = requests.post(self.server + self.EVENT_END_POINT, params=None, headers=self._event_headers, - data=points) + data=points, + timeout=self.HTTP_TIMEOUT) else: params = {'f': data_format} compressed_data = utils.gzip_compress(points.encode('utf-8')) response = requests.post(self.server + self.REPORT_END_POINT, params=params, headers=self._headers, - data=compressed_data) + data=compressed_data, + timeout=self.HTTP_TIMEOUT) self._sdk_metrics_registry.new_delta_counter( f'{entity_prefix}.report.{response.status_code}').inc() @@ -445,7 +448,7 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id, if span_logs: try: line_data = utils.span_log_to_line_data(trace_id, span_id, - span_logs) + span_logs, line_data) self._span_logs_valid.inc() except ValueError as error: self._span_logs_invalid.inc() diff --git a/wavefront_sdk/client_factory.py b/wavefront_sdk/client_factory.py index 4715a97..7b6be67 100644 --- a/wavefront_sdk/client_factory.py +++ b/wavefront_sdk/client_factory.py @@ -10,7 +10,6 @@ from wavefront_sdk.multi_clients import WavefrontMultiClient -# pylint: disable=W0232 # Class has no __init__ method class WavefrontClientFactory: """Wavefront client factory. diff --git a/wavefront_sdk/common/utils.py b/wavefront_sdk/common/utils.py index 2979883..f64075f 100644 --- a/wavefront_sdk/common/utils.py +++ b/wavefront_sdk/common/utils.py @@ -367,18 +367,20 @@ def tracing_span_to_line_data(name, start_millis, duration_millis, source, return ' '.join(str_builder) + '\n' -def span_log_to_line_data(trace_id, span_id, span_logs, scrambler=None): +def span_log_to_line_data(trace_id, span_id, span_logs, span, scrambler=None): """Wavefront Tracing Span Log JSON format. @param trace_id: Trace ID @param span_id: Span ID @param span_logs: Span Log + @param span: Span line @param scrambler: Additional UUID, optional @return: Span Log in JSON Format """ span_log_json = {'traceId': str(trace_id), 'spanId': str(span_id), - 'logs': span_logs} + 'logs': span_logs, + 'span': str(span)} if scrambler: span_log_json['_scrambler'] = str(scrambler) return json.dumps(span_log_json, default=lambda o: o.__dict__) + '\n' diff --git a/wavefront_sdk/direct.py b/wavefront_sdk/direct.py index 200f540..a29c2a0 100644 --- a/wavefront_sdk/direct.py +++ b/wavefront_sdk/direct.py @@ -44,6 +44,7 @@ class WavefrontDirectClient(connection_handler.ConnectionHandler, REPORT_END_POINT = '/report' EVENT_END_POINT = '/api/v2/event' + HTTP_TIMEOUT = 60.0 # pylint: disable=too-many-arguments # pylint: disable=too-many-statements @@ -196,14 +197,16 @@ def _report(self, points, data_format, entity_prefix, report_errors): response = requests.post(self.server + self.EVENT_END_POINT, params=None, headers=self._event_headers, - data=points) + data=points, + timeout=self.HTTP_TIMEOUT) else: params = {'f': data_format} compressed_data = utils.gzip_compress(points.encode('utf-8')) response = requests.post(self.server + self.REPORT_END_POINT, params=params, headers=self._headers, - data=compressed_data) + data=compressed_data, + timeout=self.HTTP_TIMEOUT) status_code = response.status_code self._sdk_metrics_registry.new_delta_counter( f'{entity_prefix}.report.{status_code}').inc() @@ -514,7 +517,7 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id, if span_logs: try: line_data = utils.span_log_to_line_data( - trace_id, span_id, span_logs) + trace_id, span_id, span_logs, line_data) self._span_logs_valid.inc() except ValueError as error: self._span_logs_invalid.inc() diff --git a/wavefront_sdk/proxy.py b/wavefront_sdk/proxy.py index 949975f..7a578bb 100644 --- a/wavefront_sdk/proxy.py +++ b/wavefront_sdk/proxy.py @@ -340,8 +340,9 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id, self._default_source) self._spans_valid.inc() self._tracing_proxy_connection_handler.send_data(line_data) - except ValueError: + except ValueError as error: self._spans_invalid.inc() + raise error except Exception as error: self._spans_dropped.inc() self._tracing_proxy_connection_handler.increment_failure_count() @@ -349,7 +350,7 @@ def send_span(self, name, start_millis, duration_millis, source, trace_id, if span_logs: try: span_log_line_data = utils.span_log_to_line_data( - trace_id, span_id, span_logs) + trace_id, span_id, span_logs, line_data) self._span_logs_valid.inc() self._tracing_proxy_connection_handler.send_data( span_log_line_data)