Engine

Engine

The dproc.Engine is similar to a pipeline. It allows for many processes to be created and organized before the entire pipeline is executed. The Engine can be managed with a context.Context, context.CancelFunc and a sync.WaitGroup allowing for the pipeline to be killed.

Here’s the interface:

// Engine manages the pipeline
type Engine interface {
	Start(*sync.WaitGroup)
	Stop()
}

Starting the engine

Starting the Engine is simple. It requires calling engine.Start(sync.WaitGroup). However, there a helper method for creating the Engine which is surprisingly called NewEngine.

Its signature is:

func NewEngine(ctx context.Context, ps ProcessList) Engine

If the context is cancelled, it will be caught and will stop everything. However, the built-in Engine has a built-in context.CancelFunc that is called with engine.Stop.

Example: Creating and starting an Engine

This is the main from the included example. It is somewhat contrived. One process sends random numbers and the child process prints a count every second of how many messages were received. Normally between 2-3M messages are sent per second through this pipeline.

func main() {
	log.SetOutput(os.Stdout)
	ctx := context.Background()

	var wg sync.WaitGroup
	engine := dproc.NewEngine(ctx, dproc.ProcessList{
		dproc.NewDefaultProcess(ctx, "Random Numbers", &RandomGenerator{time.Second * 5}, dproc.ProcessList{
			dproc.NewDefaultProcess(ctx, "Random Logger", &RandomLogger{Ticker: time.NewTicker(time.Second)}, dproc.ProcessList{}),
		}),
	})
	start := time.Now()
	engine.Start(&wg)

	wg.Wait()
	fmt.Println("Elapsed: ", time.Since(start))
	fmt.Println("\nExiting...")
}