普通文本  |  456行  |  13.35 KB

/*
 *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "data_log.h"

#include <assert.h>

#include <algorithm>
#include <list>

#include "critical_section_wrapper.h"
#include "event_wrapper.h"
#include "file_wrapper.h"
#include "rw_lock_wrapper.h"
#include "thread_wrapper.h"

namespace webrtc {

DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
  CriticalSectionWrapper::CreateCriticalSection());

DataLogImpl* DataLogImpl::instance_ = NULL;

// A Row contains cells, which are indexed by the column names as std::string.
// The string index is treated in a case sensitive way.
class Row {
 public:
  Row();
  ~Row();

  // Inserts a Container into the cell of the column specified with
  // column_name.
  // column_name is treated in a case sensitive way.
  int InsertCell(const std::string& column_name,
                 const Container* value_container);

  // Converts the value at the column specified by column_name to a string
  // stored in value_string.
  // column_name is treated in a case sensitive way.
  void ToString(const std::string& column_name, std::string* value_string);

 private:
  // Collection of containers indexed by column name as std::string
  typedef std::map<std::string, const Container*> CellMap;

  CellMap                   cells_;
  CriticalSectionWrapper*   cells_lock_;
};

// A LogTable contains multiple rows, where only the latest row is active for
// editing. The rows are defined by the ColumnMap, which contains the name of
// each column and the length of the column (1 for one-value-columns and greater
// than 1 for multi-value-columns).
class LogTable {
 public:
  LogTable();
  ~LogTable();

  // Adds the column with name column_name to the table. The column will be a
  // multi-value-column if multi_value_length is greater than 1.
  // column_name is treated in a case sensitive way.
  int AddColumn(const std::string& column_name, int multi_value_length);

  // Buffers the current row while it is waiting to be written to file,
  // which is done by a call to Flush(). A new row is available when the
  // function returns
  void NextRow();

  // Inserts a Container into the cell of the column specified with
  // column_name.
  // column_name is treated in a case sensitive way.
  int InsertCell(const std::string& column_name,
                 const Container* value_container);

  // Creates a log file, named as specified in the string file_name, to
  // where the table will be written when calling Flush().
  int CreateLogFile(const std::string& file_name);

  // Write all complete rows to file.
  // May not be called by two threads simultaneously (doing so may result in
  // a race condition). Will be called by the file_writer_thread_ when that
  // thread is running.
  void Flush();

 private:
  // Collection of multi_value_lengths indexed by column name as std::string
  typedef std::map<std::string, int> ColumnMap;
  typedef std::list<Row*> RowList;

  ColumnMap               columns_;
  RowList                 rows_[2];
  RowList*                rows_history_;
  RowList*                rows_flush_;
  Row*                    current_row_;
  FileWrapper*            file_;
  bool                    write_header_;
  CriticalSectionWrapper* table_lock_;
};

Row::Row()
  : cells_(),
    cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
}

Row::~Row() {
  for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
    delete it->second;
    // For maps all iterators (except the erased) are valid after an erase
    cells_.erase(it++);
  }
  delete cells_lock_;
}

int Row::InsertCell(const std::string& column_name,
                    const Container* value_container) {
  CriticalSectionScoped synchronize(cells_lock_);
  assert(cells_.count(column_name) == 0);
  if (cells_.count(column_name) > 0)
    return -1;
  cells_[column_name] = value_container;
  return 0;
}

void Row::ToString(const std::string& column_name,
                   std::string* value_string) {
  CriticalSectionScoped synchronize(cells_lock_);
  const Container* container = cells_[column_name];
  if (container == NULL) {
    *value_string = "NaN,";
    return;
  }
  container->ToString(value_string);
}

LogTable::LogTable()
  : columns_(),
    rows_(),
    rows_history_(&rows_[0]),
    rows_flush_(&rows_[1]),
    current_row_(new Row),
    file_(FileWrapper::Create()),
    write_header_(true),
    table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
}

LogTable::~LogTable() {
  for (RowList::iterator row_it = rows_history_->begin();
       row_it != rows_history_->end();) {
    delete *row_it;
    row_it = rows_history_->erase(row_it);
  }
  for (ColumnMap::iterator col_it = columns_.begin();
       col_it != columns_.end();) {
    // For maps all iterators (except the erased) are valid after an erase
    columns_.erase(col_it++);
  }
  if (file_ != NULL) {
    file_->Flush();
    file_->CloseFile();
    delete file_;
  }
  delete current_row_;
  delete table_lock_;
}

int LogTable::AddColumn(const std::string& column_name,
                        int multi_value_length) {
  assert(multi_value_length > 0);
  if (!write_header_) {
    // It's not allowed to add new columns after the header
    // has been written.
    assert(false);
    return -1;
  } else {
    CriticalSectionScoped synchronize(table_lock_);
    if (write_header_)
      columns_[column_name] = multi_value_length;
    else
      return -1;
  }
  return 0;
}

void LogTable::NextRow() {
  CriticalSectionScoped sync_rows(table_lock_);
  rows_history_->push_back(current_row_);
  current_row_ = new Row;
}

int LogTable::InsertCell(const std::string& column_name,
                         const Container* value_container) {
  CriticalSectionScoped synchronize(table_lock_);
  assert(columns_.count(column_name) > 0);
  if (columns_.count(column_name) == 0)
    return -1;
  return current_row_->InsertCell(column_name, value_container);
}

int LogTable::CreateLogFile(const std::string& file_name) {
  if (file_name.length() == 0)
    return -1;
  if (file_->Open())
    return -1;
  file_->OpenFile(file_name.c_str(),
                  false,  // Open with read/write permissions
                  false,  // Don't wraparound and write at the beginning when
                          // the file is full
                  true);  // Open as a text file
  if (file_ == NULL)
    return -1;
  return 0;
}

void LogTable::Flush() {
  ColumnMap::iterator column_it;
  bool commit_header = false;
  if (write_header_) {
    CriticalSectionScoped synchronize(table_lock_);
    if (write_header_) {
      commit_header = true;
      write_header_ = false;
    }
  }
  if (commit_header) {
    for (column_it = columns_.begin();
         column_it != columns_.end(); ++column_it) {
      if (column_it->second > 1) {
        file_->WriteText("%s[%u],", column_it->first.c_str(),
                         column_it->second);
        for (int i = 1; i < column_it->second; ++i)
          file_->WriteText(",");
      } else {
        file_->WriteText("%s,", column_it->first.c_str());
      }
    }
    if (columns_.size() > 0)
      file_->WriteText("\n");
  }

  // Swap the list used for flushing with the list containing the row history
  // and clear the history. We also create a local pointer to the new
  // list used for flushing to avoid race conditions if another thread
  // calls this function while we are writing.
  // We don't want to block the list while we're writing to file.
  {
    CriticalSectionScoped synchronize(table_lock_);
    RowList* tmp = rows_flush_;
    rows_flush_ = rows_history_;
    rows_history_ = tmp;
    rows_history_->clear();
  }

  // Write all complete rows to file and delete them
  for (RowList::iterator row_it = rows_flush_->begin();
       row_it != rows_flush_->end();) {
    for (column_it = columns_.begin();
         column_it != columns_.end(); ++column_it) {
      std::string row_string;
      (*row_it)->ToString(column_it->first, &row_string);
      file_->WriteText("%s", row_string.c_str());
    }
    if (columns_.size() > 0)
      file_->WriteText("\n");
    delete *row_it;
    row_it = rows_flush_->erase(row_it);
  }
}

int DataLog::CreateLog() {
  return DataLogImpl::CreateLog();
}

void DataLog::ReturnLog() {
  return DataLogImpl::ReturnLog();
}

std::string DataLog::Combine(const std::string& table_name, int table_id) {
  std::stringstream ss;
  std::string combined_id = table_name;
  std::string number_suffix;
  ss << "_" << table_id;
  ss >> number_suffix;
  combined_id += number_suffix;
  std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
                 ::tolower);
  return combined_id;
}

int DataLog::AddTable(const std::string& table_name) {
  DataLogImpl* data_log = DataLogImpl::StaticInstance();
  if (data_log == NULL)
    return -1;
  return data_log->AddTable(table_name);
}

int DataLog::AddColumn(const std::string& table_name,
                       const std::string& column_name,
                       int multi_value_length) {
  DataLogImpl* data_log = DataLogImpl::StaticInstance();
  if (data_log == NULL)
    return -1;
  return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
                                                            column_name,
                                                            multi_value_length);
}

int DataLog::NextRow(const std::string& table_name) {
  DataLogImpl* data_log = DataLogImpl::StaticInstance();
  if (data_log == NULL)
    return -1;
  return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
}

DataLogImpl::DataLogImpl()
  : counter_(1),
    tables_(),
    flush_event_(EventWrapper::Create()),
    file_writer_thread_(NULL),
    tables_lock_(RWLockWrapper::CreateRWLock()) {
}

DataLogImpl::~DataLogImpl() {
  StopThread();
  Flush();  // Write any remaining rows
  delete file_writer_thread_;
  delete flush_event_;
  for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
    delete static_cast<LogTable*>(it->second);
    // For maps all iterators (except the erased) are valid after an erase
    tables_.erase(it++);
  }
  delete tables_lock_;
}

int DataLogImpl::CreateLog() {
  CriticalSectionScoped synchronize(crit_sect_.get());
  if (instance_ == NULL) {
    instance_ = new DataLogImpl();
    return instance_->Init();
  } else {
    ++instance_->counter_;
  }
  return 0;
}

int DataLogImpl::Init() {
  file_writer_thread_ = ThreadWrapper::CreateThread(
                          DataLogImpl::Run,
                          instance_,
                          kHighestPriority,
                          "DataLog");
  if (file_writer_thread_ == NULL)
    return -1;
  unsigned int thread_id = 0;
  bool success = file_writer_thread_->Start(thread_id);
  if (!success)
    return -1;
  return 0;
}

DataLogImpl* DataLogImpl::StaticInstance() {
  return instance_;
}

void DataLogImpl::ReturnLog() {
  CriticalSectionScoped synchronize(crit_sect_.get());
  if (instance_ && instance_->counter_ > 1) {
    --instance_->counter_;
    return;
  }
  delete instance_;
  instance_ = NULL;
}

int DataLogImpl::AddTable(const std::string& table_name) {
  WriteLockScoped synchronize(*tables_lock_);
  // Make sure we don't add a table which already exists
  if (tables_.count(table_name) > 0)
    return -1;
  tables_[table_name] = new LogTable();
  if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
    return -1;
  return 0;
}

int DataLogImpl::AddColumn(const std::string& table_name,
                           const std::string& column_name,
                           int multi_value_length) {
  ReadLockScoped synchronize(*tables_lock_);
  if (tables_.count(table_name) == 0)
    return -1;
  return tables_[table_name]->AddColumn(column_name, multi_value_length);
}

int DataLogImpl::InsertCell(const std::string& table_name,
                            const std::string& column_name,
                            const Container* value_container) {
  ReadLockScoped synchronize(*tables_lock_);
  assert(tables_.count(table_name) > 0);
  if (tables_.count(table_name) == 0)
    return -1;
  return tables_[table_name]->InsertCell(column_name, value_container);
}

int DataLogImpl::NextRow(const std::string& table_name) {
  ReadLockScoped synchronize(*tables_lock_);
  if (tables_.count(table_name) == 0)
    return -1;
  tables_[table_name]->NextRow();
  if (file_writer_thread_ == NULL) {
    // Write every row to file as they get complete.
    tables_[table_name]->Flush();
  } else {
    // Signal a complete row
    flush_event_->Set();
  }
  return 0;
}

void DataLogImpl::Flush() {
  ReadLockScoped synchronize(*tables_lock_);
  for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
    it->second->Flush();
  }
}

bool DataLogImpl::Run(void* obj) {
  static_cast<DataLogImpl*>(obj)->Process();
  return true;
}

void DataLogImpl::Process() {
  // Wait for a row to be complete
  flush_event_->Wait(WEBRTC_EVENT_INFINITE);
  Flush();
}

void DataLogImpl::StopThread() {
  if (file_writer_thread_ != NULL) {
    file_writer_thread_->SetNotAlive();
    flush_event_->Set();
    // Call Stop() repeatedly, waiting for the Flush() call in Process() to
    // finish.
    while (!file_writer_thread_->Stop()) continue;
  }
}

}  // namespace webrtc