jaulib v1.3.0
Jau Support Library (C++, Java, ..)
io_util.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 * Copyright (c) 2021 ZAFENA AB
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25
26#include <chrono>
27
28// #include <botan_all.h>
29
30#include <jau/debug.hpp>
31#include <jau/io_util.hpp>
32#include <jau/byte_stream.hpp>
33#include <jau/string_util.hpp>
34
35#ifdef USE_LIBCURL
36 #include <curl/curl.h>
37#endif
38
39#include <thread>
40#include <pthread.h>
41
42using namespace jau::io;
43using namespace jau::fractions_i64_literals;
44
45
46uint64_t jau::io::read_file(const std::string& input_file,
48 const StreamConsumerFunc& consumer_fn) noexcept
49{
50 if(input_file == "-") {
51 ByteInStream_File in(0); // stdin
52 return read_stream(in, buffer, consumer_fn);
53 } else {
54 ByteInStream_File in(input_file);
55 return read_stream(in, buffer, consumer_fn);
56 }
57}
58
61 const StreamConsumerFunc& consumer_fn) noexcept {
62 uint64_t total = 0;
63 bool has_more;
64 do {
65 if( in.available(1) ) { // at least one byte to stream, also considers eof
66 buffer.resize(buffer.capacity());
67 const uint64_t got = in.read(buffer.data(), buffer.capacity());
68
69 buffer.resize(got);
70 total += got;
71 has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
72 try {
73 if( !consumer_fn(buffer, !has_more) ) {
74 break; // end streaming
75 }
76 } catch (std::exception &e) {
77 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
78 break; // end streaming
79 }
80 } else {
81 has_more = false;
82 buffer.resize(0);
83 consumer_fn(buffer, true); // forced final, zero size
84 }
85 } while( has_more );
86 return total;
87}
88
89static uint64_t _read_buffer(ByteInStream& in,
90 secure_vector<uint8_t>& buffer) noexcept {
91 if( in.available(1) ) { // at least one byte to stream, also considers eof
92 buffer.resize(buffer.capacity());
93 const uint64_t got = in.read(buffer.data(), buffer.capacity());
94 buffer.resize(got);
95 return got;
96 }
97 return 0;
98}
99
102 const StreamConsumerFunc& consumer_fn) noexcept {
103 secure_vector<uint8_t>* buffers[] = { &buffer1, &buffer2 };
104 bool eof[] = { false, false };
105
106 bool eof_read = false;
107 uint64_t total_send = 0;
108 uint64_t total_read = 0;
109 int idx = 0;
110 // fill 1st buffer upfront
111 {
112 uint64_t got = _read_buffer(in, *buffers[idx]);
113 total_read += got;
114 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
115 eof[idx] = eof_read;
116 ++idx;
117 }
118
119 // - buffer_idx was filled
120 // - buffer_idx++
121 //
122 // - while !eof_send do
123 // - read buffer_idx if not eof_read,
124 // - set eof[buffer_idx+1]=true if zero bytes
125 // - buffer_idx++
126 // - sent buffer_idx
127 //
128 bool eof_send = false;
129 while( !eof_send ) {
130 int bidx_next = ( idx + 1 ) % 2;
131 if( !eof_read ) {
132 uint64_t got = _read_buffer(in, *buffers[idx]);
133 total_read += got;
134 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
135 eof[idx] = eof_read;
136 if( 0 == got ) {
137 // read-ahead eof propagation if read zero bytes,
138 // hence next consumer_fn() will send last bytes with is_final=true
139 eof[bidx_next] = true;
140 }
141 }
142 idx = bidx_next;
143
144 secure_vector<uint8_t>* buffer = buffers[idx];
145 eof_send = eof[idx];
146 total_send += buffer->size();
147 try {
148 if( !consumer_fn(*buffer, eof_send) ) {
149 return total_send; // end streaming
150 }
151 } catch (std::exception &e) {
152 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
153 return total_send; // end streaming
154 }
155 }
156 return total_send;
157}
158
159std::vector<std::string_view> jau::io::uri_tk::supported_protocols() noexcept {
160 std::vector<std::string_view> res;
161#ifdef USE_LIBCURL
162 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
163 if( nullptr == cvid || nullptr == cvid->protocols ) {
164 return res;
165 }
166 for(int i=0; nullptr != cvid->protocols[i]; ++i) {
167 res.emplace_back( cvid->protocols[i] );
168 }
169#endif // USE_LIBCURL
170 return res;
171}
172
173static bool _is_scheme_valid(const std::string_view& scheme) noexcept {
174 if ( scheme.empty() ) {
175 return false;
176 }
177 auto pos = std::find_if_not(scheme.begin(), scheme.end(), [&](char c){
178 return std::isalnum(c) || c == '+' || c == '.' || c == '-';
179 });
180 return pos == scheme.end();
181}
182std::string_view jau::io::uri_tk::get_scheme(const std::string_view& uri) noexcept {
183 std::size_t pos = uri.find(':');
184 if (pos == std::string_view::npos) {
185 return uri.substr(0, 0);
186 }
187 std::string_view scheme = uri.substr(0, pos);
188 if( !_is_scheme_valid( scheme ) ) {
189 return uri.substr(0, 0);
190 }
191 return scheme;
192}
193
194bool jau::io::uri_tk::protocol_supported(const std::string_view& uri) noexcept {
195 const std::string_view scheme = get_scheme(uri);
196 if( scheme.empty() ) {
197 return false;
198 }
199 const std::vector<std::string_view> protos = supported_protocols();
200 auto it = std::find(protos.cbegin(), protos.cend(), scheme);
201 return protos.cend() != it;
202}
203
204bool jau::io::uri_tk::is_local_file_protocol(const std::string_view& uri) noexcept {
205 return 0 == uri.find("file://");
206}
207
208bool jau::io::uri_tk::is_httpx_protocol(const std::string_view& uri) noexcept {
209 const std::string_view scheme = get_scheme(uri);
210 if( scheme.empty() ) {
211 return false;
212 }
213 return "https" == scheme || "http" == scheme;
214}
215
216#ifdef USE_LIBCURL
217
218struct curl_glue1_t {
219 CURL *curl_handle;
220 bool has_content_length;
221 uint64_t content_length;
222 uint64_t total_read;
224 StreamConsumerFunc consumer_fn;
225};
226
227static size_t consume_header_curl1(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
228 curl_glue1_t * cg = (curl_glue1_t*)userdata;
229
230 {
231 long v;
232 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
233 if( CURLE_OK == r ) {
234 if( 400 <= v ) {
235 IRQ_PRINT("response_code: %ld", v);
236 return 0;
237 } else {
238 DBG_PRINT("consume_header_curl1.0 response_code: %ld", v);
239 }
240 }
241 }
242 if( !cg->has_content_length ) {
243 curl_off_t v = 0;
244 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
245 if( CURLE_OK == r ) {
246 if( 0 > v ) { // curl returns -1 if the size if not known
247 cg->content_length = 0;
248 cg->has_content_length = false;
249 } else {
250 cg->content_length = v;
251 cg->has_content_length = true;
252 }
253 }
254 }
255 const size_t realsize = size * nmemb;
256
257 if( false ) {
258 DBG_PRINT("consume_header_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " )",
259 realsize, cg->total_read, cg->has_content_length, cg->content_length );
260 std::string blob(buffer, realsize);
261 jau::PLAIN_PRINT(true, "%s", blob.c_str());
262 }
263
264 return realsize;
265}
266
267static size_t consume_data_curl1(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
268 curl_glue1_t * cg = (curl_glue1_t*)userdata;
269
270 if( !cg->has_content_length ) {
271 curl_off_t v = 0;
272 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
273 if( CURLE_OK == r ) {
274 if( 0 > v ) { // curl returns -1 if the size if not known
275 cg->content_length = 0;
276 cg->has_content_length = false;
277 } else {
278 cg->content_length = v;
279 cg->has_content_length = true;
280 }
281 }
282 }
283 const size_t realsize = size * nmemb;
284 DBG_PRINT("consume_data_curl1.0 realsize %zu", realsize);
285 cg->buffer.resize(realsize);
286 memcpy(cg->buffer.data(), ptr, realsize);
287
288 cg->total_read += realsize;
289 const bool is_final = 0 == realsize ||
290 cg->has_content_length ? cg->total_read >= cg->content_length : false;
291
292 DBG_PRINT("consume_data_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d",
293 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
294
295 try {
296 if( !cg->consumer_fn(cg->buffer, is_final) ) {
297 return 0; // end streaming
298 }
299 } catch (std::exception &e) {
300 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
301 return 0; // end streaming
302 }
303
304 return realsize;
305}
306
307#endif // USE_LIBCURL
308
309uint64_t jau::io::read_url_stream(const std::string& url,
311 const StreamConsumerFunc& consumer_fn) noexcept {
312#ifdef USE_LIBCURL
313 std::vector<char> errorbuffer;
314 errorbuffer.reserve(CURL_ERROR_SIZE);
315 CURLcode res;
316
317 if( !uri_tk::protocol_supported(url) ) {
318 const std::string_view scheme = uri_tk::get_scheme(url);
319 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
320 std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
321 return 0;
322 }
323
324 /* init the curl session */
325 CURL *curl_handle = curl_easy_init();
326 if( nullptr == curl_handle ) {
327 ERR_PRINT("Error setting up url %s, null curl handle", url.c_str());
328 return 0;
329 }
330
331 curl_glue1_t cg = { curl_handle, false, 0, 0, buffer, consumer_fn };
332
333 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
334 if( CURLE_OK != res ) {
335 ERR_PRINT("Error setting up url %s, error %d %d",
336 url.c_str(), (int)res, curl_easy_strerror(res));
337 goto errout;
338 }
339
340 /* set URL to get here */
341 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
342 if( CURLE_OK != res ) {
343 ERR_PRINT("Error setting up url %s, error %d %d",
344 url.c_str(), (int)res, errorbuffer.data());
345 goto errout;
346 }
347
348 /* Switch on full protocol/debug output while testing */
349 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
350 if( CURLE_OK != res ) {
351 ERR_PRINT("Error setting up url %s, error %d %d",
352 url.c_str(), (int)res, errorbuffer.data());
353 goto errout;
354 }
355
356 /* disable progress meter, set to 0L to enable it */
357 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
358 if( CURLE_OK != res ) {
359 ERR_PRINT("Error setting up url %s, error %d %d",
360 url.c_str(), (int)res, errorbuffer.data());
361 goto errout;
362 }
363
364 /* Suppress proxy CONNECT response headers from user callbacks */
365 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
366 if( CURLE_OK != res ) {
367 ERR_PRINT("Error setting up url %s, error %d %d",
368 url.c_str(), (int)res, errorbuffer.data());
369 goto errout;
370 }
371
372 /* Don't pass headers to the data stream. */
373 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
374 if( CURLE_OK != res ) {
375 ERR_PRINT("Error setting up url %s, error %d %d",
376 url.c_str(), (int)res, errorbuffer.data());
377 goto errout;
378 }
379
380 /* send header data to this function */
381 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
382 if( CURLE_OK != res ) {
383 ERR_PRINT("Error setting up url %s, error %d %d",
384 url.c_str(), (int)res, errorbuffer.data());
385 goto errout;
386 }
387
388 /* set userdata for consume_header_curl2 */
389 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&cg);
390 if( CURLE_OK != res ) {
391 ERR_PRINT("Error setting up url %s, error %d %d",
392 url.c_str(), (int)res, errorbuffer.data());
393 goto errout;
394 }
395
396 /* send all data to this function */
397 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
398 if( CURLE_OK != res ) {
399 ERR_PRINT("Error setting up url %s, error %d %d",
400 url.c_str(), (int)res, errorbuffer.data());
401 goto errout;
402 }
403
404 /* write the page body to this file handle */
405 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&cg);
406 if( CURLE_OK != res ) {
407 ERR_PRINT("Error setting up url %s, error %d %d",
408 url.c_str(), (int)res, errorbuffer.data());
409 goto errout;
410 }
411
412 /* performs the tast, blocking! */
413 res = curl_easy_perform(curl_handle);
414 if( CURLE_OK != res ) {
415 IRQ_PRINT("processing url %s, error %d %d",
416 url.c_str(), (int)res, errorbuffer.data());
417 goto errout;
418 }
419
420 /* cleanup curl stuff */
421 curl_easy_cleanup(curl_handle);
422 return cg.total_read;
423
424errout:
425 curl_easy_cleanup(curl_handle);
426#else // USE_LIBCURL
427 (void) url;
428 (void) buffer;
429 (void) consumer_fn;
430#endif // USE_LIBCURL
431 return 0;
432}
433
435 {
436 std::unique_lock<std::mutex> lockWrite(m_sync);
437 m_completed = true;
438 }
439 m_cv.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
440}
441
443 std::unique_lock<std::mutex> lock(m_sync);
445 while( !m_completed ) {
447 m_cv.wait(lock);
448 } else {
449 std::cv_status s = wait_until(m_cv, lock, timeout_time );
450 if( std::cv_status::timeout == s && !m_completed ) {
451 return false;
452 }
453 }
454 }
455 return m_completed;
456}
457
458#ifdef USE_LIBCURL
459
460struct curl_glue2_t {
461 curl_glue2_t(CURL *_curl_handle,
462 jau::io::url_header_sync& _header_sync,
463 jau::relaxed_atomic_bool& _has_content_length,
464 jau::relaxed_atomic_uint64& _content_length,
465 jau::relaxed_atomic_uint64& _total_read,
466 ByteRingbuffer& _buffer,
468 : curl_handle(_curl_handle),
469 header_sync(_header_sync),
470 has_content_length(_has_content_length),
471 content_length(_content_length),
472 total_read(_total_read),
473 buffer(_buffer),
474 result(_result)
475 {}
476
477 CURL *curl_handle;
478 jau::io::url_header_sync& header_sync;
479 jau::relaxed_atomic_bool& has_content_length;
480 jau::relaxed_atomic_uint64& content_length;
481 jau::relaxed_atomic_uint64& total_read;
482 ByteRingbuffer& buffer;
484
485 void interrupt_all() noexcept {
486 buffer.interruptReader();
487 header_sync.notify_complete();
488 }
489 void set_end_of_input() noexcept {
490 buffer.set_end_of_input(true);
491 header_sync.notify_complete();
492 }
493};
494
495static size_t consume_header_curl2(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
496 curl_glue2_t * cg = (curl_glue2_t*)userdata;
497
498 if( async_io_result_t::NONE != cg->result ) {
499 // user abort!
500 DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
501 cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
502 cg->set_end_of_input();
503 return 0;
504 }
505
506 {
507 long v;
508 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
509 if( CURLE_OK == r ) {
510 if( 400 <= v ) {
511 IRQ_PRINT("response_code: %ld", v);
512 cg->result = async_io_result_t::FAILED;
513 cg->set_end_of_input();
514 return 0;
515 } else {
516 DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
517 }
518 }
519 }
520 if( !cg->has_content_length ) {
521 curl_off_t v = 0;
522 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
523 if( CURLE_OK == r ) {
524 if( 0 <= v ) { // curl returns -1 if the size is not known
525 cg->content_length = v;
526 cg->has_content_length = true;
527 }
528 }
529 }
530 const size_t realsize = size * nmemb;
531
532 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
533 cg->header_sync.notify_complete();
534 DBG_PRINT("consume_header_curl2.0 header_completed");
535 }
536
537 if( false ) {
538 DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s",
539 realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), cg->result.load(), cg->buffer.toString().c_str() );
540 std::string blob(buffer, realsize);
541 jau::PLAIN_PRINT(true, "%s", jau::bytesHexString((uint8_t*)buffer, 0, realsize, true /* lsbFirst */).c_str());
542 jau::PLAIN_PRINT(true, "%s", blob.c_str());
543 }
544
545 return realsize;
546}
547
548static size_t consume_data_curl2(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
549 curl_glue2_t * cg = (curl_glue2_t*)userdata;
550
551 if( async_io_result_t::NONE != cg->result ) {
552 // user abort!
553 DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
554 cg->total_read.load(), cg->result.load(), cg->buffer.toString().c_str() );
555 cg->set_end_of_input();
556 return 0;
557 }
558
559 if( !cg->has_content_length ) {
560 curl_off_t v = 0;
561 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
562 if( CURLE_OK == r ) {
563 if( 0 <= v ) { // curl returns -1 if the size if not known
564 cg->content_length = v;
565 cg->has_content_length = true;
566 }
567 }
568 }
569
570 // Ensure header completion is being sent
571 if( !cg->header_sync.completed() ) {
572 cg->header_sync.notify_complete();
573 }
574
575 const size_t realsize = size * nmemb;
576 DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, cg->buffer.toString().c_str() );
577 bool timeout_occured;
578 if( !cg->buffer.putBlocking(reinterpret_cast<uint8_t*>(ptr),
579 reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s, timeout_occured) ) {
580 DBG_PRINT("consume_data_curl2 Failed put: total %" PRIi64 ", result %d, timeout %d, rb %s",
581 cg->total_read.load(), cg->result.load(), timeout_occured, cg->buffer.toString().c_str() );
582 if( timeout_occured ) {
583 cg->set_end_of_input();
584 }
585 return 0;
586 }
587
588 cg->total_read.fetch_add(realsize);
589 const bool is_final = 0 == realsize ||
590 cg->has_content_length ? cg->total_read >= cg->content_length : false;
591 if( is_final ) {
592 cg->result = async_io_result_t::SUCCESS;
593 cg->set_end_of_input();
594 }
595
596 if( false ) {
597 DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
598 realsize, cg->total_read.load(), cg->has_content_length.load(), cg->content_length.load(), is_final, cg->result.load(), cg->buffer.toString().c_str() );
599 }
600
601 return realsize;
602}
603
604static void read_url_stream_thread(const char *url, std::unique_ptr<curl_glue2_t> && cg) noexcept {
605 std::vector<char> errorbuffer;
606 errorbuffer.reserve(CURL_ERROR_SIZE);
607 CURLcode res;
608
609 /* init the curl session */
610 CURL *curl_handle = curl_easy_init();
611 if( nullptr == curl_handle ) {
612 ERR_PRINT("Error setting up url %s, null curl handle", url);
613 goto errout;
614 }
615 cg->curl_handle = curl_handle;
616
617 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
618 if( CURLE_OK != res ) {
619 ERR_PRINT("Error setting up url %s, error %d '%s'",
620 url, (int)res, curl_easy_strerror(res));
621 goto errout;
622 }
623
624 /* set URL to get here */
625 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
626 if( CURLE_OK != res ) {
627 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
628 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
629 goto errout;
630 }
631
632 /* Switch on full protocol/debug output while testing */
633 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 0L);
634 if( CURLE_OK != res ) {
635 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
636 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
637 goto errout;
638 }
639
640 /* disable progress meter, set to 0L to enable it */
641 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
642 if( CURLE_OK != res ) {
643 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
644 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
645 goto errout;
646 }
647
648 /* Suppress proxy CONNECT response headers from user callbacks */
649 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
650 if( CURLE_OK != res ) {
651 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
652 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
653 goto errout;
654 }
655
656 /* Don't pass headers to the data stream. */
657 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
658 if( CURLE_OK != res ) {
659 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
660 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
661 goto errout;
662 }
663
664 /* send header data to this function */
665 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl2);
666 if( CURLE_OK != res ) {
667 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
668 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
669 goto errout;
670 }
671
672 /* set userdata for consume_header_curl2 */
673 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)cg.get());
674 if( CURLE_OK != res ) {
675 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
676 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
677 goto errout;
678 }
679
680 /* send received data to this function */
681 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl2);
682 if( CURLE_OK != res ) {
683 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
684 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
685 goto errout;
686 }
687
688 /* set userdata for consume_data_curl2 */
689 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)cg.get());
690 if( CURLE_OK != res ) {
691 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
692 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
693 goto errout;
694 }
695
696 /* performs the tast, blocking! */
697 res = curl_easy_perform(curl_handle);
698 if( CURLE_OK != res ) {
699 if( async_io_result_t::NONE == cg->result ) {
700 // Error during normal processing
701 IRQ_PRINT("Error processing url %s, error %d '%s' '%s'",
702 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
703 } else {
704 // User aborted or response code error detected via consume_header_curl2
705 DBG_PRINT("Processing aborted url %s, error %d '%s' '%s'",
706 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
707 }
708 goto errout;
709 }
710
711 /* cleanup curl stuff */
712 cg->result = async_io_result_t::SUCCESS;
713 cg->header_sync.notify_complete();
714 goto cleanup;
715
716errout:
717 cg->result = async_io_result_t::FAILED;
718 cg->set_end_of_input();
719
720cleanup:
721 if( nullptr != curl_handle ) {
722 curl_easy_cleanup(curl_handle);
723 }
724 return;
725}
726
727#endif // USE_LIBCURL
728
729std::unique_ptr<std::thread> jau::io::read_url_stream(const std::string& url,
730 ByteRingbuffer& buffer,
731 jau::io::url_header_sync& header_sync,
732 jau::relaxed_atomic_bool& has_content_length,
733 jau::relaxed_atomic_uint64& content_length,
734 jau::relaxed_atomic_uint64& total_read,
735 relaxed_atomic_async_io_result_t& result) noexcept {
736 /* init user referenced values */
737 has_content_length = false;
738 content_length = 0;
739 total_read = 0;
740
741#ifdef USE_LIBCURL
742 if( !uri_tk::protocol_supported(url) ) {
743#endif // USE_LIBCURL
745 header_sync.notify_complete();
746 buffer.set_end_of_input(true);
747 const std::string_view scheme = uri_tk::get_scheme(url);
748 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
749 std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
750 return nullptr;
751#ifdef USE_LIBCURL
752 }
754
755 std::unique_ptr<curl_glue2_t> cg ( std::make_unique<curl_glue2_t>(nullptr, header_sync, has_content_length, content_length, total_read, buffer, result ) );
756
757 return std::make_unique<std::thread>(&::read_url_stream_thread, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
758#endif // USE_LIBCURL
759}
760
761void jau::io::print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept {
762 jau::PLAIN_PRINT(true, "%s: Duration %s s, %s ms", prefix.c_str(),
763 td.to_string().c_str(), jau::to_decstring(td.to_ms()).c_str());
764
765 if( out_bytes_total >= 100'000'000 ) {
766 jau::PLAIN_PRINT(true, "%s: Size %s MB", prefix.c_str(),
767 jau::to_decstring(std::llround((double)out_bytes_total/1'000'000.0)).c_str());
768 } else if( out_bytes_total >= 100'000 ) {
769 jau::PLAIN_PRINT(true, "%s: Size %s KB", prefix.c_str(),
770 jau::to_decstring(std::llround((double)out_bytes_total/1'000.0)).c_str());
771 } else {
772 jau::PLAIN_PRINT(true, "%s: Size %s B", prefix.c_str(),
773 jau::to_decstring(out_bytes_total).c_str());
774 }
775
776 const uint64_t _rate_bps = std::llround( (double)out_bytes_total / td.to_double() ); // bytes per second
777 const uint64_t _rate_bitps = std::llround( ( (double)out_bytes_total * 8.0 ) / td.to_double() ); // bits per second
778
779 if( _rate_bitps >= 100'000'000 ) {
780 jau::PLAIN_PRINT(true, "%s: Bitrate %s Mbit/s, %s MB/s", prefix.c_str(),
781 jau::to_decstring(std::llround((double)_rate_bitps/1'000'000.0)).c_str(),
782 jau::to_decstring(std::llround((double)_rate_bps/1'000'000.0)).c_str());
783 } else if( _rate_bitps >= 100'000 ) {
784 jau::PLAIN_PRINT(true, "%s: Bitrate %s kbit/s, %s kB/s", prefix.c_str(),
785 jau::to_decstring(std::llround((double)_rate_bitps/1'000.0)).c_str(),
786 jau::to_decstring(std::llround((double)_rate_bps/1'000.0)).c_str());
787 } else {
788 jau::PLAIN_PRINT(true, "%s: Bitrate %s bit/s, %s B/s", prefix.c_str(),
789 jau::to_decstring(_rate_bitps).c_str(),
790 jau::to_decstring(_rate_bps).c_str());
791 }
792}
793
Class template jau::function is a general-purpose static-polymorphic function wrapper.
File based byte input stream, including named file descriptor.
Abstract byte input stream object.
Synchronization for URL header completion as used by asynchronous read_url_stream().
Definition: io_util.hpp:177
bool wait_until_completion(const jau::fraction_i64 &timeout) noexcept
Wait until completed() has been reached.
Definition: io_util.cpp:442
void notify_complete() noexcept
Notify completion, see completed()
Definition: io_util.cpp:434
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition: debug.hpp:109
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition: debug.hpp:52
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
Definition: debug.hpp:115
constexpr InputIt find_if_not(InputIt first, InputIt last, UnaryPredicate q)
Like std::find_if_not() of 'algorithm'.
constexpr InputIt find(InputIt first, InputIt last, const T &value)
Like std::find() of 'algorithm'.
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
Definition: basic_types.cpp:52
std::vector< T, jau::callocator_sec< T > > secure_vector
Definition: io_util.hpp:46
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition: io_util.cpp:204
uint64_t read_stream(ByteInStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
Definition: io_util.cpp:59
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
Definition: io_util.cpp:309
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
Definition: io_util.cpp:208
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
Definition: io_util.cpp:182
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of supported protocol supported by libcurl network protocols, queried at runtime.
Definition: io_util.cpp:159
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
Definition: io_util.cpp:761
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
Definition: io_util.cpp:46
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition: io_util.cpp:194
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
std::string bytesHexString(const void *data, const nsize_t offset, const nsize_t length, const bool lsbFirst, const bool lowerCase=true) noexcept
Produce a hexadecimal string representation of the given byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
Definition: io_util.cpp:173
static uint64_t _read_buffer(ByteInStream &in, secure_vector< uint8_t > &buffer) noexcept
Definition: io_util.cpp:89
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
void PLAIN_PRINT(const bool printPrefix, const char *format,...) noexcept
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
Definition: debug.cpp:258
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...