Skip to content

Commit ee75cb8

Browse files
Sparkycz and basepi authored
Add some data about response data at ES instrumentation (#1108)
* Add some data about response data at ES instrumentation
* Update docs
* Fix failing tests
* Fix typo (rows_affected not row_affected) and tests
* Update CHANGELOG

Co-authored-by: Colton Myers <[email protected]>
1 parent f64994d commit ee75cb8

File tree

7 files changed

+122
-28
lines changed

7 files changed

+122
-28
lines changed

CHANGELOG.asciidoc

+13
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,19 @@ endif::[]
3030
//===== Bug fixes
3131
//
3232
33+
=== Unreleased
34+
35+
// Unreleased changes go here
36+
// When the next release happens, nest these changes under the "Python Agent version 5.x" heading
37+
[float]
38+
===== Features
39+
40+
* Add additional context information about elasticsearch client requests {pull}1108[#1108]
41+
42+
[float]
43+
===== Bug fixes
44+
45+
3346
[[release-notes-6.x]]
3447
=== Python Agent version 6.x
3548

docs/supported-technologies.asciidoc

+4
Original file line number | Diff line number | Diff line change
@@ -112,8 +112,10 @@ Celery tasks will be recorded automatically with Django and Flask only.
112112

113113
Instrumented methods:
114114

115+
* `elasticsearch.transport.Transport.perform_request`
115116
* `elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request`
116117
* `elasticsearch.connection.http_requests.RequestsHttpConnection.perform_request`
118+
* `elasticsearch._async.transport.AsyncTransport.perform_request`
117119
* `elasticsearch_async.connection.AIOHttpConnection.perform_request`
118120

119121
Additionally, the instrumentation wraps the following methods of the `Elasticsearch` client class:
@@ -127,6 +129,8 @@ Collected trace data:
127129

128130
* the query string (if available)
129131
* the `query` element from the request body (if available)
132+
* the response status code
133+
* the count of affected rows (if available)
130134

131135
We recommend using keyword arguments only with elasticsearch-py, as recommended by
132136
https://elasticsearch-py.readthedocs.io/en/master/api.html#api-documentation[the elasticsearch-py docs].

elasticapm/instrumentation/packages/asyncio/elasticsearch.py

+35-8
Original file line number | Diff line number | Diff line change
@@ -30,10 +30,14 @@
3030

3131
import elasticapm
3232
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
33-
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin
33+
from elasticapm.instrumentation.packages.elasticsearch import (
34+
ElasticsearchConnectionInstrumentation,
35+
ElasticsearchTransportInstrumentation,
36+
)
37+
from elasticapm.traces import execution_context
3438

3539

36-
class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
40+
class ElasticSearchAsyncConnection(ElasticsearchConnectionInstrumentation, AsyncAbstractInstrumentedModule):
3741
name = "elasticsearch_connection"
3842

3943
instrument_list = [
@@ -42,16 +46,39 @@ class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractIn
4246
]
4347

4448
async def call(self, module, method, wrapped, instance, args, kwargs):
45-
signature = self.get_signature(args, kwargs)
46-
context = self.get_context(instance, args, kwargs)
49+
span = execution_context.get_span()
4750

51+
self._update_context_by_request_data(span.context, instance, args, kwargs)
52+
53+
status_code, headers, raw_data = await wrapped(*args, **kwargs)
54+
55+
span.context["http"] = {"status_code": status_code}
56+
57+
return status_code, headers, raw_data
58+
59+
60+
class ElasticsearchAsyncTransportInstrumentation(
61+
ElasticsearchTransportInstrumentation, AsyncAbstractInstrumentedModule
62+
):
63+
name = "elasticsearch_connection"
64+
65+
instrument_list = [
66+
("elasticsearch._async.transport", "AsyncTransport.perform_request"),
67+
]
68+
69+
async def call(self, module, method, wrapped, instance, args, kwargs):
4870
async with elasticapm.async_capture_span(
49-
signature,
71+
self._get_signature(args, kwargs),
5072
span_type="db",
5173
span_subtype="elasticsearch",
5274
span_action="query",
53-
extra=context,
75+
extra={},
5476
skip_frames=2,
5577
leaf=True,
56-
):
57-
return await wrapped(*args, **kwargs)
78+
) as span:
79+
result_data = await wrapped(*args, **kwargs)
80+
81+
if isinstance(result_data, dict) and "hits" in result_data:
82+
span.context["db"]["rows_affected"] = result_data["hits"]["total"]["value"]
83+
84+
return result_data

elasticapm/instrumentation/packages/elasticsearch.py

+40-20
Original file line number | Diff line number | Diff line change
@@ -34,32 +34,42 @@
3434

3535
import elasticapm
3636
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
37+
from elasticapm.traces import execution_context
3738
from elasticapm.utils.logging import get_logger
3839

3940
logger = get_logger("elasticapm.instrument")
4041

4142
should_capture_body_re = re.compile("/(_search|_msearch|_count|_async_search|_sql|_eql)(/|$)")
4243

4344

44-
class ElasticSearchConnectionMixin(object):
45-
query_methods = ("search", "count", "delete_by_query")
45+
class ElasticsearchConnectionInstrumentation(AbstractInstrumentedModule):
46+
name = "elasticsearch_connection"
4647

47-
def get_signature(self, args, kwargs):
48-
args_len = len(args)
49-
http_method = args[0] if args_len else kwargs.get("method")
50-
http_path = args[1] if args_len > 1 else kwargs.get("url")
48+
instrument_list = [
49+
("elasticsearch.connection.http_urllib3", "Urllib3HttpConnection.perform_request"),
50+
("elasticsearch.connection.http_requests", "RequestsHttpConnection.perform_request"),
51+
]
5152

52-
return "ES %s %s" % (http_method, http_path)
53+
def call(self, module, method, wrapped, instance, args, kwargs):
54+
span = execution_context.get_span()
55+
56+
self._update_context_by_request_data(span.context, instance, args, kwargs)
57+
58+
status_code, headers, raw_data = wrapped(*args, **kwargs)
59+
60+
span.context["http"] = {"status_code": status_code}
5361

54-
def get_context(self, instance, args, kwargs):
62+
return status_code, headers, raw_data
63+
64+
def _update_context_by_request_data(self, context, instance, args, kwargs):
5565
args_len = len(args)
5666
url = args[1] if args_len > 1 else kwargs.get("url")
5767
params = args[2] if args_len > 2 else kwargs.get("params")
5868
body_serialized = args[3] if args_len > 3 else kwargs.get("body")
5969

6070
should_capture_body = bool(should_capture_body_re.search(url))
6171

62-
context = {"db": {"type": "elasticsearch"}}
72+
context["db"] = {"type": "elasticsearch"}
6373
if should_capture_body:
6474
query = []
6575
# using both q AND body is allowed in some API endpoints / ES versions,
@@ -76,32 +86,42 @@ def get_context(self, instance, args, kwargs):
7686
query.append(body_serialized)
7787
if query:
7888
context["db"]["statement"] = "\n\n".join(query)
89+
7990
context["destination"] = {
8091
"address": instance.host,
8192
"service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"},
8293
}
83-
return context
8494

