jaulib v1.3.8
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
io_util.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 * Copyright (c) 2021 ZAFENA AB
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25
26#include <chrono>
27#include <memory>
28#include <unordered_map>
29
30// #include <botan_all.h>
31
32#include <jau/debug.hpp>
33#include <jau/io_util.hpp>
34#include <jau/byte_stream.hpp>
35#include <jau/string_util.hpp>
36
37#ifdef USE_LIBCURL
38 #include <curl/curl.h>
39#endif
40
41#include <thread>
42#include <pthread.h>
43
44using namespace jau::io;
45using namespace jau::fractions_i64_literals;
46
47
48uint64_t jau::io::read_file(const std::string& input_file,
50 const StreamConsumerFunc& consumer_fn) noexcept
51{
52 if(input_file == "-") {
53 ByteInStream_File in(0); // stdin
54 return read_stream(in, buffer, consumer_fn);
55 } else {
56 ByteInStream_File in(input_file);
57 return read_stream(in, buffer, consumer_fn);
58 }
59}
60
63 const StreamConsumerFunc& consumer_fn) noexcept {
64 uint64_t total = 0;
65 bool has_more;
66 do {
67 if( in.available(1) ) { // at least one byte to stream, also considers eof
68 buffer.resize(buffer.capacity());
69 const uint64_t got = in.read(buffer.data(), buffer.capacity());
70
71 buffer.resize(got);
72 total += got;
73 has_more = 1 <= got && !in.fail() && ( !in.has_content_size() || total < in.content_size() );
74 try {
75 if( !consumer_fn(buffer, !has_more) ) {
76 break; // end streaming
77 }
78 } catch (std::exception &e) {
79 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
80 break; // end streaming
81 }
82 } else {
83 has_more = false;
84 buffer.resize(0);
85 consumer_fn(buffer, true); // forced final, zero size
86 }
87 } while( has_more );
88 return total;
89}
90
91static uint64_t _read_buffer(ByteInStream& in,
92 secure_vector<uint8_t>& buffer) noexcept {
93 if( in.available(1) ) { // at least one byte to stream, also considers eof
94 buffer.resize(buffer.capacity());
95 const uint64_t got = in.read(buffer.data(), buffer.capacity());
96 buffer.resize(got);
97 return got;
98 }
99 return 0;
100}
101
104 const StreamConsumerFunc& consumer_fn) noexcept {
105 secure_vector<uint8_t>* buffers[] = { &buffer1, &buffer2 };
106 bool eof[] = { false, false };
107
108 bool eof_read = false;
109 uint64_t total_send = 0;
110 uint64_t total_read = 0;
111 int idx = 0;
112 // fill 1st buffer upfront
113 {
114 uint64_t got = _read_buffer(in, *buffers[idx]);
115 total_read += got;
116 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
117 eof[idx] = eof_read;
118 ++idx;
119 }
120
121 // - buffer_idx was filled
122 // - buffer_idx++
123 //
124 // - while !eof_send do
125 // - read buffer_idx if not eof_read,
126 // - set eof[buffer_idx+1]=true if zero bytes
127 // - buffer_idx++
128 // - sent buffer_idx
129 //
130 bool eof_send = false;
131 while( !eof_send ) {
132 int bidx_next = ( idx + 1 ) % 2;
133 if( !eof_read ) {
134 uint64_t got = _read_buffer(in, *buffers[idx]);
135 total_read += got;
136 eof_read = 0 == got || in.fail() || ( in.has_content_size() && total_read >= in.content_size() );
137 eof[idx] = eof_read;
138 if( 0 == got ) {
139 // read-ahead eof propagation if read zero bytes,
140 // hence next consumer_fn() will send last bytes with is_final=true
141 eof[bidx_next] = true;
142 }
143 }
144 idx = bidx_next;
145
146 secure_vector<uint8_t>* buffer = buffers[idx];
147 eof_send = eof[idx];
148 total_send += buffer->size();
149 try {
150 if( !consumer_fn(*buffer, eof_send) ) {
151 return total_send; // end streaming
152 }
153 } catch (std::exception &e) {
154 ERR_PRINT("jau::io::read_stream: Caught exception: %s", e.what());
155 return total_send; // end streaming
156 }
157 }
158 return total_send;
159}
160
161std::vector<std::string_view> jau::io::uri_tk::supported_protocols() noexcept {
162 std::vector<std::string_view> res;
163#ifdef USE_LIBCURL
164 const curl_version_info_data* cvid = curl_version_info(CURLVERSION_NOW);
165 if( nullptr == cvid || nullptr == cvid->protocols ) {
166 return res;
167 }
168 for(int i=0; nullptr != cvid->protocols[i]; ++i) {
169 res.emplace_back( cvid->protocols[i] );
170 }
171#endif // USE_LIBCURL
172 return res;
173}
174
175static bool _is_scheme_valid(const std::string_view& scheme) noexcept {
176 if ( scheme.empty() ) {
177 return false;
178 }
179 auto pos = std::find_if_not(scheme.begin(), scheme.end(), [&](char c){
180 return std::isalnum(c) || c == '+' || c == '.' || c == '-';
181 });
182 return pos == scheme.end();
183}
184std::string_view jau::io::uri_tk::get_scheme(const std::string_view& uri) noexcept {
185 std::size_t pos = uri.find(':');
186 if (pos == std::string_view::npos) {
187 return uri.substr(0, 0);
188 }
189 std::string_view scheme = uri.substr(0, pos);
190 if( !_is_scheme_valid( scheme ) ) {
191 return uri.substr(0, 0);
192 }
193 return scheme;
194}
195
196bool jau::io::uri_tk::protocol_supported(const std::string_view& uri) noexcept {
197 const std::string_view scheme = get_scheme(uri);
198 if( scheme.empty() ) {
199 return false;
200 }
201 const std::vector<std::string_view> protos = supported_protocols();
202 auto it = std::find(protos.cbegin(), protos.cend(), scheme);
203 return protos.cend() != it;
204}
205
206bool jau::io::uri_tk::is_local_file_protocol(const std::string_view& uri) noexcept {
207 return uri.starts_with("file://");
208}
209
210bool jau::io::uri_tk::is_httpx_protocol(const std::string_view& uri) noexcept {
211 const std::string_view scheme = get_scheme(uri);
212 if( scheme.empty() ) {
213 return false;
214 }
215 return "https" == scheme || "http" == scheme;
216}
217
218#ifdef USE_LIBCURL
219
220struct curl_glue1_t {
221 CURL *curl_handle;
222 bool has_content_length;
223 uint64_t content_length;
224 int32_t status_code;
225 uint64_t total_read;
227 StreamConsumerFunc consumer_fn;
228};
229
230static size_t consume_header_curl1(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
231 curl_glue1_t * cg = (curl_glue1_t*)userdata;
232
233 {
234 long v;
235 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
236 if( CURLE_OK == r ) {
237 cg->status_code = static_cast<int32_t>(v);
238 if( 400 <= v ) {
239 IRQ_PRINT("response_code: %ld", v);
240 return 0;
241 } else {
242 DBG_PRINT("consume_header_curl1.0 response_code: %ld", v);
243 }
244 }
245 }
246 if( !cg->has_content_length ) {
247 curl_off_t v = 0;
248 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
249 if( CURLE_OK == r ) {
250 if( 0 > v ) { // curl returns -1 if the size if not known
251 cg->content_length = 0;
252 cg->has_content_length = false;
253 } else {
254 cg->content_length = v;
255 cg->has_content_length = true;
256 }
257 }
258 }
259 const size_t realsize = size * nmemb;
260
261 if( false ) {
262 DBG_PRINT("consume_header_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " )",
263 realsize, cg->total_read, cg->has_content_length, cg->content_length );
264 std::string blob(buffer, realsize);
265 jau::PLAIN_PRINT(true, "%s", blob.c_str());
266 }
267
268 return realsize;
269}
270
271static size_t consume_data_curl1(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
272 curl_glue1_t * cg = (curl_glue1_t*)userdata;
273
274 if( !cg->has_content_length ) {
275 curl_off_t v = 0;
276 CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
277 if( CURLE_OK == r ) {
278 if( 0 > v ) { // curl returns -1 if the size if not known
279 cg->content_length = 0;
280 cg->has_content_length = false;
281 } else {
282 cg->content_length = v;
283 cg->has_content_length = true;
284 }
285 }
286 }
287 const size_t realsize = size * nmemb;
288 DBG_PRINT("consume_data_curl1.0 realsize %zu", realsize);
289 cg->buffer.resize(realsize);
290 memcpy(cg->buffer.data(), ptr, realsize);
291
292 cg->total_read += realsize;
293 const bool is_final = 0 == realsize ||
294 cg->has_content_length ? cg->total_read >= cg->content_length : false;
295
296 DBG_PRINT("consume_data_curl1.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d",
297 realsize, cg->total_read, cg->has_content_length, cg->content_length, is_final );
298
299 try {
300 if( !cg->consumer_fn(cg->buffer, is_final) ) {
301 return 0; // end streaming
302 }
303 } catch (std::exception &e) {
304 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
305 return 0; // end streaming
306 }
307
308 return realsize;
309}
310
311#endif // USE_LIBCURL
312
313uint64_t jau::io::read_url_stream(const std::string& url,
315 const StreamConsumerFunc& consumer_fn) noexcept {
316#ifdef USE_LIBCURL
317 std::vector<char> errorbuffer;
318 errorbuffer.reserve(CURL_ERROR_SIZE);
319 CURLcode res;
320
321 if( !uri_tk::protocol_supported(url) ) {
322 const std::string_view scheme = uri_tk::get_scheme(url);
323 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
324 std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
325 return 0;
326 }
327
328 /* init the curl session */
329 CURL *curl_handle = curl_easy_init();
330 DBG_PRINT("CURL: Create own handle %p", curl_handle);
331 if( nullptr == curl_handle ) {
332 ERR_PRINT("Error setting up url %s, null curl handle", url.c_str());
333 return 0;
334 }
335
336 curl_glue1_t cg = { curl_handle, false, 0, 0, 0, buffer, consumer_fn };
337
338 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
339 if( CURLE_OK != res ) {
340 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
341 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
342 goto errout;
343 }
344
345 /* set URL to get here */
346 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
347 if( CURLE_OK != res ) {
348 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
349 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
350 goto errout;
351 }
352
353 /* Switch on full protocol/debug output while testing */
354 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
355 jau::environment::getBooleanProperty("jau_io_net_verbose", false) ? 1L : 0L );
356 if( CURLE_OK != res ) {
357 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
358 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
359 goto errout;
360 }
361
362 if( !jau::environment::getBooleanProperty("jau_io_net_ssl_verifypeer", true) ) {
363 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
364 if( CURLE_OK != res ) {
365 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
366 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
367 goto errout;
368 }
369 }
370
371 /* disable progress meter, set to 0L to enable it */
372 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
373 if( CURLE_OK != res ) {
374 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
375 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
376 goto errout;
377 }
378
379 /* Suppress proxy CONNECT response headers from user callbacks */
380 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
381 if( CURLE_OK != res ) {
382 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
383 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
384 goto errout;
385 }
386
387 /* Don't pass headers to the data stream. */
388 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
389 if( CURLE_OK != res ) {
390 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
391 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
392 goto errout;
393 }
394
395 /* send header data to this function */
396 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, consume_header_curl1);
397 if( CURLE_OK != res ) {
398 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
399 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
400 goto errout;
401 }
402
403 /* set userdata for consume_header_curl2 */
404 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&cg);
405 if( CURLE_OK != res ) {
406 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
407 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
408 goto errout;
409 }
410
411 /* send all data to this function */
412 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, consume_data_curl1);
413 if( CURLE_OK != res ) {
414 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
415 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
416 goto errout;
417 }
418
419 /* write the page body to this file handle */
420 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&cg);
421 if( CURLE_OK != res ) {
422 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
423 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
424 goto errout;
425 }
426
427 /* performs the tast, blocking! */
428 res = curl_easy_perform(curl_handle);
429 if( CURLE_OK != res ) {
430 IRQ_PRINT("Error processing url %s, error %d '%s' '%s'",
431 url.c_str(), (int)res, curl_easy_strerror(res), errorbuffer.data());
432 goto errout;
433 }
434
435 /* cleanup curl stuff */
436 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
437 curl_easy_cleanup(curl_handle);
438 return cg.total_read;
439
440errout:
441 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
442 curl_easy_cleanup(curl_handle);
443#else // USE_LIBCURL
444 (void) url;
445 (void) buffer;
446 (void) consumer_fn;
447#endif // USE_LIBCURL
448 return 0;
449}
450
452 {
453 std::unique_lock<std::mutex> lockWrite(m_sync);
454 m_completed = true;
455 m_response_code = response_code;
456 }
457 m_cv.notify_all(); // have mutex unlocked before notify_all to avoid pessimistic re-block of notified wait() thread.
458}
459
461 std::unique_lock<std::mutex> lock(m_sync);
463 while( !m_completed ) {
465 m_cv.wait(lock);
466 } else {
467 std::cv_status s = wait_until(m_cv, lock, timeout_time );
468 if( std::cv_status::timeout == s && !m_completed ) {
469 return false;
470 }
471 }
472 }
473 return m_completed;
474}
475
476#ifdef USE_LIBCURL
477
478struct curl_glue2_sync_t {
479 curl_glue2_sync_t(void *_curl_handle,
480 http::PostRequestPtr _post_request,
481 ByteRingbuffer *_buffer,
482 SyncStreamResponseRef _response,
483 SyncStreamConsumerFunc _consumer_fn)
484 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
485 post_request(std::move(_post_request)),
486 buffer(_buffer),
487 response_code(0),
488 response(std::move(_response)),
489 consumer_fn(std::move(_consumer_fn))
490 {}
491
492 CURL *curl_handle;
493 http::PostRequestPtr post_request;
494 ByteRingbuffer *buffer;
495 int32_t response_code;
496 SyncStreamResponseRef response;
497 SyncStreamConsumerFunc consumer_fn;
498
499 void interrupt_all() noexcept {
500 if( buffer ) {
501 buffer->interruptReader();
502 }
503 response->header_resp.notify_complete(response_code);
504 }
505 void set_end_of_input() noexcept {
506 if( buffer ) {
507 buffer->set_end_of_input(true);
508 }
509 response->header_resp.notify_complete(response_code);
510 }
511};
512
513struct curl_glue2_async_t {
514 curl_glue2_async_t(void *_curl_handle,
515 http::PostRequestPtr _post_request,
516 ByteRingbuffer *_buffer,
517 AsyncStreamResponseRef _response,
518 AsyncStreamConsumerFunc _consumer_fn)
519 : curl_handle(reinterpret_cast<CURL*>(_curl_handle)),
520 post_request(std::move(_post_request)),
521 buffer(_buffer),
522 response_code(0),
523 response(std::move(_response)),
524 consumer_fn(std::move(_consumer_fn))
525 {}
526
527 CURL *curl_handle;
528 http::PostRequestPtr post_request;
529 ByteRingbuffer *buffer;
530 int32_t response_code;
531 AsyncStreamResponseRef response;
532 AsyncStreamConsumerFunc consumer_fn;
533
534 void interrupt_all() noexcept {
535 if( buffer ) {
536 buffer->interruptReader();
537 }
538 response->header_resp.notify_complete(response_code);
539 }
540 void set_end_of_input() noexcept {
541 if( buffer ) {
542 buffer->set_end_of_input(true);
543 }
544 response->header_resp.notify_complete(response_code);
545 }
546};
547
548static size_t consume_header_curl2_sync(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
549 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
550 SyncStreamResponse& response = *cg->response;
551
552 if( io_result_t::NONE != response.result ) {
553 // user abort!
554 DBG_PRINT("consume_header_curl2_sync ABORT by User: total %" PRIi64 ", result %d, rb %s",
555 response.total_read, response.result.load() );
556 cg->set_end_of_input();
557 return 0;
558 }
559
560 {
561 long v;
562 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
563 if( CURLE_OK == r ) {
564 cg->response_code = static_cast<int32_t>(v);
565 if( 400 <= v ) {
566 IRQ_PRINT("response_code: %ld", v);
567 response.result = io_result_t::FAILED;
568 cg->set_end_of_input();
569 return 0;
570 } else {
571 DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
572 }
573 }
574 }
575 if( !response.has_content_length ) {
576 curl_off_t v = 0;
577 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
578 if( CURLE_OK == r ) {
579 if( 0 <= v ) { // curl returns -1 if the size is not known
580 response.content_length = v;
581 response.has_content_length = true;
582 }
583 }
584 }
585 const size_t realsize = size * nmemb;
586
587 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
588 response.header_resp.notify_complete(cg->response_code);
589 DBG_PRINT("consume_header_curl2.0 header_completed");
590 }
591
592 if( false ) {
593 DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d",
594 realsize, response.total_read, response.has_content_length, response.content_length, response.result.load() );
595 std::string blob(buffer, realsize);
596 jau::PLAIN_PRINT(true, "%s", jau::bytesHexString((uint8_t*)buffer, 0, realsize, true /* lsbFirst */).c_str());
597 jau::PLAIN_PRINT(true, "%s", blob.c_str());
598 }
599
600 return realsize;
601}
602
603static size_t consume_header_curl2_async(char *buffer, size_t size, size_t nmemb, void *userdata) noexcept {
604 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
605 AsyncStreamResponse& response = *cg->response;
606
607 if( io_result_t::NONE != response.result ) {
608 // user abort!
609 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
610 DBG_PRINT("consume_header_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
611 response.total_read.load(), response.result.load(), s.c_str() );
612 cg->set_end_of_input();
613 return 0;
614 }
615
616 {
617 long v;
618 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_RESPONSE_CODE, &v);
619 if( CURLE_OK == r ) {
620 cg->response_code = static_cast<int32_t>(v);
621 if( 400 <= v ) {
622 IRQ_PRINT("response_code: %ld", v);
623 response.result = io_result_t::FAILED;
624 cg->set_end_of_input();
625 return 0;
626 } else {
627 DBG_PRINT("consume_header_curl2.0 response_code: %ld", v);
628 }
629 }
630 }
631 if( !response.has_content_length ) {
632 curl_off_t v = 0;
633 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
634 if( CURLE_OK == r ) {
635 if( 0 <= v ) { // curl returns -1 if the size is not known
636 response.content_length = v;
637 response.has_content_length = true;
638 }
639 }
640 }
641 const size_t realsize = size * nmemb;
642
643 if( 2 == realsize && 0x0d == buffer[0] && 0x0a == buffer[1] ) {
644 response.header_resp.notify_complete(cg->response_code);
645 DBG_PRINT("consume_header_curl2.0 header_completed");
646 }
647
648 if( false ) {
649 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
650 DBG_PRINT("consume_header_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), result %d, rb %s",
651 realsize, response.total_read.load(), response.has_content_length.load(), response.content_length.load(), response.result.load(), s.c_str() );
652 std::string blob(buffer, realsize);
653 jau::PLAIN_PRINT(true, "%s", jau::bytesHexString((uint8_t*)buffer, 0, realsize, true /* lsbFirst */).c_str());
654 jau::PLAIN_PRINT(true, "%s", blob.c_str());
655 }
656
657 return realsize;
658}
659
660static size_t consume_data_curl2_sync(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
661 curl_glue2_sync_t * cg = (curl_glue2_sync_t*)userdata;
662 SyncStreamResponse& response = *cg->response;
663
664 if( io_result_t::NONE != response.result ) {
665 // user abort!
666 // user abort!
667 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
668 DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
669 response.total_read, response.result.load(), s.c_str() );
670 cg->set_end_of_input();
671 return 0;
672 }
673
674 if( !response.has_content_length ) {
675 curl_off_t v = 0;
676 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
677 if( CURLE_OK == r ) {
678 if( 0 <= v ) { // curl returns -1 if the size if not known
679 response.content_length = v;
680 response.has_content_length = true;
681 }
682 }
683 }
684
685 // Ensure header completion is being sent
686 if( !response.header_resp.completed() ) {
687 response.header_resp.notify_complete();
688 }
689
690 const size_t realsize = size * nmemb;
691 if( jau::environment::get().debug ) {
692 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
693 DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, s.c_str() );
694 }
695 if( cg->buffer ) {
696 bool timeout_occured;
697 if( !cg->buffer->putBlocking(reinterpret_cast<uint8_t*>(ptr),
698 reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s, timeout_occured) ) {
699 DBG_PRINT("consume_data_curl2 Failed put: total %" PRIi64 ", result %d, timeout %d, rb %s",
700 response.total_read, response.result.load(), timeout_occured, cg->buffer->toString().c_str() );
701 if( timeout_occured ) {
702 cg->set_end_of_input();
703 }
704 return 0;
705 }
706 }
707
708 response.total_read += realsize;
709 const bool is_final = 0 == realsize ||
710 response.has_content_length ? response.total_read >= response.content_length : false;
711 if( is_final ) {
712 response.result = io_result_t::SUCCESS;
713 cg->set_end_of_input();
714 }
715 if( cg->consumer_fn ) {
716 try {
717 if( !cg->consumer_fn(*cg->response, reinterpret_cast<uint8_t*>(ptr), realsize, is_final) ) {
718 return 0; // end streaming
719 }
720 } catch (std::exception &e) {
721 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
722 return 0; // end streaming
723 }
724 }
725
726 if( jau::environment::get().debug ) {
727 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
728 DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
729 realsize, response.total_read, response.has_content_length, response.content_length, is_final, response.result.load(), s.c_str() );
730 }
731
732 return realsize;
733}
734
735static size_t consume_data_curl2_async(char *ptr, size_t size, size_t nmemb, void *userdata) noexcept {
736 curl_glue2_async_t * cg = (curl_glue2_async_t*)userdata;
737 AsyncStreamResponse& response = *cg->response;
738
739 if( io_result_t::NONE != response.result ) {
740 // user abort!
741 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
742 DBG_PRINT("consume_data_curl2 ABORT by User: total %" PRIi64 ", result %d, rb %s",
743 response.total_read.load(), response.result.load(), s.c_str() );
744 cg->set_end_of_input();
745 return 0;
746 }
747
748 if( !response.has_content_length ) {
749 curl_off_t v = 0;
750 const CURLcode r = curl_easy_getinfo(cg->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &v);
751 if( CURLE_OK == r ) {
752 if( 0 <= v ) { // curl returns -1 if the size if not known
753 response.content_length = v;
754 response.has_content_length = true;
755 }
756 }
757 }
758
759 // Ensure header completion is being sent
760 if( !response.header_resp.completed() ) {
761 response.header_resp.notify_complete();
762 }
763
764 const size_t realsize = size * nmemb;
765 if( jau::environment::get().debug ) {
766 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
767 DBG_PRINT("consume_data_curl2.0 realsize %zu, rb %s", realsize, s.c_str() );
768 }
769 if( cg->buffer ) {
770 bool timeout_occured;
771 if( !cg->buffer->putBlocking(reinterpret_cast<uint8_t*>(ptr),
772 reinterpret_cast<uint8_t*>(ptr)+realsize, 0_s, timeout_occured) ) {
773 DBG_PRINT("consume_data_curl2 Failed put: total %" PRIi64 ", result %d, timeout %d, rb %s",
774 response.total_read.load(), response.result.load(), timeout_occured, cg->buffer->toString().c_str() );
775 if( timeout_occured ) {
776 cg->set_end_of_input();
777 }
778 return 0;
779 }
780 }
781
782 response.total_read.fetch_add(realsize);
783 const bool is_final = 0 == realsize ||
784 response.has_content_length ? response.total_read >= response.content_length : false;
785 if( is_final ) {
786 response.result = io_result_t::SUCCESS;
787 cg->set_end_of_input();
788 }
789 if( cg->consumer_fn ) {
790 try {
791 if( !cg->consumer_fn(*cg->response, reinterpret_cast<uint8_t*>(ptr), realsize, is_final) ) {
792 return 0; // end streaming
793 }
794 } catch (std::exception &e) {
795 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
796 return 0; // end streaming
797 }
798 }
799
800 if( jau::environment::get().debug ) {
801 const std::string s = cg->buffer ? cg->buffer->toString() : "null";
802 DBG_PRINT("consume_data_curl2.X realsize %zu, total %" PRIu64 " / ( content_len has %d, size %" PRIu64 " ), is_final %d, result %d, rb %s",
803 realsize, response.total_read.load(), response.has_content_length.load(), response.content_length.load(), is_final, response.result.load(), s.c_str() );
804 }
805
806 return realsize;
807}
808
809static bool read_url_stream_impl(CURL *curl_handle, std::vector<char>& errorbuffer,
810 const char *url, http::PostRequest *post_request,
812 curl_write_callback header_cb, curl_write_callback write_cb, void* ctx_data) noexcept
813{
814 struct curl_slist *header_slist = nullptr;
815 CURLcode res;
816
817 res = curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, errorbuffer.data());
818 if( CURLE_OK != res ) {
819 ERR_PRINT("Error setting up url %s, error %d '%s'",
820 url, (int)res, curl_easy_strerror(res));
821 goto errout;
822 }
823
824 /* set URL to get here */
825 res = curl_easy_setopt(curl_handle, CURLOPT_URL, url);
826 if( CURLE_OK != res ) {
827 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
828 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
829 goto errout;
830 }
831
832 if( nullptr != post_request ) {
833 http::PostRequest &post = *post_request;
834 // slist1 = curl_slist_append(slist1, "Content-Type: application/json");
835 // slist1 = curl_slist_append(slist1, "Accept: application/json");
836 if( post.header.size() > 0 ) {
837 for (const std::pair<const std::string, std::string>& n : post.header) {
838 std::string v = n.first+": "+n.second;
839 header_slist = curl_slist_append(header_slist, v.c_str());
840 }
841 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, header_slist);
842 if( CURLE_OK != res ) {
843 ERR_PRINT("Error setting up POST header, error %d '%s' '%s'",
844 (int)res, curl_easy_strerror(res), errorbuffer.data());
845 goto errout;
846 }
847 }
848
849 res = curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, post.body.data());
850 if( CURLE_OK != res ) {
851 ERR_PRINT("Error setting up POST fields, error %d '%s' '%s'",
852 (int)res, curl_easy_strerror(res), errorbuffer.data());
853 goto errout;
854 }
855 }
856 /* Switch on full protocol/debug output while testing */
857 res = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE,
858 jau::environment::getBooleanProperty("jau_io_net_verbose", false) ? 1L : 0L );
859 if( CURLE_OK != res ) {
860 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
861 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
862 goto errout;
863 }
864
865 if( !jau::environment::getBooleanProperty("jau_io_net_ssl_verifypeer", true) ) {
866 res = curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
867 if( CURLE_OK != res ) {
868 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
869 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
870 goto errout;
871 }
872 }
873
874 /* disable progress meter, set to 0L to enable it */
875 res = curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
876 if( CURLE_OK != res ) {
877 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
878 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
879 goto errout;
880 }
881
882 /* Suppress proxy CONNECT response headers from user callbacks */
883 res = curl_easy_setopt(curl_handle, CURLOPT_SUPPRESS_CONNECT_HEADERS, 1L);
884 if( CURLE_OK != res ) {
885 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
886 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
887 goto errout;
888 }
889
890 /* Don't pass headers to the data stream. */
891 res = curl_easy_setopt(curl_handle, CURLOPT_HEADER, 0L);
892 if( CURLE_OK != res ) {
893 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
894 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
895 goto errout;
896 }
897
898 /* send header data to this function */
899 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_cb);
900 if( CURLE_OK != res ) {
901 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
902 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
903 goto errout;
904 }
905
906 /* set userdata for consume_header_curl2 */
907 res = curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, ctx_data);
908 if( CURLE_OK != res ) {
909 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
910 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
911 goto errout;
912 }
913
914 /* send received data to this function */
915 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, write_cb);
916 if( CURLE_OK != res ) {
917 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
918 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
919 goto errout;
920 }
921
922 /* set userdata for consume_data_curl2 */
923 res = curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, ctx_data);
924 if( CURLE_OK != res ) {
925 ERR_PRINT("Error setting up url %s, error %d '%s' '%s'",
926 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
927 goto errout;
928 }
929
930 /* performs the tast, blocking! */
931 res = curl_easy_perform(curl_handle);
932 if( CURLE_OK != res ) {
933 if( io_result_t::NONE == result ) {
934 // Error during normal processing
935 IRQ_PRINT("Error processing url %s, error %d '%s' '%s'",
936 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
937 } else {
938 // User aborted or response code error detected via consume_header_curl2
939 DBG_PRINT("Processing aborted url %s, error %d '%s' '%s'",
940 url, (int)res, curl_easy_strerror(res), errorbuffer.data());
941 }
942 goto errout;
943 }
944
945 if( nullptr != header_slist ) {
946 curl_slist_free_all(header_slist);
947 }
948 return true;
949
950errout:
951 if( nullptr != header_slist ) {
952 curl_slist_free_all(header_slist);
953 }
954 return false;
955}
956
957static void read_url_stream_sync(const char *url, curl_glue2_sync_t& cg) noexcept {
958 std::vector<char> errorbuffer;
959 errorbuffer.reserve(CURL_ERROR_SIZE);
960
961 bool owns_curl_handle = false;
962 CURL *curl_handle;
963 if( nullptr == cg.curl_handle ) {
964 /* init the curl session */
965 owns_curl_handle = true;
966 curl_handle = curl_easy_init();
967 if( nullptr == curl_handle ) {
968 ERR_PRINT("Error setting up url %s, null curl handle", url);
969 goto errout;
970 }
971 cg.curl_handle = curl_handle;
972 DBG_PRINT("CURL: Created own handle %p", curl_handle);
973 } else {
974 curl_handle = cg.curl_handle;
975 DBG_PRINT("CURL: Reusing own handle %p", curl_handle);
976 }
977
978 if( !read_url_stream_impl(curl_handle, errorbuffer,
979 url, cg.post_request.get(), cg.response->result,
980 consume_header_curl2_sync, consume_data_curl2_sync, (void*)&cg) ) {
981 goto errout;
982 }
983
984 // Ensure header completion is being sent
985 if( !cg.response->header_resp.completed() ) {
986 cg.response->header_resp.notify_complete();
987 }
988 if( cg.response->result != io_result_t::SUCCESS ) {
989 cg.response->result = io_result_t::SUCCESS;
990 if( cg.consumer_fn ) {
991 try {
992 cg.consumer_fn(*cg.response, nullptr, 0, true);
993 } catch (std::exception &e) {
994 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
995 goto errout;
996 }
997 }
998 }
999 goto cleanup;
1000
1001errout:
1002 cg.response->result = io_result_t::FAILED;
1003 cg.set_end_of_input();
1004
1005cleanup:
1006 if( owns_curl_handle && nullptr != curl_handle ) {
1007 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
1008 curl_easy_cleanup(curl_handle);
1009 cg.curl_handle = nullptr;
1010 }
1011}
1012
1013static void read_url_stream_async(const char *url, std::unique_ptr<curl_glue2_async_t> && cg) noexcept {
1014 std::vector<char> errorbuffer;
1015 errorbuffer.reserve(CURL_ERROR_SIZE);
1016
1017 bool owns_curl_handle = false;
1018 CURL *curl_handle;
1019 if( nullptr == cg->curl_handle ) {
1020 /* init the curl session */
1021 owns_curl_handle = true;
1022 curl_handle = curl_easy_init();
1023 if( nullptr == curl_handle ) {
1024 ERR_PRINT("Error setting up url %s, null curl handle", url);
1025 goto errout;
1026 }
1027 cg->curl_handle = curl_handle;
1028 DBG_PRINT("CURL: Created own handle %p", curl_handle);
1029 } else {
1030 curl_handle = cg->curl_handle;
1031 DBG_PRINT("CURL: Reusing own handle %p", curl_handle);
1032 }
1033
1034 if( !read_url_stream_impl(curl_handle, errorbuffer,
1035 url, cg->post_request.get(), cg->response->result,
1036 consume_header_curl2_async, consume_data_curl2_async, (void*)cg.get()) ) {
1037 goto errout;
1038 }
1039
1040 // Ensure header completion is being sent
1041 if( !cg->response->header_resp.completed() ) {
1042 cg->response->header_resp.notify_complete();
1043 }
1044 if( cg->response->result != io_result_t::SUCCESS ) {
1045 cg->response->result = io_result_t::SUCCESS;
1046 if( cg->consumer_fn ) {
1047 try {
1048 cg->consumer_fn(*cg->response, nullptr, 0, true);
1049 } catch (std::exception &e) {
1050 ERR_PRINT("jau::io::read_url_stream: Caught exception: %s", e.what());
1051 goto errout;
1052 }
1053 }
1054 }
1055 goto cleanup;
1056
1057errout:
1058 cg->response->result = io_result_t::FAILED;
1059 cg->set_end_of_input();
1060
1061cleanup:
1062 if( owns_curl_handle && nullptr != curl_handle ) {
1063 DBG_PRINT("CURL: Freeing own handle %p", curl_handle);
1064 curl_easy_cleanup(curl_handle);
1065 cg->curl_handle = nullptr;
1066 }
1067}
1068
1069#endif // USE_LIBCURL
1070
1071
1073#ifdef USE_LIBCURL
1074 CURL* h = ::curl_easy_init();
1075 DBG_PRINT("CURL: Created user handle %p", h);
1076 return static_cast<jau::io::net_tk_handle>( h );
1077#else
1078 return static_cast<jau::io::net_tk_handle>( nullptr );
1079#endif
1080}
1082#ifdef USE_LIBCURL
1083 CURL* h = static_cast<CURL*>(handle);
1084 if( nullptr != h ) {
1085 DBG_PRINT("CURL: Freeing user handle %p", h);
1086 curl_easy_cleanup(h);
1087 }
1088#else
1089 (void)handle;
1090#endif
1091}
1092
1094 http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer,
1095 const SyncStreamConsumerFunc& consumer_fn) noexcept {
1096 /* init user referenced values */
1097 SyncStreamResponseRef res = std::make_shared<SyncStreamResponse>(handle);
1098
1099#ifdef USE_LIBCURL
1100 if( !uri_tk::protocol_supported(url) ) {
1101#endif // USE_LIBCURL
1102 (void)httpPostReq;
1103 (void)consumer_fn;
1106 // buffer.set_end_of_input(true);
1107 const std::string_view scheme = uri_tk::get_scheme(url);
1108 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1109 std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
1110 return res;
1111#ifdef USE_LIBCURL
1112 }
1113 curl_glue2_sync_t cg (handle, std::move(httpPostReq), buffer, res, consumer_fn );
1114 read_url_stream_sync(url.c_str(), cg);
1115 return res;
1116#endif // USE_LIBCURL
1117}
1118
1120 http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer,
1121 const AsyncStreamConsumerFunc& consumer_fn) noexcept {
1122 /* init user referenced values */
1123 AsyncStreamResponseRef res = std::make_shared<AsyncStreamResponse>(handle);
1124
1125#ifdef USE_LIBCURL
1126 if( !uri_tk::protocol_supported(url) ) {
1127#endif // USE_LIBCURL
1128 (void)httpPostReq;
1129 (void)buffer;
1130 (void)consumer_fn;
1133 // buffer.set_end_of_input(true);
1134 const std::string_view scheme = uri_tk::get_scheme(url);
1135 DBG_PRINT("Protocol of given uri-scheme '%s' not supported. Supported protocols [%s].",
1136 std::string(scheme).c_str(), jau::to_string(uri_tk::supported_protocols(), ",").c_str());
1137 return res;
1138#ifdef USE_LIBCURL
1139 }
1140
1141 std::unique_ptr<curl_glue2_async_t> cg ( std::make_unique<curl_glue2_async_t>(handle, std::move(httpPostReq),
1142 buffer, res, consumer_fn ) );
1143
1144 res->thread = std::thread(&::read_url_stream_async, url.c_str(), std::move(cg)); // @suppress("Invalid arguments")
1145 return res;
1146#endif // USE_LIBCURL
1147}
1148
1149void jau::io::print_stats(const std::string& prefix, const uint64_t& out_bytes_total, const jau::fraction_i64& td) noexcept {
1150 jau::PLAIN_PRINT(true, "%s: Duration %s s, %s ms", prefix.c_str(),
1151 td.to_string().c_str(), jau::to_decstring(td.to_ms()).c_str());
1152
1153 if( out_bytes_total >= 100'000'000 ) {
1154 jau::PLAIN_PRINT(true, "%s: Size %s MB", prefix.c_str(),
1155 jau::to_decstring(std::llround((double)out_bytes_total/1'000'000.0)).c_str());
1156 } else if( out_bytes_total >= 100'000 ) {
1157 jau::PLAIN_PRINT(true, "%s: Size %s KB", prefix.c_str(),
1158 jau::to_decstring(std::llround((double)out_bytes_total/1'000.0)).c_str());
1159 } else {
1160 jau::PLAIN_PRINT(true, "%s: Size %s B", prefix.c_str(),
1161 jau::to_decstring(out_bytes_total).c_str());
1162 }
1163
1164 const uint64_t _rate_bps = std::llround( (double)out_bytes_total / td.to_double() ); // bytes per second
1165 const uint64_t _rate_bitps = std::llround( ( (double)out_bytes_total * 8.0 ) / td.to_double() ); // bits per second
1166
1167 if( _rate_bitps >= 100'000'000 ) {
1168 jau::PLAIN_PRINT(true, "%s: Bitrate %s Mbit/s, %s MB/s", prefix.c_str(),
1169 jau::to_decstring(std::llround((double)_rate_bitps/1'000'000.0)).c_str(),
1170 jau::to_decstring(std::llround((double)_rate_bps/1'000'000.0)).c_str());
1171 } else if( _rate_bitps >= 100'000 ) {
1172 jau::PLAIN_PRINT(true, "%s: Bitrate %s kbit/s, %s kB/s", prefix.c_str(),
1173 jau::to_decstring(std::llround((double)_rate_bitps/1'000.0)).c_str(),
1174 jau::to_decstring(std::llround((double)_rate_bps/1'000.0)).c_str());
1175 } else {
1176 jau::PLAIN_PRINT(true, "%s: Bitrate %s bit/s, %s B/s", prefix.c_str(),
1177 jau::to_decstring(_rate_bitps).c_str(),
1178 jau::to_decstring(_rate_bps).c_str());
1179 }
1180}
1181
static bool getBooleanProperty(const std::string &name, const bool default_value) noexcept
Returns the boolean value of the environment's variable 'name', or the 'default_value' if the environ...
static environment & get(const std::string &root_prefix_domain="jau") noexcept
Static singleton initialization of this project's environment with the given global root prefix_domai...
File based byte input stream, including named file descriptor.
Abstract byte input stream object.
bool wait_until_completion(const jau::fraction_i64 &timeout) noexcept
Wait until completed() has been reached.
Definition io_util.cpp:460
void notify_complete(const int32_t response_code=200) noexcept
Notify completion, see completed()
Definition io_util.cpp:451
bool completed() const noexcept
Returns whether URL header is completed.
Definition io_util.hpp:214
int32_t response_code() const noexcept
Definition io_util.hpp:216
void interruptReader() noexcept
Interrupt a potentially blocked reader once.
void set_end_of_input(const bool v=true) noexcept
Set End of Input from writer thread, unblocking all read-operations and a potentially currently block...
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:112
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:52
#define IRQ_PRINT(...)
Use for unconditional interruption messages, prefix '[elapsed_time] Interrupted @ FILE:LINE FUNC: '.
Definition debug.hpp:118
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
fraction_timespec getMonotonicTime() noexcept
Returns current monotonic time since Unix Epoch 00:00:00 UTC on 1970-01-01.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
@ std
Denotes a func::std_target_t.
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1119
jau::ordered_atomic< io_result_t, std::memory_order_relaxed > relaxed_atomic_io_result_t
Definition io_util.hpp:78
SyncStreamResponseRef read_url_stream_sync(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const SyncStreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given SyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1093
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:206
net_tk_handle create_net_tk_handle() noexcept
creates a reusable handle, free with free_net_tk_handle() after use.
Definition io_util.cpp:1072
uint64_t read_stream(ByteInStream &in, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader using the given StreamConsumerFunc consumer_fn.
Definition io_util.cpp:61
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
Definition io_util.cpp:313
void * net_tk_handle
Definition io_util.hpp:239
jau::function< bool(secure_vector< uint8_t > &, bool)> StreamConsumerFunc
Stream consumer function.
Definition io_util.hpp:98
bool is_httpx_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the http or https protocol, i....
Definition io_util.cpp:210
std::string_view get_scheme(const std::string_view &uri) noexcept
Returns the valid uri-scheme from given uri, which is empty if no valid scheme is included.
Definition io_util.cpp:184
std::vector< std::string_view > supported_protocols() noexcept
Returns a list of supported protocol supported by libcurl network protocols, queried at runtime.
Definition io_util.cpp:161
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:372
std::shared_ptr< SyncStreamResponse > SyncStreamResponseRef
Definition io_util.hpp:283
std::shared_ptr< AsyncStreamResponse > AsyncStreamResponseRef
Definition io_util.hpp:364
jau::function< bool(SyncStreamResponse &, const uint8_t *, size_t, bool)> SyncStreamConsumerFunc
Synchronous stream consumer function.
Definition io_util.hpp:291
jau::ringbuffer< uint8_t, size_t > ByteRingbuffer
Definition io_util.hpp:50
void free_net_tk_handle(net_tk_handle handle) noexcept
frees a handle after use created by create_net_tk_handle()
Definition io_util.cpp:1081
std::vector< T, jau::callocator_sec< T > > secure_vector
Definition io_util.hpp:46
void print_stats(const std::string &prefix, const uint64_t &out_bytes_total, const jau::fraction_i64 &td) noexcept
Definition io_util.cpp:1149
uint64_t read_file(const std::string &input_file, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous byte input stream reader from given file path using the given StreamConsumerFunc consumer...
Definition io_util.cpp:48
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition io_util.cpp:196
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
Definition io_util.hpp:73
@ FAILED
Operation failed.
Definition io_util.hpp:70
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:76
std::string bytesHexString(const void *data, const nsize_t length, const bool lsbFirst, const bool lowerCase=true, const bool skipLeading0x=false) noexcept
Produce a hexadecimal string representation of the given byte values.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
static bool _is_scheme_valid(const std::string_view &scheme) noexcept
Definition io_util.cpp:175
static uint64_t _read_buffer(ByteInStream &in, secure_vector< uint8_t > &buffer) noexcept
Definition io_util.cpp:91
constexpr const jau::fraction_i64 zero(0l, 1lu)
zero is 0/1
std::unique_ptr< PostRequest > PostRequestPtr
Definition io_util.hpp:236
void PLAIN_PRINT(const bool printPrefix, const char *format,...) noexcept
Use for unconditional plain messages, prefix '[elapsed_time] ' if printPrefix == true.
Definition debug.cpp:258
std::cv_status wait_until(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, const fraction_timespec &absolute_time, const bool monotonic=true) noexcept
wait_until causes the current thread to block until the condition variable is notified,...
STL namespace.
Timespec structure using int64_t for its components in analogy to struct timespec_t on 64-bit platfor...
Asynchronous stream response.
Definition io_util.hpp:326
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
Definition io_util.hpp:358
std::thread thread
background reading thread unique-pointer
Definition io_util.hpp:348
relaxed_atomic_bool has_content_length
indicating whether content_length is known from server
Definition io_util.hpp:352
relaxed_atomic_uint64 total_read
tracking the total_read
Definition io_util.hpp:356
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:354
url_header_resp header_resp
synchronized URL header response completion
Definition io_util.hpp:350
Synchronous stream response.
Definition io_util.hpp:247
uint64_t total_read
tracking the total_read
Definition io_util.hpp:275
uint64_t content_length
content_length tracking the content_length
Definition io_util.hpp:273
url_header_resp header_resp
synchronized URL header response completion
Definition io_util.hpp:269
relaxed_atomic_io_result_t result
tracking io_result_t. If set to other than io_result_t::NONE while streaming, streaming is aborted....
Definition io_util.hpp:277
bool has_content_length
indicating whether content_length is known from server
Definition io_util.hpp:271
std::unordered_map< std::string, std::string > header
Definition io_util.hpp:233
CXX_ALWAYS_INLINE _Tp fetch_add(_Tp __i) noexcept
CXX_ALWAYS_INLINE _Tp load() const noexcept