C++程序  |  430行  |  11.66 KB

/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#define LOG_TAG "bt_core_counter"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/eventfd.h>

#include "osi/include/allocator.h"
#include "osi/include/atomic.h"
#include "btcore/include/counter.h"
#include "osi/include/hash_map.h"
#include "osi/include/list.h"
#include "btcore/include/module.h"
#include "osi/include/osi.h"
#include "osi/include/hash_functions.h"
#include "osi/include/log.h"
#include "osi/include/socket.h"
#include "osi/include/thread.h"

typedef int (*handler_t)(socket_t * socket);

typedef struct counter_t {
  atomic_s64_t val;
} counter_t;

typedef struct hash_element_t {
  const char *key;
  counter_t *val;
} hash_element_t;

typedef struct counter_data_cb_t {
  counter_iter_cb counter_iter_cb;
  void *user_context;
} counter_data_cb_t;

typedef struct {
  socket_t *socket;
  uint8_t buffer[256];
  size_t buffer_size;
} client_t;

typedef struct {
  const char *name;
  const char *help;
  handler_t handler;
} command_t;

// Counter core
static hash_map_t *hash_map_counter_;
static pthread_mutex_t hash_map_lock_;
static int counter_cnt_;

// Counter port access
static socket_t *listen_socket_;
static thread_t *thread_;
static list_t *clients_;

static void accept_ready(socket_t *socket, void *context);
static void read_ready(socket_t *socket, void *context);
static void client_free(void *ptr);
static const command_t *find_command(const char *name);
static void output(socket_t *socket, const char* format, ...);

// Commands
static int help(socket_t *socket);
static int show(socket_t *socket);
static int set(socket_t *socket);
static int quit(socket_t *socket);

static const command_t commands[] = {
  { "help", "<command> - show help text for <command>", help},
  { "quit", "<command> - Quit and exit", quit},
  { "set", "<counter> - Set something", set},
  { "show", "<counter> - Show counters", show},
};

static counter_t *counter_new_(counter_data_t initial_val);
static void counter_free_(counter_t *counter);

static hash_element_t *hash_element_new_(void);
// NOTE: The parameter datatype is void in order to satisfy the hash
// data free function signature
static void hash_element_free_(void *data);

static struct counter_t *name_to_counter_(const char *name);
static bool counter_foreach_cb_(hash_map_entry_t *hash_map_entry, void *context);

static bool counter_socket_open(void);
static void counter_socket_close(void);

static const int COUNTER_NUM_BUCKETS = 53;

// TODO(cmanton) Friendly interface, but may remove for automation
const char *WELCOME = "Welcome to counters\n";
const char *PROMPT = "\n> ";
const char *GOODBYE = "Quitting... Bye !!";

// TODO(cmanton) Develop port strategy; or multiplex all bt across single port
static const port_t LISTEN_PORT = 8879;

static future_t *counter_init(void) {
  assert(hash_map_counter_ == NULL);
  pthread_mutex_init(&hash_map_lock_, NULL);
  hash_map_counter_ = hash_map_new(COUNTER_NUM_BUCKETS, hash_function_string,
      NULL, hash_element_free_, NULL);
  if (hash_map_counter_ == NULL) {
    LOG_ERROR("%s unable to allocate resources", __func__);
    return future_new_immediate(FUTURE_FAIL);
  }

  if (!counter_socket_open()) {
    LOG_ERROR("%s unable to open counter port", __func__);
    return future_new_immediate(FUTURE_FAIL);
  }
  return future_new_immediate(FUTURE_SUCCESS);
}

static future_t *counter_clean_up(void) {
  counter_socket_close();
  hash_map_free(hash_map_counter_);
  pthread_mutex_destroy(&hash_map_lock_);
  hash_map_counter_ = NULL;
  return future_new_immediate(FUTURE_SUCCESS);
}

module_t counter_module = {
  .name = COUNTER_MODULE,
  .init = counter_init,
  .start_up = NULL,
  .shut_down = NULL,
  .clean_up = counter_clean_up,
  .dependencies = {NULL},
};

