summaryrefslogtreecommitdiff
path: root/stdin.go
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-03-18 15:34:53 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-03-18 15:34:53 +0100
commit00f8d6f65cedc301d4f5eb441db9babcc2efb901 (patch)
tree9cc1c5d45ef36f7b89038a1851518fe198c28f18 /stdin.go
parent1d2a084c0b6b1611d80e95f7420c68682475102d (diff)
refactored code
Diffstat (limited to 'stdin.go')
-rw-r--r--stdin.go70
1 files changed, 70 insertions, 0 deletions
diff --git a/stdin.go b/stdin.go
new file mode 100644
index 0000000..c331076
--- /dev/null
+++ b/stdin.go
@@ -0,0 +1,70 @@
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "log"
+ "os"
+ "strings"
+ "time"
+)
+
+//Starts a process that reads from stdin and
+//puts the strings read on the returned channel
+func readFromStdin() <-chan []byte {
+ out := make(chan []byte)
+ go func() {
+ scanner := bufio.NewScanner(os.Stdin)
+ for scanner.Scan() {
+ out <- scanner.Bytes()
+ }
+ close(out)
+ }()
+ return out
+}
+
+func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
+ out := make(chan []RawData)
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ go func() {
+ rDat := make([]RawData, 0)
+ for line := range in {
+ if !strings.HasPrefix(string(line), "{") {
+ //This should be a break in the output from pmacct
+ //so we deploy our collected data and set a new timeBin
+ ival, err := cfg.getInterval()
+ if err != nil {
+ log.Println("Could not parse interval: ", err)
+ }
+ timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
+ //Send the data if we have something to send
+ if len(rDat) > 0 {
+ out <- rDat
+ rDat = make([]RawData, 0)
+ }
+ continue
+ }
+
+ var rd RawData
+ err := json.Unmarshal(line, rd)
+ if err != nil {
+ log.Println("Failed in parsing json:", err)
+ close(out)
+ return
+ }
+ rd.time = timeBin
+
+ rDat = append(rDat, rd)
+ }
+ //If there is any unsent data after in is closed we make sure to send it.
+ if len(rDat) > 0 {
+ out <- rDat
+ }
+ close(out)
+ }()
+ return out
+}