High-level and low-level channel types for thread-safe communication. While channels are essentially thread-safe queues under the hood, their primary purpose is to facilitate safe communication between multiple readers and multiple writers. Although they can be used like queues, channels are designed with synchronization and concurrent messaging patterns in mind. Provided types: - `Chan` a high level channel - `Raw_Chan` a low level channel - `Raw_Queue` a low level non-threadsafe queue implementation used internally Example: import "core:sync/chan" import "core:fmt" import "core:thread" // The consumer reads from the channel until it's closed. // Closing the channel acts as a signal to stop. consumer :: proc(recv_chan: chan.Chan(int, .Recv)) { for { value, ok := chan.recv(recv_chan) if !ok { break // More idiomatic than return here } fmt.println("[CONSUMER] Received:", value) } fmt.println("[CONSUMER] Channel closed, stopping.") } // The producer sends `count` number of messages. producer :: proc(send_chan: chan.Chan(int, .Send), count: int) { for i in 0..<count { fmt.println("[PRODUCER] Sending:", i) success := chan.send(send_chan, i) if !success { fmt.println("[PRODUCER] Failed to send, channel may be closed.") return } } // Signal that production is complete by closing the channel. chan.close(send_chan) fmt.println("[PRODUCER] Done producing, channel closed.") } chan_example :: proc() { // Create an unbuffered channel for int messages c, err := chan.create(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(c) // Start the consumer thread consumer_thread := thread.create_and_start_with_poly_data(chan.as_recv(c), consumer) defer thread.destroy(consumer_thread) // Start the producer thread with 5 messages (change count as needed) producer_thread := thread.create_and_start_with_poly_data2(chan.as_send(c), 5, producer) defer thread.destroy(producer_thread) // Wait for both threads to complete thread.join_multiple(consumer_thread, producer_thread) }

Collection Info

View Source
Collection
core
Path
sync/chan
Entries
31

Source Files

Types

4

Chan #

Source
Chan :: Chan

A typed wrapper around `Raw_Chan` which should be used preferably. Note: all procedures accepting `Raw_Chan` also accept `Chan`. **Inputs** - `$T`: The type of the messages - `Direction`: what `Direction` the channel supports Example: import "core:sync/chan" chan_example :: proc() { // Create an unbuffered channel with messages of type int, // supporting both sending and receiving. // Creating unidirectional channels, although possible, is useless. c, _ := chan.create(chan.Chan(int), context.allocator) defer chan.destroy(c) // This channel can now only be used for receiving messages recv_only_channel: chan.Chan(int, .Recv) = chan.as_recv(c) // This channel can now only be used for sending messages send_only_channel: chan.Chan(int, .Send) = chan.as_send(c) }

Raw_Chan #

Source
Raw_Chan :: Raw_Chan

`Raw_Chan` allows for thread-safe communication using fixed-size messages. This is the low-level implementation of `Chan`, which does not include the concept of Direction. Example: import "core:sync/chan" raw_chan_example :: proc() { // Create an unbuffered channel with messages of type int, c, _ := chan.create_raw(size_of(int), align_of(int), context.allocator) defer chan.destroy(c) }

Procedures

25

as_recv #

Source
@(require_results)
as_recv :: proc "contextless" (c: $C/Chan($T, $D)) -> (r: $$deferred_return) {…}

Creates a version of a channel that can only be used for receiving not sending. **Inputs** - `c`: The channel **Returns**: - An `Allocator_Error` Example: import "core:sync/chan" as_recv_example :: proc() { consumer :: proc(c: chan.Chan(int, .Recv)) { value, ok := chan.recv(c) // compile-time error: // chan.send(c, 22) } c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) chan.send(c, 112) consumer(chan.as_recv(c)) }

as_send #

Source
@(require_results)
as_send :: proc "contextless" (c: $C/Chan($T, $D)) -> (s: $$deferred_return) {…}

Creates a version of a channel that can only be used for sending not receiving. **Inputs** - `c`: The channel **Returns**: - An `Allocator_Error` Example: import "core:sync/chan" as_send_example :: proc() { // this procedure takes a channel that can only // be used for sending not receiving. producer :: proc(c: chan.Chan(int, .Send)) { chan.send(c, 112) // compile-time error: // value, ok := chan.recv(c) } c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) producer(chan.as_send(c)) }

can_recv #

Source
@(require_results)
can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Returns whether a message can be read without blocking the current thread. Specifically, it checks if the channel is buffered and not full, or if there is already a writer attempting to send a message. **Inputs** - `c`: The channel **Returns** - `true` if a message can be read, `false` otherwise Example: import "core:sync/chan" can_recv_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(!chan.can_recv(c), "the cannel is empty") assert(chan.send(c, 2)) assert(chan.can_recv(c), "there is message to read") }

can_send #

Source
@(require_results)
can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Returns whether a message can be sent without blocking the current thread. Specifically, it checks if the channel is buffered and not full, or if there is already a reader waiting for a message. **Inputs** - `c`: The channel **Returns** - `true` if a message can be sent, `false` otherwise Example: import "core:sync/chan" can_send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.can_send(c), "the channel's buffer is not full") assert(chan.send(c, 2)) assert(!chan.can_send(c), "the channel's buffer is full") }

cap #

Source
@(require_results)
cap :: proc "contextless" (c: ^Raw_Chan) -> int {…}

Returns the number of elements the channel could hold. Note: Unbuffered channels will always return `0` because they cannot hold elements. **Inputs** - `c`: The channel **Returns**: - Number of elements Example: import "core:sync/chan" import "core:fmt" cap_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) fmt.println(chan.cap(c)) } Output: 2

