jaulib v1.3.6
Jau Support Library (C++, Java, ..)
Loading...
Searching...
No Matches
byte_stream.cpp
Go to the documentation of this file.
1/*
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021-2023 Gothel Software e.K.
4 *
5 * ByteInStream, ByteInStream_SecMemory and ByteInStream_istream are derived from Botan under same license:
6 * - Copyright (c) 1999-2007 Jack Lloyd
7 * - Copyright (c) 2005 Matthew Gregan
8 *
9 * Permission is hereby granted, free of charge, to any person obtaining
10 * a copy of this software and associated documentation files (the
11 * "Software"), to deal in the Software without restriction, including
12 * without limitation the rights to use, copy, modify, merge, publish,
13 * distribute, sublicense, and/or sell copies of the Software, and to
14 * permit persons to whom the Software is furnished to do so, subject to
15 * the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be
18 * included in all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
24 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
25 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 */
28
29// #include <botan_all.h>
30
31#include <jau/cpuid.hpp>
32#include <jau/debug.hpp>
33#include <jau/byte_stream.hpp>
34
35extern "C" {
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <fcntl.h>
40}
41
42#ifdef USE_LIBCURL
43 #include <curl/curl.h>
44#endif
45
46#include <thread>
47#include <pthread.h>
48
49#ifndef O_BINARY
50#define O_BINARY 0
51#endif
52#ifndef O_NONBLOCK
53#define O_NONBLOCK 0
54#endif
55
56using namespace jau::io;
57using namespace jau::fractions_i64_literals;
58using namespace jau::int_literals;
59
60#if defined(__FreeBSD__)
61 typedef off_t off64_t;
62 #define __posix_openat64 ::openat
63 #define __posix_lseek64 ::lseek
64#else
65 #define __posix_openat64 ::openat64
66 #define __posix_lseek64 ::lseek64
67#endif
68
69#ifdef USE_LIBCURL
70 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * (size_t)CURL_MAX_WRITE_SIZE;
71#else
72 const size_t jau::io::BEST_URLSTREAM_RINGBUFFER_SIZE = 2_uz * 16384_uz;
73#endif
74
/**
 * Copies `n` bytes from `in` to `out` using std::memmove.
 *
 * The call is a no-op if either pointer is null or `n` is zero.
 *
 * @param out destination memory
 * @param in source memory
 * @param n number of bytes to copy
 */
inline constexpr void copy_mem(void* out, const void* in, size_t n) noexcept {
    if( nullptr == out || nullptr == in || 0 == n ) {
        return; // nothing to copy
    }
    std::memmove(out, in, n * sizeof(uint8_t));
}
80
81bool ByteInStream::read(uint8_t& out) noexcept {
82 return 1 == read(&out, 1);
83}
84
85bool ByteInStream::peek(uint8_t& out) noexcept {
86 return 1 == peek(&out, 1, 0);
87}
88
89size_t ByteInStream::discard(size_t n) noexcept {
90 uint8_t buf[1024] = { 0 };
91 size_t discarded = 0;
92
93 while(n)
94 {
95 const size_t got = read(buf, std::min(n, sizeof(buf)));
96 if( 0 == got ) {
97 break;
98 }
99 discarded += got;
100 n -= got;
101 }
102
103 return discarded;
104}
105
/**
 * Copies up to `length` bytes from the in-memory source into `out` and advances m_offset.
 *
 * @param out destination buffer
 * @param length maximum number of bytes to read
 * @return number of bytes copied, i.e. the smaller of `length` and the bytes left behind m_offset;
 *         0 if `length` is zero or the stream is not good().
 */
