Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 22 additions & 36 deletions app/src/main/java/com/demo/maat/hello_rxjava/PollingFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,22 @@
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;

import rx.functions.Func1;

public class PollingFragment extends Fragment {


static final String TAG = "PollingFragment";
@BindView(R.id.btn_polling)
Button mBtnPolling;

int N=0;
private Subscription subscribe;
private CompositeSubscription mCompositeSubscription;
int N = 0;
private Subscription subscription;

@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container,
Bundle savedInstanceState) {
public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
View view = inflater.inflate(R.layout.polling_fragment, container, false);
ButterKnife.bind(this, view);
mCompositeSubscription = new CompositeSubscription();
return view;
}

Expand All @@ -51,48 +44,41 @@ public void onResume() {
@Override
public void onViewCreated(View view, Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);

}


@OnClick(R.id.btn_polling)
public void onClick() {

//此处有bug,subscribe无法释放,望高人pull requests
subscribe=Observable.create(new Observable.OnSubscribe<String>() {
subscription = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> observer) {

Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(" "+(N++));
}
}, 0, 1000, TimeUnit.MILLISECONDS);
public void call(final Subscriber<? super String> subscriber) {
subscriber.onNext(" " + (N++));
subscriber.onCompleted();
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.delay(1, TimeUnit.SECONDS);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
printLog("polling"+s);
}
});
mCompositeSubscription.add(subscribe);
@Override
public void call(String s) {
printLog("polling" + s);
}
});
}

private void printLog(String s) {
Log.i(TAG, s);
}

@Override
public void onDestroy() {
super.onDestroy();
mCompositeSubscription.unsubscribe();
Log.i(TAG, "subscribe.unsubscribe()");
subscription.unsubscribe();
}


}