photon

Photon is a lightweight, transparent fiber scheduler. It is inspired by Golang's green thread model, and its spawn function is called go, doing the same job as Golang's keyword of the same name. The framework API surface is kept to a minimum. Many programs can be written using only three primitives: initPhoton to initialize Photon, runScheduler to start the fiber scheduler, and go to create tasks, including the initial tasks.

Discussion

Example, showcasing channels and std.range interop:

    module examples.channels;
    
    import std.algorithm, std.datetime, std.range, std.stdio;
    import photon;
    
    void first(shared Channel!string work, shared Channel!int completion) {
        delay(2.msecs);
        work.put("first #1");
        delay(2.msecs);
        work.put("first #2");
        delay(2.msecs);
        work.put("first #3");
        completion.put(1);
    }
    
    void second(shared Channel!string work, shared Channel!int completion) {
        delay(3.msecs);
        work.put("second #1");
        delay(3.msecs);
        work.put("second #2");
        completion.put(2);
    }
    
    void main() {
        initPhoton();
        auto jobQueue = channel!string(2);
        auto finishQueue = channel!int(1);
        go({ // producer # 1
            first(jobQueue, finishQueue);
        });
        go({ // producer # 2
            second(jobQueue, finishQueue);
        });
        go({ // consumer
            foreach (item; jobQueue) {
                delay(1.seconds);
                writeln(item);
            }
        });
        go({ // closer
            auto completions = finishQueue.take(2).array;
            assert(completions.length == 2);
            jobQueue.close(); // all producers are done
        });
        runScheduler();
    }
    

  • Declaration

    struct Task;

    A Task result allows one fiber to wait for another by joining its execution.

  • Declaration

    nothrow @trusted Task initPhoton();

    Initialize the event loop and internal data structures for the Photon scheduler.

  • go

    Declaration

    @trusted Task go(void delegate() func);
    @safe Task go(void function() func);

    Set up a fiber task to run on the Photon scheduler.

  • Declaration

    @trusted Task goOnSameThread(void delegate() func);
    @safe Task goOnSameThread(void function() func);

    Same as go, but makes sure the fiber is scheduled on the same thread of the thread pool. This can be useful when a TLS variable needs to be propagated.
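
The TLS use case can be sketched as follows. This is a minimal illustration, assuming that "the same thread" means the thread running the fiber that calls goOnSameThread; threadLocalTag and tlsVisibleToSibling are hypothetical names introduced here, not part of Photon.

```d
import std.stdio;
import photon;

// Module-level variables are thread-local by default in D.
// Hypothetical variable, used only for this illustration.
int threadLocalTag;

bool tlsVisibleToSibling() {
    initPhoton();
    bool seen = false;
    go({
        // Set the TLS value on whichever pool thread runs this fiber.
        threadLocalTag = 42;
        // Assumption: the child is pinned to the caller's thread,
        // so it observes the same thread-local storage.
        goOnSameThread({
            seen = (threadLocalTag == 42);
        });
    });
    runScheduler();
    return seen;
}

void main() {
    writeln("child saw the TLS value: ", tlsVisibleToSibling());
}
```

With plain go the child could land on a different pool thread, where threadLocalTag would still hold its initial value.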

  • Declaration

    @trusted T offload(T)(T delegate() work);

    Run work on a dedicated thread pool and pass the result back to the calling fiber or thread. This avoids blocking the event loop on computationally intensive tasks.
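
A minimal sketch of offloading a CPU-bound computation from a fiber. The fib helper is hypothetical and only stands in for expensive work; the delegate literal is written with an explicit return type so that T can be deduced from the declaration above.

```d
import std.stdio;
import photon;

// Hypothetical CPU-bound helper, used only for illustration.
ulong fib(uint n) {
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

void main() {
    initPhoton();
    go({
        uint n = 30;
        // The delegate is handed to offload, which runs it on the
        // dedicated thread pool and returns its result to this fiber.
        ulong result = offload(delegate ulong() { return fib(n); });
        writeln("fib(", n, ") = ", result);
    });
    runScheduler();
}
```

Calling fib(n) directly inside the fiber would also work, but it would keep a scheduler thread busy for the whole computation.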

  • Declaration

    void delay(Duration req);

    Suspend the current fiber or thread for the duration req. Note that the resolution of the wait is in milliseconds. A delay of zero still yields execution.

  • Declaration

    void yield();

    Yield the execution of the current fiber or thread.
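
The difference between delay and yield can be sketched with two fibers: one gives up control after every step without sleeping, the other sleeps for a fixed time. runDelayYieldDemo is a hypothetical name, and the interleaving of the printed lines depends on the scheduler.

```d
import core.time : msecs;
import std.stdio;
import photon;

int runDelayYieldDemo() {
    initPhoton();
    int steps = 0;
    go({
        foreach (i; 0 .. 3) {
            writeln("polite fiber, step ", i);
            steps++;
            yield(); // let other runnable fibers make progress
        }
    });
    go({
        delay(5.msecs); // suspend this fiber for about 5 ms
        writeln("sleeper woke up");
    });
    runScheduler();
    return steps;
}

void main() {
    writeln(runDelayYieldDemo(), " steps completed");
}
```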

  • Declaration

    nothrow @safe size_t schedulerThreads();

    Number of threads running the scheduler loop.

  • Declaration

    @trusted void runScheduler();

    Start the scheduler and run fibers until all of them have terminated.

  • Declaration

    @trusted void runPhoton(void delegate() main);

    Initialize Photon and run fibers, starting with the given main delegate.
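
A minimal sketch of runPhoton, assuming it combines initialization, spawning the given delegate as the initial fiber, and running the scheduler until all fibers finish. runWithMainFiber is a hypothetical name.

```d
import std.stdio;
import photon;

string runWithMainFiber() {
    string message;
    runPhoton({
        // This body is assumed to run as the initial fiber.
        auto ch = channel!string(1);
        go({
            ch.put("hello from child");
            ch.close(); // no more items will follow
        });
        // Drain the channel until it is closed and exhausted.
        foreach (item; ch) {
            message = item;
        }
    });
    return message;
}

void main() {
    writeln(runWithMainFiber());
}
```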

  • Declaration

    nothrow @trusted auto mutex();

    Create a non-recursive mutex.

    Examples

      initPhoton();
      auto mtx = mutex();
      int counter = 0;
      go({
          foreach (_; 0..100) {
              mtx.lock();
              int c = counter;
              delay(1.msecs);
              counter = c + 1;
              mtx.unlock();
          }
      });
      go({
          foreach (_; 0..100) {
              mtx.lock();
              int c = counter;
              delay(1.msecs);
              counter = c + 1;
              mtx.unlock();
          }
      });
      runScheduler();
      mtx.dispose();
      assert(counter == 200);
      

  • Declaration

    struct RecursiveMutex;

    • Declaration

      nothrow @trusted void lock() shared;

    • Declaration

      nothrow @trusted bool tryLock() shared;

    • Declaration

      nothrow @trusted bool locked() shared;

    • Declaration

      nothrow @trusted void unlock() shared;

    • Declaration

      nothrow @trusted void dispose() shared;

  • Declaration

    nothrow @trusted auto recursiveMutex();

    Create a recursive mutex.
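
A sketch of re-entrant locking, assuming the value returned by recursiveMutex exposes the lock, unlock and dispose members listed under RecursiveMutex, in the same way the mutex example uses them. nestedLockDepth and descend are hypothetical names.

```d
import std.stdio;
import photon;

int nestedLockDepth() {
    initPhoton();
    auto mtx = recursiveMutex();
    int maxDepth = 0;

    // Takes the lock again at every recursion level. With a
    // non-recursive mutex the second lock() would not succeed.
    void descend(int depth) {
        mtx.lock();
        scope (exit) mtx.unlock(); // one unlock per lock
        if (depth > maxDepth) maxDepth = depth;
        if (depth < 3) descend(depth + 1);
    }

    go({ descend(1); });
    runScheduler();
    mtx.dispose();
    return maxDepth;
}

void main() {
    writeln("deepest nested lock: ", nestedLockDepth());
}
```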

  • Declaration

    nothrow @trusted auto condition();

    Create a condition variable.

  • Declaration

    struct Channel(T);

    A ref-counted channel that is safe to share between multiple fibers. In essence, it is a multiple-producer, single-consumer queue that implements the OutputRange and InputRange concepts.

    • put

      Declaration

      void put(T value);

      OutputRange contract - puts a new item into the channel.

    • Declaration

      bool empty();

      Part of InputRange contract - checks whether the channel has no more items. Returns true if the channel is closed and its buffer is exhausted.

    • Declaration

      ref T front();

      Part of InputRange contract - returns an item available in the channel.

    • Declaration

      void popFront();

      Part of InputRange contract - advances the range forward.

  • Declaration

    @safe auto channel(T)(size_t capacity = 1);

    Create a new shared Channel with the given capacity.
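
The range members above can also be driven by hand instead of through foreach. The sketch below uses one producer and one consumer; collectFromChannel is a hypothetical name, and close is used as in the channels example at the top of this page.

```d
import std.stdio;
import photon;

int[] collectFromChannel() {
    initPhoton();
    int[] received;
    auto ch = channel!int(2);
    go({ // producer
        foreach (i; 0 .. 3) {
            ch.put(i); // OutputRange side
        }
        ch.close(); // lets empty() eventually return true
    });
    go({ // consumer: the loop that foreach would expand to
        for (; !ch.empty; ch.popFront()) {
            received ~= ch.front;
        }
    });
    runScheduler();
    return received;
}

void main() {
    writeln(collectFromChannel());
}
```

Because the channel implements the InputRange concept, the same consumer could be written with std.range and std.algorithm helpers, as the take(2).array call in the first example does.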

  • Declaration

    @trusted void select(Args...)(auto ref Args args) if (allSatisfy!(isChannel, Even!Args) && allSatisfy!(isHandler, Odd!Args));

    Multiplex between multiple channels: executes the lambda attached to the first channel that becomes ready to read.

  • Declaration

    enum auto isChannel(T);

    Trait for testing whether a type is a Channel.

  • Declaration

    class Pool(T);

    Generic pool

    • Declaration

      void release(Pooled!T item);

      Return a pooled item to the pool for reuse.

    • Declaration

      void dispose(Pooled!T item);

      Call on items that errored or cannot be reused for some other reason.

  • Declaration

    @trusted auto pool(T)(size_t size, Duration maxIdle, T delegate() open, void delegate(ref T) close);

    Create a generic pool for resources. open creates a new resource; close releases a resource.