본문 바로가기

Dart

[Dart] Stream 처리.

Stream 기본 코드 (await for 구문으로 처리, listen() 대신 사용 가능)

 

import 'dart:async';

/// Sums every integer emitted by [stream] and completes with the total.
///
/// A stream can be consumed either with `await for` (used here) or with
/// `listen()` from the Stream API. The running total is printed after each
/// event.
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  // The loop keeps running until the stream is closed.
  await for (final value in stream) {
    // Accumulate first so the printed value is the running total
    // (1, 3, 6, ...) rather than the total before this event.
    sum += value;
    print('(sumStream) $sum');
  }
  return sum;
}

/// Emits the integers 1 through [to] in ascending order, one per second.
///
/// The `async*` marker makes this an asynchronous generator: the function
/// returns a [Stream], and each `yield` adds one event to it.
Stream<int> countStream(int to) async* {
  var next = 1;
  while (next <= to) {
    // Wait one second before delivering each value.
    await Future.delayed(const Duration(seconds: 1));
    yield next;
    next++;
  }
}

/// Entry point: sums the values produced by `countStream(10)` with
/// `sumStream` and prints the result.
void main() async {
  print('========== start ==========');
  final stream = countStream(10);
  final sum = await sumStream(stream);
  // A space follows the tag so the line reads "(main) 55".
  print('(main) $sum'); // 55
}

 

========== start ==========
(sumStream) 1
(sumStream) 3
(sumStream) 6
(sumStream) 10
(sumStream) 15
(sumStream) 21
(sumStream) 28
(sumStream) 36
(sumStream) 45
(sumStream) 55
(main) 55

 

단일 구독(single-subscription) Stream 을 2곳에서 listen 하여 오류가 발생하는 샘플

 

import 'dart:async';

/// Sums every integer emitted by [stream] and completes with the total.
///
/// The running total is printed after each event.
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    // Add before printing so the log shows the total including this event.
    sum += value;
    print('(sumStream) $sum');
  }
  return sum;
}

/// Emits the integers 1 through [to] in ascending order, one per second.
///
/// `async*` declares an asynchronous generator, which is why the return
/// type is a [Stream] and values are produced with `yield`.
Stream<int> countStream(int to) async* {
  var next = 1;
  while (next <= to) {
    await Future.delayed(const Duration(seconds: 1));
    yield next;
    next++;
  }
}

/// Entry point demonstrating the failure caused by listening twice to the
/// same stream.
///
/// The stream returned by `countStream` is first subscribed to with
/// `listen()`; handing the same stream to `sumStream` afterwards fails with
/// "Bad state: Stream has already been listened to."
void main() async {
  print('========== start ==========');
  final stream = countStream(10);
  // First subscriber. A block body is used because `=> {...}` would build
  // and return a set literal instead of just running the statement.
  stream.listen((value) {
    print('current value: $value');
  });
  // Second subscriber on the same stream: this is the intentional error.
  final sum = await sumStream(stream);
  print('(main) $sum'); // not reached: the await above throws
}

 

========== start ==========
Uncaught Error: Bad state: Stream has already been listened to.
current value: 1
current value: 2
current value: 3
current value: 4
current value: 5
current value: 6
current value: 7
current value: 8
current value: 9
current value: 10

 

Stream 기본 코드 (listen() 구문으로 처리, await for 대신 사용 가능)

 

import 'dart:async';

/// Sums every integer emitted by [stream] and completes with the total.
///
/// The running total is printed after each event.
Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    // Add before printing so the log shows the total including this event,
    // not the total from before it arrived.
    sum += value;
    print('(sumStream) $sum');
  }
  return sum;
}

/// Emits the integers 1 through [to] in ascending order, one per second.
///
/// `async*` declares an asynchronous generator, which is why the return
/// type is a [Stream] and values are produced with `yield`.
Stream<int> countStream(int to) async* {
  var next = 1;
  while (next <= to) {
    await Future.delayed(const Duration(seconds: 1));
    yield next;
    next++;
  }
}

/// Entry point that consumes the stream with `listen()` only.
void main() async {
  print('========== start ==========');
  final stream = countStream(10);
  // A block body is used because `=> {...}` would build and return a set
  // literal instead of just running the statement.
  stream.listen((value) {
    print('current value: $value');
  });
// The second consumer was removed, so the error no longer occurs.
//   var sum = await sumStream(stream);
//   print('(main) ' + sum.toString()); // 55
}

 

========== start ==========
current value: 1
current value: 2
current value: 3
current value: 4
current value: 5
current value: 6
current value: 7
current value: 8
current value: 9
current value: 10

 

Transformer 샘플 예제

 

import 'dart:async';

/// Adds up all integers delivered by [stream] and completes with the total.
///
/// After each event the updated running total is printed.
Future<int> sumStream(Stream<int> stream) async {
  int total = 0;
  await for (final n in stream) {
    total = total + n;
    print('(sumStream) $total');
  }
  return total;
}

