io.dylemma.frp

EventStream

trait EventStream[+A] extends AnyRef

An EventStream represents a (potentially finite) series of events that may occur at any time in the future. EventStreams are stylistic replacements for Publishers (from the Publisher/Subscriber, or Observer, pattern). Events are produced as Event instances: either a Fire or a Stop. Each Fire event produced by an EventStream carries a single piece of data. The Stop event represents the end of the stream. Once an EventStream produces a Stop event, it is considered finished and will produce no more events.

There are several ways to attach event handlers to an EventStream. The most low-level way is to use the sink method, which operates on Event instances. The foreach method is used to attach a handler that operates only on data from Fire events. onNext and onEnd attach handlers to the next Fire and a Stop, respectively. Attaching event handlers requires an implicit Observer, which is used to manage references between the handler and the stream.

EventStreams can be transformed and combined in a variety of ways like filtering, mapping, and concatenation.

Below is a working example of EventStream usage.

object Example extends App with Observer {
	// Create an EventStream instance. EventSource is a subtype of
	// EventStream, with the addition of `fire` and `stop` methods.
	val stream = EventSource[Int]

	// Create a mapped version of the EventStream
	val mappedStream = stream.map(_ * 2)

	// Attach an event handler, using the `foreach` syntax
	for(i <- stream) println("Int event: " + i)
	for(i <- mappedStream) println("Mapped event: " + i)

	// Attach an "end" event handler
	stream onEnd { println("Got the end") }

	// fire some events
	stream fire 1
	// (prints "Int event: 1")
	// (prints "Mapped event: 2")

	stream fire 5
	// (prints "Int event: 5")
	// (prints "Mapped event: 10")

	// stop the stream
	stream.stop
	// (prints "Got the end")
}
Linear Supertypes
AnyRef, Any

Abstract Value Members

  1. abstract def ++[A1 >: A](that: EventStream[A1]): EventStream[A1]

    Creates a new EventStream that represents the concatenation of this stream and that. The resulting stream will re-fire all events from this stream at first. Once this stream stops, the new stream will begin re-firing events from that stream, up until that one stops as well. If both this and that streams are stopped at the time of creation, the resulting stream will also be stopped.

    that

    Another EventStream whose events will be re-fired after this stream has stopped.

    returns

    The concatenation of this and that event stream.
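    A minimal sketch of concatenation, assuming the io.dylemma.frp library is on the classpath and that mixing in Observer supplies the implicit Observer, as in the class-level example. The object and value names are illustrative only.

```scala
import io.dylemma.frp._

object ConcatExample extends App with Observer {
  val first = EventSource[Int]
  val second = EventSource[Int]

  // `both` follows `first` until it stops, then follows `second`.
  val both = first ++ second

  for (i <- both) println("event: " + i)
  both onEnd { println("both streams are finished") }

  first fire 1   // re-fired by `both`
  second fire 99 // fired while `first` is still running, so not expected to reach `both`
  first.stop     // from here on, `both` re-fires events from `second`
  second fire 2
  second.stop    // `both` stops as well
}
```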

  2. abstract def before(deadline: Deadline): EventStream[A]

    Creates a new EventStream that re-fires events from this stream that happen before the given deadline. The resulting stream will stop automatically when the deadline expires, or when this stream stops. Time-based expiration will generally happen on another thread, as it is handled by a ScheduledExecutorService.

  3. abstract def collect[B](pf: PartialFunction[A, B]): EventStream[B]

    Creates a new EventStream by applying a partial function to all events fired by this stream on which the function is defined. The resulting stream will stop when this stream stops.

    pf

    The partial function to apply to events from this stream

    returns

    A new stream that fires events transformed by pf
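    As a hedged illustration, collect can filter and transform in one step. The sketch below assumes the library is on the classpath and reuses the EventSource and Observer conventions from the class-level example; the names are hypothetical.

```scala
import io.dylemma.frp._

object CollectExample extends App with Observer {
  val inputs = EventSource[Any]

  // Only String events are matched by the partial function;
  // everything else is skipped by the collected stream.
  val lengths: EventStream[Int] = inputs.collect { case s: String => s.length }

  for (n <- lengths) println("string length: " + n)

  inputs fire "hello" // defined: the collected stream fires the length
  inputs fire 42      // not defined for Int: no event on `lengths`
  inputs fire "hi"
  inputs.stop
}
```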

  4. abstract def drop(count: Int): EventStream[A]

    Creates a new EventStream that ignores the first count events fired by this stream, then re-fires all events afterward. The resulting stream will stop when this stream does.

    count

    The number of events to ignore.

    returns

    A new stream that fires all events from this stream after having ignored count of them.

  5. abstract def dropWhile(p: (A) ⇒ Boolean): EventStream[A]

    Creates a new EventStream that will re-fire all events fired by this stream, starting as soon as the predicate function p returns true in response to one of the events. All events prior to the first "passed" event will be ignored. The resulting stream will stop when this stream stops.

    p

    The filter predicate function to evaluate events. Once this function returns true, all events (including the current one) will be re-fired.

    returns

    A new stream that ignores events until one of them causes p to return true.

  6. abstract def either[B](that: EventStream[B]): EventStream[Either[A, B]]

    Creates a new EventStream that fires all events from this stream as Lefts, and all events from that stream as Rights. It is essentially the same as a union, but appropriate for when this and that are streams of different types. The resulting stream will stop once both parent streams are stopped.

    that

    Any EventStream to be joined with this stream in an "Either" Union.

    returns

    A new stream that fires events from this and that as Eithers.
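    A short sketch of joining two differently typed streams, assuming the library is on the classpath and the Observer mix-in from the class-level example. Stream and object names are illustrative.

```scala
import io.dylemma.frp._

object EitherExample extends App with Observer {
  val numbers = EventSource[Int]
  val words = EventSource[String]

  // Events from `numbers` arrive as Left, events from `words` as Right.
  val merged: EventStream[Either[Int, String]] = numbers either words

  for (e <- merged) e match {
    case Left(n)  => println("number: " + n)
    case Right(w) => println("word: " + w)
  }

  numbers fire 1
  words fire "one"

  // Per the description, `merged` stops only after both parents have stopped.
  numbers.stop
  words.stop
}
```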

  7. abstract def filter(p: (A) ⇒ Boolean): EventStream[A]

    Alias for withFilter

  8. abstract def flatMap[B](f: (A) ⇒ EventStream[B]): EventStream[B]

    Creates a new EventStream with the following behavior: for every event fired by this stream, a new stream will be created by applying f to that event; events from the new stream will be fired by the resulting stream until the next event from this stream, when the mapping re-starts. The resulting stream will stop when this stream stops.

    f

    A function that returns a new EventStream for every event fired by this stream.

    returns

    A new EventStream that fires events from the mapped streams, resetting the mapped stream every time this stream fires a new event.

  9. abstract def foldLeft[B](z: B)(op: (B, A) ⇒ B): EventStream[B]

    Creates a new EventStream that updates its state for each new event fired by this stream. The state starts at z and updates along the lines of state = op(state, event) for every event fired by this stream. Each time the state is updated, the new stream fires an event containing the state.

    z

    The initial state for the fold

    op

    The update function, of the form (state, next) => newState

    returns

    A new stream that fires events as its state updates according to op
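    The state sequence described above can be modelled with an ordinary collection. This sketch uses only the Scala standard library (no FRP types): scanLeft produces every intermediate state, and dropping the initial z leaves one state per event, which is what the description says the folded stream fires. FoldLeftModel and states are hypothetical names.

```scala
object FoldLeftModel {
  // Plain-collection model: one resulting state for each incoming event.
  def states[A, B](events: List[A], z: B)(op: (B, A) => B): List[B] =
    events.scanLeft(z)(op).tail

  def main(args: Array[String]): Unit = {
    // Running sum over the events 1, 2, 3 starting from 0.
    println(states(List(1, 2, 3), 0)(_ + _)) // prints List(1, 3, 6)
  }
}
```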

  10. abstract def grouped(size: Int): EventStream[List[A]]

    Creates a new EventStream that accumulates events from this stream in a List, re-firing the list once it reaches the specified size (or once this stream stops). The behavior of the resulting stream is intended to parallel the grouped method of the Scala standard library collections.

    size

    The group size

    returns

    A new stream that buffers events into a list, firing them when the list reaches the given size or when this stream stops.
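    For reference, this is how the standard library method that the description points to behaves on a plain List. The sketch uses no FRP types; GroupedModel and groups are hypothetical names.

```scala
object GroupedModel {
  // Standard-library grouping: full groups of `size`, plus a shorter
  // final group when the input length is not a multiple of `size`.
  def groups[A](events: List[A], size: Int): List[List[A]] =
    events.grouped(size).toList

  def main(args: Array[String]): Unit = {
    println(groups(List(1, 2, 3, 4, 5), 2)) // prints List(List(1, 2), List(3, 4), List(5))
  }
}
```

    The trailing List(5) corresponds to the partial group that the stream version fires when this stream stops.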

  11. abstract def map[B](f: (A) ⇒ B): EventStream[B]

    Creates a mapped version of this EventStream. For every event e fired by this stream, the mapped stream will fire an event equal to f(e). The mapped stream will stop when this stream stops.

    f

    The transformation function to be applied to events from this stream.

    returns

    A new EventStream that fires events from this stream, transformed by f.

  12. abstract def stopped: Boolean

    Marks whether or not this stream is stopped. A stopped stream will not produce any more events.

    returns

    true if this stream is stopped, false otherwise.

  13. abstract def take(count: Int): EventStream[A]

    Creates a new EventStream that takes the first count events from this stream and fires them, then stops. The resulting stream will also stop if this stream stops anytime before it fires count new events.

    count

    The number of events that the resulting stream will re-fire

    returns

    A new stream that re-fires the first count events from this stream.
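    A sketch contrasting take with drop on the same source, assuming the library is on the classpath and the Observer mix-in from the class-level example. All names are illustrative.

```scala
import io.dylemma.frp._

object TakeDropExample extends App with Observer {
  val source = EventSource[Int]

  val firstTwo = source.take(2) // re-fires two events, then stops
  val rest = source.drop(2)     // ignores two events, then re-fires the others

  for (i <- firstTwo) println("take(2): " + i)
  for (i <- rest) println("drop(2): " + i)
  firstTwo onEnd { println("take(2) stream stopped") }

  // Fire four events; the two derived streams split them between themselves.
  for (i <- 1 to 4) source fire i
  source.stop
}
```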

  14. abstract def takeWhile(p: (A) ⇒ Boolean): EventStream[A]

    Creates a new EventStream that re-fires events from this stream as long as the event data satisfies p, the filter predicate. The first event e that causes p(e) to be false will cause the resulting stream to stop. The new stream will also stop if this stream is already stopped, or becomes stopped at any time.

    p

    The filter predicate function. Events fired by this stream will be passed into p. As soon as the result is false, the new stream will stop.

    returns

    A new stream that re-fires events from this stream until the filter predicate fails for an event.

  15. abstract def until(end: EventStream[_]): EventStream[A]

    Creates a new EventStream that will re-fire all events from this stream until the end stream fires an event. The end stream stopping does not count as a fired event in this case. The resulting stream will also stop when and if this stream stops.

    end

    An EventStream whose first event marks the end of the resulting stream

    returns

    A new stream that re-fires events from this stream until the first event from the end stream.
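    One way until might be used is to stop listening when a separate "cancel" stream fires. The sketch assumes the library is on the classpath and the Observer mix-in from the class-level example; clicks and cancel are hypothetical names.

```scala
import io.dylemma.frp._

object UntilExample extends App with Observer {
  val clicks = EventSource[String]
  val cancel = EventSource[Unit]

  // `active` re-fires clicks until `cancel` fires its first event.
  val active = clicks until cancel

  for (c <- active) println("click: " + c)
  active onEnd { println("no longer listening") }

  clicks fire "a"  // re-fired by `active`
  cancel.fire(())  // first event on the end stream: `active` stops
  clicks fire "b"  // `clicks` itself is still running, but `active` is done
}
```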

  16. abstract def unzip[A1, A2](implicit asPair: (A) ⇒ (A1, A2)): (EventStream[A1], EventStream[A2])

    Where this EventStream's data can be represented as a Tuple2[A1, A2], this method creates two separate EventStreams that each represent a half of that pair, respectively.

    Example usage:

    val x = EventSource[(Int, String)]
    val (a,b) = x.unzip
    
    for(i <- a) println("left: " + i)
    for(s <- b) println("right: " + s)
    
    x fire 1 -> "hi"
    //prints "left: 1"
    //prints "right: hi"
    
    x fire 5 -> "earth"
    //prints "left: 5"
    //prints "right: earth"
    returns

    Two new streams that represent the left and right halves of this stream's events, respectively.

  17. abstract def withFilter(p: (A) ⇒ Boolean): EventStream[A]

    Creates a new EventStream that fires all events from this stream that match the filter predicate. The resulting stream will stop when this stream stops.

    p

    The filter predicate. For each event e, fired by this stream, the filtered stream will fire e as long as p(e) returns true.

    returns

    A filtered version of this stream.

  18. abstract def within(duration: Duration): EventStream[A]

    Creates a new EventStream that re-fires events from this stream that happen within the given duration from the time of creation. The resulting stream will stop when this stream stops, or when the duration runs out. Time-based expiration will generally happen on another thread, as it is handled by a ScheduledExecutorService.

    duration

    The amount of time before the resulting stream stops automatically.

    returns

    A new stream that represents all events fired by this stream, within the given duration from the time of creation.

  19. abstract def zip[B](that: EventStream[B]): EventStream[(A, B)]

    Creates a new EventStream that joins this stream with that stream, firing events as pairs as soon as an event is available from both streams. The new stream will buffer events from both parent streams, so take care to avoid creating a zipped stream if one stream is expected to fire significantly more events than the other; the buffer for the larger stream will continue to accumulate without being emptied.

    Example usage:

    val a = EventSource[Int]
    val b = EventSource[String]
    val c: EventStream[(Int, String)] = a zip b
    
    c foreach println _
    
    a fire 5
    a fire 2
    a fire 4
    b fire "A" // prints "(5,A)"
    b fire "B" // prints "(2,B)"
    b fire "C" // prints "(4,C)"
    that

    The event stream to be zipped with this stream.

    returns

    A new stream that fires a pair for each corresponding pair of events from this stream and that stream.

  20. abstract def zipWithIndex: EventStream[(A, Int)]

    Creates a new EventStream that re-fires events from this stream, paired with the zero-based index of the event.

  21. abstract def zipWithStaleness: EventStream[(A, () ⇒ Boolean)]

    Creates a new EventStream that re-fires events from this stream, paired with a function that checks if that event is currently "stale". A stale event is one that is not currently the most recent event fired by the stream.

  22. abstract def zipWithTime[T](implicit arg0: Time[T]): EventStream[(A, T)]

    Creates a new EventStream that re-fires events from this stream, paired with a time stamp representing when the event was fired.

    T

    The type of Time

    arg0

    An instance of Time[T] that can generate time stamps. The default implementation is SystemTime which generates Long time stamps by calling System.currentTimeMillis.

  23. abstract def ||[A1 >: A](that: EventStream[A1]): EventStream[A1]

    Creates a new EventStream that represents the union of this and that stream. All events from both streams will be re-fired in the order that they are encountered. The resulting stream will stop once both parent streams are stopped.

    that

    Any EventStream, to be joined with this stream in a Union.

    returns

    The Union of this stream and that stream.
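    A minimal union sketch, assuming the library is on the classpath and the Observer mix-in from the class-level example. The stream names are illustrative.

```scala
import io.dylemma.frp._

object UnionExample extends App with Observer {
  val left = EventSource[Int]
  val right = EventSource[Int]

  // Events from either parent are re-fired in the order they occur.
  val merged = left || right

  for (i <- merged) println("merged: " + i)
  merged onEnd { println("both parents stopped") }

  left fire 1
  right fire 2
  left.stop     // `merged` keeps running: `right` is still live
  right fire 3
  right.stop    // now both parents are stopped
}
```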

Concrete Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. def +(other: String): String

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to StringAdd performed by method any2stringadd in scala.Predef.
    Definition Classes
    StringAdd
  5. def ->[B](y: B): (EventStream[A], B)

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to ArrowAssoc[EventStream[A]] performed by method any2ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  6. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  7. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  8. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  9. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  10. def end[T](implicit obs: Observer, time: Time[T]): Future[T]

    Returns a Future that will complete when this stream stops. The resulting value will be a rough estimate (System.currentTimeMillis) of when the stream ended. If the stream never ends, the resulting Future will never complete.

    returns

    A Future containing a time stamp describing when this stream stopped.

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to EventStreamFutures[A] performed by method EventStreamFutures in io.dylemma.frp.
    Definition Classes
    EventStreamFutures
  11. def ensuring(cond: (EventStream[A]) ⇒ Boolean, msg: ⇒ Any): EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to Ensuring[EventStream[A]] performed by method any2Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. def ensuring(cond: (EventStream[A]) ⇒ Boolean): EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to Ensuring[EventStream[A]] performed by method any2Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  13. def ensuring(cond: Boolean, msg: ⇒ Any): EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to Ensuring[EventStream[A]] performed by method any2Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  14. def ensuring(cond: Boolean): EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to Ensuring[EventStream[A]] performed by method any2Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  15. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  16. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  17. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws()
  18. def foreach[U](f: (A) ⇒ U)(implicit obs: Observer): Unit

    Attach an event handler for data fired by this stream.

    f

    A function that takes an event's data and performs side effects.

  19. def formatted(fmtstr: String): String

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to StringFormat performed by method any2stringfmt in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  20. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  21. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  22. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  23. def last(implicit obs: Observer): Future[A]

    Returns a Future that will complete with the value of the last event fired by this stream. If the stream is stopped or becomes stopped before firing an event, the Future will fail with a NoSuchElementException.

    returns

    A Future containing the last event fired by this stream.

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to EventStreamFutures[A] performed by method EventStreamFutures in io.dylemma.frp.
    Definition Classes
    EventStreamFutures
  24. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  25. def next(implicit obs: Observer): Future[A]

    Returns a Future that will complete with the value of the next event fired by this stream. If the stream is stopped, or if it stops before firing an event, the Future will fail with a NoSuchElementException.

    returns

    A Future containing the next event fired by this stream.

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to EventStreamFutures[A] performed by method EventStreamFutures in io.dylemma.frp.
    Definition Classes
    EventStreamFutures
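    A hedged sketch of waiting for the next event through a Future. It assumes the library is on the classpath, that the returned Future is a scala.concurrent.Future (the signature above does not say which Future type is meant), and that mixing in Observer supplies the implicit Observer. The one-second timeout is arbitrary.

```scala
import io.dylemma.frp._
import scala.concurrent.Await
import scala.concurrent.duration._

object NextExample extends App with Observer {
  val stream = EventSource[Int]

  // Request the Future before firing, so that 42 counts as the "next" event.
  val upcoming = stream.next

  stream fire 42

  // Assumption: `upcoming` is a scala.concurrent.Future[Int].
  println("next event: " + Await.result(upcoming, 1.second))
}
```

    If the stream stopped before firing, the description above says the Future would fail with a NoSuchElementException, which Await.result would rethrow.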
  26. final def notify(): Unit

    Definition Classes
    AnyRef
  27. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  28. def onEnd(f: ⇒ Unit)(implicit obs: Observer): Unit

    Assign a block of code that will run when this stream stops. If this stream is already stopped, the block of code will run immediately.

    f

    A block of code that will run when this stream sends a Stop event.

  29. def onNext(f: (A) ⇒ Unit)(implicit obs: Observer): Unit

    Assign a handler for the next event fired by this stream.

    f

    A function that takes an event's data and performs side effects. It can be assumed that f will be run at most once.

  30. def sink(handler: (Event[A]) ⇒ Boolean)(implicit obs: Observer): Unit

    Add a handler function that acts as a sink for items produced by this Source. The handler is expected to return true as long as it remains active. Once the handler function returns false in response to some produced item, it will be deactivated and will no longer receive new items. There is no guarantee of the order that handlers will be called.

    handler

    The handler function to receive items produced by this Source. Once the handler returns false in response to some produced item, it will be deactivated and will no longer receive new items.

    obs

    An implicit Observer which is required in order to properly manage references between this Source and any handlers, avoiding reference loops.
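    A sketch of a low-level handler that stays active until it has seen a Stop. It assumes the library is on the classpath, and that Fire and Stop can be used as patterns (Fire wrapping one value, Stop as a singleton); the page names both event kinds but does not show their definitions, so treat those patterns as assumptions.

```scala
import io.dylemma.frp._

object SinkExample extends App with Observer {
  val stream = EventSource[Int]

  stream.sink {
    // Assumption: Fire is a case class holding the event data.
    case Fire(i) =>
      println("data: " + i)
      true  // remain active for further events
    // Assumption: Stop is a case object marking the end of the stream.
    case Stop =>
      println("stream ended")
      false // nothing more to receive
  }

  stream fire 1
  stream fire 2
  stream.stop
}
```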

  31. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  32. def toString(): String

    Definition Classes
    AnyRef → Any
  33. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  34. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  35. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws()
  36. def →[B](y: B): (EventStream[A], B)

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to ArrowAssoc[EventStream[A]] performed by method any2ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc

Shadowed Implicit Value Members

  1. val self: Any

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to StringAdd performed by method any2stringadd in scala.Predef.
    Shadowing
    This implicitly inherited member is ambiguous. One or more implicitly inherited members have similar signatures, so calling this member may produce an ambiguous implicit conversion compiler error.
    To access this member you can use a type ascription:
    (eventStream: StringAdd).self
    Definition Classes
    StringAdd
  2. val self: Any

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to StringFormat performed by method any2stringfmt in scala.Predef.
    Shadowing
    This implicitly inherited member is ambiguous. One or more implicitly inherited members have similar signatures, so calling this member may produce an ambiguous implicit conversion compiler error.
    To access this member you can use a type ascription:
    (eventStream: StringFormat).self
    Definition Classes
    StringFormat

Deprecated Value Members

  1. def x: EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to ArrowAssoc[EventStream[A]] performed by method any2ArrowAssoc in scala.Predef.
    Shadowing
    This implicitly inherited member is ambiguous. One or more implicitly inherited members have similar signatures, so calling this member may produce an ambiguous implicit conversion compiler error.
    To access this member you can use a type ascription:
    (eventStream: ArrowAssoc[EventStream[A]]).x
    Definition Classes
    ArrowAssoc
    Annotations
    @deprecated
    Deprecated

    (Since version 2.10.0) Use leftOfArrow instead

  2. def x: EventStream[A]

    Implicit information
    This member is added by an implicit conversion from EventStream[A] to Ensuring[EventStream[A]] performed by method any2Ensuring in scala.Predef.
    Shadowing
    This implicitly inherited member is ambiguous. One or more implicitly inherited members have similar signatures, so calling this member may produce an ambiguous implicit conversion compiler error.
    To access this member you can use a type ascription:
    (eventStream: Ensuring[EventStream[A]]).x
    Definition Classes
    Ensuring
    Annotations
    @deprecated
    Deprecated

    (Since version 2.10.0) Use resultOfEnsuring instead
