C++ - Working with Asynchrony Generally (Sender/Receiver)

7 minute read

Eric Niebler - Working with Asynchrony Generally - CppEurope 2022

In this session, Eric Niebler explores the future of C++ asynchrony through the Sender/Receiver model (P2300), which has been adopted into the C++26 Standard as std::execution. This model provides a standard way to represent and compose asynchronous operations, moving beyond the limitations of std::future and std::promise.

Core Concepts of P2300

The Sender/Receiver framework is built on four main pillars:

1. Schedulers

A scheduler is a lightweight handle to an execution context (such as a thread pool or GPU stream). It is a factory for senders that complete on that specific execution resource.

2. Senders

A sender is a description of asynchronous work to be sent for execution. Senders are lazy—they describe an operation but don’t start it until connected to a receiver and started. They can be composed into task graphs using generic algorithms.

3. Receivers

A receiver is a generalized callback that consumes asynchronous results from a sender. It has three completion channels:

  • set_value: Successful completion
  • set_error: Failure during calculation or scheduling
  • set_stopped: Operation ended before success or failure (cancellation)

4. Operation State

An operation state contains the state needed by an asynchronous operation. It is created by connecting a sender and receiver via std::execution::connect. Work is not enqueued until start() is called.

Sender Factories

Sender factories create senders from non-sender parameters:

  • execution::just(): Creates a sender that completes immediately with the provided values
  • execution::just_error(): Creates a sender that completes with the provided error
  • execution::just_stopped(): Creates a sender that completes via set_stopped
  • execution::read_env(): Creates a sender that queries the receiver’s environment
  • execution::schedule(): Creates a sender that completes on the given scheduler’s execution resource

Sender Adaptors

Sender adaptors accept senders and return composed senders:

  • starts_on(): Starts the sender on the provided scheduler’s execution resource
  • continues_on(): Completes on the provided scheduler’s execution resource
  • on(): Transfers execution to a scheduler, then back to where it started
  • then(): Chains a function invocation with the sender’s values
  • upon_error(): Chains a function invocation on the error channel
  • upon_stopped(): Chains a function invocation on the stopped signal
  • let_value(): Invokes a function with the sender’s values; the function returns a new sender that is folded into the pipeline
  • let_error(): Invokes a function with the error; the function returns a sender
  • let_stopped(): Invokes a function on the stopped signal; the function returns a sender
  • bulk(): Invokes a function for each index in a given shape (a parallel for-each)
  • split(): Converts a single-shot sender into a multi-shot sender
  • when_all(): Completes when all input senders have completed
  • into_variant(): Sends a variant of tuples covering the possible value completions
  • stopped_as_optional(): Maps the value into an std::optional; stopped becomes std::nullopt
  • stopped_as_error(): Maps the stopped channel to an error

Sender Consumers

  • sync_wait(): Blocks until the sender completes and returns its result
  • start_detached(): Starts the sender without waiting for its completion

Basic Example: Thread Pool with Concurrent Senders

#include <cstdio>

#include <stdexec/execution.hpp>
#include <exec/static_thread_pool.hpp>

int main()
{
    exec::static_thread_pool pool(3);
    auto sched = pool.get_scheduler();

    auto fun = [](int i) { return i * i; };

    auto work = stdexec::when_all(
        stdexec::starts_on(sched, stdexec::just(0) | stdexec::then(fun)),
        stdexec::starts_on(sched, stdexec::just(1) | stdexec::then(fun)),
        stdexec::starts_on(sched, stdexec::just(2) | stdexec::then(fun))
    );

    auto [i, j, k] = stdexec::sync_wait(std::move(work)).value();
    std::printf("%d %d %d\n", i, j, k);  // Output: 0 1 4
}

Using then for Chaining Operations

The then adaptor chains a function invocation with a sender’s values:

namespace ex = stdexec;

ex::sender auto handle_classify_request(const http_request& req) {
    return ex::just(req)
         | ex::then(extract_image)
         | ex::then(do_classify)
         | ex::upon_error(on_classification_error)
         | ex::upon_stopped(on_classification_cancelled)
         | ex::then(to_response);
}

HTTP Server Example with let_value

This comprehensive example demonstrates composing asynchronous operations using let_value, let_error, and let_stopped:

#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <stdexec/execution.hpp>
#include "exec/start_detached.hpp"
#include "exec/static_thread_pool.hpp"

namespace ex = stdexec;

struct http_request {
    std::string url_;
    std::vector<std::pair<std::string, std::string>> headers_;
    std::string body_;
};

struct http_response {
    int status_code_;
    std::string body_;
};

template <ex::scheduler S>
auto schedule_request_start(S sched, int idx) -> ex::sender auto {
    auto url = std::string("/query?image_idx=") + std::to_string(idx);
    if (idx == 7)
        url.clear();  // Simulate invalid request
    http_request req{.url_ = std::move(url), .headers_ = {}, .body_ = {}};
    std::cout << "HTTP request " << idx << " arrived\n";
    return ex::just(std::move(req)) | ex::continues_on(std::forward<S>(sched));
}

auto send_response(http_response const& resp) -> ex::sender auto {
    std::cout << "Sending back response: " << resp.status_code_ << "\n";
    return ex::just();
}

auto validate_request(http_request const& req) -> ex::sender auto {
    std::cout << "validating request " << req.url_ << "\n";
    if (req.url_.empty())
        throw std::invalid_argument("No URL");
    return ex::just(req);
}

