Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 80 additions & 34 deletions src/datadog/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <unordered_map>
#include <unordered_set>

#include "clock.h"
#include "dict_reader.h"
#include "dict_writer.h"
#include "http_client.h"
Expand Down Expand Up @@ -95,6 +96,10 @@ CURLcode CurlLibrary::easy_setopt_writefunction(CURL *handle,
return curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, on_write);
}

CURLcode CurlLibrary::easy_setopt_timeout_ms(CURL *handle, long timeout_ms) {
return curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, timeout_ms);
}

const char *CurlLibrary::easy_strerror(CURLcode error) {
return curl_easy_strerror(error);
}
Expand Down Expand Up @@ -162,6 +167,7 @@ class CurlImpl {
std::mutex mutex_;
CurlLibrary &curl_;
const std::shared_ptr<Logger> logger_;
Clock clock_;
CURLM *multi_handle_;
std::unordered_set<CURL *> request_handles_;
std::list<CURL *> new_handles_;
Expand All @@ -179,6 +185,7 @@ class CurlImpl {
char error_buffer[CURL_ERROR_SIZE] = "";
std::unordered_map<std::string, std::string> response_headers_lower;
std::string response_body;
std::chrono::steady_clock::time_point deadline;

~Request();
};
Expand Down Expand Up @@ -221,13 +228,14 @@ class CurlImpl {
static StringView trim(StringView);

public:
explicit CurlImpl(const std::shared_ptr<Logger> &, CurlLibrary &,
const Curl::ThreadGenerator &);
explicit CurlImpl(const std::shared_ptr<Logger> &, const Clock &,
CurlLibrary &, const Curl::ThreadGenerator &);
~CurlImpl();

Expected<void> post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error);
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline);

void drain(std::chrono::steady_clock::time_point deadline);
};
Expand All @@ -242,22 +250,25 @@ void throw_on_error(CURLcode result) {

} // namespace

Curl::Curl(const std::shared_ptr<Logger> &logger) : Curl(logger, libcurl) {}
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock)
: Curl(logger, clock, libcurl) {}

Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl)
: Curl(logger, curl,
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl)
: Curl(logger, clock, curl,
[](auto &&func) { return std::thread(std::move(func)); }) {}

Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
const Curl::ThreadGenerator &make_thread)
: impl_(new CurlImpl{logger, curl, make_thread}) {}
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
: impl_(new CurlImpl{logger, clock, curl, make_thread}) {}

Curl::~Curl() { delete impl_; }

Expected<void> Curl::post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error) {
return impl_->post(url, set_headers, body, on_response, on_error);
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) {
return impl_->post(url, set_headers, body, on_response, on_error, deadline);
}

void Curl::drain(std::chrono::steady_clock::time_point deadline) {
Expand All @@ -268,10 +279,11 @@ nlohmann::json Curl::config_json() const {
return nlohmann::json::object({{"type", "datadog::tracing::Curl"}});
}

CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
const Curl::ThreadGenerator &make_thread)
CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, const Clock &clock,
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
: curl_(curl),
logger_(logger),
clock_(clock),
shutting_down_(false),
num_active_handles_(0) {
curl_.global_init(CURL_GLOBAL_ALL);
Expand Down Expand Up @@ -311,24 +323,35 @@ CurlImpl::~CurlImpl() {
}
log_on_error(curl_.multi_wakeup(multi_handle_));
event_loop_.join();

log_on_error(curl_.multi_cleanup(multi_handle_));
curl_.global_cleanup();
}

Expected<void> CurlImpl::post(const HTTPClient::URL &url,
HeadersSetter set_headers, std::string body,
ResponseHandler on_response,
ErrorHandler on_error) try {
Expected<void> CurlImpl::post(
const HTTPClient::URL &url, HeadersSetter set_headers, std::string body,
ResponseHandler on_response, ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) try {
if (multi_handle_ == nullptr) {
return Error{Error::CURL_HTTP_CLIENT_NOT_RUNNING,
"Unable to send request via libcurl because the HTTP client "
"failed to start."};
}

HeaderWriter writer{curl_};
set_headers(writer);
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
writer.release(), std::move(cleanup_list)};

auto request = std::make_unique<Request>();

request->curl = &curl_;
request->request_headers = headers.get();
request->request_body = std::move(body);
request->on_response = std::move(on_response);
request->on_error = std::move(on_error);
request->deadline = std::move(deadline);

auto cleanup_handle = [&](auto handle) { curl_.easy_cleanup(handle); };
std::unique_ptr<CURL, decltype(cleanup_handle)> handle{
Expand All @@ -339,6 +362,8 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
"unable to initialize a curl handle for request sending"};
}

