package de.axelspringer.yana.streamview;

import de.axelspringer.yana.internal.models.FetchingState;
import de.axelspringer.yana.internal.providers.interfaces.ISchedulerProvider;
import de.axelspringer.yana.internal.services.article.ArticleFetchFailure;
import de.axelspringer.yana.internal.services.article.IFetchStatusInteractor;
import de.axelspringer.yana.internal.services.article.Progress;
import de.axelspringer.yana.internal.utils.option.Option;
import de.axelspringer.yana.internal.utils.rx.RxCacheProxy;
import javax.inject.Inject;
import kotlin.NotImplementedError;
import kotlin.jvm.internal.Intrinsics;
import rx.Observable;
import rx.functions.Action1;
import timber.log.Timber;

/* compiled from: FetchStatusInteractor.kt */
/* loaded from: classes3.dex */
public final class FetchStatusInteractor implements IFetchStatusInteractor, IFetchStatusSetter {
    private final RxCacheProxy<Option<ArticleFetchFailure>> fetchingErrorStream;
    private final RxCacheProxy<Progress> inProgress;
    private final ISchedulerProvider schedulerProvider;

    @Inject
    public FetchStatusInteractor(ISchedulerProvider schedulerProvider) {
        Intrinsics.checkParameterIsNotNull(schedulerProvider, "schedulerProvider");
        this.schedulerProvider = schedulerProvider;
        RxCacheProxy<Progress> create = RxCacheProxy.create(new Progress(null, false));
        Intrinsics.checkExpressionValueIsNotNull(create, "RxCacheProxy.create(Progress(null, false))");
        this.inProgress = create;
        RxCacheProxy<Option<ArticleFetchFailure>> create2 = RxCacheProxy.create(Option.none());
        Intrinsics.checkExpressionValueIsNotNull(create2, "RxCacheProxy.create(Opti…e<ArticleFetchFailure>())");
        this.fetchingErrorStream = create2;
    }

    @Override // de.axelspringer.yana.internal.services.article.IFetchStatusInteractor
    public Observable<Progress> getFetchInProgressOnceAndStream() {
        Observable<Progress> distinctUntilChanged = this.inProgress.asObservable(this.schedulerProvider.computation()).distinctUntilChanged();
        Intrinsics.checkExpressionValueIsNotNull(distinctUntilChanged, "inProgress.asObservable(…  .distinctUntilChanged()");
        return distinctUntilChanged;
    }

    @Override // de.axelspringer.yana.internal.services.article.IFetchStatusInteractor
    public Observable<Option<ArticleFetchFailure>> getObserveDistinctErrorOnceAndStream() {
        Observable<Option<ArticleFetchFailure>> doOnNext = this.fetchingErrorStream.asObservable(this.schedulerProvider.computation()).distinctUntilChanged().doOnNext(new Action1<Option<ArticleFetchFailure>>() { // from class: de.axelspringer.yana.streamview.FetchStatusInteractor$observeDistinctErrorOnceAndStream$1
            @Override // rx.functions.Action1
            public final void call(Option<ArticleFetchFailure> option) {
                Timber.d("Stream Error Stream: %s", option);
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(doOnNext, "fetchingErrorStream.asOb… Error Stream: %s\", it) }");
        return doOnNext;
    }

    @Override // de.axelspringer.yana.internal.services.article.IFetchStatusInteractor
    public io.reactivex.Observable<FetchingState> getObserveFetchingState() {
        throw new NotImplementedError("An operation is not implemented: not implemented");
    }

    @Override // de.axelspringer.yana.streamview.IFetchStatusSetter
    public void setFetchError(Option<ArticleFetchFailure> fetchError) {
        Intrinsics.checkParameterIsNotNull(fetchError, "fetchError");
        this.fetchingErrorStream.publish(fetchError);
    }

    @Override // de.axelspringer.yana.streamview.IFetchStatusSetter
    public void setFetchInProgress(Progress progress) {
        Intrinsics.checkParameterIsNotNull(progress, "progress");
        this.inProgress.publish(progress);
    }
}
