RsBundle  Diff

Differences From Artifact [f13cb767bd]:

  • File src/solver/sync.rs — part of check-in [43abfc4c67] at 2019-11-22 09:24:06 on branch result-sender — problem: make `ResultSender` a trait with implementation `ChannelSender` (user: fifr size: 29818)

To Artifact [4aa0bc1e03]:

  • File src/solver/sync.rs — part of check-in [9198cc93a8] at 2019-11-22 12:00:42 on branch result-sender — Merge trunk (user: fifr size: 30609) [more...]

41
42
43
44
45
46
47
48

49
50

51
52

53
54
55
56

57
58

59
60
61
62
63
64
65
66


67
68
69
70
71
72
73
74










75

76
77


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

97
98
99
100
101
102
103
104

105
106


107
108
109
110
111
112


113
114
115
116
117
118
119
120

121
122
123
124
125









126
127
128
129
130


131
132
133
134
135
136
137
41
42
43
44
45
46
47

48
49

50
51

52
53
54
55

56
57

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

87
88

89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

118
119

120
121
122
123
124
125


126
127
128
129
130
131
132
133
134

135
136
137
138


139
140
141
142
143
144
145
146
147
148
149
150


151
152
153
154
155
156
157
158
159







-
+

-
+

-
+



-
+

-
+








+
+








+
+
+
+
+
+
+
+
+
+
-
+

-
+
+



















+







-
+

-
+
+




-
-
+
+







-
+



-
-
+
+
+
+
+
+
+
+
+



-
-
+
+







pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;

/// The minimal bundle solver.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;

/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug)]
pub enum Error<E> {
pub enum Error<MErr, PErr> {
    /// An error raised when creating a new master problem solver.
    BuildMaster(Box<dyn std::error::Error>),
    BuildMaster(MErr),
    /// An error raised by the master problem process.
    Master(Box<dyn std::error::Error>),
    Master(MErr),
    /// The iteration limit has been reached.
    IterationLimit { limit: usize },
    /// An error raised by a subproblem evaluation.
    Evaluation(E),
    Evaluation(PErr),
    /// An error raised subproblem update.
    Update(E),
    Update(PErr),
    /// The dimension of some data is wrong.
    Dimension(String),
    /// Invalid bounds for a variable.
    InvalidBounds { lower: Real, upper: Real },
    /// The value of a variable is outside its bounds.
    ViolatedBounds { lower: Real, upper: Real, value: Real },
    /// The variable index is out of bounds.
    InvalidVariable { index: usize, nvars: usize },
    /// Disconnected channel.
    Disconnected,
    /// An error occurred in a subprocess.
    Process(RecvError),
    /// A method requiring an initialized solver has been called.
    NotInitialized,
    /// The problem has not been solved yet.
    NotSolved,
}

/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<<<M as MasterBuilder>::MasterProblem as MasterProblem>::Err, <P as FirstOrderProblem>::Err>,
>;

impl<E> std::fmt::Display for Error<E>
impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
    E: std::fmt::Display,
    MErr: std::fmt::Display,
    PErr: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        use Error::*;
        match self {
            BuildMaster(err) => writeln!(fmt, "Cannot create master problem solver: {}", err),
            Master(err) => writeln!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
            Update(err) => writeln!(fmt, "Error in subproblem update: {}", err),
            Dimension(what) => writeln!(fmt, "Wrong dimension for {}", what),
            InvalidBounds { lower, upper } => write!(fmt, "Invalid bounds, lower:{}, upper:{}", lower, upper),
            ViolatedBounds { lower, upper, value } => write!(
                fmt,
                "Violated bounds, lower:{}, upper:{}, value:{}",
                lower, upper, value
            ),
            InvalidVariable { index, nvars } => {
                write!(fmt, "Variable index out of bounds, got:{} must be < {}", index, nvars)
            }
            Disconnected => writeln!(fmt, "A channel got disconnected"),
            Process(err) => writeln!(fmt, "Error in subprocess: {}", err),
            NotInitialized => writeln!(fmt, "The solver must be initialized (called Solver::init()?)"),
            NotSolved => writeln!(fmt, "The problem has not been solved yet"),
        }
    }
}

