#include #include #include #ifdef MSWIN #include typedef HANDLE THREAD_HANDLE; #else #include typedef pthread_t THREAD_HANDLE; #define DISPLAY_PTHREAD_ERROR( THREADI, SOURCE_FILE, SOURCE_LINE ) \ perror( "ERROR *** Unable to create a new thread " ); \ fprintf( stderr, " threadid %d, %s:%d.\n", THREADI, SOURCE_FILE, SOURCE_LINE ); #endif static const off_t blocked_compress_pattern = 0x1827364554637281; #define my_ceil( a, b ) ( (a)/(b) + ((a)%(b) > 0) ) class Compress_MT_datastruct{ public: uchar *compressed_buffer, *uncompressed_buffer; uint thread_id, num_threads, work_done, start_block, end_block; uint *compressed_block_partitions, *uncompressed_block_partitions, *compressed_block_sizes; Compress_MT_datastruct(){ init(); } void init(){ compressed_buffer = uncompressed_buffer = NULL; compressed_block_partitions = uncompressed_block_partitions = NULL; thread_id = num_threads = start_block = end_block = 0; work_done = 0; } void init_thread_data( uint in_thread_id, uint in_num_threads, uint num_compress_blocks, uint * in_compressed_block_partitions, uint * in_uncompressed_block_partitions, uint * in_compressed_block_sizes, uchar * in_compressed_buffer, uchar * in_uncompressed_buffer ){ init(); thread_id = in_thread_id; num_threads = in_num_threads; uint num_blocks_per_thread = my_ceil( num_compress_blocks, num_threads ); start_block = thread_id * num_blocks_per_thread; end_block = my_min( (thread_id + 1) * num_blocks_per_thread, num_compress_blocks ); compressed_buffer = in_compressed_buffer; uncompressed_buffer = in_uncompressed_buffer; compressed_block_partitions = in_compressed_block_partitions; uncompressed_block_partitions = in_uncompressed_block_partitions; compressed_block_sizes = in_compressed_block_sizes; work_done = 0; } void compress_datastruct(){ work_done = 1; for( uint i = start_block; i < end_block; i++ ){ ulong compressed_block_size = compressed_block_sizes[i]; if( compress2( compressed_buffer + compressed_block_partitions[i], &compressed_block_size, uncompressed_buffer + uncompressed_block_partitions[i], uncompressed_block_partitions[i+1] - uncompressed_block_partitions[i], 9 ) != Z_OK ) work_done = 0; compressed_block_sizes[i] = compressed_block_size; } } void uncompress_datastruct(){ uint compressed_block_start = 0; for( uint i = 0; i < start_block; i++ ) compressed_block_start += compressed_block_sizes[i]; work_done = 1; for( uint i = start_block; i < end_block; i++ ){ ulong uncompressed_size = uncompressed_block_partitions[i+1] - uncompressed_block_partitions[i]; if( uncompress( uncompressed_buffer + uncompressed_block_partitions[i], &uncompressed_size, compressed_buffer + compressed_block_start, compressed_block_sizes[i] ) != Z_OK ) work_done = 0; compressed_block_start += compressed_block_sizes[i]; } } }; int get_num_processing_cores(); template int wait_until_processing_threads_are_done_then_cleanup( int num_threads, THREAD_HANDLE * &threads, THREAD_DATASTUCT * &thread_para ); int compress_MT( uchar * compressed_buffer, ulong &compressed_size, uchar * uncompressed_buffer, ulong uncompressed_size ); int uncompress_MT( uchar *& uncompressed_buffer, ulong &uncompressed_size, uchar * compressed_buffer, ulong compressed_size ); #ifdef MSWIN DWORD WINAPI compress_MT_thread( LPVOID thread_arg ){ #else static void * compress_MT_thread(void * thread_arg){ #endif Compress_MT_datastruct * compress_args = (Compress_MT_datastruct *)thread_arg; compress_args -> compress_datastruct(); return NULL; } #define CHECK_WORK_DONE \ if( !work_done ){ \ fprintf( stderr, "%s:%d The procedure '%s' did not complete successfully. \n" \ "The destination buffer might be of insufficient size.\n", __FILE__, __LINE__, __FUNCTION__ ); \ return 1; \ } int compress_MT( uchar * compressed_buffer, ulong & compressed_size, uchar * uncompressed_buffer, ulong uncompressed_size ){ uint num_compress_MT_blocks, min_block_size = 320*240; num_compress_MT_blocks = my_ceil( uncompressed_size, min_block_size ); // uncompressed_size / min_block_size + ( uncompressed_size % min_block_size > 0 ); num_compress_MT_blocks = my_min( num_compress_MT_blocks, 8 ); uint * compressed_block_partitions = new uint[num_compress_MT_blocks + 1]; uint * uncompressed_block_partitions = new uint[num_compress_MT_blocks + 1]; uint * compressed_block_sizes = new uint[num_compress_MT_blocks + 1]; int uncompressed_block_size = my_ceil( uncompressed_size, num_compress_MT_blocks ); int compressed_block_size = my_ceil( compressed_size, num_compress_MT_blocks ); for( uint i = 0; i < num_compress_MT_blocks + 1; i++ ){ uncompressed_block_partitions[i] = my_min( i * uncompressed_block_size, uncompressed_size ); compressed_block_partitions[i] = my_min( i * compressed_block_size, compressed_size ); compressed_block_sizes[i] = compressed_block_size; } uchar * compress_dest = compressed_buffer; memcpy( compress_dest, &blocked_compress_pattern, sizeof(blocked_compress_pattern) ); compress_dest = compressed_buffer + sizeof(blocked_compress_pattern); memcpy( compress_dest, &num_compress_MT_blocks, sizeof(int) ); int header_size = sizeof(blocked_compress_pattern) + sizeof(int) + sizeof(uint)*(2*num_compress_MT_blocks); uchar * compressed_buffer_post_header = compressed_buffer + header_size; int num_threads = my_min( get_num_processing_cores(), num_compress_MT_blocks ); Compress_MT_datastruct * thread_datastruct = new Compress_MT_datastruct[num_threads]; for( int i = 0; i < num_threads; i++ ){ thread_datastruct[i].init_thread_data( i, num_threads, num_compress_MT_blocks, compressed_block_partitions, uncompressed_block_partitions, compressed_block_sizes, compressed_buffer_post_header, uncompressed_buffer ); } THREAD_HANDLE * threads = new THREAD_HANDLE[num_threads]; for( int i = 0; i < num_threads; i++ ){ #ifdef MSWIN threads[i] = CreateThread( NULL, 0, compress_MT_thread, (LPVOID)(thread_datastruct + i), 0, NULL ); #else int pthread_err = pthread_create( &threads[i], NULL, compress_MT_thread, (void *)(thread_datastruct + i) ); if( pthread_err != 0 ){ DISPLAY_PTHREAD_ERROR( i, __FILE__, __LINE__ ) threads[i] = NULL; } #endif } int work_done = wait_until_processing_threads_are_done_then_cleanup( num_threads, threads, thread_datastruct ); CHECK_WORK_DONE compress_dest = compressed_buffer_post_header + compressed_block_sizes[0]; compressed_size = header_size + compressed_block_sizes[0]; for( int i = 1; i < num_compress_MT_blocks; i++ ){ memcpy( compress_dest, compressed_buffer_post_header + compressed_block_partitions[i], compressed_block_sizes[i] ); compress_dest += compressed_block_sizes[i]; compressed_size += compressed_block_sizes[i]; } compress_dest = compressed_buffer + sizeof(blocked_compress_pattern) + sizeof(int); memcpy( compress_dest, uncompressed_block_partitions + 1, sizeof(uint) * num_compress_MT_blocks ); compress_dest += sizeof(uint) * num_compress_MT_blocks; memcpy( compress_dest, compressed_block_sizes, sizeof(uint) * num_compress_MT_blocks ); delete [] compressed_block_partitions; delete [] uncompressed_block_partitions; delete [] compressed_block_sizes; return 0; } #ifdef MSWIN DWORD WINAPI uncompress_MT_thread( LPVOID thread_arg ){ #else static void * uncompress_MT_thread(void * thread_arg){ #endif Compress_MT_datastruct * compress_args = (Compress_MT_datastruct *)thread_arg; compress_args -> uncompress_datastruct(); return NULL; } int uncompress_MT( uchar *& uncompressed_buffer, ulong & uncompressed_size, uchar * compressed_buffer, ulong compressed_size ){ uchar * compress_dest = compressed_buffer; off_t input_compress_pattern = 0; memcpy( &input_compress_pattern, compress_dest, sizeof(blocked_compress_pattern) ); if( input_compress_pattern != blocked_compress_pattern ) return uncompress( uncompressed_buffer, &uncompressed_size, compressed_buffer, compressed_size ); int num_compress_MT_blocks = 0; compress_dest = compressed_buffer + sizeof(blocked_compress_pattern); memcpy( &num_compress_MT_blocks, compress_dest, sizeof(int) ); uint * uncompressed_block_partitions = new uint[num_compress_MT_blocks + 1]; uint * compressed_block_sizes = new uint[num_compress_MT_blocks + 1]; uncompressed_block_partitions[0] = 0; compress_dest = compressed_buffer + sizeof(blocked_compress_pattern) + sizeof(int); memcpy( uncompressed_block_partitions + 1, compress_dest, sizeof(uint) * num_compress_MT_blocks ); ulong uncompressed_input_size = uncompressed_block_partitions[num_compress_MT_blocks]; #define CLEANUP_ARRAYS \ delete [] uncompressed_block_partitions; \ delete [] compressed_block_sizes; if( uncompressed_buffer != NULL && uncompressed_size < uncompressed_input_size ){ CLEANUP_ARRAYS uncompressed_size = uncompressed_input_size; return Z_BUF_ERROR; } uncompressed_size = uncompressed_input_size; if( uncompressed_buffer == NULL ){ uncompressed_buffer = new uchar[uncompressed_size]; if( uncompressed_buffer == NULL ){ CLEANUP_ARRAYS return Z_MEM_ERROR; } } compress_dest += sizeof(uint) * num_compress_MT_blocks; memcpy( compressed_block_sizes, compress_dest, sizeof(uint) * num_compress_MT_blocks ); int header_size = sizeof(blocked_compress_pattern) + sizeof(int) + sizeof(uint)*(2*num_compress_MT_blocks); uint compressed_block_sizes_total = header_size; for( int i = 0; i < num_compress_MT_blocks; i++ ) compressed_block_sizes_total += compressed_block_sizes[i]; if( compressed_block_sizes_total != compressed_size ){ fprintf(stderr, "%s:%d Mismatch in compressed sizes %u vs. %lu", __FILE__, __LINE__, compressed_block_sizes_total, compressed_size ); return Z_DATA_ERROR; } uchar * compressed_buffer_post_header = compressed_buffer + header_size; int num_threads = my_min( get_num_processing_cores(), num_compress_MT_blocks ); //fprintf(stderr, "%s:%d Number processing threads: %d \n", __FILE__, __LINE__, num_threads); Compress_MT_datastruct * thread_datastruct = new Compress_MT_datastruct[num_threads]; for( int i = 0; i < num_threads; i++ ){ thread_datastruct[i].init_thread_data( i, num_threads, num_compress_MT_blocks, NULL, uncompressed_block_partitions, compressed_block_sizes, compressed_buffer_post_header, uncompressed_buffer ); } THREAD_HANDLE * threads = new THREAD_HANDLE[num_threads]; for( int i = 0; i < num_threads; i++ ){ #ifdef MSWIN threads[i] = CreateThread( NULL, 0, uncompress_MT_thread, (LPVOID)(thread_datastruct + i), 0, NULL ); #else int pthread_err = pthread_create( &threads[i], NULL, uncompress_MT_thread, (void *)(thread_datastruct + i) ); if( pthread_err != 0 ){ DISPLAY_PTHREAD_ERROR( i, __FILE__, __LINE__ ) threads[i] = NULL; } #endif } int work_done = wait_until_processing_threads_are_done_then_cleanup( num_threads, threads, thread_datastruct ); CHECK_WORK_DONE delete [] uncompressed_block_partitions; delete [] compressed_block_sizes; return Z_OK; } template int wait_until_processing_threads_are_done_then_cleanup( int num_threads, THREAD_HANDLE * &threads, THREAD_DATASTUCT * &thread_para ){ #ifdef MSWIN WaitForMultipleObjects(num_threads, threads, TRUE, INFINITE); for( int threadid = 0; threadid < num_threads; threadid++ ) CloseHandle( threads[threadid] ); #else for( int threadid = 0; threadid < num_threads; threadid++ ){ if( threads[threadid] != NULL ) pthread_join( threads[threadid], NULL ); } #endif int work_done = 1; for( int threadid = 0; threadid < num_threads; threadid++ ) if( !thread_para[threadid].work_done ) work_done = 0; delete [] threads; threads = NULL; for( int threadid = 0; threadid < num_threads; threadid++ ) thread_para[threadid].init(); delete [] thread_para; thread_para = NULL; return work_done; } int get_num_processing_cores(){ #ifdef MSWIN SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); return sysinfo.dwNumberOfProcessors; #else #ifdef MACOS int nm[2]; size_t len = 4; uint32_t count; nm[0] = CTL_HW; nm[1] = HW_AVAILCPU; sysctl(nm, 2, &count, &len, NULL, 0); if(count < 1) { nm[1] = HW_NCPU; sysctl(nm, 2, &count, &len, NULL, 0); if(count < 1) { count = 1; } } return count; #else return sysconf(_SC_NPROCESSORS_ONLN); #endif #endif }