void counter_set(const char *name, counter_data_t val) {
  assert(name != NULL);
  counter_t *counter = name_to_counter_(name);
  if (counter)
    atomic_store_s64(&counter->val, val);
}

void counter_add(const char *name, counter_data_t val) {
  assert(name != NULL);
  counter_t *counter = name_to_counter_(name);
  if (counter) {
    if (val == 1)
      atomic_inc_prefix_s64(&counter->val);
    else
      atomic_add_s64(&counter->val, val);
  }
}

bool counter_foreach(counter_iter_cb cb, void *context) {
  assert(cb != NULL);
  counter_data_cb_t counter_cb_data = {
    cb,
    context
  };

  hash_map_foreach(hash_map_counter_, counter_foreach_cb_, &counter_cb_data);
  return true;
}

static counter_t *counter_new_(counter_data_t initial_val) {
  counter_t *counter = (counter_t *)osi_calloc(sizeof(counter_t));
  if (!counter) {
    return NULL;
  }
  atomic_store_s64(&counter->val, initial_val);
  return counter;
}

static void counter_free_(counter_t *counter) {
  osi_free(counter);
}

static hash_element_t *hash_element_new_(void) {
  return (hash_element_t *)osi_calloc(sizeof(hash_element_t));
}

static void hash_element_free_(void *data) {
  hash_element_t *hash_element = (hash_element_t *)data;
  // We don't own the key
  counter_free_(hash_element->val);
  osi_free(hash_element);
}

// Returns a counter from the |hash_map_counter_|.  Creates
// a new one if not found and inserts into |hash_map_counter_|.
// Returns NULL upon memory allocation failure.
static counter_t *name_to_counter_(const char *name) {
  assert(hash_map_counter_ != NULL);
  if (hash_map_has_key(hash_map_counter_, name))
    return (counter_t *)hash_map_get(hash_map_counter_, name);

  pthread_mutex_lock(&hash_map_lock_);
  // On the uncommon path double check to make sure that another thread has
  // not already created this counter
  counter_t *counter = (counter_t *)hash_map_get(hash_map_counter_, name);
  if (counter)
    goto exit;

  counter = counter_new_(0);
  if (!counter) {
    LOG_ERROR("%s unable to create new counter name:%s", __func__, name);
    goto exit;
  }

  hash_element_t *element = hash_element_new_();
  if (!element) {
    LOG_ERROR("%s unable to create counter element name:%s", __func__, name);
    counter_free_(counter);
    counter = NULL;
    goto exit;
  }

  element->key = name;
  element->val = counter;
  if (!hash_map_set(hash_map_counter_, name, counter)) {
    LOG_ERROR("%s unable to set new counter into hash map name:%s", __func__, name);
    hash_element_free_(element);
    counter_free_(counter);
    counter = NULL;
  }

 exit:;
  pthread_mutex_unlock(&hash_map_lock_);
  return counter;
}

static bool counter_foreach_cb_(hash_map_entry_t *hash_map_entry, void *context) {
  assert(hash_map_entry != NULL);
  const char *key = (const char *)hash_map_entry->key;
  counter_data_t data = *(counter_data_t *)hash_map_entry->data;
  counter_data_cb_t *counter_cb_data = (counter_data_cb_t *)context;
  counter_cb_data->counter_iter_cb(key, data, counter_cb_data->user_context);
  return true;
}

static bool counter_socket_open(void) {
#if (!defined(BT_NET_DEBUG) || (BT_NET_DEBUG != TRUE))
  return true;          // Disable using network sockets for security reasons
#endif

  assert(listen_socket_ == NULL);
  assert(thread_ == NULL);
  assert(clients_ == NULL);

  clients_ = list_new(client_free);
  if (!clients_) {
    LOG_ERROR("%s unable to create counter clients list", __func__);
    goto error;
  }

  thread_ = thread_new("counter_socket");
  if (!thread_) {
    LOG_ERROR("%s unable to create counter thread", __func__);
    goto error;
  }

  listen_socket_ = socket_new();
  if (!listen_socket_) {
    LOG_ERROR("%s unable to create listen socket", __func__);
    goto error;
  }

  if (!socket_listen(listen_socket_, LISTEN_PORT)) {
    LOG_ERROR("%s unable to setup listen socket", __func__);
    goto error;
  }

  LOG_INFO("%s opened counter server socket", __func__);
  socket_register(listen_socket_, thread_get_reactor(thread_), NULL, accept_ready, NULL);
  return true;

error:;
  counter_socket_close();
  return false;
}