/// Emits the integers 1 through [to] in ascending order, one per second.
///
/// `async*` declares an asynchronous generator, which is why the return
/// type is a [Stream] and values are produced with `yield`.
Stream<int> countStream(int to) async* {
  var next = 1;
  while (next <= to) {
    await Future.delayed(const Duration(seconds: 1));
    yield next;
    next++;
  }
}

/// Stream transformer that converts each incoming integer into the string
/// `'value: <n>'`.
final transformer = StreamTransformer<int, String>.fromHandlers(
  handleData: (value, sink) {
    // Whatever is added to the sink is what downstream listeners receive.
    sink.add('value: $value');
  },
);

/// Entry point: pipes `countStream(10)` through `transformer` and prints
/// each transformed string.
void main() async {
  print('========== start ==========');
  final stream = countStream(10);
  // `print` is passed directly as the data handler; wrapping the call in
  // `=> {...}` would build a set literal on every event.
  stream.transform(transformer).listen(print);
//   var sum = await sumStream(stream);
//   print('(main) ' + sum.toString()); // 55
}

 

========== start ==========
value: 1
value: 2
value: 3
value: 4
value: 5
value: 6
value: 7
value: 8
value: 9
value: 10

 

오류 발생 및 오류 처리 샘플 코드

 

import 'dart:async';

/// Adds up all integers delivered by [stream] and completes with the total.
///
/// The updated running total is printed after each event. If the stream
/// reports an error, the error is printed and the result is -1.
Future<int> sumStream(Stream<int> stream) async {
  int total = 0;
  try {
    await for (final n in stream) {
      total = total + n;
      print('(sumStream) $total');
    }
    return total;
  } catch (err) {
    // Error handling: log the failure and signal it with -1.
    print('$err');
    return -1;
  }
}

/// Emits the integers 1 through [to], one per second, but fails with an
/// [Exception] when the counter reaches 5.
///
/// `async*` declares an asynchronous generator, which is why the return
/// type is a [Stream] and values are produced with `yield`.
Stream<int> countStream(int to) async* {
  for (var i = 1; i <= to; i++) {
    await Future.delayed(const Duration(seconds: 1));
    // Deliberately raise an error in place of the fifth value.
    if (i == 5) {
      throw Exception('Intentional Error');
    }
    yield i;
  }
}

/// Entry point: sums `countStream(10)` with `sumStream` and prints the
/// result.
void main() async {
  print('========== start ==========');
  final stream = countStream(10);
  final sum = await sumStream(stream);
  // The stream fails at its fifth value, so sumStream reports -1.
  print('(main) $sum'); // -1
}

 

========== start ==========
(sumStream) 1
(sumStream) 3
(sumStream) 6
(sumStream) 10
Exception: Intentional Error
(main) -1

 

2곳에서 listen 하는 샘플 코드 (asBroadcastStream() 으로 BroadcastStream 변환)

 

import 'dart:async';

/// Adds up all integers delivered by [stream] and completes with the total.
///
/// The updated running total is printed after each event. If the stream
/// reports an error, the error is printed and the result is -1.
Future<int> sumStream(Stream<int> stream) async {
  int total = 0;
  try {
    await for (final n in stream) {
      total = total + n;
      print('(sumStream) $total');
    }
    return total;
  } catch (err) {
    print('$err');
    return -1;
  }
}
/// Emits the integers 1 through [to] in ascending order, one per second.
///
/// `async*` declares an asynchronous generator, which is why the return
/// type is a [Stream] and values are produced with `yield`.
Stream<int> countStream(int to) async* {
  var next = 1;
  while (next <= to) {
    await Future.delayed(const Duration(seconds: 1));
    yield next;
    next++;
  }
}
/// Entry point: consumes one stream from two places by first converting it
/// to a broadcast stream.
void main() async {
  print('========== start ==========');
  final stream = countStream(10);

  // Convert to a broadcast stream so that two consumers can subscribe.
  final bcStream = stream.asBroadcastStream();
  // A block body is used because `=> {...}` would build and return a set
  // literal instead of just running the statement.
  bcStream.listen((value) {
    print('again $value');
  });

  final sum = await sumStream(bcStream);
  print('(main) $sum'); // 55
}

 

========== start ==========
again 1
(sumStream) 1
again 2
(sumStream) 3
again 3
(sumStream) 6
again 4
(sumStream) 10
again 5
(sumStream) 15
again 6
(sumStream) 21
again 7
(sumStream) 28
again 8
(sumStream) 36
again 9
(sumStream) 45
again 10
(sumStream) 55
(main) 55

'Dart' 카테고리의 다른 글

[Dart] Class - getter, setter  (0) 2021.05.27
[Dart] Class - 선언 및 생성자  (0) 2021.05.27
[Dart] Null Safety  (0) 2021.05.12
[Dart] Future, async-await  (0) 2021.04.30
[Dart] 상속과 변수 초기화  (0) 2021.04.30