106size_t ByteInStream_SecMemory::read(void* out, size_t length) noexcept {
107 if( 0 == length || !good() ) {
108 return 0;
109 }
// clamp the request to the bytes remaining behind m_offset
110 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
111 copy_mem(out, m_source.data() + m_offset, got);
112 m_offset += got;
113 if( m_source.size() == m_offset ) {
// NOTE(review): original source line 114 is absent from this listing;
// presumably it flags end-of-stream once all content is consumed - confirm against the source file.
115 }
116 return got;
117}
118
119bool ByteInStream_SecMemory::available(size_t n) noexcept {
120 return m_source.size() - m_offset >= n;
121}
122
123size_t ByteInStream_SecMemory::peek(void* out, size_t length, size_t peek_offset) noexcept {
124 const size_t bytes_left = m_source.size() - m_offset;
125 if(peek_offset >= bytes_left) {
126 return 0;
127 }
128 const size_t got = std::min(bytes_left - peek_offset, length);
129 copy_mem(out, &m_source[m_offset + peek_offset], got);
130 return got;
131}
132
134: m_source(cast_char_ptr_to_uint8(in.data()),
135 cast_char_ptr_to_uint8(in.data()) + in.length()),
136 m_offset(0)
137{ }
138
140 m_source.clear();
141 m_offset = 0;
143}
144
145std::string ByteInStream_SecMemory::to_string() const noexcept {
146 return "ByteInStream_SecMemory[content size "+jau::to_decstring(m_source.size())+
147 ", consumed "+jau::to_decstring(m_offset)+
148 ", available "+jau::to_decstring(m_source.size()-m_offset)+
149 ", iostate["+jau::io::to_string(rdstate())+
150 "]]";
151}
152
/**
 * Reads up to `length` bytes from the file descriptor m_fd into `out`.
 *
 * Loops over ::read() until `length` bytes were gathered, ::read() returns zero,
 * or - if the content size is known - the consumed byte count reaches m_content_size.
 * In the latter two cases iostate::eofbit is set.
 *
 * @param out destination buffer
 * @param length maximum number of bytes to read
 * @return number of bytes read by this call; 0 if `length` is zero, the stream is not good()
 *         or ::read() failed with an error other than EAGAIN, EWOULDBLOCK or EINTR.
 */
153size_t ByteInStream_File::read(void* out, size_t length) noexcept {
154 if( 0 == length || !good() ) {
155 return 0;
156 }
157 uint8_t* out_u8 = static_cast<uint8_t*>(out);
158 size_t total = 0;
159 while( total < length ) {
160 ssize_t len;
// retry ::read() as long as it fails with a temporary condition
161 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
162 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
163 // cont temp unavail or interruption
164 // unlikely for regular files and we open w/o O_NONBLOCK
165 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
166 // - EINTR (signal)
167 continue;
168 }
169 // Check errno == ETIMEDOUT ??
// NOTE(review): original source line 170 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
// NOTE(review): bytes gathered in earlier iterations (`total`) are neither returned
// nor added to m_bytes_consumed on this error path.
171 DBG_PRINT("ByteInStream_File::read: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
172 return 0;
173 }
174 total += static_cast<size_t>(len);
// zero bytes read, or known content size reached: flag end-of-file and stop
175 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
176 setstate_impl( iostate::eofbit ); // Note: std::istream also sets iostate::failbit on eof, we don't.
177 break;
178 }
179 }
180 m_bytes_consumed += total;
181 return total;
182}
183
/**
 * Reads up to `length` bytes located `offset` bytes behind the current file position into `out`
 * and afterwards repositions the file descriptor to m_bytes_consumed.
 *
 * Returns 0 early if `length` is zero, the stream is not good(), `offset` does not fit into off64_t,
 * or - with known content size - fewer than `offset + 1` bytes remain.
 *
 * @param out destination buffer
 * @param length maximum number of bytes to read
 * @param offset offset relative to the current read position
 * @return number of bytes read by a single ::read() call, or 0 on any seek or read error.
 */
