package com.groupon.core.network.streamparsing;

import com.groupon.core.network.streamparsing.StreamParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import rx.AsyncEmitter;
import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: classes2.dex */
public class StreamParsingClient<RESPONSE, ITEM> {
    private final OkHttpClient httpClient;
    private final Object lock = new Object();
    private Request nextRequest;
    private final StreamParsingNextRequestFactory<RESPONSE, ITEM> nextRequestFactory;
    private Request request;
    private final StreamParser<RESPONSE, ITEM> responseParser;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    public class DefaultEmitter implements Action1<AsyncEmitter<ITEM>> {
        private final Request request;

        private DefaultEmitter() {
            this.request = StreamParsingClient.this.request;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // rx.functions.Action1
        public void call(AsyncEmitter<ITEM> asyncEmitter) {
            SubscribeOnItemParseListener subscribeOnItemParseListener = null;
            Response response = null;
            try {
                response = StreamParsingClient.this.httpClient.newCall(this.request).execute();
                if (response.isSuccessful()) {
                    SubscribeOnItemParseListener subscribeOnItemParseListener2 = new SubscribeOnItemParseListener(asyncEmitter);
                    try {
                        Object parse = StreamParsingClient.this.responseParser.parse(response.body().byteStream(), subscribeOnItemParseListener2);
                        synchronized (StreamParsingClient.this.lock) {
                            if (this.request == StreamParsingClient.this.request) {
                                StreamParsingClient.this.nextRequest = StreamParsingClient.this.nextRequestFactory.createNextRequest(response, (Response) parse);
                            }
                        }
                        asyncEmitter.onCompleted();
                        subscribeOnItemParseListener = subscribeOnItemParseListener2;
                    } catch (IOException e) {
                        e = e;
                        subscribeOnItemParseListener = subscribeOnItemParseListener2;
                        if (subscribeOnItemParseListener != null && !subscribeOnItemParseListener.getItems().isEmpty()) {
                            synchronized (StreamParsingClient.this.lock) {
                                if (this.request == StreamParsingClient.this.request) {
                                    StreamParsingClient.this.nextRequest = StreamParsingClient.this.nextRequestFactory.createNextRequest(response, (List) subscribeOnItemParseListener.getItems());
                                }
                            }
                        }
                        asyncEmitter.onError(e);
                    }
                } else {
                    asyncEmitter.onError(new UnsuccessfulResponseException(response));
                }
            } catch (IOException e2) {
                e = e2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    public static class SubscribeOnItemParseListener<ITEM> implements StreamParser.OnItemParseListener<ITEM> {
        private final AsyncEmitter<ITEM> emitter;
        private final List<ITEM> items;

        private SubscribeOnItemParseListener(AsyncEmitter<ITEM> asyncEmitter) {
            this.items = new ArrayList();
            this.emitter = asyncEmitter;
        }

        public List<ITEM> getItems() {
            return this.items;
        }

        @Override // com.groupon.core.network.streamparsing.StreamParser.OnItemParseListener
        public void onItemParsed(ITEM item) {
            this.emitter.onNext(item);
            this.items.add(item);
        }
    }

    public StreamParsingClient(@Nonnull OkHttpClient okHttpClient, @Nonnull StreamParser<RESPONSE, ITEM> streamParser, @Nonnull StreamParsingNextRequestFactory<RESPONSE, ITEM> streamParsingNextRequestFactory) {
        this.httpClient = okHttpClient;
        this.responseParser = streamParser;
        this.nextRequestFactory = streamParsingNextRequestFactory;
    }

    private Observable<ITEM> createObservable() {
        return Observable.fromEmitter(new DefaultEmitter(), AsyncEmitter.BackpressureMode.BUFFER).subscribeOn(Schedulers.io());
    }

    public boolean canStreamMore() {
        boolean z;
        synchronized (this.lock) {
            z = this.nextRequest != null;
        }
        return z;
    }

    public Observable<ITEM> stream(Request request) {
        Observable<ITEM> createObservable;
        synchronized (this.lock) {
            this.nextRequest = null;
            this.request = request;
            createObservable = createObservable();
        }
        return createObservable;
    }

    public Observable<ITEM> streamMore() {
        Observable<ITEM> createObservable;
        synchronized (this.lock) {
            if (this.nextRequest == null) {
                createObservable = Observable.empty();
            } else {
                this.request = this.nextRequest;
                createObservable = createObservable();
            }
        }
        return createObservable;
    }
}
