C++程序  |  324行  |  9.12 KB

/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "common_time"
#include <utils/Log.h>

#include <fcntl.h>
#include <linux/in.h>
#include <linux/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <utils/Errors.h>
#include <utils/misc.h>

#include <common_time/local_clock.h>

#include "common_clock.h"
#include "diag_thread.h"

#define kMaxEvents 16
#define kListenPort 9876

static bool setNonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)",
             fd, errno);
        return false;
    }

    return true;
}

static bool setNodelay(int fd) {
    int tmp = 1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &tmp, sizeof(tmp)) < 0) {
        ALOGE("Failed to set socket (%d) to no-delay mode (errno %d)",
             fd, errno);
        return false;
    }

    return true;
}

namespace android {

DiagThread::DiagThread(CommonClock* common_clock, LocalClock* local_clock) {
    common_clock_ = common_clock;
    local_clock_ = local_clock;
    listen_fd_ = -1;
    data_fd_ = -1;
    kernel_logID_basis_known_ = false;
    discipline_log_ID_ = 0;
}

DiagThread::~DiagThread() {
}

status_t DiagThread::startWorkThread() {
    status_t res;
    stopWorkThread();
    res = run("Diag");

    if (res != OK)
        ALOGE("Failed to start work thread (res = %d)", res);

    return res;
}

void DiagThread::stopWorkThread() {
    status_t res;
    res = requestExitAndWait(); // block until thread exit.
    if (res != OK)
        ALOGE("Failed to stop work thread (res = %d)", res);
}

bool DiagThread::openListenSocket() {
    bool ret = false;
    int flags;
    cleanupListenSocket();

    if ((listen_fd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        ALOGE("Socket failed.");
        goto bailout;
    }

    // Set non-blocking operation
    if (!setNonblocking(listen_fd_))
        goto bailout;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(kListenPort);

    if (bind(listen_fd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        ALOGE("Bind failed.");
        goto bailout;
    }

    if (listen(listen_fd_, 1) < 0) {
        ALOGE("Listen failed.");
        goto bailout;
    }

    ret = true;
bailout:
    if (!ret)
        cleanupListenSocket();

    return ret;
}

void DiagThread::cleanupListenSocket() {
    if (listen_fd_ >= 0) {
        int res;

        struct linger l;
        l.l_onoff  = 1;
        l.l_linger = 0;

        setsockopt(listen_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
        shutdown(listen_fd_, SHUT_RDWR);
        close(listen_fd_);
        listen_fd_ = -1;
    }
}

void DiagThread::cleanupDataSocket() {
    if (data_fd_ >= 0) {
        int res;

        struct linger l;
        l.l_onoff  = 1;
        l.l_linger = 0;

        setsockopt(data_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
        shutdown(data_fd_, SHUT_RDWR);
        close(data_fd_);
        data_fd_ = -1;
    }
}

void DiagThread::resetLogIDs() {
    // Drain and discard all of the events from the kernel
    struct local_time_debug_event events[kMaxEvents];
    while(local_clock_->getDebugLog(events, kMaxEvents) > 0)
        ;

    {
        Mutex::Autolock lock(&discipline_log_lock_);
        discipline_log_.clear();
        discipline_log_ID_ = 0;
    }

    kernel_logID_basis_known_ = false;
}

void DiagThread::pushDisciplineEvent(int64_t observed_local_time,
                                     int64_t observed_common_time,
                                     int64_t nominal_common_time,
                                     int32_t total_correction,
                                     int32_t rtt) {
    Mutex::Autolock lock(&discipline_log_lock_);

    DisciplineEventRecord evt;

    evt.event_id = discipline_log_ID_++;

    evt.action_local_time = local_clock_->getLocalTime();
    common_clock_->localToCommon(evt.action_local_time,
            &evt.action_common_time);

    evt.observed_local_time  = observed_local_time;
    evt.observed_common_time = observed_common_time;
    evt.nominal_common_time  = nominal_common_time;
    evt.total_correction     = total_correction;
    evt.rtt                  = rtt;

    discipline_log_.push_back(evt);
    while (discipline_log_.size() > kMaxDisciplineLogSize)
        discipline_log_.erase(discipline_log_.begin());
}

bool DiagThread::threadLoop() {
    struct pollfd poll_fds[1];

    if (!openListenSocket()) {
        ALOGE("Failed to open listen socket");
        goto bailout;
    }

    while (!exitPending()) {
        memset(&poll_fds, 0, sizeof(poll_fds));

        if (data_fd_ < 0) {
            poll_fds[0].fd     = listen_fd_;
            poll_fds[0].events = POLLIN;
        } else {
            poll_fds[0].fd     = data_fd_;
            poll_fds[0].events = POLLRDHUP | POLLIN;
        }

        int poll_res = poll(poll_fds, NELEM(poll_fds), 50);
        if (poll_res < 0) {
            ALOGE("Fatal error (%d,%d) while waiting on events",
                 poll_res, errno);
            goto bailout;
        }

        if (exitPending())
            break;

        if (poll_fds[0].revents) {
            if (poll_fds[0].fd == listen_fd_) {
                data_fd_ = accept(listen_fd_, NULL, NULL);

                if (data_fd_ < 0) {
                    ALOGW("Failed accept on socket %d with err %d",
                         listen_fd_, errno);
                } else {
                    if (!setNonblocking(data_fd_))
                        cleanupDataSocket();
                    if (!setNodelay(data_fd_))
                        cleanupDataSocket();
                }
            } else
                if (poll_fds[0].fd == data_fd_) {
                    if (poll_fds[0].revents & POLLRDHUP) {
                        // Connection hung up; time to clean up.
                        cleanupDataSocket();
                    } else
                        if (poll_fds[0].revents & POLLIN) {
                            uint8_t cmd;
                            if (read(data_fd_, &cmd, sizeof(cmd)) > 0) {
                                switch(cmd) {
                                    case 'r':
                                    case 'R':
                                        resetLogIDs();
                                        break;
                                }
                            }
                        }
                }
        }

        struct local_time_debug_event events[kMaxEvents];
        int amt = local_clock_->getDebugLog(events, kMaxEvents);

        if (amt > 0) {
            for (int i = 0; i < amt; i++) {
                struct local_time_debug_event& e = events[i];

                if (!kernel_logID_basis_known_) {
                    kernel_logID_basis_ = e.local_timesync_event_id;
                    kernel_logID_basis_known_ = true;
                }

                char buf[1024];
                int64_t common_time;
                status_t res = common_clock_->localToCommon(e.local_time,
                                                            &common_time);
                snprintf(buf, sizeof(buf), "E,%lld,%lld,%lld,%d\n",
                         e.local_timesync_event_id - kernel_logID_basis_,
                         e.local_time,
                         common_time,
                         (OK == res) ? 1 : 0);
                buf[sizeof(buf) - 1] = 0;

                if (data_fd_ >= 0)
                    write(data_fd_, buf, strlen(buf));
            }
        }

        { // scope for autolock pattern
            Mutex::Autolock lock(&discipline_log_lock_);

            while (discipline_log_.size() > 0) {
                char buf[1024];
                DisciplineEventRecord& e = *discipline_log_.begin();
                snprintf(buf, sizeof(buf),
                         "D,%lld,%lld,%lld,%lld,%lld,%lld,%d,%d\n",
                         e.event_id,
                         e.action_local_time,
                         e.action_common_time,
                         e.observed_local_time,
                         e.observed_common_time,
                         e.nominal_common_time,
                         e.total_correction,
                         e.rtt);
                buf[sizeof(buf) - 1] = 0;

                if (data_fd_ >= 0)
                    write(data_fd_, buf, strlen(buf));

                discipline_log_.erase(discipline_log_.begin());
            }
        }
    }

bailout:
    cleanupDataSocket();
    cleanupListenSocket();
    return false;
}

}  // namespace android