throw_on_error(
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));
throw_on_error(curl_.easy_setopt_private(handle.get(), request.get()));
throw_on_error(
curl_.easy_setopt_errorbuffer(handle.get(), request->error_buffer));
Expand All @@ -365,25 +390,17 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
}

HeaderWriter writer{curl_};
set_headers(writer);
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
writer.release(), std::move(cleanup_list)};
request->request_headers = headers.get();
throw_on_error(
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));

std::list<CURL *> node;
node.push_back(handle.get());
{
std::lock_guard<std::mutex> lock(mutex_);
new_handles_.splice(new_handles_.end(), node);

headers.release();
handle.release();
request.release();
(void)headers.release();
(void)handle.release();
(void)request.release();
}

log_on_error(curl_.multi_wakeup(multi_handle_));

return nullopt;
Expand Down Expand Up @@ -464,6 +481,7 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
void CurlImpl::run() {
int num_messages_remaining;
CURLMsg *message;
const int max_wait_milliseconds = 10000;
std::unique_lock<std::mutex> lock(mutex_);

for (;;) {
Expand All @@ -478,16 +496,46 @@ void CurlImpl::run() {
&num_messages_remaining))) {
handle_message(*message, lock);
}

const int max_wait_milliseconds = 10 * 1000;
lock.unlock();
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
max_wait_milliseconds, nullptr));
lock.lock();

// New requests might have been added while we were sleeping.
for (; !new_handles_.empty(); new_handles_.pop_front()) {
CURL *const handle = new_handles_.front();
CURL *handle = new_handles_.front();
char *user_data;
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
CURLE_OK) {
curl_.easy_cleanup(handle);
continue;
}

auto *request = reinterpret_cast<Request *>(user_data);
const auto timeout = request->deadline - clock_().tick;
if (timeout <= std::chrono::steady_clock::time_point::duration::zero()) {
std::string message;
message +=
"Request deadline exceeded before request was even added to "
"libcurl "
"event loop. Deadline was ";
message += std::to_string(
-std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
.count());
message += " nanoseconds ago.";
request->on_error(
Error{Error::CURL_DEADLINE_EXCEEDED_BEFORE_REQUEST_START,
std::move(message)});

curl_.easy_cleanup(handle);
delete request;

continue;
}

log_on_error(curl_.easy_setopt_timeout_ms(
handle, std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
.count()));
log_on_error(curl_.multi_add_handle(multi_handle_, handle));
request_handles_.insert(handle);
}
Expand All @@ -510,8 +558,6 @@ void CurlImpl::run() {
}

request_handles_.clear();
log_on_error(curl_.multi_cleanup(multi_handle_));
curl_.global_cleanup();
}

void CurlImpl::handle_message(const CURLMsg &message,
Expand Down
12 changes: 8 additions & 4 deletions src/datadog/curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>
#include <thread>

#include "clock.h"
#include "http_client.h"
#include "json_fwd.hpp"

Expand Down Expand Up @@ -59,6 +60,7 @@ class CurlLibrary {
virtual CURLcode easy_setopt_url(CURL *handle, const char *url);
virtual CURLcode easy_setopt_writedata(CURL *handle, void *data);
virtual CURLcode easy_setopt_writefunction(CURL *handle, WriteCallback);
virtual CURLcode easy_setopt_timeout_ms(CURL *handle, long timeout_ms);
virtual const char *easy_strerror(CURLcode error);
virtual void global_cleanup();
virtual CURLcode global_init(long flags);
Expand Down Expand Up @@ -86,16 +88,18 @@ class Curl : public HTTPClient {
public:
using ThreadGenerator = std::function<std::thread(std::function<void()> &&)>;

explicit Curl(const std::shared_ptr<Logger> &);
Curl(const std::shared_ptr<Logger> &, CurlLibrary &);
Curl(const std::shared_ptr<Logger> &, CurlLibrary &, const ThreadGenerator &);
explicit Curl(const std::shared_ptr<Logger> &, const Clock &);
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &);
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &,
const ThreadGenerator &);
~Curl();

Curl(const Curl &) = delete;

Expected<void> post(const URL &url, HeadersSetter set_headers,
std::string body, ResponseHandler on_response,
ErrorHandler on_error) override;
ErrorHandler on_error,
std::chrono::steady_clock::time_point deadline) override;

