summaryrefslogtreecommitdiff
path: root/cleaner.go
diff options
context:
space:
mode:
Diffstat (limited to 'cleaner.go')
-rw-r--r--cleaner.go40
1 files changed, 28 insertions, 12 deletions
diff --git a/cleaner.go b/cleaner.go
index 21b6842..186cd44 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -9,7 +9,7 @@ import (
"time"
)
-func cleanData(cfg *Config) (rowsLeft int, err error) {
+func cleanFromDB(cfg *Config) (rowsLeft int, err error) {
db, err := sql.Open("mysql", cfg.DBUser+":"+cfg.DBPass+"@"+cfg.DBConn+"/"+cfg.DBName)
if err != nil {
@@ -39,7 +39,7 @@ func cleanData(cfg *Config) (rowsLeft int, err error) {
cDat, err := clean(rDat, cfg)
if err != nil {
- log.Println("Failed to clean data")
+ log.Println("Failed to clean data from db:", err)
return
}
@@ -76,6 +76,22 @@ func cleanData(cfg *Config) (rowsLeft int, err error) {
return
}
+func cleanFromStdin(rDatChan <-chan []RawData, cfg *Config) error {
+ for rDat := range rDatChan {
+ cDat, err := clean(rDat, cfg)
+ if err != nil {
+ log.Println("Failed to clean data from stdin:", err)
+ return err
+ }
+ err = insertCleanDataToDB(cfg, cDat)
+ if err != nil {
+ log.Println("Failed to insert clean data from stdin:", err)
+ return err
+ }
+ }
+ return nil
+}
+
func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) {
loc, err := time.LoadLocation(TIMEZONE)
if err != nil {
@@ -111,12 +127,12 @@ func getTimespan(t time.Time, cfg *Config) (span time.Time, err error) {
return
}
-func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) {
+func clean(rDat []RawData, cfg *Config) (cDat []cleanedData, err error) {
// collect all ips so we can query for their ip blocks
ips := make(map[string]struct{})
for _, rd := range rDat {
- ips[rd.ipSrc] = struct{}{}
- ips[rd.ipDst] = struct{}{}
+ ips[rd.IpSrc] = struct{}{}
+ ips[rd.IpDst] = struct{}{}
}
var iplist []string
@@ -139,14 +155,14 @@ func clean(rDat []rawData, cfg *Config) (cDat []cleanedData, err error) {
}
cDat = append(cDat,
cleanedData{
- ipbSrc: pairs[rd.ipSrc],
- ipbDst: pairs[rd.ipDst],
- asSrc: rd.asSrc,
- asDst: rd.asDst,
- portSrc: rd.portSrc,
- portDst: rd.portDst,
+ ipbSrc: pairs[rd.IpSrc],
+ ipbDst: pairs[rd.IpDst],
+ asSrc: rd.AsSrc,
+ asDst: rd.AsDst,
+ portSrc: rd.PortSrc,
+ portDst: rd.PortDst,
occurences: 1,
- volume: rd.pktLenDist,
+ volume: rd.PktLenDist,
time: tim,
})
}