diff options
author | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-18 15:34:53 +0100 |
---|---|---|
committer | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-18 15:34:53 +0100 |
commit | 00f8d6f65cedc301d4f5eb441db9babcc2efb901 (patch) | |
tree | 9cc1c5d45ef36f7b89038a1851518fe198c28f18 /stdin.go | |
parent | 1d2a084c0b6b1611d80e95f7420c68682475102d (diff) |
refactored code
Diffstat (limited to 'stdin.go')
-rw-r--r-- | stdin.go | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/stdin.go b/stdin.go new file mode 100644 index 0000000..c331076 --- /dev/null +++ b/stdin.go @@ -0,0 +1,70 @@ +package main + +import ( + "bufio" + "encoding/json" + "log" + "os" + "strings" + "time" +) + +//Starts a process that reads from stdin and +//puts the strings read on the returned channel +func readFromStdin() <-chan []byte { + out := make(chan []byte) + go func() { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + out <- scanner.Bytes() + } + close(out) + }() + return out +} + +func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { + out := make(chan []RawData) + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + go func() { + rDat := make([]RawData, 0) + for line := range in { + if !strings.HasPrefix(string(line), "{") { + //This should be a break in the output from pmacct + //so we deploy our collected data and set a new timeBin + ival, err := cfg.getInterval() + if err != nil { + log.Println("Could not parse interval: ", err) + } + timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin + //Send the data if we have something to send + if len(rDat) > 0 { + out <- rDat + rDat = make([]RawData, 0) + } + continue + } + + var rd RawData + err := json.Unmarshal(line, rd) + if err != nil { + log.Println("Failed in parsing json:", err) + close(out) + return + } + rd.time = timeBin + + rDat = append(rDat, rd) + } + //If there is any unsent data after in is closed we make sure to send it. + if len(rDat) > 0 { + out <- rDat + } + close(out) + }() + return out +} |