RsBundle  Diff

Differences From Artifact [f13cb767bd]:

  • File src/solver/sync.rs — part of check-in [43abfc4c67] at 2019-11-22 09:24:06 on branch result-sender — problem: make `ResultSender` a trait with implementation `ChannelSender` (user: fifr size: 29818)

To Artifact [4aa0bc1e03]:

  • File src/solver/sync.rs — part of check-in [9198cc93a8] at 2019-11-22 12:00:42 on branch result-sender — Merge trunk (user: fifr size: 30609) [more...]

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66


67
68
69
70
71
72
73
74










75
76
77

78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

97
98
99
100
101
102
103
104
105
106

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124






125

126
127
128
129
130
131
132
133
134
135
136
137
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;

/// The minimal bundle solver.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;

/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug)]
pub enum Error<E> {
    /// An error raised when creating a new master problem solver.
    BuildMaster(Box<dyn std::error::Error>),
    /// An error raised by the master problem process.
    Master(Box<dyn std::error::Error>),
    /// The iteration limit has been reached.
    IterationLimit { limit: usize },
    /// An error raised by a subproblem evaluation.
    Evaluation(E),
    /// An error raised subproblem update.
    Update(E),
    /// The dimension of some data is wrong.
    Dimension(String),
    /// Invalid bounds for a variable.
    InvalidBounds { lower: Real, upper: Real },
    /// The value of a variable is outside its bounds.
    ViolatedBounds { lower: Real, upper: Real, value: Real },
    /// The variable index is out of bounds.
    InvalidVariable { index: usize, nvars: usize },


    /// An error occurred in a subprocess.
    Process(RecvError),
    /// A method requiring an initialized solver has been called.
    NotInitialized,
    /// The problem has not been solved yet.
    NotSolved,
}











impl<E> std::fmt::Display for Error<E>
where
    E: std::fmt::Display,

{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        use Error::*;
        match self {
            BuildMaster(err) => writeln!(fmt, "Cannot create master problem solver: {}", err),
            Master(err) => writeln!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
            Update(err) => writeln!(fmt, "Error in subproblem update: {}", err),
            Dimension(what) => writeln!(fmt, "Wrong dimension for {}", what),
            InvalidBounds { lower, upper } => write!(fmt, "Invalid bounds, lower:{}, upper:{}", lower, upper),
            ViolatedBounds { lower, upper, value } => write!(
                fmt,
                "Violated bounds, lower:{}, upper:{}, value:{}",
                lower, upper, value
            ),
            InvalidVariable { index, nvars } => {
                write!(fmt, "Variable index out of bounds, got:{} must be < {}", index, nvars)
            }

            Process(err) => writeln!(fmt, "Error in subprocess: {}", err),
            NotInitialized => writeln!(fmt, "The solver must be initialized (called Solver::init()?)"),
            NotSolved => writeln!(fmt, "The problem has not been solved yet"),
        }
    }
}

impl<E> std::error::Error for Error<E>
where
    E: std::error::Error + 'static,

{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            BuildMaster(err) => Some(err.as_ref()),
            Master(err) => Some(err.as_ref()),
            Evaluation(err) => Some(err),
            Process(err) => Some(err),
            _ => None,
        }
    }
}

impl<E, MErr> From<masterprocess::Error<MErr>> for Error<E>
where
    MErr: std::error::Error + 'static,
{
    fn from(err: masterprocess::Error<MErr>) -> Error<E> {






        Error::Master(err.into())

    }
}

impl<E> From<RecvError> for Error<E> {
    fn from(err: RecvError) -> Error<E> {
        Error::Process(err.into())
    }
}

type ClientSender<P> =
    Sender<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;








|

|

|



|

|








>
>








>
>
>
>
>
>
>
>
>
>
|

|
>



















>







|

|
>




|
|







|



|
>
>
>
>
>
>
|
>



|
|







41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
pub type DefaultSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::FullMasterBuilder>;

/// The minimal bundle solver.
pub type NoBundleSolver<P> = Solver<P, StandardTerminator, HKWeighter, crate::master::MinimalMasterBuilder>;

/// Error raised by the parallel bundle [`Solver`].
#[derive(Debug)]
pub enum Error<MErr, PErr> {
    /// An error raised when creating a new master problem solver.
    BuildMaster(MErr),
    /// An error raised by the master problem process.
    Master(MErr),
    /// The iteration limit has been reached.
    IterationLimit { limit: usize },
    /// An error raised by a subproblem evaluation.
    Evaluation(PErr),
    /// An error raised subproblem update.
    Update(PErr),
    /// The dimension of some data is wrong.
    Dimension(String),
    /// Invalid bounds for a variable.
    InvalidBounds { lower: Real, upper: Real },
    /// The value of a variable is outside its bounds.
    ViolatedBounds { lower: Real, upper: Real, value: Real },
    /// The variable index is out of bounds.
    InvalidVariable { index: usize, nvars: usize },
    /// Disconnected channel.
    Disconnected,
    /// An error occurred in a subprocess.
    Process(RecvError),
    /// A method requiring an initialized solver has been called.
    NotInitialized,
    /// The problem has not been solved yet.
    NotSolved,
}

