43 #include <sys/types.h>
49 #include <curl/curl.h>
66#if defined(__FreeBSD__)
67 typedef off_t off64_t;
68 #define __posix_openat64 ::openat
69 #define __posix_lseek64 ::lseek
71 #define __posix_openat64 ::openat64
72 #define __posix_lseek64 ::lseek64
81inline constexpr static void copy_mem(
void* out,
const void* in,
size_t n)
noexcept {
82 if(in !=
nullptr && out !=
nullptr && n > 0) {
83 std::memcpy(out, in,
sizeof(uint8_t)*n);
91 uint8_t buf[1024] = { 0 };
96 const size_t got =
read(buf, std::min(n,
sizeof(buf)));
111 }
else if( newPos > m_source.size() || newPos > std::numeric_limits<size_t>::max() ) {
116 m_offset =
static_cast<size_t>(newPos);
117 if( m_mark > newPos ) {
120 if( m_source.size() == m_offset ) {
130 size_t n = std::min(N, m_source.size() - m_offset);
132 if( m_source.size() == m_offset ) {
144 if(
npos == m_mark ) {
147 return m_mark ==
seek(m_mark);
154 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
155 copy_mem(out, m_source.data() + m_offset, got);
157 if( m_source.size() == m_offset ) {
164 return canRead() && m_source.size() - m_offset >= n;
170 const size_t bytes_left = m_source.size() - m_offset;
172 ( peek_offset > std::numeric_limits<size_t>::max() ) ||
173 ( bytes_left < peek_offset + 1 ) ) {
177 const size_t po_sz =
static_cast<size_t>(peek_offset);
178 const size_t peek_len = std::min(bytes_left - po_sz, length);
179 copy_mem(out, &m_source[m_offset + po_sz], peek_len);
187 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
188 copy_mem(m_source.data() + m_offset, out, got);
197 m_offset(0), m_mark(
npos)
207 return "ByteInStream_SecMemory[content size "+
jau::to_decstring(m_source.size())+
217 !m_has_content_length ||
218 ( !
canWrite() && newPos > m_content_size ) ||
219 newPos > std::numeric_limits<off64_t>::max() )
224 if( newPos != m_offset ) {
225 off64_t abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(newPos), SEEK_SET);
228 ERR_PRINT(
"Failed to seek to position %" PRIu64
" of existing file %s, errno %d %s",
229 newPos,
toString().c_str(), errno, strerror(errno));
233 if( m_mark > m_offset ) {
236 if( m_content_size == m_offset ) {
245 if( m_has_content_length ) {
249 const size_t n = std::min(N, m_content_size - m_offset);
250 const size_t p0 = m_offset;
251 return seek(p0 + n) - p0;
263 if(
npos == m_mark ) {
266 return m_mark ==
seek(m_mark);
273 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
275 while( total < length ) {
277 while ( ( len =
::read(m_fd, out_u8+total, length-total) ) < 0 ) {
278 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
287 DBG_PRINT(
"ByteInStream_File::read: Error occurred in %s, errno %d %s",
toString().c_str(), errno, strerror(errno));
290 total +=
static_cast<size_t>(len);
291 if( 0 == len || ( m_has_content_length && m_offset + total >= m_content_size ) ) {
301 const size_type bytes_left = ( m_has_content_length ? m_content_size : std::numeric_limits<off64_t>::max() ) - m_offset;
303 ( peek_offset > std::numeric_limits<off64_t>::max() ) ||
304 ( bytes_left < peek_offset + 1 ) ) {
310 if( 0 < peek_offset ) {
311 abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(peek_offset), SEEK_CUR);
314 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
315 peek_offset,
toString().c_str(), errno, strerror(errno));
319 if( abs_pos ==
static_cast<off64_t
>(peek_offset) ) {
321 while ( ( len =
::read(m_fd, out, length) ) < 0 ) {
322 if ( errno == EAGAIN || errno == EINTR ) {
328 DBG_PRINT(
"ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s",
toString().c_str(), errno, strerror(errno));
333 if(
__posix_lseek64(m_fd,
static_cast<off64_t
>(m_offset), SEEK_SET) < 0 ) {
336 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
337 peek_offset,
toString().c_str(), errno, strerror(errno));
351 const uint8_t* out_u8 =
static_cast<const uint8_t*
>(out);
353 while( total < length ) {
355 while ( ( len =
::write(m_fd, out_u8+total, length-total) ) < 0 ) {
356 if ( errno == EAGAIN || errno == EINTR ) {
365 DBG_PRINT(
"ByteOutStream_File::write: Error occurred in %s, errno %d %s",
toString().c_str(), errno, strerror(errno));
368 total +=
static_cast<size_t>(len);
375 if( m_has_content_length && m_offset > m_content_size ) {
376 m_content_size = m_offset;
387 if( !stats.is_file() ) {
396 if( cur_pos2 != cur_pos ) {
397 DBG_PRINT(
"ByteInStream_File::file_size: Error rewinding to current position failed, orig-pos %" PRIi64
" -> %" PRIi64
", errno %d %s.",
399 errno, strerror(errno));
408 m_stats(
fd), m_fd(-1), m_has_content_length(
false), m_content_size(0),
409 m_offset(0), m_mark(
npos)
411 if( !m_stats.exists() || !m_stats.has_access() ) {
413 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", m_stats.toString().c_str(),
toString().c_str());
419 DBG_PRINT(
"ByteInStream_File::ctor: Error occurred in %s, %s", m_stats.toString().c_str(),
toString().c_str());
425 ERR_PRINT(
"Failed to read position of existing file %s, errno %d %s",
426 toString().c_str(), errno, strerror(errno));
431 m_has_content_length =
true;
437 m_stats(), m_fd(-1), m_has_content_length(
false), m_content_size(0),
438 m_offset(0), m_mark(
npos)
442 std::string path2 = path.substr(7);
452 if( ( m_stats.exists() && !m_stats.is_file() && !m_stats.has_fd() ) || !m_stats.has_access() ) {
454 DBG_PRINT(
"ByteStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", m_stats.toString().c_str(),
toString().c_str());
457 if( !
canWrite() && !m_stats.exists() ) {
459 DBG_PRINT(
"ByteStream_File::ctor: Error, can't open non-existing read-only file in %s, %s", m_stats.toString().c_str(),
toString().c_str());
462 bool truncated =
false;
463 bool just_opened =
false;
464 if( m_stats.has_fd() ) {
465 m_fd = ::dup( m_stats.fd() );
476 if( !m_stats.exists() ) {
487 DBG_PRINT(
"ByteInStream_File::ctor: Error while opening %s, %s", m_stats.toString().c_str(),
toString().c_str());
494 const off64_t cur_pos = just_opened || !m_stats.is_file() ? 0 :
__posix_lseek64(m_fd, 0, SEEK_CUR);
497 ERR_PRINT(
"Failed to read position of existing file %s, errno %d %s",
498 toString().c_str(), errno, strerror(errno));
503 m_has_content_length =
true;
509 ERR_PRINT(
"Failed to position existing file to end %s, errno %d %s",
510 toString().c_str(), errno, strerror(errno));
538 ", fd "+std::to_string(m_fd)+
541 "], "+m_stats.toString()+
550 m_offset(0), m_mark(
npos), m_rewindbuf()
554 DBG_PRINT(
"ByteInStream_URL: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
556 if( m_stream_resp->processing() ) {
560 m_buffer.close(
true );
561 if( m_stream_resp->thread.joinable() ) {
562 DBG_PRINT(
"ByteInStream_URL: close.1 %s, %s",
id().c_str(), m_buffer.toString().c_str());
563 m_stream_resp->thread.join();
566 m_stream_resp->thread.swap(
none);
567 DBG_PRINT(
"ByteInStream_URL: close.X %s, %s",
id().c_str(), to_string_int().c_str());
571 if( !
good() || !
canRead() || !m_stream_resp->processing() ) {
573 return m_buffer.size() >= n;
575 m_stream_resp->header_resp.wait_until_completion(m_timeout);
576 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_offset < n ) {
581 bool timeout_occured;
582 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
584 if( timeout_occured ) {
586 if( m_stream_resp->processing() ) {
589 m_buffer.interruptWriter();
599 return m_stream_resp->processing() || m_buffer.size() > 0;
603 m_stream_resp->header_resp.wait_until_completion(m_timeout);
604 return m_stream_resp->has_content_length;
608 m_stream_resp->header_resp.wait_until_completion(m_timeout);
612 }
else if( m_rewindbuf.covered(m_mark, newPos) ){
618 }
else if( newPos > length ) {
620 }
else if(newPos >= m_offset) {
625 DBG_PRINT(
"ByteInStream_URL::seek newPos %" PRIu64
"< position %" PRIu64
" not implemented", newPos, m_offset);
635 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
644 if(
npos == m_mark ) {
647 return m_mark ==
seek(m_mark);
651 m_stream_resp->header_resp.wait_until_completion(m_timeout);
655 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
662 DBG_PRINT(
"ByteInStream_URL::peek not implemented");
667 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
668 ( m_stream_resp->has_content_length && m_offset >= m_stream_resp->content_length ) )
672 if( m_stream_resp->failed() ) {
678std::string ByteInStream_URL::to_string_int() const noexcept {
681 ", result "+
std::
to_string((int8_t)m_stream_resp->result.load())+
690 return "ByteInStream_URL["+to_string_int()+
"]";
697 std::unique_ptr<ByteStream> res = std::make_unique<ByteInStream_URL>(path_or_uri,
timeout);
698 if(
nullptr != res && !res->fail() ) {
702 std::unique_ptr<ByteStream> res = std::make_unique<ByteStream_File>(path_or_uri,
iomode_t::read);
703 if(
nullptr != res && !res->fail() ) {
712 m_has_content_length(
false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result(
io::io_result_t::NONE ),
717 DBG_PRINT(
"ByteInStream_Feed: close.0 %s, %s",
id().c_str(), toStringInt().c_str());
722 m_buffer.close(
true );
723 DBG_PRINT(
"ByteInStream_Feed: close.X %s, %s",
id().c_str(), toStringInt().c_str());
729 return m_buffer.size() >= n;
731 if( m_has_content_length && m_content_size - m_offset < n ) {
736 bool timeout_occured;
737 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
739 if( timeout_occured ) {
744 m_buffer.interruptWriter();
761 }
else if( m_rewindbuf.covered(m_mark, newPos) ){
767 }
else if( newPos > length ) {
769 }
else if(newPos >= m_offset) {
774 DBG_PRINT(
"ByteInStream_Feed::seek newPos %" PRIu64
"< position %" PRIu64
" not implemented", newPos, m_offset);
784 if( m_rewindbuf.setMark(m_mark, m_offset, readLimit) ) {
793 if(
npos == m_mark ) {
796 return m_mark ==
seek(m_mark);
803 return m_rewindbuf.read(m_mark, m_offset, newData, out, length);
810 DBG_PRINT(
"ByteInStream_Feed::peek not implemented");
816 ( m_has_content_length && m_offset >= m_content_size ) )
828 bool timeout_occured;
829 const uint8_t *in8 =
reinterpret_cast<const uint8_t*
>(in);
830 if( m_buffer.putBlocking(in8, in8+length,
timeout, timeout_occured) ) {
831 m_total_xfered.fetch_add(length);
834 if( timeout_occured ) {
836 m_buffer.interruptWriter();
852 m_buffer.set_end_of_input(
true);
855std::string ByteInStream_Feed::toStringInt() const noexcept {
867 return "ByteInStream_Feed["+toStringInt()+
"]";
873 DBG_PRINT(
"ByteInStream_Recorder: close.X %s",
id().c_str());
878 m_rec_offset = m_offset;
879 m_is_recording =
true;
883 m_is_recording =
false;
887 m_is_recording =
false;
893 const size_t consumed_bytes = m_parent.read(out, length);
894 m_offset += consumed_bytes;
896 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
897 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
899 return consumed_bytes;
903 const size_t consumed_bytes = m_parent.write(out, length);
904 m_offset += consumed_bytes;
906 const uint8_t* out_u8 =
reinterpret_cast<const uint8_t*
>(out);
907 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
909 return consumed_bytes;
913 return "ByteInStream_Recorder[parent "+m_parent.id()+
", recording[on "+std::to_string(m_is_recording)+
static constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
static bool _jau_file_size(const int fd, const jau::io::fs::file_stats &stats, const off64_t cur_pos, ByteStream::size_type &len) noexcept
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
std::string toString() const noexcept override
void setEOF(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Not implemented, returns 0.
iostate_t rdstate() const noexcept override
Returns the current state flags.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t write(const void *in, size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
size_t read(void *out, size_t length) noexcept override
Read from the source.
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
std::string toString() const noexcept override
bool isOpen() const noexcept override
Checks if the stream has an associated file.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate_t rdstate() const noexcept override
Returns the current state flags.
size_t read(void *out, size_t length) noexcept override
Read from the source.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Implemented by skipping input stream via read.
size_t peek(void *out, size_t length, size_type peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
size_type seek(size_type newPos) noexcept override
newPos < position() limited to markpos, see setMark().
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_type contentSize() const noexcept override
Returns the content_size if known.
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
bool hasContentSize() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void flush() noexcept override
Synchronizes all output operations, or do nothing.
bool isOpen() const noexcept override
Checks if the stream has an associated file.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string toString() const noexcept override
ByteStream_File(const std::string &path, iomode_t iomode=iomode_t::rw, const jau::io::fs::fmode_t fmode=fs::fmode_t::def_file_prot, lb_endian_t byteOrder=lb_endian_t::little) noexcept
Construct a stream based byte stream from filesystem path, either an existing or new file.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
std::string toString() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
void startRecording() noexcept
Starts the recording.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
bool isRecording() noexcept
void stopRecording() noexcept
Stops the recording.
size_t read(void *, size_t) noexcept override
Read from the source.
iostate_t rdstate() const noexcept override
Returns the current state flags.
void clearRecording() noexcept
Clears the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string toString() const noexcept override
size_t discard(size_t N) noexcept override
Discard the next N bytes of the data.
size_t peek(void *, size_t, size_type) noexcept override
Read from the source but do not modify the internal offset.
bool setMark(size_type readLimit) noexcept override
Set markpos to current position, allowing the stream to be seekMark().
size_type seek(size_type newPos) noexcept override
Sets position indicator for output-streams or input-streams with known length, similar to e....
bool seekMark() noexcept override
Seeks stream position to markpos as set via setMark().
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t read(void *, size_t) noexcept override
Read from the source.
ByteStream_SecMemory(const std::string &in, lb_endian_t byteOrder=lb_endian_t::little)
Construct a secure memory source that reads from a string, iomode_t::read.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
constexpr bool canWrite() const noexcept
Returns true in case stream has iomode::write capabilities.
constexpr bool canRead() const noexcept
Returns true in case stream has iomode::read capabilities.
size_t discardRead(size_t n) noexcept
Fallback slow discard implementation using read() in case of unknown stream size.
constexpr lb_endian_t byteOrder() const noexcept
Returns endian byte-order of stream storage.
uint64_t size_type
uint64_t size data type, bit position and count
ByteStream(iomode_t mode, lb_endian_t byteOrder=lb_endian_t::little) noexcept
constexpr iomode_t mode() const noexcept
static constexpr size_type npos
Invalid position constant, denoting unset mark() or invalid position.
bool fail() const noexcept
Checks if an error has occurred.
virtual iostate_t rdstate() const noexcept
Returns the current state flags.
void clearStateFlags(const iostate_t clr) noexcept
Clears given state flags from current value.
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
constexpr iostate_t rdstate_impl() const noexcept
constexpr void addstate_impl(iostate_t state) const noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
lb_endian_t
Simplified reduced endian type only covering little- and big-endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
#define PRAGMA_DISABLE_WARNING_TYPE_RANGE_LIMIT
constexpr bool is_set(const E mask, const E bits) noexcept
#define PRAGMA_DISABLE_WARNING_PUSH
#define PRAGMA_DISABLE_WARNING_POP
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
iomode_t
Stream I/O mode, e.g.
std::string to_string(const ioaccess_t v) noexcept
Return std::string representation of the given ioaccess_t.
io_result_t
I/O operation result value.
ioaccess_t
I/O read or write access.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
std::string toString(io_result_t v) noexcept
std::unique_ptr< ByteStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
@ rw
Read and write capabilities, i.e.
@ atend
Seek to end of (file) stream when opened.
@ trunc
Truncate existing (file) stream when opened with write.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream (EOS).
@ timeout
Input or output operation failed due to timeout.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
CXX_ALWAYS_INLINE _Tp load() const noexcept