libfetch 0.0.0
A lightweight asynchronous HTTP/1.1 client library implementing a subset of the WHATWG Fetch API.
Loading...
Searching...
No Matches
stream.c File Reference
#include "stream.h"
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <stdatomic.h>
#include <unistd.h>

Data Structures

struct  lock_free_ring_buffer_t
 
struct  atomic_bool_t
 
struct  fetch_readable_stream
 
struct  fetch_writable_stream
 
struct  fetch_transform_stream
 
struct  fetch_stream_reader
 
struct  fetch_stream_writer
 

Macros

#define HAS_FALLBACK_MALLOC   1
 
#define ATOMIC_LOAD_RELAXED(ptr)
 
#define ATOMIC_STORE_RELAXED(ptr, val)
 
#define ATOMIC_LOAD_ACQUIRE(ptr)
 
#define ATOMIC_STORE_RELEASE(ptr, val)
 
#define ATOMIC_CAS_WEAK(ptr, expected, desired)
 
#define ATOMIC_FETCH_ADD(ptr, val)
 
#define RING_BUFFER_SIZE   1024
 
#define RING_BUFFER_MASK   (RING_BUFFER_SIZE - 1)
 

Functions

static void * aligned_malloc (size_t size, size_t alignment)
 
static void aligned_free (void *ptr)
 
typedef _Atomic (uint32_t)
 
static lock_free_ring_buffer_t * ring_buffer_new (void)
 
static void ring_buffer_free (lock_free_ring_buffer_t *buffer)
 
static bool ring_buffer_enqueue (lock_free_ring_buffer_t *buffer, fetch_stream_chunk_t chunk)
 
static fetch_stream_chunk_t ring_buffer_dequeue (lock_free_ring_buffer_t *buffer)
 
static void ring_buffer_close (lock_free_ring_buffer_t *buffer)
 
static void atomic_bool_init (atomic_bool_t *atomic, bool initial)
 
static void atomic_bool_destroy (atomic_bool_t *atomic)
 
static bool atomic_bool_load (atomic_bool_t *atomic)
 
static void atomic_bool_store (atomic_bool_t *atomic, bool value)
 
void fetch_stream_chunk_free (fetch_stream_chunk_t *chunk)
 
static fetch_stream_chunk_t chunk_copy (const void *data, size_t size)
 
fetch_stream_result_t fetch_readable_stream_controller_enqueue (fetch_readable_stream_controller_t *controller, const void *value, size_t size)
 
void fetch_readable_stream_controller_close (fetch_readable_stream_controller_t *controller)
 
void fetch_readable_stream_controller_error (fetch_readable_stream_controller_t *controller, const char *error)
 
fetch_readable_stream_t * fetch_readable_stream_new (fetch_readable_stream_start_fn start, void *userdata)
 
void fetch_readable_stream_free (fetch_readable_stream_t *stream)
 
bool fetch_readable_stream_locked (const fetch_readable_stream_t *stream)
 
fetch_stream_reader_t * fetch_readable_stream_get_reader (fetch_readable_stream_t *stream)
 
void fetch_readable_stream_cancel (fetch_readable_stream_t *stream, const char *reason)
 
fetch_readable_stream_t * fetch_readable_stream_tee (fetch_readable_stream_t *stream)
 
fetch_writable_stream_t * fetch_writable_stream_new (void)
 
void fetch_writable_stream_free (fetch_writable_stream_t *stream)
 
bool fetch_writable_stream_locked (const fetch_writable_stream_t *stream)
 
fetch_stream_writer_t * fetch_writable_stream_get_writer (fetch_writable_stream_t *stream)
 
fetch_stream_result_t fetch_writable_stream_abort (fetch_writable_stream_t *stream, const char *reason)
 
fetch_stream_result_t fetch_writable_stream_close (fetch_writable_stream_t *stream)
 
fetch_transform_stream_t * fetch_transform_stream_new (void)
 
void fetch_transform_stream_free (fetch_transform_stream_t *stream)
 
fetch_readable_stream_t * fetch_transform_stream_readable (fetch_transform_stream_t *stream)
 
fetch_writable_stream_t * fetch_transform_stream_writable (fetch_transform_stream_t *stream)
 
void fetch_stream_reader_free (fetch_stream_reader_t *reader)
 
bool fetch_stream_reader_closed (const fetch_stream_reader_t *reader)
 
fetch_stream_chunk_t fetch_stream_reader_read (fetch_stream_reader_t *reader)
 
void fetch_stream_reader_cancel (fetch_stream_reader_t *reader, const char *reason)
 
void fetch_stream_reader_release_lock (fetch_stream_reader_t *reader)
 
void fetch_stream_writer_free (fetch_stream_writer_t *writer)
 
bool fetch_stream_writer_closed (const fetch_stream_writer_t *writer)
 
fetch_stream_state_t fetch_stream_writer_ready (const fetch_stream_writer_t *writer)
 
