package main import ( "database/sql" "errors" _ "github.com/go-sql-driver/mysql" "log" "time" ) const ( DATABASE_USER = "root" DATABASE_PASS = "pass" DATABASE_CONNECTION = "" //e.g. "tcp(localhost:55555) DATABASE_NAME = "netflow" MAXIMUM_ENTRIES = 100 TIMESPAN = "day" EPSILON = 1000 ) func cleanData() (err error) { db, err := sql.Open("mysql", DATABASE_USER+":"+DATABASE_PASS+"@/"+DATABASE_NAME) if err != nil { log.Println("Failed to connect to db") return } defer db.Close() //Remove the processed mark on entries older than 1 day err = reprocess(db, time.Now().Add(-24*1*time.Hour)) if err != nil { return } //Fetch data that should be cleaned rDat, err := fetchRawData(db, MAXIMUM_ENTRIES) if err != nil { log.Println("Faild to fetch raw data") return } cDat, err := clean(rDat) if err != nil { log.Println("Failed to clean data") return } //Add noise for differential privacy for i := range cDat { cDat[i].occurances = diffpriv(cDat[i].occurances, 1, EPSILON) } //Begin transaction tx, err := db.Begin() if err != nil { log.Println("Failed to initialize transaction") return } //save cleaned data for _, cd := range cDat { err = insertCleanData(tx, cd.ipbSrc, cd.ipbDst, cd.volume, cd.time, cd.port, cd.occurances) if err != nil { tx.Rollback() log.Println("Failed to save cleaned data") return } } //remove old data err = purgeRawData(tx, rDat) if err != nil { tx.Rollback() log.Println("Failed to remove old data") return } tx.Commit() return } func getTimespan(t time.Time) (span time.Time, err error) { loc, err := time.LoadLocation("Local") switch { case TIMESPAN == "5min": //Round the date into 5 minutes y, m, d, h, min := t.Date() if err != nil { return } min = min / 5 span = time.Date(y, m, d, h, min, 0, 0, loc) case TIMESPAN == "10min": //Round the date into 10 minutes y, m, d, h, min := t.Date() if err != nil { return } min = min / 10 span = time.Date(y, m, d, h, min, 0, 0, loc) case TIMESPAN == "hour": //Round the date into hour y, m, d, h := t.Date() if err != nil { return } span = time.Date(y, m, d, h, 0, 0, 0, loc) case TIMESPAN == "day": //Round the date into day y, m, d := t.Date() if err != nil { return } span = time.Date(y, m, d, 0, 0, 0, 0, loc) default: err = errors.New("Bad timespan") return } return } func clean(rDat []RawData) (cDat []CleanData, err error) { // collect all ips so we can query for their ip blocks ips := make(map[string]*asnipPair) for _, rd := range rDat { ips[rd.ipSrc] = nil ips[rd.ipDst] = nil } var iplist []string for ip := range ips { iplist = append(iplist, ip) } pairs, err := findASAndIPBlock(iplist...) if err != nil { return } for ix, p := range pairs { ips[p.ipAdr] = &pairs[ix] } for _, rd := range rDat { vol := rd.getVolSize() var tim time.Time tim, err = getTimespan(rd.time) if err != nil { return } cDat = append(cDat, CleanData{ ipbSrc: ips[rd.ipSrc].ipBlock, ipbDst: ips[rd.ipDst].ipBlock, time: tim, port: rd.port, volume: vol, occurances: 1, }) } cDat = removeDups(cDat) return } func removeDups(cDat []CleanData) []CleanData { ret := make([]CleanData, 0) var found bool for ci := range cDat { found = false //Check if an equal struct already is appended for ri := range ret { if ret[ri].equals(&cDat[ci]) { //If found, increase it occurances instead of //appending a new struct ret[ri].occurances += cDat[ci].occurances found = true break } } if !found { //if no equal struct is found //append it ret = append(ret, cDat[ci]) } } return ret }