module Xapi_work_queues:sig
..end
Consult the module documentation.
A worker pool has a limited number of worker threads. Each worker pops one tagged item from the queue in a round-robin fashion. While the item is being executed, its tag temporarily doesn't participate in round-robin scheduling. If more items get queued with the same tag during execution, they get redirected to a private queue. Once the item finishes execution, the tag will participate in round-robin scheduling again.
This ensures that items with the same tag do not get executed in parallel, and that a tag with a lot of items does not starve the execution of other tags.
(* Example operation type: an operation is represented by a plain string. *)
module Operation = struct
  type t = string

  (* Convert an operation to its Rpc form using the string converter. *)
  let rpc_of_t (op : t) = Rpc.rpc_of_string op

  (* Executing an operation prints it on standard output, followed by a
     newline. *)
  let execute : t -> unit = print_endline
end
(* Worker pool whose items pair an operation with the function that runs
   it. *)
module WorkerPool = Xapi_work_queues.Make (struct
  type task = Operation.t -> unit

  type t = Operation.t * task

  (* Only the operation half of an item is dumped; the function is dropped. *)
  let dump_item item = Rpc.rpc_of_string (fst item)

  (* The task argument is ignored: the dump is always [Rpc.rpc_of_unit ()]. *)
  let dump_task _ = Rpc.rpc_of_unit ()

  (* Running an item applies its function to its operation. *)
  let execute (op, run) = run op

  (* Nothing is done in [finally]; its argument is ignored. *)
  let finally _ = ()

  (* Always answers [true], whatever the two arguments are. *)
  let should_keep _ _ = true
end)
(* Usage example: create a pool and queue a "Start" and a "Stop" operation
   under each of the tags "tag1" and "tag2". Each item carries
   [Operation.execute] as the function that runs it.
   NOTE(review): the argument 25 to [WorkerPool.create] is presumably the
   number of worker threads; confirm against the signature of [create]. *)
let () =
  let pool = WorkerPool.create 25 in
  (* [Operation.t] is [string], so operations must be string literals;
     bare [Start] / [Stop] constructors are not declared anywhere and would
     not type-check against [dump_item] either. *)
  WorkerPool.push pool "tag1" ("Start", Operation.execute);
  WorkerPool.push pool "tag2" ("Start", Operation.execute);
  WorkerPool.push pool "tag2" ("Stop", Operation.execute);
  WorkerPool.push pool "tag1" ("Stop", Operation.execute);
...
module type Item =sig
..end
module type S =sig
..end
module Make(
I
:
Item
)
:S
with type item = I.t
(** A list of OUnit2 tests.
    NOTE(review): presumably the unit tests for this module; confirm
    against the implementation. *)
val tests : OUnit2.test list