close #

Source
close :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Closes the channel, preventing new messages from being added. **Inputs** - `c`: The channel **Returns**: - `true` if the channel was closed by this operation, `false` if it was already closed Example: import "core:sync/chan" close_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) // Sending a message to an open channel assert(chan.send(c, 1), "allowed to send") // Closing the channel successfully assert(chan.close(c), "successfully closed") // Trying to send a message after the channel is closed (should fail) assert(!chan.send(c, 1), "not allowed to send after close") // Trying to close the channel again (should fail since it's already closed) assert(!chan.close(c), "was already closed") }

create_buffered #

Source
@(require_results)
create_buffered :: proc($C: typeid/Chan($T, $D=0), #any_int cap: int, allocator: Allocator) -> (c: $$deferred_return, err: Allocator_Error) {…}

Creates a buffered version of the specified `Chan` type. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - `cap`: The capacity of the channel - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_buffered_example :: proc() { c, err := chan.create_buffered(chan.Chan(int), 10, context.allocator) assert(err == .None) defer chan.destroy(c) }

create_raw_buffered #

Source
@(require_results)
create_raw_buffered :: proc(#any_int msg_size, #any_int msg_alignment: int, #any_int cap: int, allocator: Allocator, loc := #caller_location) -> (c: ^Raw_Chan, err: Allocator_Error) {…}

Creates a buffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - `cap`: The capacity of the channel - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_unbuffered_example :: proc() { c, err := chan.create_raw_buffered(size_of(int), align_of(int), 10, context.allocator) assert(err == .None) defer chan.destroy(c) }

create_raw_unbuffered #

Source
@(require_results)
create_raw_unbuffered :: proc(#any_int msg_size, #any_int msg_alignment: int, allocator: Allocator, loc := #caller_location) -> (c: ^Raw_Chan, err: Allocator_Error) {…}

Creates an unbuffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_unbuffered_example :: proc() { unbuffered, err := chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) }

create_unbuffered #

Source
@(require_results)
create_unbuffered :: proc($C: typeid/Chan($T, $D=0), allocator: Allocator) -> (c: $$deferred_return, err: Allocator_Error) {…}

Creates an unbuffered version of the specified `Chan` type. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_unbuffered_example :: proc() { c, err := chan.create_unbuffered(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(c) }

destroy #

Source
destroy :: proc(c: ^Raw_Chan, loc := #caller_location) -> (err: Allocator_Error) {…}

Destroys the Channel. **Inputs** - `c`: The channel to destroy **Returns**: - An `Allocator_Error`

is_buffered #

Source
@(require_results)
is_buffered :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Checks if the given channel is buffered. **Inputs** - `c`: The channel **Returns**: - `true` if the channel is buffered, `false` otherwise Example: import "core:sync/chan" is_buffered_example :: proc() { c, _ := chan.create(chan.Chan(int), 1, context.allocator) defer chan.destroy(c) assert(chan.is_buffered(c)) }

is_closed #

Source
@(require_results)
is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Returns if the channel is closed or not **Inputs** - `c`: The channel **Returns**: - `true` if the channel is closed, `false` otherwise

is_unbuffered #

Source
@(require_results)
is_unbuffered :: proc "contextless" (c: ^Raw_Chan) -> bool {…}

Checks if the given channel is unbuffered. **Inputs** - `c`: The channel **Returns**: - `true` if the channel is unbuffered, `false` otherwise Example: import "core:sync/chan" is_buffered_example :: proc() { c, _ := chan.create(chan.Chan(int), context.allocator) defer chan.destroy(c) assert(chan.is_unbuffered(c)) }

len #

Source
@(require_results)
len :: proc "contextless" (c: ^Raw_Chan) -> int {…}

Returns the number of elements currently in the channel. Note: Unbuffered channels will always return `0` because they cannot hold elements. **Inputs** - `c`: The channel **Returns**: - Number of elements Example: import "core:sync/chan" import "core:fmt" len_example :: proc() { c, _ := chan.create(chan.Chan(int), 2, context.allocator) defer chan.destroy(c) fmt.println(chan.len(c)) assert(chan.send(c, 1)) // add an element fmt.println(chan.len(c)) } Output: 0 1

recv #

Source
@(require_results)
recv :: proc "contextless" (c: $C/Chan($T, $D=0)) -> (data: $$deferred_return, ok: bool) {…}

Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty until the channel is being written to or the channel is closed. `recv` will return `false` when attempting to receive a message on an already closed channel. **Inputs** - `c`: The channel **Returns** - The message - `true` if a message was received, `false` when the channel was already closed Example: import "core:sync/chan" recv_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.send(c, 2)) value, ok := chan.recv(c) assert(ok, "the value was received") // this would block since the channel is now empty // value, ok = chan.recv(c) // reading from a closed channel returns false chan.close(c) value, ok = chan.recv(c) assert(!ok, "the channel is closed") }

recv_raw #

Source
@(require_results)
recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) {…}

Reads a message from the channel, blocking the current thread if: - the channel is unbuffered - the channel's buffer is empty until the channel is being written to or the channel is closed. `recv_raw` will return `false` when attempting to receive a message on an already closed channel. Note: The location pointed to by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to where the message should be stored **Returns** - `true` if a message was received, `false` when the channel was already closed Example: import "core:sync/chan" recv_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.send_raw(c, &value)) assert(chan.recv_raw(c, &value)) // this would block since the channel is now empty // assert(chan.recv_raw(c, &value)) // reading from a closed channel returns false chan.close(c) assert(! chan.recv_raw(c, &value)) }

send #

Source
send :: proc "contextless" (c: $C/Chan($T, $D), data: $T) -> (ok: bool) {…}

Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full until the channel is being read from or the channel is closed. `send` will return `false` when attempting to send on an already closed channel. **Inputs** - `c`: The channel - `data`: The message to send **Returns** - `true` if the message was sent, `false` when the channel was already closed Example: import "core:sync/chan" send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.send(c, 2)) // this would block since the channel has a buffersize of 1 // assert(chan.send(c, 2)) // sending on a closed channel returns false chan.close(c) assert(! chan.send(c, 2)) }

send_raw #

Source
@(require_results)
send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {…}

Sends the specified message, blocking the current thread if: - the channel is unbuffered - the channel's buffer is full until the channel is being read from or the channel is closed. `send_raw` will return `false` when attempting to send on an already closed channel. Note: The message referenced by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to the data to send **Returns** - `true` if the message was sent, `false` when the channel was already closed Example: import "core:sync/chan" send_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.send_raw(c, &value)) // this would block since the channel has a buffersize of 1 // assert(chan.send_raw(c, &value)) // sending on a closed channel returns false chan.close(c) assert(! chan.send_raw(c, &value)) }

try_recv #

Source
@(require_results)
try_recv :: proc "contextless" (c: $C/Chan($T, $D=0)) -> (data: $$deferred_return, ok: bool) {…}

Tries reading a message from the channel in a non-blocking fashion. **Inputs** - `c`: The channel **Returns** - The message - `true` if a message was received, `false` when the channel was already closed or no message was available Example: import "core:sync/chan" try_recv_example :: proc() { c, err := chan.create(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(c) _, ok := chan.try_recv(c) assert(!ok, "there is not value to read") }

try_recv_raw #

Source
@(require_results)
try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {…}

Reads a message from the channel if one is available. Note: The location pointed to by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: The channel - `msg_out`: Pointer to where the message should be stored **Returns** - `true` if a message was received, `false` when the channel was already closed or no message was available Example: import "core:sync/chan" try_recv_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(c) value: int assert(!chan.try_recv_raw(c, &value)) }

try_select_raw #

Source
@(require_results)
try_select_raw :: proc(recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) {…}

Attempts to either send or receive messages on the specified channels without blocking. `try_select_raw` first identifies which channels have messages ready to be received and which are available for sending. It then randomly selects one operation (either a send or receive) to perform. If no channels have messages ready, the procedure is a noop. Note: Each message in `send_msgs` corresponds to the send channel at the same index in `sends`. If the message is nil, corresponding send channel will be skipped. **Inputs** - `recv`: A slice of channels to read from - `sends`: A slice of channels to send messages on - `send_msgs`: A slice of messages to send - `recv_out`: A pointer to the location where, when receiving, the message should be stored **Returns** - Position of the available channel which was used for receiving or sending - `true` if sending/receiving was successfull, `false` if the channel was closed or no channel was available Example: import "core:sync/chan" import "core:fmt" try_select_raw_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) // sending value '1' on the channel value1 := 1 msgs := [?]rawptr{&value1} send_chans := [?]^chan.Raw_Chan{c} // for simplicity the same channel used for sending is also used for receiving receive_chans := [?]^chan.Raw_Chan{c} // where the value from the read should be stored received_value: int idx, ok := chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) // closing of a channel also affects the select operation chan.close(c) idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) } Output: SELECT: 0 Send RECEIVED VALUE 0 SELECT: 0 Recv RECEIVED VALUE 1 SELECT: -1 None

try_send #

Source
@(require_results)
try_send :: proc "contextless" (c: $C/Chan($T, $D), data: $T) -> (ok: bool) {…}

Tries sending the specified message which is: - blocking: given the channel is unbuffered - non-blocking: given the channel is buffered **Inputs** - `c`: The channel - `data`: The message to send **Returns** - `true` if the message was sent, `false` when the channel was already closed or the channel's buffer was full Example: import "core:sync/chan" try_send_example :: proc() { c, err := chan.create(chan.Chan(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) assert(chan.try_send(c, 2), "there is enough space") assert(!chan.try_send(c, 2), "the buffer is already full") }

try_send_raw #

Source
@(require_results)
try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) {…}