fetch_stream_result_t fetch_stream_writer_write (fetch_stream_writer_t *writer, const void *chunk, size_t size)
 
fetch_stream_result_t fetch_stream_writer_close (fetch_stream_writer_t *writer)
 
fetch_stream_result_t fetch_stream_writer_abort (fetch_stream_writer_t *writer, const char *reason)
 
void fetch_stream_writer_release_lock (fetch_stream_writer_t *writer)
 
stream_buffer_t fetch_readable_stream_consume_all (fetch_readable_stream_t *stream, size_t initial_capacity)
 
stream_buffer_t fetch_readable_stream_consume_all_default (fetch_readable_stream_t *stream)
 
void stream_buffer_free (stream_buffer_t *buffer)
 
fetch_stream_result_t fetch_readable_stream_pipe_to (fetch_readable_stream_t *readable, fetch_writable_stream_t *writable)
 
fetch_readable_stream_t * fetch_readable_stream_pipe_through (fetch_readable_stream_t *readable, fetch_transform_stream_t *transform)
 

Variables

 ring_slot_t
 

Macro Definition Documentation

◆ ATOMIC_CAS_WEAK

#define ATOMIC_CAS_WEAK ( ptr,
expected,
desired )
Value:
atomic_compare_exchange_weak_explicit((ptr), &(expected), (desired), \
memory_order_acq_rel, \
memory_order_relaxed)

◆ ATOMIC_FETCH_ADD

#define ATOMIC_FETCH_ADD ( ptr,
val )
Value:
atomic_fetch_add_explicit((ptr), (val), memory_order_acq_rel)

◆ ATOMIC_LOAD_ACQUIRE

#define ATOMIC_LOAD_ACQUIRE ( ptr)
Value:
atomic_load_explicit((ptr), memory_order_acquire)

◆ ATOMIC_LOAD_RELAXED

#define ATOMIC_LOAD_RELAXED ( ptr)
Value:
atomic_load_explicit((ptr), memory_order_relaxed)

◆ ATOMIC_STORE_RELAXED

#define ATOMIC_STORE_RELAXED ( ptr,
val )
Value:
atomic_store_explicit((ptr), (val), memory_order_relaxed)

◆ ATOMIC_STORE_RELEASE

#define ATOMIC_STORE_RELEASE ( ptr,
val )
Value:
atomic_store_explicit((ptr), (val), memory_order_release)

◆ HAS_FALLBACK_MALLOC

#define HAS_FALLBACK_MALLOC   1

◆ RING_BUFFER_MASK

#define RING_BUFFER_MASK   (RING_BUFFER_SIZE - 1)

◆ RING_BUFFER_SIZE

#define RING_BUFFER_SIZE   1024

Function Documentation

◆ _Atomic()

typedef _Atomic ( uint32_t )

◆ aligned_free()

static void aligned_free ( void * ptr)
static

◆ aligned_malloc()

static void * aligned_malloc ( size_t size,
size_t alignment )
static

◆ atomic_bool_destroy()

static void atomic_bool_destroy ( atomic_bool_t * atomic)
static

◆ atomic_bool_init()

static void atomic_bool_init ( atomic_bool_t * atomic,
bool initial )
static

◆ atomic_bool_load()

static bool atomic_bool_load ( atomic_bool_t * atomic)
static

◆ atomic_bool_store()

static void atomic_bool_store ( atomic_bool_t * atomic,
bool value )
static

◆ chunk_copy()

static fetch_stream_chunk_t chunk_copy ( const void * data,
size_t size )
static

◆ fetch_readable_stream_cancel()

void fetch_readable_stream_cancel ( fetch_readable_stream_t * stream,
const char * reason )

◆ fetch_readable_stream_consume_all()

stream_buffer_t fetch_readable_stream_consume_all ( fetch_readable_stream_t * stream,
size_t initial_capacity )

Consume the entire readable stream into a contiguous buffer. This is a blocking operation that reads until the stream is closed.

Parameters
stream: The readable stream to consume
initial_capacity: Initial buffer capacity (will grow as needed)
Returns
stream_buffer_t containing the data, or {NULL, 0, 0} on error

Note: Caller must free the returned buffer with stream_buffer_free()

◆ fetch_readable_stream_consume_all_default()

stream_buffer_t fetch_readable_stream_consume_all_default ( fetch_readable_stream_t * stream)

Convenience function with default initial capacity (4KB)

◆ fetch_readable_stream_controller_close()

void fetch_readable_stream_controller_close ( fetch_readable_stream_controller_t * controller)

◆ fetch_readable_stream_controller_enqueue()

fetch_stream_result_t fetch_readable_stream_controller_enqueue ( fetch_readable_stream_controller_t * controller,
const void * value,
size_t size )

◆ fetch_readable_stream_controller_error()

void fetch_readable_stream_controller_error ( fetch_readable_stream_controller_t * controller,
const char * error )

◆ fetch_readable_stream_free()

void fetch_readable_stream_free ( fetch_readable_stream_t * stream)

