jaulib v1.4.1-14-g15926ba
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
io_util.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 * Copyright (c) 2021 ZAFENA AB
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25
26#include <memory>
27#include <unordered_map>
28
29// #include <botan_all.h>
30
31#include <jau/debug.hpp>
32#include <jau/io/io_util.hpp>
34#include <jau/string_util.hpp>
35#include <jau/type_concepts.hpp>
36
37#ifdef USE_LIBCURL
38 #include <curl/curl.h>
39#endif
40
41#include <thread>
42#include <pthread.h>
43
44using namespace jau::io;
45using namespace jau::fractions_i64_literals;
46
47
// Streams the content named by `input_file` to `consumer_fn` by delegating to read_stream()
// and returns whatever read_stream() returns.
// The name "-" is handled by a separate branch from every other name.
// NOTE(review): listing lines 49, 53 and 56 are missing from this extract, so the declarations
// of `buffer` and `in` are not visible here - confirm against the original source.
48uint64_t jau::io::read_file(const std::string& input_file,
50 const StreamConsumerFunc& consumer_fn) noexcept
51{
52 if(input_file == "-") {
54 return read_stream(in, buffer, consumer_fn);
55 } else {
57 return read_stream(in, buffer, consumer_fn);
58 }
59}
60
// NOTE(review): the head of this signature (listing lines 61-62) is missing from this extract;
// the body uses a stream `in` and a byte container `buffer` - confirm against the original source.
//
// Reads `in` chunk by chunk into `buffer`, passes every chunk to `consumer_fn` together with an
// is-final flag, and returns the accumulated number of bytes read.
63 const StreamConsumerFunc& consumer_fn) noexcept {
64 uint64_t total = 0;
65 bool has_more;
66 do {
67 if( in.available(1) ) { // at least one byte to stream, also considers eof
// Expose the full capacity to read(), then shrink to the number of bytes actually received.
68 buffer.resize(buffer.capacity());
69 const uint64_t got = in.read(buffer.data(), buffer.capacity());
70
71 buffer.resize(got);
72 total += got;
// More data is expected only if this read delivered bytes, the stream has not failed,
// and a known content size has not been reached yet.
73 has_more = 1 <= got && !in.fail() && ( !in.hasContentSize() || total < in.contentSize() );
74 try {
75 if( !consumer_fn(buffer, !has_more) ) {
76 break; // end streaming
77 }
78 } catch (std::exception &e) {
79 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
80 break; // end streaming
81 }
82 } else {
// Nothing available: issue one last zero-sized call flagged as final; its result is ignored.
// NOTE(review): unlike the call above, this one is not wrapped in try/catch although the
// function is declared noexcept.
83 has_more = false;
84 buffer.resize(0);
85 consumer_fn(buffer, true); // forced final, zero size
86 }
87 } while( has_more );
88 return total;
89}
90
91static uint64_t _read_buffer(ByteStream& in,
92 secure_vector<uint8_t>& buffer) noexcept {
93 if( in.available(1) ) { // at least one byte to stream, also considers eof
94 buffer.resize(buffer.capacity());
95 const uint64_t got = in.read(buffer.data(), buffer.capacity());
96 buffer.resize(got);
97 return got;
98 }
99 return 0;
100}
101
// NOTE(review): the head of this signature (listing lines 102-103) is missing from this extract;
// the body uses a stream `in` and two byte containers `buffer1` and `buffer2` - confirm against
// the original source.
//
// Double-buffered variant: one buffer is read ahead while the other one is handed to
// `consumer_fn`, so a zero-byte read can retroactively mark the previously filled buffer as final.
// Returns the number of bytes handed to `consumer_fn`.
104 const StreamConsumerFunc& consumer_fn) noexcept {
105 secure_vector<uint8_t>* buffers[] = { &buffer1, &buffer2 };
// eof[i] tells whether buffers[i] has to be sent with is_final=true.
106 bool eof[] = { false, false };
107
108 bool eof_read = false;
109 uint64_t total_send = 0;
110 uint64_t total_read = 0;
111 int idx = 0;
112 // fill 1st buffer upfront
113 {
114 uint64_t got = _read_buffer(in, *buffers[idx]);
115 total_read += got;
// Reading is over on a zero-byte read, a failed stream, or once a known content size is reached.
116 eof_read = 0 == got || in.fail() || ( in.hasContentSize() && total_read >= in.contentSize() );
117 eof[idx] = eof_read;
118 ++idx;
119 }
120
121 // - buffer_idx was filled
122 // - buffer_idx++
123 //
124 // - while !eof_send do
125 // - read buffer_idx if not eof_read,
126 // - set eof[buffer_idx+1]=true if zero bytes
127 // - buffer_idx++
128 // - send buffer_idx
129 //
130 bool eof_send = false;
131 while( !eof_send ) {
// Index of the other buffer, i.e. the one filled by the previous read.
132 int bidx_next = ( idx + 1 ) % 2;
133 if( !eof_read ) {
134 uint64_t got = _read_buffer(in, *buffers[idx]);
135 total_read += got;
136 eof_read = 0 == got || in.fail() || ( in.hasContentSize() && total_read >= in.contentSize() );
137 eof[idx] = eof_read;
138 if( 0 == got ) {
139 // read-ahead eof propagation if read zero bytes,
140 // hence next consumer_fn() will send last bytes with is_final=true
141 eof[bidx_next] = true;
142 }
143 }
// Switch to the previously filled buffer and send it.
144 idx = bidx_next;
145
146 secure_vector<uint8_t>* buffer = buffers[idx];
147 eof_send = eof[idx];
148 total_send += buffer->size();
149 try {
150 if( !consumer_fn(*buffer, eof_send) ) {
151 return total_send; // end streaming
152 }
153 } catch (std::exception &e) {
154 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
155 return total_send; // end streaming
156 }
157 }
158 return total_send;
159}
160
161std::vector<std::string_view> jau::io::uri_tk::supported_protocols() noexcept {
162 std::vector<std::string_view> res;
163#ifdef USE_LIBCURL
164 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
165 if( nullptr == cvid || nullptr == cvid->protocols ) {
166 return res;
167 }
168 for(int i=0; nullptr != cvid->protocols[i]; ++i) {
169 res.emplace_back( cvid->protocols[i] );
170 }
171#endif // USE_LIBCURL
172 return res;
173}
174
/**
 * Checks whether `scheme` consists solely of characters accepted for a URI scheme here:
 * alphanumeric characters, '+', '.' and '-'.
 *
 * @param scheme candidate scheme, without the trailing ':'
 * @return true if `scheme` is non-empty and every character is accepted, otherwise false
 */
