diff options
author | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-18 15:34:03 +0100 |
---|---|---|
committer | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-18 15:34:03 +0100 |
commit | cbbfbfd8f72b302fc7dba95c70b6f7bf5c0f1eac (patch) | |
tree | a25e264397971b67defe175f6e3a5b3e2b8165b9 | |
parent | af80532aa7ee6c313a63a0a83af43e6e64128027 (diff) |
added datasource to config
-rw-r--r-- | config.go | 2 | ||||
-rw-r--r-- | config.json | 4 | ||||
-rw-r--r-- | main.go | 82 |
3 files changed, 22 insertions, 66 deletions
@@ -14,6 +14,8 @@ type Config struct { Interval string `json:interval` Epsilon float64 `json:epsilon` + DataSource string `json:dataSource` + DBConn string `json:DBConn` DBName string `json:DBName` RawTable string `json:rawTable` diff --git a/config.json b/config.json index a99db83..6b2dc9e 100644 --- a/config.json +++ b/config.json @@ -8,6 +8,10 @@ "comment Epsilon": "Epsilon is the epsilon value for differential privacy. epsilon < 1 high privacy, 10 < epsilon low privacy. If epsilon is set to 0, differential privacy will not be used.", "epsilon": 0, + + "comment dataSource": "dataSource is from where the program should read and process data. Currently only mysql and stdin are supported." + "dataSource": "stdin", + "DBConn": "", "DBName": "test", "rawTable": "test_raw", @@ -1,15 +1,10 @@ package main import ( - "bufio" "database/sql" - "encoding/json" _ "github.com/go-sql-driver/mysql" "log" - "os" - "strings" "time" - //"strings" ) func main() { @@ -21,10 +16,26 @@ func main() { } log.Println("Done!") + switch cfg.DataSource { + case "stdin": + processFromStdin() + case "mysq": + pricessFromDB() + default: + log.Println("Invalid dataSource in config. 
Needs to be either 'stdin' or 'mysql'.") + } + + log.Println("Finished processing, now exiting") +} + +func processFromStdin() { + log.Println("Starting to process from stdin...") input := readFromStdin() rDatChan := parseRawData(input, cfg) cleanFromStdin(rDatChan, cfg) +} +func processFromDB() { log.Print("Cleaning data...") starttime := time.Now() numOfRowsNotCleaned, err := cleanFromDB(cfg) @@ -58,65 +69,4 @@ func main() { } log.Println("Done!") } - log.Println("Finished processing, now exiting") -} - -//Starts a process that reads from stdin and -//puts the strings read on the returned channel -func readFromStdin() <-chan []byte { - out := make(chan []byte) - go func() { - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - out <- scanner.Bytes() - } - close(out) - }() - return out -} - -func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { - out := make(chan []RawData) - ival, err := cfg.getInterval() - if err != nil { - log.Println("Could not parse interval: ", err) - } - timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin - go func() { - rDat := make([]RawData, 0) - for line := range in { - if !strings.HasPrefix(string(line), "{") { - //This should be a break in the output from pmacct - //so we deploy our collected data and set a new timeBin - ival, err := cfg.getInterval() - if err != nil { - log.Println("Could not parse interval: ", err) - } - timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin - //Send the data if we have something to send - if len(rDat) > 0 { - out <- rDat - rDat = make([]RawData, 0) - } - continue - } - - var rd RawData - err := json.Unmarshal(line, rd) - if err != nil { - log.Println("Failed in parsing json:", err) - close(out) - return - } - rd.time = timeBin - - rDat = append(rDat, rd) - } - //If there is any unsent data after in is closed we make sure to send it. - if len(rDat) > 0 { - out <- rDat - } - close(out) - }() - return out } |