dart:async库

先看一下Dart SDK中的dart:async库,这里介绍了Future和Stream的概念,这个和Java中的感觉一个样,通常我们不会直接使用Future和Stream,通过使用async/await语法来进行异步编程。

dart:async库现在位于dart:core里,所以也不用单独导入了。

基础使用——async/await

通过async/await语法糖来解决异步回调的嵌套地狱问题:

runUsingAsyncAwait() async {
  // ...
  var entryPoint = await findEntryPoint();
  var exitCode = await runExecutable(entryPoint, args);
  await flushThenExit(exitCode);
}

异常处理:

var entryPoint = await findEntryPoint();
try {
  var exitCode = await runExecutable(entryPoint, args);
  await flushThenExit(exitCode);
} catch (e) {
  // Handle the error...
}

比如以下调用bind方法返回的Future,可以继续链式调用then和异常捕获处理 ,catchError要放到then的后面,这样可以同时处理then方法中产生的异常:

HttpServer.bind('127.0.0.1', 4444)
     .then((server) => print('${server.isBroadcast}'))
     .catchError(print);

async/await的执行流程

async函数将同步运行,直到第一个await关键字。 这意味着在异步函数体内,第一个await关键字之前的所有同步代码将立即执行。

Future<void> printOrderMessage() async {
  print('Awaiting user order...');
  var order = await fetchUserOrder();
  print('Your order is: $order');
}

Future<String> fetchUserOrder() {
  // Imagine that this function is more complex and slow.
  return Future.delayed(Duration(seconds: 4), () => 'Large Latte');
}

Future<void> main() async {
  countSeconds(4);
  await printOrderMessage();
}

// You can ignore this function - it's here to visualize delay time in this example.
void countSeconds(int s) {
  for (var i = 1; i <= s; i++) {
    Future.delayed(Duration(seconds: i), () => print(i));
  }
}

程序先立即输出“Awaiting user order...“,然后每个一秒输出一个数字:

Awaiting user order...
1
2
3
4
Your order is: Large Latte

多个异步方法链式调用

组合多个异步方法时,可以使用Future的then方法,或者使用多个await来组合,效果是一样的:

Future result = costlyQuery(url);
result
    .then((value) => expensiveWork(value))
    .then((_) => lengthyComputation())
    .then((_) => print('Done!'))
    .catchError((exception) {
  /* Handle exception... */
});
try {
  final value = await costlyQuery(url);
  await expensiveWork(value);
  await lengthyComputation();
  print('Done!');
} catch (e) {
  /* Handle exception... */
}

当需要组合多个异步结果都返回后,再继续后续的逻辑时,可以使用Future的wait方法:

Future deleteLotsOfFiles() async =>  ...
Future copyLotsOfFiles() async =>  ...
Future checksumLotsOfOtherFiles() async =>  ...

await Future.wait([
  deleteLotsOfFiles(),
  copyLotsOfFiles(),
  checksumLotsOfOtherFiles(),
]);
print('Done with all the long steps!');

Stream

Stream的创建

可以参考官网文档:Creating streams in Dart

通过Stream的 fromFuturefromFuturesfromIterable静态方法来创建,也可以通过 StreamControllerde sink来创建生成:

void main() {
  final stream = Stream.fromIterable([1, 2, 3]);
  print(stream);// output: Instance of '_GeneratedStreamImpl<int>'
}
import 'Dart:async';

void main() {
  StreamController<int> controller = StreamController();
  controller.stream.listen((data) => print(data));
  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();
 }

对一个事件序列,除了Stream的listen方法,还可以使用await for语法:

Future main(List<String> arguments) async {
  // ...
  if (await FileSystemEntity.isDirectory(searchPath)) {
    final startingDir = Directory(searchPath);
    await for (var entity in startingDir.list(
        recursive: argResults[recursive],
        followLinks: argResults[followLinks])) {
      if (entity is File) {
        searchFile(entity, searchTerms);
      }
    }
  } else {
    searchFile(File(searchPath), searchTerms);
  }
}

一个点击事件的思考

对于一个点赞按钮,点击一次,请求一下接口数据,如果这个按钮可以连续点击十几次,那么总不能连续发送十几个请求吧,这时候可以将按钮的点击事件序列看作是一个Stream,然后可以使用Stream的相应方法或者属性来处理接口请求逻辑。

下面的代码中,onClick返回的是一个Stream:

// Find a button by ID and add an event handler.
querySelector('#submitInfo').onClick.listen((e) {
  // When the button is clicked, it runs this code.
  submitData();
});
  • 如果只关心一个事件,则可以使用诸如first,last或single这样的属性来获取它。 要在处理事件之前对其进行测试,请使用诸如firstWhere()lastWhere()singleWhere()之类的方法。
  • 如果您关心事件的子集,则可以使用诸如skip()skipWhile()take()takeWhile()where()之类的方法。

转换Stream数据

Dart 提供了 mapwhereexpandtake 方法将已存在的 Stream 转换为新的 Stream

在处理Stream数据前,也可以做相应的数据转换操作,这些转换的方法定义在 dart:convert 库中(参考dart:convert section).

var lines = inputStream
    .transform(utf8.decoder)
    .transform(LineSplitter());

Stream的异常处理

通过在listen方法中设置onDone ,onError函数来处理Stream的正常结束处理和异常处理:

var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();

inputStream
    .transform(utf8.decoder)
    .transform(LineSplitter())
    .listen((String line) {
  print('Got ${line.length} characters from stream');
}, onDone: () {
  print('file is now closed');
}, onError: (e) {
  print(e);
});

单订阅Stream和广播Stream

单订阅Stream

单订阅流通常用于流式传输很大的连续数据块,它在 Stream 的整个生命周期内只允许存在一个 listener,且只能被收听一次,它在有 listener 之前不会生成事件,并且在 listener 取消收听后会停止发送事件,即使你仍在 sink.add 更多事件。

需要注意的是,即使 listener 取消收听,单订阅流也不允许出现第二个 listener,否则将会报错,例如:

import 'dart:async';

void main() {
  final controller = StreamController();

  controller.stream.listen(print);
  controller.stream.listen(print);

  controller.sink.add(123);
  controller.close();
  
  // Unhandled exception: Bad state: Stream has already been listened to.
}

广播Stream

一般的流都是单订阅流,从 Stream 继承的广播流必须重写 isBroadcast 才会返回 true,广播流允许存在任意数量的 listener,并且无论是否存在 listener,它都能产生事件,所以中途加入的 listener 不会侦听到已发生的事件。

如果在触发事件的同时加入 listener,则此 listener 不会接收到本次事件,如果 listener 取消收听,则此 listener 会立即停止接收事件。

我们可以通过 asBroadcastStream 在单订阅流之上创建广播流,例如:

import 'dart:async';

void main() {
  final controller = StreamController();

  final stream = controller.stream.asBroadcastStream();

  stream.listen(print);
  stream.listen(print);

  controller.sink.add(123);
  controller.close();
  
  /**
   * output:
   * 123
   * 123
   */
}

参考:

如果觉得我的文章对你有用,请随意赞赏