19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
-
+
|
use crate::problem::{
FirstOrderProblem, ResultSender, SubgradientExtender, UpdateSendError, UpdateSender, UpdateState,
};
use crate::{DVector, Minorant, Real};
use log::{debug, error, info};
use mpi::environment::Universe;
use mpi::topology::{Communicator, SystemCommunicator};
use mpi::topology::{Communicator, SimpleCommunicator};
use mpi::Rank;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{channel, Sender};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use thiserror::Error;
use crate::mpi::msg::{recv_msg, send_msg};
|
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
-
+
-
+
|
let nsubs = {
let mut p = self.problem.write().unwrap();
p.start();
p.num_subproblems()
};
if self.compute_tx.is_none() {
let world = SystemCommunicator::world();
let world = SimpleCommunicator::world();
let nworkers = world.size() - 1;
let (compute_tx, compute_rx) = channel::<ClientMessage<P>>();
// First start one receiver thread for each worker. Each thread gets the result-channels
// (for the algorithm) via another channel.
let result_txs = (0..nworkers)
.map(|client_idx| {
let rank = client_idx as Rank + 1;
let (req_tx, req_rx) = channel::<(usize, Box<dyn ResultSender<Self>>)>();
let compute_tx = compute_tx.clone();
thread::spawn(move || {
let world = SystemCommunicator::world();
let world = SimpleCommunicator::world();
let worker = world.process_at_rank(rank);
let mut result_txs = (0..nsubs).map(|_| None).collect::<Vec<_>>();
loop {
let msg = recv_msg(&worker);
let index = match msg {
ResultMsg::ObjectiveValue { index, .. } => index,
|
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
-
+
|
});
req_tx
})
.collect::<Vec<_>>();
// This thread forwards computation requests to the workers.
thread::spawn(move || {
let world = SystemCommunicator::world();
let world = SimpleCommunicator::world();
let mut numjobs = vec![0; nworkers as usize];
let mut start_times = vec![Instant::now(); nworkers as usize];
while let Ok(clientmsg) = compute_rx.recv() {
match clientmsg {
ClientMessage::WorkerTask { msg, result_tx } => match msg {
WorkerMsg::Terminate | WorkerMsg::ApplyUpdate(..) => {
// send message to all workers
|