static void counter_socket_close(void) {
#if (!defined(BT_NET_DEBUG) || (BT_NET_DEBUG != TRUE))
  return;               // Disable using network sockets for security reasons
#endif

  socket_free(listen_socket_);
  thread_free(thread_);
  list_free(clients_);

  listen_socket_ = NULL;
  thread_ = NULL;
  clients_ = NULL;

  LOG_INFO("%s closed counter server socket", __func__);
}

static bool monitor_counter_iter_cb(const char *name, counter_data_t val, void *context) {
  socket_t *socket = (socket_t *)context;
  output(socket, "counter:%s val:%lld\n", name, val);
  return true;
}

static void client_free(void *ptr) {
  if (!ptr)
    return;

  client_t *client = (client_t *)ptr;
  socket_free(client->socket);
  osi_free(client);
}

static void accept_ready(socket_t *socket, UNUSED_ATTR void *context) {
  assert(socket != NULL);
  assert(socket == listen_socket_);

  LOG_INFO("%s accepted OSI monitor socket", __func__);
  socket = socket_accept(socket);
  if (!socket)
    return;

  client_t *client = (client_t *)osi_calloc(sizeof(client_t));
  if (!client) {
    LOG_ERROR("%s unable to allocate memory for client", __func__);
    socket_free(socket);
    return;
  }

  client->socket = socket;

  if (!list_append(clients_, client)) {
    LOG_ERROR("%s unable to add client to list", __func__);
    client_free(client);
    return;
  }

  socket_register(socket, thread_get_reactor(thread_), client, read_ready, NULL);

  output(socket, WELCOME);
  output(socket, PROMPT);
}

static void read_ready(socket_t *socket, void *context) {
  assert(socket != NULL);

  client_t *client = (client_t *)context;

  ssize_t ret = socket_read(socket, client->buffer + client->buffer_size, sizeof(client->buffer) - client->buffer_size);
  if (ret == 0 || (ret == -1 && ret != EWOULDBLOCK && ret != EAGAIN)) {
    list_remove(clients_, client);
    return;
  }

  // Replace newline with end of string termination
  // TODO(cmanton) Need proper semantics
  for (size_t i = ret - 1; i > 0; --i) {
    if (client->buffer[i] < 16)
      *(client->buffer + i) = 0;
    else
      break;
  }

  const command_t *command = find_command((const char *)client->buffer);
  if (!command) {
    output(socket, "unable to find command %s\n", client->buffer);
  } else {
    int rc = command->handler(socket);
    if (rc == 1) {
      output(socket, GOODBYE);
      socket_free(socket);
      return;
    }
  }
  output(socket, PROMPT);
}

static void output(socket_t *socket, const char* format, ...) {
  char dest[4096];
  va_list argptr;
  va_start(argptr, format);
  vsprintf(dest, format, argptr);
  va_end(argptr);
  socket_write(socket, dest, strlen(dest));
}

static int help(UNUSED_ATTR socket_t *socket) {
  output(socket, "help command unimplemented\n");
  return 0;
}

static int quit(UNUSED_ATTR socket_t *socket) {
  return 1;
}

static int set(UNUSED_ATTR socket_t *socket) {
  output(socket, "set command unimplemented\n");
  return 0;
}

static int show(socket_t *socket) {
  output(socket, "counter count registered:%d\n", counter_cnt_);
  counter_foreach(monitor_counter_iter_cb, (void *)socket);
  return 0;
}

static const command_t *find_command(const char *name) {
  for  (size_t i = 0; i < ARRAY_SIZE(commands); ++i)
    if (!strcmp(commands[i].name, name))
      return &commands[i];
  return NULL;
}