27#include <unordered_map>
38 #include <curl/curl.h>
52 if(input_file ==
"-") {
67 if( in.available(1) ) {
68 buffer.resize(buffer.capacity());
69 const uint64_t got = in.read(buffer.data(), buffer.capacity());
73 has_more = 1 <= got && !in.fail() && ( !in.hasContentSize() || total < in.contentSize() );
75 if( !consumer_fn(buffer, !has_more) ) {
78 }
catch (std::exception &e) {
79 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
85 consumer_fn(buffer,
true);
93 if( in.available(1) ) {
94 buffer.resize(buffer.capacity());
95 const uint64_t got = in.read(buffer.data(), buffer.capacity());
106 bool eof[] = {
false,
false };
108 bool eof_read =
false;
109 uint64_t total_send = 0;
110 uint64_t total_read = 0;
116 eof_read = 0 == got || in.fail() || ( in.hasContentSize() && total_read >= in.contentSize() );
130 bool eof_send =
false;
132 int bidx_next = ( idx + 1 ) % 2;
136 eof_read = 0 == got || in.fail() || ( in.hasContentSize() && total_read >= in.contentSize() );
141 eof[bidx_next] =
true;
148 total_send += buffer->size();
150 if( !consumer_fn(*buffer, eof_send) ) {
153 }
catch (std::exception &e) {
154 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
162 std::vector<std::string_view> res;
164 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
165 if(
nullptr == cvid ||
nullptr == cvid->protocols ) {
168 for(
int i=0;
nullptr != cvid->protocols[i]; ++i) {
169 res.emplace_back( cvid->protocols[i] );
176 if ( scheme.empty() ) {
179 auto pos = std::find_if_not(scheme.begin(), scheme.end(), [&](
char c) {
180 return std::isalnum(c) || c ==
'+' || c ==
'.' || c ==
'-';
182 return pos == scheme.end();
185 std::size_t pos = uri.find(
':');
186 if (pos == std::string_view::npos) {
187 return uri.substr(0, 0);
189 std::string_view scheme = uri.substr(0, pos);
191 return uri.substr(0, 0);
197 const std::string_view scheme =
get_scheme(uri);
198 if( scheme.empty() ) {
202 auto it = std::find(protos.cbegin(), protos.cend(), scheme);
203 return protos.cend() != it;
207 return uri.starts_with(
"file://");
211 const std::string_view scheme =
get_scheme(uri);
212 if( scheme.empty() ) {
215 return "https" == scheme ||
"http" == scheme;
222 bool has_content_length;
223 uint64_t content_length;
230static size_t consume_header_curl1(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
231 curl_glue1_t * cg = (curl_glue1_t*)userdata;
235 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
236 if( CURLE_OK == r ) {
237 cg->status_code =
static_cast<int32_t
>(v);
242 DBG_PRINT(
"consume_header_curl1.0 response_code: %ld", v);
246 if( !cg->has_content_length ) {
248 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
249 if( CURLE_OK == r ) {
251 cg->content_length = 0;
252 cg->has_content_length =
false;
254 cg->content_length = v;
255 cg->has_content_length =
true;
259 const size_t realsize = size * nmemb;
262 DBG_PRINT(
"consume_header_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" )",
263 realsize, cg->total_read, cg->has_content_length, cg->content_length );
264 std::string blob(buffer, realsize);
271static size_t consume_data_curl1(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
272 curl_glue1_t * cg = (curl_glue1_t*)userdata;
274 if( !cg->has_content_length ) {
276 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
277 if( CURLE_OK == r ) {
279 cg->content_length = 0;
280 cg->has_content_length =
false;
282 cg->content_length = v;
283 cg->has_content_length =
true;
287 const size_t realsize = size * nmemb;
288 DBG_PRINT(
"consume_data_curl1.0 realsize %zu", realsize);
289 cg->buffer.resize(realsize);
290 memcpy(cg->buffer.data(), ptr, realsize);
292 cg->total_read += realsize;
293 const bool is_final = 0 == realsize ||
294 cg->has_content_length ? cg->total_read >= cg->content_length :
false;
296 DBG_PRINT(
"consume_data_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d",
297 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
300 if( !cg->consumer_fn(cg->buffer, is_final) ) {
303 }
catch (std::exception &e) {
304 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
317 std::vector<char> errorbuffer;
318 errorbuffer.reserve(CURL_ERROR_SIZE);
323 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
329 CURL *curl_handle = curl_easy_init();
330 DBG_PRINT(
"CURL: Create own handle %p", curl_handle);
332 curl_glue1_t cg = { .curl_handle=curl_handle, .has_content_length=
false, .content_length=0,
333 .status_code=0, .total_read=0, .buffer=buffer, .consumer_fn=consumer_fn };
335 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
336 if( CURLE_OK != res ) {
337 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
338 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
343 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
344 if( CURLE_OK != res ) {
345 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
346 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
351 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
353 if( CURLE_OK != res ) {
354 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
355 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
360 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
361 if( CURLE_OK != res ) {
362 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
363 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
369 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
370 if( CURLE_OK != res ) {
371 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
372 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
377 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
378 if( CURLE_OK != res ) {
379 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
380 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
385 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
386 if( CURLE_OK != res ) {
387 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
388 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
393 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
394 if( CURLE_OK != res ) {
395 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
396 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
401 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&cg);
402 if( CURLE_OK != res ) {
403 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
404 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
409 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
410 if( CURLE_OK != res ) {
411 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
412 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
417 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (
void*)&cg);
418 if( CURLE_OK != res ) {
419 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
420 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
425 res = curl_easy_perform(curl_handle);
426 if( CURLE_OK != res ) {
427 IRQ_PRINT(
"Error processing url %s, error %d '%s' '%s'",
428 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
433 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
434 curl_easy_cleanup(curl_handle);
435 return cg.total_read;
438 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
439 curl_easy_cleanup(curl_handle);
450 std::unique_lock<std::mutex> lockWrite(m_sync);
458 std::unique_lock<std::mutex> lock(m_sync);
460 while( !m_completed ) {
464 std::cv_status s =
wait_until(m_cv, lock, timeout_time );
465 if( std::cv_status::timeout == s && !m_completed ) {
475struct curl_glue2_sync_t {
476 curl_glue2_sync_t(
void *_curl_handle,
481 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
482 post_request(
std::move(_post_request)),
485 response(
std::move(_response)),
486 consumer_fn(
std::move(_consumer_fn))
492 int32_t response_code;
496 void interrupt_all() noexcept {
502 void set_end_of_input() noexcept {
510struct curl_glue2_async_t {
511 curl_glue2_async_t(
void *_curl_handle,
516 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
517 post_request(
std::
move(_post_request)),
520 response(
std::
move(_response)),
521 consumer_fn(
std::
move(_consumer_fn))
527 int32_t response_code;
531 void interrupt_all() noexcept {
537 void set_end_of_input() noexcept {
545static size_t consume_header_curl2_sync(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
546 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
551 DBG_PRINT(
"consume_header_curl2_sync ABORT by User: total %" PRIu64
", result %s",
553 cg->set_end_of_input();
559 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
560 if( CURLE_OK == r ) {
561 cg->response_code =
static_cast<int32_t
>(v);
565 cg->set_end_of_input();
568 DBG_PRINT(
"consume_header_curl2.0 response_code: %ld", v);
574 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
575 if( CURLE_OK == r ) {
582 const size_t realsize = size * nmemb;
584 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
586 DBG_PRINT(
"consume_header_curl2.0 header_completed");
590 DBG_PRINT(
"consume_header_curl2.X realsize %" PRIu64
", total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), result %s",
593 std::string blob(buffer, realsize);
601static size_t consume_header_curl2_async(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
602 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
607 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
608 DBG_PRINT(
"consume_header_curl2 ABORT by User: total %" PRIu64
", result %s, rb %s",
610 cg->set_end_of_input();
616 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
617 if( CURLE_OK == r ) {
618 cg->response_code =
static_cast<int32_t
>(v);
622 cg->set_end_of_input();
625 DBG_PRINT(
"consume_header_curl2.0 response_code: %ld", v);
631 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
632 if( CURLE_OK == r ) {
639 const size_t realsize = size * nmemb;
641 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
643 DBG_PRINT(
"consume_header_curl2.0 header_completed");
647 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
648 DBG_PRINT(
"consume_header_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), result %s, rb %s",
651 std::string blob(buffer, realsize);
659static size_t consume_data_curl2_sync(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
660 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
666 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
667 DBG_PRINT(
"consume_data_curl2 ABORT by User: total %" PRIu64
", result %s, rb %s",
669 cg->set_end_of_input();
675 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
676 if( CURLE_OK == r ) {
689 const size_t realsize = size * nmemb;
691 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
692 DBG_PRINT(
"consume_data_curl2.0 realsize %zu, rb %s", realsize, s );
695 bool timeout_occured;
696 if( !cg->buffer->putBlocking(
reinterpret_cast<uint8_t*
>(ptr),
697 reinterpret_cast<uint8_t*
>(ptr)+realsize, 0_s, timeout_occured) ) {
698 DBG_PRINT(
"consume_data_curl2 Failed put: total %" PRIu64
", result %s, timeout %d, rb %s",
700 if( timeout_occured ) {
701 cg->set_end_of_input();
708 const bool is_final = 0 == realsize ||
712 cg->set_end_of_input();
714 if( cg->consumer_fn ) {
716 if( !cg->consumer_fn(*cg->response,
reinterpret_cast<uint8_t*
>(ptr), realsize, is_final) ) {
719 }
catch (std::exception &e) {
720 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
726 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
727 DBG_PRINT(
"consume_data_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d, result %s, rb %s",
735static size_t consume_data_curl2_async(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
736 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
741 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
742 DBG_PRINT(
"consume_data_curl2 ABORT by User: total %" PRIu64
", result %s, rb %s",
744 cg->set_end_of_input();
750 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
751 if( CURLE_OK == r ) {
764 const size_t realsize = size * nmemb;
766 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
767 DBG_PRINT(
"consume_data_curl2.0 realsize %zu, rb %s", realsize, s );
770 bool timeout_occured;
771 if( !cg->buffer->putBlocking(
reinterpret_cast<uint8_t*
>(ptr),
772 reinterpret_cast<uint8_t*
>(ptr)+realsize, 0_s, timeout_occured) ) {
773 DBG_PRINT(
"consume_data_curl2 Failed put: total %" PRIu64
", result %s, timeout %d, rb %s",
775 if( timeout_occured ) {
776 cg->set_end_of_input();
783 const bool is_final = 0 == realsize ||
787 cg->set_end_of_input();
789 if( cg->consumer_fn ) {
791 if( !cg->consumer_fn(*cg->response,
reinterpret_cast<uint8_t*
>(ptr), realsize, is_final) ) {
794 }
catch (std::exception &e) {
795 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
801 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
802 DBG_PRINT(
"consume_data_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d, result %s, rb %s",
810static bool read_url_stream_impl(CURL *curl_handle, std::vector<char>& errorbuffer,
813 curl_write_callback header_cb, curl_write_callback write_cb,
void* ctx_data)
noexcept
815 struct curl_slist *header_slist =
nullptr;
818 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
819 if( CURLE_OK != res ) {
820 ERR_PRINT(
"Error setting up url %s, error %d '%s'",
821 url, (
int)res, curl_easy_strerror(res));
826 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
827 if( CURLE_OK != res ) {
828 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
829 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
833 if(
nullptr != post_request ) {
837 if( post.
header.size() > 0 ) {
838 for (
const std::pair<const std::string, std::string>& n : post.
header) {
839 std::string v = n.first+
": "+n.second;
840 header_slist = curl_slist_append(header_slist, v.c_str());
842 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, header_slist);
843 if( CURLE_OK != res ) {
844 ERR_PRINT(
"Error setting up POST header, error %d '%s' '%s'",
845 (
int)res, curl_easy_strerror(res), errorbuffer.data());
850 res = curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, post.
body.data());
851 if( CURLE_OK != res ) {
852 ERR_PRINT(
"Error setting up POST fields, error %d '%s' '%s'",
853 (
int)res, curl_easy_strerror(res), errorbuffer.data());
858 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
860 if( CURLE_OK != res ) {
861 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
862 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
867 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
868 if( CURLE_OK != res ) {
869 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
870 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
876 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
877 if( CURLE_OK != res ) {
878 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
879 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
884 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
885 if( CURLE_OK != res ) {
886 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
887 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
892 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
893 if( CURLE_OK != res ) {
894 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
895 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
900 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_cb);
901 if( CURLE_OK != res ) {
902 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
903 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
908 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, ctx_data);
909 if( CURLE_OK != res ) {
910 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
911 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
916 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, write_cb);
917 if( CURLE_OK != res ) {
918 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
919 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
924 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, ctx_data);
925 if( CURLE_OK != res ) {
926 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
927 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
932 res = curl_easy_perform(curl_handle);
933 if( CURLE_OK != res ) {
936 IRQ_PRINT(
"Error processing url %s, error %d '%s' '%s'",
937 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
940 DBG_PRINT(
"Processing aborted url %s, error %d '%s' '%s'",
941 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
946 if(
nullptr != header_slist ) {
947 curl_slist_free_all(header_slist);
952 if(
nullptr != header_slist ) {
953 curl_slist_free_all(header_slist);
959 std::vector<char> errorbuffer;
960 errorbuffer.reserve(CURL_ERROR_SIZE);
962 bool owns_curl_handle =
false;
964 if(
nullptr == cg.curl_handle ) {
966 owns_curl_handle =
true;
967 curl_handle = curl_easy_init();
968 if(
nullptr == curl_handle ) {
969 ERR_PRINT(
"Error setting up url %s, null curl handle", url);
972 cg.curl_handle = curl_handle;
973 DBG_PRINT(
"CURL: Created own handle %p", curl_handle);
975 curl_handle = cg.curl_handle;
976 DBG_PRINT(
"CURL: Reusing own handle %p", curl_handle);
979 if( !read_url_stream_impl(curl_handle, errorbuffer,
980 url, cg.post_request.get(), cg.response->result,
981 consume_header_curl2_sync, consume_data_curl2_sync, (
void*)&cg) ) {
986 if( !cg.response->header_resp.completed() ) {
987 cg.response->header_resp.notify_complete();
991 if( cg.consumer_fn ) {
993 cg.consumer_fn(*cg.response,
nullptr, 0,
true);
994 }
catch (std::exception &e) {
995 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
1004 cg.set_end_of_input();
1007 if( owns_curl_handle &&
nullptr != curl_handle ) {
1008 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
1009 curl_easy_cleanup(curl_handle);
1010 cg.curl_handle =
nullptr;
1014static void read_url_stream_async(
const char *url, std::unique_ptr<curl_glue2_async_t> && cg)
noexcept {
1015 std::vector<char> errorbuffer;
1016 errorbuffer.reserve(CURL_ERROR_SIZE);
1018 bool owns_curl_handle =
false;
1020 if(
nullptr == cg->curl_handle ) {
1022 owns_curl_handle =
true;
1023 curl_handle = curl_easy_init();
1024 if(
nullptr == curl_handle ) {
1025 ERR_PRINT(
"Error setting up url %s, null curl handle", url);
1028 cg->curl_handle = curl_handle;
1029 DBG_PRINT(
"CURL: Created own handle %p", curl_handle);
1031 curl_handle = cg->curl_handle;
1032 DBG_PRINT(
"CURL: Reusing own handle %p", curl_handle);
1035 if( !read_url_stream_impl(curl_handle, errorbuffer,
1036 url, cg->post_request.get(), cg->response->result,
1037 consume_header_curl2_async, consume_data_curl2_async, (
void*)cg.get()) ) {
1042 if( !cg->response->header_resp.completed() ) {
1043 cg->response->header_resp.notify_complete();
1047 if( cg->consumer_fn ) {
1049 cg->consumer_fn(*cg->response,
nullptr, 0,
true);
1050 }
catch (std::exception &e) {
1051 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
1060 cg->set_end_of_input();
1063 if( owns_curl_handle &&
nullptr != curl_handle ) {
1064 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
1065 curl_easy_cleanup(curl_handle);
1066 cg->curl_handle =
nullptr;
1075 CURL* h = ::curl_easy_init();
1076 DBG_PRINT(
"CURL: Created user handle %p", h);
1084 CURL* h =
static_cast<CURL*
>(handle);
1085 if(
nullptr != h ) {
1086 DBG_PRINT(
"CURL: Freeing user handle %p", h);
1087 curl_easy_cleanup(h);
1110 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1115 curl_glue2_sync_t cg (handle, std::move(httpPostReq), buffer, res, consumer_fn );
1137 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1143 std::unique_ptr<curl_glue2_async_t> cg ( std::make_unique<curl_glue2_async_t>(handle, std::move(httpPostReq),
1144 buffer, res, consumer_fn ) );
1155 if( out_bytes_total >= 100'000'000 ) {
1158 }
else if( out_bytes_total >= 100'000 ) {
1166 const uint64_t _rate_bps = std::llround( (
double)out_bytes_total / td.to_double() );
1167 const uint64_t _rate_bitps = std::llround( ( (
double)out_bytes_total * 8.0 ) / td.to_double() );
1169 if( _rate_bitps >= 100'000'000 ) {
1173 }
else if( _rate_bitps >= 100'000 ) {
static bool getBooleanProperty(const std::string &name, const bool default_value) noexcept
Returns the boolean value of the environment's variable 'name', or the 'default_value' if the environ...
static environment & get(const std::string &root_prefix_domain="jau") noexcept
Static singleton initialization of this project's environment with the given global root prefix_domai...
File based byte input stream, including named file descriptor.
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(fmt,...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
#define PLAIN_PRINT(printPrefix, fmt,...)
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
@ little
Identifier for little endian, equivalent to endian::little.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
@ std
Denotes a func::std_target_t.
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
jau::ordered_atomic< io_result_t, std::memory_order_relaxed > relaxed_atomic_io_result_t
SyncStreamResponseRef read_url_stream_sync(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const SyncStreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given SyncStreamConsumerFunc consumer_fn.
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e. the uri starts with `file://`.
net_tk_handle create_net_tk_handle() noexcept
Creates a reusable handle; free it with free_net_tk_handle() after use.
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
jau::function< bool(secure_vector< uint8_t > &, bool)> StreamConsumerFunc
Stream consumer function.
uint64_t read_stream(ByteStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of the network protocols supported by libcurl, queried at runtime.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
std::string toString(io_result_t v) noexcept
std::shared_ptr< SyncStreamResponse > SyncStreamResponseRef
std::shared_ptr< AsyncStreamResponse > AsyncStreamResponseRef
jau::function< bool(SyncStreamResponse &, const uint8_t *, size_t, bool)> SyncStreamConsumerFunc
Synchronous stream consumer function.
jau::ringbuffer< uint8_t, size_t > ByteRingbuffer
void free_net_tk_handle(net_tk_handle handle) noexcept
Frees a handle created by create_net_tk_handle() after use.
std::vector< T, jau::callocator_sec< T > > secure_vector
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
@ timeout
Input or output operation failed due to timeout.
std::string toHexString(const void *data, const nsize_t length, const lb_endian_t byteOrder=lb_endian_t::big, const LoUpCase capitalization=LoUpCase::lower, const PrefixOpt prefix=PrefixOpt::prefix) noexcept
Produce a hexadecimal string representation of the given lsb-first byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
static uint64_t _read_buffer(ByteStream &in, secure_vector< uint8_t > &buffer) noexcept
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
std::unique_ptr< PostRequest > PostRequestPtr
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...
Asynchronous stream response.
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
std::thread thread
background reading thread unique-pointer
relaxed_atomic_bool has_content_length
indicating whether content_length is known from server
relaxed_atomic_uint64 total_read
tracking the total_read
relaxed_atomic_uint64 content_length
tracking the content_length
url_header_resp header_resp
synchronized URL header response completion
Synchronous stream response.
uint64_t total_read
tracking the total_read
uint64_t content_length
tracking the content_length
url_header_resp header_resp
synchronized URL header response completion
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
bool has_content_length
indicating whether content_length is known from server
std::unordered_map< std::string, std::string > header
CXX_ALWAYS_INLINE _Tp fetch_add(_Tp __i) noexcept
CXX_ALWAYS_INLINE _Tp load() const noexcept