184size_t ByteInStream_File::peek(void* out, size_t length, size_t offset) noexcept {
185 if( 0 == length || !good() ||
186 ( sizeof(size_t) >= sizeof(off64_t) && offset > std::numeric_limits<off64_t>::max() ) ||
187 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 /* min number of bytes to read */ ) ) {
188 return 0;
189 }
190 size_t got = 0;
191
192 off64_t abs_pos = 0;
193 if( 0 < offset ) {
// move forward by `offset` relative to the current position
194 abs_pos = __posix_lseek64(m_fd, static_cast<off64_t>(offset), SEEK_CUR);
195 if( 0 > abs_pos ) {
// NOTE(review): original source line 196 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
197 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
198 offset, to_string().c_str(), errno, strerror(errno));
199 return 0;
200 }
201 }
// NOTE(review): lseek() is specified to return the resulting offset from the start of the file,
// so this equality looks like it only holds if `offset` is zero or nothing has been consumed yet;
// verify whether a comparison against m_bytes_consumed + offset was intended.
202 if( abs_pos == static_cast<off64_t>(offset) ) {
203 ssize_t len = 0;
204 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
205 if ( errno == EAGAIN || errno == EINTR ) {
206 // cont temp unavail or interruption
207 continue;
208 }
209 // Check errno == ETIMEDOUT ??
// NOTE(review): original source line 210 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
// NOTE(review): the file position is not reset to m_bytes_consumed on this error path.
211 DBG_PRINT("ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
212 return 0;
213 }
214 got = len; // potentially zero bytes, i.e. eof
215 }
// restore the file position to the number of bytes consumed so far
216 if( __posix_lseek64(m_fd, static_cast<off64_t>(m_bytes_consumed), SEEK_SET) < 0 ) {
217 // even though we were able to fetch the desired data above, let's fail if position reset fails
// NOTE(review): original source line 218 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
219 DBG_PRINT("ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
220 offset, to_string().c_str(), errno, strerror(errno));
221 return 0;
222 }
223 return got;
224}
225
226bool ByteInStream_File::available(size_t n) noexcept {
227 return is_open() && good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
228};
229
231: ByteInStream(),
232 stats(fd), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
233{
234 if( !stats.exists() || !stats.has_access() ) {
235 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
236 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
237 } else {
238 m_has_content_length = stats.has( jau::fs::file_stats::field_t::size );
239 m_content_size = m_has_content_length ? stats.size() : 0;
240 m_fd = ::dup(fd);
241 if ( 0 > m_fd ) {
242 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
243 DBG_PRINT("ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
244 }
245 }
246}
247
/**
 * Constructs a file input stream for `path`, resolved relative to the directory descriptor `dirfd`.
 *
 * The file_stats of the path are gathered first. If the element does not exist or is not accessible,
 * iostate::failbit is set and no descriptor is opened.
 * Otherwise the content size is taken from the stats if available and the descriptor is obtained
 * either by duplicating the descriptor held by the stats or by opening the path read-only.
 * A failed dup/open sets iostate::failbit.
 *
 * @param dirfd directory file descriptor the path is resolved against
 * @param path filesystem path, optionally with a leading `file://` (stripped below)
 */
248ByteInStream_File::ByteInStream_File(const int dirfd, const std::string& path) noexcept
249: ByteInStream(),
250 stats(), m_fd(-1), m_has_content_length(false), m_content_size(0), m_bytes_consumed(0)
251{
// NOTE(review): original source line 252 is absent from this listing;
// presumably it tests `path` for the local file protocol prefix - confirm against the source file.
253 // cut off leading `file://`
254 std::string path2 = path.substr(7);
255 stats = jau::fs::file_stats(dirfd, path2);
256 } else {
257 stats = jau::fs::file_stats(dirfd, path);
258 }
259 if( !stats.exists() || !stats.has_access() ) {
260 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
261 DBG_PRINT("ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
262 } else {
263 if( stats.has( jau::fs::file_stats::field_t::size ) ) {
264 m_has_content_length = true;
265 m_content_size = stats.size();
266 } else {
267 m_has_content_length = false;
268 m_content_size = 0;
269 }
270 // O_NONBLOCK, is useless on files and counter to this class logic
271 const int src_flags = O_RDONLY|O_BINARY|O_NOCTTY;
// prefer duplicating a descriptor already held by the stats over opening the path
272 if( stats.has_fd() ) {
273 m_fd = ::dup( stats.fd() );
274 } else {
275 m_fd = __posix_openat64(dirfd, stats.path().c_str(), src_flags);
276 }
277 if ( 0 > m_fd ) {
278 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
279 DBG_PRINT("ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
280 }
281 }
282}
283
284ByteInStream_File::ByteInStream_File(const std::string& path) noexcept
285: ByteInStream_File(AT_FDCWD, path) {}
286
288 if( 0 <= m_fd ) {
289 ::close(m_fd);
290 m_fd = -1;
292 }
293}
294
295std::string ByteInStream_File::to_string() const noexcept {
296 return "ByteInStream_File[content_length "+( has_content_size() ? jau::to_decstring(m_content_size) : "n/a" )+
297 ", consumed "+jau::to_decstring(m_bytes_consumed)+
298 ", available "+jau::to_decstring(get_available())+
299 ", fd "+std::to_string(m_fd)+
300 ", iostate["+jau::io::to_string(rdstate())+
301 "], "+stats.to_string()+
302 "]";
303}
304
305
307: m_url(std::move(url)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
308 m_header_sync(), m_has_content_length( false ), m_content_size( 0 ),
309 m_total_xfered( 0 ), m_result( io::async_io_result_t::NONE ),
310 m_bytes_consumed(0)
311
312{
313 m_url_thread = read_url_stream(m_url, m_buffer, m_header_sync, m_has_content_length, m_content_size, m_total_xfered, m_result);
314 if( nullptr == m_url_thread ) {
315 // url protocol not supported
316 m_result = async_io_result_t::FAILED;
317 }
318}
319
320void ByteInStream_URL::close() noexcept {
321 DBG_PRINT("ByteInStream_URL: close.0 %s, %s", id().c_str(), to_string_int().c_str());
322
323 if( async_io_result_t::NONE == m_result ) {
324 m_result = async_io_result_t::SUCCESS; // signal end of streaming
325 }
326
327 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
328 if( nullptr != m_url_thread && m_url_thread->joinable() ) {
329 DBG_PRINT("ByteInStream_URL: close.1 %s, %s", id().c_str(), m_buffer.toString().c_str());
330 m_url_thread->join();
331 }
332 m_url_thread = nullptr;
333 DBG_PRINT("ByteInStream_URL: close.X %s, %s", id().c_str(), to_string_int().c_str());
334}
335
/**
 * Checks whether `n` bytes are available, waiting up to m_timeout for them if the transfer is still running.
 *
 * If the stream is not good() or the transfer has a result other than NONE, only the ringbuffer fill level is compared.
 * Otherwise the header synchronization is awaited, a known content size is checked against `n`
 * and finally the ringbuffer is waited on for `n` elements.
 * On timeout a pending result is set to FAILED and the ringbuffer writer is interrupted.
 *
 * @param n number of bytes asked for
 * @return true if at least `n` bytes are available, otherwise false.
 */
336bool ByteInStream_URL::available(size_t n) noexcept {
337 if( !good() || async_io_result_t::NONE != m_result ) {
338 // url thread ended, only remaining bytes in buffer available left
339 return m_buffer.size() >= n;
340 }
341 m_header_sync.wait_until_completion(m_timeout);
342 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
343 return false;
344 }
345 // I/O still in progress, we have to poll until data is available or timeout
346 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
347 bool timeout_occured;
348 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
349 if( avail < n ) {
350 if( timeout_occured ) {
// NOTE(review): original source line 351 is absent from this listing;
// presumably it records the timeout in the stream state - confirm against the source file.
352 if( async_io_result_t::NONE == m_result ) {
353 m_result = async_io_result_t::FAILED;
354 }
355 m_buffer.interruptWriter();
356 }
357 return false;
358 } else {
359 return true;
360 }
361}
362
363bool ByteInStream_URL::is_open() const noexcept {
364 // url thread has not ended or remaining bytes in buffer available left
365 return async_io_result_t::NONE == m_result || m_buffer.size() > 0;
366}
367
/**
 * Reads up to `length` bytes from the ringbuffer into `out`, blocking up to m_timeout for at least one byte.
 *
 * The header synchronization is awaited first, even if `length` is zero.
 * On timeout a pending result is set to FAILED and the ringbuffer writer is interrupted.
 *
 * @param out destination buffer
 * @param length maximum number of bytes to read
 * @return number of bytes read, 0 if `length` is zero or the stream is not good().
 */
368size_t ByteInStream_URL::read(void* out, size_t length) noexcept {
369 m_header_sync.wait_until_completion(m_timeout);
370 if( 0 == length || !good() ) {
371 return 0;
372 }
373 bool timeout_occured = false;
374 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
375 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
376 m_bytes_consumed += got;
377 if( timeout_occured ) {
// NOTE(review): original source line 378 is absent from this listing;
// presumably it records the timeout in the stream state - confirm against the source file.
379 if( async_io_result_t::NONE == m_result ) {
380 m_result = async_io_result_t::FAILED;
381 }
382 m_buffer.interruptWriter();
383 }
384 // DBG_PRINT("ByteInStream_URL::read: size %zu/%zu bytes, %s", got, length, to_string_int().c_str() );
385 return got;
386}
387
388size_t ByteInStream_URL::peek(void* out, size_t length, size_t peek_offset) noexcept {
389 (void)out;
390 (void)length;
391 (void)peek_offset;
392 ERR_PRINT("ByteInStream_URL::peek not implemented");
393 return 0;
394}
395
397 if ( ( async_io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
398 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
399 {
401 }
402 if( async_io_result_t::FAILED == m_result ) {
404 }
405 return rdstate_impl();
406}
407
408std::string ByteInStream_URL::to_string_int() const noexcept {
409 return m_url+", Url[content_length "+( has_content_size() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
410 ", xfered "+jau::to_decstring(m_total_xfered.load())+
411 ", result "+std::to_string((int8_t)m_result.load())+
412 "], consumed "+jau::to_decstring(m_bytes_consumed)+
413 ", available "+jau::to_decstring(get_available())+
414 ", iostate["+jau::io::to_string(rdstate())+
415 "], "+m_buffer.toString();
416}
417std::string ByteInStream_URL::to_string() const noexcept {
418 return "ByteInStream_URL["+to_string_int()+"]";
419}
420
/**
 * Creates a byte input stream for the given path or URI.
 *
 * If `path_or_uri` does not use the local file protocol (and passes the second, not visible, condition),
 * a ByteInStream_URL is tried first and returned unless it reports fail().
 * In all other cases a ByteInStream_File is tried and returned unless it reports fail().
 *
 * @param path_or_uri filesystem path or URI of the source
 * @param timeout timeout handed to the ByteInStream_URL
 * @return the created stream, or nullptr if no stream could be opened without failure.
 */
421std::unique_ptr<ByteInStream> jau::io::to_ByteInStream(const std::string& path_or_uri, jau::fraction_i64 timeout) noexcept {
422 if( !jau::io::uri_tk::is_local_file_protocol(path_or_uri) &&
// NOTE(review): original source line 423 (second operand of this condition) is absent from this listing;
// presumably it checks whether the URI protocol is supported - confirm against the source file.
424 {
425 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri, timeout);
426 if( nullptr != res && !res->fail() ) {
427 return res;
428 }
429 }
// fall back to a plain file stream
430 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
431 if( nullptr != res && !res->fail() ) {
432 return res;
433 }
434 return nullptr;
435}
436
438: m_id(std::move(id_name)), m_timeout(timeout), m_buffer(BEST_URLSTREAM_RINGBUFFER_SIZE),
439 m_has_content_length( false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result( io::async_io_result_t::NONE ),
440 m_bytes_consumed(0)
441{ }
442
444 DBG_PRINT("ByteInStream_Feed: close.0 %s, %s", id().c_str(), to_string_int().c_str());
445
446 if( async_io_result_t::NONE == m_result ) {
447 m_result = async_io_result_t::SUCCESS; // signal end of streaming
448 }
449 m_buffer.close( true /* zeromem */); // also unblocks all r/w ops
450 DBG_PRINT("ByteInStream_Feed: close.X %s, %s", id().c_str(), to_string_int().c_str());
451}
452
/**
 * Checks whether `n` bytes are available, waiting up to m_timeout for them if the feeder is still running.
 *
 * If the stream is not good() or the result is other than NONE, only the ringbuffer fill level is compared.
 * Otherwise a known content size is checked against `n` and the ringbuffer is waited on for `n` elements.
 * On timeout a pending result is set to FAILED and the ringbuffer writer is interrupted.
 *
 * @param n number of bytes asked for
 * @return true if at least `n` bytes are available, otherwise false.
 */
453bool ByteInStream_Feed::available(size_t n) noexcept {
454 if( !good() || async_io_result_t::NONE != m_result ) {
455 // feeder completed, only remaining bytes in buffer available left
456 return m_buffer.size() >= n;
457 }
458 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
459 return false;
460 }
461 // I/O still in progress, we have to poll until data is available or timeout
462 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
463 bool timeout_occured;
464 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
465 if( avail < n ) {
466 if( timeout_occured ) {
// NOTE(review): original source line 467 is absent from this listing;
// presumably it records the timeout in the stream state - confirm against the source file.
468 if( async_io_result_t::NONE == m_result ) {
469 m_result = async_io_result_t::FAILED;
470 }
471 m_buffer.interruptWriter();
472 }
473 return false;
474 } else {
475 return true;
476 }
477}
478
479bool ByteInStream_Feed::is_open() const noexcept {
480 // feeder has not ended or remaining bytes in buffer available left
481 return async_io_result_t::NONE == m_result || m_buffer.size() > 0;
482}
483
/**
 * Reads up to `length` bytes from the ringbuffer into `out`, blocking up to m_timeout for at least one byte.
 *
 * On timeout a pending result is set to FAILED and the ringbuffer writer is interrupted.
 *
 * @param out destination buffer
 * @param length maximum number of bytes to read
 * @return number of bytes read, 0 if `length` is zero or the stream is not good().
 */
484size_t ByteInStream_Feed::read(void* out, size_t length) noexcept {
485 if( 0 == length || !good() ) {
486 return 0;
487 }
488 bool timeout_occured = false;
489 // set_eof() unblocks ringbuffer via set_end_of_input(true) permanently, hence blocking call on !m_has_content_length is OK.
490 const size_t got = m_buffer.getBlocking(static_cast<uint8_t*>(out), length, 1, m_timeout, timeout_occured);
491 m_bytes_consumed += got;
492 if( timeout_occured ) {
// NOTE(review): original source line 493 is absent from this listing;
// presumably it records the timeout in the stream state - confirm against the source file.
494 if( async_io_result_t::NONE == m_result ) {
495 m_result = async_io_result_t::FAILED;
496 }
497 m_buffer.interruptWriter();
498 }
499 // DBG_PRINT("ByteInStream_Feed::read: size %zu/%zu bytes, timeout_occured %d, %s", got, length, timeout_occured, to_string_int().c_str() );
500 return got;
501}
502
503size_t ByteInStream_Feed::peek(void* out, size_t length, size_t peek_offset) noexcept {
504 (void)out;
505 (void)length;
506 (void)peek_offset;
507 ERR_PRINT("ByteInStream_Feed::peek not implemented");
508 return 0;
509}
510
512 if ( ( async_io_result_t::NONE != m_result && m_buffer.isEmpty() ) ||
513 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
514 {
516 }
517 if( async_io_result_t::FAILED == m_result ) {
519 }
520 return rdstate_impl();
521}
522
/**
 * Writes `length` bytes from `in` into the ringbuffer, blocking up to `timeout`.
 *
 * Nothing is written if `length` is zero, the stream is not good() or the result is no longer NONE.
 * On success the transferred byte counter is increased by `length`.
 * On failure a pending result is set to FAILED; on timeout the ringbuffer writer is interrupted as well.
 *
 * @param in source bytes
 * @param length number of bytes to write
 * @param timeout maximum duration to block on the ringbuffer
 * @return true if all bytes were put into the ringbuffer, otherwise false.
 */
523bool ByteInStream_Feed::write(uint8_t in[], size_t length, const jau::fraction_i64& timeout) noexcept {
524 if( 0 < length && ( good() && async_io_result_t::NONE == m_result ) ) { // feeder still running
525 bool timeout_occured;
526 if( m_buffer.putBlocking(in, in+length, timeout, timeout_occured) ) {
527 m_total_xfered.fetch_add(length);
528 return true;
529 } else {
530 if( timeout_occured ) {
// NOTE(review): original source line 531 is absent from this listing;
// presumably it records the timeout in the stream state - confirm against the source file.
532 m_buffer.interruptWriter();
533 } else {
// NOTE(review): original source line 534 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
535 }
536 if( async_io_result_t::NONE == m_result ) {
537 m_result = async_io_result_t::FAILED;
538 }
539 return false;
540 }
541 } else {
542 return false;
543 }
544}
545
547 m_result = result;
548 m_buffer.set_end_of_input(true); // still considering last data, also irqs blocking ringbuffer reader
549}
550
551std::string ByteInStream_Feed::to_string_int() const noexcept {
552 return m_id+", ext[content_length "+( has_content_size() ? jau::to_decstring(m_content_size.load()) : "n/a" )+
553 ", xfered "+jau::to_decstring(m_total_xfered.load())+
554 ", result "+std::to_string((int8_t)m_result.load())+
555 "], consumed "+std::to_string(m_bytes_consumed)+
556 ", available "+std::to_string(get_available())+
557 ", iostate["+jau::io::to_string(rdstate())+
558 "], "+m_buffer.toString();
559}
560
561std::string ByteInStream_Feed::to_string() const noexcept {
562 return "ByteInStream_Feed["+to_string_int()+"]";
563}
564
567 m_parent.close();
568 DBG_PRINT("ByteInStream_Recorder: close.X %s", id().c_str());
569}
570
572 m_buffer.resize(0);
573 m_rec_offset = m_bytes_consumed;
574 m_is_recording = true;
575}
576
578 m_is_recording = false;
579}
580
582 m_is_recording = false;
583 m_buffer.clear();
584 m_rec_offset = 0;
585}
586
587size_t ByteInStream_Recorder::read(void* out, size_t length) noexcept {
588 const size_t consumed_bytes = m_parent.read(out, length);
589 m_bytes_consumed += consumed_bytes;
590 if( is_recording() ) {
591 uint8_t* out_u8 = static_cast<uint8_t*>(out);
592 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
593 }
594 return consumed_bytes;
595}
596
597std::string ByteInStream_Recorder::to_string() const noexcept {
598 return "ByteInStream_Recorder[parent "+m_parent.id()+", recording[on "+std::to_string(m_is_recording)+
599 " offset "+jau::to_decstring(m_rec_offset)+
600 "], consumed "+jau::to_decstring(m_bytes_consumed)+
601 ", iostate["+jau::io::to_string(rdstate())+"]]";
602}
603
604bool ByteOutStream::write(const uint8_t& in) noexcept {
605 return 1 == write(&in, 1);
606}
607
/**
 * Writes up to `length` bytes from `out` to the file descriptor m_fd.
 *
 * Loops over ::write() until `length` bytes were written or ::write() returns zero.
 *
 * @param out source buffer
 * @param length number of bytes to write
 * @return number of bytes written by this call; 0 if `length` is zero, the stream reports fail()
 *         or ::write() failed with an error other than EAGAIN or EINTR.
 */
