dart:async库
先看一下Dart SDK中的dart:async库,这里介绍了Future和Stream的概念,这个和Java中的感觉一个样,通常我们不会直接使用Future和Stream,通过使用async/await语法来进行异步编程。
dart:async库现在位于dart:core里,所以也不用单独导入了。
基础使用——async/await
通过async/await语法糖来解决异步回调的嵌套地狱问题:
runUsingAsyncAwait() async {
// ...
var entryPoint = await findEntryPoint();
var exitCode = await runExecutable(entryPoint, args);
await flushThenExit(exitCode);
}
异常处理:
var entryPoint = await findEntryPoint();
try {
var exitCode = await runExecutable(entryPoint, args);
await flushThenExit(exitCode);
} catch (e) {
// Handle the error...
}
比如以下调用bind方法返回的Future,可以继续链式调用then和异常捕获处理 ,catchError
要放到then的后面,这样可以同时处理then方法中产生的异常:
HttpServer.bind('127.0.0.1', 4444)
.then((server) => print('${server.isBroadcast}'))
.catchError(print);
async/await的执行流程
Future<void> printOrderMessage() async {
print('Awaiting user order...');
var order = await fetchUserOrder();
print('Your order is: $order');
}
Future<String> fetchUserOrder() {
// Imagine that this function is more complex and slow.
return Future.delayed(Duration(seconds: 4), () => 'Large Latte');
}
Future<void> main() async {
countSeconds(4);
await printOrderMessage();
}
// You can ignore this function - it's here to visualize delay time in this example.
void countSeconds(int s) {
for (var i = 1; i <= s; i++) {
Future.delayed(Duration(seconds: i), () => print(i));
}
}
程序先立即输出“Awaiting user order...“,然后每个一秒输出一个数字:
Awaiting user order...
1
2
3
4
Your order is: Large Latte
多个异步方法链式调用
组合多个异步方法时,可以使用Future的then方法,或者使用多个await来组合,效果是一样的:
Future result = costlyQuery(url);
result
.then((value) => expensiveWork(value))
.then((_) => lengthyComputation())
.then((_) => print('Done!'))
.catchError((exception) {
/* Handle exception... */
});
try {
final value = await costlyQuery(url);
await expensiveWork(value);
await lengthyComputation();
print('Done!');
} catch (e) {
/* Handle exception... */
}
当需要组合多个异步结果都返回后,再继续后续的逻辑时,可以使用Future的wait方法:
Future deleteLotsOfFiles() async => ...
Future copyLotsOfFiles() async => ...
Future checksumLotsOfOtherFiles() async => ...
await Future.wait([
deleteLotsOfFiles(),
copyLotsOfFiles(),
checksumLotsOfOtherFiles(),
]);
print('Done with all the long steps!');
Stream
Stream的创建
可以参考官网文档:Creating streams in Dart
通过Stream的 fromFuture
、fromFutures
、fromIterable
静态方法来创建,也可以通过 StreamController
de sink
来创建生成:
void main() {
final stream = Stream.fromIterable([1, 2, 3]);
print(stream);// output: Instance of '_GeneratedStreamImpl<int>'
}
import 'Dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.stream.listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
}
对一个事件序列,除了Stream的listen方法,还可以使用await for语法:
Future main(List<String> arguments) async {
// ...
if (await FileSystemEntity.isDirectory(searchPath)) {
final startingDir = Directory(searchPath);
await for (var entity in startingDir.list(
recursive: argResults[recursive],
followLinks: argResults[followLinks])) {
if (entity is File) {
searchFile(entity, searchTerms);
}
}
} else {
searchFile(File(searchPath), searchTerms);
}
}
一个点击事件的思考
对于一个点赞按钮,点击一次,请求一下接口数据,如果这个按钮可以连续点击十几次,那么总不能连续发送十几个请求吧,这时候可以将按钮的点击事件序列看作是一个Stream,然后可以使用Stream的相应方法或者属性来处理接口请求逻辑。
下面的代码中,onClick返回的是一个Stream:
// Find a button by ID and add an event handler.
querySelector('#submitInfo').onClick.listen((e) {
// When the button is clicked, it runs this code.
submitData();
});
- 如果只关心一个事件,则可以使用诸如first,last或single这样的属性来获取它。 要在处理事件之前对其进行测试,请使用诸如
firstWhere()
,lastWhere()
或singleWhere()
之类的方法。 - 如果您关心事件的子集,则可以使用诸如
skip()
,skipWhile()
,take()
,takeWhile()
和where()
之类的方法。
转换Stream数据
Dart 提供了 map
、where
、expand
和 take
方法将已存在的 Stream
转换为新的 Stream
在处理Stream数据前,也可以做相应的数据转换操作,这些转换的方法定义在 dart:convert
库中(参考dart:convert section).
var lines = inputStream
.transform(utf8.decoder)
.transform(LineSplitter());
Stream的异常处理
通过在listen方法中设置onDone ,onError函数来处理Stream的正常结束处理和异常处理:
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
inputStream
.transform(utf8.decoder)
.transform(LineSplitter())
.listen((String line) {
print('Got ${line.length} characters from stream');
}, onDone: () {
print('file is now closed');
}, onError: (e) {
print(e);
});
单订阅Stream和广播Stream
单订阅Stream
单订阅流通常用于流式传输很大的连续数据块,它在 Stream
的整个生命周期内只允许存在一个 listener
,且只能被收听一次,它在有 listener
之前不会生成事件,并且在 listener
取消收听后会停止发送事件,即使你仍在 sink.add
更多事件。
需要注意的是,即使 listener
取消收听,单订阅流也不允许出现第二个 listener
,否则将会报错,例如:
import 'dart:async';
void main() {
final controller = StreamController();
controller.stream.listen(print);
controller.stream.listen(print);
controller.sink.add(123);
controller.close();
// Unhandled exception: Bad state: Stream has already been listened to.
}
广播Stream
一般的流都是单订阅流,从 Stream
继承的广播流必须重写 isBroadcast
才会返回 true
,广播流允许存在任意数量的 listener
,并且无论是否存在 listener
,它都能产生事件,所以中途加入的 listener
不会侦听到已发生的事件。
如果在触发事件的同时加入 listener
,则此 listener
不会接收到本次事件,如果 listener
取消收听,则此 listener
会立即停止接收事件。
我们可以通过 asBroadcastStream
在单订阅流之上创建广播流,例如:
import 'dart:async';
void main() {
final controller = StreamController();
final stream = controller.stream.asBroadcastStream();
stream.listen(print);
stream.listen(print);
controller.sink.add(123);
controller.close();
/**
* output:
* 123
* 123
*/
}
参考: