summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-03-18 15:34:03 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-03-18 15:34:03 +0100
commitcbbfbfd8f72b302fc7dba95c70b6f7bf5c0f1eac (patch)
treea25e264397971b67defe175f6e3a5b3e2b8165b9
parentaf80532aa7ee6c313a63a0a83af43e6e64128027 (diff)
added datasource to config
-rw-r--r--config.go2
-rw-r--r--config.json4
-rw-r--r--main.go82
3 files changed, 22 insertions, 66 deletions
diff --git a/config.go b/config.go
index b6c903f..90c58ee 100644
--- a/config.go
+++ b/config.go
@@ -14,6 +14,8 @@ type Config struct {
Interval string `json:interval`
Epsilon float64 `json:epsilon`
+ DataSource string `json:dataSource`
+
DBConn string `json:DBConn`
DBName string `json:DBName`
RawTable string `json:rawTable`
diff --git a/config.json b/config.json
index a99db83..6b2dc9e 100644
--- a/config.json
+++ b/config.json
@@ -8,6 +8,10 @@
"comment Epsilon": "Epsilon is the epsilon value for differential privacy. epsilon < 1 high privacy, 10 < epsilon low privacy. If epsilon is set to 0, differential privacy will not be used.",
"epsilon": 0,
+
+ "comment dataSource": "dataSource is from where the program should read and process data. Currently only mysql and stdin are supported.",
+ "dataSource": "stdin",
+
"DBConn": "",
"DBName": "test",
"rawTable": "test_raw",
diff --git a/main.go b/main.go
index 3c2dd4b..05bbac3 100644
--- a/main.go
+++ b/main.go
@@ -1,15 +1,10 @@
package main
import (
- "bufio"
"database/sql"
- "encoding/json"
_ "github.com/go-sql-driver/mysql"
"log"
- "os"
- "strings"
"time"
- //"strings"
)
func main() {
@@ -21,10 +16,26 @@ func main() {
}
log.Println("Done!")
+ switch cfg.DataSource {
+ case "stdin":
+ processFromStdin()
+ case "mysq":
+ pricessFromDB()
+ default:
+ log.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
+ }
+
+ log.Println("Finished processing, now exiting")
+}
+
+func processFromStdin() {
+ log.Println("Starting to process from stdin...")
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
cleanFromStdin(rDatChan, cfg)
+}
+func processFromDB() {
log.Print("Cleaning data...")
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
@@ -58,65 +69,4 @@ func main() {
}
log.Println("Done!")
}
- log.Println("Finished processing, now exiting")
-}
-
-//Starts a process that reads from stdin and
-//puts the strings read on the returned channel
-func readFromStdin() <-chan []byte {
- out := make(chan []byte)
- go func() {
- scanner := bufio.NewScanner(os.Stdin)
- for scanner.Scan() {
- out <- scanner.Bytes()
- }
- close(out)
- }()
- return out
-}
-
-func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
- out := make(chan []RawData)
- ival, err := cfg.getInterval()
- if err != nil {
- log.Println("Could not parse interval: ", err)
- }
- timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- go func() {
- rDat := make([]RawData, 0)
- for line := range in {
- if !strings.HasPrefix(string(line), "{") {
- //This should be a break in the output from pmacct
- //so we deploy our collected data and set a new timeBin
- ival, err := cfg.getInterval()
- if err != nil {
- log.Println("Could not parse interval: ", err)
- }
- timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- //Send the data if we have something to send
- if len(rDat) > 0 {
- out <- rDat
- rDat = make([]RawData, 0)
- }
- continue
- }
-
- var rd RawData
- err := json.Unmarshal(line, rd)
- if err != nil {
- log.Println("Failed in parsing json:", err)
- close(out)
- return
- }
- rd.time = timeBin
-
- rDat = append(rDat, rd)
- }
- //If there is any unsent data after in is closed we make sure to send it.
- if len(rDat) > 0 {
- out <- rDat
- }
- close(out)
- }()
- return out
}