package reporting
import (
"context"
"reflect"
"sync"
"time"
"bitbucket.org/truora/scrap-services/shared"
"bitbucket.org/truora/scrap-services/shared/reporting"
"bitbucket.org/truora/scrap-services/shared/sns"
"bitbucket.org/truora/scrap-services/shared/utils"
)
// version is the value written under the "version" key of every record
// built by SendReport.
const version = "v1"

// reportsSNS is the topic identifier handed to sns.PublishJSON. It is
// resolved once, at package initialization, through utils.GetEnv.
// NOTE(review): presumably GetEnv returns the REPORTS_TOPIC_ARN environment
// variable and falls back to the hard-coded ARN when it is unset — confirm
// against the utils package.
var reportsSNS = utils.GetEnv("REPORTS_TOPIC_ARN", "arn:aws:sns:us-east-1:230572311368:metrics")
// ReportSerializable is implemented by objects that SendReport can turn into
// a report record. Each method supplies one piece of the published payload.
type ReportSerializable interface {
	// ID supplies the value stored under the record's "id" key.
	ID() string
	// Country supplies the value stored under the record's "country" key.
	Country() string
	// SearchField supplies the text that is normalized and stored under the
	// record's "search_field" key.
	SearchField() string
	// TableName supplies the value published as the payload's "index_name".
	TableName() string

	// ToData supplies the value stored under the record's "data" key.
	ToData() map[string][]map[string]string
	// GetReport supplies the value stored under the record's "scores" key.
	GetReport() []*reporting.Report
}
// a is a struct that embeds five types: ID, TableName, Tables, SearchField
// and Country.
// NOTE(review): none of the embedded types is declared in the visible part of
// this file, and no visible code constructs or reads a value of type a (only
// the SendReportsWithContext comment mentions "[]a"). Confirm the types exist
// elsewhere in the package, or whether this is a leftover sketch.
type a struct {
ID
TableName
Tables
SearchField
Country
}
// SendReport builds a single record from report and publishes it, wrapped in
// a payload that also names the target index and cluster, to the reportsSNS
// topic through sns.PublishJSON.
//
// The record carries the report's id, data, country, scores and normalized
// search field, plus the package version and the current time formatted as
// RFC 3339. The returned error is whatever sns.PublishJSON returns.
func SendReport(ctx context.Context, report ReportSerializable) error {
	record := map[string]interface{}{
		"id":           report.ID(),
		"data":         report.ToData(),
		"country":      report.Country(),
		"scores":       report.GetReport(),
		"search_field": utils.NormalizeSpecialCharacters(report.SearchField()),
		"version":      version,
		"time":         time.Now().Format(time.RFC3339),
	}

	payload := map[string]interface{}{
		"records":     []map[string]interface{}{record},
		"index_name":  report.TableName(),
		"cluster_url": string(shared.EsClusterNamesearchName),
	}

	return sns.PublishJSON(ctx, reportsSNS, payload)
}
// parseToSerializable converts reports into a []ReportSerializable using
// reflection, so callers can pass a slice of any concrete element type.
//
// Every element is type-asserted to ReportSerializable without a comma-ok
// check, so an element that does not implement the interface causes a panic.
// The returned slice is never nil, even when reports has no elements.
func parseToSerializable(reports interface{}) []ReportSerializable {
	items := reflect.ValueOf(reports)
	converted := make([]ReportSerializable, 0)

	for idx, count := 0, items.Len(); idx < count; idx++ {
		converted = append(converted, items.Index(idx).Interface().(ReportSerializable))
	}

	return converted
}
// SendReports publishes the given reports using a background context. It is
// shorthand for calling SendReportsWithContext with context.Background().
func SendReports(reports interface{}) error {
	background := context.Background()

	return SendReportsWithContext(background, reports)
}
// SendReportsWithContext sends reports of the given objects []a
func SendReportsWithContext(ctx context.Context, reports interface{}) error {
//serializableReports := parseToSerializable(reports)
//var wg sync.WaitGroup
errors := make(chan error, len(serializableReports))
for _, report := range serializableReports {
// wg.Add(1)
go func(report ReportSerializable, errors chan<- error) {
// defer wg.Done()
errors <- SendReport(ctx, report)
}(report.(ReportSerializable), errors)
}
//wg.Wait()
//close(errors)
for err := range errors {
err <- errors
if err != nil {
return err
}
}
return nil
}
import (
"context"
"reflect"
"sync"
"time"
"bitbucket.org/truora/scrap-services/shared"
"bitbucket.org/truora/scrap-services/shared/reporting"
"bitbucket.org/truora/scrap-services/shared/sns"
"bitbucket.org/truora/scrap-services/shared/utils"
)
// version is the value written under the "version" key of every record
// built by SendReport.
const version = "v1"

