summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--flow-cleaner_test.go48
1 files changed, 28 insertions, 20 deletions
diff --git a/flow-cleaner_test.go b/flow-cleaner_test.go
index 16bc3b0..ac8e357 100644
--- a/flow-cleaner_test.go
+++ b/flow-cleaner_test.go
@@ -25,39 +25,47 @@ func TestCleaningFromJSON(t *testing.T) {
}
testProcessFromStdin(t, cfg)
+ time.Sleep(15 * time.Second)
controlDB(t, cfg)
}
func testProcessFromStdin(t *testing.T, cfg *Config) {
- file, err := os.Open("testdata/jsoninput")
- if err != nil {
- t.Fatal(err)
- }
+ stdin := make(chan []byte)
- fakeStdin := make(chan []byte)
- reader := bufio.NewReader(file)
go func() {
- defer close(fakeStdin)
- i := 0
- for i < 2 {
- line, err := reader.ReadBytes('\n')
- if err == io.EOF {
- i++
- time.Sleep(2 * time.Second)
- } else if err != nil {
- t.Fatal(err)
- } else {
- fakeStdin <- line
- }
+ for i := 0; i < 3; i++ {
+ fakeStdin(t, stdin)
+ //time.Sleep(3 * time.Minute)
}
+ close(stdin)
}()
- rDatChan := parseRawData(fakeStdin, cfg)
- err = cleanFromStdin(rDatChan, cfg)
+ rDatChan := parseRawData(stdin, cfg)
+ err := cleanFromStdin(rDatChan, cfg)
if err != nil {
t.Fatal(err)
}
}
+func fakeStdin(t *testing.T, wr chan<- []byte) {
+ file, err := os.Open("testdata/jsoninput")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer file.Close()
+
+ reader := bufio.NewReader(file)
+
+ for {
+ line, err := reader.ReadBytes('\n')
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ t.Fatal(err)
+ } else {
+ wr <- line
+ }
+ }
+}
func controlDB(t *testing.T, cfg *Config) {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)