auto handle_request(http_request const& req) -> ex::sender auto {
    std::cout << "handling request " << req.url_ << "\n";
    return ex::just(http_response{.status_code_ = 200, .body_ = "image details"});
}

auto error_to_response(std::exception_ptr err) -> ex::sender auto {
    try {
        std::rethrow_exception(err);
    } catch (std::invalid_argument const& e) {
        return ex::just(http_response{.status_code_ = 404, .body_ = e.what()});
    } catch (std::exception const& e) {
        return ex::just(http_response{.status_code_ = 500, .body_ = e.what()});
    } catch (...) {
        return ex::just(http_response{.status_code_ = 500, .body_ = "Unknown error"});
    }
}

auto stopped_to_response() -> ex::sender auto {
    return ex::just(http_response{
        .status_code_ = 503,
        .body_ = "Service temporarily unavailable"
    });
}

int main() {
    exec::static_thread_pool pool{8};
    ex::scheduler auto sched = pool.get_scheduler();

    for (int i = 0; i < 10; i++) {
        ex::sender auto snd =
            schedule_request_start(sched, i)
            | ex::let_value(validate_request)
            | ex::let_value(handle_request)
            | ex::let_error(error_to_response)
            | ex::let_stopped(stopped_to_response)
            | ex::let_value(send_response);

        exec::start_detached(std::move(snd));
    }

    pool.request_stop();
    return 0;
}

Key Differences: then vs let_value

  • then: The callback returns a value directly
  • let_value: The callback returns a sender, which is then flattened into the pipeline

Using when_all for Concurrent Operations

#include <format>
#include <iostream>

namespace ex = stdexec;

ex::sender auto sends_1 = ex::just(1);
ex::sender auto sends_abc = ex::just("abc");

ex::sender auto both = ex::when_all(sends_1, sends_abc);

ex::sender auto final = ex::then(both, [](auto... args) {
    std::cout << std::format("the two args: {}, {}", args...);
});

Using run_loop with Schedulers

#include <cstdio>
#include <execution>
#include <string>
#include <thread>

using namespace std::literals;

int main()
{
    std::execution::run_loop loop;

    std::jthread worker([&](std::stop_token st) {
        std::stop_callback cb{st, [&]{ loop.finish(); }};
        loop.run();
    });

    // Factory: create sender with a value
    std::execution::sender auto hello = std::execution::just("hello world"s);

    // Adaptor: chain with function invocation
    std::execution::sender auto print =
        std::move(hello)
        | std::execution::then([](std::string msg) {
            return std::puts(msg.c_str());
        });

    // Get scheduler from run_loop
    std::execution::scheduler auto io_thread = loop.get_scheduler();

    // Adaptor: transfer execution context
    std::execution::sender auto work =
        std::execution::on(io_thread, std::move(print));

    // Consumer: synchronously wait for result
    auto [result] = std::this_thread::sync_wait(std::move(work)).value();

    return result;
}
// Output: hello world

Writing a Custom Sender

Eric Niebler demonstrates wrapping a classic async C API with sender/receiver:

Classic Async C API

struct overlapped {
    // ...OS internals here...
};

using overlapped_callback = void(int status, int bytes, overlapped* user);

int read_file(FILE*, char* buffer, int bytes,
              overlapped* user, overlapped_callback* cb);

Sender-Based Wrapper

namespace stdex = std::execution;

struct read_file_sender {
    using sender_concept = stdex::sender_t;

    using completion_signatures =
        stdex::completion_signatures<
            stdex::set_value_t(int, char*),
            stdex::set_error_t(int)>;

    auto connect(stdex::receiver auto rcvr) {
        return read_file_operation{{}, {}, pfile, buffer,
                                   size, std::move(rcvr)};
    }

    FILE* pfile;
    char* buffer;
    int size;
};

Operation State Implementation

template <class Receiver>
struct read_file_operation : overlapped, immovable {  // immovable: helper with copy/move deleted
    static void _callback(int status, int bytes, overlapped* data) {
        auto* op = static_cast<read_file_operation*>(data);
        if (status == OK)
            stdex::set_value(std::move(op->rcvr), bytes, op->buffer);
        else
            stdex::set_error(std::move(op->rcvr), status);
    }

    void start() noexcept {
        int status = read_file(pfile, buffer, size, this, &_callback);
        if (status != OK)
            stdex::set_error(std::move(rcvr), status);
    }

    FILE* pfile;
    char* buffer;
    int size;
    Receiver rcvr;
};

Using with Coroutines

exec::task<std::string> process_file(FILE* pfile) {
    std::string str;
    str.resize(MAX_BUFFER);

    auto [bytes, buff] =
        co_await async_read_file(pfile, str.data(), str.size());

    str.resize(bytes);
    co_return str;
}

Key Advantages

  • Efficiency: Avoids heap allocations and synchronization overhead often associated with std::future
  • Composition: Rich set of algorithms (then, when_all, let_value, etc.) to compose complex async flows
  • Structured Concurrency: Asynchronous operations have well-defined lifetimes, making error handling and cancellation robust
  • Generic Programming: Write code agnostic of whether it runs on a thread pool, GPU, or I/O loop
  • Lazy Evaluation: Work descriptions are built up front and composed before any execution occurs

Conclusion

The Sender/Receiver model represents a significant shift in how we think about asynchrony in C++. By separating the description of work from its execution, it enables highly performant and portable concurrent code. With its acceptance into C++26, this becomes the standard approach for asynchronous programming in modern C++.

Resources