diff options
author | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-19 09:52:51 +0100 |
---|---|---|
committer | Daniel Langesten <daniel.langest@gmail.com> | 2015-03-19 09:52:51 +0100 |
commit | e3f4fcc1ffdf9da601013fd524a2d2372edfe520 (patch) | |
tree | 9989623afd39fa041a3c841ce85aae87d1ec782c | |
parent | 4be3cb322d5d909aefe1825962dd1bd4da2e1464 (diff) |
improved logging
-rw-r--r-- | cleaner.go | 26 | ||||
-rw-r--r-- | config.go | 10 | ||||
-rw-r--r-- | main.go | 36 | ||||
-rw-r--r-- | sqlQueries.go | 52 | ||||
-rw-r--r-- | stdin.go | 15 |
5 files changed, 90 insertions, 49 deletions
@@ -9,11 +9,19 @@ import ( "time" ) +var ( + logger *log.Logger +) + +func init() { + logger = log.New(os.Stdout, "[ Cleaner ]", log.LstdFlags) +} + func cleanFromDB(cfg *Config) (rowsLeft int, err error) { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { - log.Println("Failed to connect to db") + logger.Println("Failed to connect to db") return } defer db.Close() @@ -33,20 +41,20 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) { //Fetch data that should be cleaned rDat, err := fetchRawData(db, cfg, cleanLimit) if err != nil { - log.Println("Faild to fetch raw data") + logger.Println("Faild to fetch raw data") return } cDat, err := clean(rDat, cfg) if err != nil { - log.Println("Failed to clean data from db:", err) + logger.Println("Failed to clean data from db:", err) return } //Begin transaction tx, err := db.Begin() if err != nil { - log.Println("Failed to initialize transaction") + logger.Println("Failed to initialize transaction") return } @@ -54,7 +62,7 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) { err = insertCleanData(tx, cfg, cDat) if err != nil { tx.Rollback() - log.Println("Failed to save cleaned data") + logger.Println("Failed to save cleaned data") return } @@ -62,13 +70,13 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) { err = purgeRawData(tx, cfg, rDat) if err != nil { tx.Rollback() - log.Println("Failed to remove old data") + logger.Println("Failed to remove old data") return } rowsLeft, err = availableRows(tx, cfg, cleanLimit) if err != nil { tx.Rollback() - log.Println("Failed to fetch available rows") + logger.Println("Failed to fetch available rows") return } @@ -80,12 +88,12 @@ func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error { for rDat := range rDatChan { cDat, err := clean(rDat, cfg) if err != nil { - log.Println("Failed to clean data from stdin:", err) + logger.Println("Failed to clean data from stdin:", err) return err } err = insertCleanDataToDB(cfg, cDat) if err != nil { - log.Println("Failed to insert clean data from stdin:", err) + logger.Println("Failed to insert clean data from stdin:", err) return err } } @@ -9,6 +9,14 @@ import ( "time" ) +var ( + logger *log.Logger +) + +func init() { + logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags) +} + type Config struct { Limit int `json:limit` Interval string `json:interval` @@ -41,6 +49,7 @@ func (cfg *Config) getInterval() (interval time.Duration, err error) { } func readConfig() (cfg *Config, err error) { + logger.Println("Reading config...") content, err := ioutil.ReadFile("config.json") if err != nil { log.Println(err) @@ -50,4 +59,5 @@ func readConfig() (cfg *Config, err error) { log.Println(err) } return + logger.Println("Done!") } @@ -7,14 +7,20 @@ import ( "time" ) +var ( + logger *log.Logger +) + +func init() { + logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags) +} + func main() { - log.Println("Reading config...") cfg, err := readConfig() if err != nil { - log.Println("Could not read config") + logger.Println("Could not read config") return } - log.Println("Done!") switch cfg.DataSource { case "stdin": @@ -22,51 +28,51 @@ func main() { case "mysq": processFromDB(cfg) default: - log.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.") + logger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.") } - log.Println("Finished processing, now exiting") + logger.Println("Finished processing, now exiting") } func processFromStdin(cfg *Config) { - log.Println("Starting to process from stdin...") + logger.Println("Starting to process from stdin...") input := readFromStdin() rDatChan := parseRawData(input, cfg) cleanFromStdin(rDatChan, cfg) } func processFromDB(cfg *Config) { - log.Print("Cleaning data...") + logger.Print("Cleaning data...") starttime := time.Now() numOfRowsNotCleaned, err := cleanFromDB(cfg) if err != nil { - log.Println(err) - log.Println("Exiting...") + logger.Println(err) + logger.Println("Exiting...") return } - log.Println("Done!") + logger.Println("Done!") // If either all rows are processed or if there is no limit for the processing // we can safely add noise to the cleaned data if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 { - log.Println("Adding differential privacy noise to processed data...") + logger.Println("Adding differential privacy noise to processed data...") db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { - log.Println("Failed to connect to db:", err) + logger.Println("Failed to connect to db:", err) return } defer db.Close() ival, err := cfg.getInterval() if err != nil { - log.Println("erronous interval in conf prevents the privatization of data:", err) + logger.Println("erronous interval in conf prevents the privatization of data:", err) return } err = privatizeCleaned(db, starttime.Add(-2*ival), cfg) if err != nil { - log.Println("Failed to privatize data:", err) + logger.Println("Failed to privatize data:", err) } - log.Println("Done!") + logger.Println("Done!") } } diff --git a/sqlQueries.go b/sqlQueries.go index 03f87fb..241f4e8 100644 --- a/sqlQueries.go +++ b/sqlQueries.go @@ -11,6 +11,14 @@ const ( TIMEZONE = "UTC" ) +var ( + logger *log.Logger +) + +func init() { + logger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags) +} + //Retrieves limit Rawdata entries that are older than tim //limit <= 0 returns all entries that are older than tim func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err error) { @@ -22,7 +30,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?") } if err != nil { - log.Println("Failed to prepare select") + logger.Println("Failed to prepare select") return } @@ -33,26 +41,26 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e rows, err = prepSel.Query(tim) } if err != nil { - log.Println("Failed to query prepared selection") + logger.Println("Failed to query prepared selection") return } defer rows.Close() tx, err := db.Begin() if err != nil { - log.Println("Failed to initialize transaction") + logger.Println("Failed to initialize transaction") return } prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?") if err != nil { - log.Println("Failed to prepare update") + logger.Println("Failed to prepare update") return } loc, err := time.LoadLocation(TIMEZONE) if err != nil { - log.Println("Couldn't load timezone") + logger.Println("Couldn't load timezone") return } for rows.Next() { @@ -60,18 +68,18 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e var tim []byte err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim) if err != nil { - log.Println("Failed to scan result of query") + logger.Println("Failed to scan result of query") return } r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) if err != nil { - log.Println("Failed to parse timestamp") + logger.Println("Failed to parse timestamp") return } _, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time) if err != nil { - log.Println("Failed to query prepared update") + logger.Println("Failed to query prepared update") tx.Rollback() return } @@ -123,14 +131,14 @@ func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) { func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error { prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { - log.Println("Failed to prepare statement") + logger.Println("Failed to prepare statement") return err } for ix := range cd { _, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences) if err != nil { - log.Println("Failed to execute statement") + logger.Println("Failed to execute statement") return err } } @@ -141,21 +149,21 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error { func insertCleanDataToDB(cfg *Config, cd []cleanedData) error { db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName) if err != nil { - log.Println("Failed to connect to db") + logger.Println("Failed to connect to db") return err } defer db.Close() prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?") if err != nil { - log.Println("Failed to prepare statement") + logger.Println("Failed to prepare statement") return err } for ix := range cd { _, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences) if err != nil { - log.Println("Failed to execute statement") + logger.Println("Failed to execute statement") return err } } @@ -215,26 +223,26 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) { } query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?") if err != nil { - log.Println("Failed to prepare query") + logger.Println("Failed to prepare query") return } rows, err := query.Query(t) if err != nil { - log.Println("Failed to query for unprivitized rows") + logger.Println("Failed to query for unprivitized rows") return } defer rows.Close() update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ") if err != nil { - log.Println("Failed to prepare update") + logger.Println("Failed to prepare update") return } loc, err := time.LoadLocation(TIMEZONE) if err != nil { - log.Println("Couldn't load timezone") + logger.Println("Couldn't load timezone") return } var cd cleanedData @@ -242,12 +250,12 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) { var tim []byte err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &tim, &cd.occurences) if err != nil { - log.Println("Failed to scan row") + logger.Println("Failed to scan row") return } cd.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc) if err != nil { - log.Println("Failed to parse timestamp") + logger.Println("Failed to parse timestamp") return } // Add differential privacy noise @@ -256,7 +264,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) { // Update the entry _, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time) if err != nil { - log.Println("Failed to update an entry:", err) + logger.Println("Failed to update an entry:", err) } } return @@ -265,14 +273,14 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) { func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) { stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ") if err != nil { - log.Println("Could not prepare statement") + logger.Println("Could not prepare statement") return } row := stmt.QueryRow(timeLimit) err = row.Scan(&numRows) if err != nil { - log.Println("Failed to scan result") + logger.Println("Failed to scan result") } return } @@ -9,6 +9,14 @@ import ( "time" ) +var ( + logger *log.Logger +) + +func init() { + logger = log.New(os.Stdout, "[stdin]", log.LstdFlags) +} + //Starts a process that reads from stdin and //puts the strings read on the returned channel func readFromStdin() <-chan []byte { @@ -27,18 +35,19 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { out := make(chan []RawData) ival, err := cfg.getInterval() if err != nil { - log.Println("Could not parse interval: ", err) + logger.Println("Could not parse interval: ", err) } timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin go func() { rDat := make([]RawData, 0) for line := range in { if !strings.HasPrefix(string(line), "{") { + logger.Println("Got message:", string(line)) //This should be a break in the output from pmacct //so we deploy our collected data and set a new timeBin ival, err := cfg.getInterval() if err != nil { - log.Println("Could not parse interval: ", err) + logger.Println("Could not parse interval: ", err) } timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin //Send the data if we have something to send @@ -52,7 +61,7 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData { var rd RawData err := json.Unmarshal(line, &rd) if err != nil { - log.Println("Failed in parsing json:", err) + logger.Println("Failed in parsing json:", err) close(out) return } |