8595

86-
class ElasticsearchConnectionInstrumentation(ElasticSearchConnectionMixin, AbstractInstrumentedModule):
96+
class ElasticsearchTransportInstrumentation(AbstractInstrumentedModule):
8797
name = "elasticsearch_connection"
8898

8999
instrument_list = [
90-
("elasticsearch.connection.http_urllib3", "Urllib3HttpConnection.perform_request"),
91-
("elasticsearch.connection.http_requests", "RequestsHttpConnection.perform_request"),
100+
("elasticsearch.transport", "Transport.perform_request"),
92101
]
93102

94103
def call(self, module, method, wrapped, instance, args, kwargs):
95-
signature = self.get_signature(args, kwargs)
96-
context = self.get_context(instance, args, kwargs)
97-
98104
with elasticapm.capture_span(
99-
signature,
105+
self._get_signature(args, kwargs),
100106
span_type="db",
101107
span_subtype="elasticsearch",
102108
span_action="query",
103-
extra=context,
109+
extra={},
104110
skip_frames=2,
105111
leaf=True,
106-
):
107-
return wrapped(*args, **kwargs)
112+
) as span:
113+
result_data = wrapped(*args, **kwargs)
114+
115+
try:
116+
span.context["db"]["rows_affected"] = result_data["hits"]["total"]["value"]
117+
except (KeyError, TypeError):
118+
pass
119+
120+
return result_data
121+
122+
def _get_signature(self, args, kwargs):
123+
args_len = len(args)
124+
http_method = args[0] if args_len else kwargs.get("method")
125+
http_path = args[1] if args_len > 1 else kwargs.get("url")
126+
127+
return "ES %s %s" % (http_method, http_path)

elasticapm/instrumentation/register.py

+2
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
"elasticapm.instrumentation.packages.sqlite.SQLiteInstrumentation",
5555
"elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation",
5656
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation",
57+
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchTransportInstrumentation",
5758
"elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation",
5859
"elasticapm.instrumentation.packages.pymssql.PyMSSQLInstrumentation",
5960
"elasticapm.instrumentation.packages.pyodbc.PyODBCInstrumentation",
@@ -73,6 +74,7 @@
7374
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
7475
"elasticapm.instrumentation.packages.asyncio.httpx.HttpxAsyncClientInstrumentation",
7576
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
77+
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticsearchAsyncTransportInstrumentation",
7678
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
7779
"elasticapm.instrumentation.packages.asyncio.asyncpg.AsyncPGInstrumentation",
7880
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",

tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py

+6
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,7 @@ async def test_info(instrument, elasticapm_client, async_elasticsearch):
9191
assert span["subtype"] == "elasticsearch"
9292
assert span["action"] == "query"
9393
assert span["sync"] is False
94+
assert span["context"]["http"]["status_code"] == 200
9495

9596

9697
async def test_create(instrument, elasticapm_client, async_elasticsearch):
@@ -124,6 +125,7 @@ async def test_create(instrument, elasticapm_client, async_elasticsearch):
124125
assert span["action"] == "query"
125126
assert span["context"]["db"]["type"] == "elasticsearch"
126127
assert "statement" not in span["context"]["db"]
128+
assert span["context"]["http"]["status_code"] == 201
127129

128130

129131
async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
@@ -152,6 +154,9 @@ async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
152154
'{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
153155
)
154156
assert span["sync"] is False
157+
if ES_VERSION[0] >= 7:
158+
assert span["context"]["db"]["rows_affected"] == 1
159+
assert span["context"]["http"]["status_code"] == 200
155160

156161

157162
async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
@@ -175,3 +180,4 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
175180
assert span["context"]["db"]["type"] == "elasticsearch"
176181
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
177182
assert span["sync"] is False
183+
assert span["context"]["http"]["status_code"] == 200

0 commit comments

Comments
 (0)