1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
package main
import (
"bufio"
"database/sql"
"encoding/json"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
"strings"
"time"
//"strings"
)
// main runs the whole processing pipeline:
//
//  1. reads the configuration via readConfig;
//  2. reads pmacct output from stdin (readFromStdin), groups it into batches
//     (parseRawData) and hands the batches to cleanFromStdin;
//  3. calls cleanFromDB, whose first result is treated as the number of rows
//     that were not cleaned in this run;
//  4. if that count is zero or cfg.Limit is 0, and cfg.Epsilon is
//     non-negative, opens the MySQL database and calls privatizeCleaned.
//
// Every failure is logged and ends the run early. The process exit status is
// not changed on failure.
func main() {
	log.Println("Reading config...")
	cfg, err := readConfig()
	if err != nil {
		log.Println("Could not read config:", err)
		return
	}
	log.Println("Done!")

	input := readFromStdin()
	rDatChan := parseRawData(input, cfg)
	cleanFromStdin(rDatChan, cfg)

	log.Print("Cleaning data...")
	starttime := time.Now()
	numOfRowsNotCleaned, err := cleanFromDB(cfg)
	if err != nil {
		log.Println(err)
		log.Println("Exiting...")
		return
	}
	log.Println("Done!")

	// If either all rows are processed or if there is no limit for the processing
	// we can safely add noise to the cleaned data
	if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
		log.Println("Adding differential privacy noise to processed data...")
		// sql.Open only validates its arguments. Actual connection problems
		// surface on first use, i.e. inside privatizeCleaned.
		db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
		if err != nil {
			log.Println("Failed to connect to db:", err)
			return
		}
		defer db.Close()
		ival, err := cfg.getInterval()
		if err != nil {
			log.Println("erronous interval in conf prevents the privatization of data:", err)
			return
		}
		// NOTE(review): the cut-off passed here is two intervals before the
		// DB cleaning started; confirm its exact meaning in privatizeCleaned.
		if err := privatizeCleaned(db, starttime.Add(-2*ival), cfg); err != nil {
			log.Println("Failed to privatize data:", err)
			// Stop here instead of falling through to "Done!", which would
			// report a failed privatization as successful.
			return
		}
		log.Println("Done!")
	}
	log.Println("Finished processing, now exiting")
}
// readFromStdin starts a goroutine that scans os.Stdin line by line and sends
// each line, without its trailing newline, on the returned channel. The
// channel is closed once stdin hits EOF or a read error occurs. A read error
// (for example a line longer than the scanner's maximum token size) is logged.
//
// Each line is sent as its own copy. bufio.Scanner.Bytes returns a view into
// the scanner's internal buffer, and the next Scan may overwrite that view
// while a receiver is still using the slice.
func readFromStdin() <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			out <- append([]byte(nil), scanner.Bytes()...)
		}
		if err := scanner.Err(); err != nil {
			log.Println("Failed reading from stdin:", err)
		}
	}()
	return out
}
// parseRawData consumes the lines received on in and groups the JSON records
// into batches that are sent on the returned channel.
//
// A line starting with '{' is decoded into a RawData and stamped (rd.time)
// with the current time bin. Any other line, including an empty one, is
// treated as a break in the pmacct output. On a break the batch collected so
// far is sent, if it is non-empty, and a new time bin is started.
//
// The returned channel is closed in two cases:
//   - when in is closed, after any remaining records have been flushed;
//   - as soon as a line fails to decode. The partially collected batch is
//     then dropped. NOTE(review): the sender on in is not drained in that
//     case and may stay blocked.
//
// A time bin is time.Now() shifted back by half of cfg's interval. If
// cfg.getInterval fails, the error is logged and whatever duration it
// returned alongside the error is used (presumably zero).
func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
	out := make(chan []RawData)

	// newTimeBin re-reads the interval from cfg on every call, so each batch
	// uses the configuration current at the time of its break.
	newTimeBin := func() time.Time {
		ival, err := cfg.getInterval()
		if err != nil {
			log.Println("Could not parse interval: ", err)
		}
		return time.Now().Add(ival / -2) // Make sure we are inside the correct timeBin
	}

	// The first bin is computed synchronously, at call time.
	timeBin := newTimeBin()
	go func() {
		defer close(out)
		var rDat []RawData
		for line := range in {
			if !strings.HasPrefix(string(line), "{") {
				// This should be a break in the output from pmacct, so we
				// deploy our collected data and set a new timeBin.
				timeBin = newTimeBin()
				if len(rDat) > 0 {
					out <- rDat
					rDat = nil
				}
				continue
			}
			var rd RawData
			// Unmarshal needs a pointer. Passing rd by value always fails
			// with *json.InvalidUnmarshalError.
			if err := json.Unmarshal(line, &rd); err != nil {
				log.Println("Failed in parsing json:", err)
				return
			}
			rd.time = timeBin
			rDat = append(rDat, rd)
		}
		// If there is any unsent data after in is closed we make sure to send it.
		if len(rDat) > 0 {
			out <- rDat
		}
	}()
	return out
}
|