/*
This file is part of GNUnet.
Copyright (C) 2013 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
*/
/**
* @file conversation/gnunet-helper-audio-playback-gst.c
* @brief program to playback audio data to the speaker (GStreamer version)
* @author LRN
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "conversation.h"
#include "gnunet_constants.h"
#include "gnunet_core_service.h"
#include
#include
#include
#include
#define DEBUG_READ_PURE_OGG 1
/**
* How much data to read in one go
*/
#define MAXLINE 4096
/**
* Max number of microseconds to buffer in audiosink.
* Default is 1000
*/
#define BUFFER_TIME 1000
/**
* Min number of microseconds to buffer in audiosink.
* Default is 1000
*/
#define LATENCY_TIME 1000
/**
* Tokenizer for the data we get from stdin
*/
struct GNUNET_MessageStreamTokenizer *stdin_mst;
/**
* Main pipeline.
*/
static GstElement *pipeline;
/**
* Appsrc instance into which we write data for the pipeline.
*/
static GstElement *source;
static GstElement *demuxer;
static GstElement *decoder;
static GstElement *conv;
static GstElement *resampler;
static GstElement *sink;
/**
* Set to 1 to break the reading loop
*/
static int abort_read;
static void
sink_child_added (GstChildProxy *child_proxy,
GObject *object,
gchar *name,
gpointer user_data)
{
if (GST_IS_AUDIO_BASE_SRC (object))
g_object_set (object,
"buffer-time", (gint64) BUFFER_TIME,
"latency-time", (gint64) LATENCY_TIME,
NULL);
}
static void
ogg_pad_added (GstElement *element,
GstPad *pad,
gpointer data)
{
GstPad *sinkpad;
GstElement *decoder = (GstElement *) data;
/* We can now link this pad with the opus-decoder sink pad */
sinkpad = gst_element_get_static_pad (decoder, "sink");
gst_pad_link (pad, sinkpad);
gst_element_link_many (decoder, conv, resampler, sink, NULL);
gst_object_unref (sinkpad);
}
static void
quit ()
{
if (NULL != source)
gst_app_src_end_of_stream (GST_APP_SRC (source));
if (NULL != pipeline)
gst_element_set_state (pipeline, GST_STATE_NULL);
abort_read = 1;
}
static gboolean
bus_call (GstBus *bus, GstMessage *msg, gpointer data)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Bus message\n");
switch (GST_MESSAGE_TYPE (msg))
{
case GST_MESSAGE_EOS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"End of stream\n");
quit ();
break;
case GST_MESSAGE_ERROR:
{
gchar *debug;
GError *error;
gst_message_parse_error (msg, &error, &debug);
g_free (debug);
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Error: %s\n",
error->message);
g_error_free (error);
quit ();
break;
}
default:
break;
}
return TRUE;
}
static void
signalhandler (int s)
{
quit ();
}
static int
feed_buffer_to_gst (const char *audio, size_t b_len)
{
GstBuffer *b;
gchar *bufspace;
GstFlowReturn flow;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Feeding %u bytes to GStreamer\n",
(unsigned int) b_len);
bufspace = g_memdup (audio, b_len);
b = gst_buffer_new_wrapped (bufspace, b_len);
if (NULL == b)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Failed to wrap a buffer\n");
g_free (bufspace);
return GNUNET_SYSERR;
}
flow = gst_app_src_push_buffer (GST_APP_SRC (source), b);
/* They all return GNUNET_OK, because currently player stops when
* data stops coming. This might need to be changed for the player
* to also stop when pipeline breaks.
*/
switch (flow)
{
case GST_FLOW_OK:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Fed %u bytes to the pipeline\n",
(unsigned int) b_len);
break;
case GST_FLOW_FLUSHING:
/* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Dropped a buffer\n");
break;
case GST_FLOW_EOS:
/* end of stream */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"EOS\n");
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Unexpected push result\n");
break;
}
return GNUNET_OK;
}
/**
* Message callback
*
* @param msg message we received.
* @return #GNUNET_OK on success,
* #GNUNET_NO to stop further processing due to disconnect (no error)
* #GNUNET_SYSERR to stop further processing due to error
*/
static int
stdin_receiver (void *cls,
const struct GNUNET_MessageHeader *msg)
{
struct AudioMessage *audio;
size_t b_len;
switch (ntohs (msg->type))
{
case GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO:
audio = (struct AudioMessage *) msg;
b_len = ntohs (audio->header.size) - sizeof (struct AudioMessage);
feed_buffer_to_gst ((const char *) &audio[1], b_len);
break;
default:
break;
}
return GNUNET_OK;
}
int
main (int argc, char **argv)
{
GstBus *bus;
guint bus_watch_id;
uint64_t toff;
typedef void (*SignalHandlerPointer) (int);
SignalHandlerPointer inthandler, termhandler;
#ifdef DEBUG_READ_PURE_OGG
int read_pure_ogg = getenv ("GNUNET_READ_PURE_OGG") ? 1 : 0;
#endif
inthandler = signal (SIGINT,
&signalhandler);
termhandler = signal (SIGTERM,
&signalhandler);
#ifdef WINDOWS
setmode (0, _O_BINARY);
#endif
/* Initialisation */
gst_init (&argc, &argv);
GNUNET_assert (GNUNET_OK ==
GNUNET_log_setup ("gnunet-helper-audio-playback-gst",
"WARNING",
NULL));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Audio sink starts\n");
stdin_mst = GNUNET_MST_create (&stdin_receiver,
NULL);
/* Create gstreamer elements */
pipeline = gst_pipeline_new ("audio-player");
source = gst_element_factory_make ("appsrc", "audio-input");
demuxer = gst_element_factory_make ("oggdemux", "ogg-demuxer");
decoder = gst_element_factory_make ("opusdec", "opus-decoder");
conv = gst_element_factory_make ("audioconvert", "converter");
resampler= gst_element_factory_make ("audioresample", "resampler");
sink = gst_element_factory_make ("autoaudiosink", "audiosink");
if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"One element could not be created. Exiting.\n");
return -1;
}
g_signal_connect (sink,
"child-added",
G_CALLBACK (sink_child_added),
NULL);
g_signal_connect (demuxer,
"pad-added",
G_CALLBACK (ogg_pad_added),
decoder);
/* Keep a reference to it, we operate on it */
gst_object_ref (GST_OBJECT (source));
/* Set up the pipeline */
/* we feed appsrc as fast as possible, it just blocks when it's full */
g_object_set (G_OBJECT (source),
/* "format", GST_FORMAT_TIME,*/
"block", TRUE,
"is-live", TRUE,
NULL);
g_object_set (G_OBJECT (decoder),
/* "plc", FALSE,*/
/* "apply-gain", TRUE,*/
"use-inband-fec", TRUE,
NULL);
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, pipeline);
gst_object_unref (bus);
/* we add all elements into the pipeline */
/* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
gst_bin_add_many (GST_BIN (pipeline), source, demuxer, decoder, conv,
resampler, sink, NULL);
/* we link the elements together */
gst_element_link_many (source, demuxer, NULL);
/* Set the pipeline to "playing" state*/
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Now playing\n");
gst_element_set_state (pipeline, GST_STATE_PLAYING);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running...\n");
/* Iterate */
toff = 0;
while (!abort_read)
{
char readbuf[MAXLINE];
int ret;
ret = read (0, readbuf, sizeof (readbuf));
if (0 > ret)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Read error from STDIN: %d %s\n"),
ret, strerror (errno));
break;
}
toff += ret;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received %d bytes of audio data (total: %llu)\n",
(int) ret,
(unsigned long long) toff);
if (0 == ret)
break;
#ifdef DEBUG_READ_PURE_OGG
if (read_pure_ogg)
{
feed_buffer_to_gst (readbuf, ret);
}
else
#endif
GNUNET_MST_from_buffer (stdin_mst,
readbuf,
ret,
GNUNET_NO,
GNUNET_NO);
}
GNUNET_MST_destroy (stdin_mst);
signal (SIGINT, inthandler);
signal (SIGINT, termhandler);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Returned, stopping playback\n");
quit ();
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Deleting pipeline\n");
gst_object_unref (GST_OBJECT (source));
source = NULL;
gst_object_unref (GST_OBJECT (pipeline));
pipeline = NULL;
g_source_remove (bus_watch_id);
return 0;
}