summaryrefslogtreecommitdiff
path: root/flow-cleaner.go
diff options
context:
space:
mode:
Diffstat (limited to 'flow-cleaner.go')
-rw-r--r--flow-cleaner.go78
1 files changed, 78 insertions, 0 deletions
diff --git a/flow-cleaner.go b/flow-cleaner.go
new file mode 100644
index 0000000..71c1196
--- /dev/null
+++ b/flow-cleaner.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+ "database/sql"
+ _ "github.com/go-sql-driver/mysql"
+ "log"
+ "time"
+)
+
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
+}
+
+func main() {
+ cfg, err := readConfig()
+ if err != nil {
+ logger.Println("Could not read config")
+ return
+ }
+
+ switch cfg.DataSource {
+ case "stdin":
+ processFromStdin(cfg)
+ case "mysq":
+ processFromDB(cfg)
+ default:
+ logger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
+ }
+
+ logger.Println("Finished processing, now exiting")
+}
+
+func processFromStdin(cfg *Config) {
+ logger.Println("Starting to process from stdin...")
+ input := readFromStdin()
+ rDatChan := parseRawData(input, cfg)
+ cleanFromStdin(rDatChan, cfg)
+}
+
+func processFromDB(cfg *Config) {
+ logger.Print("Cleaning data...")
+ starttime := time.Now()
+ numOfRowsNotCleaned, err := cleanFromDB(cfg)
+ if err != nil {
+ logger.Println(err)
+ logger.Println("Exiting...")
+ return
+ }
+ logger.Println("Done!")
+
+ // If either all rows are processed or if there is no limit for the processing
+ // we can safely add noise to the cleaned data
+ if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
+ logger.Println("Adding differential privacy noise to processed data...")
+ db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
+ if err != nil {
+ logger.Println("Failed to connect to db:", err)
+ return
+ }
+ defer db.Close()
+
+ ival, err := cfg.getInterval()
+ if err != nil {
+ logger.Println("erronous interval in conf prevents the privatization of data:", err)
+ return
+ }
+
+ err = privatizeCleaned(db, starttime.Add(-2*ival), cfg)
+ if err != nil {
+ logger.Println("Failed to privatize data:", err)
+ }
+ logger.Println("Done!")
+ }
+}