summaryrefslogtreecommitdiff
path: root/main.go
blob: 3c2dd4b13ea320afac3cd6d99e6aaa955ff57f4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main

import (
	"bufio"
	"database/sql"
	"encoding/json"
	_ "github.com/go-sql-driver/mysql"
	"log"
	"os"
	"strings"
	"time"
	//"strings"
)

// main reads the configuration, feeds lines read from stdin through
// parseRawData into cleanFromStdin, runs cleanFromDB, and then, when the
// conditions below allow it, calls privatizeCleaned on the database.
func main() {
	log.Println("Reading config...")
	cfg, err := readConfig()
	if err != nil {
		log.Println("Could not read config:", err)
		return
	}
	log.Println("Done!")

	// Pipeline: raw stdin lines -> parsed, time-binned batches -> cleanFromStdin.
	input := readFromStdin()
	rDatChan := parseRawData(input, cfg)
	cleanFromStdin(rDatChan, cfg)

	log.Print("Cleaning data...")
	starttime := time.Now()
	numOfRowsNotCleaned, err := cleanFromDB(cfg)
	if err != nil {
		log.Println(err)
		log.Println("Exiting...")
		return
	}
	log.Println("Done!")

	// If either all rows are processed or if there is no limit for the processing
	// we can safely add noise to the cleaned data. A negative Epsilon skips this step.
	if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
		log.Println("Adding differential privacy noise to processed data...")
		// NOTE: sql.Open only validates its arguments; the first real
		// connection attempt happens when the handle is used.
		db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
		if err != nil {
			log.Println("Failed to connect to db:", err)
			return
		}
		defer db.Close()

		ival, err := cfg.getInterval()
		if err != nil {
			log.Println("erronous interval in conf prevents the privatization of data:", err)
			return
		}

		// The cutoff passed is two intervals before this cleaning run started;
		// presumably the lower time bound of rows to privatize — confirm
		// against privatizeCleaned.
		if err := privatizeCleaned(db, starttime.Add(-2*ival), cfg); err != nil {
			log.Println("Failed to privatize data:", err)
		} else {
			log.Println("Done!")
		}
	}
	log.Println("Finished processing, now exiting")
}

// readFromStdin starts a goroutine that reads os.Stdin line by line and sends
// each line (with the line terminator stripped, as bufio.ScanLines does) on
// the returned channel. The channel is closed when stdin reaches EOF or a read
// error occurs; read errors are logged.
func readFromStdin() <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			// scanner.Bytes() aliases the Scanner's internal buffer, which a
			// later Scan may overwrite while the receiver still holds the
			// slice, so hand off a private copy.
			raw := scanner.Bytes()
			line := make([]byte, len(raw))
			copy(line, raw)
			out <- line
		}
		// Surface read failures (e.g. bufio.ErrTooLong for an oversized line)
		// instead of silently treating them as EOF.
		if err := scanner.Err(); err != nil {
			log.Println("Failed reading from stdin:", err)
		}
	}()
	return out
}

// parseRawData starts a goroutine that decodes the JSON lines received on in
// into RawData records and sends them, in batches, on the returned channel.
//
// A line that does not start with '{' is treated as a break in the pmacct
// output: a new time bin is started and the batch collected so far (if
// non-empty) is sent. Every record is stamped with the current time bin,
// which is the moment the bin was opened (call time, or the latest break)
// minus half the configured interval. Any remaining records are sent once in
// is closed, and then the returned channel is closed.
//
// If a line fails to decode, the error is logged, the pending batch is
// discarded and the returned channel is closed without reading the rest of in.
func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
	out := make(chan []RawData)

	// currentTimeBin back-dates "now" by half an interval to make sure the
	// timestamp lands inside the correct time bin.
	currentTimeBin := func() time.Time {
		ival, err := cfg.getInterval()
		if err != nil {
			log.Println("Could not parse interval: ", err)
		}
		return time.Now().Add(ival / -2)
	}

	timeBin := currentTimeBin()
	go func() {
		defer close(out)
		var rDat []RawData
		for line := range in {
			if !strings.HasPrefix(string(line), "{") {
				// Break in the pmacct output: start a new time bin and
				// deliver whatever has been collected.
				timeBin = currentTimeBin()
				if len(rDat) > 0 {
					out <- rDat
					rDat = nil
				}
				continue
			}

			var rd RawData
			// Unmarshal needs a pointer; passing the value always fails.
			if err := json.Unmarshal(line, &rd); err != nil {
				log.Println("Failed in parsing json:", err)
				return
			}
			rd.time = timeBin
			rDat = append(rDat, rd)
		}
		// Flush the final batch once in is closed.
		if len(rDat) > 0 {
			out <- rDat
		}
	}()
	return out
}