package main

import (
	"bufio"
	"database/sql"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"testing"
	"time"

	_ "github.com/Go-SQL-Driver/MySQL"
)

// init switches the package-level VERBOSE flag on for every test in this
// package.
func init() {
	VERBOSE = true
}

// newTestConfig returns the Config shared by the tests in this file; only the
// data source differs between them.
func newTestConfig(dataSource string) *Config {
	return &Config{
		Limit:      0,
		Interval:   "5min",
		Epsilon:    0,
		DataSource: dataSource,
		DBConn:     "",
		DBName:     "test",
		RawTable:   "test_raw",
		CleanTable: "test_clean",
		DBUser:     "flowcleaner",
		DBPass:     "nil",
	}
}

// TestCleaningFromDB resets the test database, runs processFromDB against it
// and then verifies the contents of the clean and raw tables.
func TestCleaningFromDB(t *testing.T) {
	cfg := newTestConfig("mysql")

	fmt.Println("== Testing to process from DB ==")
	prepareDB(t, cfg)
	processFromDB(cfg)
	controlCleanDB(t, cfg)
	controlRawDBMySQL(t, cfg)
	fmt.Println("== Finished testing to process from DB ==")
}

// TestCleaningFromJSON resets the test database, feeds the JSON fixture
// through the stdin pipeline and then verifies the contents of the clean and
// raw tables.
func TestCleaningFromJSON(t *testing.T) {
	cfg := newTestConfig("json")

	fmt.Println("== Testing to process from stdin ==")
	prepareDB(t, cfg)
	testProcessFromStdin(t, cfg)
	// NOTE(review): presumably gives the pipeline time to finish writing
	// before the tables are inspected - confirm.
	time.Sleep(15 * time.Second)
	controlCleanDB(t, cfg)
	controlRawDBStdin(t, cfg)
	fmt.Println("== Finished testing to process from stdin ==")
}

// openTestDB opens a handle to the MySQL database described by cfg and aborts
// the test if that fails. The caller is responsible for closing the handle.
func openTestDB(t *testing.T, cfg *Config) *sql.DB {
	db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
	if err != nil {
		t.Fatal("Failed to connect to db:", err)
	}
	return db
}

// countRows returns the number of rows in table. what ("raw" or "cleaned") is
// only used to label the failure message. Any query or iteration error aborts
// the test.
func countRows(t *testing.T, db *sql.DB, table, what string) int {
	rows, err := db.Query("SELECT * FROM " + table)
	if err != nil {
		t.Fatal("Failed to select "+what+" data:", err)
	}
	defer rows.Close()

	n := 0
	for rows.Next() {
		n++
	}
	// Next returns false on error as well as on exhaustion; without this
	// check a broken result set would just look like a short table.
	if err := rows.Err(); err != nil {
		t.Fatal("Failed to read "+what+" data:", err)
	}
	return n
}

// prepareDB executes every statement found in testdata/dbTestSetup.mysql
// against the test database. The file is split on ";" and each non-empty
// statement is executed on its own; the first failure aborts the test.
func prepareDB(t *testing.T, cfg *Config) {
	db := openTestDB(t, cfg)
	defer db.Close()

	file, err := ioutil.ReadFile("testdata/dbTestSetup.mysql")
	if err != nil {
		t.Fatal(err)
	}

	for _, query := range strings.Split(string(file), ";") {
		query = strings.TrimSpace(query)
		if len(query) == 0 {
			continue
		}
		if _, err := db.Exec(query + ";"); err != nil {
			fmt.Println("QUERY:", query)
			t.Fatal(err)
		}
	}
}

// testProcessFromStdin drives the stdin pipeline (parseRawData followed by
// cleanFromStdin) with the JSON fixture. A producer goroutine waits until the
// wall clock reaches the next 5-minute boundary, sends the fixture, waits one
// further 5-minute bin and then sends the fixture two more times.
func testProcessFromStdin(t *testing.T, cfg *Config) {
	stdin := make(chan []byte)

	go func() {
		// Deferred so the channel is closed on every exit path of the
		// producer, including the early return below; otherwise the
		// consumer side would have no end-of-input signal.
		defer close(stdin)

		minute := time.Now().Minute()
		nextBin := ((minute + 5) / 5) * 5 // Set it to the next interval
		wait := (nextBin - minute) % 5

		fmt.Println("== Waiting for alignment of timebin before testing ==")
		fmt.Printf("== Will start in %d minute(s) ==\n", wait)
		time.Sleep(time.Duration(wait) * time.Minute)
		fmt.Println("== Now starting to test ==")

		for i := 0; i < 3; i++ {
			fakeStdin(t, stdin)
			// t.Fatal must not be called from this goroutine, so
			// fakeStdin reports problems with t.Error and the producer
			// simply stops once the test has been marked as failed.
			if t.Failed() {
				return
			}
			if i < 1 {
				fmt.Println("== Now waiting for next time bin ==")
				time.Sleep(5 * time.Minute)
				fmt.Println("== Now continuing testing ==")
			}
		}
	}()

	rDatChan := parseRawData(stdin, cfg)
	if err := cleanFromStdin(rDatChan, cfg); err != nil {
		t.Error(err)
	}
}

