RsBundle  Diff

Differences From Artifact [638e947dea]:

  • File src/mcf/problem.rs — part of check-in [abf6b6de39] at 2023-01-16 14:31:23 on branch trunk — mmcf: add configuration for artificial delays (user: fifr size: 19812) [more...]

To Artifact [2eab402155]:

  • File src/mcf/problem.rs — part of check-in [78fe5f1328] at 2023-04-05 09:46:57 on branch mpi — Add `DistributedFirstOrderProblem` trait (user: fifr size: 22936)

11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26
27
28
29
30
31

32
33
34
35
36
37
38
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41







+
+














+







// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see  <http://www.gnu.org/licenses/>
//

use crate::mcf::{self, Solver};
#[cfg(feature = "mpi")]
use crate::mpi::DistributedFirstOrderProblem;
use crate::problem::{
    FirstOrderProblem as ParallelProblem, ResultSender, UpdateSender, UpdateState as ParallelUpdateState,
};
use crate::{DVector, Minorant, Real};

use log::{debug, warn};
use num_traits::Float;
use thiserror::Error;
use threadpool::ThreadPool;

use std::f64::INFINITY;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::result;
use std::sync::mpsc::Sender;
use std::sync::{Arc, RwLock};
use std::time::Duration;

/// An error in the mmcf file format.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MMCFReadError {
436
437
438
439
440
441
442















































443
444
445
446
447
448
449
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







            for (j, x) in primal.iter().enumerate() {
                aggr[j].add_scaled(alpha, x);
            }
        }

        aggr
    }

    fn create_update<U>(&self, state: U, sender: Sender<Vec<usize>>) -> Result<bool>
    where
        U: ParallelUpdateState<DVector>,
    {
        if self.inactive_constraints.read().unwrap().is_empty() {
            return Ok(false);
        }

        let subdatas = self.subdatas.clone();
        let inactive_constraints = self.inactive_constraints.clone();

        self.pool.as_ref().unwrap().execute(move || {
            let newconstraints = {
                inactive_constraints
                    .read()
                    .unwrap()
                    .iter()
                    .map(|&cidx| {
                        subdatas
                            .iter()
                            .enumerate()
                            .map(|(fidx, sub)| {
                                let primal = state.aggregated_primal(fidx);
                                sub.evaluate_constraint(primal, cidx)
                            })
                            .sum::<Real>()
                    })
                    .enumerate()
                    .filter_map(|(i, sg)| if sg < 1e-3 { Some(i) } else { None })
                    .collect::<Vec<_>>()
            };

            if !newconstraints.is_empty() {
                sender.send(newconstraints).unwrap();
            }
        });

        Ok(true)
    }

    fn apply_update(&mut self, update: &[usize]) -> result::Result<(), Error> {
        let mut inactive = self.inactive_constraints.write().unwrap();
        let mut active = self.active_constraints.write().unwrap();
        active.extend(update.iter().rev().map(|&i| inactive.swap_remove(i)));
        Ok(())
    }
}

impl ParallelProblem for MMCFProblem {
    type Err = Error;

    type Minorant = (Real, DVector, DVector);

566
567
568
569
570
571
572









































616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
                }
            }
        });

        Ok(())
    }
}

impl DistributedFirstOrderProblem for MMCFProblem {
    type Update = Vec<usize>;

    fn create_update<U>(&self, state: U, sender: Sender<Self::Update>) -> result::Result<bool, Self::Err>
    where
        U: ParallelUpdateState<<Self::Minorant as Minorant>::Primal>,
    {
        MMCFProblem::create_update(self, state, sender)
    }

    fn apply_update(&mut self, update: &Self::Update) -> result::Result<(), Self::Err> {
        MMCFProblem::apply_update(self, update)
    }

    fn send_update<S>(&self, update: &Self::Update, tx: S) -> result::Result<(), Self::Err>
    where
        S: UpdateSender<Self> + 'static,
    {
        let nnew = update.len();
        let subdatas = self.subdatas.clone();
        let actives = self.active_constraints.read().unwrap().clone();
        if let Err(err) = tx.add_variables(
            vec![(0.0, Real::infinity()); nnew],
            Box::new(move |fidx: usize, m: &mut Self::Minorant| {
                let sub = &subdatas[fidx];
                let n = m.dim();
                let primal = &m.2;
                m.1.extend((n..actives.len()).map(|i| sub.evaluate_constraint(primal, actives[i])));
                Ok(())
            }),
        ) {
            warn!("Error sending problem update: {}", err);
        }

        Ok(())
    }
}

unsafe impl Send for MMCFProblem {}
unsafe impl Sync for MMCFProblem {}