impl<E> std::error::Error for Error<E>
impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
    E: std::error::Error + 'static,
    MErr: std::error::Error + 'static,
    PErr: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            BuildMaster(err) => Some(err.as_ref()),
            Master(err) => Some(err.as_ref()),
            BuildMaster(err) => Some(err),
            Master(err) => Some(err),
            Evaluation(err) => Some(err),
            Process(err) => Some(err),
            _ => None,
        }
    }
}

impl<E, MErr> From<masterprocess::Error<MErr>> for Error<E>
impl<MErr, PErr> From<masterprocess::Error<MErr, PErr>> for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
{
    fn from(err: masterprocess::Error<MErr>) -> Error<E> {
        Error::Master(err.into())
    fn from(err: masterprocess::Error<MErr, PErr>) -> Error<MErr, PErr> {
        use masterprocess::Error::*;
        match err {
            DisconnectedSender => Error::Disconnected,
            DisconnectedReceiver => Error::Disconnected,
            Aggregation(err) => Error::Master(err),
            SubgradientExtension(err) => Error::Update(err),
            Master(err) => Error::Master(err.into()),
        }
    }
}

impl<E> From<RecvError> for Error<E> {
    fn from(err: RecvError) -> Error<E> {
impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
    fn from(err: RecvError) -> Error<MErr, PErr> {
        Error::Process(err.into())
    }
}

type ClientSender<P> =
    Sender<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;

417
418
419
420
421
422
423
424

425
426
427
428
429
430
431
439
440
441
442
443
444
445

446
447
448
449
450
451
452
453







-
+







    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    P::Err: Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
486
487
488
489
490
491
492
493

494
495
496
497
498
499
500
508
509
510
511
512
513
514

515
516
517
518
519
520
521
522







-
+







    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`Solver::solve`].
    pub fn init(&mut self) -> Result<(), Error<P::Err>> {
    pub fn init(&mut self) -> Result<(), P, M> {
        debug!("Initialize solver");

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        self.data.init(dvec![0.0; n]);
        self.cnt_descent = 0;
565
566
567
568
569
570
571
572

573
574
575
576
577

578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597

598
599
600
601
602
603
604
587
588
589
590
591
592
593

594
595
596
597
598

599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618

619
620
621
622
623
624
625
626







-
+




-
+



















-
+







        self.data.cnt_remaining_mins = num_subproblems;
        self.data.nxt_d = Arc::new(dvec![0.0; num_variables]);
        self.data.nxt_y = Arc::new(dvec![]);
        self.data.updated = true;
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
    pub fn solve(&mut self) -> Result<(), P, M> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), P, M> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
        debug!("Start solving up to {} iterations", niter);

        self.reset_iteration_data(niter);
        loop {
            select! {
                recv(self.client_rx.as_ref().ok_or(Error::NotInitialized)?) -> msg => {
                    let msg = msg?.map_err(Error::Evaluation)?;
620
621
622
623
624
625
626
627

628
629
630
631
632
633
634
642
643
644
645
646
647
648

649
650
651
652
653
654
655
656







-
+








    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(
        &mut self,
        msg: EvalResult<usize, <P as FirstOrderProblem>::Primal>,
    ) -> Result<bool, Error<P::Err>> {
    ) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
721
722
723
724
725
726
727
728

729
730
731
732
733
734
735
743
744
745
746
747
748
749

750
751
752
753
754
755
756
757







-
+







        // Compute the new candidate. The main loop will wait for the result of
        // this solution process of the master problem.
        self.master_proc.as_mut().unwrap().solve(self.data.cur_val)?;

        Ok(self.data.cnt_iter >= self.data.max_iter)
    }

    fn handle_master_response(&mut self, master_res: MasterResponse) -> Result<bool, Error<P::Err>> {
    fn handle_master_response(&mut self, master_res: MasterResponse) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        self.data.nxt_mod = master_res.nxt_mod;
        self.data.sgnorm = master_res.sgnorm;
        self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;
        self.data.cnt_updates = master_res.cnt_updates;

776
777
778
779
780
781
782
783

784
785
786
787
788
789
790
798
799
800
801
802
803
804

805
806
807
808
809
810
811
812







-
+







            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
    fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        let (update_tx, update_rx) = channel();
        self.problem
            .update(
                UpdateData {
                    cur_y: Arc::new(self.data.cur_y.clone()),
                    nxt_y: self.data.nxt_y.clone(),
875
876
877
878
879
880
881
882

883
884
885
886
887
888
889
897
898
899
900
901
902
903

904
905
906
907
908
909
910
911







-
+







            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, Error<P::Err>> {
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}