From 7cce75e8fb76201b053cdd340d74eb215a3a728b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Fri, 15 Apr 2016 16:54:12 +0200 Subject: [PATCH] first commit --- README.md | 5 + handler/lib.go | 220 ++++++++++++++++++++++++++++++++++++++++++++ handler/lib_test.go | 193 ++++++++++++++++++++++++++++++++++++++ main.go | 87 ++++++++++++++++++ rancher/client.go | 71 ++++++++++++++ rancher/types.go | 8 ++ scaling/agent.go | 84 +++++++++++++++++ 7 files changed, 668 insertions(+) create mode 100644 README.md create mode 100644 handler/lib.go create mode 100644 handler/lib_test.go create mode 100644 main.go create mode 100644 rancher/client.go create mode 100644 rancher/types.go create mode 100644 scaling/agent.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..50e1b21 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +## TODO + +- start cooldown timer, when service was actually scaled up +- do not freak out, when rancher service is not available +- (free up unneeded services, when removed from rancher) diff --git a/handler/lib.go b/handler/lib.go new file mode 100644 index 0000000..84673a2 --- /dev/null +++ b/handler/lib.go @@ -0,0 +1,220 @@ +package handler + +import ( + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling" + "errors" + "fmt" + "github.com/influxdata/kapacitor/udf" + "github.com/influxdata/kapacitor/udf/agent" + "github.com/pk-rawat/gostr/src" + "log" + "strconv" + "time" +) + +type Handler struct { + Id, When, Scale string + MinInstances int64 + MaxInstances int64 + Cooldown time.Duration + Simulate, Debug bool + kapacitorAgent *agent.Agent + scaleAgent *scaling.Agent +} + +func New(kapacitorAgent *agent.Agent, scaleAgent *scaling.Agent) *Handler { + h := Handler{} + h.kapacitorAgent = kapacitorAgent + h.scaleAgent = scaleAgent + return &h +} + +// Return the InfoResponse. Describing the properties of this UDF agent. +func (*Handler) Info() (*udf.InfoResponse, error) { + info := &udf.InfoResponse{ + Wants: udf.EdgeType_STREAM, + Provides: udf.EdgeType_STREAM, + Options: map[string]*udf.OptionInfo{ + "id": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}}, + "when": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}}, + "scale": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}}, + "min_instances": {ValueTypes: []udf.ValueType{udf.ValueType_INT}}, + "max_instances": {ValueTypes: []udf.ValueType{udf.ValueType_INT}}, + "cooldown": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}}, + "simulate": {ValueTypes: []udf.ValueType{udf.ValueType_BOOL}}, + }, + } + return info, nil +} + +func (h *Handler) debug(format string, args ...interface{}) { + if h.Debug { + log.Printf(format, args...) + } +} + +// Initialze the handler based of the provided options. +func (h *Handler) Init(r *udf.InitRequest) (*udf.InitResponse, error) { + init := &udf.InitResponse{Success: true, Error: ""} + h.Debug = false + h.Simulate = false + h.Cooldown = time.Minute + h.Scale = "current + 1" + h.MinInstances = 1 + h.MaxInstances = 3 + + var cooldown string + for _, opt := range r.Options { + switch opt.Name { + case "when": + h.When = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue + case "scale": + h.Scale = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue + case "min_instances": + h.MinInstances = opt.Values[0].Value.(*udf.OptionValue_IntValue).IntValue + case "max_instances": + h.MaxInstances = opt.Values[0].Value.(*udf.OptionValue_IntValue).IntValue + case "cooldown": + cooldown = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue + case "id": + h.Id = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue + case "simulate": + h.Simulate = opt.Values[0].Value.(*udf.OptionValue_BoolValue).BoolValue + case "debug": + h.Debug = opt.Values[0].Value.(*udf.OptionValue_BoolValue).BoolValue + } + } + + if h.When == "" { + init.Success = false + init.Error += " must supply `when` expression;" + } + if h.Scale == "" { + init.Success = false + init.Error += " must supply `scale` expression;" + } + if h.MinInstances < 0 { + init.Success = false + init.Error += " `MinInstances` must be greater equal 0;" + } + if h.MaxInstances < 0 { + init.Success = false + init.Error += " `MaxInstances` must be greater equal 0;" + } + + if h.MaxInstances < h.MinInstances { + init.Success = false + init.Error += " `MaxInstances` must be greater equal minimum instances;" + } + + var err error + h.Cooldown, err = time.ParseDuration(cooldown) + if err != nil { + init.Success = false + init.Error += fmt.Sprintf(" `cooldown` '%s' has an invalid format: %v", cooldown, err) + } + if h.Cooldown < 0 { + init.Success = false + init.Error += " `cooldown` must be greater equal 0s" + } + + return init, nil +} + +// Create a snapshot of the running state of the process. +func (o *Handler) Snaphost() (*udf.SnapshotResponse, error) { + return &udf.SnapshotResponse{}, nil +} + +// Restore a previous snapshot. +func (o *Handler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) { + return &udf.RestoreResponse{ + Success: true, + }, nil +} + +// Start working with the next batch +func (h *Handler) BeginBatch(begin *udf.BeginBatch) error { + return errors.New("batching not supported") +} + +func (h *Handler) evaluateWhen(p *udf.Point) (bool, error) { + fields := make(map[string]interface{}) + for k, v := range p.GetFieldsInt() { + fields[k] = v + } + for k, v := range p.GetFieldsDouble() { + fields[k] = v + } + res := gostr.Evaluate(h.When, fields).(string) + doScale, err := strconv.ParseBool(res) + if err != nil { + return false, fmt.Errorf("the expression `when` should evaluate to true or false, got %s", res) + } + h.debug("evaluate '%s' for '%v' -> should scale: %s", h.When, fields, doScale) + return doScale, nil +} + +func (h *Handler) evaluateScale(s *scaling.Service) (int64, error) { + scaleContext := make(map[string]interface{}) + scaleContext["current"] = s.CurrentInstances + res := gostr.Evaluate(h.Scale, scaleContext).(string) + amount, err := strconv.ParseInt(res, 10, 64) + if err != nil { + return -1, fmt.Errorf("the expression `scale` should evaluate to an integer value, got %s (tipp: there is an ROUND() method)", res) + } + if amount < h.MinInstances { + return h.MinInstances, nil + } + if amount > h.MaxInstances { + return h.MaxInstances, nil + } + return amount, nil +} + +func (h *Handler) Point(p *udf.Point) error { + doScale, err := h.evaluateWhen(p) + if !doScale { + return err + } + service, err := h.scaleAgent.RequestScaling(h.Id, time.Unix(0, p.Time)) + if err != nil { + return fmt.Errorf("Failed start scaling: %v", err) + } + if service == nil { + h.debug("skip scaling because of cooldown") + return nil + } + defer service.Unlock() + to, err := h.evaluateScale(service) + if err != nil { + return err + } + h.debug("attempt to scale service '%s' from %d to %d", h.Id, service.CurrentInstances, to) + if !h.Simulate { + err = h.scaleAgent.Scale(h.Id, to) + if err != nil { + return err + } + } + service.CurrentInstances = to + service.CooldownUntil = time.Now().Add(h.Cooldown) + p.FieldsDouble = nil + p.FieldsInt = map[string]int64{"scale": to} + p.FieldsString = nil + h.kapacitorAgent.Responses <- &udf.Response{ + Message: &udf.Response_Point{ + Point: p, + }, + } + return nil +} + +func (o *Handler) EndBatch(end *udf.EndBatch) error { + return nil +} + +// Stop the handler gracefully. +func (o *Handler) Stop() { + close(o.kapacitorAgent.Responses) +} diff --git a/handler/lib_test.go b/handler/lib_test.go new file mode 100644 index 0000000..46ac0f9 --- /dev/null +++ b/handler/lib_test.go @@ -0,0 +1,193 @@ +package handler_test + +import ( + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/handler" + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher" + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling" + "bufio" + "flag" + "fmt" + "github.com/influxdata/kapacitor/udf" + "github.com/influxdata/kapacitor/udf/agent" + "github.com/jarcoal/httpmock" + "net/url" + "os" + "path/filepath" + "runtime" + "syscall" + "testing" + "time" +) + +func ok(tb testing.TB, err error) { + if err != nil { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error()) + tb.FailNow() + } +} + +var cwd_arg = flag.String("cwd", "", "set cwd") + +func init() { + flag.Parse() + if *cwd_arg != "" { + if err := os.Chdir(*cwd_arg); err != nil { + fmt.Println("Chdir error:", err) + } + } +} + +type server struct { + Fd *os.File + In *bufio.Reader + t *testing.T + responseBuf []byte +} + +func (s *server) writeRequest(req *udf.Request) { + err := udf.WriteMessage(req, s.Fd) + ok(s.t, err) +} + +func (s *server) writePoint(point *udf.Point) { + req := &udf.Request{ + Message: &udf.Request_Point{point}, + } + s.writeRequest(req) +} + +func (s *server) ReadResponse() *udf.Response { + response := new(udf.Response) + ok(s.t, udf.ReadMessage(&s.responseBuf, s.In, response)) + return response +} + +func (s *server) Close() { + s.Fd.Close() +} + +func fakeConnection(t *testing.T) (server, *os.File) { + fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) + srvFile := os.NewFile(uintptr(fds[0]), "server") + clientFile := os.NewFile(uintptr(fds[1]), "client") + ok(t, err) + s := server{Fd: srvFile, t: t, In: bufio.NewReader(srvFile)} + return s, clientFile +} + +const RANCHER_URL = "http://secret:accesskey@localhost:8080" + +func setupAgent(t *testing.T, connection *os.File) *agent.Agent { + a := agent.New(connection, connection) + u, err := url.Parse(RANCHER_URL) + ok(t, err) + + scaleAgent := *scaling.New(rancher.New(*u)) + h := handler.New(a, &scaleAgent) + a.Handler = h + ok(t, a.Start()) + go func() { + err = a.Wait() + if err != nil { + t.Fatal(err) + } + }() + return a +} + +func strOpt(name string, value string) *udf.Option { + return &udf.Option{ + Name: name, + Values: []*udf.OptionValue{ + &udf.OptionValue{ + udf.ValueType_STRING, + &udf.OptionValue_StringValue{value}, + }, + }, + } +} + +func intOpt(name string, value int64) *udf.Option { + return &udf.Option{ + Name: name, + Values: []*udf.OptionValue{ + &udf.OptionValue{ + udf.ValueType_INT, + &udf.OptionValue_IntValue{value}, + }, + }, + } +} + +var options = []*udf.Option{ + strOpt("id", "abc"), + strOpt("when", "cpu_usage > 8"), + strOpt("scale", "current + 1"), + intOpt("min_instances", 1), + intOpt("max_instances", 10), + strOpt("cooldown", "1m"), + &udf.Option{ + Name: "simulate", + Values: []*udf.OptionValue{ + &udf.OptionValue{ + udf.ValueType_BOOL, + &udf.OptionValue_BoolValue{false}, + }, + }, + }, +} + +var udfPoint = &udf.Point{ + Time: time.Now().UnixNano(), + Name: "pointName", + Database: "database", + RetentionPolicy: "policy", + Group: "groupId", + Dimensions: []string{}, + Tags: map[string]string{"tag1": "value1", "tag2": "value2"}, + FieldsDouble: map[string]float64{"cpu_usage": 10.0}, + FieldsInt: map[string]int64{"queue_size": 10}, + FieldsString: map[string]string{}, +} + +func TestHandler(t *testing.T) { + server, client := fakeConnection(t) + defer server.Close() + defer client.Close() + + httpmock.Activate() + defer httpmock.DeactivateAndReset() + httpmock.RegisterResponder("GET", RANCHER_URL+"/v1/services/abc", + httpmock.NewStringResponder(200, `{"id": "abc", "name": "chat", "scale": 1, "transitioning": "no"}`)) + httpmock.RegisterResponder("PUT", RANCHER_URL+"/v1/services/abc", + httpmock.NewStringResponder(200, `{"id": "abc", "name": "chat", "scale": 2}`)) + + setupAgent(t, client) + + server.writeRequest(&udf.Request{Message: &udf.Request_Info{ + Info: &udf.InfoRequest{}, + }}) + server.ReadResponse() + + server.writeRequest(&udf.Request{Message: &udf.Request_Init{ + Init: &udf.InitRequest{Options: options}, + }}) + resp := server.ReadResponse() + if init := resp.Message.(*udf.Response_Init).Init; !init.Success { + t.Fatalf("failed to initialize agent: %s", init.Error) + } + server.writePoint(udfPoint) + resp = server.ReadResponse() + point, ok := resp.Message.(*udf.Response_Point) + if !ok { + t.Fatalf("expect to receive a point") + } + val := point.Point.GetFieldsInt()["scale"] + if val != 2 { + t.Fatalf("expected scale to be 2, got %d", val) + } + server.writePoint(udfPoint) + go t.Fatalf("it should not scale up because of cooldown, got '%v'", server.ReadResponse()) + time.Sleep(time.Millisecond * 10) // no response, good! +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..d94c78f --- /dev/null +++ b/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net" + "net/url" + "os" + "syscall" + + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/handler" + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher" + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling" + "github.com/influxdata/kapacitor/udf/agent" +) + +var ( + socketPath = flag.String("socket", "/tmp/kapacitor-scaling.sock", "Where to create the unix socket") +) + +type acceptor struct { + count int64 + scaleAgent scaling.Agent +} + +// Create a new agent/handler for each new connection. +// Count and log each new connection and termination. +func (acc *acceptor) Accept(conn net.Conn) { + count := acc.count + acc.count++ + a := agent.New(conn, conn) + h := handler.New(a, &acc.scaleAgent) + a.Handler = h + + log.Println("Starting agent for connection", count) + a.Start() + go func() { + err := a.Wait() + if err != nil { + log.Fatal(err) + } + log.Printf("Agent for connection %d finished", count) + }() +} + +func parseArgs() *url.URL { + flag.Parse() + if len(os.Args) < 2 { + fmt.Fprintf(os.Stderr, "USAGE: %s rancherurl\n") + fmt.Fprintf(os.Stderr, "rancher url is expected as first argument, for example: http://accesskey:secretkey@localhost:8080") + os.Exit(1) + } + url, err := url.Parse(os.Args[1]) + if err != nil { + fmt.Fprintf(os.Stderr, "provided url '%s' is malformed: %v", os.Args[1], err) + os.Exit(1) + } + return url +} + +func main() { + rancherUrl := parseArgs() + + // Create unix socket + addr, err := net.ResolveUnixAddr("unix", *socketPath) + if err != nil { + log.Fatal(err) + } + l, err := net.ListenUnix("unix", addr) + if err != nil { + log.Fatal(err) + } + + // Create server that listens on the socket + s := agent.NewServer(l, &acceptor{0, *scaling.New(rancher.New(*rancherUrl))}) + + // Setup signal handler to stop Server on various signals + s.StopOnSignals(os.Interrupt, syscall.SIGTERM) + + log.Println("Server listening on", addr.String()) + err = s.Serve() + if err != nil { + log.Fatal(err) + } + log.Println("Server stopped") +} diff --git a/rancher/client.go b/rancher/client.go new file mode 100644 index 0000000..56810e3 --- /dev/null +++ b/rancher/client.go @@ -0,0 +1,71 @@ +package rancher + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" +) + +type Client struct { + rootUrl url.URL + client http.Client +} + +func New(rootUrl url.URL) Client { + return Client{rootUrl, http.Client{}} +} + +func (c *Client) Get(path string, result interface{}) error { + return c.do("GET", path, nil, result) +} + +func (c *Client) Put(path string, data, result interface{}) error { + var body *bytes.Buffer + if data != nil { + b, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("error marshaling body: {}", err) + } + body = bytes.NewBuffer(b) + } + return c.do("PUT", path, body, result) +} + +func (c *Client) do(method, path string, body io.Reader, result interface{}) error { + url := c.rootUrl + url.Path += path + req, err := http.NewRequest(method, url.String(), body) + if err != nil { + return err + } + user := c.rootUrl.User + if user == nil { + return fmt.Errorf("user and password must be set in url") + } + password, isSet := user.Password() + if !isSet { + return fmt.Errorf("password must be set in url") + } + req.SetBasicAuth(user.Username(), password) + resp, err := c.client.Do(req) + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return fmt.Errorf("Expected to API to return 200, got %s: %s", + resp.Status, + respBody) + } + if result != nil { + return json.Unmarshal(respBody, result) + } + return nil +} diff --git a/rancher/types.go b/rancher/types.go new file mode 100644 index 0000000..76b5f33 --- /dev/null +++ b/rancher/types.go @@ -0,0 +1,8 @@ +package rancher + +type Service struct { + Id string `json:"id"` + Name string `json:"name"` + Scale int64 `json:"scale"` + Transitioning string `json:"transitioning"` +} diff --git a/scaling/agent.go b/scaling/agent.go new file mode 100644 index 0000000..1182bf6 --- /dev/null +++ b/scaling/agent.go @@ -0,0 +1,84 @@ +package scaling + +import ( + "acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher" + "fmt" + "sync" + "time" +) + +type Service struct { + sync.RWMutex + CurrentInstances int64 + Name, Id string + CooldownUntil time.Time +} + +type Agent struct { + sync.RWMutex + serviceMap map[string]*Service + client rancher.Client +} + +func New(client rancher.Client) *Agent { + a := &Agent{} + a.serviceMap = make(map[string]*Service) + a.client = client + return a +} + +func (a *Agent) get(serviceId string) *Service { + a.RLock() + v, ok := a.serviceMap[serviceId] + if ok { + a.RUnlock() + return v + } + a.RUnlock() + a.Lock() + defer a.Unlock() + // FIXME memory leak because services are never freed up + v, ok = a.serviceMap[serviceId] + if !ok { + a.serviceMap[serviceId] = &Service{Id: serviceId} + } + return a.serviceMap[serviceId] +} + +// caller must call unlock service! +func (a *Agent) RequestScaling(serviceId string, eventTime time.Time) (*Service, error) { + s := a.get(serviceId) + s.RLock() + if s.CooldownUntil.Sub(eventTime) > 0 { + s.RUnlock() + return nil, nil + } + s.RUnlock() + s.Lock() + if s.CooldownUntil.Sub(eventTime) > 0 { + s.Unlock() + return nil, nil + } + u := "v1/services/" + serviceId + rancherService := rancher.Service{} + if err := a.client.Get(u, &rancherService); err != nil { + s.Unlock() + return nil, fmt.Errorf("Could not get scale count of service %s: %s", serviceId, err) + } + // Backoff strategy to save requests? + if rancherService.Transitioning != "no" { + s.Unlock() + return nil, nil + } + s.CurrentInstances = rancherService.Scale + s.Name = rancherService.Name + return s, nil +} + +func (a *Agent) Scale(serviceId string, count int64) error { + data := map[string]int64{"scale": count} + if err := a.client.Put("v1/services/"+serviceId, data, nil); err != nil { + return fmt.Errorf("Failed to scale up service: %s", err) + } + return nil +}