How to trigger reading from the source stream using the Scala API? #94

Open · jmarton opened this issue May 15, 2018 · 0 comments

jmarton commented May 15, 2018

TL;DR: How do I trigger loading the initial data from the source streams using the Scala API?

Using DBToaster 2.2 (rev. 3387), I was experimenting with the Scala API. I went through the following steps:

Compiling and running from the command line

Following the docs in Sec. 1, Compiling and running a query, I successfully compiled and ran the examples/queries/simple/rst.sql query from the command line, which gave the expected result:

$ bin/dbtoaster -l scala -c test.jar examples/queries/simple/rst.sql
$ java -classpath "test.jar:lib/dbt_scala/*" ddbt.gen.Dbtoaster
Java 1.8.0_162, Scala 2.10.2-20140310-140650-2481f036a5
Time: 0.002s (30/0)
ATIMESD:
306

Using the Scala API

Then I went on to the example code in Sec. 2, Scala API Guide, and created a project to use the example query from Scala, which gave exactly the same output as given in the docs:

Insert a tuple into R.
Insert a tuple into S.
Insert a tuple into T.
Result after this step: 20
Insert another tuple into T.
Final Result: 45
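
For context, the driver behind that output looks roughly like the sketch below. It is trimmed and untested here; the Rst actor class name and the ddbt.lib.Messages location of the event types are assumptions taken from the generated vscala code quoted further down, and the inserted tuple values are placeholders rather than the exact values from the guide.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import ddbt.gen.Rst          // assumed class/package name, taken from the vscala output below
import ddbt.lib.Messages._   // assumed location of TupleEvent, StreamInit, EndOfStream, StreamStat
import ddbt.lib.M3Map

object RstDriver {
  def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(10.seconds)
    val system = ActorSystem("rst-driver")
    val q = system.actorOf(Props[Rst], "query")

    // StreamInit only calls onSystemReady() and starts the timer -- it does
    // not read any of the declared file sources (see the receive method quoted below).
    q ! StreamInit(0L)

    // Placeholder tuples; the guide's example values would go here.
    q ! TupleEvent(TupleInsert, "R", List(1L, 2L))
    q ! TupleEvent(TupleInsert, "S", List(2L, 3L))
    q ! TupleEvent(TupleInsert, "T", List(3L, 4L))

    // EndOfStream makes the actor reply with (StreamStat, List(ATIMESD)).
    Await.result(q ? EndOfStream, 10.seconds) match {
      case (_, result: List[_]) => println("ATIMESD:\n" + M3Map.toStr(result(0)))
    }

    system.shutdown()  // terminate() on newer Akka versions
  }
}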

The Scala project has access to the initial stream data for the rst query at the paths specified in the query, but the results show that it does not load that data: the initial data gave ATIMESD = 306, whereas the Scala run resulted in 20 and then 45 after inserting a few more tuples, without deleting any.

Is there a way to trigger loading the initial stream data specified in the CREATE STREAM statements through the Scala API?
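
The only workaround I can see so far is to replay the .dat files by hand as ordinary insert events, along the lines of the untested sketch below. It reuses the TupleEvent/TupleInsert messages and the query ActorRef from the sketch above, and assumes each line of the files is a plain comma-separated pair of longs, as suggested by the "long,long" CSV adaptors in the generated code.

import akka.actor.ActorRef
import ddbt.lib.Messages._   // assumed location of TupleEvent / TupleInsert
import scala.io.Source

object ReplayHelper {
  // Push the contents of one data file into the query actor as insert events,
  // emulating what execute()'s file sources do for the initial data.
  def replayInserts(q: ActorRef, relation: String, path: String): Unit =
    for (line <- Source.fromFile(path).getLines()) {
      val values = line.split(",").map(_.trim.toLong).toList
      q ! TupleEvent(TupleInsert, relation, values)
    }
}

// Usage, continuing the driver sketch above:
//   ReplayHelper.replayInserts(q, "R", "examples/data/simple/r.dat")
//   ReplayHelper.replayInserts(q, "S", "examples/data/simple/s.dat")
//   ReplayHelper.replayInserts(q, "T", "examples/data/simple/t.dat")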

Inspecting the generated vanilla Scala code

I tried to inspect the generated vanilla Scala code for the query:

$ bin/dbtoaster -l vscala -o rst.scala examples/queries/simple/rst.sql

I found that def main of the main class invokes def execute, which feeds the streaming relations from the input files, but I found no way to trigger this feeding by sending events.

def main and def execute from the generated vscala code:

  def execute(args: Array[String], f: List[Any] => Unit) = bench(args, (dataset: String, parallelMode: Int, timeout: Long, batchSize: Int) => run[Rst](Seq(
    (new java.io.FileInputStream("examples/data/simple/r.dat"),new Adaptor.CSV("R","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split()),
    (new java.io.FileInputStream("examples/data/simple/s.dat"),new Adaptor.CSV("S","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split()),
    (new java.io.FileInputStream("examples/data/simple/t.dat"),new Adaptor.CSV("T","long,long","\\Q,\\E", if (dataset.endsWith("_del")) "ins + del" else "insert"),Split())
  ), parallelMode, timeout, batchSize), f)
  
  def main(args: Array[String]) {
    execute(args, (res: List[Any]) => {
      println("ATIMESD:\n" + M3Map.toStr(res(0))+"\n")
    })
  }
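
Since execute seems to be the only place where the declared sources are actually replayed, one option would be to call it directly from my own code instead of going through events. A sketch of that (untested; it assumes execute is a member of a generated companion object, here referred to as ddbt.gen.Rst, and that bench() falls back to sensible defaults for the forwarded arguments):

import ddbt.lib.M3Map

object LoadFromDeclaredSources {
  def main(args: Array[String]): Unit =
    // Forwarding args lets the generated bench() parse its usual options;
    // the companion-object location of execute() is an assumption.
    ddbt.gen.Rst.execute(args, (res: List[Any]) =>
      println("ATIMESD:\n" + M3Map.toStr(res(0))))
}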

I was wondering whether the StreamInit event is intended to initialize the streams from the predefined stream sources, or whether some events are missing:

  def receive = {
    case TupleEvent(TupleInsert, "R", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddR(v0,v1)
    case TupleEvent(TupleDelete, "R", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelR(v0,v1)
    case TupleEvent(TupleInsert, "S", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddS(v0,v1)
    case TupleEvent(TupleDelete, "S", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelS(v0,v1)
    case TupleEvent(TupleInsert, "T", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onAddT(v0,v1)
    case TupleEvent(TupleDelete, "T", List(v0:Long,v1:Long)) => if (t1 > 0 && (tN & 127) == 0) { val t = System.nanoTime; if (t > t1) { t1 = t; tS = 1L; context.become(receive_skip) } else tN += 1L } else tN += 1L; onDelT(v0,v1)
    case StreamInit(timeout) => 
      
      onSystemReady();
      t0 = System.nanoTime;
      if (timeout > 0) t1 = t0 + timeout * 1000000L
    case EndOfStream | GetSnapshot(_) => 
      t1 = System.nanoTime; 
       sender ! (StreamStat(t1 - t0, tN, tS), List(ATIMESD))
  }