diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala deleted file mode 100644 index 4373be443e67d..0000000000000 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.flume.sink - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong - -import scala.collection.mutable - -import org.apache.flume.Channel -import org.apache.commons.lang.RandomStringUtils -import com.google.common.util.concurrent.ThreadFactoryBuilder - -/** - * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process - * requests. Each getEvents, ack and nack call is forwarded to an instance of this class. - * @param threads Number of threads to use to process requests. - * @param channel The channel that the sink pulls events from - * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark - * is rolled back. - */ -// Flume forces transactions to be thread-local. So each transaction *must* be committed, or -// rolled back from the thread it was originally created in. So each getEvents call from Spark -// creates a TransactionProcessor which runs in a new thread, in which the transaction is created -// and events are pulled off the channel. Once the events are sent to spark, -// that thread is blocked and the TransactionProcessor is saved in a map, -// until an ACK or NACK comes back or the transaction times out (after the specified timeout). -// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then -// unblocked, at which point the transaction is committed or rolled back. - -private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, - val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging { - val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Spark Sink Processor Thread - %d").build())) - // Protected by `sequenceNumberToProcessor` - private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]() - // This sink will not persist sequence numbers and reuses them if it gets restarted. - // So it is possible to commit a transaction which may have been meant for the sink before the - // restart. - // Since the new txn may not have the same sequence number we must guard against accidentally - // committing a new transaction. To reduce the probability of that happening a random string is - // prepended to the sequence number. Does not change for life of sink - private val seqBase = RandomStringUtils.randomAlphanumeric(8) - private val seqCounter = new AtomicLong(0) - - // Protected by `sequenceNumberToProcessor` - private var stopped = false - - @volatile private var isTest = false - private var testLatch: CountDownLatch = null - - /** - * Returns a bunch of events to Spark over Avro RPC. - * @param n Maximum number of events to return in a batch - * @return [[EventBatch]] instance that has a sequence number and an array of at most n events - */ - override def getEventBatch(n: Int): EventBatch = { - logDebug("Got getEventBatch call from Spark.") - val sequenceNumber = seqBase + seqCounter.incrementAndGet() - createProcessor(sequenceNumber, n) match { - case Some(processor) => - transactionExecutorOpt.foreach(_.submit(processor)) - // Wait until a batch is available - will be an error if error message is non-empty - val batch = processor.getEventBatch - if (SparkSinkUtils.isErrorBatch(batch)) { - // Remove the processor if it is an error batch since no ACK is sent. - removeAndGetProcessor(sequenceNumber) - logWarning("Received an error batch - no events were received from channel! ") - } - batch - case None => - new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList()) - } - } - - private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = { - sequenceNumberToProcessor.synchronized { - if (!stopped) { - val processor = new TransactionProcessor( - channel, seq, n, transactionTimeout, backOffInterval, this) - sequenceNumberToProcessor.put(seq, processor) - if (isTest) { - processor.countDownWhenBatchAcked(testLatch) - } - Some(processor) - } else { - None - } - } - } - - /** - * Called by Spark to indicate successful commit of a batch - * @param sequenceNumber The sequence number of the event batch that was successful - */ - override def ack(sequenceNumber: CharSequence): Void = { - logDebug("Received Ack for batch with sequence number: " + sequenceNumber) - completeTransaction(sequenceNumber, success = true) - null - } - - /** - * Called by Spark to indicate failed commit of a batch - * @param sequenceNumber The sequence number of the event batch that failed - * @return - */ - override def nack(sequenceNumber: CharSequence): Void = { - completeTransaction(sequenceNumber, success = false) - logInfo("Spark failed to commit transaction. Will reattempt events.") - null - } - - /** - * Helper method to commit or rollback a transaction. - * @param sequenceNumber The sequence number of the batch that was completed - * @param success Whether the batch was successful or not. - */ - private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { - removeAndGetProcessor(sequenceNumber).foreach(processor => { - processor.batchProcessed(success) - }) - } - - /** - * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak. - * @param sequenceNumber - * @return An `Option` of the transaction processor for the corresponding batch. Note that this - * instance is no longer tracked and the caller is responsible for that txn processor. - */ - private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): - Option[TransactionProcessor] = { - sequenceNumberToProcessor.synchronized { - sequenceNumberToProcessor.remove(sequenceNumber.toString) - } - } - - private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) { - testLatch = latch - isTest = true - } - - /** - * Shuts down the executor used to process transactions. - */ - def shutdown() { - logInfo("Shutting down Spark Avro Callback Handler") - sequenceNumberToProcessor.synchronized { - stopped = true - sequenceNumberToProcessor.values.foreach(_.shutdown()) - } - transactionExecutorOpt.foreach(_.shutdownNow()) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f7ad2efc9544e..32bdad012de70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -429,7 +429,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with if !relation.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + relation.tableDescVirtual.getSerdeClassName.toLowerCase.contains("parquet") => val parquetRelation = convertToParquetRelation(relation) val attributedRewrites = relation.output.zip(parquetRelation.output) (relation, parquetRelation, attributedRewrites) @@ -438,7 +438,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with case p @ PhysicalOperation(_, _, relation: MetastoreRelation) if hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + relation.tableDescVirtual.getSerdeClassName.toLowerCase.contains("parquet") => val parquetRelation = convertToParquetRelation(relation) val attributedRewrites = relation.output.zip(parquetRelation.output) (relation, parquetRelation, attributedRewrites) @@ -692,7 +692,7 @@ private[hive] case class MetastoreRelation } } - val tableDesc = HiveShim.getTableDesc( + val tableDesc = HiveShim.getTableDescVirtual( Class.forName( hiveQlTable.getSerializationLib, true, @@ -706,6 +706,21 @@ private[hive] case class MetastoreRelation hiveQlTable.getMetadata ) + val tableDescVirtual = HiveShim.getTableDesc( + Class.forName( + hiveQlTable.getSerializationLib, + true, + Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]], + hiveQlTable.getInputFormatClass, + // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because + // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to + // substitute some output formats, e.g. substituting SequenceFileOutputFormat to + // HiveSequenceFileOutputFormat. + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + + implicit class SchemaAttribute(f: FieldSchema) { def toAttribute = AttributeReference( f.getName, @@ -721,7 +736,14 @@ private[hive] case class MetastoreRelation /** Non-partitionKey attributes */ val attributes = hiveQlTable.getCols.map(_.toAttribute) - val output = attributes ++ partitionKeys + var virtualAttributes = scala.collection.immutable.List[FieldSchema]() + val inputFileNmae = new FieldSchema("INPUT__FILE__NAME", "string","") + val offset = new FieldSchema("BLOCK__OFFSET__INSIDE__FILE", "int","") + virtualAttributes = offset :: inputFileNmae :: virtualAttributes + val c = virtualAttributes.map(_.toAttribute) + val output = attributes ++ c ++ partitionKeys + val attributes1 = attributes ++ c + //val output = attributes ++ partitionKeys /** An attribute map that can be used to lookup original attributes based on expression id. */ val attributeMap = AttributeMap(output.map(o => (o,o))) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index effaa5a443512..6761ad368cea0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.DateUtils - +import org.apache.spark.sql.hive.VirtualHadoopRDD /** * A trait for subclasses that handle table scans. */ @@ -107,7 +107,13 @@ class HadoopTableReader( // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + + val hadoopRDD =if (attributes.seq.toString.contains("BLOCK__OFFSET__INSIDE__FILE") || + attributes.seq.toString.contains("INPUT__FILE__NAME")){ + createHadoopRddVirtual(tableDesc, inputPathStr, ifc) + } else{ + createHadoopRdd(tableDesc, inputPathStr, ifc) + } val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) @@ -240,6 +246,29 @@ class HadoopTableReader( // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } + + private def createHadoopRddVirtual( + tableDesc: TableDesc, + path: String, + inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { + + val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ + + val rdd = new VirtualHadoopRDD( + sc.sparkContext, + _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + Some(initializeJobConfFunc), + inputFormatClass, + classOf[Writable], + classOf[Writable], + _minSplitsPerRDD, + tableDesc) + + // Only take the value (skip the key) because Hive works only with values. + rdd.map(_._2) + } + + } private[hive] object HadoopTableReader extends HiveInspectors { @@ -273,9 +302,11 @@ private[hive] object HadoopTableReader extends HiveInspectors { nonPartitionKeyAttrs: Seq[(Attribute, Int)], mutableRow: MutableRow): Iterator[Row] = { + val inputfile = "INPUT__FILE__NAME|BLOCK__OFFSET__INSIDE__FILE|input__file__name".r val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector] - val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => - soi.getStructFieldRef(attr.name) -> ordinal + val (fieldRefs, fieldOrdinals) = + nonPartitionKeyAttrs.map { case (attr, ordinal) if (!attr.name.contains("input__file__name")) => + soi.getStructFieldRef(attr.name) -> ordinal }.unzip // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD.scala new file mode 100644 index 0000000000000..945c4d34f428b --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.EOFException +import java.text.SimpleDateFormat +import java.util.Date + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred._ +import org.apache.hadoop.mapred.lib.CombineFileSplit +import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.DataReadMethod +import org.apache.spark.rdd._ +import org.apache.spark.util.NextIterator +import org.apache.spark.util.Utils + +/** + * Created by ocquery on 3/13/15. + */ +class VirtualHadoopRDD[K, V]( + sc: SparkContext, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + initLocalJobConfFuncOpt: Option[JobConf => Unit], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + tableDesc:TableDesc) + extends HadoopRDD(sc, + broadcastedConf, + initLocalJobConfFuncOpt, + inputFormatClass, + keyClass,valueClass,minPartitions) with Logging { + + val fieldDelim = tableDesc.getProperties().getProperty("field.delim","\001") + val lineDelim = tableDesc.getProperties().getProperty("line.delim") + + private val createTime = new Date() + + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { + val iter = new NextIterator[(K, V)] { + + val split = theSplit.asInstanceOf[HadoopPartition] + logInfo("Input split: " + split.inputSplit) + val jobConf = getJobConf() + + val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop) + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes + val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { + split.inputSplit.value match { + case _: FileSplit | _: CombineFileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + case _ => None + } + } + inputMetrics.setBytesReadCallback(bytesReadCallback) + + var reader: RecordReader[K, V] = null + val inputFormat = getInputFormat(jobConf) + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), + context.stageId, theSplit.index, context.attemptNumber, jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener{ context => closeIfNeeded() } + val key: K = reader.createKey() + val value: V = reader.createValue() + val pathValue = split.inputSplit.value.toString.split(":") + val path = pathValue(0) + ":" + pathValue(1) + + override def getNext() = { + try{ + finished = !reader.next(key, value) + + } catch { + case eof: EOFException => + finished = true + } + if (!finished) { + inputMetrics.incRecordsRead(1) + } + + val t = value.toString + fieldDelim + key.toString + fieldDelim + path + val t2 = TextValue.set(t) + (key, TextValue.get.asInstanceOf[V]) + + } + + override def close() { + try { + reader.close() + if (bytesReadCallback.isDefined) { + inputMetrics.updateBytesRead() + } else if (split.inputSplit.value.isInstanceOf[FileSplit] || + split.inputSplit.value.isInstanceOf[CombineFileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.incBytesRead(split.inputSplit.value.getLength) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } + } catch { + case e: Exception => { + if (!Utils.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } + } + } + } + new InterruptibleIterator[(K, V)](context, iter) + } + +} + +object TextValue{ + val word = new Text() + + def set(value:String){ + word.set(value) + } + + def get(): Text= { + word + } +} + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD1.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD1.scala new file mode 100644 index 0000000000000..e9f558c3b0ea0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/VirtualHadoopRDD1.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.text.SimpleDateFormat +import java.util.Date +import java.io.EOFException + +import org.apache.spark.rdd.{HadoopPartition, HadoopRDD} + +import scala.collection.immutable.Map +import scala.reflect.ClassTag +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapred.InputFormat +import org.apache.hadoop.mapred.InputSplit +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.RecordReader +import org.apache.hadoop.mapred.Reporter +import org.apache.hadoop.mapred.JobID +import org.apache.hadoop.mapred.TaskAttemptID +import org.apache.hadoop.mapred.TaskID +import org.apache.hadoop.util.ReflectionUtils + +import org.apache.spark._ +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{DataReadMethod, InputMetrics} +import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD +import org.apache.spark.util.{NextIterator, Utils} +import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation} +import org.apache.hadoop.hive.ql.plan.TableDesc + +/** + * Created by ocquery on 3/13/15. + */ +class VirtualHadoopRDD[K, V]( + sc: SparkContext, + broadcastedConf: Broadcast[SerializableWritable[Configuration]], + initLocalJobConfFuncOpt: Option[JobConf => Unit], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + tableDesc:TableDesc) + extends HadoopRDD(sc, + broadcastedConf, + initLocalJobConfFuncOpt, + inputFormatClass, + keyClass,valueClass,minPartitions) with Logging { + + val fieldDelim = tableDesc.getProperties().getProperty("field.delim","\001") + val lineDelim = tableDesc.getProperties().getProperty("line.delim") + + private val createTime = new Date() + + + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { + val iter = new NextIterator[(K, V)] { + + val split = theSplit.asInstanceOf[HadoopPartition] + logInfo("Input split: " + split.inputSplit) + val jobConf = getJobConf() + + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes + val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) { + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( + split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf) + } else { + None + } + if (bytesReadCallback.isDefined) { + context.taskMetrics.inputMetrics = Some(inputMetrics) + } + + var reader: RecordReader[K, V] = null + val inputFormat = getInputFormat(jobConf) + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), + context.stageId, theSplit.index, context.attemptId.toInt, jobConf) + reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener{ context => closeIfNeeded() } + val key: K = reader.createKey() + val value: V = reader.createValue() + val pathValue = split.inputSplit.value.toString.split(":") + val path = pathValue(0) + ":" + pathValue(1) + + var recordsSinceMetricsUpdate = 0 + + override def getNext() = { + try { + finished = !reader.next(key, value) + } catch { + case eof: EOFException => + finished = true + } + + // Update bytes read metric every few records + if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES + && bytesReadCallback.isDefined) { + recordsSinceMetricsUpdate = 0 + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else { + recordsSinceMetricsUpdate += 1 + } + val t = value.toString + fieldDelim + key.toString + fieldDelim + path + val t2 = TextValue.set(t) + (key, TextValue.get.asInstanceOf[V]) + } + + override def close() { + try { + reader.close() + if (bytesReadCallback.isDefined) { + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else if (split.inputSplit.value.isInstanceOf[FileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.bytesRead = split.inputSplit.value.getLength + context.taskMetrics.inputMetrics = Some(inputMetrics) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } + } catch { + case e: Exception => { + if (!Utils.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } + } + } + } + new InterruptibleIterator[(K, V)](context, iter) + } + +} + +object TextValue{ + val word = new Text() + + def set(value:String){ + word.set(value) + } + + def get(): Text= { + word + } +} + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 91af35f0965c0..48c68697dbbff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -128,7 +128,7 @@ case class InsertIntoHiveTable( protected[sql] lazy val sideEffectResult: Seq[Row] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = table.tableDesc + val tableDesc = table.tableDescVirtual val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = HiveShim.getExternalTmpPath(hiveContext, tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala index f9fcbdae15745..5e77faa61cc78 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala @@ -169,6 +169,25 @@ private[hive] object HiveShim { new TableDesc(inputFormatClass, outputFormatClass, properties) } + def getTableDescVirtual( + serdeClass: Class[_ <: Deserializer], + inputFormatClass: Class[_ <: InputFormat[_, _]], + outputFormatClass: Class[_], + properties: Properties) = { + val colums = properties.getProperty("columns",null) + ",BLOCK__OFFSET__INSIDE__FILE,INPUT__FILE__NAME" + properties.setProperty("columns", colums ) + + val columsTypes = properties.getProperty("columns.types",null) + ":int:string" + properties.setProperty("columns.types", columsTypes ) + + val serializationDdl = + properties.getProperty("serialization.ddl",null).getBytes + + "int BLOCK__OFFSET__INSIDE__FILE,string INPUT__FILE__NAME" + + properties.setProperty("serialization.ddl", serializationDdl ) + + new TableDesc(inputFormatClass, outputFormatClass, properties) + } def getStringWritableConstantObjectInspector(value: Any): ObjectInspector = PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(