◆ fetch_readable_stream_get_reader()

fetch_stream_reader_t * fetch_readable_stream_get_reader ( fetch_readable_stream_t * stream)

◆ fetch_readable_stream_locked()

bool fetch_readable_stream_locked ( const fetch_readable_stream_t * stream)

◆ fetch_readable_stream_new()

fetch_readable_stream_t * fetch_readable_stream_new ( fetch_readable_stream_start_fn start,
void * userdata )

◆ fetch_readable_stream_pipe_through()

fetch_readable_stream_t * fetch_readable_stream_pipe_through ( fetch_readable_stream_t * readable,
fetch_transform_stream_t * transform )

◆ fetch_readable_stream_pipe_to()

fetch_stream_result_t fetch_readable_stream_pipe_to ( fetch_readable_stream_t * readable,
fetch_writable_stream_t * writable )

◆ fetch_readable_stream_tee()

fetch_readable_stream_t * fetch_readable_stream_tee ( fetch_readable_stream_t * stream)

◆ fetch_stream_chunk_free()

void fetch_stream_chunk_free ( fetch_stream_chunk_t * chunk)

◆ fetch_stream_reader_cancel()

void fetch_stream_reader_cancel ( fetch_stream_reader_t * reader,
const char * reason )

◆ fetch_stream_reader_closed()

bool fetch_stream_reader_closed ( const fetch_stream_reader_t * reader)

◆ fetch_stream_reader_free()

void fetch_stream_reader_free ( fetch_stream_reader_t * reader)

◆ fetch_stream_reader_read()

fetch_stream_chunk_t fetch_stream_reader_read ( fetch_stream_reader_t * reader)

◆ fetch_stream_reader_release_lock()

void fetch_stream_reader_release_lock ( fetch_stream_reader_t * reader)

◆ fetch_stream_writer_abort()

fetch_stream_result_t fetch_stream_writer_abort ( fetch_stream_writer_t * writer,
const char * reason )

◆ fetch_stream_writer_close()

fetch_stream_result_t fetch_stream_writer_close ( fetch_stream_writer_t * writer)

◆ fetch_stream_writer_closed()

bool fetch_stream_writer_closed ( const fetch_stream_writer_t * writer)

◆ fetch_stream_writer_free()

void fetch_stream_writer_free ( fetch_stream_writer_t * writer)

◆ fetch_stream_writer_ready()

fetch_stream_state_t fetch_stream_writer_ready ( const fetch_stream_writer_t * writer)

◆ fetch_stream_writer_release_lock()

void fetch_stream_writer_release_lock ( fetch_stream_writer_t * writer)

◆ fetch_stream_writer_write()

fetch_stream_result_t fetch_stream_writer_write ( fetch_stream_writer_t * writer,
const void * chunk,
size_t size )

◆ fetch_transform_stream_free()

void fetch_transform_stream_free ( fetch_transform_stream_t * stream)

◆ fetch_transform_stream_new()

fetch_transform_stream_t * fetch_transform_stream_new ( void )

◆ fetch_transform_stream_readable()

fetch_readable_stream_t * fetch_transform_stream_readable ( fetch_transform_stream_t * stream)

◆ fetch_transform_stream_writable()

fetch_writable_stream_t * fetch_transform_stream_writable ( fetch_transform_stream_t * stream)

◆ fetch_writable_stream_abort()

fetch_stream_result_t fetch_writable_stream_abort ( fetch_writable_stream_t * stream,
const char * reason )

◆ fetch_writable_stream_close()

fetch_stream_result_t fetch_writable_stream_close ( fetch_writable_stream_t * stream)

◆ fetch_writable_stream_free()

void fetch_writable_stream_free ( fetch_writable_stream_t * stream)

◆ fetch_writable_stream_get_writer()

fetch_stream_writer_t * fetch_writable_stream_get_writer ( fetch_writable_stream_t * stream)

◆ fetch_writable_stream_locked()

bool fetch_writable_stream_locked ( const fetch_writable_stream_t * stream)

◆ fetch_writable_stream_new()

fetch_writable_stream_t * fetch_writable_stream_new ( void )

◆ ring_buffer_close()

static void ring_buffer_close ( lock_free_ring_buffer_t * buffer)
static

◆ ring_buffer_dequeue()

static fetch_stream_chunk_t ring_buffer_dequeue ( lock_free_ring_buffer_t * buffer)
static

◆ ring_buffer_enqueue()

static bool ring_buffer_enqueue ( lock_free_ring_buffer_t * buffer,
fetch_stream_chunk_t chunk )
static

◆ ring_buffer_free()

static void ring_buffer_free ( lock_free_ring_buffer_t * buffer)
static

◆ ring_buffer_new()

static lock_free_ring_buffer_t * ring_buffer_new ( void )
static

◆ stream_buffer_free()

void stream_buffer_free ( stream_buffer_t * buffer)

Free a stream buffer

Variable Documentation

◆ ring_slot_t

ring_slot_t