-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
136 lines (118 loc) · 7.36 KB
/
main.go
File metadata and controls
136 lines (118 loc) · 7.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main
import (
"fmt"
"io/ioutil"
"os"
"time"
"github.com/cloudfoundry-community/go-cfclient"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/caching"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/eventQueue"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/eventRouting"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/events"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/firehoseclient"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/logging"
"github.com/mcplusa/cloudfoundry-sumologic-nozzle/sumoCFFirehose"
"gopkg.in/alecthomas/kingpin.v2"
)
var (
apiEndpoint = kingpin.Flag("api-endpoint", "URL to CF API Endpoint").Envar("API_ENDPOINT").String()
sumoEndpoint = kingpin.Flag("sumo-endpoint", "SUMO-ENDPOINT Complete URL for the endpoint, copied from the Sumo Logic HTTP Source configuration").Envar("SUMO_ENDPOINT").String()
subscriptionId = kingpin.Flag("subscription-id", "Cloud Foundry ID for the subscription.").Default("firehose").Envar("FIREHOSE_SUBSCRIPTION_ID").String()
user = kingpin.Flag("cloudfoundry-user", "Cloud Foundry User").Envar("CLOUDFOUNDRY_USER").String() //user created in CF, authorized to connect the firehose
password = kingpin.Flag("cloudfoundry-password", "Cloud Foundry Password").Envar("CLOUDFOUNDRY_PASSWORD").String()
keepAlive, errK = time.ParseDuration("25s") //default Error,ContainerMetric,HttpStart,HttpStop,HttpStartStop,LogMessage,ValueMetric,CounterEvent
wantedEvents = kingpin.Flag("events", fmt.Sprintf("Comma separated list of events you would like. Valid options are %s", eventRouting.GetListAuthorizedEventEvents())).Default("LogMessage").Envar("EVENTS").String()
boltDatabasePath = "event.db"
skipSSLValidation = kingpin.Flag("skip-ssl-validation", "Skip SSL validation (to allow things like self-signed certs). Do not set to true in production").Default("false").Envar("SKIP_SSL_VALIDATION").Bool()
tickerTime = kingpin.Flag("nozzle-polling-period", "How frequently this Nozzle polls the CF Firehose for data").Default("15s").Envar("NOZZLE_POLLING_PERIOD").Duration()
eventsBatchSize = kingpin.Flag("log-events-batch-size", "When number of messages in the buffer is equal to this flag, send those to Sumo Logic").Default("500").Envar("LOG_EVENTS_BATCH_SIZE").Int()
sumoPostMinimumDelay = kingpin.Flag("sumo-post-minimum-delay", "Minimum time between HTTP POST to Sumo Logic").Default("2000ms").Envar("SUMO_POST_MINIMUM_DELAY").Duration()
sumoCategory = kingpin.Flag("sumo-category", "This value overrides the default 'Source Category' associated with the configured Sumo Logic HTTP Source").Default("").Envar("SUMO_CATEGORY").String()
sumoName = kingpin.Flag("sumo-name", "This value overrides the default 'Source Name' associated with the configured Sumo Logic HTTP Source").Default("").Envar("SUMO_NAME").String()
sumoHost = kingpin.Flag("sumo-host", "This value overrides the default 'Source Host' associated with the configured Sumo Logic HTTP Source").Default("").Envar("SUMO_HOST").String()
verboseLogMessages = kingpin.Flag("verbose-log-messages", "Enable Verbose in 'LogMessage' Event. If this flag NOT present, the LogMessage will contain ONLY the fields: tiemstamp, cf_app_guid, Msg").Default("false").Envar("VERBOSE_LOG_MESSAGES").Bool()
customMetadata = kingpin.Flag("custom-metadata", "Use this flag for addingCustom Metadata to the JSON (key1:value1,key2:value2, etc...)").Default("").Envar("CUSTOM_METADATA").String()
includeOnlyMatchingFilter = kingpin.Flag("include-only-matching-filter", "Adds an 'Include only' filter to Events content (key1:value1,key2:value2, etc...)").Default("").Envar("INCLUDE_ONLY_MATCHING_FILTER").String()
excludeAlwaysMatchingFilter = kingpin.Flag("exclude-always-matching-filter", "Adds an 'Exclude always' filter to Events content (key1:value1,key2:value2, etc...)").Default("").Envar("EXCLUDE_ALWAYS_MATCHING_FILTER").String()
)
var (
version = "0.1.0"
)
func main() {
//logging init
logging.Init(ioutil.Discard, os.Stdout, os.Stdout, os.Stderr)
kingpin.Version(version)
kingpin.Parse()
logging.Info.Println("Set Configurations:")
logging.Info.Println("CF API Endpoint: " + *apiEndpoint)
logging.Info.Println("Sumo Logic Endpoint: " + *sumoEndpoint)
logging.Info.Println("Cloud Foundry Nozzle Subscription ID: " + *subscriptionId)
logging.Info.Println("Cloud Foundry User: " + *user)
logging.Info.Println("Events Selected: " + *wantedEvents)
logging.Info.Printf("Skip SSL Validation: %v", *skipSSLValidation)
logging.Info.Printf("Nozzle Polling Period: %v", *tickerTime)
logging.Info.Printf("Log Events Batch Size: [%d]", *eventsBatchSize)
logging.Info.Printf("Sumo Logic HTTP Post Minimum Delay: %v", *sumoPostMinimumDelay)
if *sumoName != "" {
logging.Info.Println("Sumo Logic Name: " + *sumoName)
}
if *sumoHost != "" {
logging.Info.Println("Sumo Logic Host: " + *sumoHost)
}
if *sumoCategory != "" {
logging.Info.Println("Sumo Logic Category: " + *sumoCategory)
}
logging.Info.Printf("Verbose Log Messages: %v\n", *verboseLogMessages)
logging.Info.Println("Starting Sumo Logic Nozzle " + version)
c := cfclient.Config{
ApiAddress: *apiEndpoint,
Username: *user,
Password: *password,
SkipSslValidation: *skipSSLValidation,
}
cfClient, errCfClient := cfclient.NewClient(&c)
if errCfClient != nil {
logging.Error.Fatal("Error setting up CF Client: ", errCfClient)
os.Exit(1)
}
//Creating Caching
var cachingClient caching.Caching
if caching.IsNeeded(*wantedEvents) {
cachingClient = caching.NewCachingBolt(cfClient, boltDatabasePath)
} else {
cachingClient = caching.NewCachingEmpty()
}
logging.Info.Println("Creating queue")
queue := eventQueue.NewQueue(make([]*events.Event, 100))
loggingClientSumo := sumoCFFirehose.NewSumoLogicAppender(*sumoEndpoint, 5000, &queue, *eventsBatchSize, *sumoPostMinimumDelay, *sumoCategory, *sumoName, *sumoHost, *verboseLogMessages, *customMetadata, *includeOnlyMatchingFilter, *excludeAlwaysMatchingFilter, version)
go loggingClientSumo.Start() //multi
logging.Info.Println("Creating Events")
events := eventRouting.NewEventRouting(cachingClient, &queue)
err := events.SetupEventRouting(*wantedEvents)
if err != nil {
logging.Error.Fatal("Error setting up event routing: ", err)
os.Exit(1)
}
// Parse extra fields from cmd call
cachingClient.CreateBucket()
//Let's Update the database the first time
logging.Info.Printf("Start filling app/space/org cache.\n")
apps := cachingClient.GetAllApp()
logging.Info.Printf("Done filling cache! Found [%d] Apps \n", len(apps))
logging.Info.Println("Apps found: ")
for i := 0; i < len(apps); i++ {
logging.Info.Printf("[%d] "+apps[i].Name+" GUID: "+apps[i].Guid, i+1)
}
cachingClient.PerformPoollingCaching(*tickerTime)
firehoseConfig := &firehoseclient.FirehoseConfig{
TrafficControllerURL: cfClient.Endpoint.DopplerEndpoint,
InsecureSSLSkipVerify: *skipSSLValidation,
IdleTimeoutSeconds: keepAlive,
FirehoseSubscriptionID: *subscriptionId,
}
logging.Info.Printf("Connecting to Firehose... \n")
firehoseClient := firehoseclient.NewFirehoseNozzle(cfClient, events, firehoseConfig)
errFirehose := firehoseClient.Start()
logging.Info.Printf("FirehoseClient Error: %v", errFirehose)
defer cachingClient.Close()
}