static bool _is_scheme_valid(const std::string_view& scheme) noexcept {
    if ( scheme.empty() ) {
        return false;
    }
    auto pos = std::find_if_not(scheme.begin(), scheme.end(), [](char c) { // NOLINT(modernize-use-ranges)
        // std::isalnum() requires a value representable as unsigned char (or EOF);
        // passing a negative plain char (e.g. a UTF-8 byte) is undefined behavior.
        return 0 != std::isalnum(static_cast<unsigned char>(c)) || c == '+' || c == '.' || c == '-';
    });
    return pos == scheme.end();
}
184std::string_view jau::io::uri_tk::get_scheme(const std::string_view& uri) noexcept {
185 std::size_t pos = uri.find(':');
186 if (pos == std::string_view::npos) {
187 return uri.substr(0, 0);
188 }
189 std::string_view scheme = uri.substr(0, pos);
190 if( !_is_scheme_valid( scheme ) ) {
191 return uri.substr(0, 0);
192 }
193 return scheme;
194}
195
196bool jau::io::uri_tk::protocol_supported(const std::string_view& uri) noexcept {
197 const std::string_view scheme = get_scheme(uri);
198 if( scheme.empty() ) {
199 return false;
200 }
201 const std::vector<std::string_view> protos = supported_protocols();
202 auto it = std::find(protos.cbegin(), protos.cend(), scheme); // NOLINT(modernize-use-ranges)
203 return protos.cend() != it;
204}
205
206bool jau::io::uri_tk::is_local_file_protocol(const std::string_view& uri) noexcept {
207 return uri.starts_with("file://");
208}
209
210bool jau::io::uri_tk::is_httpx_protocol(const std::string_view& uri) noexcept {
211 const std::string_view scheme = get_scheme(uri);
212 if( scheme.empty() ) {
213 return false;
214 }
215 return "https" == scheme || "http" == scheme;
216}
217
218#ifdef USE_LIBCURL
219
// State handed to consume_header_curl1() and consume_data_curl1() through their `userdata` pointer.
// NOTE(review): listing line 226 is missing from this extract; the callbacks and read_url_stream()
// access a `buffer` member which is presumably declared there - confirm against the original source.
220struct curl_glue1_t {
// curl easy handle queried via curl_easy_getinfo() within the callbacks
221 CURL *curl_handle;
// true once a non-negative CURLINFO_CONTENT_LENGTH_DOWNLOAD_T value has been stored in content_length
222 bool has_content_length;
223 uint64_t content_length;
// last value obtained for CURLINFO_RESPONSE_CODE
224 int32_t status_code;
// sum of all byte counts passed to consume_data_curl1()
225 uint64_t total_read;
// invoked by consume_data_curl1() for each received chunk
227 StreamConsumerFunc consumer_fn;
228};
229
230static size_t consume_header_curl1(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
231 curl_glue1_t * cg = (curl_glue1_t*)userdata;
232
233 {
234 long v;
235 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
236 if( CURLE_OK == r ) {
237 cg->status_code = static_cast<int32_t>(v);
238 if( 400 <= v ) {
239 IRQ_PRINT("response_code: %ld", v);
240 return 0;
241 } else {
242 DBG_PRINT("consume_header_curl1.0 response_code: %ld", v);
243 }
244 }
245 }
246 if( !cg->has_content_length ) {
247 curl_off_t v = 0;
248 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
249 if( CURLE_OK == r ) {
250 if( 0 > v ) { // curl returns -1 if the size if not known
251 cg->content_length = 0;
252 cg->has_content_length = false;
253 } else {
254 cg->content_length = v;
255 cg->has_content_length = true;
256 }
257 }
258 }
259 const size_t realsize = size * nmemb;
260
261 if( false ) {
262 DBG_PRINT("consume_header_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " )",
263 realsize, cg->total_read, cg->has_content_length, cg->content_length );
264 std::string blob(buffer, realsize);
265 PLAIN_PRINT(true, "%s", blob);
266 }
267
268 return realsize;
269}
270
271static size_t consume_data_curl1(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
272 curl_glue1_t * cg = (curl_glue1_t*)userdata;
273
274 if( !cg->has_content_length ) {
275 curl_off_t v = 0;
276 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
277 if( CURLE_OK == r ) {
278 if( 0 > v ) { // curl returns -1 if the size if not known
279 cg->content_length = 0;
280 cg->has_content_length = false;
281 } else {
282 cg->content_length = v;
283 cg->has_content_length = true;
284 }
285 }
286 }
287 const size_t realsize = size * nmemb;
288 DBG_PRINT("consume_data_curl1.0 realsize %zu", realsize);
289 cg->buffer.resize(realsize);
290 memcpy(cg->buffer.data(), ptr, realsize);
291
292 cg->total_read += realsize;
293 const bool is_final = 0 == realsize ||
294 cg->has_content_length ? cg->total_read >= cg->content_length : false;
295
296 DBG_PRINT("consume_data_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d",
297 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
298
299 try {
300 if( !cg->consumer_fn(cg->buffer, is_final) ) {
301 return 0; // end streaming
302 }
303 } catch (std::exception &e) {
304 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
305 return 0; // end streaming
306 }
307
308 return realsize;
309}
310
311#endif // USE_LIBCURL
312
// Blocking download of `url` using a curl easy handle created and released within this call.
// Received data is forwarded chunk by chunk to `consumer_fn` through consume_data_curl1().
// Returns the total number of bytes read on success; returns 0 if the protocol is unsupported,
// if any setup step or the transfer fails, or if built without USE_LIBCURL.
// NOTE(review): listing line 314 is missing from this extract; a `buffer` parameter used below
// is presumably declared there - confirm against the original source.
313uint64_t jau::io::read_url_stream(const std::string& url,
315 const StreamConsumerFunc& consumer_fn) noexcept {
316#ifdef USE_LIBCURL
// NOTE(review): only reserve() is called, so the vector's size stays 0 while data() is handed
// to curl as CURLOPT_ERRORBUFFER and later printed via %s without ever being zero-terminated here.
317 std::vector<char> errorbuffer;
318 errorbuffer.reserve(CURL_ERROR_SIZE);
319 CURLcode res;
320
321 if( !uri_tk::protocol_supported(url) ) {
322 const std::string_view scheme = uri_tk::get_scheme(url);
323 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
324 std::string(scheme), jau::to_string(uri_tk::supported_protocols(), ","));
325 return 0;
326 }
327
328 /* init the curl session */
// NOTE(review): the returned handle is not checked against nullptr before use.
329 CURL *curl_handle = curl_easy_init();
330 DBG_PRINT("CURL: Create own handle %p", curl_handle);
331
332 curl_glue1_t cg = { .curl_handle=curl_handle, .has_content_length=false, .content_length=0,
333 .status_code=0, .total_read=0, .buffer=buffer, .consumer_fn=consumer_fn };
334
335 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
336 if( CURLE_OK != res ) {
337 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
338 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
339 goto errout;
340 }
341
342 /* set URL to get here */
343 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
344 if( CURLE_OK != res ) {
345 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
346 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
347 goto errout;
348 }
349
350 /* Switch on full protocol/debug output while testing */
351 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
352 jau::environment::getBooleanProperty("jau_io_net_verbose", false) ? 1L : 0L );
353 if( CURLE_OK != res ) {
354 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
355 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
356 goto errout;
357 }
358
// Peer verification is only switched off if the property "jau_io_net_ssl_verifypeer" is false.
359 if( !jau::environment::getBooleanProperty("jau_io_net_ssl_verifypeer", true) ) {
360 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
361 if( CURLE_OK != res ) {
362 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
363 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
364 goto errout;
365 }
366 }
367
368 /* disable progress meter, set to 0L to enable it */
369 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
370 if( CURLE_OK != res ) {
371 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
372 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
373 goto errout;
374 }
375
376 /* Suppress proxy CONNECT response headers from user callbacks */
377 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
378 if( CURLE_OK != res ) {
379 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
380 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
381 goto errout;
382 }
383
384 /* Don't pass headers to the data stream. */
385 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
386 if( CURLE_OK != res ) {
387 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
388 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
389 goto errout;
390 }
391
392 /* send header data to this function */
393 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
394 if( CURLE_OK != res ) {
395 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
396 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
397 goto errout;
398 }
399
400 /* set userdata for consume_header_curl1 */
401 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&cg);
402 if( CURLE_OK != res ) {
403 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
404 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
405 goto errout;
406 }
407
408 /* send all data to this function */
409 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
410 if( CURLE_OK != res ) {
411 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
412 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
413 goto errout;
414 }
415
416 /* set userdata for consume_data_curl1 */
417 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&cg);
418 if( CURLE_OK != res ) {
419 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
420 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
421 goto errout;
422 }
423
424 /* performs the task, blocking! */
425 res = curl_easy_perform(curl_handle);
426 if( CURLE_OK != res ) {
427 IRQ_PRINT("Error processing url %s, error %d '%s' '%s'",
428 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
429 goto errout;
430 }
431
432 /* cleanup curl stuff */
433 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
434 curl_easy_cleanup(curl_handle);
435 return cg.total_read;
436
// Shared failure exit: releases the handle and falls through to the final `return 0`.
437errout:
438 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
439 curl_easy_cleanup(curl_handle);
440#else // USE_LIBCURL
441 (void) url;
442 (void) buffer;
443 (void) consumer_fn;
444#endif // USE_LIBCURL
445 return 0;
446}
447
// NOTE(review): the signature of this function (listing line 448) is missing from this extract;
// the body reads a `response_code` value which is presumably its parameter - confirm.
//
// Marks completion and stores the response code while holding m_sync, then wakes all waiters on m_cv.
449 {
450 std::unique_lock<std::mutex> lockWrite(m_sync);
451 m_completed = true;
452 m_response_code = response_code;
453 }
454 m_cv.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
455}
456
// NOTE(review): listing lines 457, 459 and 461 are missing from this extract, including the
// signature, the origin of `timeout_time` and the condition belonging to the `else` below - confirm.
//
// Blocks on m_cv, holding m_sync, until m_completed is set.
// One branch waits without a deadline, the other waits until `timeout_time` and returns false
// if that wait timed out while m_completed is still unset.
458 std::unique_lock<std::mutex> lock(m_sync);
460 while( !m_completed ) {
462 m_cv.wait(lock);
463 } else {
464 std::cv_status s = wait_until(m_cv, lock, timeout_time );
465 if( std::cv_status::timeout == s && !m_completed ) {
466 return false;
467 }
468 }
469 }
470 return m_completed;
471}
472
473#ifdef USE_LIBCURL
474
475struct curl_glue2_sync_t {
476 curl_glue2_sync_t(void *_curl_handle,
477 http::PostRequestPtr _post_request,
478 ByteRingbuffer *_buffer,
479 SyncStreamResponseRef _response,
480 SyncStreamConsumerFunc _consumer_fn)
481 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
482 post_request(std::move(_post_request)),
483 buffer(_buffer),
484 response_code(0),
485 response(std::move(_response)),
486 consumer_fn(std::move(_consumer_fn))
487 {}
488
489 CURL *curl_handle;
490 http::PostRequestPtr post_request;
491 ByteRingbuffer *buffer;
492 int32_t response_code;
493 SyncStreamResponseRef response;
494 SyncStreamConsumerFunc consumer_fn;
495
496 void interrupt_all() noexcept {
497 if( buffer ) {
498 buffer->interruptReader();
499 }
500 response->header_resp.notify_complete(response_code);
501 }
502 void set_end_of_input() noexcept {
503 if( buffer ) {
504 buffer->set_end_of_input(true);
505 }
506 response->header_resp.notify_complete(response_code);
507 }
508};
509
510struct curl_glue2_async_t {
511 curl_glue2_async_t(void *_curl_handle,
512 http::PostRequestPtr _post_request,
513 ByteRingbuffer *_buffer,
514 AsyncStreamResponseRef _response,
515 AsyncStreamConsumerFunc _consumer_fn)
516 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
517 post_request(std::move(_post_request)),
518 buffer(_buffer),
519 response_code(0),
520 response(std::move(_response)),
521 consumer_fn(std::move(_consumer_fn))
522 {}
523
524 CURL *curl_handle;
525 http::PostRequestPtr post_request;
526 ByteRingbuffer *buffer;
527 int32_t response_code;
528 AsyncStreamResponseRef response;
529 AsyncStreamConsumerFunc consumer_fn;
530
531 void interrupt_all() noexcept {
532 if( buffer ) {
533 buffer->interruptReader();
534 }
535 response->header_resp.notify_complete(response_code);
536 }
537 void set_end_of_input() noexcept {
538 if( buffer ) {
539 buffer->set_end_of_input(true);
540 }
541 response->header_resp.notify_complete(response_code);
542 }
543};
544
545static size_t consume_header_curl2_sync(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
546 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
547 SyncStreamResponse& response = *cg->response;
548
549 if( io_result_t::NONE != response.result ) {
550 // user abort!
551 DBG_PRINT("consume_header_curl2_sync ABORT by User: total %" PRIu64 ", result %s",
552 response.total_read, jau::io::toString(response.result) );
553 cg->set_end_of_input();
554 return 0;
555 }
556
557 {
558 long v;
559 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
560 if( CURLE_OK == r ) {
561 cg->response_code = static_cast<int32_t>(v);
562 if( 400 <= v ) {
563 IRQ_PRINT("response_code: %ld", v);
564 response.result = io_result_t::FAILED;
565 cg->set_end_of_input();
566 return 0;
567 } else {
568 DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
569 }
570 }
571 }
572 if( !response.has_content_length ) {
573 curl_off_t v = 0;
574 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
575 if( CURLE_OK == r ) {
576 if( 0 <= v ) { // curl returns -1 if the size is not known
577 response.content_length = v;
578 response.has_content_length = true;
579 }
580 }
581 }
582 const size_t realsize = size * nmemb;
583
584 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
585 response.header_resp.notify_complete(cg->response_code);
586 DBG_PRINT("consume_header_curl2.0 header_completed");
587 }
588
589 if( false ) {
590 DBG_PRINT("consume_header_curl2.X realsize %" PRIu64 ", total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %s",
591 realsize, response.total_read, response.has_content_length, response.content_length,
592 jau::io::toString(response.result) );
593 std::string blob(buffer, realsize);
594 PLAIN_PRINT(true, "%s", jau::toHexString((uint8_t*)buffer, realsize, jau::lb_endian_t::little));
595 PLAIN_PRINT(true, "%s", blob);
596 }
597
598 return realsize;
599}
600
601static size_t consume_header_curl2_async(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
602 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
603 AsyncStreamResponse& response = *cg->response;
604
605 if( io_result_t::NONE != response.result ) {
606 // user abort!
607 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
608 DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIu64 ", result %s, rb %s",
609 response.total_read.load(), jau::io::toString(response.result), s );
610 cg->set_end_of_input();
611 return 0;
612 }
613
614 {
615 long v;
616 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
617 if( CURLE_OK == r ) {
618 cg->response_code = static_cast<int32_t>(v);
619 if( 400 <= v ) {
620 IRQ_PRINT("response_code: %ld", v);
621 response.result = io_result_t::FAILED;
622 cg->set_end_of_input();
623 return 0;
624 } else {
625 DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
626 }
627 }
628 }
629 if( !response.has_content_length ) {
630 curl_off_t v = 0;
631 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
632 if( CURLE_OK == r ) {
633 if( 0 <= v ) { // curl returns -1 if the size is not known
634 response.content_length = v;
635 response.has_content_length = true;
636 }
637 }
638 }
639 const size_t realsize = size * nmemb;
640
641 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
642 response.header_resp.notify_complete(cg->response_code);
643 DBG_PRINT("consume_header_curl2.0 header_completed");
644 }
645
646 if( false ) {
647 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
648 DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %s, rb %s",
649 realsize, response.total_read.load(), response.has_content_length.load(), response.content_length.load(),
650 jau::io::toString(response.result), s );
651 std::string blob(buffer, realsize);
652 PLAIN_PRINT(true, "%s", jau::toHexString((uint8_t*)buffer, realsize, jau::lb_endian_t::little));
653 PLAIN_PRINT(true, "%s", blob);
654 }
655
656 return realsize;
657}
658
659static size_t consume_data_curl2_sync(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
660 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
661 SyncStreamResponse& response = *cg->response;
662
663 if( io_result_t::NONE != response.result ) {
664 // user abort!
665 // user abort!
666 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
667 DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIu64 ", result %s, rb %s",
668 response.total_read, jau::io::toString(response.result), s );
669 cg->set_end_of_input();
670 return 0;
671 }
672
673 if( !response.has_content_length ) {
674 curl_off_t v = 0;
675 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
676 if( CURLE_OK == r ) {
677 if( 0 <= v ) { // curl returns -1 if the size if not known
678 response.content_length = v;
679 response.has_content_length = true;
680 }
681 }
682 }
683
684 // Ensure header completion is being sent
685 if( !response.header_resp.completed() ) {
686 response.header_resp.notify_complete();
687 }
688
689 const size_t realsize = size * nmemb;
690 if( jau::environment::get().debug ) {
691 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
692 DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, s );
693 }
694 if( cg->buffer ) {
695 bool timeout_occured;
696 if( !cg->buffer->putBlocking(reinterpret_cast<uint8_t*>(ptr),
697 reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s, timeout_occured) ) {
698 DBG_PRINT("consume_data_curl2 Failed put: total %" PRIu64 ", result %s, timeout %d, rb %s",
699 response.total_read, jau::io::toString(response.result), timeout_occured, cg->buffer->toString() );
700 if( timeout_occured ) {
701 cg->set_end_of_input();
702 }
703 return 0;
704 }
705 }
706
707 response.total_read += realsize;
708 const bool is_final = 0 == realsize ||
709 response.has_content_length ? response.total_read >= response.content_length : false;
710 if( is_final ) {
711 response.result = io_result_t::SUCCESS;
712 cg->set_end_of_input();
713 }
714 if( cg->consumer_fn ) {
715 try {
716 if( !cg->consumer_fn(*cg->response, reinterpret_cast<uint8_t*>(ptr), realsize, is_final) ) {
717 return 0; // end streaming
718 }
719 } catch (std::exception &e) {
720 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
721 return 0; // end streaming
722 }
723 }
724
725 if( jau::environment::get().debug ) {
726 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
727 DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %s, rb %s",
728 realsize, response.total_read, response.has_content_length, response.content_length, is_final,
729 jau::io::toString(response.result), s );
730 }
731
732 return realsize;
733}
734
735static size_t consume_data_curl2_async(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
736 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
737 AsyncStreamResponse& response = *cg->response;
738
739 if( io_result_t::NONE != response.result ) {
740 // user abort!
741 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
742 DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIu64 ", result %s, rb %s",
743 response.total_read.load(), jau::io::toString(response.result), s );
744 cg->set_end_of_input();
745 return 0;
746 }
747
748 if( !response.has_content_length ) {
749 curl_off_t v = 0;
750 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
751 if( CURLE_OK == r ) {
752 if( 0 <= v ) { // curl returns -1 if the size if not known
753 response.content_length = v;
754 response.has_content_length = true;
755 }
756 }
757 }
758
759 // Ensure header completion is being sent
760 if( !response.header_resp.completed() ) {
761 response.header_resp.notify_complete();
762 }
763
764 const size_t realsize = size * nmemb;
765 if( jau::environment::get().debug ) {
766 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
767 DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, s );
768 }
769 if( cg->buffer ) {
770 bool timeout_occured;
771 if( !cg->buffer->putBlocking(reinterpret_cast<uint8_t*>(ptr),
772 reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s, timeout_occured) ) {
773 DBG_PRINT("consume_data_curl2 Failed put: total %" PRIu64 ", result %s, timeout %d, rb %s",
774 response.total_read.load(), jau::io::toString(response.result), timeout_occured, cg->buffer->toString() );
775 if( timeout_occured ) {
776 cg->set_end_of_input();
777 }
778 return 0;
779 }
780 }
781
782 response.total_read.fetch_add(realsize);
783 const bool is_final = 0 == realsize ||
784 response.has_content_length ? response.total_read >= response.content_length : false;
785 if( is_final ) {
786 response.result = io_result_t::SUCCESS;
787 cg->set_end_of_input();
788 }
789 if( cg->consumer_fn ) {
790 try {
791 if( !cg->consumer_fn(*cg->response, reinterpret_cast<uint8_t*>(ptr), realsize, is_final) ) {
792 return 0; // end streaming
793 }
794 } catch (std::exception &e) {
795 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
796 return 0; // end streaming
797 }
798 }
799
800 if( jau::environment::get().debug ) {
801 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
802 DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %s, rb %s",
803 realsize, response.total_read.load(), response.has_content_length.load(), response.content_length.load(), is_final,
804 jau::io::toString(response.result), s );
805 }
806
807 return realsize;
808}
809
// Configures `curl_handle` for `url` (plus optional POST header and body), installs the given
// header and data callbacks with `ctx_data` as their user data, and performs the blocking transfer.
// Returns true if every setup step and curl_easy_perform() succeeded, otherwise false.
// The header list built for a POST request is released on both exits.
// NOTE(review): listing line 812 is missing from this extract; the `result` value tested after
// curl_easy_perform() is presumably a parameter declared there - confirm against the original source.
810static bool read_url_stream_impl(CURL *curl_handle, std::vector<char>& errorbuffer,
811 const char *url, http::PostRequest *post_request,
813 curl_write_callback header_cb, curl_write_callback write_cb, void* ctx_data) noexcept
814{
815 struct curl_slist *header_slist = nullptr;
816 CURLcode res;
817
818 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
819 if( CURLE_OK != res ) {
820 ERR_PRINT("Error setting up url %s, error %d '%s'",
821 url, (int)res, curl_easy_strerror(res));
822 goto errout;
823 }
824
825 /* set URL to get here */
826 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
827 if( CURLE_OK != res ) {
828 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
829 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
830 goto errout;
831 }
832
833 if( nullptr != post_request ) {
834 http::PostRequest &post = *post_request;
835 // slist1 = curl_slist_append(slist1, "Content-Type: application/json");
836 // slist1 = curl_slist_append(slist1, "Accept: application/json");
837 if( post.header.size() > 0 ) {
// Each header entry becomes one "name: value" line of the curl header list.
838 for (const std::pair<const std::string, std::string>& n : post.header) {
839 std::string v = n.first+": "+n.second;
840 header_slist = curl_slist_append(header_slist, v.c_str());
841 }
// NOTE(review): the return value of this curl_easy_setopt() call is not assigned to `res`,
// hence the check below still tests the result of the preceding CURLOPT_URL call.
842 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, header_slist);
843 if( CURLE_OK != res ) {
844 ERR_PRINT("Error setting up POST header, error %d '%s' '%s'",
845 (int)res, curl_easy_strerror(res), errorbuffer.data());
846 goto errout;
847 }
848 }
849
850 res = curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, post.body.data());
851 if( CURLE_OK != res ) {
852 ERR_PRINT("Error setting up POST fields, error %d '%s' '%s'",
853 (int)res, curl_easy_strerror(res), errorbuffer.data());
854 goto errout;
855 }
856 }
857 /* Switch on full protocol/debug output while testing */
858 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
859 jau::environment::getBooleanProperty("jau_io_net_verbose", false) ? 1L : 0L );
860 if( CURLE_OK != res ) {
861 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
862 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
863 goto errout;
864 }
865
// Peer verification is only switched off if the property "jau_io_net_ssl_verifypeer" is false.
866 if( !jau::environment::getBooleanProperty("jau_io_net_ssl_verifypeer", true) ) {
867 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
868 if( CURLE_OK != res ) {
869 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
870 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
871 goto errout;
872 }
873 }
874
875 /* disable progress meter, set to 0L to enable it */
876 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
877 if( CURLE_OK != res ) {
878 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
879 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
880 goto errout;
881 }
882
883 /* Suppress proxy CONNECT response headers from user callbacks */
884 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
885 if( CURLE_OK != res ) {
886 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
887 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
888 goto errout;
889 }
890
891 /* Don't pass headers to the data stream. */
892 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
893 if( CURLE_OK != res ) {
894 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
895 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
896 goto errout;
897 }
898
899 /* send header data to this function */
900 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_cb);
901 if( CURLE_OK != res ) {
902 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
903 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
904 goto errout;
905 }
906
907 /* set userdata for the header callback */
908 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, ctx_data);
909 if( CURLE_OK != res ) {
910 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
911 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
912 goto errout;
913 }
914
915 /* send received data to this function */
916 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, write_cb);
917 if( CURLE_OK != res ) {
918 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
919 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
920 goto errout;
921 }
922
923 /* set userdata for the data callback */
924 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, ctx_data);
925 if( CURLE_OK != res ) {
926 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
927 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
928 goto errout;
929 }
930
931 /* performs the task, blocking! */
932 res = curl_easy_perform(curl_handle);
933 if( CURLE_OK != res ) {
934 if( io_result_t::NONE == result ) {
935 // Error during normal processing
936 IRQ_PRINT("Error processing url %s, error %d '%s' '%s'",
937 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
938 } else {
939 // User aborted or response code error detected via consume_header_curl2
940 DBG_PRINT("Processing aborted url %s, error %d '%s' '%s'",
941 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
942 }
943 goto errout;
944 }
945
946 if( nullptr != header_slist ) {
947 curl_slist_free_all(header_slist);
948 }
949 return true;
950
// Shared failure exit: releases the header list, if any, and reports failure.
951errout:
952 if( nullptr != header_slist ) {
953 curl_slist_free_all(header_slist);
954 }
955 return false;
956}
957
958static void read_url_stream_sync(const char *url, curl_glue2_sync_t& cg) noexcept {
959 std::vector<char> errorbuffer;
960 errorbuffer.reserve(CURL_ERROR_SIZE);
961
962 bool owns_curl_handle = false;
963 CURL *curl_handle;
964 if( nullptr == cg.curl_handle ) {
965 /* init the curl session */
966 owns_curl_handle = true;
967 curl_handle = curl_easy_init();
968 if( nullptr == curl_handle ) {
969 ERR_PRINT("Error setting up url %s, null curl handle", url);
970 goto errout;
971 }
972 cg.curl_handle = curl_handle;
973 DBG_PRINT("CURL: Created own handle %p", curl_handle);
974 } else {
975 curl_handle = cg.curl_handle;
976 DBG_PRINT("CURL: Reusing own handle %p", curl_handle);
977 }
978
979 if( !read_url_stream_impl(curl_handle, errorbuffer,
980 url, cg.post_request.get(), cg.response->result,
981 consume_header_curl2_sync, consume_data_curl2_sync, (void*)&cg) ) {
982 goto errout;
983 }
984
985 // Ensure header completion is being sent
986 if( !cg.response->header_resp.completed() ) {
987 cg.response->header_resp.notify_complete();
988 }
989 if( cg.response->result != io_result_t::SUCCESS ) {
990 cg.response->result = io_result_t::SUCCESS;
991 if( cg.consumer_fn ) {
992 try {
993 cg.consumer_fn(*cg.response, nullptr, 0, true);
994 } catch (std::exception &e) {
995 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
996 goto errout;
997 }
998 }
999 }
1000 goto cleanup;
1001
1002errout:
1003 cg.response->result = io_result_t::FAILED;
1004 cg.set_end_of_input();
1005
1006cleanup:
1007 if( owns_curl_handle && nullptr != curl_handle ) {
1008 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
1009 curl_easy_cleanup(curl_handle);
1010 cg.curl_handle = nullptr;
1011 }
1012}
1013
1014static void read_url_stream_async(const char *url, std::unique_ptr<curl_glue2_async_t> && cg) noexcept {
1015 std::vector<char> errorbuffer;
1016 errorbuffer.reserve(CURL_ERROR_SIZE);
1017
1018 bool owns_curl_handle = false;
1019 CURL *curl_handle;
1020 if( nullptr == cg->curl_handle ) {
1021 /* init the curl session */
1022 owns_curl_handle = true;
1023 curl_handle = curl_easy_init();
1024 if( nullptr == curl_handle ) {
1025 ERR_PRINT("Error setting up url %s, null curl handle", url);
1026 goto errout;
1027 }
1028 cg->curl_handle = curl_handle;
1029 DBG_PRINT("CURL: Created own handle %p", curl_handle);
1030 } else {
1031 curl_handle = cg->curl_handle;
1032 DBG_PRINT("CURL: Reusing own handle %p", curl_handle);
1033 }
1034
1035 if( !read_url_stream_impl(curl_handle, errorbuffer,
1036 url, cg->post_request.get(), cg->response->result,
1037 consume_header_curl2_async, consume_data_curl2_async, (void*)cg.get()) ) {
1038 goto errout;
1039 }
1040
1041 // Ensure header completion is being sent
1042 if( !cg->response->header_resp.completed() ) {
1043 cg->response->header_resp.notify_complete();
1044 }
1045 if( cg->response->result != io_result_t::SUCCESS ) {
1046 cg->response->result = io_result_t::SUCCESS;
1047 if( cg->consumer_fn ) {
1048 try {
1049 cg->consumer_fn(*cg->response, nullptr, 0, true);
1050 } catch (std::exception &e) {
1051 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
1052 goto errout;
1053 }
1054 }
1055 }
1056 goto cleanup;
1057
1058errout:
1059 cg->response->result = io_result_t::FAILED;
1060 cg->set_end_of_input();
1061
1062cleanup:
1063 if( owns_curl_handle && nullptr != curl_handle ) {
1064 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
1065 curl_easy_cleanup(curl_handle);
1066 cg->curl_handle = nullptr;
1067 }
1068}
1069
1070#endif // USE_LIBCURL
1071
1072
1074#ifdef USE_LIBCURL
1075 CURL* h = ::curl_easy_init();
1076 DBG_PRINT("CURL: Created user handle %p", h);
1077 return static_cast<jau::io::net_tk_handle>( h );
1078#else
1079 return static_cast<jau::io::net_tk_handle>( nullptr );
1080#endif
1081}
1083#ifdef USE_LIBCURL
1084 CURL* h = static_cast<CURL*>(handle);
1085 if( nullptr != h ) {
1086 DBG_PRINT("CURL: Freeing user handle %p", h);
1087 curl_easy_cleanup(h);
1088 }
1089#else
1090 (void)handle;
1091#endif
1092}
1093
1095 http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer,
1096 const SyncStreamConsumerFunc& consumer_fn) noexcept {
1097 /* init user referenced values */
1098 SyncStreamResponseRef res = std::make_shared<SyncStreamResponse>(handle);
1099
1100#ifdef USE_LIBCURL
1101 if( !uri_tk::protocol_supported(url) ) {
1102#endif // USE_LIBCURL
1103 (void)httpPostReq;
1104 (void)buffer;
1105 (void)consumer_fn;
1108 // buffer.set_end_of_input(true);
1109 const std::string_view scheme = uri_tk::get_scheme(url);
1110 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1111 std::string(scheme), jau::to_string(uri_tk::supported_protocols(), ","));
1112 return res;
1113#ifdef USE_LIBCURL
1114 }
1115 curl_glue2_sync_t cg (handle, std::move(httpPostReq), buffer, res, consumer_fn );
1116 read_url_stream_sync(url.c_str(), cg);
1117 return res;
1118#endif // USE_LIBCURL
1119}
1120
1122 http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer,
1123 const AsyncStreamConsumerFunc& consumer_fn) noexcept {
1124 /* init user referenced values */
1125 AsyncStreamResponseRef res = std::make_shared<AsyncStreamResponse>(handle);
1126
1127#ifdef USE_LIBCURL
1128 if( !uri_tk::protocol_supported(url) ) {
1129#endif // USE_LIBCURL
1130 (void)httpPostReq;
1131 (void)buffer;
1132 (void)consumer_fn;
1135 // buffer.set_end_of_input(true);
1136 const std::string_view scheme = uri_tk::get_scheme(url);
1137 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1138 std::string(scheme), jau::to_string(uri_tk::supported_protocols(), ","));
1139 return res;
1140#ifdef USE_LIBCURL
1141 }
1142
1143 std::unique_ptr<curl_glue2_async_t> cg ( std::make_unique<curl_glue2_async_t>(handle, std::move(httpPostReq),
1144 buffer, res, consumer_fn ) );
1145
1146 res->thread = std::thread(&::read_url_stream_async, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
1147 return res;
1148#endif // USE_LIBCURL
1149}
1150
1151void jau::io::print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept {
1152 PLAIN_PRINT(true, "%s: Duration %s s, %s ms", prefix,
1153 td.toString(), jau::to_decstring(td.to_ms()));
1154
1155 if( out_bytes_total >= 100'000'000 ) {
1156 PLAIN_PRINT(true, "%s: Size %s MB", prefix,
1157 jau::to_decstring(std::llround((double)out_bytes_total/1'000'000.0)));
1158 } else if( out_bytes_total >= 100'000 ) {
1159 PLAIN_PRINT(true, "%s: Size %s KB", prefix,
1160 jau::to_decstring(std::llround((double)out_bytes_total/1'000.0)));
1161 } else {
1162 PLAIN_PRINT(true, "%s: Size %s B", prefix,
1163 jau::to_decstring(out_bytes_total));
1164 }
1165
1166 const uint64_t _rate_bps = std::llround( (double)out_bytes_total / td.to_double() ); // bytes per second
1167 const uint64_t _rate_bitps = std::llround( ( (double)out_bytes_total * 8.0 ) / td.to_double() ); // bits per second
1168
1169 if( _rate_bitps >= 100'000'000 ) {
1170 PLAIN_PRINT(true, "%s: Bitrate %s Mbit/s, %s MB/s", prefix,
1171 jau::to_decstring(std::llround((double)_rate_bitps/1'000'000.0)),
1172 jau::to_decstring(std::llround((double)_rate_bps/1'000'000.0)));
1173 } else if( _rate_bitps >= 100'000 ) {
1174 PLAIN_PRINT(true, "%s: Bitrate %s kbit/s, %s kB/s", prefix,
1175 jau::to_decstring(std::llround((double)_rate_bitps/1'000.0)),
1176 jau::to_decstring(std::llround((double)_rate_bps/1'000.0)));
1177 } else {
1178 PLAIN_PRINT(true, "%s: Bitrate %s bit/s, %s B/s", prefix,
1179 jau::to_decstring(_rate_bitps),
1180 jau::to_decstring(_rate_bps));
1181 }
1182}
1183
static bool getBooleanProperty(const std::string &name, const bool default_value) noexcept
Returns the boolean value of the environment's variable 'name', or the 'default_value' if the environ...
static environment & get(const std::string &root_prefix_domain="jau") noexcept
Static singleton initialization of this project's environment with the given global root prefix_domai...
File based byte input stream, including named file descriptor.
Byte stream interface.
bool wait_until_completion(const jau::fraction_i64 &timeout) noexcept
Wait until completed() has been reached.
Definition io_util.cpp:457
void notify_complete(const int32_t response_code=200) noexcept
Notify completion, see completed()
Definition io_util.cpp:448
bool completed() const noexcept
Returns whether URL header is completed.
Definition io_util.hpp:198
int32_t response_code() const noexcept
Definition io_util.hpp:200
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:178
#define DBG_PRINT(fmt,...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:128
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
Definition debug.hpp:187
#define PLAIN_PRINT(printPrefix, fmt,...)
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
Definition debug.hpp:197
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
@ little
Identifier for little endian, equivalent to endian::little.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
@ std
Denotes a func::std_target_t.
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1121
jau::ordered_atomic< io_result_t, std::memory_order_relaxed > relaxed_atomic_io_result_t
Definition io_util.hpp:65
SyncStreamResponseRef read_url_stream_sync(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const SyncStreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given SyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1094
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:206
net_tk_handle create_net_tk_handle() noexcept
creates a reusable handle, free with free_net_tk_handle() after use.
Definition io_util.cpp:1073
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
Definition io_util.cpp:313
void * net_tk_handle
Definition io_util.hpp:223
jau::function< bool(secure_vector< uint8_t > &, bool)> StreamConsumerFunc
Stream consumer function.
Definition io_util.hpp:82
uint64_t read_stream(ByteStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
Definition io_util.cpp:61
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
Definition io_util.cpp:210
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
Definition io_util.cpp:184
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of supported protocol supported by libcurl network protocols, queried at runtime.
Definition io_util.cpp:161
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:356
std::string toString(io_result_t v) noexcept
Definition io_util.hpp:67
std::shared_ptr< SyncStreamResponse > SyncStreamResponseRef
Definition io_util.hpp:267
std::shared_ptr< AsyncStreamResponse > AsyncStreamResponseRef
Definition io_util.hpp:348
jau::function< bool(SyncStreamResponse &, const uint8_t *, size_t, bool)> SyncStreamConsumerFunc
Synchronous stream consumer function.
Definition io_util.hpp:275
jau::ringbuffer< uint8_t, size_t > ByteRingbuffer
Definition io_util.hpp:37
void free_net_tk_handle(net_tk_handle handle) noexcept
frees a handle after use created by create_net_tk_handle()
Definition io_util.cpp:1082
std::vector< T, jau::callocator_sec< T > > secure_vector
Definition io_util.hpp:33
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
Definition io_util.cpp:1151
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
Definition io_util.cpp:48
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition io_util.cpp:196
@ read
Read capabilities.
@ NONE
Operation still in progress.
Definition io_util.hpp:60
@ FAILED
Operation failed.
Definition io_util.hpp:57
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:63
@ timeout
Input or output operation failed due to timeout.
std::string toHexString(const void *data, const nsize_t length, const lb_endian_t byteOrder=lb_endian_t::big, const LoUpCase capitalization=LoUpCase::lower, const PrefixOpt prefix=PrefixOpt::prefix) noexcept
Produce a hexadecimal string representation of the given lsb-first byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
static uint64_t _read_buffer(ByteStream &in, secure_vector< uint8_t > &buffer) noexcept
Definition io_util.cpp:91
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
Definition io_util.cpp:175
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
std::unique_ptr< PostRequest > PostRequestPtr
Definition io_util.hpp:220
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
STL namespace.
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...
Asynchronous stream response.
Definition io_util.hpp:310
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
Definition io_util.hpp:342
std::thread thread
background reading thread unique-pointer
Definition io_util.hpp:332
relaxed_atomic_bool has_content_length
indicating whether content_length is known from server
Definition io_util.hpp:336
relaxed_atomic_uint64 total_read
tracking the total_read
Definition io_util.hpp:340
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:338
url_header_resp header_resp
synchronized URL header response completion
Definition io_util.hpp:334
Synchronous stream response.
Definition io_util.hpp:231
uint64_t total_read
tracking the total_read
Definition io_util.hpp:259
uint64_t content_length
content_length tracking the content_length
Definition io_util.hpp:257
url_header_resp header_resp
synchronized URL header response completion
Definition io_util.hpp:253
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
Definition io_util.hpp:261
bool has_content_length
indicating whether content_length is known from server
Definition io_util.hpp:255
std::unordered_map< std::string, std::string > header
Definition io_util.hpp:217
CXX_ALWAYS_INLINE _Tp fetch_add(_Tp __i) noexcept
CXX_ALWAYS_INLINE _Tp load() const noexcept