608size_t ByteOutStream_File::write(const void* out, size_t length) noexcept {
609 if( 0 == length || fail() ) {
610 return 0;
611 }
612 const uint8_t* out_u8 = static_cast<const uint8_t*>(out);
613 size_t total = 0;
614 while( total < length ) {
615 ssize_t len;
// retry ::write() as long as it fails with a temporary condition
616 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
617 if ( errno == EAGAIN || errno == EINTR ) {
618 // cont temp unavail or interruption
619 // unlikely for regular files and we open w/o O_NONBLOCK
620 // - EAGAIN (file) and EWOULDBLOCK (socket) if blocking
621 // - EINTR (signal)
622 continue;
623 }
624 // Check errno == ETIMEDOUT ??
// NOTE(review): original source line 625 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
// NOTE(review): bytes written in earlier iterations (`total`) are neither returned
// nor added to m_bytes_consumed on this error path.
626 DBG_PRINT("ByteOutStream_File::write: Error occurred in %s, errno %d %s", to_string().c_str(), errno, strerror(errno));
627 return 0;
628 }
629 total += static_cast<size_t>(len);
630 if( 0 == len ) {
// NOTE(review): original source line 631 is absent from this listing;
// presumably it updates the stream state for a zero-byte write - confirm against the source file.
632 break;
633 }
634 }
635 m_bytes_consumed += total;
636 return total;
637}
638
640: stats(fd), m_fd(-1),
641 m_bytes_consumed(0)
642{
643 if( !stats.exists() || !stats.has_access() ) {
644 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
645 DBG_PRINT("ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(), to_string().c_str());
646 } else {
647 m_fd = ::dup(fd);
648 if ( 0 > m_fd ) {
649 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
650 DBG_PRINT("ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(), to_string().c_str());
651 }
652 // open file-descriptor is appending anyways
653 }
654}
655
/**
 * Constructs a file output stream for `path`, resolved relative to the directory descriptor `dirfd`.
 *
 * The file_stats of the path are gathered first. An existing element which is neither a file nor
 * backed by a file descriptor, or any inaccessible element, sets iostate::failbit.
 * Otherwise the descriptor is obtained either by duplicating the descriptor held by the stats
 * or by opening the path write-only, creating it exclusively with the protection bits of `mode` if it does not exist.
 * An existing file is positioned to its end.
 *
 * @param dirfd directory file descriptor the path is resolved against
 * @param path filesystem path, optionally with a leading `file://` (stripped below)
 * @param mode file mode whose POSIX protection bits are passed to the open call
 */
656ByteOutStream_File::ByteOutStream_File(const int dirfd, const std::string& path, const jau::fs::fmode_t mode) noexcept
657: stats(), m_fd(-1),
658 m_bytes_consumed(0)
659{
// NOTE(review): original source line 660 is absent from this listing;
// presumably it tests `path` for the local file protocol prefix - confirm against the source file.
661 // cut off leading `file://`
662 std::string path2 = path.substr(7);
663 stats = jau::fs::file_stats(dirfd, path2);
664 } else {
665 stats = jau::fs::file_stats(dirfd, path);
666 }
667 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
668 setstate_impl( iostate::failbit ); // Note: conforming with std::ofstream open (?)
669 DBG_PRINT("ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(), to_string().c_str());
670 } else {
671 // O_NONBLOCK, is useless on files and counter to this class logic
672 if( stats.has_fd() ) {
673 m_fd = ::dup( stats.fd() );
674 } else {
// a not yet existing path is created exclusively, an existing one is opened as-is
675 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|O_BINARY|O_NOCTTY;
676 m_fd = __posix_openat64(dirfd, stats.path().c_str(), dst_flags, jau::fs::posix_protection_bits( mode ));
677 }
678 if ( 0 > m_fd ) {
679 setstate_impl( iostate::failbit ); // Note: conforming with std::ifstream open
680 DBG_PRINT("ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(), to_string().c_str());
681 }
// NOTE(review): the seek below is also attempted if dup/open failed above (m_fd < 0),
// as there is no early exit in between - verify whether that is intended.
682 if( stats.is_file() ) {
683 off64_t abs_pos = __posix_lseek64(m_fd, 0, SEEK_END);
684 if( 0 > abs_pos ) {
// NOTE(review): original source line 685 is absent from this listing;
// presumably it marks the stream as failed - confirm against the source file.
686 ERR_PRINT("Failed to position existing file to end %s, errno %d %s",
687 to_string().c_str(), errno, strerror(errno));
688 }
689 }
690 }
691}
692
693ByteOutStream_File::ByteOutStream_File(const std::string& path, const jau::fs::fmode_t mode) noexcept
694: ByteOutStream_File(AT_FDCWD, path, mode) {}
695
697 if( 0 <= m_fd ) {
698 ::close(m_fd);
699 m_fd = -1;
701 }
702}
703
704std::string ByteOutStream_File::to_string() const noexcept {
705 return "ByteOutStream_File[consumed "+jau::to_decstring(m_bytes_consumed)+
706 ", fd "+std::to_string(m_fd)+
707 ", iostate["+jau::io::to_string(rdstate())+
708 "], "+stats.to_string()+
709 "]";
710}
711
#define O_BINARY
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
#define __posix_lseek64
#define __posix_openat64
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void set_eof(const async_io_result_t result) noexcept
Set end-of-data (EOS), i.e.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
ByteInStream_SecMemory(const std::string &in)
Construct a secure memory source that reads from a string.
std::string to_string() const noexcept override
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
size_t read(void *, size_t) noexcept override
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
virtual size_t peek(void *out, size_t length, size_t peek_offset) noexcept=0
Read from the source but do not modify the internal offset.
size_t discard(size_t N) noexcept
Discard the next N bytes of the data.
virtual size_t read(void *out, size_t length) noexcept=0
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
bool fail() const noexcept
Checks if an error has occurred.
constexpr iostate rdstate_impl() const noexcept
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
Definition debug.hpp:109
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
Definition debug.hpp:52
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
constexpr E & write(E &store, const E bits, bool set) noexcept
If set==true, sets the bits in store, i.e.
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
Definition io_util.cpp:204
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
Definition io_util.cpp:309
async_io_result_t
Asynchronous I/O operation result value.
Definition io_util.hpp:68
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
Definition io_util.cpp:194
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
Definition io_util.hpp:73
@ FAILED
Operation failed.
Definition io_util.hpp:70
@ SUCCESS
Operation succeeded.
Definition io_util.hpp:76
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
Definition backtrace.hpp:32
STL namespace.
CXX_ALWAYS_INLINE _Tp load() const noexcept