How can I send data on the fly to a worker when using parfeval?

I'm writing an application that uses tcpip server sockets, and I want it to run in the background so that my client is not blocked. I want to send commands to it at runtime, but not via tcpip. More specifically, I tried to send a command telling it to stop listening for incoming requests, and then forced it to poll the queue (or whatever it is called) by sending a request over TCP. I don't want it to stop listening in response to an incoming TCP request, because then it could be closed by something other than my client, and I'm generally paranoid. When I tried simply cancelling the job from the parallel pool, I had trouble with the socket not being closed properly. I can't seem to send anything to the worker using labSend, and my search brought only dire news that it can't be done. Is there a way to transfer information to a worker started using parfeval at runtime (not when the job starts or ends)? Or is there something else that gives me similar functionality and the ability to send information between the workers?
Oh, and I don't want to manage my communication using files.

Accepted Answer

Edric Ellis on 16 Oct 2018
I think it's possible to do this sort of thing by getting the workers to create a parallel.pool.PollableDataQueue, and getting the client to send messages to the worker. It's a bit awkward to set up, and in the example below, I'm going to use a parallel.pool.Constant to get the worker to hold onto the queue instance.
The general plan is this:
  1. Get the worker to construct a pollable dataqueue
  2. Send the dataqueue instance back to the client
  3. Set the worker off into a loop waiting for messages from the client
  4. Send messages from the client to get the worker to do stuff
  5. When done, get the client to send a "poison pill" which gets the worker to return the result.
Note the code below only works for a pool of size 1. To run on a larger pool, you would need to use parfevalOnAll rather than parfeval, and you'd also need to manage the multiple queues from the workers.
% First, create a parallel pool if necessary. gcp('nocreate') is used so
% that checking for a pool does not itself start a default-sized one.
if isempty(gcp('nocreate'))
    parpool('local', 1);
end
% Get the worker to construct a data queue on which it can receive
% messages from the client
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
% Get the worker to send the queue object back to the client
workerQueueClient = fetchOutputs(parfeval(@(x) x.Value, 1, workerQueueConstant));
% Get the worker to start waiting for messages
future = parfeval(@doStuff, 1, workerQueueConstant);
% Send a few messages to the worker
for idx = 1:10
    send(workerQueueClient, idx);
end
% Send [] as a "poison pill" to the worker to get it to stop
send(workerQueueClient, []);
% Get the result
fetchOutputs(future)

% This function gets the worker to keep processing messages from the client
function out = doStuff(qConstant)
    q = qConstant.Value;
    out = 0;
    while true
        % Wait (with no timeout) for a message
        data = poll(q, Inf);
        if isempty(data)
            % Empty message is the poison pill: stop and return the total
            return
        else
            out = out + data;
        end
    end
end
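For a pool with more than one worker, one possible arrangement is sketched below. It is not the code from the answer above, only a minimal sketch under some assumptions: the client owns a queue of its own (`clientQueue`), each worker builds its own PollableDataQueue inside a hypothetical function `workerLoop` and sends that queue object to the client through `clientQueue`, and results come back over `clientQueue` as well. Passing a queue object through another queue is assumed to work on your release, so check that first.

```matlab
% Use the existing pool, or open a small one (pool size 2 is an arbitrary choice)
pool = gcp('nocreate');
if isempty(pool)
    pool = parpool('local', 2);
end
numWorkers = pool.NumWorkers;

% Queue owned by the client; workers use it to send data back
clientQueue = parallel.pool.PollableDataQueue;

% Start one message loop on every worker (no outputs requested)
future = parfevalOnAll(@workerLoop, 0, clientQueue);

% Each worker first sends its own queue; collect one per worker
workerQueues = cell(1, numWorkers);
for k = 1:numWorkers
    workerQueues{k} = poll(clientQueue, Inf);
end

% Distribute a few messages round-robin across the workers
for idx = 1:10
    send(workerQueues{mod(idx - 1, numWorkers) + 1}, idx);
end

% Send [] as the poison pill to every worker
for k = 1:numWorkers
    send(workerQueues{k}, []);
end

% Each worker replies with its partial sum before returning
total = 0;
for k = 1:numWorkers
    total = total + poll(clientQueue, Inf);
end
wait(future);

% Hypothetical worker-side loop: create a queue, hand it to the client,
% then accumulate messages until the poison pill arrives
function workerLoop(clientQueue)
    workerQueue = parallel.pool.PollableDataQueue;
    send(clientQueue, workerQueue);
    out = 0;
    while true
        data = poll(workerQueue, Inf);
        if isempty(data)
            break
        end
        out = out + data;
    end
    send(clientQueue, out);
end
```

The client only sends poison pills after it has collected every worker's queue, so the partial sums cannot arrive on `clientQueue` ahead of the queue objects.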
  3 Comments
John Smith on 22 Jun 2020
Hi Edric,
Your answer comes closest to a problem I'm facing myself, but I have not been able to make it work with multiple workers.
I'm using parfeval and supply each worker with the subset of the database I want it to process. However, the process that my workers execute does not always take the same amount of time, which means that some workers finish well in advance. It would be more efficient to supply all workers with the full database and dictate 'on the fly' which dataset they need to process.
Your example made this work for one worker, but I have no idea how I would do this for a number of workers.
Does each worker need to construct this workerQueueConstant?
My parfeval sits in a loop:
for Worker = 1:Parallel_Workers
    Answer(Worker) = parfeval(@functionhere, 1, var1, var2, Worker, Q);
end
Should I put
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
% Get the worker to send the queue object back to the client
workerQueueClient = fetchOutputs(parfeval(@(x) x.Value, 1, workerQueueConstant));
in that same loop?
Thanks in advance,
sjerra
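One way the load-balancing part of this might be handled without any queues is to submit one parfeval call per dataset rather than one per worker, since the pool hands queued futures to whichever worker becomes idle. The sketch below is only an illustration under assumptions: `database` is placeholder data with one dataset per column, `processDataset` is a hypothetical stand-in for the real processing function, and a parallel.pool.Constant is used so the full database is copied to each worker once instead of with every task.

```matlab
% Open a pool only if none exists yet
if isempty(gcp('nocreate'))
    parpool;
end

% Placeholder data: each column is treated as one dataset
database = rand(100, 8);
numDatasets = size(database, 2);

% Copy the full database to each worker once
dbConstant = parallel.pool.Constant(database);

% Submit one task per dataset; idle workers pick up the next queued task
futures(1:numDatasets) = parallel.FevalFuture;
for k = 1:numDatasets
    futures(k) = parfeval(@processDataset, 1, dbConstant, k);
end

% Collect results in whatever order they complete
results = zeros(1, numDatasets);
for n = 1:numDatasets
    [k, value] = fetchNext(futures);
    results(k) = value;
end

% Hypothetical processing function: sums one column of the database
function value = processDataset(dbConstant, k)
    value = sum(dbConstant.Value(:, k));
end
```

With this layout the client does not have to decide at runtime which worker gets which dataset; a worker that finishes early simply starts the next pending future.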
mirza ali on 20 Feb 2021
@Edric Ellis Sir, thank you for your help in this regard. I am facing another issue related to this: if I want to send data from the function out = doStuff(qConstant) to another function (which is also running in the background using parfeval), how can I do that?
Regards

Release

R2018a