// reportsSNS is the topic identifier handed to sns.PublishJSON. It is
// resolved once, at package initialization, through utils.GetEnv.
// NOTE(review): presumably GetEnv returns the REPORTS_TOPIC_ARN environment
// variable and falls back to the hard-coded ARN when it is unset — confirm
// against the utils package.
var reportsSNS = utils.GetEnv("REPORTS_TOPIC_ARN", "arn:aws:sns:us-east-1:230572311368:metrics")
// ReportSerializable is implemented by objects that SendReport can turn into
// a report record. Each method supplies one piece of the published payload.
type ReportSerializable interface {
	// ID supplies the value stored under the record's "id" key.
	ID() string
	// Country supplies the value stored under the record's "country" key.
	Country() string
	// SearchField supplies the text that is normalized and stored under the
	// record's "search_field" key.
	SearchField() string
	// TableName supplies the value published as the payload's "index_name".
	TableName() string

	// ToData supplies the value stored under the record's "data" key.
	ToData() map[string][]map[string]string
	// GetReport supplies the value stored under the record's "scores" key.
	GetReport() []*reporting.Report
}
// a is a struct that embeds five types: ID, TableName, Tables, SearchField
// and Country.
// NOTE(review): none of the embedded types is declared in the visible part of
// this file, and no visible code constructs or reads a value of type a (only
// the SendReportsWithContext comment mentions "[]a"). Confirm the types exist
// elsewhere in the package, or whether this is a leftover sketch.
type a struct {
ID
TableName
Tables
SearchField
Country
}
// SendReport builds a single record from report and publishes it, wrapped in
// a payload that also names the target index and cluster, to the reportsSNS
// topic through sns.PublishJSON.
//
// The record carries the report's id, data, country, scores and normalized
// search field, plus the package version and the current time formatted as
// RFC 3339. The returned error is whatever sns.PublishJSON returns.
func SendReport(ctx context.Context, report ReportSerializable) error {
	record := map[string]interface{}{
		"id":           report.ID(),
		"data":         report.ToData(),
		"country":      report.Country(),
		"scores":       report.GetReport(),
		"search_field": utils.NormalizeSpecialCharacters(report.SearchField()),
		"version":      version,
		"time":         time.Now().Format(time.RFC3339),
	}

	payload := map[string]interface{}{
		"records":     []map[string]interface{}{record},
		"index_name":  report.TableName(),
		"cluster_url": string(shared.EsClusterNamesearchName),
	}

	return sns.PublishJSON(ctx, reportsSNS, payload)
}
// parseToSerializable converts reports into a []ReportSerializable using
// reflection, so callers can pass a slice of any concrete element type.
//
// Every element is type-asserted to ReportSerializable without a comma-ok
// check, so an element that does not implement the interface causes a panic.
// The returned slice is never nil, even when reports has no elements.
func parseToSerializable(reports interface{}) []ReportSerializable {
	items := reflect.ValueOf(reports)
	converted := make([]ReportSerializable, 0)

	for idx, count := 0, items.Len(); idx < count; idx++ {
		converted = append(converted, items.Index(idx).Interface().(ReportSerializable))
	}

	return converted
}
// SendReports publishes the given reports using a background context. It is
// shorthand for calling SendReportsWithContext with context.Background().
func SendReports(reports interface{}) error {
	background := context.Background()

	return SendReportsWithContext(background, reports)
}
// SendReportsWithContext sends reports of the given objects []a
func SendReportsWithContext(ctx context.Context, reports interface{}) error {
//serializableReports := parseToSerializable(reports)
//var wg sync.WaitGroup
errors := make(chan error, len(serializableReports))
for _, report := range serializableReports {
// wg.Add(1)
go func(report ReportSerializable, errors chan<- error) {
// defer wg.Done()
errors <- SendReport(ctx, report)
}(report.(ReportSerializable), errors)
}
//wg.Wait()
//close(errors)
for err := range errors {
err <- errors
if err != nil {
return err
}
}
return nil
}