first commit

This commit is contained in:
Jörg Thalheim 2016-04-15 16:54:12 +02:00
commit 7cce75e8fb
7 changed files with 668 additions and 0 deletions

5
README.md Normal file

@@ -0,0 +1,5 @@
## TODO
- start the cooldown timer only when the service was actually scaled up
- do not freak out when the Rancher service is not available
- (free up unneeded services when they are removed from Rancher)

220
handler/lib.go Normal file

@@ -0,0 +1,220 @@
package handler
import (
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling"
"errors"
"fmt"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
"github.com/pk-rawat/gostr/src"
"log"
"strconv"
"time"
)
type Handler struct {
Id, When, Scale string
MinInstances int64
MaxInstances int64
Cooldown time.Duration
Simulate, Debug bool
kapacitorAgent *agent.Agent
scaleAgent *scaling.Agent
}
func New(kapacitorAgent *agent.Agent, scaleAgent *scaling.Agent) *Handler {
h := Handler{}
h.kapacitorAgent = kapacitorAgent
h.scaleAgent = scaleAgent
return &h
}
// Return the InfoResponse describing the properties of this UDF agent.
func (*Handler) Info() (*udf.InfoResponse, error) {
info := &udf.InfoResponse{
Wants: udf.EdgeType_STREAM,
Provides: udf.EdgeType_STREAM,
Options: map[string]*udf.OptionInfo{
"id": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}},
"when": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}},
"scale": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}},
"min_instances": {ValueTypes: []udf.ValueType{udf.ValueType_INT}},
"max_instances": {ValueTypes: []udf.ValueType{udf.ValueType_INT}},
"cooldown": {ValueTypes: []udf.ValueType{udf.ValueType_STRING}},
"simulate": {ValueTypes: []udf.ValueType{udf.ValueType_BOOL}},
},
}
return info, nil
}
func (h *Handler) debug(format string, args ...interface{}) {
if h.Debug {
log.Printf(format, args...)
}
}
// Initialize the handler based on the provided options.
func (h *Handler) Init(r *udf.InitRequest) (*udf.InitResponse, error) {
init := &udf.InitResponse{Success: true, Error: ""}
h.Debug = false
h.Simulate = false
h.Cooldown = time.Minute
h.Scale = "current + 1"
h.MinInstances = 1
h.MaxInstances = 3
var cooldown string
for _, opt := range r.Options {
switch opt.Name {
case "when":
h.When = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue
case "scale":
h.Scale = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue
case "min_instances":
h.MinInstances = opt.Values[0].Value.(*udf.OptionValue_IntValue).IntValue
case "max_instances":
h.MaxInstances = opt.Values[0].Value.(*udf.OptionValue_IntValue).IntValue
case "cooldown":
cooldown = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue
case "id":
h.Id = opt.Values[0].Value.(*udf.OptionValue_StringValue).StringValue
case "simulate":
h.Simulate = opt.Values[0].Value.(*udf.OptionValue_BoolValue).BoolValue
case "debug":
h.Debug = opt.Values[0].Value.(*udf.OptionValue_BoolValue).BoolValue
}
}
if h.When == "" {
init.Success = false
init.Error += " must supply `when` expression;"
}
if h.Scale == "" {
init.Success = false
init.Error += " must supply `scale` expression;"
}
if h.MinInstances < 0 {
init.Success = false
init.Error += " `min_instances` must be greater than or equal to 0;"
}
if h.MaxInstances < 0 {
init.Success = false
init.Error += " `max_instances` must be greater than or equal to 0;"
}
if h.MaxInstances < h.MinInstances {
init.Success = false
init.Error += " `max_instances` must be greater than or equal to `min_instances`;"
}
if cooldown != "" {
var err error
h.Cooldown, err = time.ParseDuration(cooldown)
if err != nil {
init.Success = false
init.Error += fmt.Sprintf(" `cooldown` '%s' has an invalid format: %v;", cooldown, err)
}
}
if h.Cooldown < 0 {
init.Success = false
init.Error += " `cooldown` must be greater than or equal to 0s;"
}
return init, nil
}
// Create a snapshot of the running state of the process.
func (h *Handler) Snapshot() (*udf.SnapshotResponse, error) {
return &udf.SnapshotResponse{}, nil
}
// Restore a previous snapshot.
func (o *Handler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
return &udf.RestoreResponse{
Success: true,
}, nil
}
// Batches are rejected; this UDF only supports stream edges.
func (h *Handler) BeginBatch(begin *udf.BeginBatch) error {
return errors.New("batching not supported")
}
func (h *Handler) evaluateWhen(p *udf.Point) (bool, error) {
fields := make(map[string]interface{})
for k, v := range p.GetFieldsInt() {
fields[k] = v
}
for k, v := range p.GetFieldsDouble() {
fields[k] = v
}
res := gostr.Evaluate(h.When, fields).(string)
doScale, err := strconv.ParseBool(res)
if err != nil {
return false, fmt.Errorf("the expression `when` should evaluate to true or false, got %s", res)
}
h.debug("evaluate '%s' for '%v' -> should scale: %s", h.When, fields, doScale)
return doScale, nil
}
func (h *Handler) evaluateScale(s *scaling.Service) (int64, error) {
scaleContext := make(map[string]interface{})
scaleContext["current"] = s.CurrentInstances
res := gostr.Evaluate(h.Scale, scaleContext).(string)
amount, err := strconv.ParseInt(res, 10, 64)
if err != nil {
return -1, fmt.Errorf("the expression `scale` should evaluate to an integer value, got %s (tip: there is a ROUND() method)", res)
}
if amount < h.MinInstances {
return h.MinInstances, nil
}
if amount > h.MaxInstances {
return h.MaxInstances, nil
}
return amount, nil
}
func (h *Handler) Point(p *udf.Point) error {
doScale, err := h.evaluateWhen(p)
if err != nil {
return err
}
if !doScale {
return nil
}
service, err := h.scaleAgent.RequestScaling(h.Id, time.Unix(0, p.Time))
if err != nil {
return fmt.Errorf("failed to start scaling: %v", err)
}
if service == nil {
h.debug("skip scaling because of cooldown")
return nil
}
defer service.Unlock()
to, err := h.evaluateScale(service)
if err != nil {
return err
}
h.debug("attempt to scale service '%s' from %d to %d", h.Id, service.CurrentInstances, to)
if !h.Simulate {
err = h.scaleAgent.Scale(h.Id, to)
if err != nil {
return err
}
}
service.CurrentInstances = to
service.CooldownUntil = time.Now().Add(h.Cooldown)
p.FieldsDouble = nil
p.FieldsInt = map[string]int64{"scale": to}
p.FieldsString = nil
h.kapacitorAgent.Responses <- &udf.Response{
Message: &udf.Response_Point{
Point: p,
},
}
return nil
}
func (o *Handler) EndBatch(end *udf.EndBatch) error {
return nil
}
// Stop the handler gracefully.
func (o *Handler) Stop() {
close(o.kapacitorAgent.Responses)
}
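
For context: the `when` and `scale` options are gostr expressions. `when` is evaluated against the point's int and double fields and must yield a boolean; `scale` only sees the variable `current` and must yield an integer. A minimal standalone sketch of that contract (not part of this commit), mirroring the calls in evaluateWhen and evaluateScale; the exact string results are an assumption about gostr's output:

package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/pk-rawat/gostr/src"
)

func main() {
	// `when`: evaluated against the point's numeric fields.
	fields := map[string]interface{}{"cpu_usage": 10.0}
	res := gostr.Evaluate("cpu_usage > 8", fields).(string)
	doScale, err := strconv.ParseBool(res)
	if err != nil {
		log.Fatalf("`when` should evaluate to true or false, got %s", res)
	}
	fmt.Println("should scale:", doScale)

	// `scale`: evaluated with the current instance count bound to `current`.
	scaleContext := map[string]interface{}{"current": int64(1)}
	res = gostr.Evaluate("current + 1", scaleContext).(string)
	amount, err := strconv.ParseInt(res, 10, 64)
	if err != nil {
		log.Fatalf("`scale` should evaluate to an integer, got %s", res)
	}
	fmt.Println("scale to:", amount)
}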

193
handler/lib_test.go Normal file

@@ -0,0 +1,193 @@
package handler_test
import (
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/handler"
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher"
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling"
"bufio"
"flag"
"fmt"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/udf/agent"
"github.com/jarcoal/httpmock"
"net/url"
"os"
"path/filepath"
"runtime"
"syscall"
"testing"
"time"
)
func ok(tb testing.TB, err error) {
if err != nil {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
tb.FailNow()
}
}
var cwd_arg = flag.String("cwd", "", "set cwd")
func init() {
flag.Parse()
if *cwd_arg != "" {
if err := os.Chdir(*cwd_arg); err != nil {
fmt.Println("Chdir error:", err)
}
}
}
type server struct {
Fd *os.File
In *bufio.Reader
t *testing.T
responseBuf []byte
}
func (s *server) writeRequest(req *udf.Request) {
err := udf.WriteMessage(req, s.Fd)
ok(s.t, err)
}
func (s *server) writePoint(point *udf.Point) {
req := &udf.Request{
Message: &udf.Request_Point{point},
}
s.writeRequest(req)
}
func (s *server) ReadResponse() *udf.Response {
response := new(udf.Response)
ok(s.t, udf.ReadMessage(&s.responseBuf, s.In, response))
return response
}
func (s *server) Close() {
s.Fd.Close()
}
func fakeConnection(t *testing.T) (server, *os.File) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
ok(t, err)
srvFile := os.NewFile(uintptr(fds[0]), "server")
clientFile := os.NewFile(uintptr(fds[1]), "client")
s := server{Fd: srvFile, t: t, In: bufio.NewReader(srvFile)}
return s, clientFile
}
const RANCHER_URL = "http://secret:accesskey@localhost:8080"
func setupAgent(t *testing.T, connection *os.File) *agent.Agent {
a := agent.New(connection, connection)
u, err := url.Parse(RANCHER_URL)
ok(t, err)
scaleAgent := scaling.New(rancher.New(*u))
h := handler.New(a, scaleAgent)
a.Handler = h
ok(t, a.Start())
go func() {
// t.Fatal must only be called from the test goroutine; report with t.Error instead.
if err := a.Wait(); err != nil {
t.Error(err)
}
}()
return a
}
func strOpt(name string, value string) *udf.Option {
return &udf.Option{
Name: name,
Values: []*udf.OptionValue{
&udf.OptionValue{
udf.ValueType_STRING,
&udf.OptionValue_StringValue{value},
},
},
}
}
func intOpt(name string, value int64) *udf.Option {
return &udf.Option{
Name: name,
Values: []*udf.OptionValue{
&udf.OptionValue{
udf.ValueType_INT,
&udf.OptionValue_IntValue{value},
},
},
}
}
var options = []*udf.Option{
strOpt("id", "abc"),
strOpt("when", "cpu_usage > 8"),
strOpt("scale", "current + 1"),
intOpt("min_instances", 1),
intOpt("max_instances", 10),
strOpt("cooldown", "1m"),
&udf.Option{
Name: "simulate",
Values: []*udf.OptionValue{
&udf.OptionValue{
udf.ValueType_BOOL,
&udf.OptionValue_BoolValue{false},
},
},
},
}
var udfPoint = &udf.Point{
Time: time.Now().UnixNano(),
Name: "pointName",
Database: "database",
RetentionPolicy: "policy",
Group: "groupId",
Dimensions: []string{},
Tags: map[string]string{"tag1": "value1", "tag2": "value2"},
FieldsDouble: map[string]float64{"cpu_usage": 10.0},
FieldsInt: map[string]int64{"queue_size": 10},
FieldsString: map[string]string{},
}
func TestHandler(t *testing.T) {
server, client := fakeConnection(t)
defer server.Close()
defer client.Close()
httpmock.Activate()
defer httpmock.DeactivateAndReset()
httpmock.RegisterResponder("GET", RANCHER_URL+"/v1/services/abc",
httpmock.NewStringResponder(200, `{"id": "abc", "name": "chat", "scale": 1, "transitioning": "no"}`))
httpmock.RegisterResponder("PUT", RANCHER_URL+"/v1/services/abc",
httpmock.NewStringResponder(200, `{"id": "abc", "name": "chat", "scale": 2}`))
setupAgent(t, client)
server.writeRequest(&udf.Request{Message: &udf.Request_Info{
Info: &udf.InfoRequest{},
}})
server.ReadResponse()
server.writeRequest(&udf.Request{Message: &udf.Request_Init{
Init: &udf.InitRequest{Options: options},
}})
resp := server.ReadResponse()
if init := resp.Message.(*udf.Response_Init).Init; !init.Success {
t.Fatalf("failed to initialize agent: %s", init.Error)
}
server.writePoint(udfPoint)
resp = server.ReadResponse()
point, ok := resp.Message.(*udf.Response_Point)
if !ok {
t.Fatalf("expect to receive a point")
}
val := point.Point.GetFieldsInt()["scale"]
if val != 2 {
t.Fatalf("expected scale to be 2, got %d", val)
}
server.writePoint(udfPoint)
go func() {
// ReadResponse blocks; any response within the grace period means the cooldown was ignored.
t.Errorf("it should not scale up because of cooldown, got '%v'", server.ReadResponse())
}()
time.Sleep(time.Millisecond * 10) // no response arrived: the cooldown held
}

87
main.go Normal file

@@ -0,0 +1,87 @@
package main
import (
"flag"
"fmt"
"log"
"net"
"net/url"
"os"
"syscall"
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/handler"
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher"
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling"
"github.com/influxdata/kapacitor/udf/agent"
)
var (
socketPath = flag.String("socket", "/tmp/kapacitor-scaling.sock", "Where to create the unix socket")
)
type acceptor struct {
count int64
scaleAgent scaling.Agent
}
// Create a new agent/handler for each new connection.
// Count and log each new connection and termination.
func (acc *acceptor) Accept(conn net.Conn) {
count := acc.count
acc.count++
a := agent.New(conn, conn)
h := handler.New(a, &acc.scaleAgent)
a.Handler = h
log.Println("Starting agent for connection", count)
if err := a.Start(); err != nil {
log.Fatal(err)
}
go func() {
err := a.Wait()
if err != nil {
log.Fatal(err)
}
log.Printf("Agent for connection %d finished", count)
}()
}
func parseArgs() *url.URL {
flag.Parse()
if flag.NArg() < 1 {
fmt.Fprintf(os.Stderr, "USAGE: %s [options] rancherurl\n", os.Args[0])
fmt.Fprintln(os.Stderr, "the rancher url is expected as first argument, for example: http://accesskey:secretkey@localhost:8080")
os.Exit(1)
}
rawUrl := flag.Arg(0)
rancherUrl, err := url.Parse(rawUrl)
if err != nil {
fmt.Fprintf(os.Stderr, "provided url '%s' is malformed: %v\n", rawUrl, err)
os.Exit(1)
}
return rancherUrl
}
func main() {
rancherUrl := parseArgs()
// Create unix socket
addr, err := net.ResolveUnixAddr("unix", *socketPath)
if err != nil {
log.Fatal(err)
}
l, err := net.ListenUnix("unix", addr)
if err != nil {
log.Fatal(err)
}
// Create server that listens on the socket
s := agent.NewServer(l, &acceptor{0, *scaling.New(rancher.New(*rancherUrl))})
// Setup signal handler to stop Server on various signals
s.StopOnSignals(os.Interrupt, syscall.SIGTERM)
log.Println("Server listening on", addr.String())
err = s.Serve()
if err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
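
For reference, a client-side sketch (not part of this commit) of how a consumer such as Kapacitor would talk to this server: dial the unix socket and exchange udf.Request/udf.Response messages, the same wire protocol the tests drive over a socketpair:

package main

import (
	"bufio"
	"log"
	"net"

	"github.com/influxdata/kapacitor/udf"
)

func main() {
	conn, err := net.Dial("unix", "/tmp/kapacitor-scaling.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Ask the handler for its InfoResponse.
	req := &udf.Request{Message: &udf.Request_Info{Info: &udf.InfoRequest{}}}
	if err := udf.WriteMessage(req, conn); err != nil {
		log.Fatal(err)
	}

	var buf []byte
	resp := new(udf.Response)
	if err := udf.ReadMessage(&buf, bufio.NewReader(conn), resp); err != nil {
		log.Fatal(err)
	}
	log.Printf("info response: %v", resp)
}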

71
rancher/client.go Normal file

@@ -0,0 +1,71 @@
package rancher
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
)
type Client struct {
rootUrl url.URL
client http.Client
}
func New(rootUrl url.URL) Client {
return Client{rootUrl, http.Client{}}
}
func (c *Client) Get(path string, result interface{}) error {
return c.do("GET", path, nil, result)
}
func (c *Client) Put(path string, data, result interface{}) error {
// Use the io.Reader interface type so a missing body stays nil instead of
// becoming a non-nil interface wrapping a nil *bytes.Buffer.
var body io.Reader
if data != nil {
b, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling body: %v", err)
}
body = bytes.NewBuffer(b)
}
return c.do("PUT", path, body, result)
}
func (c *Client) do(method, path string, body io.Reader, result interface{}) error {
url := c.rootUrl
url.Path += path
req, err := http.NewRequest(method, url.String(), body)
if err != nil {
return err
}
user := c.rootUrl.User
if user == nil {
return fmt.Errorf("user and password must be set in url")
}
password, isSet := user.Password()
if !isSet {
return fmt.Errorf("password must be set in url")
}
req.SetBasicAuth(user.Username(), password)
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("expected API to return 200, got %s: %s",
resp.Status,
respBody)
}
if result != nil {
return json.Unmarshal(respBody, result)
}
return nil
}
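
A short usage sketch of this client (not part of this commit), assuming a Rancher v1 API behind the URL used in the tests and a service id `abc`:

package main

import (
	"log"
	"net/url"

	"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher"
)

func main() {
	// do() turns the userinfo part of the URL into HTTP basic auth.
	u, err := url.Parse("http://accesskey:secretkey@localhost:8080")
	if err != nil {
		log.Fatal(err)
	}
	c := rancher.New(*u)

	// Read the current scale of the service.
	var svc rancher.Service
	if err := c.Get("v1/services/abc", &svc); err != nil {
		log.Fatal(err)
	}
	log.Printf("service %s runs %d instances (transitioning: %s)", svc.Name, svc.Scale, svc.Transitioning)

	// Request one more instance.
	if err := c.Put("v1/services/abc", map[string]int64{"scale": svc.Scale + 1}, nil); err != nil {
		log.Fatal(err)
	}
}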

8
rancher/types.go Normal file

@@ -0,0 +1,8 @@
package rancher
type Service struct {
Id string `json:"id"`
Name string `json:"name"`
Scale int64 `json:"scale"`
Transitioning string `json:"transitioning"`
}

84
scaling/agent.go Normal file

@@ -0,0 +1,84 @@
package scaling
import (
"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher"
"fmt"
"sync"
"time"
)
type Service struct {
sync.RWMutex
CurrentInstances int64
Name, Id string
CooldownUntil time.Time
}
type Agent struct {
sync.RWMutex
serviceMap map[string]*Service
client rancher.Client
}
func New(client rancher.Client) *Agent {
a := &Agent{}
a.serviceMap = make(map[string]*Service)
a.client = client
return a
}
func (a *Agent) get(serviceId string) *Service {
a.RLock()
v, ok := a.serviceMap[serviceId]
if ok {
a.RUnlock()
return v
}
a.RUnlock()
a.Lock()
defer a.Unlock()
// FIXME memory leak because services are never freed up
v, ok = a.serviceMap[serviceId]
if !ok {
a.serviceMap[serviceId] = &Service{Id: serviceId}
}
return a.serviceMap[serviceId]
}
// RequestScaling returns the service in a locked state; the caller must call Unlock on it.
// A nil service (with a nil error) means scaling is currently not allowed: the service is
// either still in its cooldown window or transitioning in Rancher.
func (a *Agent) RequestScaling(serviceId string, eventTime time.Time) (*Service, error) {
s := a.get(serviceId)
s.RLock()
if s.CooldownUntil.Sub(eventTime) > 0 {
s.RUnlock()
return nil, nil
}
s.RUnlock()
s.Lock()
if s.CooldownUntil.Sub(eventTime) > 0 {
s.Unlock()
return nil, nil
}
u := "v1/services/" + serviceId
rancherService := rancher.Service{}
if err := a.client.Get(u, &rancherService); err != nil {
s.Unlock()
return nil, fmt.Errorf("Could not get scale count of service %s: %s", serviceId, err)
}
// Backoff strategy to save requests?
if rancherService.Transitioning != "no" {
s.Unlock()
return nil, nil
}
s.CurrentInstances = rancherService.Scale
s.Name = rancherService.Name
return s, nil
}
func (a *Agent) Scale(serviceId string, count int64) error {
data := map[string]int64{"scale": count}
if err := a.client.Put("v1/services/"+serviceId, data, nil); err != nil {
return fmt.Errorf("failed to scale service %s: %s", serviceId, err)
}
return nil
}
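
Finally, a caller-side sketch (not part of this commit) of the locking contract: RequestScaling hands back a locked *Service when scaling is allowed and nil while the service cools down or transitions; the caller unlocks it and updates the bookkeeping after a successful scale, just like handler.Point:

package main

import (
	"log"
	"net/url"
	"time"

	"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/rancher"
	"acos.alcatel-lucent.com/scmrepos/git/micro-analytics/kapacitor-scaling/scaling"
)

func main() {
	u, err := url.Parse("http://accesskey:secretkey@localhost:8080")
	if err != nil {
		log.Fatal(err)
	}
	scaleAgent := scaling.New(rancher.New(*u))

	svc, err := scaleAgent.RequestScaling("abc", time.Now())
	if err != nil {
		log.Fatal(err)
	}
	if svc == nil {
		return // still cooling down or transitioning: nothing to do
	}
	defer svc.Unlock() // the service is returned locked

	target := svc.CurrentInstances + 1
	if err := scaleAgent.Scale(svc.Id, target); err != nil {
		log.Fatal(err)
	}
	// Record the new state and arm the cooldown, mirroring handler.Point.
	svc.CurrentInstances = target
	svc.CooldownUntil = time.Now().Add(time.Minute)
}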