summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Langesten <daniel.langest@gmail.com>2015-03-19 10:59:02 +0100
committerDaniel Langesten <daniel.langest@gmail.com>2015-03-19 10:59:02 +0100
commit048b3a1f7a6c08c215a25c73c0e6d5c63e888a7d (patch)
treedc44a94f2b297526a4cf8716a0e8b5cdca6f2bb2
parent244fba51eb4807dc8195edf722c742acbbc18ac3 (diff)
compile issues fix
-rw-r--r--cleaner.go23
-rw-r--r--config.go9
-rw-r--r--flow-cleaner.go33
-rw-r--r--sqlQueries.go49
-rw-r--r--stdin.go20
-rw-r--r--whois.go8
6 files changed, 73 insertions, 69 deletions
diff --git a/cleaner.go b/cleaner.go
index 0e213c9..001542f 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -6,22 +6,23 @@ import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
+ "os"
"time"
)
var (
- logger *log.Logger
+ cllogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ Cleaner ]", log.LstdFlags)
+ cllogger = log.New(os.Stdout, "[ Cleaner ]", log.LstdFlags)
}
func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- logger.Println("Failed to connect to db")
+ cllogger.Println("Failed to connect to db")
return
}
defer db.Close()
@@ -41,20 +42,20 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
//Fetch data that should be cleaned
rDat, err := fetchRawData(db, cfg, cleanLimit)
if err != nil {
- logger.Println("Faild to fetch raw data")
+ cllogger.Println("Faild to fetch raw data")
return
}
cDat, err := clean(rDat, cfg)
if err != nil {
- logger.Println("Failed to clean data from db:", err)
+ cllogger.Println("Failed to clean data from db:", err)
return
}
//Begin transaction
tx, err := db.Begin()
if err != nil {
- logger.Println("Failed to initialize transaction")
+ cllogger.Println("Failed to initialize transaction")
return
}
@@ -62,7 +63,7 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
err = insertCleanData(tx, cfg, cDat)
if err != nil {
tx.Rollback()
- logger.Println("Failed to save cleaned data")
+ cllogger.Println("Failed to save cleaned data")
return
}
@@ -70,13 +71,13 @@ func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
err = purgeRawData(tx, cfg, rDat)
if err != nil {
tx.Rollback()
- logger.Println("Failed to remove old data")
+ cllogger.Println("Failed to remove old data")
return
}
rowsLeft, err = availableRows(tx, cfg, cleanLimit)
if err != nil {
tx.Rollback()
- logger.Println("Failed to fetch available rows")
+ cllogger.Println("Failed to fetch available rows")
return
}
@@ -88,12 +89,12 @@ func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error {
for rDat := range rDatChan {
cDat, err := clean(rDat, cfg)
if err != nil {
- logger.Println("Failed to clean data from stdin:", err)
+ cllogger.Println("Failed to clean data from stdin:", err)
return err
}
err = insertCleanDataToDB(cfg, cDat)
if err != nil {
- logger.Println("Failed to insert clean data from stdin:", err)
+ cllogger.Println("Failed to insert clean data from stdin:", err)
return err
}
}
diff --git a/config.go b/config.go
index 910d523..8fdfc92 100644
--- a/config.go
+++ b/config.go
@@ -6,15 +6,16 @@ import (
"fmt"
"io/ioutil"
"log"
+ "os"
"time"
)
var (
- logger *log.Logger
+ clogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
+ clogger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
}
type Config struct {
@@ -51,7 +52,7 @@ func (cfg *Config) getInterval() (interval time.Duration, err error) {
}
func readConfig() (cfg *Config, err error) {
- logger.Println("Reading config...")
+ clogger.Println("Reading config...")
content, err := ioutil.ReadFile("config.json")
if err != nil {
log.Println(err)
@@ -60,6 +61,6 @@ func readConfig() (cfg *Config, err error) {
if err != nil {
log.Println(err)
}
+ clogger.Println("Done!")
return
- logger.Println("Done!")
}
diff --git a/flow-cleaner.go b/flow-cleaner.go
index e840c41..f40f1c3 100644
--- a/flow-cleaner.go
+++ b/flow-cleaner.go
@@ -4,21 +4,22 @@ import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"log"
+ "os"
"time"
)
var (
- logger *log.Logger
+ flogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ Main ]", log.LstdFlags)
+ flogger = log.New(os.Stdout, "[ Flow-cleaner ]", log.LstdFlags)
}
func main() {
cfg, err := readConfig()
if err != nil {
- logger.Println("Could not read config")
+ flogger.Println("Could not read config")
return
}
@@ -28,52 +29,52 @@ func main() {
case "mysq":
processFromDB(cfg)
default:
- logger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
+ flogger.Println("Invalid dataSource in config. Needs to be either 'stdin' or 'mysql'.")
}
- logger.Println("Finished processing, now exiting")
+ flogger.Println("Finished processing, now exiting")
}
func processFromStdin(cfg *Config) {
- logger.Println("Starting to process from stdin...")
+ flogger.Println("Starting to process from stdin...")
input := readFromStdin()
rDatChan := parseRawData(input, cfg)
cleanFromStdin(rDatChan, cfg)
- logger.Println("Finished processing from stdin!")
+ flogger.Println("Finished processing from stdin!")
}
func processFromDB(cfg *Config) {
- logger.Print("Starting to process from db...")
+ flogger.Print("Starting to process from db...")
starttime := time.Now()
numOfRowsNotCleaned, err := cleanFromDB(cfg)
if err != nil {
- logger.Println(err)
- logger.Println("Exiting...")
+ flogger.Println(err)
+ flogger.Println("Exiting...")
return
}
- logger.Println("Finished processing from db!")
+ flogger.Println("Finished processing from db!")
// If either all rows are processed or if there is no limit for the processing
// we can safely add noise to the cleaned data
if (numOfRowsNotCleaned == 0 || cfg.Limit == 0) && cfg.Epsilon >= 0 {
- logger.Println("Adding differential privacy noise to processed data...")
+ flogger.Println("Adding differential privacy noise to processed data...")
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- logger.Println("Failed to connect to db:", err)
+ flogger.Println("Failed to connect to db:", err)
return
}
defer db.Close()
ival, err := cfg.getInterval()
if err != nil {
- logger.Println("erronous interval in conf prevents the privatization of data:", err)
+ flogger.Println("erronous interval in conf prevents the privatization of data:", err)
return
}
err = privatizeCleaned(db, starttime.Add(-2*ival), cfg)
if err != nil {
- logger.Println("Failed to privatize data:", err)
+ flogger.Println("Failed to privatize data:", err)
}
- logger.Println("Done!")
+ flogger.Println("Done!")
}
}
diff --git a/sqlQueries.go b/sqlQueries.go
index 241f4e8..e7eb06f 100644
--- a/sqlQueries.go
+++ b/sqlQueries.go
@@ -4,6 +4,7 @@ import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"log"
+ "os"
"time"
)
@@ -12,11 +13,11 @@ const (
)
var (
- logger *log.Logger
+ slogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags)
+ slogger = log.New(os.Stdout, "[ SQL ]", log.LstdFlags)
}
//Retrieves limit Rawdata entries that are older than tim
@@ -30,7 +31,7 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
prepSel, err = db.Prepare("SELECT ip_src,ip_dst,as_src,as_dst,port_src,port_dst,packets,pkt_len_distrib,stamp_inserted FROM " + cfg.RawTable + " WHERE stamp_processed IS NULL AND stamp_inserted < ?")
}
if err != nil {
- logger.Println("Failed to prepare select")
+ slogger.Println("Failed to prepare select")
return
}
@@ -41,26 +42,26 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
rows, err = prepSel.Query(tim)
}
if err != nil {
- logger.Println("Failed to query prepared selection")
+ slogger.Println("Failed to query prepared selection")
return
}
defer rows.Close()
tx, err := db.Begin()
if err != nil {
- logger.Println("Failed to initialize transaction")
+ slogger.Println("Failed to initialize transaction")
return
}
prepUp, err := tx.Prepare("UPDATE " + cfg.RawTable + " SET stamp_processed = ? where ip_src = ? AND ip_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND packets = ? AND pkt_len_distrib = ? AND stamp_inserted = ?")
if err != nil {
- logger.Println("Failed to prepare update")
+ slogger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- logger.Println("Couldn't load timezone")
+ slogger.Println("Couldn't load timezone")
return
}
for rows.Next() {
@@ -68,18 +69,18 @@ func fetchRawData(db *sql.DB, cfg *Config, tim time.Time) (rDat []RawData, err e
var tim []byte
err = rows.Scan(&r.IpSrc, &r.IpDst, &r.AsSrc, &r.AsDst, &r.PortSrc, &r.PortDst, &r.Packets, &r.PktLenDist, &tim)
if err != nil {
- logger.Println("Failed to scan result of query")
+ slogger.Println("Failed to scan result of query")
return
}
r.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- logger.Println("Failed to parse timestamp")
+ slogger.Println("Failed to parse timestamp")
return
}
_, err = prepUp.Exec(time.Now(), r.IpSrc, r.IpDst, r.AsSrc, r.AsDst, r.PortSrc, r.PortDst, r.Packets, r.PktLenDist, r.time)
if err != nil {
- logger.Println("Failed to query prepared update")
+ slogger.Println("Failed to query prepared update")
tx.Rollback()
return
}
@@ -131,14 +132,14 @@ func purgeRawData(tx *sql.Tx, cfg *Config, rDat []RawData) (err error) {
func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
prepStmt, err := tx.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- logger.Println("Failed to prepare statement")
+ slogger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- logger.Println("Failed to execute statement")
+ slogger.Println("Failed to execute statement")
return err
}
}
@@ -149,21 +150,21 @@ func insertCleanData(tx *sql.Tx, cfg *Config, cd []cleanedData) error {
func insertCleanDataToDB(cfg *Config, cd []cleanedData) error {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
- logger.Println("Failed to connect to db")
+ slogger.Println("Failed to connect to db")
return err
}
defer db.Close()
prepStmt, err := db.Prepare("INSERT INTO " + cfg.CleanTable + " (ipb_src, ipb_dst, as_src, as_dst, port_src, port_dst, occurences, volume, time_added) VALUES ( ? , ? , ? , ? , ? , ? , ? , ? , ? ) ON DUPLICATE KEY UPDATE occurences = occurences + ?")
if err != nil {
- logger.Println("Failed to prepare statement")
+ slogger.Println("Failed to prepare statement")
return err
}
for ix := range cd {
_, err = prepStmt.Exec(cd[ix].ipbSrc, cd[ix].ipbDst, cd[ix].asSrc, cd[ix].asDst, cd[ix].portSrc, cd[ix].portDst, cd[ix].occurences, cd[ix].volume, cd[ix].time, cd[ix].occurences)
if err != nil {
- logger.Println("Failed to execute statement")
+ slogger.Println("Failed to execute statement")
return err
}
}
@@ -223,26 +224,26 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
}
query, err := db.Prepare("SELECT ipb_src,ipb_dst,as_src,as_dst,port_src,port_dst,volume,time_added,occurences FROM " + cfg.CleanTable + " WHERE time_added < ?")
if err != nil {
- logger.Println("Failed to prepare query")
+ slogger.Println("Failed to prepare query")
return
}
rows, err := query.Query(t)
if err != nil {
- logger.Println("Failed to query for unprivitized rows")
+ slogger.Println("Failed to query for unprivitized rows")
return
}
defer rows.Close()
update, err := db.Prepare("UPDATE " + cfg.CleanTable + " SET occurences = ? , time_privatized = ? WHERE ipb_src = ? AND ipb_dst = ? AND as_src = ? AND as_dst = ? AND port_src = ? AND port_dst = ? AND volume = ? AND time_added = ? ")
if err != nil {
- logger.Println("Failed to prepare update")
+ slogger.Println("Failed to prepare update")
return
}
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
- logger.Println("Couldn't load timezone")
+ slogger.Println("Couldn't load timezone")
return
}
var cd cleanedData
@@ -250,12 +251,12 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
var tim []byte
err = rows.Scan(&cd.ipbSrc, &cd.ipbDst, &cd.asSrc, &cd.asDst, &cd.portSrc, &cd.portDst, &cd.volume, &tim, &cd.occurences)
if err != nil {
- logger.Println("Failed to scan row")
+ slogger.Println("Failed to scan row")
return
}
cd.time, err = time.ParseInLocation("2006-01-02 15:04:05", string(tim), loc)
if err != nil {
- logger.Println("Failed to parse timestamp")
+ slogger.Println("Failed to parse timestamp")
return
}
// Add differential privacy noise
@@ -264,7 +265,7 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
// Update the entry
_, err := update.Exec(cd.occurences, time.Now(), cd.ipbSrc, cd.ipbDst, cd.asSrc, cd.asDst, cd.portSrc, cd.portDst, cd.volume, cd.time)
if err != nil {
- logger.Println("Failed to update an entry:", err)
+ slogger.Println("Failed to update an entry:", err)
}
}
return
@@ -273,14 +274,14 @@ func privatizeCleaned(db *sql.DB, t time.Time, cfg *Config) (err error) {
func availableRows(tx *sql.Tx, cfg *Config, timeLimit time.Time) (numRows int, err error) {
stmt, err := tx.Prepare("SELECT COUNT(*) FROM " + cfg.RawTable + " WHERE stamp_inserted < ? ")
if err != nil {
- logger.Println("Could not prepare statement")
+ slogger.Println("Could not prepare statement")
return
}
row := stmt.QueryRow(timeLimit)
err = row.Scan(&numRows)
if err != nil {
- logger.Println("Failed to scan result")
+ slogger.Println("Failed to scan result")
}
return
}
diff --git a/stdin.go b/stdin.go
index 7fadf06..aa5ac66 100644
--- a/stdin.go
+++ b/stdin.go
@@ -10,11 +10,11 @@ import (
)
var (
- logger *log.Logger
+ stlogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[stdin]", log.LstdFlags)
+ stlogger = log.New(os.Stdout, "[stdin]", log.LstdFlags)
}
//Starts a process that reads from stdin and
@@ -22,12 +22,12 @@ func init() {
func readFromStdin() <-chan []byte {
out := make(chan []byte)
go func() {
- logger.Println("Now listening on stdin...")
+ stlogger.Println("Now listening on stdin...")
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
out <- []byte(scanner.Text())
}
- logger.Println("Finished listening to stdin!")
+ stlogger.Println("Finished listening to stdin!")
close(out)
}()
return out
@@ -37,20 +37,20 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
out := make(chan []RawData)
ival, err := cfg.getInterval()
if err != nil {
- logger.Println("Could not parse interval: ", err)
+ stlogger.Println("Could not parse interval: ", err)
}
timeBin := time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
- logger.Println("Now parsing data from stdin...")
+ stlogger.Println("Now parsing data from stdin...")
go func() {
rDat := make([]RawData, 0)
for line := range in {
if !strings.HasPrefix(string(line), "{") {
- logger.Println("Got message:", string(line))
+ stlogger.Println("Got message:", string(line))
//This should be a break in the output from pmacct
//so we deploy our collected data and set a new timeBin
ival, err := cfg.getInterval()
if err != nil {
- logger.Println("Could not parse interval: ", err)
+ stlogger.Println("Could not parse interval: ", err)
}
timeBin = time.Now().Add(ival / -2) //Make sure we are inside the correct timeBin
//Send the data if we have something to send
@@ -64,7 +64,7 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
var rd RawData
err := json.Unmarshal(line, &rd)
if err != nil {
- logger.Println("Failed in parsing json:", err)
+ stlogger.Println("Failed in parsing json:", err)
close(out)
return
}
@@ -76,7 +76,7 @@ func parseRawData(in <-chan []byte, cfg *Config) <-chan []RawData {
if len(rDat) > 0 {
out <- rDat
}
- logger.Println("Finished parsing data from stdin...")
+ stlogger.Println("Finished parsing data from stdin...")
close(out)
}()
return out
diff --git a/whois.go b/whois.go
index 751a5d1..7196091 100644
--- a/whois.go
+++ b/whois.go
@@ -17,11 +17,11 @@ const (
)
var (
- log *log.Logger
+ wlogger *log.Logger
)
func init() {
- logger = log.New(os.Stdout, "[ Whois ]", log.LstdFlags)
+ wlogger = log.New(os.Stdout, "[ Whois ]", log.LstdFlags)
}
/*
@@ -35,7 +35,7 @@ func main() {
*/
func findIPBlock(domains ...string) (pairs map[string]string, err error) {
- logger.Println("Querying for ip-blocks...")
+ wlogger.Println("Querying for ip-blocks...")
if len(domains) == 0 {
return
}
@@ -61,7 +61,7 @@ func findIPBlock(domains ...string) (pairs map[string]string, err error) {
ipb := strings.TrimSpace(content[2])
pairs[ipaddr] = ipb
}
- logger.Println("ip-blocks returned")
+ wlogger.Println("ip-blocks returned")
return
}