36 #include <curl/curl.h>
50 if(input_file ==
"-") {
65 if( in.available(1) ) {
66 buffer.resize(buffer.capacity());
67 const uint64_t got = in.read(buffer.data(), buffer.capacity());
71 has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
73 if( !consumer_fn(buffer, !has_more) ) {
76 }
catch (std::exception &e) {
77 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
83 consumer_fn(buffer,
true);
91 if( in.available(1) ) {
92 buffer.resize(buffer.capacity());
93 const uint64_t got = in.read(buffer.data(), buffer.capacity());
104 bool eof[] = {
false,
false };
106 bool eof_read =
false;
107 uint64_t total_send = 0;
108 uint64_t total_read = 0;
114 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
128 bool eof_send =
false;
130 int bidx_next = ( idx + 1 ) % 2;
134 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
139 eof[bidx_next] =
true;
146 total_send += buffer->size();
148 if( !consumer_fn(*buffer, eof_send) ) {
151 }
catch (std::exception &e) {
152 ERR_PRINT(
"jau::io::read_stream: Caught exception: %s", e.what());
160 std::vector<std::string_view> res;
162 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
163 if(
nullptr == cvid ||
nullptr == cvid->protocols ) {
166 for(
int i=0;
nullptr != cvid->protocols[i]; ++i) {
167 res.emplace_back( cvid->protocols[i] );
174 if ( scheme.empty() ) {
178 return std::isalnum(c) || c ==
'+' || c ==
'.' || c ==
'-';
180 return pos == scheme.end();
183 std::size_t pos = uri.find(
':');
184 if (pos == std::string_view::npos) {
185 return uri.substr(0, 0);
187 std::string_view scheme = uri.substr(0, pos);
189 return uri.substr(0, 0);
195 const std::string_view scheme =
get_scheme(uri);
196 if( scheme.empty() ) {
200 auto it =
std::find(protos.cbegin(), protos.cend(), scheme);
201 return protos.cend() != it;
205 return 0 == uri.find(
"file://");
209 const std::string_view scheme =
get_scheme(uri);
210 if( scheme.empty() ) {
213 return "https" == scheme ||
"http" == scheme;
220 bool has_content_length;
221 uint64_t content_length;
227static size_t consume_header_curl1(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
228 curl_glue1_t * cg = (curl_glue1_t*)userdata;
232 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
233 if( CURLE_OK == r ) {
238 DBG_PRINT(
"consume_header_curl1.0 response_code: %ld", v);
242 if( !cg->has_content_length ) {
244 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
245 if( CURLE_OK == r ) {
247 cg->content_length = 0;
248 cg->has_content_length =
false;
250 cg->content_length = v;
251 cg->has_content_length =
true;
255 const size_t realsize = size * nmemb;
258 DBG_PRINT(
"consume_header_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" )",
259 realsize, cg->total_read, cg->has_content_length, cg->content_length );
260 std::string blob(buffer, realsize);
267static size_t consume_data_curl1(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
268 curl_glue1_t * cg = (curl_glue1_t*)userdata;
270 if( !cg->has_content_length ) {
272 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
273 if( CURLE_OK == r ) {
275 cg->content_length = 0;
276 cg->has_content_length =
false;
278 cg->content_length = v;
279 cg->has_content_length =
true;
283 const size_t realsize = size * nmemb;
284 DBG_PRINT(
"consume_data_curl1.0 realsize %zu", realsize);
285 cg->buffer.resize(realsize);
286 memcpy(cg->buffer.data(), ptr, realsize);
288 cg->total_read += realsize;
289 const bool is_final = 0 == realsize ||
290 cg->has_content_length ? cg->total_read >= cg->content_length :
false;
292 DBG_PRINT(
"consume_data_curl1.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d",
293 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
296 if( !cg->consumer_fn(cg->buffer, is_final) ) {
299 }
catch (std::exception &e) {
300 ERR_PRINT(
"jau::io::read_url_stream: Caught exception: %s", e.what());
313 std::vector<char> errorbuffer;
314 errorbuffer.reserve(CURL_ERROR_SIZE);
319 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
325 CURL *curl_handle = curl_easy_init();
326 if(
nullptr == curl_handle ) {
327 ERR_PRINT(
"Error setting up url %s, null curl handle", url.c_str());
331 curl_glue1_t cg = { curl_handle,
false, 0, 0, buffer, consumer_fn };
333 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
334 if( CURLE_OK != res ) {
335 ERR_PRINT(
"Error setting up url %s, error %d %d",
336 url.c_str(), (
int)res, curl_easy_strerror(res));
341 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
342 if( CURLE_OK != res ) {
343 ERR_PRINT(
"Error setting up url %s, error %d %d",
344 url.c_str(), (
int)res, errorbuffer.data());
349 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
350 if( CURLE_OK != res ) {
351 ERR_PRINT(
"Error setting up url %s, error %d %d",
352 url.c_str(), (
int)res, errorbuffer.data());
357 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
358 if( CURLE_OK != res ) {
359 ERR_PRINT(
"Error setting up url %s, error %d %d",
360 url.c_str(), (
int)res, errorbuffer.data());
365 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
366 if( CURLE_OK != res ) {
367 ERR_PRINT(
"Error setting up url %s, error %d %d",
368 url.c_str(), (
int)res, errorbuffer.data());
373 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
374 if( CURLE_OK != res ) {
375 ERR_PRINT(
"Error setting up url %s, error %d %d",
376 url.c_str(), (
int)res, errorbuffer.data());
381 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
382 if( CURLE_OK != res ) {
383 ERR_PRINT(
"Error setting up url %s, error %d %d",
384 url.c_str(), (
int)res, errorbuffer.data());
389 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&cg);
390 if( CURLE_OK != res ) {
391 ERR_PRINT(
"Error setting up url %s, error %d %d",
392 url.c_str(), (
int)res, errorbuffer.data());
397 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
398 if( CURLE_OK != res ) {
399 ERR_PRINT(
"Error setting up url %s, error %d %d",
400 url.c_str(), (
int)res, errorbuffer.data());
405 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (
void*)&cg);
406 if( CURLE_OK != res ) {
407 ERR_PRINT(
"Error setting up url %s, error %d %d",
408 url.c_str(), (
int)res, errorbuffer.data());
413 res = curl_easy_perform(curl_handle);
414 if( CURLE_OK != res ) {
415 IRQ_PRINT(
"processing url %s, error %d %d",
416 url.c_str(), (
int)res, errorbuffer.data());
421 curl_easy_cleanup(curl_handle);
422 return cg.total_read;
425 curl_easy_cleanup(curl_handle);
436 std::unique_lock<std::mutex> lockWrite(m_sync);
443 std::unique_lock<std::mutex> lock(m_sync);
445 while( !m_completed ) {
449 std::cv_status s =
wait_until(m_cv, lock, timeout_time );
450 if( std::cv_status::timeout == s && !m_completed ) {
461 curl_glue2_t(CURL *_curl_handle,
468 : curl_handle(_curl_handle),
469 header_sync(_header_sync),
470 has_content_length(_has_content_length),
471 content_length(_content_length),
472 total_read(_total_read),
485 void interrupt_all() noexcept {
489 void set_end_of_input() noexcept {
495static size_t consume_header_curl2(
char *buffer,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
496 curl_glue2_t * cg = (curl_glue2_t*)userdata;
500 DBG_PRINT(
"consume_header_curl2 ABORT by User: total %" PRIi64
", result %d, rb %s",
501 cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
502 cg->set_end_of_input();
508 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
509 if( CURLE_OK == r ) {
513 cg->set_end_of_input();
516 DBG_PRINT(
"consume_header_curl2.0 response_code: %ld", v);
520 if( !cg->has_content_length ) {
522 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
523 if( CURLE_OK == r ) {
525 cg->content_length = v;
526 cg->has_content_length =
true;
530 const size_t realsize = size * nmemb;
532 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
533 cg->header_sync.notify_complete();
534 DBG_PRINT(
"consume_header_curl2.0 header_completed");
538 DBG_PRINT(
"consume_header_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), result %d, rb %s",
539 realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() );
540 std::string blob(buffer, realsize);
548static size_t consume_data_curl2(
char *ptr,
size_t size,
size_t nmemb,
void *userdata)
noexcept {
549 curl_glue2_t * cg = (curl_glue2_t*)userdata;
553 DBG_PRINT(
"consume_data_curl2 ABORT by User: total %" PRIi64
", result %d, rb %s",
554 cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
555 cg->set_end_of_input();
559 if( !cg->has_content_length ) {
561 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
562 if( CURLE_OK == r ) {
564 cg->content_length = v;
565 cg->has_content_length =
true;
571 if( !cg->header_sync.completed() ) {
572 cg->header_sync.notify_complete();
575 const size_t realsize = size * nmemb;
576 DBG_PRINT(
"consume_data_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() );
577 bool timeout_occured;
578 if( !cg->buffer.putBlocking(
reinterpret_cast<uint8_t*
>(ptr),
579 reinterpret_cast<uint8_t*
>(ptr)+realsize, 0_s, timeout_occured) ) {
580 DBG_PRINT(
"consume_data_curl2 Failed put: total %" PRIi64
", result %d, timeout %d, rb %s",
581 cg->total_read.load(), cg->result.load(), timeout_occured, cg->buffer.toString().c_str() );
582 if( timeout_occured ) {
583 cg->set_end_of_input();
588 cg->total_read.fetch_add(realsize);
589 const bool is_final = 0 == realsize ||
590 cg->has_content_length ? cg->total_read >= cg->content_length :
false;
593 cg->set_end_of_input();
597 DBG_PRINT(
"consume_data_curl2.X realsize %zu, total %" PRIu64
" / ( content_len has %d, size %" PRIu64
" ), is_final %d, result %d, rb %s",
598 realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() );
604static void read_url_stream_thread(
const char *url, std::unique_ptr<curl_glue2_t> && cg)
noexcept {
605 std::vector<char> errorbuffer;
606 errorbuffer.reserve(CURL_ERROR_SIZE);
610 CURL *curl_handle = curl_easy_init();
611 if(
nullptr == curl_handle ) {
612 ERR_PRINT(
"Error setting up url %s, null curl handle", url);
615 cg->curl_handle = curl_handle;
617 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
618 if( CURLE_OK != res ) {
619 ERR_PRINT(
"Error setting up url %s, error %d '%s'",
620 url, (
int)res, curl_easy_strerror(res));
625 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
626 if( CURLE_OK != res ) {
627 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
628 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
633 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
634 if( CURLE_OK != res ) {
635 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
636 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
641 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
642 if( CURLE_OK != res ) {
643 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
644 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
649 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
650 if( CURLE_OK != res ) {
651 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
652 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
657 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
658 if( CURLE_OK != res ) {
659 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
660 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
665 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl2);
666 if( CURLE_OK != res ) {
667 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
668 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
673 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)cg.get());
674 if( CURLE_OK != res ) {
675 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
676 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
681 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl2);
682 if( CURLE_OK != res ) {
683 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
684 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
689 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (
void*)cg.get());
690 if( CURLE_OK != res ) {
691 ERR_PRINT(
"Error setting up url %s, error %d '%s' '%s'",
692 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
697 res = curl_easy_perform(curl_handle);
698 if( CURLE_OK != res ) {
701 IRQ_PRINT(
"Error processing url %s, error %d '%s' '%s'",
702 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
705 DBG_PRINT(
"Processing aborted url %s, error %d '%s' '%s'",
706 url, (
int)res, curl_easy_strerror(res), errorbuffer.data());
713 cg->header_sync.notify_complete();
718 cg->set_end_of_input();
721 if(
nullptr != curl_handle ) {
722 curl_easy_cleanup(curl_handle);
737 has_content_length =
false;
748 DBG_PRINT(
"Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
755 std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(
nullptr, header_sync, has_content_length, content_length, total_read, buffer, result ) );
757 return std::make_unique<std::thread>(&::read_url_stream_thread, url.c_str(), std::move(cg));
765 if( out_bytes_total >= 100'000'000 ) {
768 }
else if( out_bytes_total >= 100'000 ) {
776 const uint64_t _rate_bps = std::llround( (
double)out_bytes_total / td.to_double() );
777 const uint64_t _rate_bitps = std::llround( ( (
double)out_bytes_total * 8.0 ) / td.to_double() );
779 if( _rate_bitps >= 100'000'000 ) {
783 }
else if( _rate_bitps >= 100'000 ) {
Class template jau::function is a general-purpose static-polymorphic function wrapper.
File based byte input stream, including named file descriptor.
Abstract byte input stream object.
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
constexpr InputIt find_if_not(InputIt first, InputIt last, UnaryPredicate q)
Like std::find_if_not() of 'algorithm'.
constexpr InputIt find(InputIt first, InputIt last, const T &value)
Like std::find() of 'algorithm'.
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
std::vector< T, jau::callocator_sec< T > > secure_vector
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e. the uri starts with `file://`.
uint64_t read_stream(ByteInStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of the network protocols supported by libcurl, queried at runtime.
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a network protocol supported by libcurl, otherwise false.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
std::string bytesHexString(const void *data, const nsize_t offset, const nsize_t length, const bool lsbFirst, const bool lowerCase=true) noexcept
Produce a hexadecimal string representation of the given byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral value.
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
static uint64_t _read_buffer(ByteInStream &in, secure_vector< uint8_t > &buffer) noexcept
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
void PLAIN_PRINT(const bool printPrefix, const char *format,...) noexcept
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...