Documentation ¶
Index ¶
- type Command
- func (cmd *Command) Broadcast(outChannels map[int]chan Command) bool
- func (cmd *Command) Fail(msg string) bool
- func (cmd *Command) Forward(outChannel chan Command) bool
- func (cmd *Command) SafeReply(reply CommandReply) error
- func (cmd *Command) SafeSend(out chan Command) error
- func (cmd Command) Send(outChannel chan Command) string
- func (cmd Command) String() string
- func (cmd *Command) Success(msg string) bool
- func (cmd Command) ToJSON() []byte
- type CommandReply
- type HTTPHandler
- type JobqueueKeepAliveHandler
- type KeepAlive
- type KeepAliveConf
- type Runner
- type SignalHandler
- type TaskManager
- func (task *TaskManager) CopyFrom(autotask TaskManager, tPath string)
- func (task *TaskManager) ListWorkers() []string
- func (task *TaskManager) MaintainWorkerCardinality() error
- func (task *TaskManager) RunCommand(cmd Command)
- func (task *TaskManager) Set(cmd Command)
- func (task *TaskManager) Start(commands chan Command, cmd Command)
- func (task *TaskManager) StartWorker() error
- func (task *TaskManager) Status() (ret map[string]string)
- func (task *TaskManager) Stop()
- func (task *TaskManager) StopWorker(pid int, ch chan Command)
- func (task *TaskManager) StopWorkers()
- func (task *TaskManager) StopWorkersByPid(pids []int)
- type TaskManagerConf
- type Worker
- type WorkerInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Command ¶
type Command struct {
Type string `json:"type"`
TaskName string `json:"taskname,omitempty"`
Params map[string]interface{} `json:"params,omitempty"`
Timeout int64 `json:"timeout"`
ReplyChannel chan CommandReply
}
Command is a message sent on the command channel. It may target a specific task or be generic. The type can be one of 'status', 'set', 'stop', 'listworkers', 'stopworkers' or 'stoppedworkers'.
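To make the JSON tags above concrete, here is a minimal sketch of how a command could look on the wire. It does not use the package itself: `commandWire` and `encodeCommand` are hypothetical names, the task name "resize" is invented, and the shape of `Params` for a 'set' command is an assumption. The mirror struct leaves out ReplyChannel because encoding/json cannot marshal channel values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// commandWire is a hypothetical mirror of the JSON-tagged fields of Command.
// ReplyChannel is omitted: encoding/json returns an error for channel fields.
type commandWire struct {
	Type     string                 `json:"type"`
	TaskName string                 `json:"taskname,omitempty"`
	Params   map[string]interface{} `json:"params,omitempty"`
	Timeout  int64                  `json:"timeout"`
}

// encodeCommand serialises the mirror struct using the documented tags.
func encodeCommand(c commandWire) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Assumed payload: a 'set' command changing the cardinality of one task.
	out, err := encodeCommand(commandWire{
		Type:     "set",
		TaskName: "resize",
		Params:   map[string]interface{}{"cardinality": 3},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Because "timeout" has no omitempty option, a zero timeout is still written out, while an empty task name or nil params map is dropped.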
func (*Command) Broadcast ¶
Broadcast sends the command to the other channels, waits for all the replies, and closes the channel.
func (*Command) Forward ¶
Forward sends the command to another channel, waits for the reply, and closes the channel.
func (*Command) SafeReply ¶
func (cmd *Command) SafeReply(reply CommandReply) error
SafeReply sends a reply but recovers from the panic if the output channel is closed
func (*Command) SafeSend ¶
SafeSend attempts to send a command to an output channel, recovering from the panic if the channel was closed in the meantime.
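The recover-on-send technique described for SafeReply and SafeSend can be sketched as follows. This is not the package's implementation: `safeSend` is a hypothetical helper that works on a plain `chan string`, and the wording of the returned error is an assumption.

```go
package main

import "fmt"

// safeSend is a hypothetical stand-in for the technique, not the package's
// SafeSend. Sending on a closed channel panics; the deferred function
// recovers and turns the panic into an ordinary error.
func safeSend(out chan string, msg string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("send failed: %v", r)
		}
	}()
	out <- msg
	return nil
}

func main() {
	open := make(chan string, 1)
	fmt.Println("open channel:", safeSend(open, "status"))

	closed := make(chan string, 1)
	close(closed)
	fmt.Println("closed channel:", safeSend(closed, "status"))
}
```

Note that recovering only protects against a closed channel. A send on an unbuffered channel with no receiver still blocks, so a real implementation may also want a timeout.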
type CommandReply ¶
CommandReply is the type of a reply on the ReplyChannel for a Command. It contains the successful response (string) or an error on command failure
func (CommandReply) String ¶
func (r CommandReply) String() string
String implements the Stringer interface
type HTTPHandler ¶
HTTPHandler holds the configuration for the HTTP handler
func (*HTTPHandler) Run ¶
func (handler *HTTPHandler) Run()
Run the HTTP handler, which exposes an HTTP interface to control tasks
type JobqueueKeepAliveHandler ¶
JobqueueKeepAliveHandler contains the configuration for the Keep-Alive handler
func (*JobqueueKeepAliveHandler) Run ¶
func (handler *JobqueueKeepAliveHandler) Run(keepalives chan<- KeepAlive)
Run listens for keep-alive messages sent via ZeroMQ and forwards them to a channel.
type KeepAliveConf ¶
type KeepAliveConf struct {
InboundPort int `json:"inbound_port,omitempty"`
InternalPort int `json:"internal_port,omitempty"`
Host string `json:"host,omitempty"`
StallTimeout int64 `json:"stall_timeout,omitempty"`
GracePeriod int64 `json:"grace_period,omitempty"`
}
KeepAliveConf contains the configuration for the Keep-Alive handler and the ZeroMQ channel.
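As an illustration of the JSON tags on KeepAliveConf, the sketch below decodes a configuration fragment into a local mirror of the struct. The type `keepAliveConf`, the helper `parseKeepAliveConf`, and every value in the sample (ports, host, timeouts) are made up for the example. Treating the two timeouts as milliseconds is an assumption carried over from the field comments on TaskManager.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// keepAliveConf is a hypothetical local mirror of KeepAliveConf, using the
// same field names and JSON tags as the documented struct.
type keepAliveConf struct {
	InboundPort  int    `json:"inbound_port,omitempty"`
	InternalPort int    `json:"internal_port,omitempty"`
	Host         string `json:"host,omitempty"`
	StallTimeout int64  `json:"stall_timeout,omitempty"`
	GracePeriod  int64  `json:"grace_period,omitempty"`
}

// parseKeepAliveConf decodes a JSON fragment into the mirror struct.
func parseKeepAliveConf(raw string) (keepAliveConf, error) {
	var conf keepAliveConf
	err := json.Unmarshal([]byte(raw), &conf)
	return conf, err
}

func main() {
	// Sample values are invented for illustration only.
	raw := `{
		"inbound_port": 5000,
		"internal_port": 5001,
		"host": "127.0.0.1",
		"stall_timeout": 10000,
		"grace_period": 2000
	}`
	conf, err := parseKeepAliveConf(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", conf)
}
```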
type Runner ¶
type Runner struct {
Conf TaskManagerConf
// contains filtered or unexported fields
}
Runner is a container for Task Managers
func NewRunner ¶
func NewRunner(taskMgrConf TaskManagerConf) (Runner, error)
NewRunner returns an instance of a Task Manager Runner
type SignalHandler ¶
SignalHandler wraps the command channel
func (*SignalHandler) Run ¶
func (handler *SignalHandler) Run()
Run the Signal handler, to intercept interrupts and shut down processes cleanly
type TaskManager ¶
type TaskManager struct {
Name string `json:"name,omitempty"` // task name
Cmd string `json:"cmd,omitempty"` // cli command
Args []string `json:"args,omitempty"` // cli args
Cardinality int `json:"cardinality,omitempty"` // number of workers
StallTimeout int64 `json:"stall_timeout,omitempty"` // consider the worker dead if no keep-alives are received for this period (ms)
GracePeriod int64 `json:"grace_period,omitempty"` // grace period (ms) before killing a worker after being asked to stop
AutoStart bool `json:"autostart,omitempty"` // whether to start the task automatically
CaptureOutput bool `json:"capture_output,omitempty"` // whether to capture the output and send it to stdout
Active bool
// contains filtered or unexported fields
}
A TaskManager is a process manager for a specific task. It keeps the number of worker processes at the desired cardinality and manages keep-alives.
func NewTaskManager ¶
func NewTaskManager(name string, keepAliveConf KeepAliveConf, feedback chan Command) TaskManager
NewTaskManager creates a new Task Manager instance
func (*TaskManager) CopyFrom ¶
func (task *TaskManager) CopyFrom(autotask TaskManager, tPath string)
CopyFrom updates the settings of a task manager from another task manager
func (*TaskManager) ListWorkers ¶
func (task *TaskManager) ListWorkers() []string
ListWorkers returns status information from each worker process for this task
func (*TaskManager) MaintainWorkerCardinality ¶
func (task *TaskManager) MaintainWorkerCardinality() error
MaintainWorkerCardinality keeps the number of workers at the desired cardinality
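The documentation does not say how the cardinality is enforced, so the following is only a sketch of the underlying idea: compare the desired count with the running workers and decide how many to start or which to stop. The function `reconcile` is hypothetical, and stopping the surplus workers from the end of the list is an arbitrary choice made for the example.

```go
package main

import "fmt"

// reconcile is a hypothetical helper, not part of the package. Given the
// desired cardinality and the pids of running workers, it returns how many
// workers to start and which pids to stop.
func reconcile(desired int, running []int) (toStart int, toStop []int) {
	if desired < 0 {
		desired = 0
	}
	switch {
	case len(running) < desired:
		return desired - len(running), nil
	case len(running) > desired:
		// Assumption: surplus workers are taken from the end of the list.
		return 0, running[desired:]
	default:
		return 0, nil
	}
}

func main() {
	start, stop := reconcile(3, []int{101})
	fmt.Println("start:", start, "stop:", stop)

	start, stop = reconcile(1, []int{101, 102, 103})
	fmt.Println("start:", start, "stop:", stop)
}
```

In terms of the methods listed here, the first result would map to repeated StartWorker calls and the second to StopWorkersByPid.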
func (*TaskManager) RunCommand ¶
func (task *TaskManager) RunCommand(cmd Command)
RunCommand runs a command on this task. Results are sent to the reply channel of the command itself
func (*TaskManager) Set ¶
func (task *TaskManager) Set(cmd Command)
Set sets task options (only "cardinality" is currently supported)
func (*TaskManager) Start ¶
func (task *TaskManager) Start(commands chan Command, cmd Command)
Start the workers for this task
func (*TaskManager) StartWorker ¶
func (task *TaskManager) StartWorker() error
StartWorker creates a new worker process
func (*TaskManager) Status ¶
func (task *TaskManager) Status() (ret map[string]string)
Status gets the status for this task (number of active workers, last alive TS, etc.)
func (*TaskManager) Stop ¶
func (task *TaskManager) Stop()
Stop asks all of this task's workers to stop gracefully (or forcefully if they don't terminate in a timely fashion)
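A graceful stop with a forceful fallback is commonly built from SIGTERM, a grace period, and SIGKILL. The sketch below shows that pattern on a Unix-like system and is not the package's code: `stopGracefully` and `stopSleeper` are hypothetical, and `sleep 60` merely stands in for a worker process.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// stopGracefully sends SIGTERM to a started command and escalates to SIGKILL
// if the process has not exited once the grace period elapses. It reports
// whether the forceful path was taken.
func stopGracefully(cmd *exec.Cmd, grace time.Duration) (killed bool, err error) {
	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return false, err
	}
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	select {
	case <-done:
		return false, nil // exited within the grace period
	case <-time.After(grace):
		// Kill can fail if the process exited at this very moment;
		// waiting on done settles the outcome either way.
		_ = cmd.Process.Kill()
		<-done
		return true, nil
	}
}

// stopSleeper starts a stand-in worker ("sleep 60") and stops it again.
func stopSleeper(graceMs int64) (bool, error) {
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		return false, err
	}
	return stopGracefully(cmd, time.Duration(graceMs)*time.Millisecond)
}

func main() {
	killed, err := stopSleeper(2000)
	fmt.Println("killed:", killed, "err:", err)
}
```

The grace period plays the role of the GracePeriod field (in ms) documented on TaskManager and Worker.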
func (*TaskManager) StopWorker ¶
func (task *TaskManager) StopWorker(pid int, ch chan Command)
StopWorker sends a SIGTERM signal to the worker process identified by the given pid
func (*TaskManager) StopWorkers ¶
func (task *TaskManager) StopWorkers()
StopWorkers asks all workers for this task to stop and waits for them to terminate
func (*TaskManager) StopWorkersByPid ¶
func (task *TaskManager) StopWorkersByPid(pids []int)
StopWorkersByPid asks all workers in the pid list to stop and waits for them to terminate
type TaskManagerConf ¶
type TaskManagerConf struct {
Path string `json:"path,omitempty"`
FileSuffix string `json:"filesuffix,omitempty"`
Port int `json:"port,omitempty"`
Autotasks map[string]TaskManager `json:"autotasks,omitempty"`
Keepalives KeepAliveConf `json:"keepalives,omitempty"`
ForceTimeout int64 `json:"force_timeout"`
Profiler profiler.Config `json:"profiler"`
}
TaskManagerConf contains the configuration for the Task Manager
type Worker ¶
type Worker struct {
Taskname string `json:"taskname"`
Command string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
CaptureOutput bool `json:"capture_output,omitempty"`
StallTimeout int64 `json:"stall_timeout,omitempty"`
GracePeriod int64 `json:"grace_period,omitempty"` // grace period (ms) before killing a worker after being asked to stop
Pid int `json:"pid,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
LastAliveAt time.Time `json:"last_alive_at,omitempty"`
Logger *log.Logger `json:"-"` // don't export
TaskFeedbackChannel chan<- Command `json:"-"` // don't export
CommandsChannel chan Command `json:"-"` // don't export
// contains filtered or unexported fields
}
Worker manages a single worker process
func (*Worker) HasStalled ¶
HasStalled checks if the worker process is alive and has sent a keep-alive message recently
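The keep-alive half of such a check can be sketched as a comparison between the time elapsed since the last keep-alive and the stall timeout. The helper `hasStalled` below is hypothetical and covers only that half, not the process liveness check. It assumes the timeout is expressed in milliseconds, as documented for TaskManager.StallTimeout.

```go
package main

import (
	"fmt"
	"time"
)

// hasStalled is a hypothetical helper: it reports whether more time has
// passed since the last keep-alive than the stall timeout (in ms) allows.
func hasStalled(sinceLastAlive time.Duration, stallTimeoutMs int64) bool {
	return sinceLastAlive > time.Duration(stallTimeoutMs)*time.Millisecond
}

func main() {
	// Pretend the last keep-alive arrived three seconds ago.
	lastAliveAt := time.Now().Add(-3 * time.Second)

	fmt.Println("1s timeout:", hasStalled(time.Since(lastAliveAt), 1000))
	fmt.Println("10s timeout:", hasStalled(time.Since(lastAliveAt), 10000))
}
```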
func (*Worker) Info ¶
func (w *Worker) Info(replyChan chan<- CommandReply)
Info sends the status of the current worker process on the given reply channel
func (*Worker) IsProcessAlive ¶
IsProcessAlive checks whether the process is still running. See http://stackoverflow.com/questions/15204162/check-if-a-process-exists-in-go-way
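A common way to probe for a process on Unix-like systems is to send it signal 0, which performs the existence and permission checks without delivering anything. The sketch below shows that approach. `isProcessAlive` and `selfAlive` are hypothetical helpers, and whether the package does exactly this is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// isProcessAlive is a hypothetical helper for Unix-like systems. Signal 0
// is never delivered; the call only reports whether signalling would work.
func isProcessAlive(pid int) bool {
	proc, err := os.FindProcess(pid)
	if err != nil {
		return false
	}
	return proc.Signal(syscall.Signal(0)) == nil
}

// selfAlive probes the current process, which is alive by definition.
func selfAlive() bool {
	return isProcessAlive(os.Getpid())
}

func main() {
	fmt.Println("current process alive:", selfAlive())
}
```

One caveat of this sketch: a process owned by another user makes the signal call fail with a permission error, so it is reported as not alive even though it exists.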
func (*Worker) Start ¶
func (w *Worker) Start() (*WorkerInfo, error)
Start spawns a new worker process
type WorkerInfo ¶
WorkerInfo is a wrapper for the process pid and the channels to communicate with the task manager