Rxjs - 你取消订阅了吗 ?

本文根据作者在实际项目中遇到的rxjs订阅引发内存泄漏问题展开,介绍了rxjs订阅是如何引发内存泄漏的,以及如何合理的管理订阅避免内存泄漏的发生


刚开始学习Rxjs的时候,官方文档对Subscription的介绍:
A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription. In previous versions of RxJS, Subscription was called "Disposable"


为什么需要取消订阅?

在项目的开发调试过程中发现了一个很奇怪的现象,在调试的时候发现为什么我在订阅中只打印了一次数据,可控制台打印怎么越来越多:

typescript 复制代码
ngOnInit(): void {
  this.store.dispatch(loadPosts());
  this.store.select(selectAllPosts).subscribe(x => {
    console.log(x);
    this.posts = x;
  });
}

控制台第一次只打印了一次数据,但当我切到别的页面再切回到当前组件的页面上时,控制台每次打印两次数据。以此类推随着页面的不断切换,数据的打印次数不断地上升。

让我们来分析一下生产这个现象的原因。
首先在组件 初始化 的时候我们订阅了一次store里面的数据,但注意我们并没有在组件销毁的时候取消掉当前的订阅。当我们切换到其他的页面上时,当前组件销毁。然后我们返回到这个页面,组件再次初始化,重新订阅了一次store数据。加上上次没有取消掉的订阅就是两次。长此以往下去我们的订阅只会不断地增加。假设项目里多个组件都有同样地问题时,不就发生了内存泄漏吗?而我们的项目中大量的使用了rxjs订阅,但都没有对订阅进行处理,可以说是很严重的bug。

因此我们需要解决掉这个潜在的内存泄露问题。

如何手动取消订阅?

紧接着前面的例子。当组件销毁时我们需要对订阅做一些处理。

方式1

typescript 复制代码
sub: Subscription;
ngOnInit(): void {
  this.store.dispatch(loadPosts());
  this.sub = this.store.select(selectAllPosts).subscribe(x => this.posts = x);
}
ngOnDestroy() {
  this.sub.unsubscribe();
}

方式2

有些时候我们组件中的rxjs订阅可能有很多,我们不可能为每个订阅命名一个变量,为此我们可以初始化一个变量对所有的订阅进行统一处理

typescript 复制代码
subs = new Subscription();
ngOnInit(): void {
  this.store.dispatch(loadPosts());
  this.sub.add(this.store.select(selectAllPosts).subscribe(x => this.posts = x));
  this.loadData();
}

ngOnDestroy() {
  this.subs.unsubscribe();
}

通过调用subscription实例上的add方法统一管理我们的订阅。

方式3

利用 Async 管道, Angular会自动帮我们处理订阅。

typescript 复制代码
posts$: Observable<Post[]>;
ngOnInit(): void {
  this.store.dispatch(loadPosts());
  this.posts$ = this.store.select(selectAllPosts);
}
html 复制代码
@for (post of (posts$ | async); track post.id) {
    <div class="post-item">
      <h3>{{ post.title }}</h3>
      <p>{{ post.content }}</p>
    </div>
}

哪些情况不需要取消订阅?

使用 Angular 的 Async 管道

在上面的例子我们已经展示过了。

有限 Observable(如 HTTP 请求)

typescript 复制代码
this.http.get('/api/data').subscribe(data => {
  // 不需要手动取消 - 请求完成后自动结束
});

使用完成操作符

typescript 复制代码
// 使用 take(1) 自动完成
this.route.params.pipe(take(1)).subscribe(params => {
  // 只取一次值,然后自动结束
});

// 使用 takeUntil 自动取消
private destroy$ = new Subject<void>();

ngOnInit() {
  this.dataService.getData()
    .pipe(takeUntil(this.destroy$))
    .subscribe(data => this.data = data);
}

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

使用 first() 或 takeWhile() 操作符

typescript 复制代码
// 使用 first()
this.dataService.valueChanged.pipe(first()).subscribe(console.log);

// 使用 takeWhile()
let active = true;

this.dataService.stream
  .pipe(takeWhile(() => active))
  .subscribe(console.log);

// 在组件销毁时
ngOnDestroy() {
  active = false;
}

本文只列举了一些常见的场景。希望对你有所帮助