jaulib v1.3.8
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteInStream, ByteInStream_SecMemory and ByteInStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29// #include <botan_all.h>
30
31#include <jau/cpuid.hpp>
32#include <jau/debug.hpp>
33#include <jau/byte_stream.hpp>
34
35extern "C" {
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <fcntl.h>
40}
41
42#ifdef USE_LIBCURL
43 #include <curl/curl.h>
44#endif
45
46#include <thread>
47#include <pthread.h>
48
49#ifndef O_BINARY
50#define O_BINARY 0
51#endif
52#ifndef O_NONBLOCK
53#define O_NONBLOCK 0
54#endif
55
56using namespace jau::io;
57using namespace jau::fractions_i64_literals;
58using namespace jau::int_literals;
59
60#if defined(__FreeBSD__)
61 typedef off_t off64_t;
62 #define __posix_openat64 ::openat
63 #define __posix_lseek64 ::lseek
64#else
65 #define __posix_openat64 ::openat64
66 #define __posix_lseek64 ::lseek64
67#endif
68
69#ifdef USE_LIBCURL
70 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
71#else
72 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
73#endif
74
75inline constexpr void copy_mem(void* out, const void* in, size_t n) noexcept {
76 if(in != nullptr && out != nullptr && n > 0) {
77 std::memmove(out, in, sizeof(uint8_t)*n);
78 }
79}
80
81bool ByteInStream::read(uint8_t& out) noexcept {
82 return 1 == read(&out, 1);
83}
84
85bool ByteInStream::peek(uint8_t& out) noexcept {
86 return 1 == peek(&out, 1, 0);
87}
88
89size_t ByteInStream::discard(size_t n) noexcept {
90 uint8_t buf[1024] = { 0 };
91 size_t discarded = 0;
92
93 while(n)
94 {
95 const size_t got = read(buf, std::min(n, sizeof(buf)));
96 if( 0 == got ) {
97 break;
98 }
99 discarded += got;
100 n -= got;
101 }
102
103 return discarded;
104}
105
106size_t ByteInStream_SecMemory::read(void* out, size_t length) noexcept {
107 if( 0 == length || !good() ) {
108 return 0;
109 }
110 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
111 copy_mem(out, m_source.data() + m_offset, got);
112 m_offset += got;
113 if( m_source.size() == m_offset ) {
115 }
116 return got;
117}
118
119bool ByteInStream_SecMemory::available(size_t n) noexcept {
120 return m_source.size() - m_offset >= n;
121}
122
123size_t ByteInStream_SecMemory::peek(void* out, size_t length, size_t peek_offset) noexcept {
124 const size_t bytes_left = m_source.size() - m_offset;
125 if(peek_offset >= bytes_left) {
126 return 0;
127 }
128 const size_t got = std::min(bytes_left - peek_offset, length);
129 copy_mem(out, &m_source[m_offset + peek_offset], got);
130 return got;
131}
132
134: m_source(cast_char_ptr_to_uint8(in.data()),
135 cast_char_ptr_to_uint8(in.data()) + in.length()),
136 m_offset(0)
137{ }
138
140 m_source.clear();
141 m_offset = 0;
143}
144
145std::string ByteInStream_SecMemory::to_string() const noexcept {
146 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
147 ", consumed "+jau::to_decstring(m_offset)+
148 ", available "+jau::to_decstring(m_source.size()-m_offset)+
149 ", iostate["+jau::io::to_string(rdstate())+
150 "]]";
151}
152
153size_t ByteInStream_File::read(void* out, size_t length) noexcept {
154 if( 0 == length || !good() ) {
155 return 0;
156 }
157 uint8_t* out_u8 = static_cast<uint8_t*>(out);
158 size_t total = 0;
159 while( total < length ) {
160 ssize_t len;
161 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
162 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
163 // cont temp unavail or interruption
164 // unlikely for regular files and we open w/o O_NONBLOCK
165 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
166 // - EINTR (signal)
167 continue;
168 }
169 // Check errno == ETIMEDOUT ??
171 DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
172 return 0;
173 }
174 total += static_cast<size_t>(len);
175 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
176 setstate_impl( iostate::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
177 break;
178 }
179 }
180 m_bytes_consumed += total;
181 return total;
182}
183
184size_t ByteInStream_File::peek(void* out, size_t length, size_t offset) noexcept {
185 if( 0 == length || !good() ||
186 ( sizeof(size_t) >= sizeof(off64_t) && offset > std::numeric_limits<off64_t>::max() ) ||
187 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 /* min number of bytes to read */ ) ) {
188 return 0;
189 }
190 size_t got = 0;
191
192 off64_t abs_pos = 0;
193 if( 0 < offset ) {
194 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(offset), SEEK_CUR);
195 if( 0 > abs_pos ) {
197 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
198 offset, to_string().c_str(), errno, strerror(errno));
199 return 0;
200 }
201 }
202 if( abs_pos == static_cast<off64_t>(offset) ) {
203 ssize_t len = 0;
204 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
205 if ( errno == EAGAIN || errno == EINTR ) {
206 // cont temp unavail or interruption
207 continue;
208 }
209 // Check errno == ETIMEDOUT ??
211 DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
212 return 0;
213 }
214 got = len; // potentially zero bytes, i.e. eof
215 }
216 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_bytes_consumed), SEEK_SET) < 0 ) {
217 // even though we were able to fetch the desired data above, let's fail if position reset fails
219 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
220 offset, to_string().c_str(), errno, strerror(errno));
221 return 0;
222 }
223 return got;
224}
225
226bool ByteInStream_File::available(size_t n) noexcept {
227 return is_open() && good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
228};
229
231: ByteInStream(),
232 stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
233{
234 if( !stats.exists() || !stats.has_access() ) {
235 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
236 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
237 } else {
238 m_has_content_length = stats.has( jau::fs::file_stats::field_t::size );
239 m_content_size = m_has_content_length ? stats.size() : 0;
240 m_fd = ::dup(fd);
241 if ( 0 > m_fd ) {
242 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
243 DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
244 }
245 }
246}
247
248ByteInStream_File::ByteInStream_File(const int dirfd, const std::string& path) noexcept
249: ByteInStream(),
250 stats(), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
251{
253 // cut of leading `file://`
254 std::string path2 = path.substr(7);
255 stats = jau::fs::file_stats(dirfd, path2);
256 } else {
257 stats = jau::fs::file_stats(dirfd, path);
258 }
259 if( !stats.exists() || !stats.has_access() ) {
260 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
261 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
262 } else {
263 if( stats.has( jau::fs::file_stats::field_t::size ) ) {
264 m_has_content_length = true;
265 m_content_size = stats.size();
266 } else {
267 m_has_content_length = false;
268 m_content_size = 0;
269 }
270 // O_NONBLOCK, is useless on files and counter to this class logic
271 const int src_flags = O_RDONLY|O_BINARY|O_NOCTTY;
272 if( stats.has_fd() ) {
273 m_fd = ::dup( stats.fd() );
274 } else {
275 m_fd = __posix_openat64(dirfd, stats.path().c_str(), src_flags);
276 }
277 if ( 0 > m_fd ) {
278 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
279 DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
280 }
281 }
282}
283
284ByteInStream_File::ByteInStream_File(const std::string& path) noexcept
285: ByteInStream_File(AT_FDCWD, path) {}
286
288 if( 0 <= m_fd ) {
289 ::close(m_fd);
290 m_fd = -1;
292 }
293}
294
295std::string ByteInStream_File::to_string() const noexcept {
296 return "ByteInStream_File[content_length "+( has_content_size() ? jau::to_decstring(m_content_size) : "n/a" )+
297 ", consumed "+jau::to_decstring(m_bytes_consumed)+
298 ", available "+jau::to_decstring(get_available())+
299 ", fd "+std::to_string(m_fd)+
300 ", iostate["+jau::io::to_string(rdstate())+
301 "], "+stats.to_string()+
302 "]";
303}
304
305
307: m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
308 m_stream_resp( read_url_stream_async(nullptr, m_url, /*httpPostReq=*/nullptr, &m_buffer, AsyncStreamConsumerFunc()) ),
309 m_bytes_consumed(0)
310
311{ }
312
313void ByteInStream_URL::close() noexcept {
314 DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id().c_str(), to_string_int().c_str());
315
316 if( m_stream_resp->processing() ) {
317 m_stream_resp->result = io_result_t::SUCCESS; // signal end of streaming
318 }
319
320 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
321 if( m_stream_resp->thread.joinable() ) {
322 DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id().c_str(), m_buffer.toString().c_str());
323 m_stream_resp->thread.join();
324 }
325 std::thread none;
326 m_stream_resp->thread.swap(none);
327 DBG_PRINT("ByteInStream_URL: close.X %s, %s", id().c_str(), to_string_int().c_str());
328}
329
330bool ByteInStream_URL::available(size_t n) noexcept {
331 if( !good() || !m_stream_resp->processing() ) {
332 // url thread ended, only remaining bytes in buffer available left
333 return m_buffer.size() >= n;
334 }
335 m_stream_resp->header_resp.wait_until_completion(m_timeout);
336 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_bytes_consumed < n ) {
337 return false;
338 }
339 // I/O still in progress, we have to poll until data is available or timeout
340 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
341 bool timeout_occured;
342 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
343 if( avail < n ) {
344 if( timeout_occured ) {
346 if( m_stream_resp->processing() ) {
347 m_stream_resp->result = io_result_t::FAILED;
348 }
349 m_buffer.interruptWriter();
350 }
351 return false;
352 } else {
353 return true;
354 }
355}
356
357bool ByteInStream_URL::is_open() const noexcept {
358 // url thread has not ended or remaining bytes in buffer available left
359 return m_stream_resp->processing() || m_buffer.size() > 0;
360}
361
362size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
363 m_stream_resp->header_resp.wait_until_completion(m_timeout);
364 if( 0 == length || !good() ) {
365 return 0;
366 }
367 bool timeout_occured = false;
368 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
369 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
370 m_bytes_consumed += got;
371 if( timeout_occured ) {
373 if( m_stream_resp->processing() ) {
374 m_stream_resp->result = io_result_t::FAILED;
375 }
376 m_buffer.interruptWriter();
377 }
378 // DBG_PRINT("ByteInStream_URL::read: size %zu/%zu bytes, %s", got, length, to_string_int().c_str() );
379 return got;
380}
381
382size_t ByteInStream_URL::peek(void* out, size_t length, size_t peek_offset) noexcept {
383 (void)out;
384 (void)length;
385 (void)peek_offset;
386 ERR_PRINT("ByteInStream_URL::peek not implemented");
387 return 0;
388}
389
391 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
392 ( m_stream_resp->has_content_length && m_bytes_consumed >= m_stream_resp->content_length ) )
393 {
395 }
396 if( m_stream_resp->failed() ) {
398 }
399 return rdstate_impl();
400}
401
402std::string ByteInStream_URL::to_string_int() const noexcept {
403 return m_url+", Url[content_length "+( has_content_size() ? jau::to_decstring(m_stream_resp->content_length.load()) : "n/a" )+
404 ", xfered "+jau::to_decstring(m_stream_resp->total_read.load())+
405 ", result "+std::to_string((int8_t)m_stream_resp->result.load())+
406 "], consumed "+jau::to_decstring(m_bytes_consumed)+
407 ", available "+jau::to_decstring(get_available())+
408 ", iostate["+jau::io::to_string(rdstate())+
409 "], "+m_buffer.toString();
410}
411std::string ByteInStream_URL::to_string() const noexcept {
412 return "ByteInStream_URL["+to_string_int()+"]";
413}
414
415std::unique_ptr<ByteInStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
416 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
418 {
419 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
420 if( nullptr != res && !res->fail() ) {
421 return res;
422 }
423 }
424 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
425 if( nullptr != res && !res->fail() ) {
426 return res;
427 }
428 return nullptr;
429}
430
432: m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
433 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::io_result_t::NONE ),
434 m_bytes_consumed(0)
435{ }
436
438 DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id().c_str(), to_string_int().c_str());
439
440 if( io_result_t::NONE == m_result ) {
441 m_result = io_result_t::SUCCESS; // signal end of streaming
442 }
443 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
444 DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id().c_str(), to_string_int().c_str());
445}
446
447bool ByteInStream_Feed::available(size_t n) noexcept {
448 if( !good() || io_result_t::NONE != m_result ) {
449 // feeder completed, only remaining bytes in buffer available left
450 return m_buffer.size() >= n;
451 }
452 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
453 return false;
454 }
455 // I/O still in progress, we have to poll until data is available or timeout
456 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
457 bool timeout_occured;
458 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
459 if( avail < n ) {
460 if( timeout_occured ) {
462 if( io_result_t::NONE == m_result ) {
463 m_result = io_result_t::FAILED;
464 }
465 m_buffer.interruptWriter();
466 }
467 return false;
468 } else {
469 return true;
470 }
471}
472
473bool ByteInStream_Feed::is_open() const noexcept {
474 // feeder has not ended or remaining bytes in buffer available left
475 return io_result_t::NONE == m_result || m_buffer.size() > 0;
476}
477
478size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
479 if( 0 == length || !good() ) {
480 return 0;
481 }
482 bool timeout_occured = false;
483 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
484 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
485 m_bytes_consumed += got;
486 if( timeout_occured ) {
488 if( io_result_t::NONE == m_result ) {
489 m_result = io_result_t::FAILED;
490 }
491 m_buffer.interruptWriter();
492 }
493 // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, timeout_occured %d, %s", got, length, timeout_occured, to_string_int().c_str() );
494 return got;
495}
496
497size_t ByteInStream_Feed::peek(void* out, size_t length, size_t peek_offset) noexcept {
498 (void)out;
499 (void)length;
500 (void)peek_offset;
501 ERR_PRINT("ByteInStream_Feed::peek not implemented");
502 return 0;
503}
504
506 if ( ( io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
507 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
508 {
510 }
511 if( io_result_t::FAILED == m_result ) {
513 }
514 return rdstate_impl();
515}
516
517bool ByteInStream_Feed::write(uint8_t in[], size_t length, const jau::fraction_i64& timeout) noexcept {
518 if( 0 < length && ( good() && io_result_t::NONE == m_result ) ) { // feeder still running
519 bool timeout_occured;
520 if( m_buffer.putBlocking(in, in+length, timeout, timeout_occured) ) {
521 m_total_xfered.fetch_add(length);
522 return true;
523 } else {
524 if( timeout_occured ) {
526 m_buffer.interruptWriter();
527 } else {
529 }
530 if( io_result_t::NONE == m_result ) {
531 m_result = io_result_t::FAILED;
532 }
533 return false;
534 }
535 } else {
536 return false;
537 }
538}
539
540void ByteInStream_Feed::set_eof(const io_result_t result) noexcept {
541 m_result = result;
542 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
543}
544
545std::string ByteInStream_Feed::to_string_int() const noexcept {
546 return m_id+", ext[content_length "+( has_content_size() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
547 ", xfered "+jau::to_decstring(m_total_xfered.load())+
548 ", result "+std::to_string((int8_t)m_result.load())+
549 "], consumed "+std::to_string(m_bytes_consumed)+
550 ", available "+std::to_string(get_available())+
551 ", iostate["+jau::io::to_string(rdstate())+
552 "], "+m_buffer.toString();
553}
554
555std::string ByteInStream_Feed::to_string() const noexcept {
556 return "ByteInStream_Feed["+to_string_int()+"]";
557}
558
561 m_parent.close();
562 DBG_PRINT("ByteInStream_Recorder: close.X %s", id().c_str());
563}
564
566 m_buffer.resize(0);
567 m_rec_offset = m_bytes_consumed;
568 m_is_recording = true;
569}
570
572 m_is_recording = false;
573}
574
576 m_is_recording = false;
577 m_buffer.clear();
578 m_rec_offset = 0;
579}
580
581size_t ByteInStream_Recorder::read(void* out, size_t length) noexcept {
582 const size_t consumed_bytes = m_parent.read(out, length);
583 m_bytes_consumed += consumed_bytes;
584 if( is_recording() ) {
585 uint8_t* out_u8 = static_cast<uint8_t*>(out);
586 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
587 }
588 return consumed_bytes;
589}
590
591std::string ByteInStream_Recorder::to_string() const noexcept {
592 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
593 " offset "+jau::to_decstring(m_rec_offset)+
594 "], consumed "+jau::to_decstring(m_bytes_consumed)+
595 ", iostate["+jau::io::to_string(rdstate())+"]]";
596}
597
598bool ByteOutStream::write(const uint8_t& in) noexcept {
599 return 1 == write(&in, 1);
600}
601
602size_t ByteOutStream_File::write(const void* out, size_t length) noexcept {
603 if( 0 == length || fail() ) {
604 return 0;
605 }
606 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
607 size_t total = 0;
608 while( total < length ) {
609 ssize_t len;
610 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
611 if ( errno == EAGAIN || errno == EINTR ) {
612 // cont temp unavail or interruption
613 // unlikely for regular files and we open w/o O_NONBLOCK
614 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
615 // - EINTR (signal)
616 continue;
617 }
618 // Check errno == ETIMEDOUT ??
620 DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
621 return 0;
622 }
623 total += static_cast<size_t>(len);
624 if( 0 == len ) {
626 break;
627 }
628 }
629 m_bytes_consumed += total;
630 return total;
631}
632
634: stats(fd), m_fd(-1),
635 m_bytes_consumed(0)
636{
637 if( !stats.exists() || !stats.has_access() ) {
638 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
639 DBG_PRINT("ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
640 } else {
641 m_fd = ::dup(fd);
642 if ( 0 > m_fd ) {
643 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
644 DBG_PRINT("ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
645 }
646 // open file-descriptor is appending anyways
647 }
648}
649
650ByteOutStream_File::ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode) noexcept
651: stats(), m_fd(-1),
652 m_bytes_consumed(0)
653{
655 // cut of leading `file://`
656 std::string path2 = path.substr(7);
657 stats = jau::fs::file_stats(dirfd, path2);
658 } else {
659 stats = jau::fs::file_stats(dirfd, path);
660 }
661 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
662 setstate_impl( iostate::failbit ); // Note: conforming with std::ofstream open (?)
663 DBG_PRINT("ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(), to_string().c_str());
664 } else {
665 // O_NONBLOCK, is useless on files and counter to this class logic
666 if( stats.has_fd() ) {
667 m_fd = ::dup( stats.fd() );
668 } else {
669 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|O_BINARY|O_NOCTTY;
670 m_fd = __posix_openat64(dirfd, stats.path().c_str(), dst_flags, jau::fs::posix_protection_bits( mode ));
671 }
672 if ( 0 > m_fd ) {
673 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
674 DBG_PRINT("ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
675 }
676 if( stats.is_file() ) {
677 off64_t abs_pos = __posix_lseek64(m_fd, 0, SEEK_END);
678 if( 0 > abs_pos ) {
680 ERR_PRINT("Failed to position existing file to end %s, errno %d %s",
681 to_string().c_str(), errno, strerror(errno));
682 }
683 }
684 }
685}
686
687ByteOutStream_File::ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode) noexcept
688: ByteOutStream_File(AT_FDCWD, path, mode) {}
689
691 if( 0 <= m_fd ) {
692 ::close(m_fd);
693 m_fd = -1;
695 }
696}
697
698std::string ByteOutStream_File::to_string() const noexcept {
699 return "ByteOutStream_File[consumed "+jau::to_decstring(m_bytes_consumed)+
700 ", fd "+std::to_string(m_fd)+
701 ", iostate["+jau::io::to_string(rdstate())+
702 "], "+stats.to_string()+
703 "]";
704}
705
#define O_BINARY
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
#define __posix_lseek64
#define __posix_openat64
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
void set_eof(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
ByteInStream_SecMemory(const std::string &in)
Construct a secure memory source that reads from a string.
std::string to_string() const noexcept override
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
size_t read(void *, size_t) noexcept override
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
virtual size_t peek(void *out, size_t length, size_t peek_offset) noexcept=0
Read from the source but do not modify the internal offset.
size_t discard(size_t N) noexcept
Discard the next N bytes of the data.
virtual size_t read(void *out, size_t length) noexcept=0
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
bool fail() const noexcept
Checks if an error has occurred.
constexpr iostate rdstate_impl() const noexcept
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:112
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:52
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
constexpr E & write(E &store, const E bits, bool set) noexcept
If set==true, sets the bits in store, i.e.
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
Definition io_util.cpp:1119
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:206
io_result_t
I/O operation result value.
Definition io_util.hpp:68
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
Definition io_util.hpp:372
std::string toString(io_result_t v) noexcept
Definition io_util.hpp:80
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition io_util.cpp:196
@ none
No error occurred nor has EOS being reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
Definition io_util.hpp:73
@ FAILED
Operation failed.
Definition io_util.hpp:70
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:76
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition backtrace.hpp:32
STL namespace.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
Definition io_util.hpp:354
CXX_ALWAYS_INLINE _Tp load() const noexcept