summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-03-19 09:52:51 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-03-19 09:52:51 +0100
commite3f4fcc1ffdf9da601013fd524a2d2372edfe520 (patch)
tree9989623afd39fa041a3c841ce85aae87d1ec782c
parent4be3cb322d5d909aefe1825962dd1bd4da2e1464 (diff)
improved logging
-rw-r--r--cleaner.go26
-rw-r--r--config.go10
-rw-r--r--main.go36
-rw-r--r--sqlQueries.go52
-rw-r--r--stdin.go15
5 files changed, 90 insertions, 49 deletions
diff --git a/cleaner.go b/cleaner.go
index 186cd44..97ab72a 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -9,11 +9,19 @@ import (
"time"
)
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[ Cleaner ]", log.LstdFlags)
+}
+
func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- log.Println("Failed to connect to db")
+ logger.Println("Failed to connect to db")
return
}
defer db.Close()
@@ -33,20 +41,20 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
//Fetch data that should be cleaned
rDat, err := fetchRawData(db, cfg, cleanLimit)
if err != nil {
- log.Println("Faild to fetch raw data")
+ logger.Println("Faild to fetch raw data")
return
}
cDat, err := clean(rDat, cfg)
if err != nil {
- log.Println("Failed to clean data from db:", err)
+ logger.Println("Failed to clean data from db:", err)
return
}
//Begin transaction
tx, err := db.Begin()
if err != nil {
- log.Println("Failed to initialize transaction")
+ logger.Println("Failed to initialize transaction")
return
}
@@ -54,7 +62,7 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
err = insertCleanData(tx, cfg, cDat)
if err != nil {
tx.Rollback()
- log.Println("Failed to save cleaned data")
+ logger.Println("Failed to save cleaned data")
return
}
@@ -62,13 +70,13 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
err = purgeRawData(tx, cfg, rDat)
if err != nil {
tx.Rollback()
- log.Println("Failed to remove old data")
+ logger.Println("Failed to remove old data")
return
}
rowsLeft, err = availableRows(tx, cfg, cleanLimit)
if err != nil {
tx.Rollback()
- log.Println("Failed to fetch available rows")
+ logger.Println("Failed to fetch available rows")
return
}
@@ -80,12 +88,12 @@ func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error {
for rDat := range rDatChan {
cDat, err := clean(rDat, cfg)
if err != nil {
- log.Println("Failed to clean data from stdin:", err)
+ logger.Println("Failed to clean data from stdin:", err)
return err
}
err = insertCleanDataToDB(cfg, cDat)
if err != nil {
- log.Println("Failed to insert clean data from stdin:", err)
+ logger.Println("Failed to insert clean data from stdin:", err)
return err
}
}
diff --git a/config.go b/config.go
index 90c58ee..d0392d9 100644
--- a/config.go
+++ b/config.go
@@ -9,6 +9,14 @@ import (
"time"
)
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
+}
+
type Config struct {
Limit int `json:limit`
Interval string `json:interval`
@@ -41,6 +49,7 @@ func (cfg *Config) getInterval() (interval time.Duration, err error) {
}
func readConfig() (cfg *Config, err error) {
+ logger.Println("Reading config...")
content, err := ioutil.ReadFile("config.json")
if err != nil {
log.Println(err)
@@ -50,4 +59,5 @@ func readConfig() (cfg *Config, err error) {
log.Println(err)
}
return
+ logger.Println("Done!")
}
diff --git a/main.go b/main.go
index ac004de..71c1196 100644
--- a/main.go
+++ b/main.go
@@ -7,14 +7,20 @@ import (
"time"
)
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
+}
+
func main() {
- log.Println("Reading config...")
cfg, err := readConfig()
if err != nil {
- log.Println("Could not read config")
+ logger.Println("Could not read config")
return
}
- log.Println("Done!")
switch cfg.DataSource {
case "stdin":
@@ -22,51 +28,51 @@ func main() {
case "mysq":
processFromDB(cfg)
default:
- log.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
+ logger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
}
- log.Println("Finished processing, now exiting")
+ logger.Println("Finished processing, now exiting")
}
func processFromStdin(cfg *Config) {
- log.Println("Starting to process from stdin...")
+ logger.Println("Starting to process from stdin...")
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
cleanFromStdin(rDatChan, cfg)
}
func processFromDB(cfg *Config) {
- log.Print("Cleaning data...")
+ logger.Print("Cleaning data...")
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
- log.Println(err)
- log.Println("Exiting...")
+ logger.Println(err)
+ logger.Println("Exiting...")
return
}
- log.Println("Done!")
+ logger.Println("Done!")
// If either all rows are processed or if there is no limit for the processing
// we can safely add noise to the cleaned data
if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
- log.Println("Adding differential privacy noise to processed data...")
+ logger.Println("Adding differential privacy noise to processed data...")
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- log.Println("Failed to connect to db:", err)
+ logger.Println("Failed to connect to db:", err)
return
}
defer db.Close()
ival, err := cfg.getInterval()
if err != nil {
- log.Println("erronous interval in conf prevents the privatization of data:", err)
+ logger.Println("erronous interval in conf prevents the privatization of data:", err)
return
}
err = privatizeCleaned(db, starttime.Add(-2*ival), cfg)
if err != nil {
- log.Println("Failed to privatize data:", err)
+ logger.Println("Failed to privatize data:", err)
}
- log.Println("Done!")
+ logger.Println("Done!")
}
}
diff --git a/sqlQueries.go b/sqlQueries.go
index 03f87fb..241f4e8 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -11,6 +11,14 @@ const (
TIMEZONE = "UTC"
)
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags)
+}
+
//Retrieves limit Rawdata entries that are older than tim
//limit <= 0 returns all entries that are older than tim
func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err error) {
@@ -22,7 +30,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
}
if err != nil {
- log.Println("Failed to prepare select")
+ logger.Println("Failed to prepare select")
return
}
@@ -33,26 +41,26 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
rows, err = prepSel.Query(tim)
}
if err != nil {
- log.Println("Failed to query prepared selection")
+ logger.Println("Failed to query prepared selection")
return
}
defer rows.Close()
tx, err := db.Begin()
if err != nil {
- log.Println("Failed to initialize transaction")
+ logger.Println("Failed to initialize transaction")
return
}
prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
- log.Println("Failed to prepare update")
+ logger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- log.Println("Couldn't load timezone")
+ logger.Println("Couldn't load timezone")
return
}
for rows.Next() {
@@ -60,18 +68,18 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
var tim []byte
err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim)
if err != nil {
- log.Println("Failed to scan result of query")
+ logger.Println("Failed to scan result of query")
return
}
r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- log.Println("Failed to parse timestamp")
+ logger.Println("Failed to parse timestamp")
return
}
_, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time)
if err != nil {
- log.Println("Failed to query prepared update")
+ logger.Println("Failed to query prepared update")
tx.Rollback()
return
}
@@ -123,14 +131,14 @@ func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) {
func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- log.Println("Failed to prepare statement")
+ logger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- log.Println("Failed to execute statement")
+ logger.Println("Failed to execute statement")
return err
}
}
@@ -141,21 +149,21 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
func insertCleanDataToDB(cfg *Config, cd []cleanedData) error {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- log.Println("Failed to connect to db")
+ logger.Println("Failed to connect to db")
return err
}
defer db.Close()
prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- log.Println("Failed to prepare statement")
+ logger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- log.Println("Failed to execute statement")
+ logger.Println("Failed to execute statement")
return err
}
}
@@ -215,26 +223,26 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
}
query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?")
if err != nil {
- log.Println("Failed to prepare query")
+ logger.Println("Failed to prepare query")
return
}
rows, err := query.Query(t)
if err != nil {
- log.Println("Failed to query for unprivitized rows")
+ logger.Println("Failed to query for unprivitized rows")
return
}
defer rows.Close()
update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
if err != nil {
- log.Println("Failed to prepare update")
+ logger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- log.Println("Couldn't load timezone")
+ logger.Println("Couldn't load timezone")
return
}
var cd cleanedData
@@ -242,12 +250,12 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
var tim []byte
err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &tim, &cd.occurences)
if err != nil {
- log.Println("Failed to scan row")
+ logger.Println("Failed to scan row")
return
}
cd.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- log.Println("Failed to parse timestamp")
+ logger.Println("Failed to parse timestamp")
return
}
// Add differential privacy noise
@@ -256,7 +264,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
// Update the entry
_, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
if err != nil {
- log.Println("Failed to update an entry:", err)
+ logger.Println("Failed to update an entry:", err)
}
}
return
@@ -265,14 +273,14 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) {
stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ")
if err != nil {
- log.Println("Could not prepare statement")
+ logger.Println("Could not prepare statement")
return
}
row := stmt.QueryRow(timeLimit)
err = row.Scan(&numRows)
if err != nil {
- log.Println("Failed to scan result")
+ logger.Println("Failed to scan result")
}
return
}
diff --git a/stdin.go b/stdin.go
index 082dddf..69c8efb 100644
--- a/stdin.go
+++ b/stdin.go
@@ -9,6 +9,14 @@ import (
"time"
)
+var (
+ logger *log.Logger
+)
+
+func init() {
+ logger = log.New(os.Stdout, "[stdin]", log.LstdFlags)
+}
+
//Starts a process that reads from stdin and
//puts the strings read on the returned channel
func readFromStdin() <-chan []byte {
@@ -27,18 +35,19 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
out := make(chan []RawData)
ival, err := cfg.getInterval()
if err != nil {
- log.Println("Could not parse interval: ", err)
+ logger.Println("Could not parse interval: ", err)
}
timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
go func() {
rDat := make([]RawData, 0)
for line := range in {
if !strings.HasPrefix(string(line), "{") {
+ logger.Println("Got message:", string(line))
//This should be a break in the output from pmacct
//so we deploy our collected data and set a new timeBin
ival, err := cfg.getInterval()
if err != nil {
- log.Println("Could not parse interval: ", err)
+ logger.Println("Could not parse interval: ", err)
}
timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
//Send the data if we have something to send
@@ -52,7 +61,7 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
var rd RawData
err := json.Unmarshal(line, &rd)
if err != nil {
- log.Println("Failed in parsing json:", err)
+ logger.Println("Failed in parsing json:", err)
close(out)
return
}