How to asynchronously control distinct workers in a parallel pool separately

I have created a parallel pool of 2 workers, and I would like to control them separately. My aim is to do something similar to a semaphore.
My first worker will create a tcpip server and handle the communication with the remote client. Normally MATLAB would block in a Busy state until a connection is established, so I want to run the server on a worker with parfeval and not wait for the future to finish.
My second worker will handle some data capturing.
Meanwhile, the MATLAB client will process previously captured data.
The main problem is using a tcpip object with the parallel pool.
Some threads advise using parallel.pool.Constant to make a tcpip object available inside a parallel pool.
However, when I use parallel.pool.Constant, it tries to create the tcpip server on both workers, and I get an "Address already in use: JVM Bind" error. If I could command the workers asynchronously as I wish, I would create the tcpip server on a single worker and use it only inside that worker. Is that possible?
spmd does not work for me either, since it runs synchronously and waits until all workers complete their tasks.
My code is basically as follows:
p = parpool(2);
is_connection_lost = 1;
is_connection_established = 0;
data_ready = 0;
is_processed_data_sent = 1;
f2 = parfeval(p,@capture_data,1); % Start the first capture up front
while true
    % Worker 1:
    % Creates tcpip server,
    % waits until connection is established,
    % sends data
    if is_connection_lost
        f1 = parfeval(p,@tcpip_server_create,1);
        is_connection_lost = 0;
    end
    if strcmp(f1.State, 'finished') && ~is_connection_established
        [~,t] = fetchNext(f1); % Problem: the tcpip server object arrives on the client closed
        is_connection_established = 1;
    end
    if is_connection_established && ~is_processed_data_sent
        f1 = parfeval(p,@tcpip_send_data,1,data_processed);
        is_processed_data_sent = 1;
    end
    % For now, it does not check whether the remote client is still alive. Will be added.

    % Worker 2:
    % Captures data. Works fine.
    if strcmp(f2.State, 'finished')
        [~,data] = fetchNext(f2);
        f2 = parfeval(p,@capture_data,1); % Start capturing new data in the meantime
        data_ready = 1;
    end

    % MATLAB client:
    % Processes data. Works fine.
    if data_ready == 1
        data_processed = process_data(data);
        data_ready = 0;
        is_processed_data_sent = 0;
    end
end

function t = tcpip_server_create
    t = tcpip('0.0.0.0',5000,'NetworkRole','server');
    fopen(t); % Blocks this worker until a remote client connects
end

function data = capture_data
    data = zeros(100); % Assume we captured this one
end

function data_processed = process_data(data)
    data_processed = 2 * data; % Assume we processed this one
end
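
One direction that might address the "create the server on a single worker and use it only inside that worker" idea is sketched below. It is untested here and rests on assumptions: that a parallel.pool.PollableDataQueue created on a worker can be sent to the client, so the client can push data to that specific worker, and that the payload is a 100-by-100 double array as in capture_data. The function name server_loop and the variables handshake, inbox and to_server are hypothetical names introduced for illustration. The tcpip object never leaves the worker, which avoids fetching it back to the client in a closed state.

```matlab
% Sketch only: keep the tcpip object on the worker that created it and
% exchange plain data with the client through data queues.
p = parpool(2);

% Queue that the worker uses to hand its own inbox back to the client.
handshake = parallel.pool.PollableDataQueue;
f1 = parfeval(p, @server_loop, 0, handshake);   % occupies one worker only

% Wait up to 30 s for the worker's inbox; data sent to it goes to that worker.
[to_server, ok] = poll(handshake, 30);
assert(ok, 'Server worker did not report back in time.');

% Later, whenever processed data is available on the client:
data_processed = 2 * zeros(100);                % stand-in for process_data output
send(to_server, data_processed);

function server_loop(handshake)
    % Hypothetical helper: everything that touches the socket stays here.
    inbox = parallel.pool.PollableDataQueue;    % created on the worker
    send(handshake, inbox);                     % give the client a way to reach us
    t = tcpip('0.0.0.0', 5000, 'NetworkRole', 'server', ...
        'OutputBufferSize', 8 * 100 * 100);     % room for one 100x100 double array (assumption)
    fopen(t);                                   % blocks this worker until a peer connects
    cleanup = onCleanup(@() fclose(t));         % close the socket when the function exits
    while true
        [data, ok] = poll(inbox, 1);            % wait up to 1 s for data from the client
        if ok
            fwrite(t, data(:), 'double');       % wire format is an assumption
        end
    end
end
```

In this sketch the server loop runs until it is stopped from the client, for example with cancel(f1). The second worker stays free for the capture_data futures, so the polling loop on the client only has to watch f2 and call send when new processed data is ready.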

Answers (0)

Release

R2019a