/// The result type of the solver.
///
/// - `T` is the value type,
/// - `P` is the `FirstOrderProblem` associated with the solver,
/// - `M` is the `MasterBuilder` associated with the solver.
pub type Result<T, P, M> = std::result::Result<
    T,
    Error<<<M as MasterBuilder>::MasterProblem as MasterProblem>::Err, <P as FirstOrderProblem>::Err>,
>;

impl<MErr, PErr> std::fmt::Display for Error<MErr, PErr>
where
    MErr: std::fmt::Display,
    PErr: std::fmt::Display,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        use Error::*;
        match self {
            BuildMaster(err) => writeln!(fmt, "Cannot create master problem solver: {}", err),
            Master(err) => writeln!(fmt, "Error in master problem: {}", err),
            IterationLimit { limit } => writeln!(fmt, "The iteration limit has been reached: {}", limit),
            Evaluation(err) => writeln!(fmt, "Error in subproblem evaluation: {}", err),
            Update(err) => writeln!(fmt, "Error in subproblem update: {}", err),
            Dimension(what) => writeln!(fmt, "Wrong dimension for {}", what),
            InvalidBounds { lower, upper } => write!(fmt, "Invalid bounds, lower:{}, upper:{}", lower, upper),
            ViolatedBounds { lower, upper, value } => write!(
                fmt,
                "Violated bounds, lower:{}, upper:{}, value:{}",
                lower, upper, value
            ),
            InvalidVariable { index, nvars } => {
                write!(fmt, "Variable index out of bounds, got:{} must be < {}", index, nvars)
            }
            Disconnected => writeln!(fmt, "A channel got disconnected"),
            Process(err) => writeln!(fmt, "Error in subprocess: {}", err),
            NotInitialized => writeln!(fmt, "The solver must be initialized (called Solver::init()?)"),
            NotSolved => writeln!(fmt, "The problem has not been solved yet"),
        }
    }
}

impl<MErr, PErr> std::error::Error for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
    PErr: std::error::Error + 'static,
{
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        use Error::*;
        match self {
            BuildMaster(err) => Some(err),
            Master(err) => Some(err),
            Evaluation(err) => Some(err),
            Process(err) => Some(err),
            _ => None,
        }
    }
}

impl<MErr, PErr> From<masterprocess::Error<MErr, PErr>> for Error<MErr, PErr>
where
    MErr: std::error::Error + 'static,
{
    fn from(err: masterprocess::Error<MErr, PErr>) -> Error<MErr, PErr> {
        use masterprocess::Error::*;
        match err {
            DisconnectedSender => Error::Disconnected,
            DisconnectedReceiver => Error::Disconnected,
            Aggregation(err) => Error::Master(err),
            SubgradientExtension(err) => Error::Update(err),
            Master(err) => Error::Master(err.into()),
        }
    }
}

impl<MErr, PErr> From<RecvError> for Error<MErr, PErr> {
    fn from(err: RecvError) -> Error<MErr, PErr> {
        Error::Process(err.into())
    }
}

type ClientSender<P> =
    Sender<std::result::Result<EvalResult<usize, <P as FirstOrderProblem>::Primal>, <P as FirstOrderProblem>::Err>>;

417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Into<Box<dyn std::error::Error + Sync + Send>> + Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self







|







439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
    /// This is actually the time of the last call to `Solver::init`.
    start_time: Instant,
}