Tries sending the specified message which is: - blocking: given the channel is unbuffered - non-blocking: given the channel is buffered Note: The message referenced by `msg_out` must match the size and alignment used when the `Raw_Chan` was created. **Inputs** - `c`: the channel - `msg_out`: pointer to the data to send **Returns** - `true` if the message was sent, `false` when the channel was already closed or the channel's buffer was full Example: import "core:sync/chan" try_send_raw_example :: proc() { c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator) assert(err == .None) defer chan.destroy(c) value := 2 assert(chan.try_send_raw(c, &value), "there is enough space") assert(!chan.try_send_raw(c, &value), "the buffer is already full") }

Procedure Groups

2

create #

Source
create :: proc{
	create_unbuffered,
	create_buffered,
}

Creates a buffered or unbuffered `Chan` instance. *Allocates Using Provided Allocator* **Inputs** - `$C`: Type of `Chan` to create - [`cap`: The capacity of the channel] omit for creating unbuffered channels - `allocator`: The allocator to use **Returns**: - The initialized `Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_example :: proc() { unbuffered: chan.Chan(int) buffered: chan.Chan(int) err: runtime.Allocator_Error unbuffered, err = chan.create(chan.Chan(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) buffered, err = chan.create(chan.Chan(int), 10, context.allocator) assert(err == .None) defer chan.destroy(buffered) }

create_raw #

Source
create_raw :: proc{
	create_raw_unbuffered,
	create_raw_buffered,
}

Creates a buffered or unbuffered `Raw_Chan` for messages of the specified size and alignment. *Allocates Using Provided Allocator* **Inputs** - `msg_size`: The size of the messages the messages being sent - `msg_alignment`: The alignment of the messages being sent - [`cap`: The capacity of the channel] omit for creating unbuffered channels - `allocator`: The allocator to use **Returns**: - The initialized `Raw_Chan` - An `Allocator_Error` Example: import "core:sync/chan" create_raw_example :: proc() { unbuffered: ^chan.Raw_Chan buffered: ^chan.Raw_Chan err: runtime.Allocator_Error unbuffered, err = chan.create_raw(size_of(int), align_of(int), context.allocator) assert(err == .None) defer chan.destroy(unbuffered) buffered, err = chan.create_raw(size_of(int), align_of(int), 10, context.allocator) assert(err == .None) defer chan.destroy(buffered) }