// fakeStdin sends testdata/jsoninput to wr one line at a time, each line
// including its trailing newline. Failures are reported with t.Error rather
// than t.Fatal because this function is called from a goroutine other than
// the one running the test.
func fakeStdin(t *testing.T, wr chan<- []byte) {
	file, err := os.Open("testdata/jsoninput")
	if err != nil {
		t.Error(err)
		return
	}
	defer file.Close()

	reader := bufio.NewReader(file)
	for {
		line, err := reader.ReadBytes('\n')
		if err == io.EOF {
			// NOTE(review): a final line without a trailing newline is
			// discarded here; kept as-is because the expected row counts
			// depend on the fixture.
			return
		}
		if err != nil {
			t.Error(err)
			return
		}
		wr <- line
	}
}

// controlRawDBStdin checks that the raw table holds 33 rows after the stdin
// test.
func controlRawDBStdin(t *testing.T, cfg *Config) {
	db := openTestDB(t, cfg)
	defer db.Close()

	if numRows := countRows(t, db, cfg.RawTable, "raw"); numRows != 33 {
		t.Error("Wrong number of rows found in db, should be 33 found:", numRows)
	}
}

// controlRawDBMySQL checks that the raw table holds 3 rows after the DB test.
func controlRawDBMySQL(t *testing.T, cfg *Config) {
	db := openTestDB(t, cfg)
	defer db.Close()

	if numRows := countRows(t, db, cfg.RawTable, "raw"); numRows != 3 {
		t.Error("Wrong number of rows found in db, should be 3 found:", numRows)
	}
}

// controlCleanDB checks that the clean table holds 14 rows and then walks the
// rows ordered by descending occurences, comparing each one against the
// expected (occurences, volume) pair.
func controlCleanDB(t *testing.T, cfg *Config) {
	db := openTestDB(t, cfg)
	defer db.Close()

	if numRows := countRows(t, db, cfg.CleanTable, "cleaned"); numRows != 14 {
		t.Error("Wrong number of rows found in db, should be 14 found:", numRows)
	}

	check1, err := db.Query("SELECT occurences, volume FROM " + cfg.CleanTable + " ORDER BY occurences DESC")
	if err != nil {
		t.Fatal("Failed to select occurences and volume:", err)
	}
	defer check1.Close()

	// Expected rows, in result order.
	expected := []struct {
		occurences int
		vol        string
	}{
		{36, "0-199"},
		{20, "200-299"},
		{18, "200-299"},
		{14, "200-299"},
		{12, "200-299"},
		{10, "200-299"},
		{8, "200-299"},
		{7, "200-299"},
		{6, "200-299"},
		{4, "200-299"},
		{2, "200-299"},
		{1, "200-299"},
		{0, "200-299"},
		{0, "200-299"},
	}

	var occurences int
	var vol string
	for i, want := range expected {
		// A missing row or a scan failure must stop the test; otherwise the
		// comparison below would run against values left over from the
		// previous row.
		if !check1.Next() {
			t.Fatal("Missing row", i+1, "of", len(expected), "in", cfg.CleanTable, "error:", check1.Err())
		}
		if err := check1.Scan(&occurences, &vol); err != nil {
			t.Fatal("Failed to scan occurences and volume:", err)
		}
		// NOTE(review): this only fails when the volume matches and the
		// occurences differ; a row with an unexpected volume passes. The
		// message suggests both were meant to be checked - confirm against
		// the fixture before tightening.
		if occurences != want.occurences && vol == want.vol {
			t.Errorf("Failed occurences %d, vol %s. Actually was: %d %s", want.occurences, want.vol, occurences, vol)
		}
	}
}