impl<P, T, W, M> Solver<P, T, W, M>
where
    P: FirstOrderProblem,
    P::Err: Send + 'static,
    T: Terminator<SolverData> + Default,
    W: Weighter<SolverData> + Default,
    M: MasterBuilder,
    <M::MasterProblem as MasterProblem>::MinorantIndex: std::hash::Hash,
{
    /// Create a new parallel bundle solver.
    pub fn new(problem: P) -> Self
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`Solver::solve`].
    pub fn init(&mut self) -> Result<(), Error<P::Err>> {
        debug!("Initialize solver");

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        self.data.init(dvec![0.0; n]);
        self.cnt_descent = 0;







|







508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
    ///
    /// This will reset the internal data structures so that a new fresh
    /// solution process can be started.
    ///
    /// It will also setup all worker processes.
    ///
    /// This function is automatically called by [`Solver::solve`].
    pub fn init(&mut self) -> Result<(), P, M> {
        debug!("Initialize solver");

        let n = self.problem.num_variables();
        let m = self.problem.num_subproblems();

        self.data.init(dvec![0.0; n]);
        self.cnt_descent = 0;
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
        self.data.cnt_remaining_mins = num_subproblems;
        self.data.nxt_d = Arc::new(dvec![0.0; num_variables]);
        self.data.nxt_y = Arc::new(dvec![]);
        self.data.updated = true;
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), Error<P::Err>> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), Error<P::Err>> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, Error<P::Err>> {
        debug!("Start solving up to {} iterations", niter);

        self.reset_iteration_data(niter);
        loop {
            select! {
                recv(self.client_rx.as_ref().ok_or(Error::NotInitialized)?) -> msg => {
                    let msg = msg?.map_err(Error::Evaluation)?;







|




|



















|







587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
        self.data.cnt_remaining_mins = num_subproblems;
        self.data.nxt_d = Arc::new(dvec![0.0; num_variables]);
        self.data.nxt_y = Arc::new(dvec![]);
        self.data.updated = true;
    }

    /// Solve the problem with the default maximal iteration limit [`DEFAULT_ITERATION_LIMIT`].
    pub fn solve(&mut self) -> Result<(), P, M> {
        self.solve_with_limit(DEFAULT_ITERATION_LIMIT)
    }

    /// Solve the problem with a maximal iteration limit.
    pub fn solve_with_limit(&mut self, limit: usize) -> Result<(), P, M> {
        // First initialize the internal data structures.
        self.init()?;

        if self.solve_iter(limit)? {
            Ok(())
        } else {
            Err(Error::IterationLimit { limit })
        }
    }

    /// Solve the problem but stop after at most `niter` iterations.
    ///
    /// The function returns `Ok(true)` if the termination criterion
    /// has been satisfied. Otherwise it returns `Ok(false)` or an
    /// error code.
    ///
    /// If this function is called again, the solution process is
    /// continued from the previous point. Because of this one *must*
    /// call `init()` before the first call to this function.
    pub fn solve_iter(&mut self, niter: usize) -> Result<bool, P, M> {
        debug!("Start solving up to {} iterations", niter);

        self.reset_iteration_data(niter);
        loop {
            select! {
                recv(self.client_rx.as_ref().ok_or(Error::NotInitialized)?) -> msg => {
                    let msg = msg?.map_err(Error::Evaluation)?;
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634

    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(
        &mut self,
        msg: EvalResult<usize, <P as FirstOrderProblem>::Primal>,
    ) -> Result<bool, Error<P::Err>> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }







|







642
643
644
645
646
647
648
649
650
651
652
653
654
655
656

    /// Handle a response from a subproblem evaluation.
    ///
    /// The function returns `Ok(true)` if the final iteration count has been reached.
    fn handle_client_response(
        &mut self,
        msg: EvalResult<usize, <P as FirstOrderProblem>::Primal>,
    ) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        match msg {
            EvalResult::ObjectiveValue { index, value } => {
                debug!("Receive objective from subproblem {}: {}", index, value);
                if self.data.nxt_ubs[index].is_infinite() {
                    self.data.cnt_remaining_ubs -= 1;
                }
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
        // Compute the new candidate. The main loop will wait for the result of
        // this solution process of the master problem.
        self.master_proc.as_mut().unwrap().solve(self.data.cur_val)?;

        Ok(self.data.cnt_iter >= self.data.max_iter)
    }

    fn handle_master_response(&mut self, master_res: MasterResponse) -> Result<bool, Error<P::Err>> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        self.data.nxt_mod = master_res.nxt_mod;
        self.data.sgnorm = master_res.sgnorm;
        self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;
        self.data.cnt_updates = master_res.cnt_updates;








|







743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
        // Compute the new candidate. The main loop will wait for the result of
        // this solution process of the master problem.
        self.master_proc.as_mut().unwrap().solve(self.data.cur_val)?;

        Ok(self.data.cnt_iter >= self.data.max_iter)
    }

    fn handle_master_response(&mut self, master_res: MasterResponse) -> Result<bool, P, M> {
        let master = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;

        self.data.nxt_mod = master_res.nxt_mod;
        self.data.sgnorm = master_res.sgnorm;
        self.data.expected_progress = self.data.cur_val - self.data.nxt_mod;
        self.data.cnt_updates = master_res.cnt_updates;

776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, Error<P::Err>> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        let (update_tx, update_rx) = channel();
        self.problem
            .update(
                UpdateData {
                    cur_y: Arc::new(self.data.cur_y.clone()),
                    nxt_y: self.data.nxt_y.clone(),







|







798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
            self.problem
                .evaluate(i, self.data.nxt_y.clone(), ChannelSender::new(i, client_tx.clone()))
                .map_err(Error::Evaluation)?;
        }
        Ok(false)
    }

    fn update_problem(&mut self, step: Step) -> Result<bool, P, M> {
        let master_proc = self.master_proc.as_mut().ok_or(Error::NotInitialized)?;
        let (update_tx, update_rx) = channel();
        self.problem
            .update(
                UpdateData {
                    cur_y: Arc::new(self.data.cur_y.clone()),
                    nxt_y: self.data.nxt_y.clone(),
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, Error<P::Err>> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}







|







897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
            self.data.nxt_mod,
            self.data.nxt_val,
            self.data.cur_val
        );
    }

    /// Return the aggregated primal of the given subproblem.
    pub fn aggregated_primal(&self, subproblem: usize) -> Result<P::Primal, P, M> {
        Ok(self
            .master_proc
            .as_ref()
            .ok_or(Error::NotSolved)?
            .get_aggregated_primal(subproblem)?)
    }
}