void drain(std::chrono::steady_clock::time_point deadline) override;

Expand Down
46 changes: 25 additions & 21 deletions src/datadog/datadog_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,19 @@ std::variant<CollectorResponse, std::string> parse_agent_traces_response(
DatadogAgent::DatadogAgent(
const FinalizedDatadogAgentConfig& config,
const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
const Clock& clock, const std::shared_ptr<Logger>& logger)
const std::shared_ptr<Logger>& logger)
: tracer_telemetry_(tracer_telemetry),
clock_(clock),
clock_(config.clock),
logger_(logger),
traces_endpoint_(traces_endpoint(config.url)),
telemetry_endpoint_(telemetry_endpoint(config.url)),
http_client_(config.http_client),
event_scheduler_(config.event_scheduler),
cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
config.flush_interval, [this]() { flush(); })),
flush_interval_(config.flush_interval) {
flush_interval_(config.flush_interval),
request_timeout_(config.request_timeout),
shutdown_timeout_(config.shutdown_timeout) {
assert(logger_);
assert(tracer_telemetry_);
if (tracer_telemetry_->enabled()) {
Expand Down Expand Up @@ -185,7 +187,7 @@ DatadogAgent::DatadogAgent(
}

DatadogAgent::~DatadogAgent() {
const auto deadline = clock_().tick + std::chrono::seconds(2);
const auto deadline = clock_().tick + shutdown_timeout_;
cancel_scheduled_flush_();
flush();
if (tracer_telemetry_->enabled()) {
Expand All @@ -208,17 +210,15 @@ Expected<void> DatadogAgent::send(
}

nlohmann::json DatadogAgent::config_json() const {
const auto flush_interval_milliseconds =
std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_)
.count();

// clang-format off
return nlohmann::json::object({
{"type", "datadog::tracing::DatadogAgent"},
{"config", nlohmann::json::object({
{"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
{"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
{"flush_interval_milliseconds", flush_interval_milliseconds},
{"flush_interval_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_).count() },
{"request_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(request_timeout_).count() },
{"shutdown_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(shutdown_timeout_).count() },
{"http_client", http_client_->config_json()},
{"event_scheduler", event_scheduler_->config_json()},
})},
Expand Down Expand Up @@ -324,9 +324,10 @@ void DatadogAgent::flush() {
};

tracer_telemetry_->metrics().trace_api.requests.inc();
auto post_result = http_client_->post(
traces_endpoint_, std::move(set_request_headers), std::move(body),
std::move(on_response), std::move(on_error));
auto post_result =
http_client_->post(traces_endpoint_, std::move(set_request_headers),
std::move(body), std::move(on_response),
std::move(on_error), clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(
error->with_prefix("Unexpected error submitting traces: "));
Expand All @@ -335,9 +336,10 @@ void DatadogAgent::flush() {

void DatadogAgent::send_app_started() {
auto payload = tracer_telemetry_->app_started();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-started event: "));
Expand All @@ -346,9 +348,10 @@ void DatadogAgent::send_app_started() {

void DatadogAgent::send_heartbeat_and_telemetry() {
auto payload = tracer_telemetry_->heartbeat_and_telemetry();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-heartbeat event: "));
Expand All @@ -357,9 +360,10 @@ void DatadogAgent::send_heartbeat_and_telemetry() {

void DatadogAgent::send_app_closing() {
auto payload = tracer_telemetry_->app_closing();
auto post_result = http_client_->post(
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
telemetry_on_response_, telemetry_on_error_);
auto post_result =
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
std::move(payload), telemetry_on_response_,
telemetry_on_error_, clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
logger_->log_error(error->with_prefix(
"Unexpected error submitting telemetry app-closing event: "));
Expand Down
Loading