28#include <unordered_map>
38 #include <curl/curl.h>
52 if(input_file ==
"-") {
67 if( in.available(1) ) {
68 buffer.resize(buffer.capacity());
69 const uint64_t got = in.read(buffer.data(), buffer.capacity());
73 has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
75 if( !consumer_fn(buffer, !has_more) ) {
78 }
catch (std::exception &e) {
79 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
85 consumer_fn(buffer,
true);
93 if( in.available(1) ) {
94 buffer.resize(buffer.capacity());
95 const uint64_t got = in.read(buffer.data(), buffer.capacity());
106 bool eof[] = {
false,
false };
108 bool eof_read =
false;
109 uint64_t total_send = 0;
110 uint64_t total_read = 0;
116 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
130 bool eof_send =
false;
132 int bidx_next = ( idx + 1 ) % 2;
136 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
141 eof[bidx_next] =
true;
148 total_send += buffer->size();
150 if( !consumer_fn(*buffer, eof_send) ) {
153 }
catch (std::exception &e) {
154 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
162 std::vector<std::string_view> res;
164 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
165 if(
nullptr == cvid ||
nullptr == cvid->protocols ) {
168 for(
int i=0;
nullptr != cvid->protocols[i]; ++i) {
169 res.emplace_back( cvid->protocols[i] );
176 if ( scheme.empty() ) {
179 auto pos = std::find_if_not(scheme.begin(), scheme.end(), [&](
char c){
180 return std::isalnum(c) || c ==
'+' || c ==
'.' || c ==
'-';
182 return pos == scheme.end();
185 std::size_t pos = uri.find(
':');
186 if (pos == std::string_view::npos) {
187 return uri.substr(0, 0);
189 std::string_view scheme = uri.substr(0, pos);
191 return uri.substr(0, 0);
197 const std::string_view scheme =
get_scheme(uri);
198 if( scheme.empty() ) {
202 auto it = std::find(protos.cbegin(), protos.cend(), scheme);
203 return protos.cend() != it;
207 return uri.starts_with(
"file://");
211 const std::string_view scheme =
get_scheme(uri);
212 if( scheme.empty() ) {
215 return "https" == scheme ||
"http" == scheme;
222 bool has_content_length;
223 uint64_t content_length;
230static size_t consume_header_curl1(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
231 curl_glue1_t * cg = (curl_glue1_t*)userdata;
235 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
236 if( CURLE_OK == r ) {
237 cg->status_code =
static_cast<int32_t
>(v);
242 DBG_PRINT(
"consume_header_curl1.0 response_code: %ld", v);
246 if( !cg->has_content_length ) {
248 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
249 if( CURLE_OK == r ) {
251 cg->content_length = 0;
252 cg->has_content_length =
false;
254 cg->content_length = v;
255 cg->has_content_length =
true;
259 const size_t realsize = size * nmemb;
262 DBG_PRINT(
"consume_header_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" )",
263 realsize, cg->total_read, cg->has_content_length, cg->content_length );
264 std::string blob(buffer, realsize);
271static size_t consume_data_curl1(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
272 curl_glue1_t * cg = (curl_glue1_t*)userdata;
274 if( !cg->has_content_length ) {
276 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
277 if( CURLE_OK == r ) {
279 cg->content_length = 0;
280 cg->has_content_length =
false;
282 cg->content_length = v;
283 cg->has_content_length =
true;
287 const size_t realsize = size * nmemb;
288 DBG_PRINT(
"consume_data_curl1.0 realsize %zu", realsize);
289 cg->buffer.resize(realsize);
290 memcpy(cg->buffer.data(), ptr, realsize);
292 cg->total_read += realsize;
293 const bool is_final = 0 == realsize ||
294 cg->has_content_length ? cg->total_read >= cg->content_length :
false;
296 DBG_PRINT(
"consume_data_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d",
297 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
300 if( !cg->consumer_fn(cg->buffer, is_final) ) {
303 }
catch (std::exception &e) {
304 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
317 std::vector<char> errorbuffer;
318 errorbuffer.reserve(CURL_ERROR_SIZE);
323 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
329 CURL *curl_handle = curl_easy_init();
330 DBG_PRINT(
"CURL: Create own handle %p", curl_handle);
331 if(
nullptr == curl_handle ) {
332 ERR_PRINT(
"Error setting up url %s, null curl handle", url.c_str());
336 curl_glue1_t cg = { curl_handle,
false, 0, 0, 0, buffer, consumer_fn };
338 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
339 if( CURLE_OK != res ) {
340 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
341 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
346 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
347 if( CURLE_OK != res ) {
348 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
349 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
354 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
356 if( CURLE_OK != res ) {
357 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
358 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
363 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
364 if( CURLE_OK != res ) {
365 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
366 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
372 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
373 if( CURLE_OK != res ) {
374 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
375 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
380 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
381 if( CURLE_OK != res ) {
382 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
383 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
388 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
389 if( CURLE_OK != res ) {
390 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
391 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
396 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
397 if( CURLE_OK != res ) {
398 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
399 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
404 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&cg);
405 if( CURLE_OK != res ) {
406 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
407 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
412 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
413 if( CURLE_OK != res ) {
414 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
415 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
420 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (
void*)&cg);
421 if( CURLE_OK != res ) {
422 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
423 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
428 res = curl_easy_perform(curl_handle);
429 if( CURLE_OK != res ) {
430 IRQ_PRINT(
"Error processing url %s, error %d '%s' '%s'",
431 url.c_str(), (
int)res, curl_easy_strerror(res), errorbuffer.data());
436 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
437 curl_easy_cleanup(curl_handle);
438 return cg.total_read;
441 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
442 curl_easy_cleanup(curl_handle);
453 std::unique_lock<std::mutex> lockWrite(m_sync);
461 std::unique_lock<std::mutex> lock(m_sync);
463 while( !m_completed ) {
467 std::cv_status s =
wait_until(m_cv, lock, timeout_time );
468 if( std::cv_status::timeout == s && !m_completed ) {
478struct curl_glue2_sync_t {
479 curl_glue2_sync_t(
void *_curl_handle,
484 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
485 post_request(
std::move(_post_request)),
488 response(
std::move(_response)),
489 consumer_fn(
std::move(_consumer_fn))
495 int32_t response_code;
499 void interrupt_all() noexcept {
505 void set_end_of_input() noexcept {
513struct curl_glue2_async_t {
514 curl_glue2_async_t(
void *_curl_handle,
519 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
520 post_request(
std::
move(_post_request)),
523 response(
std::
move(_response)),
524 consumer_fn(
std::
move(_consumer_fn))
530 int32_t response_code;
534 void interrupt_all() noexcept {
540 void set_end_of_input() noexcept {
548static size_t consume_header_curl2_sync(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
549 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
554 DBG_PRINT(
"consume_header_curl2_sync ABORT by User: total %" PRIi64
", result %d, rb %s",
556 cg->set_end_of_input();
562 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
563 if( CURLE_OK == r ) {
564 cg->response_code =
static_cast<int32_t
>(v);
568 cg->set_end_of_input();
571 DBG_PRINT(
"consume_header_curl2.0 response_code: %ld", v);
577 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
578 if( CURLE_OK == r ) {
585 const size_t realsize = size * nmemb;
587 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
589 DBG_PRINT(
"consume_header_curl2.0 header_completed");
593 DBG_PRINT(
"consume_header_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), result %d",
595 std::string blob(buffer, realsize);
603static size_t consume_header_curl2_async(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
604 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
609 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
610 DBG_PRINT(
"consume_header_curl2 ABORT by User: total %" PRIi64
", result %d, rb %s",
612 cg->set_end_of_input();
618 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
619 if( CURLE_OK == r ) {
620 cg->response_code =
static_cast<int32_t
>(v);
624 cg->set_end_of_input();
627 DBG_PRINT(
"consume_header_curl2.0 response_code: %ld", v);
633 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
634 if( CURLE_OK == r ) {
641 const size_t realsize = size * nmemb;
643 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
645 DBG_PRINT(
"consume_header_curl2.0 header_completed");
649 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
650 DBG_PRINT(
"consume_header_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), result %d, rb %s",
652 std::string blob(buffer, realsize);
660static size_t consume_data_curl2_sync(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
661 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
667 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
668 DBG_PRINT(
"consume_data_curl2 ABORT by User: total %" PRIi64
", result %d, rb %s",
670 cg->set_end_of_input();
676 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
677 if( CURLE_OK == r ) {
690 const size_t realsize = size * nmemb;
692 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
693 DBG_PRINT(
"consume_data_curl2.0 realsize %zu, rb %s", realsize, s.c_str() );
696 bool timeout_occured;
697 if( !cg->buffer->putBlocking(
reinterpret_cast<uint8_t*
>(ptr),
698 reinterpret_cast<uint8_t*
>(ptr)+realsize, 0_s, timeout_occured) ) {
699 DBG_PRINT(
"consume_data_curl2 Failed put: total %" PRIi64
", result %d, timeout %d, rb %s",
701 if( timeout_occured ) {
702 cg->set_end_of_input();
709 const bool is_final = 0 == realsize ||
713 cg->set_end_of_input();
715 if( cg->consumer_fn ) {
717 if( !cg->consumer_fn(*cg->response,
reinterpret_cast<uint8_t*
>(ptr), realsize, is_final) ) {
720 }
catch (std::exception &e) {
721 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
727 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
728 DBG_PRINT(
"consume_data_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d, result %d, rb %s",
735static size_t consume_data_curl2_async(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
736 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
741 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
742 DBG_PRINT(
"consume_data_curl2 ABORT by User: total %" PRIi64
", result %d, rb %s",
744 cg->set_end_of_input();
750 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
751 if( CURLE_OK == r ) {
764 const size_t realsize = size * nmemb;
766 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
767 DBG_PRINT(
"consume_data_curl2.0 realsize %zu, rb %s", realsize, s.c_str() );
770 bool timeout_occured;
771 if( !cg->buffer->putBlocking(
reinterpret_cast<uint8_t*
>(ptr),
772 reinterpret_cast<uint8_t*
>(ptr)+realsize, 0_s, timeout_occured) ) {
773 DBG_PRINT(
"consume_data_curl2 Failed put: total %" PRIi64
", result %d, timeout %d, rb %s",
775 if( timeout_occured ) {
776 cg->set_end_of_input();
783 const bool is_final = 0 == realsize ||
787 cg->set_end_of_input();
789 if( cg->consumer_fn ) {
791 if( !cg->consumer_fn(*cg->response,
reinterpret_cast<uint8_t*
>(ptr), realsize, is_final) ) {
794 }
catch (std::exception &e) {
795 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
801 const std::string s = cg->buffer ? cg->buffer->toString() :
"null";
802 DBG_PRINT(
"consume_data_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d, result %d, rb %s",
809static bool read_url_stream_impl(CURL *curl_handle, std::vector<char>& errorbuffer,
812 curl_write_callback header_cb, curl_write_callback write_cb,
void* ctx_data)
noexcept
814 struct curl_slist *header_slist =
nullptr;
817 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
818 if( CURLE_OK != res ) {
819 ERR_PRINT(
"Error setting up url %s, error %d '%s'",
820 url, (
int)res, curl_easy_strerror(res));
825 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
826 if( CURLE_OK != res ) {
827 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
828 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
832 if(
nullptr != post_request ) {
836 if( post.
header.size() > 0 ) {
837 for (
const std::pair<const std::string, std::string>& n : post.
header) {
838 std::string v = n.first+
": "+n.second;
839 header_slist = curl_slist_append(header_slist, v.c_str());
841 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, header_slist);
842 if( CURLE_OK != res ) {
843 ERR_PRINT(
"Error setting up POST header, error %d '%s' '%s'",
844 (
int)res, curl_easy_strerror(res), errorbuffer.data());
849 res = curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, post.
body.data());
850 if( CURLE_OK != res ) {
851 ERR_PRINT(
"Error setting up POST fields, error %d '%s' '%s'",
852 (
int)res, curl_easy_strerror(res), errorbuffer.data());
857 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
859 if( CURLE_OK != res ) {
860 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
861 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
866 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
867 if( CURLE_OK != res ) {
868 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
869 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
875 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
876 if( CURLE_OK != res ) {
877 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
878 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
883 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
884 if( CURLE_OK != res ) {
885 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
886 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
891 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
892 if( CURLE_OK != res ) {
893 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
894 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
899 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_cb);
900 if( CURLE_OK != res ) {
901 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
902 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
907 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, ctx_data);
908 if( CURLE_OK != res ) {
909 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
910 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
915 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, write_cb);
916 if( CURLE_OK != res ) {
917 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
918 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
923 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, ctx_data);
924 if( CURLE_OK != res ) {
925 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
926 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
931 res = curl_easy_perform(curl_handle);
932 if( CURLE_OK != res ) {
935 IRQ_PRINT(
"Error processing url %s, error %d '%s' '%s'",
936 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
939 DBG_PRINT(
"Processing aborted url %s, error %d '%s' '%s'",
940 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
945 if(
nullptr != header_slist ) {
946 curl_slist_free_all(header_slist);
951 if(
nullptr != header_slist ) {
952 curl_slist_free_all(header_slist);
958 std::vector<char> errorbuffer;
959 errorbuffer.reserve(CURL_ERROR_SIZE);
961 bool owns_curl_handle =
false;
963 if(
nullptr == cg.curl_handle ) {
965 owns_curl_handle =
true;
966 curl_handle = curl_easy_init();
967 if(
nullptr == curl_handle ) {
968 ERR_PRINT(
"Error setting up url %s, null curl handle", url);
971 cg.curl_handle = curl_handle;
972 DBG_PRINT(
"CURL: Created own handle %p", curl_handle);
974 curl_handle = cg.curl_handle;
975 DBG_PRINT(
"CURL: Reusing own handle %p", curl_handle);
978 if( !read_url_stream_impl(curl_handle, errorbuffer,
979 url, cg.post_request.get(), cg.response->result,
980 consume_header_curl2_sync, consume_data_curl2_sync, (
void*)&cg) ) {
985 if( !cg.response->header_resp.completed() ) {
986 cg.response->header_resp.notify_complete();
990 if( cg.consumer_fn ) {
992 cg.consumer_fn(*cg.response,
nullptr, 0,
true);
993 }
catch (std::exception &e) {
994 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
1003 cg.set_end_of_input();
1006 if( owns_curl_handle &&
nullptr != curl_handle ) {
1007 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
1008 curl_easy_cleanup(curl_handle);
1009 cg.curl_handle =
nullptr;
1013static void read_url_stream_async(
const char *url, std::unique_ptr<curl_glue2_async_t> && cg)
noexcept {
1014 std::vector<char> errorbuffer;
1015 errorbuffer.reserve(CURL_ERROR_SIZE);
1017 bool owns_curl_handle =
false;
1019 if(
nullptr == cg->curl_handle ) {
1021 owns_curl_handle =
true;
1022 curl_handle = curl_easy_init();
1023 if(
nullptr == curl_handle ) {
1024 ERR_PRINT(
"Error setting up url %s, null curl handle", url);
1027 cg->curl_handle = curl_handle;
1028 DBG_PRINT(
"CURL: Created own handle %p", curl_handle);
1030 curl_handle = cg->curl_handle;
1031 DBG_PRINT(
"CURL: Reusing own handle %p", curl_handle);
1034 if( !read_url_stream_impl(curl_handle, errorbuffer,
1035 url, cg->post_request.get(), cg->response->result,
1036 consume_header_curl2_async, consume_data_curl2_async, (
void*)cg.get()) ) {
1041 if( !cg->response->header_resp.completed() ) {
1042 cg->response->header_resp.notify_complete();
1046 if( cg->consumer_fn ) {
1048 cg->consumer_fn(*cg->response,
nullptr, 0,
true);
1049 }
catch (std::exception &e) {
1050 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
1059 cg->set_end_of_input();
1062 if( owns_curl_handle &&
nullptr != curl_handle ) {
1063 DBG_PRINT(
"CURL: Freeing own handle %p", curl_handle);
1064 curl_easy_cleanup(curl_handle);
1065 cg->curl_handle =
nullptr;
1074 CURL* h = ::curl_easy_init();
1075 DBG_PRINT(
"CURL: Created user handle %p", h);
1083 CURL* h =
static_cast<CURL*
>(handle);
1084 if(
nullptr != h ) {
1085 DBG_PRINT(
"CURL: Freeing user handle %p", h);
1086 curl_easy_cleanup(h);
1108 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1113 curl_glue2_sync_t cg (handle, std::move(httpPostReq), buffer, res, consumer_fn );
1135 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1141 std::unique_ptr<curl_glue2_async_t> cg ( std::make_unique<curl_glue2_async_t>(handle, std::move(httpPostReq),
1142 buffer, res, consumer_fn ) );
1153 if( out_bytes_total >= 100'000'000 ) {
1156 }
else if( out_bytes_total >= 100'000 ) {
1164 const uint64_t _rate_bps = std::llround( (
double)out_bytes_total / td.to_double() );
1165 const uint64_t _rate_bitps = std::llround( ( (
double)out_bytes_total * 8.0 ) / td.to_double() );
1167 if( _rate_bitps >= 100'000'000 ) {
1171 }
else if( _rate_bitps >= 100'000 ) {
static bool getBooleanProperty(const std::string &name, const bool default_value) noexcept
Returns the boolean value of the environment's variable 'name', or the 'default_value' if the environ...
static environment & get(const std::string &root_prefix_domain="jau") noexcept
Static singleton initialization of this project's environment with the given global root prefix_domai...
File based byte input stream, including named file descriptor.
Abstract byte input stream object.
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
@ std
Denotes a func::std_target_t.
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
jau::ordered_atomic< io_result_t, std::memory_order_relaxed > relaxed_atomic_io_result_t
SyncStreamResponseRef read_url_stream_sync(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const SyncStreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given SyncStreamConsumerFunc consumer_fn.
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
net_tk_handle create_net_tk_handle() noexcept
creates a reusable handle, free with free_net_tk_handle() after use.
uint64_t read_stream(ByteInStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
jau::function< bool(secure_vector< uint8_t > &, bool)> StreamConsumerFunc
Stream consumer function.
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of supported protocol supported by libcurl network protocols, queried at runtime.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
std::shared_ptr< SyncStreamResponse > SyncStreamResponseRef
std::shared_ptr< AsyncStreamResponse > AsyncStreamResponseRef
jau::function< bool(SyncStreamResponse &, const uint8_t *, size_t, bool)> SyncStreamConsumerFunc
Synchronous stream consumer function.
jau::ringbuffer< uint8_t, size_t > ByteRingbuffer
void free_net_tk_handle(net_tk_handle handle) noexcept
frees a handle after use created by create_net_tk_handle()
std::vector< T, jau::callocator_sec< T > > secure_vector
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
std::string bytesHexString(const void *data, const nsize_t length, const bool lsbFirst, const bool lowerCase=true, const bool skipLeading0x=false) noexcept
Produce a hexadecimal string representation of the given byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
static uint64_t _read_buffer(ByteInStream &in, secure_vector< uint8_t > &buffer) noexcept
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
std::unique_ptr< PostRequest > PostRequestPtr
void PLAIN_PRINT(const bool printPrefix, const char *format,...) noexcept
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...
Asynchronous stream response.
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
std::thread thread
background reading thread unique-pointer
relaxed_atomic_bool has_content_length
indicating whether content_length is known from server
relaxed_atomic_uint64 total_read
tracking the total_read
relaxed_atomic_uint64 content_length
content_length tracking the content_length
url_header_resp header_resp
synchronized URL header response completion
Synchronous stream response.
uint64_t total_read
tracking the total_read
uint64_t content_length
content_length tracking the content_length
url_header_resp header_resp
synchronized URL header response completion
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
bool has_content_length
indicating whether content_length is known from server
std::unordered_map< std::string, std::string > header
CXX_ALWAYS_INLINE _Tp fetch_add(_Tp __i) noexcept
CXX_ALWAYS_INLINE _Tp load() const noexcept